Version 0 of A Thread-Safe Message Queue

Updated 2007-02-05 15:22:48 by NEM

NEM 5 Feb 2007: A message on comp.lang.tcl asked if there was an implementation of a message queue for Tcl's threads. A message queue is a queue data structure that supports inter-process communication (IPC). The idea is that a producer thread pushes messages onto the queue at one rate and then a consumer thread can remove them as it is ready to process them. The implementation below is synchronous, in that the queue has a fixed size. Any attempt to push a message onto a full queue or to read from an empty queue will cause the calling thread to suspend until the condition is satisified. This allows for a simple means of coordinating workload between two threads. (In other words, the implementation below is equivalent to a simple bounded buffer). More advanced implementations are possible, such as allowing the consumer to specify a pattern -- only messages which match the pattern are removed from the queue (similar to Erlang's receive statement).

 # mqueue.tcl --
 #
 #       Simple blocking message queue for Tcl threads.
 #
 # Copyright (c) 2007 Neil Madden.
 #
 # License: http://www.cs.nott.ac.uk/~nem/license.terms (Tcl-style).

 package require Tcl     8.5
 package require Thread  2.6
 package provide mqueue  0.1

 namespace eval mqueue {
     namespace export create destroy push pop
     namespace ensemble create
     tsv::lock ::mqueue {
         if {![tsv::exists ::mqueue id]} {
             tsv::set ::mqueue id 0
         }
     }

     proc lock {mutex script} {
         thread::mutex lock $mutex
         catch { uplevel 1 $script } ret opts
         thread::mutex unlock $mutex
         return -options $opts $ret
     }

     proc create {{size 1}} {
         tsv::lock ::mqueue {
             set id [tsv::get ::mqueue id]
             set self "::mqueue[incr id]"
             tsv::set $self mutex [thread::mutex create]
             tsv::set $self read  [thread::cond create]
             tsv::set $self write [thread::cond create]
             tsv::set $self size  $size
             tsv::set $self buffer [list]
         }
         return $self
     }

     proc destroy queue {
         tsv::lock ::mqueue {
             thread::cond  destroy [tsv::get $queue read]
             thread::cond  destroy [tsv::get $queue write]
             thread::mutex destroy [tsv::get $queue mutex]
             tsv::unset $queue
         }
     }

     proc push {queue data} {
         lock [tsv::get $queue mutex] {
             while {[tsv::llength $queue buffer] >= [tsv::get $queue size]} {
                 # Full already
                 thread::cond wait [tsv::get $queue write] \
                     [tsv::get $queue mutex]
             }
             tsv::lappend $queue buffer $data
             thread::cond notify [tsv::get $queue read]
         }
     }

     proc pop queue {
         lock [tsv::get $queue mutex] {
             while {[tsv::llength $queue buffer] == 0} {
                 # Empty
                 thread::cond wait [tsv::get $queue read] \
                     [tsv::get $queue mutex]
             }
             tsv::set $queue buffer [lassign [tsv::get $queue buffer] ret]
             thread::cond notify [tsv::get $queue write]
         }
         return $ret
     }
 }

This should be basically thread-safe, except that calling destroy while the queue is in use will likely result in an error. However, shared state concurrency is hard to do right so a review would be welcome, especially from someone who actually uses the thread package regularly (I don't use it very often).

As an example, here is a simple producer/consumer scenario:

 package require mqueue 0.1
 set t [thread::create]
 thread::send $t {
     package require mqueue 0.1
     proc produce queue {
         puts "Producer thread starting..."
         while 1 {
             puts "Looping"
             mqueue push $queue "Message: [incr i]"
         }
     }
 }
 set q [mqueue create 5]
 thread::send -async $t [list produce $q]
 while 1 { puts [mqueue pop $q]; after 200 }

[ Category Threads | Category Data Structure | Category Interprocess Communication ]