
read_parallel -- second closure needs to iterate over all record sets to drive execution #25

@julibeg


First of all, thanks for seq_io, it's really handy!

I got tripped up today, though: I have a use case where I'm not using the second closure in read_parallel to accumulate results; instead, the worker closure modifies a shared variable behind an Arc<Mutex>. I naively assumed that I could leave the second closure as || {} in this case, and when testing on dummy data it actually worked fine. For larger files (larger than the default buffer size of 64 kB), however, I got unexpected results.

The MWE below illustrates what I mean. It calculates the total sequence length of a file, but when using an empty second closure it gives the wrong result (i.e. it prints "128 kb" although we expect 1000 kb). I added another read_parallel call with a closure that loops over the record sets (without doing anything), and this gives the right result.

To produce the test FASTA file (1 Mb of sequence in total), run

$ for i in {1..1000}; do echo ">sequence_$i"; printf 'A%.0s' {1..1000}; echo; done > test.fa

Then, run

use seq_io::fasta::{Reader, RecordSet};
use seq_io::parallel::read_parallel;
use std::sync::{Arc, Mutex};

fn main() {
    let path = "test.fa";

    let fasta_reader = Reader::from_path(&path).unwrap();

    // arc of mutex to share the total count between threads
    let total_count = Arc::new(Mutex::new(0usize));

    // define worker closure beforehand (so that we're not duplicating it for each `read_parallel` call)
    let worker = |record_set: &mut RecordSet| {
        let mut local_count = 0usize;

        for record in record_set.into_iter() {
            local_count += record.full_seq().len();
        }

        // update the shared counter with this batch's count
        let mut count = total_count.lock().unwrap();
        *count += local_count;
    };

    // call read parallel with empty second closure; this gives the wrong result
    read_parallel(fasta_reader, 4, 2, worker, |_| {});

    // print the total sequence length (we expect 1000 kb, but this prints "128 kb")
    println!("result with empty closure = {} kb", *total_count.lock().unwrap() / 1000);

    // reset the total count and get a new reader of the file
    *total_count.lock().unwrap() = 0;
    let fasta_reader = Reader::from_path(&path).unwrap();

    // call `read_parallel` again and this time iterate through the record sets in the second
    // closure (without doing anything)
    read_parallel(fasta_reader, 4, 2, worker, |record_sets| {
        while let Some(_) = record_sets.next() {
            // do nothing
        }
    });

    // print the result again (this time we get the 1000 kb)
    println!("result with looping closure = {} kb", *total_count.lock().unwrap() / 1000);
}

For me this produces

result with empty closure = 128 kb
result with looping closure = 1000 kb

I don't think this necessarily needs to be changed, as it's not really how read_parallel was intended to be used, but it would be great if you could mention it in the docs. It took me quite a while to figure out what's going on here, and it might happen to others as well.
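In case it helps to illustrate the point in the docs, here is a stdlib-only sketch (no seq_io; drain_bounded is a made-up helper) of what I believe is happening: the record sets travel through a bounded queue, and the producer blocks once the queue is full unless the consumer drains it.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// A producer sends `n_batches` items through a queue of `capacity`;
// returns how many items the consumer drained.
fn drain_bounded(n_batches: usize, capacity: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);

    let producer = thread::spawn(move || {
        for batch in 0..n_batches {
            // Blocks once `capacity` items sit unconsumed -- analogous to
            // the reader stalling when the second closure never pulls
            // record sets off the queue.
            tx.send(batch).unwrap();
        }
        // Dropping `tx` here closes the channel, ending the loop below.
    });

    // Draining keeps the producer moving; an empty consumer would leave
    // it blocked after the first `capacity` sends.
    let mut received = 0;
    for _ in rx {
        received += 1;
    }
    producer.join().unwrap();
    received
}

fn main() {
    println!("drained {} batches", drain_bounded(10, 2));
}
```

With a consumer that never calls rx.recv(), the producer would hang after the first two sends, which matches the "only the first couple of buffers get processed" behaviour I saw.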
