Multithreaded Work Queue Based Server in C++

Creating a multithreaded TCP/IP server requires the capability to handle network communication, multithreading and the transfer of data between threads.

I have described how to build C++ components to handle this functionality in previous blogs. This time I’ll show you how to combine these components to create a simple multithreaded server.


Background Articles

The core server functionality that I’ll use in this project comes from source code presented in the following previous articles of mine. Note that you can get the source code for the articles on Github.

  1. Network communication – TCP/IP Network Programming Design Patterns in C++
  2. Multithreading – Java Style Thread Class in C++
  3. Transferring data between threads – Multithreaded Work Queue in C++

Server Structure

Producer-Consumer Model

The server is based on the producer-consumer multithreaded model I discussed in Multithreaded Work Queue in C++, where a single producer thread passes work items to 1 or more consumer threads via a work queue, implemented with the wqueue class. Threads will be created using the Thread class discussed in Java Style Thread Class in C++.

In the case of the TCP/IP server, the producer thread accepts connections, then queues them for the consumer threads, which in turn handle the connection processing as shown in this diagram.

TCP Clients and Server

Producer Thread

The producer thread in the server is implemented in the main() function. Its job is to create the work queue and consumer threads, then accept connections from clients and pass the connections off to the consumer threads to handle. Specifically, the producer thread takes the following actions:

  1. Create a work queue object.
  2. Create the consumer threads.
  3. Start listening for connections from clients.
  4. Wait to accept a connection from a client using a TCPAcceptor object – discussed in the TCP/IP Network Programming Design Patterns in C++ blog.
  5. For each connection create a work item that transfers the connected socket – contained in a TCPStream object – to a consumer thread to handle the connection.
  6. Return to step 4.

Consumer Thread

The consumer threads are the workers that do the protocol session handling for the server. Each consumer thread handles a connection in the following manner:

  1. Wait for a work item to be added to the queue.
  2. Remove a work item from the queue.
  3. Extract the TCPStream object from the work item.
  4. Wait to receive a request from the client.
  5. Process the request when it is received.
  6. Send the reply back to the client.
  7. Repeat steps 4 – 6 until the client closes the connection.
  8. Close the server end of the connection when the client closes the connection.
  9. Delete the work item.
  10. Return to step 1.

Work Queue

The wqueue class supports the methods to add and remove work items. It encapsulates a Standard C++ list object along with the Pthread functions to serialize access to the work items and enable the producer thread to signal each consumer thread when items are added to the queue.
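
For reference, here is a minimal sketch of what that interface looks like, based on the description above and the Multithreaded Work Queue in C++ article. The real implementation lives in wqueue.h; treat this as an approximation rather than a copy of that code.

// Minimal sketch of a pthread-based work queue along the lines of the
// wqueue class used by the server.
#include <list>
#include <pthread.h>

template <typename T> class wqueue
{
    std::list<T>    m_queue;
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_condv;

  public:
    wqueue()  { pthread_mutex_init(&m_mutex, NULL); pthread_cond_init(&m_condv, NULL); }
    ~wqueue() { pthread_mutex_destroy(&m_mutex); pthread_cond_destroy(&m_condv); }

    // Producer side: append an item and wake one waiting consumer.
    void add(T item) {
        pthread_mutex_lock(&m_mutex);
        m_queue.push_back(item);
        pthread_cond_signal(&m_condv);
        pthread_mutex_unlock(&m_mutex);
    }

    // Consumer side: block until an item is available, then remove it.
    T remove() {
        pthread_mutex_lock(&m_mutex);
        while (m_queue.size() == 0) {
            pthread_cond_wait(&m_condv, &m_mutex);  // releases the mutex while waiting
        }
        T item = m_queue.front();
        m_queue.pop_front();
        pthread_mutex_unlock(&m_mutex);
        return item;
    }

    int size() {
        pthread_mutex_lock(&m_mutex);
        int size = m_queue.size();
        pthread_mutex_unlock(&m_mutex);
        return size;
    }
};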

Server Application

WorkItem Class

The server code for the project resides in a single file, server.cpp. It starts off with the header files and the definition of the WorkItem class.

#include <stdio.h>
#include <stdlib.h>
#include <string>
#include "thread.h"
#include "wqueue.h"
#include "tcpacceptor.h"

class WorkItem
{
    TCPStream* m_stream;

  public:
    WorkItem(TCPStream* stream) : m_stream(stream) {}
    ~WorkItem() { delete m_stream; }

    TCPStream* getStream() { return m_stream; }
};

The constructor accepts a TCPStream object pointer which can be accessed through a call to the WorkItem::getStream() method. When the WorkItem object is deleted it closes the connection by deleting the TCPStream object.

ConnectionHandler Class – Consumer Thread

The consumer threads are implemented by the ConnectionHandler class which is derived from the Thread class. The constructor is passed a reference to the work queue created in the main() function.

The run() method implements the steps discussed in the Consumer Thread section of this article. All the thread mutex locking and condition signaling is handled internally by the work queue class, so we don’t have to worry about it here.

class ConnectionHandler : public Thread
{
    wqueue<WorkItem*>& m_queue;

  public:
    ConnectionHandler(wqueue<WorkItem*>& queue) : m_queue(queue) {}

    void* run() {
        // Remove 1 item at a time and process it. Blocks until an item 
        // is placed on the queue.
        for (int i = 0;; i++) {
            printf("thread %lu, loop %d - waiting for item...\n", 
                   (long unsigned int)self(), i);
            WorkItem* item = m_queue.remove();
            printf("thread %lu, loop %d - got one item\n", 
                   (long unsigned int)self(), i);
            TCPStream* stream = item->getStream();

            // Echo messages back to the client until the connection is 
            // closed
            char input[256];
            int len;
            while ((len = stream->receive(input, sizeof(input)-1)) > 0) {
                input[len] = '\0';
                stream->send(input, len);
                printf("thread %lu, echoed '%s' back to the client\n", 
                       (long unsigned int)self(), input);
            }
            delete item;
        }

        // Should never get here
        return NULL;
    }
};

The top of the for loop prints the thread ID and a waiting status, then blocks on the wqueue::remove() call until a work item is placed in the queue. Once an item arrives, the thread prints an indication that it got one, removes the item from the queue and extracts the TCPStream object it contains.

The inner while loop continually receives messages from the client, prints them to stdout and echoes them back to the client. When the client closes the connection, the WorkItem object is deleted – which also closes the server end of the connection – and the thread returns to the top of the loop to wait for another item.

Main Function – Producer Thread

The main() function implements the steps discussed in the Producer Thread section of this article.

int main(int argc, char** argv)
{
    // Process command line arguments
    if ( argc < 3 || argc > 4 ) {
        printf("usage: %s <workers> <port> <ip>\n", argv[0]);
        exit(-1);
    }
    int workers = atoi(argv[1]);
    int port = atoi(argv[2]);
    string ip;
    if (argc == 4) { 
        ip = argv[3];
    }

    // Create the queue and consumer (worker) threads
    wqueue<WorkItem*>  queue;
    for (int i = 0; i < workers; i++) {
        ConnectionHandler* handler = new ConnectionHandler(queue);
        if (!handler) {
            printf("Could not create ConnectionHandler %d\n", i);
            exit(1);
        }
        handler->start();
    }

    // Create an acceptor then start listening for connections
    WorkItem* item;
    TCPAcceptor* connectionAcceptor;
    if (ip.length() > 0) {
        connectionAcceptor = new TCPAcceptor(port, (char*)ip.c_str());
    }
    else {
        connectionAcceptor = new TCPAcceptor(port);        
    }
    if (!connectionAcceptor || connectionAcceptor->start() > 0) {
        printf("Could not create a connection acceptor\n");
        exit(1);
    }

    // Add a work item to the queue for each connection
    while (1) {
        TCPStream* connection = connectionAcceptor->accept(); 
        if (!connection) {
            printf("Could not accept a connection\n");
            continue;
        }
        item = new WorkItem(connection);
        if (!item) {
            printf("Could not create a work item for the connection\n");
            continue;
        }
        queue.add(item);
    }

    // Should never get here
    return 0;
}

First, the command line arguments are processed: the number of consumer threads, the listening port and the server IP address. Note that the listening IP address is optional.

Next, the work queue object is created, along with the number of ConnectionHandler threads specified on the command line. Calling the Thread::start() method on each handler ultimately invokes the ConnectionHandler::run() method.

The TCPAcceptor object is then created with the listening port and IP address, if one is specified, or just the listening port otherwise. Note that specifying a server IP address causes the TCPAcceptor to listen for connections only on the network interface to which that address is bound. When no IP address is specified, the TCPAcceptor listens on all network interfaces.
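
To make that interface-selection behavior concrete, here is a rough sketch of the address setup an acceptor typically performs when binding its listening socket. This is an illustration of the standard sockets idiom, not a copy of the TCPAcceptor code; the function name and parameters are made up for the example.

// Sketch only: choosing between one interface and all interfaces
// when binding a listening socket.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <string.h>
#include <string>

static int bindListeningSocket(int lsd, int port, const std::string& ip)
{
    struct sockaddr_in address;

    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    if (ip.size() > 0) {
        // A specific IP was given: listen only on the interface bound to it.
        inet_pton(AF_INET, ip.c_str(), &(address.sin_addr));
    }
    else {
        // No IP given: listen on all interfaces.
        address.sin_addr.s_addr = INADDR_ANY;
    }
    return bind(lsd, (struct sockaddr*)&address, sizeof(address));
}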

Finally, in an infinite loop, TCPAcceptor::accept() blocks until it receives a connection. For each connection a WorkItem is created, passed a pointer to the resulting TCPStream object and placed onto the work queue.

Client Application

The client application code resides in a single file, client.cpp. It starts with the header files we need from the C/C++ environment and the interface for the TCPConnector class. The client simply makes a connection, sends a message to the server and waits for the server to echo it back. This action is performed twice. In both cases the message sent and the message received back are displayed to stdout.

#include <stdio.h>
#include <stdlib.h>
#include <string>
#include "tcpconnector.h"

using namespace std;

int main(int argc, char** argv)
{
    if (argc != 3) {
        printf("usage: %s <port> <ip>\n", argv[0]);
        exit(1);
    }

    int len;
    string message;
    char line[256];
    TCPConnector* connector = new TCPConnector();
    TCPStream* stream = connector->connect(argv[2], atoi(argv[1]));
    if (stream) {
        message = "Is there life on Mars?";
        stream->send(message.c_str(), message.size());
        printf("sent - %s\n", message.c_str());
        len = stream->receive(line, sizeof(line)-1);
        line[len] = '\0';
        printf("received - %s\n", line);
        delete stream;
    }

    stream = connector->connect(argv[2], atoi(argv[1]));
    if (stream) {
        message = "Why is there air?";
        stream->send(message.c_str(), message.size());
        printf("sent - %s\n", message.c_str());
        len = stream->receive(line, sizeof(line)-1);
        line[len] = '\0';
        printf("received - %s\n", line);
        delete stream;
    }

    return 0;
}

Test Server and Client


Get the code for the project from Github. You’ll also need the code from these repositories:

Place all the directories in the same folder, then cd into mtserver/ and run make. This will build the client, the server and all the dependencies across the folders.


First, run the server listening on TCP port 9999 with 5 consumer threads, like this:

$ ./server 5 9999 localhost
thread 4426719232, loop 0 - waiting for item...
thread 4430274560, loop 0 - waiting for item...
thread 4429737984, loop 0 - waiting for item...
thread 4428664832, loop 0 - waiting for item...
thread 4429201408, loop 0 - waiting for item...

Next run a series of clients like this:

$ ./client 9999 localhost; ./client 9999 localhost; ./client 9999 localhost

Six messages, two by each client, are sent to the server. Both the original and echoed messages are printed to stdout. The output of the series of client apps should look like this:

sent - Is there life on Mars?
received - Is there life on Mars?
sent - Why is there air?
received - Why is there air?
sent - Is there life on Mars?
received - Is there life on Mars?
sent - Why is there air?
received - Why is there air?
sent - Is there life on Mars?
received - Is there life on Mars?
sent - Why is there air?
received - Why is there air?

The server output should show the thread status and the messages it receives from the clients. Note that different threads handle different connections indicating the server is distributing the work items as expected.

thread 4426719232, loop 0 - got one item
thread 4426719232, echoed 'Is there life on Mars?' back to the client
thread 4430274560, loop 0 - got one item
thread 4430274560, echoed 'Why is there air?' back to the client
thread 4429737984, loop 0 - got one item
thread 4429737984, echoed 'Is there life on Mars?' back to the client
thread 4428664832, loop 0 - got one item
thread 4428664832, echoed 'Why is there air?' back to the client
thread 4429201408, loop 0 - got one item
thread 4429201408, echoed 'Is there life on Mars?' back to the client
thread 4430274560, loop 1 - waiting for item...
thread 4426719232, loop 1 - waiting for item...
thread 4430274560, loop 1 - got one item
thread 4430274560, echoed 'Why is there air?' back to the client
thread 4429737984, loop 1 - waiting for item...
thread 4428664832, loop 1 - waiting for item...
thread 4429201408, loop 1 - waiting for item...
thread 4430274560, loop 2 - waiting for item...


Article by Vic Hargrave



    1. Well, the consumer threads are created in advance of any connections, so they form a thread pool. You can specify the number of consumer threads to add to the pool on the server command line. Thanks for your comments.

  1. Great explanation.
    Could you please shed some light on handling corruption in the consumer threads?
    Also, suppose a consumer thread crashes every time it tries to process a particular kind of job?

    1. It depends on the particular problem you are talking about. By crashing I assume you mean the thread core dumps. If that happens it takes the whole process down and all the other threads the process contains. Again it is difficult to address this question generally. There are lots of things that cause processes and their threads to crash.

    1. Any type of item can be stored in the queue because it is a template collection. However, you should store pointers to objects in the queue so the items themselves are not destroyed when they are removed from the queue.

  2. Hi Vic,
    I am trying to build an asynchronous multithreaded service application. Does the approach below make sense?

    Use select() in the producer thread to handle all the connections, receive messages and add these messages to the queue. The worker threads then process the messages per the business requirements and send responses to the client.

    1. You could use select() in the producer thread, but it’s easier to just block on accept(). With select() you’ll have to pick out the socket descriptors one by one anyway and hand them off to the worker threads. In my opinion select() only adds complexity to the multithreaded model I propose in my blog.

      Now if you wanted to do away with threads altogether, you could use select() to detect incoming events, like connection requests and incoming data on connected sockets, then dispatch these events to event handlers. You could run all of this in a single thread and avoid any mutex locking. The only kicker with an event-based approach is that you have to make sure you don’t spend too much time transferring data in a given event handler – on a given socket – so you don’t prevent the other event handlers from performing network I/O.
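
      To make that concrete, here is a very small sketch of the event-driven alternative – a single thread multiplexing a listening socket and its client sockets with select(). It uses raw socket descriptors rather than the TCPAcceptor/TCPStream classes from the article and leaves out error handling, so treat it as the shape of the loop rather than working server code.

      // Sketch only: single-threaded select() event loop (raw sockets).
      #include <sys/select.h>
      #include <sys/socket.h>
      #include <sys/types.h>
      #include <unistd.h>
      #include <set>

      void eventLoop(int listeningSd)
      {
          std::set<int> clients;
          for (;;) {
              // Build the read set from the listening socket plus all clients.
              fd_set readSet;
              FD_ZERO(&readSet);
              FD_SET(listeningSd, &readSet);
              int maxSd = listeningSd;
              for (std::set<int>::iterator it = clients.begin(); it != clients.end(); ++it) {
                  FD_SET(*it, &readSet);
                  if (*it > maxSd) maxSd = *it;
              }

              select(maxSd + 1, &readSet, NULL, NULL, NULL);

              if (FD_ISSET(listeningSd, &readSet)) {
                  clients.insert(accept(listeningSd, NULL, NULL));  // connection event
              }
              for (std::set<int>::iterator it = clients.begin(); it != clients.end(); ) {
                  int sd = *it++;
                  if (FD_ISSET(sd, &readSet)) {
                      char buf[256];
                      ssize_t n = read(sd, buf, sizeof(buf));       // data event
                      if (n <= 0) { close(sd); clients.erase(sd); } // peer closed
                      else        { write(sd, buf, n); }            // handle/echo
                  }
              }
          }
      }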

  3. This application may need to serve up to 1000 clients. Thus, I don’t want to create a separate thread for every client connection. Instead, a single thread (producer) would handle accepting connections and receiving messages using the select() call, and a thread pool (the worker threads) would process all the messages received from the multiple clients.

    1. Using the select() call does not necessarily get you better throughput unless you do away with all threads and go to a purely event-driven model. But don’t worry about 1000 clients hitting this server, unless they keep the connections open indefinitely. If transactions are on the order of seconds or even minutes, all you have to do with my model is add more threads, say 20 – 30, and set a queue size of 50 or so. This server will be able to handle 100 clients easily, without having to worry about select(). Again, as long as the queue size does *not* grow over time – which is what happens when any given transaction per worker thread is indefinitely long – you’ll be fine with the model I propose in the blog and the tweaks I’ve suggested.
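
      Since the wqueue class shown in the article does not enforce a maximum size, the cap mentioned above has to be applied by the producer. Here is a minimal sketch, assuming the queue exposes a size() method (as in the queue sketch earlier in the article) and that MAX_QUEUE_SIZE is a value you pick yourself:

      // Sketch only: bounding the queue in the producer's accept loop so
      // it cannot grow without limit.
      const int MAX_QUEUE_SIZE = 50;

      while (1) {
          TCPStream* connection = connectionAcceptor->accept();
          if (!connection) continue;
          if (queue.size() >= MAX_QUEUE_SIZE) {
              // Backpressure: refuse the new connection instead of queuing it.
              printf("work queue full - dropping connection\n");
              delete connection;        // closes the client socket
              continue;
          }
          queue.add(new WorkItem(connection));
      }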

  4. I would like to add support for both ASCII and wide character sets (in the case of Unicode). How do I achieve this at the read() end?

    1. You read the bytes the same way as shown in the example. Then sequence through the byte stream with a char* pointer, copying 1 byte for each ASCII character and 2 (or more) bytes for each wide character. To traverse the entire buffer you’ll increment the pointer by 1 for each ASCII character you encounter and by 2 (or more) for each wide character. The key is that you need to know which characters are ASCII and which are wide if you have a mixed stream of both.
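
      As an illustration, here is a small sketch of that traversal, assuming the bytes are UTF-8 encoded (the article itself does not fix an encoding, so this is just one reasonable choice):

      // Sketch only: walking a buffer that mixes 1-byte ASCII characters
      // with multi-byte (wide) characters, assuming UTF-8 encoding.
      #include <cstddef>

      // Returns the number of bytes in the UTF-8 character starting at *p.
      static size_t utf8CharLen(const unsigned char* p) {
          if (*p < 0x80)           return 1;  // ASCII
          if ((*p & 0xE0) == 0xC0) return 2;
          if ((*p & 0xF0) == 0xE0) return 3;
          if ((*p & 0xF8) == 0xF0) return 4;
          return 1;  // invalid lead byte; skip one byte and resynchronize
      }

      static void walkBuffer(const char* buffer, size_t len) {
          const unsigned char* p   = (const unsigned char*)buffer;
          const unsigned char* end = p + len;
          while (p < end) {
              size_t n = utf8CharLen(p);
              if (p + n > end) break;   // partial character at the end of the read
              // ... process the n-byte character starting at p ...
              p += n;                   // advance 1 byte for ASCII, more for wide chars
          }
      }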

  5. Hi Vic, is there any way to implement a timer for each connection? I have to implement a multithreaded TCP server, and if a client stays connected longer than 45 seconds the server should close the connection. Thanks for any suggestions.

    1. I’m assuming you want your server to time out a connection after 45 seconds of inactivity on the socket. To do this you want to use select() to detect if and when data is received on the socket. Here is a TCPStream method that wraps that capability:

      bool TCPStream::waitForReadEvent(int timeout)
      {
          fd_set sdset;
          struct timeval tv;

          tv.tv_sec = timeout;
          tv.tv_usec = 0;
          FD_ZERO(&sdset);
          FD_SET(m_sd, &sdset);
          if (select(m_sd+1, &sdset, NULL, NULL, &tv) > 0) {
              return true;
          }
          return false;
      }

      Add this as a private method in the TCPStream class – so that it can get the m_sd socket descriptor. Change the receive call in tcpstream.cpp to look like this (note you’ll have to change the declaration of this method in tcpsocket.h):

      ssize_t TCPStream::receive(char* buffer, size_t len, int timeout)
      {
          if (timeout <= 0) return read(m_sd, buffer, len);

          if (waitForReadEvent(timeout) == true) {
              return read(m_sd, buffer, len);
          }
          return connectionTimedOut;
      }

      Then change your server receive call to look like this:

      if (stream != NULL) {
          ssize_t len;
          char line[256];
          while ((len = stream->receive(line, sizeof(line)-1, 45)) > 0) {
              line[len] = 0;
              printf("received - %s\n", line);
              stream->send(line, len);
          }
          if (len == TCPStream::connectionTimedOut) {
              printf("Connection to client timed out\n");
          }
          delete stream;
      }

      Finally, test your server by using telnet to connect to the listening IP and port, but never enter a string to send. Wait for 45 seconds. The server should display a message indicating that the connection timed out.

  6. Hi Vic,
    I have implemented the same multithreaded server in C, and it will be connected to around 150 clients at a time. All clients will wait for commands to arrive; otherwise they will send a monitoring command of their own every 1 second. Connections will be kept open indefinitely.
    Up till now I have not run into any problems.
    I need your advice: should I change my design, or will it work well?

    1. Hi Raju. I thought I responded to you in a private email. If not sorry about that.

      If you open 150 connections and keep them open, you will need 150 threads to accommodate the client connections – again, if you are keeping them open indefinitely. This will work if all you ever have to service is 150 clients. If that number increases then you’ll have to create more threads on the fly. To tell you the truth, I wouldn’t do it this way. As the clients report in, you can just hand off the connected socket to a thread to answer. If you want to prod the clients when you haven’t heard from them for a while, then create a separate thread that does nothing but tell the clients to “phone home”.

      Just my 2 cents.

      1. Hi Vic,
        Thank you for the reply. Kindly guide me on how it can be done efficiently. I have 150 devices which are connected to a central server. I want monitoring data from all 150 devices every 1 second, and whenever there is a command from a user it should be delivered to the particular device.
        I have made a prototype server based on the producer-consumer approach which is connected to 19 devices. I need your guidance to make it more robust and reliable.

  7. Could C++ do this without the need for these three external dependencies (threads, wqueue, tcpsockets)?

    Could the server include the code it requires from those dependencies, to produce a stand-alone project?

  8. Hi Vic!
    I would like to comment on a problem that has perhaps been overlooked.
    If you open more connections than there are worker threads, the last connection is able to send data, but it does not receive a response until some other thread is free.

    To solve this, perhaps the TCPAcceptor class has to be changed to not allow that last connection. Can you tell me how to include this functionality in the code?


    1. There is no problem here, per se. The queue will hold request items that the worker threads will process as they finish whatever they are doing at the time. If work comes in fast and each worker thread stays busy longer, then you can just increase the queue size and the number of worker threads. The problem with this scenario arises when the queue grows in an unbounded fashion, i.e. the worker threads can’t finish their work fast enough to keep up with the increasing number of work items. That situation requires more thought to solve, namely restructuring the backend to have multiple servers behind a load balancer or restructuring the client-server protocol to rate limit the request stream. At any rate, coping with unbounded queue growth is beyond the scope of this article.

  9. Hi Vic!
    Thank you for this tutorial. It helped me understand multithreading better.
    Can I use epoll to improve the ability to respond to multiple clients?
    I also have a problem like this that I need help with – can I reuse your code for it?

    Make a sample program of multithreading fulfilling the following conditions.

    Languages: C or C++

    - Use JSON for data transfer

    - The number of threads is a minimum of 5

    - Create both an in-data program and an out-data program (use TCP/IP)

    - Enable confirmation that it works with multiple threads at all times and capture it

    - Use 2 consoles (terminals), on the sending side and the receiving side, for the operation verification

    1. Glad you liked the tutorial. With the architecture in the article you do not need to use epoll(). That function would replace the whole threading model. It’s a viable alternative, but not what you want to do with this code. All of the other system characteristics are easy to implement with the code provided in the blog.

  10. Great article. I’m just getting into TCP and have minimal threading experience as well…double whammy. Perhaps I am looking at this from a serial-centric view as opposed to TCP, but here’s what I’d like to do. I have a PC acting as a TCP server and multiple (could be up to 1000) TCP clients (embedded devices). The clients connect to the server but, unlike the quasi-one-way communications demonstrated in your article, I would also like to allow the server to send data/commands to the clients. I am thinking (please correct me if I am going about this improperly) that I can create a transmit data buffer in each thread that I can stuff data into from the main thread to send. The read of the socket will be timed, maybe with a 50ms timeout, and all received data that is deemed a proper packet will be sent to the main thread. After reading the socket, I’d like to send out any data in the transmit buffer. First, is this feasible? Second, what is the best way to ensure that the memory accessed between threads stays in sync? I have done mostly embedded (single-threaded) applications, and in that case I simply disable interrupts so that any shared data is only accessed by one process at a time to prevent data corruption.

    1. I think your scheme would work, but here is another approach to try. Instead of the server pushing out data to the clients, have the clients periodically heartbeat into the server. By “heartbeat” I mean send a request to the server that in effect says, “I’m here and I’d like some commands to act upon”. The server would respond to these requests by sending back the commands that the clients are to perform, and later on the clients send back the data, if any, they collect. This is a very easy and scalable mechanism for clients to pick up things to do from a server.
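
      As a rough sketch of that heartbeat pattern on the client side, using the TCPConnector/TCPStream classes from the article – the “GET_COMMANDS” message and the 60-second interval are made up for illustration:

      // Sketch only: client periodically "phones home" and asks for work.
      #include <string>
      #include <unistd.h>
      #include "tcpconnector.h"

      void heartbeatLoop(char* serverIp, int port)
      {
          TCPConnector connector;
          std::string hello = "GET_COMMANDS";   // made-up protocol message
          char reply[256];

          for (;;) {
              TCPStream* stream = connector.connect(serverIp, port);
              if (stream) {
                  stream->send(hello.c_str(), hello.size());
                  int len = stream->receive(reply, sizeof(reply)-1);
                  if (len > 0) {
                      reply[len] = '\0';
                      // ... act on the commands contained in reply ...
                  }
                  delete stream;                // close the connection
              }
              sleep(60);                        // heartbeat interval
          }
      }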

      1. Thanks for your response. I guess I still have the same underlying question, though: how do I offer commands to the server thread for it to send? The commands are user-initiated, so from my main thread I’ll need to safely sync data between the main thread and the server thread.

        Also, I’ve been considering the thought of the ‘master’ PC being both client and server. The server handles connections and reads any data from the connected clients. It also makes (as a client) a connection back to the device. This way I kind of have a two-lane highway for information. I figure this may use more resources, but it may have better response due to the blocking nature of many of the communication calls. Thoughts on that?

  11. Great article! Let’s say I create 5 worker threads. How do I kill these worker threads? I use the pthread_cancel(workerThreadID) call with a different workerThreadID each time, but I can only kill one worker thread. I wonder why?

    1. Just delete the Thread subclass object; the Thread class destructor will do the rest. Consider this trivial example:

      #include <stdio.h>
      #include "thread.h"

      class MyThread : public Thread
      {
        public:
          void* run() {
              return NULL;
          }

          ~MyThread() {
              printf("MyThread cancelled\n");
          }
      };

      int main(int argc, char** argv)
      {
          MyThread* thread1 = new MyThread();
          MyThread* thread2 = new MyThread();
          delete thread1;
          delete thread2;
          printf("main done\n");
          return 0;
      }

  12. Thank you for your quick response. I tried the trivial code above and put an extra sleep(10) between the delete statements. However, when it executed the first “delete thread1;” statement, both of my threads were killed. I even took out the second “delete thread2;” statement and the result was the same: both threads were killed. I verified this with “ps -efL”. So how do I kill just one thread and leave the other running? Anyway, back to my original problem of testing your producer thread. I create 5 worker threads and save the ConnectionHandler* handlers into a list, and then later on delete the handlers from the list. However, I experienced the same problem as with pthread_cancel: only one worker thread was killed and the rest were left running. Any thoughts?

  13. Sorry, my fault. After further testing, the trivial code works as expected; I did not sleep long enough in the thread. However, I am still having a problem with the producer thread and am not able to kill more than one worker thread. I suspect it may be because the blocking WorkItem* item = m_queue.remove(); call in the ConnectionHandler class prevents the worker threads from being killed. Any ideas?

    1. You don’t want the worker threads to be killed. You want them around so your server can handle more requests. The whole idea of creating a thread pool is to avoid having to create threads as requests are received by the server. You only want to delete work items as they are finished.

  14. The need for killing worker threads: in some cases I don’t want to provide the service 24/7, or on the same server port. I want to be able to close the service and perhaps reopen it on a different server port without needing to restart the application.

  15. Thanks for the great article, Vic.
    But I have one question in mind regarding the monitoring of threads. Suppose we have created 5 producers, each of them waiting for work to arrive at a specified location. Once a work packet is received, the producer will process the request and place data in the queue for the worker thread pool to process. How do I monitor all of this to ensure all the threads are working? If any producer thread gets stopped or killed, it should be restarted to process the next incoming request. Should I create another thread, called a monitor thread, that can keep track of the working status of all the threads? It would be great if you could help me with the design of this class.

    1. The key metric to monitor is whether the queue is growing over time, which means the number of active threads cannot keep up with the request load. When this happens the server needs to increase the number of worker threads. One way to do that is to create a separate monitor thread, as you suggested, that checks the queue size every few minutes (5-10, say) to make sure that the queue is not growing. If it is, then this monitor thread should create a few more worker threads.
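
      Here is a minimal sketch of such a monitor thread, built on the Thread, wqueue, WorkItem and ConnectionHandler classes from the article (it assumes it sits in server.cpp after those classes are defined); the check interval and thresholds are arbitrary choices:

      // Sketch only: a monitor thread that watches the queue size and spawns
      // extra ConnectionHandler workers if the queue keeps growing.
      #include <unistd.h>   // for sleep()

      class QueueMonitor : public Thread
      {
          wqueue<WorkItem*>& m_queue;
          int                m_lastSize;

        public:
          QueueMonitor(wqueue<WorkItem*>& queue) : m_queue(queue), m_lastSize(0) {}

          void* run() {
              for (;;) {
                  sleep(300);                       // check every 5 minutes
                  int size = m_queue.size();
                  if (size > m_lastSize && size > 10) {
                      // Queue is growing: add a few more workers.
                      for (int i = 0; i < 5; i++) {
                          ConnectionHandler* handler = new ConnectionHandler(m_queue);
                          handler->start();
                      }
                  }
                  m_lastSize = size;
              }
              return NULL;
          }
      };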

      When it comes to threads being killed, that won’t happen unless the server kills them. If a thread encounters a problem that makes it crash, then the whole server will go down, not just the one thread. Remember, threads operate within a single process. To prevent this situation, your server should use exception and/or signal handlers to catch issues that come up so the server doesn’t crash.
