# mqueue.tcl -- # # Simple blocking message queue for Tcl threads. # # Copyright (c) 2007 Neil Madden. # # License: http://www.cs.nott.ac.uk/~nem/license.terms (Tcl-style). package require Tcl 8.4 package require Thread 2.6 package provide mqueue 0.2 namespace eval mqueue { namespace export create destroy push pop proc ::mqueue {subcommand args} { uplevel 1 [linsert $args 0 ::mqueue::$subcommand] } tsv::lock ::mqueue { if {![tsv::exists ::mqueue id]} { tsv::set ::mqueue id 0 } } proc lock {mutex script} { thread::mutex lock $mutex set rc [catch { uplevel 1 $script } ret] thread::mutex unlock $mutex return -code $rc $ret } proc create {{size 1}} { set id [tsv::incr ::mqueue id] set self "::mqueue$id" tsv::set $self mutex [thread::mutex create] tsv::set $self read [thread::cond create] tsv::set $self write [thread::cond create] tsv::set $self size $size tsv::set $self buffer [list] return $self } proc destroy queue { thread::cond destroy [tsv::get $queue read] thread::cond destroy [tsv::get $queue write] thread::mutex destroy [tsv::get $queue mutex] tsv::unset $queue } proc push {queue data} { lock [tsv::get $queue mutex] { while {[tsv::llength $queue buffer] >= [tsv::get $queue size]} { # Full already thread::cond wait [tsv::get $queue write] \ [tsv::get $queue mutex] } tsv::lappend $queue buffer $data thread::cond notify [tsv::get $queue read] } } proc pop queue { lock [tsv::get $queue mutex] { while {[tsv::llength $queue buffer] == 0} { # Empty thread::cond wait [tsv::get $queue read] \ [tsv::get $queue mutex] } set ret [tsv::lpop $queue buffer] thread::cond notify [tsv::get $queue write] } return $ret } }This should be basically thread-safe, except that calling destroy while the queue is in use will likely result in an error. However, shared state concurrency is hard to do right so a review would be welcome, especially from someone who actually uses the thread package regularly (I don't use it very often).As an example, here is a simple producer/consumer scenario:
package require mqueue 0.1
set t [thread::create]
thread::send $t {
package require mqueue 0.1
proc produce queue {
puts "Producer thread starting..."
while 1 {
puts "Looping"
mqueue push $queue "Message: [incr i]"
}
}
}
set q [mqueue create 5]
thread::send -async $t [list produce $q]
while 1 { puts [mqueue pop $q]; after 200 }[ Category Threads | Category Data Structure | Category Interprocess Communication ]