
BScParallelBamIO

Implementation of parallel de-/compression of BAM files.

Background

BAM [1] files are used for storing alignments of reads to reference sequences. Since next-generation sequencing technologies create huge amounts of data, the data is compressed. The BAM format stores the data in compressed blocks (each with a fixed maximal uncompressed size) using the zlib library [2] (the BGZF format).

When processing such files, one common bottleneck is the decompression of the BAM file. Since the data is stored in blocks, each block can be decompressed independently of the others. This decompression can be performed in parallel.
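
For illustration, the following sketch (our own code, not part of SeqAn) shows how a reader can determine the size of a BGZF block from its header alone, following the BGZF layout described in the SAM specification. This is what makes it possible to cut the file into blocks that can be decompressed independently:

#include <cstddef>

// Return the total size of the BGZF block starting at header, or 0 if the
// bytes do not look like a valid BGZF block header.
std::size_t bgzfBlockSize(unsigned char const * header, std::size_t len)
{
    // A BGZF block is a gzip member: check the gzip magic bytes, the deflate
    // method, and the FEXTRA flag that announces the "extra" field.
    if (len < 18 || header[0] != 0x1f || header[1] != 0x8b ||
        header[2] != 8 || !(header[3] & 4))
        return 0;
    std::size_t xlen = header[10] | (header[11] << 8);  // length of the extra field
    std::size_t pos = 12;                               // extra subfields start at byte 12
    while (pos + 6 <= 12 + xlen && pos + 6 <= len)
    {
        std::size_t slen = header[pos + 2] | (header[pos + 3] << 8);
        if (header[pos] == 'B' && header[pos + 1] == 'C' && slen == 2)
            // The 'BC' subfield stores BSIZE = total block length minus 1.
            return (std::size_t)(header[pos + 4] | (header[pos + 5] << 8)) + 1;
        pos += 4 + slen;                                // skip to the next subfield
    }
    return 0;  // no 'BC' subfield found -- not a BGZF block
}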

Topic

The task of this project is to implement the reading and writing of BAM files using parallel compression and decompression of blocks. For this, the C++11 threading support from the STL (header <thread>) should be used.

One aim is the later integration into the SeqAn library, i.e. the parallel implementation should be usable through the existing interface. This way, all existing applications using BAM I/O from the SeqAn library can benefit from the parallelization.

Technically, parallelization is to be added to the BGZF Stream class.

Requirements:

  • support the SeqAn Stream interface for reading and writing
  • allow jumping in the file using BAI indices (this requires stopping all pending decompression threads)
  • be correct, and, to be useful, efficient with good scalability

Task

The following breaks the task down into sensible steps:

1. BgzfCompressor and BgzfDecompressor

Extract the compression and decompression into a BgzfCompressor and a BgzfDecompressor class.

  • Both classes should take a buffer (e.g. seqan::CharString) for the input and a buffer for the output and perform the compression/decompression in a run() method.
  • The BgzfCompressor class should try to compress the input data in 64 KB (64*1024 bytes) blocks, as the current BGZF code in SeqAn does [4], and write the compressed blocks one after another. The output buffer should then be resized to fit the resulting size.
  • The BgzfDecompressor should take the input and assume that it is a concatenation of full BGZF blocks (a check can be performed and execution stopped if the input is invalid).

In the parallel program, the compression or decompression of B BGZF blocks (where B is configurable before opening the file) can be seen as an atomic operation: it cannot be split or performed only half-way.

You could use the following interface:

class BgzfCompressor
{
public:
    // Constructor.
    BgzfCompressor(unsigned maxBlockSize = 64*1024);

    // Compress from input to result.
    void run(seqan::CharString & result, seqan::CharString const & input);
};

// BgzfDecompressor could have the same interface.

Write tests for these classes.
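
A roundtrip test in SeqAn's test framework could look as follows. This is a sketch: BgzfCompressor and BgzfDecompressor are the interfaces proposed above, not existing SeqAn code.

#include <seqan/basic.h>
#include <seqan/sequence.h>

// #include "par_bgzf.h"  // hypothetical header of the new module

SEQAN_DEFINE_TEST(test_par_bgzf_roundtrip)
{
    seqan::CharString input = "The quick brown fox jumps over the lazy dog.";
    seqan::CharString compressed, decompressed;

    BgzfCompressor compressor;
    compressor.run(compressed, input);           // text -> concatenated BGZF blocks

    BgzfDecompressor decompressor;
    decompressor.run(decompressed, compressed);  // BGZF blocks -> original text

    SEQAN_ASSERT_EQ(input, decompressed);
}

SEQAN_BEGIN_TESTSUITE(test_par_bgzf)
{
    SEQAN_CALL_TEST(test_par_bgzf_roundtrip);
}
SEQAN_END_TESTSUITE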

After this step, you will have the two classes and tests for them. You could put them into a module par_bgzf that you can create using util/bin/skel.py, and also use this script for creating the test structure. Create the module and tests in your sandbox.

2. Create Initial (Sequential) Stream<ParallelBgzf>

  • Copy the Stream<Bgzf> class into a Stream<ParallelBgzf> class in your sandbox.
  • Also copy the tests for the Bgzf Stream class and adapt them for your Parallel Bgzf Stream class.
  • Add tests for larger files (~2 MB should work fine). Note that testing for binary identity with the output of Heng Li's bgzf tool would probably not work very well because of ambiguity in the files. Rather, compress and decompress a text file and test whether the result equals the input. Tests against the current Bgzf Stream should also work well. There could be differences if some blocks do not compress well enough, but that should be rare.

After this step, you will have the new Stream class and tests for it.

3. Parallelize Stream<ParallelBgzf> in a Fork/Join Fashion

Here, we will parallelize the Parallel Bgzf Stream class using the relatively simple fork/join style of parallelism. Let n be the number of threads.

  • Add two Strings of buffers to the Parallel Bgzf Stream class (possibly themselves stored in a String, e.g. buffers[0] is the first String of buffers and buffers[1] is the second one).
  • Add an integer bufno that gives the index (0 or 1) of the entry of buffers that the stream currently reads from/writes to while the threads are working on buffers[1 - bufno].
  • buffers[0] and buffers[1] contain the decompressed data that is given to the Stream's caller/user when reading and the uncompressed data to compress and write to the file when writing.
  • Each String in buffers has n entries of type CharString that serve as the target buffers.
  • Create a BgzfDecompressionThread and a BgzfCompressionThread class.
  • Each gets a CharString buffer for the data from the file.
  • The Parallel Bgzf Stream class gets a readAndDecompress function that does the following:
    • For i=1..n: Read B blocks into the buffer for thread i (this is done sequentially).
    • Kick off decompression for each thread. Each thread i writes the result to buffers[1 - bufno][i].
  • The Parallel Bgzf Stream class gets a compressAndWrite function that does the following:
    • Kick off parallel compression for each thread. The thread compresses from buffers[1 - bufno][i] to its internal buffer.
    • Wait for all threads to complete.
    • Write out the data for i=1..n sequentially.

The idea is that the stream writes into buffers[bufno] while the threads compress and write out buffers[1 - bufno] (if it is filled), and vice versa on reading. The class will need some synchronization primitives and counters, and probably also a flag indicating whether buffers[1 - bufno] is filled at the beginning of reading/writing.
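
For illustration, here is a minimal fork/join sketch of the parallel decompression step using std::thread and C++11 lambdas. All names (decompressChunks, inBuffers, outBuffers) are placeholders, and BgzfDecompressor is the class proposed in step 1:

#include <thread>
#include <vector>

#include <seqan/sequence.h>

typedef seqan::String<seqan::CharString> TBufferString;

// Decompress n chunks in parallel: inBuffers[i] holds the compressed bytes
// read for worker i, outBuffers[i] receives the decompressed data that the
// stream later hands to its user.
void decompressChunks(TBufferString & outBuffers, TBufferString const & inBuffers)
{
    unsigned n = length(inBuffers);
    resize(outBuffers, n);

    // Fork: one thread per chunk.
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < n; ++i)
        workers.push_back(std::thread([&outBuffers, &inBuffers, i]() {
            BgzfDecompressor decompressor;  // proposed class from step 1
            decompressor.run(outBuffers[i], inBuffers[i]);
        }));

    // Join: wait until all chunks are done before handing out the buffers.
    for (unsigned i = 0; i < n; ++i)
        workers[i].join();
}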

Useful synchronization primitives are std::mutex (plus std::lock_guard or std::unique_lock) and std::condition_variable. A thread barrier class can be constructed from a mutex and a condition_variable, as sketched below.
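
A minimal sketch of such a barrier (C++11 itself provides no barrier class):

#include <condition_variable>
#include <mutex>

// A cyclic barrier built from std::mutex and std::condition_variable:
// wait() blocks until `count` threads have reached the barrier.
class Barrier
{
public:
    explicit Barrier(unsigned count) : threshold(count), remaining(count), generation(0) {}

    void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);
        unsigned gen = generation;
        if (--remaining == 0)
        {
            ++generation;           // this round is complete, start the next one
            remaining = threshold;
            cond.notify_all();
        }
        else
        {
            // Wait until the last thread of this round bumps the generation.
            cond.wait(lock, [this, gen]() { return gen != generation; });
        }
    }

private:
    std::mutex mtx;
    std::condition_variable cond;
    unsigned threshold, remaining, generation;
};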

There should be tests for this with larger files. The original sequential Stream<Bgzf> is presumably correct, so you can compress (or decompress) in parallel using your implementation, then apply the inverse operation and check whether the result is the same as the input.

4. Write Parallel BGZF/BAM-Handling Tools

  1. You can now easily write a parallel bgzf compression and decompression tool that simply reads from your Parallel Bgzf Stream and writes to a std::fstream, and vice versa.
  2. It should now also be fairly simple to use readRecord(..., Sam/Bam) and writeRecord(..., Sam/Bam) to write a parallel Bam <-> Sam converter.

This demonstrates the practicality of the approach and gives a good estimate of the scalability with larger data sets for (1) raw (de-)compression and (2) the simpler but non-trivial task of SAM <-> BAM conversion.
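
A sketch of what the decompression tool's core could look like, assuming that Stream<ParallelBgzf> from steps 2 and 3 supports open(), streamEof(), and streamReadBlock() in the same way as the existing Stream<Bgzf>:

#include <cstddef>
#include <fstream>

#include <seqan/stream.h>

// "pbgzip -d": decompress a BGZF file to a plain file, in parallel.
int main(int argc, char const ** argv)
{
    if (argc != 3)
        return 1;

    seqan::Stream<seqan::ParallelBgzf> in;  // the class written in steps 2 and 3
    if (!open(in, argv[1], "r"))
        return 1;
    std::ofstream out(argv[2], std::ios::binary);

    // Copy everything from the parallel stream into the output file.
    char buffer[64 * 1024];
    while (!streamEof(in))
    {
        std::size_t numRead = streamReadBlock(buffer, in, sizeof(buffer));
        out.write(buffer, numRead);
    }
    return 0;
}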

5a. Optional: Write Parallel BAM Sorter

BAM files can be sorted by (a) coordinate or (b) query name as follows:

  1. Allocate a large buffer and fill it with decompressed BAM.
  2. While decompressing, collect the (a) coordinate as Pair<int, int> or (b) read name plus flag as Pair<char const *, int> in a second buffer keys.
  3. Sort keys using std::stable_sort() or std::sort(). (Parallelizing this step is a no-brainer: the GCC STL provides a parallel std::sort() implementation, and the OMPTL library offers a drop-in replacement for std::sort().)
  4. Write out the uncompressed buffer to a temporary file in the order given by the sorted keys.
  5. Go to 1 until the end of the file is reached.
  6. Merge all temporary files and write out as compressed, sorted BAM file.

The hardest thing here is probably the handling of BAM records that are cut off at the end of the buffer. Together with the parallel compression/decompression and the parallel sorting, this should make for a simple and scalable BAM sorter.
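
For illustration, a sketch of the key type and the sorting step for coordinate sorting; the names Key and sortKeysByCoordinate are ours. With GCC's parallel mode, __gnu_parallel::stable_sort from <parallel/algorithm> should work as a drop-in replacement for the sort call:

#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Each key carries (refID, pos) plus the record's location in the
// uncompressed buffer, so that after sorting the keys alone, the records
// can be written out in sorted order.
struct Key
{
    std::pair<int, int> coord;  // (reference id, position)
    std::size_t offset;         // start of the record in the buffer
    std::size_t size;           // length of the record in bytes
};

void sortKeysByCoordinate(std::vector<Key> & keys)
{
    // stable_sort keeps the original order of records with equal coordinates.
    std::stable_sort(keys.begin(), keys.end(),
                     [](Key const & a, Key const & b) { return a.coord < b.coord; });
}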

5b. Optional: Asynchronous Parallelism

The weakest point of the approach described above is the synchronous compression and decompression. Ideally, all threads would be doing work all the time (unless a given maximal number of allocated buffers is filled and not yet read/written). This can be achieved using an appropriate queue data structure:

Consecutive numbers starting with 0 are given to each chunk of B blocks read from the file or written by the user to the stream. Each worker thread grabs the first available chunk, compresses/decompresses it, and puts the result into the queue. A special thread is responsible for filling the buffer that the user of the stream reads from, or for writing out the compressed data to the file. This thread knows which chunk has to be handled next and queries the queue for it; the queue blocks this thread until that chunk is available. The compression/decompression threads are blocked if the queue is too full to accept their result. The exception is that a thread holding a chunk whose index is smaller than the largest index already in the queue is allowed to add it anyway, since that chunk is needed earlier.
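
The following sketch shows one possible shape of such a queue (all names are placeholders): workers call push() with their chunk number, and the writer thread calls pop() to obtain chunks 0, 1, 2, ... in order.

#include <condition_variable>
#include <map>
#include <mutex>

#include <seqan/sequence.h>

// Reordering queue: finished chunks are inserted under their sequence
// number; a single consumer removes them strictly in order.  A worker blocks
// while the queue is full, unless its chunk number is smaller than the
// largest chunk already queued (that chunk is needed earlier, so admitting
// it avoids deadlock).
class ReorderQueue
{
public:
    explicit ReorderQueue(unsigned maxSize) : maxSize(maxSize), nextToPop(0) {}

    void push(unsigned chunkNo, seqan::CharString data)
    {
        std::unique_lock<std::mutex> lock(mtx);
        cond.wait(lock, [this, chunkNo]() {
            return chunks.size() < maxSize ||
                   (!chunks.empty() && chunkNo < chunks.rbegin()->first);
        });
        chunks[chunkNo] = data;
        cond.notify_all();  // the consumer may be waiting for this chunk
    }

    // Called by the writer thread; blocks until the next chunk is available.
    seqan::CharString pop()
    {
        std::unique_lock<std::mutex> lock(mtx);
        cond.wait(lock, [this]() { return chunks.count(nextToPop) != 0; });
        seqan::CharString data = chunks[nextToPop];
        chunks.erase(nextToPop++);
        cond.notify_all();  // blocked workers may now enqueue their results
        return data;
    }

private:
    std::mutex mtx;
    std::condition_variable cond;
    std::map<unsigned, seqan::CharString> chunks;
    unsigned maxSize, nextToPop;
};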

References
