Blocking Queues & Thread’s Communication in C#

[digg]

In one of my previous [posts], I discussed about creating adapter around “Thread.Monitor”. Today I’ll extend the discussion on how to utilize the same technique in creating blocking & synchronized queues and how they can be used in thread’s communication as shared resources. In most of our real world applications, information is sent from one part of the application to another part in orderly fashion,  and the data structure we use for this purpose is First in First out (FIFO) data structure or a Queue.

Queues

Queue is an integral part of .NET collection classes under “System.Collections”. While I was writing a multi-threaded TCP/IP Socket application, I encountered that a very important functionality, that is,  the blocking functionality is altogether missing from the queue. The blocking queue is absolute necessary to synchronize a “Producer” and a “Consumer” Thread. So, I thought why not extend the queue using our famous and greatest “Extension Methods” technique and here is the code for its extension.

   1:  using System;
   2:  using System.Threading;
   3:  using System.Diagnostics;
   4:   
   5:  namespace System.Collections
   6:  {
   7:      public static class QueueSyncExtension
   8:      {
   9:         /// <summary>
  10:         /// Enqueue an item, and notify.
  11:         /// </summary>
  12:         /// <param name="queue:Queue"></param>
  13:         /// <param name="message:object"></param>
  14:          public static void EnqueueNotify(this Queue queue, object message)
  15:          {
  16:              if (message == null)
  17:                  throw new ArgumentNullException("message");
  18:   
  19:              Queue syncQueue = Queue.Synchronized(queue);
  20:              lock (syncQueue.SyncRoot)
  21:              {
  22:                  syncQueue.Enqueue(message);
  23:                  syncQueue.SyncRoot.Notify();
  24:              }
  25:          }
  26:   
  27:          public static object DequeueBlocking(this Queue queue)
  28:          {
  29:              return DequeueBlocking(queue, Timeout.Infinite);
  30:          }
  31:   
  32:          /// <summary>
  33:          /// Blocked Dequeue method
  34:          /// </summary>
  35:          /// <param name="queue;Queue"></param>
  36:          /// <param name="timeOutMilliseconds:int"></param>
  37:          /// <returns>message:object</returns>
  38:          public static object DequeueBlocking(this Queue queue, 
  39:              int timeOutMilliseconds)
  40:          {
  41:              // Create Sync copy
  42:              Queue syncQueue = Queue.Synchronized(queue);
  43:             
  44:              // The queue is blocked here till signaled.
  45:              bool status = Block(syncQueue, timeOutMilliseconds);
  46:              // Debug.WriteLine("Wait status: " + status.ToString());
  47:   
  48:              object message = null;
  49:              if (syncQueue.Count > 0)
  50:              {
  51:                  // Get the next message.
  52:                  lock (syncQueue.SyncRoot)
  53:                  {
  54:                      if (syncQueue.Count > 0)
  55:                      {
  56:                          message = queue.Dequeue();
  57:                      }
  58:                  }
  59:              }
  60:              return message;
  61:          }
  62:   
  63:          /// <summary>
  64:          /// Blocks forever, until signaled.
  65:          /// </summary>
  66:          /// <param name="syncQueue:Queue"></param>
  67:          /// <returns>bool</returns>
  68:          private static bool Block(Queue syncQueue)
  69:          {
  70:              return Block(syncQueue, Timeout.Infinite);
  71:          }
  72:   
  73:          /// <summary>
  74:          /// Blocks until signaled or timeout.
  75:          /// </summary>
  76:          /// <param name="syncQueue:Queue"></param>
  77:          /// <param name="timeOutMilliseconds:int"></param>
  78:          /// <returns>status:bool</returns>
  79:          private static bool Block(Queue syncQueue, 
  80:              int timeOutMilliseconds)
  81:          {
  82:              bool status = true;
  83:              while (syncQueue.Count <= 0)
  84:              {
  85:                  lock (syncQueue.SyncRoot)
  86:                  {
  87:                      status = syncQueue.SyncRoot.Wait(timeOutMilliseconds);
  88:                  }
  89:              }
  90:              return status;
  91:          }
  92:      }
  93:  }

Two new methods are introduced in the Queue, “EnqueueNotify” and “DequeueBlocking”. The EnqueueNotify method signals/notifies when a message is added to the queue while the DequeueBlocking is listener, blocks until notified that a message is added to the queue and removes it from the queue and starts listening again. For all these notifications like syncQueue.SyncRoot.Notify(…) and syncQueue.SyncRoot.Wait(…), please read my previous [post] on the Extension methods.

