gatb.core-API-1.4.2
Multithreading snippets

Presentation

This page presents some code snippets related to the use of the multithreading API.

Some of the snippets presented below can be used online.

Additional snippets are available in directory: gatb-core/gatb-core/examples/tools.

Iteration in a multithreaded fashion

This snippet shows how to iterate an Iterator object (here a range of integers) with N threads in order to speed up the iteration.

This snippet introduces the Dispatcher class and shows how to simply use it for parallelizing one iteration.

Note: this approach works only if the items can be iterated and processed independently of each other.

Code is from example multithreading1.cpp:

// We include what we need for the test
#include <gatb/gatb_core.hpp>
using namespace std;
/********************************************************************************/
/* Multithreaded iteration of an integer range */
/********************************************************************************/
// We define a functor that will be cloned by the dispatcher
struct Functor { void operator() (int i)
{
// In this instruction block, we are executing in one of the nbCores threads
// created by the dispatcher. Note that 'i' is one value of our range
}};
/********************************************************************************/
int main (int argc, char* argv[])
{
// We get the number of cores to be used. If no number is given,
// we set it to 0, which means all available cores will be used
size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);
// We create an iterator over an integer range
Range<int>::Iterator it (1,1000);
// We create a dispatcher configured for 'nbCores' cores.
Dispatcher dispatcher (nbCores);
// We dispatch the range iteration with the dispatcher.
// This will create nbCores threads and each thread will be fed with
// one value of the defined range
// NOTE: we could also use a lambda expression (easing code readability)
// Note: the third argument sets the groupSize to 1 instead of 1000 (the default),
// so that the 1000 tasks are not all batched into the same thread.
// In practice, when iterating over a large set of elements, set a reasonable
// groupSize value, because a groupSize=1 will incur significant overhead
// if Functor() is a very quick task.
IDispatcher::Status status = dispatcher.iterate (it, Functor(), 1);
// We dump some information about the dispatching
cout << "nbCores=" << status.nbCores << " time=" << status.time << endl;
// IMPORTANT: usage of the Dispatcher makes sense only if the iterated items
// can be processed independently of each other.
// The point to understand with the Dispatcher is that it can
// iterate any instance of the Iterator class. If you have any set of items
// that can be enumerated through an Iterator implementation, then you
// can parallelize the iteration with a Dispatcher instance
}


Multithreaded iteration and shared resources

This snippet shows how to parallelize an iteration and how several threads can modify a common resource throughout the iteration.

The important point here is to understand that shared resources must be modified with caution when several threads access them at the same time.

Code is from example multithreading2.cpp:

// We include what we need for the test
#include <gatb/gatb_core.hpp>
using namespace std;
/********************************************************************************/
/* Multithreaded iteration and modification of a shared resource. */
/* */
/* WARNING ! THIS SNIPPET SHOWS ALSO HOW TO USE LAMBDA EXPRESSIONS, SO YOU NEED */
/* TO USE A COMPILER THAT SUPPORTS THIS FEATURE. */
/* */
/********************************************************************************/
int main (int argc, char* argv[])
{
// We get the number of cores to be used. If no number is given,
// we set it to 0, which means all available cores will be used
size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);
// We create an iterator over an integer range
int nmax = 10000;
Range<int>::Iterator it (1,nmax);
// We create a dispatcher configured for 'nbCores' cores.
// The second argument tells how many consecutive values each thread receives
// (set here to 1 to emphasize the concurrent access issue).
Dispatcher dispatcher (nbCores, 1);
// The idea here is to sum the integers of our range with an iteration.
// (Note: we know that the result is N*(N+1)/2)
int sum1=0, sum2=0;
// First iteration: WRONG WAY
// Our first attempt uses an integer variable to sum the iterated values.
// This variable is shared by all the threads and, since they access it
// without any protection against concurrent accesses, the result is likely
// to be wrong (unless you use one core only)
dispatcher.iterate (it, [&] (int i) { sum1 += i; });
// Second iteration: CORRECT WAY
// As previously, our second attempt shares the same integer variable.
// But now, we take care of concurrent accesses with the
// __sync_fetch_and_add intrinsic instruction. This instruction ensures that
// the shared integer can be modified by only one thread at a time.
dispatcher.iterate (it, [&] (int i) { __sync_fetch_and_add (&sum2, i); });
// CONCLUSION
cout << "First iteration: sum=" << sum1 << " (result should be " << nmax*(nmax+1)/2 << ")" << endl;
cout << "Second iteration: sum=" << sum2 << " (result should be " << nmax*(nmax+1)/2 << ")" << endl;
// Parallelization of an Iterator is pretty simple with the Dispatcher class.
// Moreover, usage of lambda expressions makes the whole thing easy to write.
// Note that the instruction block of the lambda expression doesn't even know that
// it may be executed in different threads. In other words, the block doesn't
// reference anything related to thread management; it just receives one item of
// the iteration and processes some action on it.
// IMPORTANT ! As we have seen here, the user has to be aware that a shared resource (one
// integer here) can be modified by several threads at the same time, so the user must use
// some kind of synchronization for modifying the shared resource. We will see in other
// examples that GATB provides mechanisms for this purpose.
}
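Note that __sync_fetch_and_add is a GCC/Clang builtin; with a C++11 compiler, std::atomic offers a portable equivalent. A minimal standalone sketch (the function name `atomic_sum` is ours, not part of GATB):

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Sum the integers 1..n with nbThreads threads. std::atomic makes the
// concurrent additions safe, like __sync_fetch_and_add, but in a portable,
// standard C++11 way.
long atomic_sum (int n, int nbThreads)
{
    std::atomic<long> sum (0);
    std::vector<std::thread> threads;
    for (int t=0; t<nbThreads; t++)
    {
        threads.emplace_back ([&,t] () {
            // Each thread adds its share of the range to the shared atomic sum
            for (int i=1+t; i<=n; i+=nbThreads)  { sum += i; }
        });
    }
    for (auto& th : threads)  { th.join(); }
    return sum;
}
```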


Multithreaded iteration with synchronization of a shared resource

Here, our shared resource is a file, so we can't use an intrinsic instruction as we did before for the integer addition.

We need a general synchronization mechanism that ensures a portion of code can be executed by only one thread at a time.

Code is from example multithreading3.cpp:

// We include what we need for the test
#include <gatb/gatb_core.hpp>
#include <fstream>
using namespace std;
/********************************************************************************/
/* Multithreaded iteration and modification of a shared resource. */
/********************************************************************************/
// We define a functor that will be cloned by the dispatcher
struct Functor
{
ISynchronizer* synchro; fstream& file;
Functor (ISynchronizer* synchro, fstream& file) : synchro(synchro), file(file) {}
void operator() (int i)
{
// We lock the synchronizer
synchro->lock ();
// We dump the current integer into the file
file << i << endl;
// We unlock the synchronizer
synchro->unlock ();
}
};
/********************************************************************************/
int main (int argc, char* argv[])
{
// We get the number of cores to be used. If no number is given,
// we set it to 0, which means all available cores will be used
size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);
// We create an iterator over an integer range
int nmax = 10000;
Range<int>::Iterator it (1,nmax);
// We open a file. This will be our shared resource between threads.
fstream file ("out", std::fstream::out);
// For our file, we can't use intrinsics like we did for integer addition,
// so we need a general synchronization mechanism that will be shared by the threads.
ISynchronizer* synchro = System::thread().newSynchronizer();
// We create a dispatcher configured for 'nbCores' cores.
Dispatcher dispatcher (nbCores, 1);
// We iterate the range. NOTE: we could also use a lambda expression (easing code readability)
dispatcher.iterate (it, Functor(synchro,file));
// We close the file
file.close();
// We get rid of the synchronizer
delete synchro;
}


Multithreaded iteration with synchronization of a shared resource (bis)

This snippet is similar to the previous one. It only shows how to use the LocalSynchronizer class to lock and unlock the enclosing instruction block automatically.

This is useful for avoiding classical deadlock bugs when one forgets to unlock a synchronizer.

Code is from example multithreading4.cpp:

// We include what we need for the test
#include <gatb/gatb_core.hpp>
#include <fstream>
using namespace std;
/********************************************************************************/
/* Multithreaded iteration and modification of a shared resource. */
/* */
/* WARNING ! THIS SNIPPET SHOWS ALSO HOW TO USE LAMBDA EXPRESSIONS, SO YOU NEED */
/* TO USE A COMPILER THAT SUPPORTS THIS FEATURE. */
/* */
/********************************************************************************/
int main (int argc, char* argv[])
{
// We get the number of cores to be used. If no number is given,
// we set it to 0, which means all available cores will be used
size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);
// We create an iterator over an integer range
int nmax = 1000;
Range<int>::Iterator it (1,nmax);
// We open a file. This will be our shared resource between threads.
fstream file ("out", std::fstream::out);
// For our file, we can't use intrinsics like we did for integer addition,
// so we need a general synchronization mechanism that will be shared by the threads.
ISynchronizer* synchro = System::thread().newSynchronizer();
// We create a dispatcher configured for 'nbCores' cores.
Dispatcher dispatcher (nbCores, 1);
// We iterate the range
dispatcher.iterate (it, [&] (int i)
{
// We use a helper class that protects the enclosing instruction block
// against concurrent access. Note it uses our shared synchro object.
// We don't have to call the lock/unlock pair ourselves: a single
// LocalSynchronizer declaration protects the whole block. This is useful
// because, if the user forgot to call the 'unlock' method, the program
// would block forever.
LocalSynchronizer sync (synchro);
// We dump the current integer into the file
file << i << endl;
});
// We close the file
file.close();
// We get rid of the synchronizer
delete synchro;
}
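LocalSynchronizer applies the RAII idiom: the lock is taken in the constructor and released in the destructor, when the object goes out of scope. The standard-library counterpart is std::lock_guard, sketched below (the counter and function names are ours, for illustration only):

```cpp
#include <mutex>

// RAII locking, which is what LocalSynchronizer does for an ISynchronizer:
// std::lock_guard locks the mutex in its constructor and unlocks it in its
// destructor, so the lock is released even if the block exits early.
int counter = 0;
std::mutex counterMutex;

void increment ()
{
    std::lock_guard<std::mutex> guard (counterMutex);  // locked here
    counter++;
}                                                      // unlocked here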


Multithreaded iteration without shared resources management

This snippet introduces the ThreadObject class designed to avoid concurrent accesses issues.

Instead of working on a single shared resource, each thread uses a local resource during the iteration, and a final aggregation of the local resources is done afterwards.

Such an approach avoids the synchronization mechanisms needed when threads directly use a single shared resource. This may be interesting since synchronization mechanisms may introduce time overheads.

Code is from example multithreading5.cpp:

// We include what we need for the test
#include <gatb/gatb_core.hpp>
using namespace std;
/********************************************************************************/
/* Multithreaded iteration without modification of a shared resource. */
/* */
/* WARNING ! THIS SNIPPET SHOWS ALSO HOW TO USE LAMBDA EXPRESSIONS, SO YOU NEED */
/* TO USE A COMPILER THAT SUPPORTS THIS FEATURE. */
/* */
/********************************************************************************/
int main (int argc, char* argv[])
{
// We get the number of cores to be used. If no number is given,
// we set it to 0, which means all available cores will be used
size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);
// We create an iterator over an integer range
int nmax = 1000;
Range<int>::Iterator it (1,nmax);
// We create a dispatcher configured for 'nbCores' cores.
// The second argument tells how many consecutive values will be received by
// each thread.
Dispatcher dispatcher (nbCores, 1);
// In this example, we have a different approach: we won't modify the same
// shared integer value. Instead, each thread will use its own local integer
// and at the end, all the local sums will be summed into the final one.
// By doing this, we don't have any more concurrent accesses issues.
// In order to ease this approach, we use a ThreadObject object. Such an object
// provides a local sum for each executing thread. After the iteration, it also
// provides a means to get all the local sums and update the global sum accordingly.
ThreadObject<int> sum;
// We iterate our range.
dispatcher.iterate (it, [&] (int i)
{
// We retrieve the local sum for the current executing thread with 'sum()'
// Note that this instruction block still doesn't refer to explicit thread
// management; this is hidden behind the () operator of the ThreadObject class.
sum() += i;
});
// We retrieve all local sums through the 'foreach' method.
// This loop is done 'nbCores' times.
sum.foreach ([&] (int localSum)
{
// Here, the *sum expression represents the global integer; we add to it the
// current 'localSum' computed by one of the threads.
*sum += localSum;
});
cout << "sum=" << *sum << " (result should be " << nmax*(nmax+1)/2 << ")" << endl;
// In brief, the ThreadObject is a facility to avoid concurrent accesses to a shared
// resource. It encapsulates all the tedious management of local resources and final
// result aggregation.
}
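The ThreadObject pattern (thread-local accumulators plus a final reduction) can also be sketched with the standard library alone; the function name below is ours, not GATB's:

```cpp
#include <thread>
#include <vector>

// Sum 1..n without any synchronization during the iteration: each thread
// accumulates into its own slot of 'localSums', and the slots are aggregated
// afterwards, mimicking what ThreadObject<int> does.
long sum_with_local_accumulators (int n, int nbThreads)
{
    std::vector<long> localSums (nbThreads, 0);
    std::vector<std::thread> threads;
    for (int t=0; t<nbThreads; t++)
    {
        threads.emplace_back ([&,t] () {
            for (int i=1+t; i<=n; i+=nbThreads)  { localSums[t] += i; }
        });
    }
    for (auto& th : threads)  { th.join(); }

    // Final aggregation: the equivalent of the 'foreach' loop above
    long sum = 0;
    for (long s : localSums)  { sum += s; }
    return sum;
}
```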


Multithreaded iteration of a bank

This snippet shows how to iterate the sequences of a bank and count how many A, C, G, T it contains. The interesting part is to see that the Bank class can create Iterator instances that can be iterated through a Dispatcher instance.

Note: iterating a bank from disk involves a lot of I/O, so parallelizing such an iteration may not lead to significantly better performance. However, once the snippet has been launched, the bank (if not too big) may sit in the RAM cache, so it is interesting to relaunch the snippet with a varying number of cores and see how the execution time evolves.

Code is from example multithreading6.cpp:

// We include what we need for the test
#include <gatb/gatb_core.hpp>
using namespace std;
/********************************************************************************/
/* Multithreaded iteration of a bank. */
/* */
/* */
/* Cmd-line: multithreading6 <fasta/q file> */
/* */
/* Sample: multithreading6 gatb-core/gatb-core/test/db/reads1.fa */
/* */
/* WARNING ! THIS SNIPPET SHOWS ALSO HOW TO USE LAMBDA EXPRESSIONS, SO YOU NEED */
/* TO USE A COMPILER THAT SUPPORTS THIS FEATURE. */
/* */
/********************************************************************************/
int main (int argc, char* argv[])
{
if (argc < 2)
{
cerr << "you must provide at least the FASTA file path." << endl;
return EXIT_FAILURE;
}
// We get a handle on a bank
BankFasta bank (argv[1]);
// We get the number of cores to be used.
size_t nbCores = (argc >=3 ? atoi(argv[2]) : 0);
// We create a dispatcher (use all cores by default).
Dispatcher dispatcher (nbCores);
// We will count nucleotides occurrences.
ThreadObject<int> sumA, sumC, sumG, sumT, sumN;
// We iterate the bank. Note how we provide a bank iterator to the dispatcher
dispatcher.iterate (bank.iterator(), [&] (const Sequence& seq)
{
// We use shortcut references for the different local sums. This avoids
// retrieving them each time a nucleotide of the sequence is handled (see the
// for loop below) and may give much better performance.
int& localA = sumA();
int& localC = sumC();
int& localG = sumG();
int& localT = sumT();
int& localN = sumN();
// We loop over the nucleotides of the current sequence.
for (size_t i=0; i<seq.getDataSize(); i++)
{
switch (seq.getDataBuffer()[i])
{
case 'A': localA++; break;
case 'C': localC++; break;
case 'G': localG++; break;
case 'T': localT++; break;
case 'N': localN++; break;
}
}
}, 1 /*groupSize of 1*/);
sumA.foreach ([&] (int n) { *sumA += n; });
sumC.foreach ([&] (int n) { *sumC += n; });
sumG.foreach ([&] (int n) { *sumG += n; });
sumT.foreach ([&] (int n) { *sumT += n; });
sumN.foreach ([&] (int n) { *sumN += n; });
cout << "|A|=" << *sumA << endl;
cout << "|C|=" << *sumC << endl;
cout << "|G|=" << *sumG << endl;
cout << "|T|=" << *sumT << endl;
cout << "|N|=" << *sumN << endl;
}
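Stripped of the GATB bank machinery, the per-thread counting idea above can be sketched on plain strings (the helper below is hypothetical, not part of GATB):

```cpp
#include <array>
#include <string>
#include <thread>
#include <vector>

// Count A,C,G,T,N over a set of sequences with nbThreads threads: each thread
// keeps its own counters (so no synchronization is needed during the loop),
// and the per-thread counters are merged at the end, like the ThreadObject
// 'foreach' aggregation above.
std::array<long,5> count_nucleotides (const std::vector<std::string>& seqs, int nbThreads)
{
    std::vector<std::array<long,5>> local (nbThreads, std::array<long,5>{{0,0,0,0,0}});
    std::vector<std::thread> threads;
    for (int t=0; t<nbThreads; t++)
    {
        threads.emplace_back ([&,t] () {
            for (size_t s=t; s<seqs.size(); s+=nbThreads)
            {
                for (char c : seqs[s])
                {
                    switch (c)
                    {
                        case 'A': local[t][0]++; break;
                        case 'C': local[t][1]++; break;
                        case 'G': local[t][2]++; break;
                        case 'T': local[t][3]++; break;
                        case 'N': local[t][4]++; break;
                    }
                }
            }
        });
    }
    for (auto& th : threads)  { th.join(); }

    // Final aggregation of the per-thread counters
    std::array<long,5> total {{0,0,0,0,0}};
    for (auto& l : local)  { for (int k=0; k<5; k++)  { total[k] += l[k]; } }
    return total;
}
```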
