Multithreaded Work Queue in C++

In my previous blog Java Style Thread Class in C++ I discussed how to develop a C++ class that enables you to create Pthread-based objects that resemble Java threads. The next step in building a multithreaded application is to devise a means of distributing tasks among threads so they can be processed concurrently.

Queues are good devices for transferring work items from one thread to another. In this article I’ll discuss the design of a work queue class implemented in C++ that can be used with Thread class objects to easily build a multithreaded application.

Producer-Consumer Model

One approach to multithreading is the producer-consumer model, where one thread – the producer – places work items on a queue and one or more consumer threads wait for and remove the items to process. For the work queue class in this article we’ll use one producer thread and two consumer threads.

When a consumer thread runs, it checks the number of items on the queue. If there are one or more work items on the queue, the consumer removes one and processes it. If none are available, the consumer waits for the producer to add items to the queue. These steps are repeated continually for the lifetime of the application.

Work Queue Class

Interface

The work queue class wqueue will be defined in the file wqueue.h. It is based on the list class from the Standard C++ Library, which provides methods for adding work items to the tail of the queue and removing items from the head of the queue – first in, first out (FIFO) – in constant time. To serialize access to the queue, and to enable the producer thread to signal the consumer threads that work items are available for processing, the queue class is instrumented with a Pthread mutex and condition variable, defined by the m_mutex and m_condv member variables respectively.

#include <pthread.h>
#include <list>

using namespace std;

template <typename T> class wqueue
{ 
    list<T>   m_queue;
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_condv;

The wqueue class is defined as a template class since it uses a list object to queue work items of an arbitrary type. The work item class used in the test application will be discussed later in the article.

The great advantage of creating a work queue class in C++ is that it encapsulates the Pthread mechanisms necessary to serialize access to the work items on the list and to signal when work items are added. Programs that use the work queue can add and remove items with single method calls – add() and remove(), as you’ll see shortly – without having to concern themselves with the intricacies of making Pthread calls.

Constructor

The constructor simply initializes the Pthread mutex and condition variable data members.

  public:
    wqueue() {
        pthread_mutex_init(&m_mutex, NULL);
        pthread_cond_init(&m_condv, NULL);
    }

Destructor

The destructor deletes the mutex and condition variables. The list object is destroyed automatically.

    ~wqueue() {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_condv);
    }

Add a Work Item

To add a work item to the queue, the add() method is called with a copy of the work item object. Standard C++ collections store copies of the objects placed in them. For the work queue example the typename T will be a work item pointer, so when add() is called it is passed a pointer by value and a copy of that pointer is stored in the list. You are better off storing pointers to work items on a queue so that you can control when the items themselves are destroyed.

To serialize access to the list, the mutex is locked; once the lock is acquired, the item pointer is pushed onto the back of the list. Then the condition variable is signaled with a call to pthread_cond_signal(), which wakes up one of the consumer threads waiting to remove an item.

    void add(T item) {
        pthread_mutex_lock(&m_mutex);
        m_queue.push_back(item);
        pthread_cond_signal(&m_condv);
        pthread_mutex_unlock(&m_mutex);
    }

Calling pthread_cond_broadcast() to signal the condition variable would also work, but it would cause all the consumer threads to wake up. Since only one of the consumers can get a given work item, the others would have to go back to sleep and wait for additional work items to be placed on the queue. By signaling the condition instead of broadcasting, we ensure that only one thread wakes up for each item added.
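To make the tradeoff concrete, here is a hedged sketch of what a broadcast-based variant would look like. The wqueue_bcast name is mine, not from the project; the class otherwise mirrors the wqueue interface. The while loop around the wait is what makes broadcasting safe: threads that lose the race find the queue empty and simply wait again.

```cpp
#include <pthread.h>
#include <list>

// Sketch of a broadcast-based queue variant (not part of the original project).
template <typename T> class wqueue_bcast
{
    std::list<T>    m_queue;
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_condv;

  public:
    wqueue_bcast() {
        pthread_mutex_init(&m_mutex, NULL);
        pthread_cond_init(&m_condv, NULL);
    }
    ~wqueue_bcast() {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_condv);
    }
    void add(T item) {
        pthread_mutex_lock(&m_mutex);
        m_queue.push_back(item);
        pthread_cond_broadcast(&m_condv);  // wakes ALL waiting consumers
        pthread_mutex_unlock(&m_mutex);
    }
    T remove() {
        pthread_mutex_lock(&m_mutex);
        // Re-check after every wakeup; with broadcast, several threads may
        // wake for a single item and all but one must go back to waiting.
        while (m_queue.size() == 0) {
            pthread_cond_wait(&m_condv, &m_mutex);
        }
        T item = m_queue.front();
        m_queue.pop_front();
        pthread_mutex_unlock(&m_mutex);
        return item;
    }
};
```

With a single producer and item-at-a-time consumption, signal is the better choice; broadcast becomes useful when one state change can satisfy several waiters at once, for example when shutting a queue down.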

Remove a Work Item

The remove() method locks the mutex, then checks whether any work items are available. If not, pthread_cond_wait() is called, which atomically unlocks the mutex and waits for the producer thread to add an item. When the condition is signaled after the producer adds an item, pthread_cond_wait() reacquires the mutex and returns; a copy of the pointer to a work item is then taken off the list and returned to the consumer thread.

    T remove() {
        pthread_mutex_lock(&m_mutex);
        while (m_queue.size() == 0) {
            pthread_cond_wait(&m_condv, &m_mutex);
        }
        T item = m_queue.front();
        m_queue.pop_front();
        pthread_mutex_unlock(&m_mutex);
        return item;
    }

Note that if items are added to the queue while all the consumer threads are busy, no consumer thread will be waiting to receive the condition variable signal. This is not a problem, however, because remove() always checks the queue size before waiting, so a consumer returning from its work will pick up any pending items without needing a signal.

Queue Size

The size() method is just a utility method we can use to externally check the number of items on the queue. The mutex must be locked and unlocked during this operation to avoid a race with the producer thread adding, or another consumer thread removing, an item.

    int size() {
        pthread_mutex_lock(&m_mutex);
        int size = m_queue.size();
        pthread_mutex_unlock(&m_mutex);
        return size;
    }
};

Work Item Class

Work items will simply consist of a string and a number that are set to arbitrary values in the producer thread.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>   // for sleep()
#include <string>
#include "thread.h"
#include "wqueue.h"

class WorkItem
{
    string m_message;
    int    m_number;

  public:
    WorkItem(const char* message, int number) 
          : m_message(message), m_number(number) {}
    ~WorkItem() {}

    const char* getMessage() { return m_message.c_str(); }
    int getNumber() { return m_number; }
};

Consumer Thread Class

The Thread class from my previous blog is used to create the consumer threads. Each thread is passed a reference to the work queue so it can grab work items. The run() method continually waits for and removes items to be processed, which in this case just means displaying the string message and number assigned in the producer thread. The ID of each thread is also displayed to differentiate the threads in the printed output.

class ConsumerThread : public Thread
{
    wqueue<WorkItem*>& m_queue;

  public:
    ConsumerThread(wqueue<WorkItem*>& queue) : m_queue(queue) {}

    void* run() {
        // Remove 1 item at a time and process it. Blocks if no items are 
        // available to process.
        for (int i = 0;; i++) {
            printf("thread %lu, loop %d - waiting for item...\n", 
                  (long unsigned int)self(), i);
            WorkItem* item = (WorkItem*)m_queue.remove();
            printf("thread %lu, loop %d - got one item\n", 
                  (long unsigned int)self(), i);
            printf("thread %lu, loop %d - item: message - %s, number - %d\n", 
                  (long unsigned int)self(), i, item->getMessage(), 
                   item->getNumber());
            delete item;
        }
        return NULL;
    }
};

Test Application

Producer Thread

The producer thread is nothing more than the main() routine of the test application, which is defined in the file main.cpp, as is the remainder of the code in this article. The number of iterations through the main loop is passed on the command line. A single work queue and two consumer threads are created. After the threads are started, they wait for items to be placed on the queue.

int main(int argc, char** argv)
{
    // Process command line arguments
    if ( argc != 2 ) {
        printf("usage: %s <iterations>\n", argv[0]);
        exit(-1);
    }
    int iterations = atoi(argv[1]);

    // Create the queue and consumer (worker) threads
    wqueue<WorkItem*>  queue;
    ConsumerThread* thread1 = new ConsumerThread(queue);
    ConsumerThread* thread2 = new ConsumerThread(queue);
    thread1->start();
    thread2->start();

    // Add items to the queue
    WorkItem* item;
    for (int i = 0; i < iterations; i++) {
        item = new WorkItem("abc", 123);
        queue.add(item);
        item = new WorkItem("def", 456);
        queue.add(item);
        item = new WorkItem("ghi", 789);
        queue.add(item);
        sleep(2);
    }

    // Ctrl-C to end program
    sleep(1);
    printf("Enter Ctrl-C to end the program...\n");
    while (1);
    exit(0);
}

Each time through the main loop, 3 items are placed on the queue. After the specified number of iterations, the producer waits for a Ctrl-C to end the program.

Build and Run

You can get the source code for the project from GitHub at https://github.com/vichargrave/wqueue.git. The main() routine, work item class, and consumer thread class definitions are all contained in the main.cpp file. You can build the test application by going into the wqueue directory and running make. Note that you must get the Thread class code before trying to make wqueue.

If you run the test application with an argument of 3 this is what the output will look like:

$ ./wqueue 3
thread 4547428352, loop 0 - waiting for item...
thread 4549251072, loop 0 - waiting for item...
thread 4547428352, loop 0 - got one item
thread 4549251072, loop 0 - got one item
thread 4547428352, loop 0 - item: message - abc, number - 123
thread 4549251072, loop 0 - item: message - def, number - 456
thread 4547428352, loop 1 - waiting for item...
thread 4549251072, loop 1 - waiting for item...
thread 4547428352, loop 1 - got one item
thread 4547428352, loop 1 - item: message - ghi, number - 789
thread 4547428352, loop 2 - waiting for item...
thread 4549251072, loop 1 - got one item
thread 4547428352, loop 2 - got one item
thread 4549251072, loop 1 - item: message - abc, number - 123
thread 4547428352, loop 2 - item: message - def, number - 456
thread 4549251072, loop 2 - waiting for item...
thread 4547428352, loop 3 - waiting for item...
thread 4549251072, loop 2 - got one item
thread 4549251072, loop 2 - item: message - ghi, number - 789
thread 4549251072, loop 3 - waiting for item...
thread 4547428352, loop 3 - got one item
thread 4549251072, loop 3 - got one item
thread 4547428352, loop 3 - item: message - abc, number - 123
thread 4549251072, loop 3 - item: message - def, number - 456
thread 4547428352, loop 4 - waiting for item...
thread 4549251072, loop 4 - waiting for item...
thread 4547428352, loop 4 - got one item
thread 4547428352, loop 4 - item: message - ghi, number - 789
thread 4547428352, loop 5 - waiting for item...
done

This entry was posted in Programming. Bookmark the permalink.

20 Responses to Multithreaded Work Queue in C++

  1. dgu says:

    Hello,

    Very clear and good article!!! When is the next one?

    regards
    david

  2. Pingback: Multithreaded Work Queue Based Server in C++ | VicHargrave.com

  3. So, looking at it from a design perspective, any data structure (in our case it’s wqueue) that is accessed by multiple threads asynchronously needs to have its own serialization mechanism. That responsibility lies with the data structure, not with the threads or the main application. Am I right, Mr Hargrave?

    -Divyang

    • vic says:

      Yes, if you want to build components that are thread safe and will be used in a multithreaded program, then more often than not you should have those components handle serialization so the main program does not have to worry about it. This is the case with the multithreaded server.

  4. Joel Carlson says:

    Hey Vic,
    I downloaded your source code from git and found one bug. In the main routine there is a while loop waiting for the queue to be empty. The operator should be greater than 0 instead of less than. Thanks a lot for this code. It’s been very helpful in helping me understand multithreaded work queues.
    Joel

    • vic says:

      I believe this is the line you are referring to:

      while ((len = stream->receive(input, sizeof(input)-1)) != 0 ){ …

      Yes you are right. It should look like this:

      while ((len = stream->receive(input, sizeof(input)-1)) > 0 ){ …

      Nice catch. Thank you.

      • Thomas Baumgartner says:

        Hi Vic,
        very good tutorial. I downloaded the example from github today.
        To me it seems that
        #include
        is missing in the main.cpp file.
        Thanks and greetings from Austria

        • vic says:

          I’m not sure what you meant by “#include is missing”. The project builds and runs as advertised, so I’m not sure what is missing.

          • Jeff Holtz says:

            Just grabbed the code and tried to compile it under Ubuntu 14.04. It appears that both the threads and wqueue makefiles need alteration to place the pthreads library at the end of the line rather than at the front:
            $(TARGET): $(OBJECTS)
            $(CC) $(OBJECTS) -o $@ $(LDFLAGS)
            instead of…
            $(TARGET): $(OBJECTS)
            $(CC) $(LDFLAGS) $(OBJECTS) -o $@

            Also in the main.cpp file the following is also needed
            #include

            Hopefully this helps anyone else trying to compile

          • Jeff Holtz says:

            the include is missing “”

            I had to place this in quote to get through the HTML tags

          • Jeff Holtz says:

            Okay last time…. need to place unistd.h in an include for main.cpp

          • vic says:

            Thanks for that Jeff. I’ll make those changes in the base code.

  5. Josiah says:

    Hi Vic

    Thanks for the nice tutorial. I just have a question though: would it not be necessary to place some sort of overflow handling on the size of the queue in case the rate at which the producer fills the queue is way faster than the rate of dequeuing by the consumers? For instance, I am designing a data acquisition and processing application and would like to implement it using the producer/consumer pattern. However the rate of acquisition is faster than the rate of data processing and since I am working with large datasets (workItems) I fear that this approach might stretch memory requirements. What would be the best way to approach this design?

    Thanks.

    • vic says:

      You raise a good point. If data is coming into the server very quickly then you will have to increase the number of worker threads to keep up with the load. You could add code that does this dynamically, adding threads to the pool as you see the queue growing over a certain time period. The work queue in the article is built on the Standard Template Library list object, so it will grow as necessary. This will most likely take care of most situations if the load goes up and down over time.

      If the server load increases sharply and stays at a high level for long periods of time, you may need to store the data to be processed in files. I suppose if you are truly processing a lot of data – as in 100s of GB – you could ingest the data into HBase or Elasticsearch and then process it with some other kind of server. But this idea does not handle connections in real time.

      I’m just tossing out ideas because I’d need to see more of your requirements to make a better recommendation.

      • Josiah says:

        Thanks for your suggestions. Well, in extreme cases the data can reach a few GB. I am really trying to increase the frame rate of a real-time acquisition-processing-display system. So I’m thinking one way of doing that would be to decouple the acquisition – processing – display pipeline, so that one thread reads data (1 – 10 GB) over a 1 Gbps ethernet link and stores it in a work queue. Multiple threads (processors) then read from the queue and store their outputs in another queue. Then a thread responsible for display reads from that queue and displays the processed data. However it is vital that the order in which the data is acquired is the same order in which the data is displayed. I am assuming (if multiple threads spend roughly the same processing time) that this condition will be satisfied using this pattern. What do you think?

        Thanks.

        • vic says:

          I think your idea makes sense, but I’m not sure you need the intermediate queues. I think it is going to be difficult to keep the data ordered across several queues in the middle. If you have just the input and output (to the display) queues, then keeping the data in order is easy.
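Following up on the overflow question in this exchange, here is a hedged sketch of a bounded variant of the queue. The bounded_wqueue name, the m_maxsize member, and the second condition variable are my additions, not part of the original wqueue class; add() blocks when the queue is full, which applies back-pressure to a producer that outruns its consumers.

```cpp
#include <pthread.h>
#include <list>
#include <cstddef>

// Sketch of a bounded queue (my addition, not from the original project).
template <typename T> class bounded_wqueue
{
    std::list<T>    m_queue;
    size_t          m_maxsize;
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_condv_item;   // signaled when an item is added
    pthread_cond_t  m_condv_space;  // signaled when an item is removed

  public:
    bounded_wqueue(size_t maxsize) : m_maxsize(maxsize) {
        pthread_mutex_init(&m_mutex, NULL);
        pthread_cond_init(&m_condv_item, NULL);
        pthread_cond_init(&m_condv_space, NULL);
    }
    ~bounded_wqueue() {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_condv_item);
        pthread_cond_destroy(&m_condv_space);
    }
    void add(T item) {
        pthread_mutex_lock(&m_mutex);
        while (m_queue.size() >= m_maxsize) {   // queue full - wait for space
            pthread_cond_wait(&m_condv_space, &m_mutex);
        }
        m_queue.push_back(item);
        pthread_cond_signal(&m_condv_item);
        pthread_mutex_unlock(&m_mutex);
    }
    T remove() {
        pthread_mutex_lock(&m_mutex);
        while (m_queue.size() == 0) {
            pthread_cond_wait(&m_condv_item, &m_mutex);
        }
        T item = m_queue.front();
        m_queue.pop_front();
        pthread_cond_signal(&m_condv_space);    // wake a blocked producer
        pthread_mutex_unlock(&m_mutex);
        return item;
    }
};
```

The choice of maximum size caps the memory held by queued items, at the cost of stalling the producer when the consumers fall behind.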

  6. nipun says:

    Can I use the consumer without using the sleep method?
    If I remove sleep, will the order of execution still be correct?

  7. Arkady says:

    First: this example covers a 1:M model, i.e. one producer and M consumers. What about the more general N:M model? It is a much more complicated model, is it not?
    The second point: when a consumer thread is in the waiting state there is no control over the thread. So it would be good if the waiting state were under some timer control, and if the timer expired an empty message (or NULL pointer) were returned. As I understand it, the condition variable has timer facilities.

    • Vic Hargrave says:

      N:M is not really more complicated. If there are multiple producers, the same queue mechanism works: each producer locks the queue to place an item on it, which signals a consumer to lock the queue and retrieve the item.

      There is no point in adding timers to the condition variable for the queue because each worker thread simply waits indefinitely for an item to be placed in the queue. Why time out waiting?
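For completeness, a timed wait along the lines Arkady suggests is possible with pthread_cond_timedwait(), which takes an absolute deadline. The sketch below is my own; the wqueue_timed name and the bool-returning remove() signature are inventions, not from the project.

```cpp
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <list>

// Sketch of a queue whose remove() gives up after a timeout (my invention).
template <typename T> class wqueue_timed
{
    std::list<T>    m_queue;
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_condv;

  public:
    wqueue_timed() {
        pthread_mutex_init(&m_mutex, NULL);
        pthread_cond_init(&m_condv, NULL);
    }
    ~wqueue_timed() {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_condv);
    }
    void add(T item) {
        pthread_mutex_lock(&m_mutex);
        m_queue.push_back(item);
        pthread_cond_signal(&m_condv);
        pthread_mutex_unlock(&m_mutex);
    }
    // Returns true and stores an item in *out if one arrived within
    // timeout_secs, false if the wait timed out with the queue still empty.
    bool remove(T* out, int timeout_secs) {
        struct timespec deadline;
        clock_gettime(CLOCK_REALTIME, &deadline);  // timedwait uses absolute time
        deadline.tv_sec += timeout_secs;
        pthread_mutex_lock(&m_mutex);
        while (m_queue.size() == 0) {
            int rc = pthread_cond_timedwait(&m_condv, &m_mutex, &deadline);
            if (rc == ETIMEDOUT && m_queue.size() == 0) {
                pthread_mutex_unlock(&m_mutex);
                return false;
            }
        }
        *out = m_queue.front();
        m_queue.pop_front();
        pthread_mutex_unlock(&m_mutex);
        return true;
    }
};
```

A consumer using this variant must handle the false case, for example by checking a shutdown flag before waiting again, which is one practical reason to want the timeout.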