Show Time :)
I have created an application for elaborating this concept, I created two threads (Tx/Rx), where One of the thread is waiting on a Receiver Queue (RxSyncQueue), while other is waiting on Transmit Queue (TxSyncQueue). Here is the diagram for it.

   Threading

And here is the code for it:

   1:  namespace ConsoleTestApp
   2:  {
   3:      class Program
   4:      {
   5:          static void Main(string[] args)
   6:          {
   7:              Console.WriteLine("Main thread | " +Thread.CurrentThread.GetHashCode().ToString());
   8:              
   9:              Queue RxSyncQueue = Queue.Synchronized(new Queue());
  10:              Queue TxSyncQueue = Queue.Synchronized(new Queue());            
  11:              
  12:              Thread Rx = new Thread(delegate()
  13:              {
  14:                  Console.WriteLine("Entering Rx | " +  Thread.CurrentThread.GetHashCode().ToString());
  15:   
  16:                  // block for ever...
  17:                  int i = 0;
  18:                  while(true)
  19:                  {
  20:                      object message = RxSyncQueue.DequeueBlocking();
  21:                      Console.WriteLine("Dequeued message | " + message.ToString());
  22:                      TxSyncQueue.EnqueueNotify((++i).ToString() + ":" + message.ToString());
  23:                  }
  24:              });
  25:              Rx.Start();
  26:   
  27:              Thread Tx = new Thread(delegate()
  28:              {
  29:                  Console.WriteLine("Entering Tx | " + Thread.CurrentThread.GetHashCode().ToString());
  30:   
  31:                  // block for ever...
  32:                  while (true)
  33:                  {
  34:                      object message = TxSyncQueue.DequeueBlocking();
  35:                      // lets take some rest.
  36:                      Thread.Sleep(1000);
  37:                      // Enqueue to the Rx-Queue this message again.
  38:                      RxSyncQueue.EnqueueNotify(message);
  39:                  }
  40:              });
  41:              Tx.Start();
  42:   
  43:              // pump priming...
  44:              TxSyncQueue.EnqueueNotify("Run");
  45:   
  46:              // wait here for Tx/Rx to join us... (would never happen :);
  47:              Tx.Join();      // processing halts here until the Tx is done.
  48:              Rx.Join();      // processing halts here until the Rx is done.
  49:   
  50:              Console.WriteLine("Main thread again | " + Thread.CurrentThread.GetHashCode().ToString());
  51:          }
  52:          /*
  53:           * Output :>
  54:           * 
  55:              Main thread | 1
  56:              Entering Rx | 3
  57:              Entering Tx | 4
  58:              Dequeued message | Run
  59:              Dequeued message | 1:Run
  60:              Dequeued message | 2:1:Run
  61:              Dequeued message | 3:2:1:Run
  62:              Dequeued message | 4:3:2:1:Run
  63:              Dequeued message | 5:4:3:2:1:Run
  64:              ...
  65:              ...
  66:           * 
  67:           */
  68:      }
  69:  }

Both the Thread Delegates in “Tx” and “Rx”, become Closures (functional programming term) around the queues (“RxSyncQueue” and “TxSyncQueue”), and thus these queues are visible to the threads and that's why we call them shared resources. Now, In order to start pumping the messages back and forth I just added one message into the queue at Line # 44, for demonstration purpose and rest of the message pumping is automatic. Here in this scenario, both the threads are producers as well as consumers, where when a message is produced by the “Tx” thread is pumped to ”RxSyncQueue” so that it is consumed by “Rx” thread and when “Rx” produces a message is pumped to the “TxSyncQueue” that is being consumed by the “Tx” thread. Or we can say that both the threads are notifiers as well as listeners. The output is shown in lines 55-63. In real world application like the TCP/IP Client, one thread may be a receiver thread, that receives messages from the TCP-Server and do useful things, while and the other thread may be used to transmit messages to the TCP-server or so.

That's all for now folks, will talk on the generic blocking queues next time. I’ll be looking forward to your feedbacks/comments, enjoy :)

Download File - Threading Test Project

If you enjoyed reading this blog, leave your valuable feedback and consider subscribing to the RSS feed. You can also subscribe to it by email. Also, you can follow me on Twitter. Thank you!
Comments are closed