4.11 Dividing Your Program into Multiple Threads
Earlier in this chapter we discussed the delegation of work according to a specific strategy or approach called a thread model. Those thread models were:
- delegation (boss-worker)
- peer-to-peer
- pipeline
- producer-consumer
Each model has its own WBS (Work Breakdown Structure) that determines who is responsible for thread creation and under what conditions threads are created. In this section we will show an example of a program for each model using Pthread library functions.
4.11.1 Using the Delegation Model
We discussed two ways to implement the delegation model when dividing a program into threads. To recall, in the delegation model, a single thread (the boss) creates the threads (the workers) and assigns each a task. The boss thread delegates the task each worker thread is to perform by specifying a function. With the first approach, the boss thread creates threads as a result of requests made to the system. The boss thread processes each type of request in an event loop. As events occur, worker threads are created and assigned their duties. Example 4.5 shows the event loop in the boss thread and the worker threads in pseudocode.
Example 4.5 Approach 1: Skeleton program of boss and worker thread model.
    //...
    pthread_mutex_t Mutex = PTHREAD_MUTEX_INITIALIZER
    int AvailableThreads
    pthread_t Thread[Max_Threads]
    void decrementThreadAvailability(void)
    void incrementThreadAvailability(void)
    int threadAvailability(void);

    // boss thread
    {
       //...
       if(sysconf(_SC_THREAD_THREADS_MAX) > 0){
          AvailableThreads = sysconf(_SC_THREAD_THREADS_MAX)
       }
       else{
          AvailableThreads = Default
       }
       int Count = 1;
       loop while(Request Queue is not empty)
          if(threadAvailability()){
             Count++
             decrementThreadAvailability()
             classify request
             switch(request type)
             {
                case X : pthread_create(&(Thread[Count])...taskX...)
                         break
                case Y : pthread_create(&(Thread[Count])...taskY...)
                         break
                case Z : pthread_create(&(Thread[Count])...taskZ...)
                         break
                //...
             }
          }
          else{
             //free up thread resources
          }
       end loop
    }

    void *taskX(void *X)
    {
       // process X type request
       incrementThreadAvailability()
       return(NULL)
    }

    void *taskY(void *Y)
    {
       // process Y type request
       incrementThreadAvailability()
       return(NULL)
    }

    void *taskZ(void *Z)
    {
       // process Z type request
       incrementThreadAvailability()
       return(NULL)
    }
    //...
In Example 4.5, the boss thread dynamically creates a thread to process each new request that enters the system, but there is a maximum number of threads that will be created. There are n tasks to process n request types. To be sure the maximum number of threads per process is not exceeded, these additional functions can be defined:
    threadAvailability()
    incrementThreadAvailability()
    decrementThreadAvailability()
Example 4.6 shows pseudocode for these functions.
Example 4.6 Functions that manage thread availability count.
    void incrementThreadAvailability(void)
    {
       //...
       pthread_mutex_lock(&Mutex)
       AvailableThreads++
       pthread_mutex_unlock(&Mutex)
    }

    void decrementThreadAvailability(void)
    {
       //...
       pthread_mutex_lock(&Mutex)
       AvailableThreads--
       pthread_mutex_unlock(&Mutex)
    }

    int threadAvailability(void)
    {
       //...
       int Available
       pthread_mutex_lock(&Mutex)
       Available = (AvailableThreads > 1)   // read the count while holding the lock
       pthread_mutex_unlock(&Mutex)         // unlock before returning
       return Available
    }
The threadAvailability() function returns 1 if the maximum number of threads allowed per process has not been reached. This function accesses the global variable AvailableThreads, which stores the number of threads still available to the process. The boss thread calls the decrementThreadAvailability() function, which decrements the global variable before the boss thread creates a thread. The worker threads call incrementThreadAvailability(), which increments the global variable before a worker thread exits. All three functions call pthread_mutex_lock() before accessing the global variable and pthread_mutex_unlock() after accessing it. If the maximum number of threads has been reached, the boss thread can cancel threads, if possible, or spawn another process, if necessary. taskX(), taskY(), and taskZ() execute code that processes their type of request.
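To make the bookkeeping concrete, the following is a minimal runnable sketch of the first approach, not a definitive implementation. It reuses the function names from Examples 4.5 and 4.6, but several details are our own assumptions: the fixed cap MaxWorkers stands in for the value returned by sysconf(), a single taskX() stands in for all request types, the Processed counter and the runBoss() function are hypothetical, and "free up thread resources" is interpreted here as joining the oldest outstanding worker.

```cpp
#include <pthread.h>
#include <cassert>
#include <deque>

// Assumption: a small fixed cap stands in for sysconf(_SC_THREAD_THREADS_MAX).
static const int MaxWorkers = 4;

static pthread_mutex_t Mutex = PTHREAD_MUTEX_INITIALIZER;
static int AvailableThreads = MaxWorkers;
static int Processed = 0;   // hypothetical: requests handled; guarded by Mutex

void incrementThreadAvailability(void)
{
    pthread_mutex_lock(&Mutex);
    AvailableThreads++;
    pthread_mutex_unlock(&Mutex);
}

void decrementThreadAvailability(void)
{
    pthread_mutex_lock(&Mutex);
    AvailableThreads--;
    pthread_mutex_unlock(&Mutex);
}

int threadAvailability(void)
{
    pthread_mutex_lock(&Mutex);
    int Available = (AvailableThreads > 1);  // read under the lock
    pthread_mutex_unlock(&Mutex);            // unlock before returning
    return Available;
}

// Worker: stands in for taskX(), taskY(), and taskZ().
void *taskX(void *)
{
    pthread_mutex_lock(&Mutex);
    Processed++;                    // placeholder for real request processing
    pthread_mutex_unlock(&Mutex);
    incrementThreadAvailability();  // give the slot back before exiting
    return NULL;
}

// Boss: creates one worker per request without exceeding the cap.
int runBoss(int RequestCount)
{
    std::deque<pthread_t> Running;  // workers created but not yet joined
    Processed = 0;
    for (int i = 0; i < RequestCount; i++) {
        while (!threadAvailability()) {
            // Free up thread resources: wait for the oldest worker to finish.
            assert(!Running.empty());
            pthread_join(Running.front(), NULL);
            Running.pop_front();
        }
        decrementThreadAvailability();
        pthread_t Worker;
        int rc = pthread_create(&Worker, NULL, taskX, NULL);
        assert(rc == 0);
        (void)rc;
        Running.push_back(Worker);
    }
    while (!Running.empty()) {      // wait for the remaining workers
        pthread_join(Running.front(), NULL);
        Running.pop_front();
    }
    return Processed;
}
```

Calling runBoss(10) processes ten requests and returns 10. Because threadAvailability() tests for a count greater than 1, as in Example 4.6, the counter never drops below 1, so in this sketch at most MaxWorkers - 1 workers run at the same time.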
The other approach to the delegation model is to have the boss thread create a pool of threads that are reassigned new requests instead of creating a new thread per request. The boss thread creates a number of threads during initialization and then each thread is suspended until a request is added to the queue. The boss thread will still contain an event loop to extract requests from the queue. But instead of creating a new thread per request, the boss thread signals the appropriate thread to process the request. Example 4.7 shows the boss thread and the worker threads in pseudocode for this approach to the delegation model.
Example 4.7 Approach 2: Skeleton program of boss and worker thread model.
    //...
    pthread_t Thread[N]

    // boss thread
    {
       pthread_create(&(Thread[1])...taskX...);
       pthread_create(&(Thread[2])...taskY...);
       pthread_create(&(Thread[3])...taskZ...);
       //...
       loop while(Request Queue is not empty)
          get request
          classify request
          switch(request type)
          {
             case X : enqueue request to XQueue
                      signal Thread[1]
                      break
             case Y : enqueue request to YQueue
                      signal Thread[2]
                      break
             case Z : enqueue request to ZQueue
                      signal Thread[3]
                      break
             //...
          }
       end loop
    }

    void *taskX(void *X)
    {
       loop
          suspend until awakened by boss
          loop while (XQueue is not empty)
             dequeue request
             process request
          end loop
       until done
    }

    void *taskY(void *Y)
    {
       loop
          suspend until awakened by boss
          loop while (YQueue is not empty)
             dequeue request
             process request
          end loop
       until done
    }

    void *taskZ(void *Z)
    {
       loop
          suspend until awakened by boss
          loop while (ZQueue is not empty)
             dequeue request
             process request
          end loop
       until done
    }
    //...
In Example 4.7, the boss thread creates N threads, one thread for each task to be executed. Each task is associated with processing a request type. In the event loop, the boss thread dequeues a request from the request queue, determines the request type, enqueues the request to the appropriate request queue, then signals the thread that processes the requests in that queue. Each worker function also contains an event loop. The worker thread is suspended until it receives a signal from the boss that there is a request in its queue. Once awakened, in the inner loop, the thread processes all the requests in its queue until the queue is empty.
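One way to realize "suspend until awakened by boss" with the Pthread library is a condition variable paired with a mutex. The sketch below is a hedged illustration of the second approach rather than the only way to build it. The WorkQueue structure, the Done flag, the worker() and runPool() functions, and the rule that even-numbered requests go to one worker and odd-numbered requests to the other are all assumptions made for the example.

```cpp
#include <pthread.h>
#include <cassert>
#include <queue>

// Hypothetical per-worker request queue with its own mutex and condition.
struct WorkQueue {
    pthread_mutex_t Mutex;
    pthread_cond_t  Ready;
    std::queue<int> Requests;
    bool Done;      // set by the boss when no more requests will arrive
    int  Handled;   // number of requests this worker has processed

    WorkQueue() : Done(false), Handled(0)
    {
        pthread_mutex_init(&Mutex, NULL);
        pthread_cond_init(&Ready, NULL);
    }
    ~WorkQueue()
    {
        pthread_cond_destroy(&Ready);
        pthread_mutex_destroy(&Mutex);
    }
};

void *worker(void *Arg)
{
    WorkQueue *Q = static_cast<WorkQueue *>(Arg);
    pthread_mutex_lock(&Q->Mutex);
    for (;;) {
        // Suspend until awakened by the boss; the loop guards against
        // spurious wakeups.
        while (Q->Requests.empty() && !Q->Done)
            pthread_cond_wait(&Q->Ready, &Q->Mutex);
        if (Q->Requests.empty())
            break;                       // Done is set and the queue is drained
        Q->Requests.pop();               // dequeue request
        pthread_mutex_unlock(&Q->Mutex);
        // ... process request without holding the lock ...
        pthread_mutex_lock(&Q->Mutex);
        Q->Handled++;
    }
    pthread_mutex_unlock(&Q->Mutex);
    return NULL;
}

// Boss: creates the pool once, then routes each request to a worker.
int runPool(int RequestCount)
{
    WorkQueue Queues[2];
    pthread_t Thread[2];
    for (int i = 0; i < 2; i++) {
        int rc = pthread_create(&Thread[i], NULL, worker, &Queues[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int Request = 0; Request < RequestCount; Request++) {
        WorkQueue &Q = Queues[Request % 2];   // classify request
        pthread_mutex_lock(&Q.Mutex);
        Q.Requests.push(Request);             // enqueue request
        pthread_mutex_unlock(&Q.Mutex);
        pthread_cond_signal(&Q.Ready);        // signal the worker
    }
    int Total = 0;
    for (int i = 0; i < 2; i++) {
        pthread_mutex_lock(&Queues[i].Mutex);
        Queues[i].Done = true;                // tell the worker to finish
        pthread_mutex_unlock(&Queues[i].Mutex);
        pthread_cond_signal(&Queues[i].Ready);
        pthread_join(Thread[i], NULL);
        Total += Queues[i].Handled;
    }
    return Total;
}
```

A call such as runPool(9) returns 9, since every request is handled by one of the two workers. The worker rechecks its queue in a loop around pthread_cond_wait() because a condition wait may return without a matching signal.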
4.11.2 Using the Peer-to-Peer Model
In the peer-to-peer model, a single thread initially creates all the threads, called peers, needed to perform all the tasks. The peer threads process requests from their own input stream. Example 4.8 shows a skeleton program of the peer-to-peer approach to dividing a program into threads.
Example 4.8 Skeleton program using the peer-to-peer model
    //...
    pthread_t Thread[N]

    // initial thread
    {
       pthread_create(&(Thread[1])...taskX...);
       pthread_create(&(Thread[2])...taskY...);
       pthread_create(&(Thread[3])...taskZ...);
       //...
    }

    void *taskX(void *X)
    {
       loop while (Type XRequests are available)
          extract Request
          process request
       end loop
       return(NULL)
    }
    //...
In the peer-to-peer model, each thread is responsible for its own stream of input. The input can be extracted from a database, file, and so on.
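As a small illustration, the sketch below gives each peer a private input stream and lets it work without any coordination from the creating thread. The Peer structure, the peerTask() and runPeers() functions, and the use of an in-memory vector in place of a database or file are assumptions made for the example.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical peer: owns its input stream and its result.
struct Peer {
    std::vector<int> Input;   // stands in for a file, database, and so on
    long Result;
};

void *peerTask(void *Arg)
{
    Peer *P = static_cast<Peer *>(Arg);
    P->Result = 0;
    // loop while requests are available in this peer's own stream
    for (std::size_t i = 0; i < P->Input.size(); i++)
        P->Result += P->Input[i];        // "process request": sum the values
    return NULL;
}

// Initial thread: creates all peers, then only waits for them.
long runPeers(std::vector<Peer> &Peers)
{
    std::vector<pthread_t> Thread(Peers.size());
    for (std::size_t i = 0; i < Peers.size(); i++) {
        int rc = pthread_create(&Thread[i], NULL, peerTask, &Peers[i]);
        assert(rc == 0);
        (void)rc;
    }
    long Total = 0;
    for (std::size_t i = 0; i < Peers.size(); i++) {
        pthread_join(Thread[i], NULL);
        Total += Peers[i].Result;
    }
    return Total;
}
```

No mutex is needed here because each peer touches only its own Peer structure, and the initial thread reads the results only after joining.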
4.11.3 Using the Pipeline Model
In the pipeline model, there is a stream of input processed in stages. At each stage, work is performed on a unit of input by a thread. The input continues to move to each stage until the input has completed processing. This approach allows multiple inputs to be processed simultaneously. Each thread is responsible for producing its interim results or output, making them available to the next stage or next thread in the pipeline. Example 4.9 shows the skeleton program for the pipeline model.
Example 4.9 Skeleton program using the pipeline model.
    //...
    pthread_t Thread[N]
    Queues[N]

    // initial thread
    {
       place all input into stage1's queue
       pthread_create(&(Thread[1])...stage1...);
       pthread_create(&(Thread[2])...stage2...);
       pthread_create(&(Thread[3])...stage3...);
       //...
    }

    void *stageX(void *X)
    {
       loop
          suspend until input unit is in queue
          loop while (XQueue is not empty)
             dequeue input unit
             process input unit
             enqueue input unit into next stage's queue
          end loop
       until done
       return(NULL)
    }
    //...
In Example 4.9, N queues are declared for N stages. The initial thread enqueues all the input into stage 1's queue. The initial thread then creates all the threads needed to execute each stage. Each stage has an event loop. The thread sleeps until an input unit has been enqueued. The inner loop continues to iterate until the stage's queue is empty. Each input unit is dequeued, processed, and then enqueued into the queue of the next stage.
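A two-stage pipeline along these lines might be sketched as follows. This is an illustration under stated assumptions, not the definitive form of the model: the StageQueue class, the Stage structure, the addOne() and timesTwo() stage functions, runPipeline(), and the EndOfInput sentinel are all hypothetical, and the sentinel scheme assumes non-negative input so that no real unit is ever equal to -1.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

const int EndOfInput = -1;  // hypothetical sentinel; real units must differ

// Hypothetical queue between stages, protected by a mutex and condition.
class StageQueue {
    pthread_mutex_t Mutex;
    pthread_cond_t  Ready;
    std::queue<int> Items;
public:
    StageQueue()
    {
        pthread_mutex_init(&Mutex, NULL);
        pthread_cond_init(&Ready, NULL);
    }
    ~StageQueue()
    {
        pthread_cond_destroy(&Ready);
        pthread_mutex_destroy(&Mutex);
    }
    void put(int Unit)
    {
        pthread_mutex_lock(&Mutex);
        Items.push(Unit);
        pthread_mutex_unlock(&Mutex);
        pthread_cond_signal(&Ready);    // wake the stage reading this queue
    }
    int take(void)
    {
        pthread_mutex_lock(&Mutex);
        while (Items.empty())           // suspend until input unit is in queue
            pthread_cond_wait(&Ready, &Mutex);
        int Unit = Items.front();
        Items.pop();
        pthread_mutex_unlock(&Mutex);
        return Unit;
    }
};

struct Stage {
    StageQueue *In;
    StageQueue *Out;
    int (*Work)(int);   // processing performed at this stage
};

void *stageTask(void *Arg)
{
    Stage *S = static_cast<Stage *>(Arg);
    for (;;) {
        int Unit = S->In->take();
        if (Unit == EndOfInput) {
            S->Out->put(EndOfInput);    // pass the shutdown marker along
            break;
        }
        S->Out->put(S->Work(Unit));     // enqueue into next stage's queue
    }
    return NULL;
}

int addOne(int X)   { return X + 1; }
int timesTwo(int X) { return X * 2; }

std::vector<int> runPipeline(const std::vector<int> &Input)
{
    StageQueue Queues[3];   // Queues[0] feeds stage 1; Queues[2] holds output
    Stage Stages[2] = { { &Queues[0], &Queues[1], addOne },
                        { &Queues[1], &Queues[2], timesTwo } };
    for (std::size_t i = 0; i < Input.size(); i++)
        Queues[0].put(Input[i]);        // place all input into stage 1's queue
    Queues[0].put(EndOfInput);

    pthread_t Thread[2];
    for (int i = 0; i < 2; i++) {
        int rc = pthread_create(&Thread[i], NULL, stageTask, &Stages[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 2; i++)
        pthread_join(Thread[i], NULL);

    std::vector<int> Output;
    for (int Unit = Queues[2].take(); Unit != EndOfInput; Unit = Queues[2].take())
        Output.push_back(Unit);
    return Output;
}
```

For the input 1, 2, 3, each unit passes through addOne() and then timesTwo(), so runPipeline() returns 4, 6, 8. Order is preserved because each stage is a single thread reading from a first-in, first-out queue.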
4.11.4 Using the Producer-Consumer Model
In the producer-consumer model, the producer thread produces data that is consumed by the consumer thread or threads. The data is stored in a block of memory shared between the producer and consumer threads. This model was used in Programs 4.5, 4.6, and 4.7. Example 4.10 shows the skeleton program for the producer-consumer model.
Example 4.10 Skeleton program using the producer-consumer model.
    pthread_mutex_t Mutex = PTHREAD_MUTEX_INITIALIZER
    pthread_t Thread[2]
    Queue

    // initial thread
    {
       pthread_create(&(Thread[1])...producer...);
       pthread_create(&(Thread[2])...consumer...);
       //...
    }

    void *producer(void *X)
    {
       loop
          perform work
          pthread_mutex_lock(&Mutex)
          enqueue data
          pthread_mutex_unlock(&Mutex)
          signal consumer
          //...
       until done
    }

    void *consumer(void *X)
    {
       loop
          suspend until signaled
          loop while(Data Queue not empty)
             pthread_mutex_lock(&Mutex)
             dequeue data
             pthread_mutex_unlock(&Mutex)
             perform work
          end loop
       until done
    }
In Example 4.10, an initial thread creates the producer and consumer threads. The producer thread executes a loop in which it performs work, then locks a mutex on the shared queue in order to enqueue the data it has produced. The producer unlocks the mutex, then signals the consumer thread that there is data in the queue. The producer iterates through the loop until all work is done. The consumer thread also executes a loop in which it suspends itself until it is signaled. In the inner loop, the consumer thread processes all the data until the queue is empty. It locks the mutex on the shared queue before it dequeues any data and unlocks the mutex after the data has been dequeued. It then performs work on that data. In Program 4.6, the consumer thread writes its results to a file. The results could instead have been inserted into another data structure. Consumer threads often do this, playing the roles of both consumer and producer: such a thread consumes the unprocessed data produced by the producer thread, then acts as a producer when it stores the processed data in another shared queue that is consumed by another thread.
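The skeleton leaves "signal consumer" and "suspend until signaled" abstract. A minimal sketch of one way to fill them in, using a Pthread condition variable, appears below. The Shared structure, its Finished flag, and the runProducerConsumer() function are hypothetical names introduced for the illustration, and the "work" is simply producing the integers 1 through Count and summing them.

```cpp
#include <pthread.h>
#include <cassert>
#include <queue>

// Hypothetical block of memory shared by the producer and the consumer.
struct Shared {
    pthread_mutex_t Mutex;
    pthread_cond_t  NotEmpty;
    std::queue<int> Data;
    bool Finished;   // set by the producer when all work is done
    int  Count;      // how many items the producer will generate
    long Sum;        // result accumulated by the consumer
};

void *producer(void *Arg)
{
    Shared *S = static_cast<Shared *>(Arg);
    for (int Item = 1; Item <= S->Count; Item++) {   // perform work
        pthread_mutex_lock(&S->Mutex);
        S->Data.push(Item);                          // enqueue data
        pthread_mutex_unlock(&S->Mutex);
        pthread_cond_signal(&S->NotEmpty);           // signal consumer
    }
    pthread_mutex_lock(&S->Mutex);
    S->Finished = true;
    pthread_mutex_unlock(&S->Mutex);
    pthread_cond_signal(&S->NotEmpty);               // wake consumer to exit
    return NULL;
}

void *consumer(void *Arg)
{
    Shared *S = static_cast<Shared *>(Arg);
    for (;;) {
        pthread_mutex_lock(&S->Mutex);
        while (S->Data.empty() && !S->Finished)      // suspend until signaled
            pthread_cond_wait(&S->NotEmpty, &S->Mutex);
        if (S->Data.empty()) {                       // finished and drained
            pthread_mutex_unlock(&S->Mutex);
            break;
        }
        int Item = S->Data.front();                  // dequeue data
        S->Data.pop();
        pthread_mutex_unlock(&S->Mutex);
        S->Sum += Item;      // perform work; only the consumer touches Sum
    }
    return NULL;
}

long runProducerConsumer(int Count)
{
    Shared S;
    pthread_mutex_init(&S.Mutex, NULL);
    pthread_cond_init(&S.NotEmpty, NULL);
    S.Finished = false;
    S.Count = Count;
    S.Sum = 0;

    pthread_t Thread[2];
    int rc = pthread_create(&Thread[0], NULL, producer, &S);
    assert(rc == 0);
    rc = pthread_create(&Thread[1], NULL, consumer, &S);
    assert(rc == 0);
    (void)rc;
    pthread_join(Thread[0], NULL);
    pthread_join(Thread[1], NULL);

    pthread_cond_destroy(&S.NotEmpty);
    pthread_mutex_destroy(&S.Mutex);
    return S.Sum;
}
```

With Count set to 100, the consumer receives the values 1 through 100 and runProducerConsumer() returns 5050. Unlike the skeleton, this sketch tests whether the queue is empty while holding the mutex, which avoids reading the queue while the producer is modifying it.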
4.11.5 Creating Multithreaded Objects
The delegation, peer-to-peer, pipeline, and producer-consumer models demonstrate approaches to dividing a program into multiple threads along function lines. When using objects, member functions can create threads to perform multiple tasks. Threads can be used to execute code on behalf of the object: free-floating functions and other member functions.
In either case, the threads are declared within the object and created by one of the member functions (e.g., the constructor). The threads can then execute free-floating functions (functions defined outside the class), which invoke member functions of an object that is global. This is one approach to making an object multithreaded. Example 4.10 contains an example of a multithreaded object.
Example 4.10 Declaration and definition of multithreading an object.
    #include <pthread.h>
    #include <iostream>
    #include <unistd.h>

    void *task1(void *);
    void *task2(void *);

    class multithreaded_object
    {
       pthread_t Thread1,Thread2;
    public:
       multithreaded_object(void);
       int c1(void);
       int c2(void);
       //...
    };

    multithreaded_object::multithreaded_object(void)
    {
       //...
       pthread_create(&Thread1,NULL,task1,NULL);
       pthread_create(&Thread2,NULL,task2,NULL);
       pthread_join(Thread1,NULL);
       pthread_join(Thread2,NULL);
       //...
    }

    int multithreaded_object::c1(void)
    {
       // do work
       return(1);
    }

    int multithreaded_object::c2(void)
    {
       // do work
       return(1);
    }

    multithreaded_object MObj;

    void *task1(void *)
    {
       //...
       MObj.c1();
       return(NULL);
    }

    void *task2(void *)
    {
       //...
       MObj.c2();
       return(NULL);
    }
In Example 4.10, the class multithreaded_object declares two threads. The threads are created and joined in the constructor of the class. Thread1 executes task1 and Thread2 executes task2. task1 and task2 then invoke member functions of the global object MObj.
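The same idea can be sketched without a global object by passing the object's address as the thread argument. The variation below is our own illustration, not the approach shown in the example above: the static member functions runC1() and runC2(), the start(), wait(), and total() member functions, and the Result1 and Result2 members are all hypothetical. Static member functions are used as thread entry points here because mainstream compilers accept them; a strictly portable version might use free functions declared extern "C" instead.

```cpp
#include <pthread.h>
#include <cassert>

class multithreaded_object
{
    pthread_t Thread1, Thread2;
    int Result1, Result2;
    // Entry points with the signature pthread_create() expects. Each one
    // receives the object's address and forwards to a member function.
    static void *runC1(void *Self);
    static void *runC2(void *Self);
public:
    multithreaded_object(void) : Result1(0), Result2(0) {}
    void start(void);   // create both threads
    void wait(void);    // join both threads
    int c1(void) { return 1; }   // placeholder work
    int c2(void) { return 2; }   // placeholder work
    int total(void) const { return Result1 + Result2; }
};

void *multithreaded_object::runC1(void *Self)
{
    multithreaded_object *Obj = static_cast<multithreaded_object *>(Self);
    Obj->Result1 = Obj->c1();
    return NULL;
}

void *multithreaded_object::runC2(void *Self)
{
    multithreaded_object *Obj = static_cast<multithreaded_object *>(Self);
    Obj->Result2 = Obj->c2();
    return NULL;
}

void multithreaded_object::start(void)
{
    // Pass "this" so each thread works on behalf of this particular object.
    int rc = pthread_create(&Thread1, NULL, runC1, this);
    assert(rc == 0);
    rc = pthread_create(&Thread2, NULL, runC2, this);
    assert(rc == 0);
    (void)rc;
}

void multithreaded_object::wait(void)
{
    pthread_join(Thread1, NULL);
    pthread_join(Thread2, NULL);
}
```

A caller would construct the object, call start(), and then call wait() before reading total(), which in this sketch yields 3. Creating the threads in start() rather than in the constructor is a design choice made here so that the object is fully constructed before any thread invokes its member functions.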