A higher-level channel API

NEM 12 Oct 2006: Despite the generally very high level of abstraction offered by Tcl's built-in channel capabilities, I still find myself having to search through documentation and examples to find exactly what combination of eof, fblocked, fileevent and fconfigure magic I need to get a particular example working. In most of these cases, the result is pretty similar. It therefore seems like a good idea to try to package up some of the more common patterns as reusable commands, possibly for inclusion in tcllib. To this end, I've knocked up an initial "channel" package/ensemble to complement 8.5's chan command. This initial version supports a single command "iterate" that can be used to easily set up a correct fileevent with much of the configuration and error handling taken care of. The API is:

channel iterate chan ?options..? (cmd | params body):

Arranges for a callback to be invoked for every block of data read from the channel. The callback can be given as either a command prefix list (like other callbacks in Tcl), or as a pair of arguments specifying the parameters and body of an anonymous procedure to be invoked using apply. In either case, the callback will be invoked for each block of data read from the channel with the channel and the actual data as arguments. Note that the callback is executed in a new scope, so variables defined outside of the body will not be visible when the body runs.
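
The two callback forms and the "new scope" rule can be illustrated without any channel at all. The following is only a sketch: makeCallback is a hypothetical helper (not part of the package) that normalises either form into a command prefix, roughly as channel iterate does internally, and sock42 is a made-up channel name used purely as a string.

```tcl
# Hypothetical helper (not part of the package): turn either callback form
# into a command prefix that can be invoked with a channel and some data.
proc makeCallback {args} {
    if {[llength $args] == 2} {
        lassign $args params body
        # Anonymous procedure; this sketch resolves it in the global namespace
        return [list ::apply [list $params $body ::]]
    }
    return [lindex $args 0]
}

set greeting "hello"

# Form 1: a command prefix; the channel and the data are appended as arguments
set prefixCb [makeCallback {list got}]
set r1 [{*}$prefixCb sock42 "some data"]

# Form 2: params + body, run via [apply] in a fresh scope
set lambdaCb [makeCallback {chan data} { list $chan [string toupper $data] }]
set r2 [{*}$lambdaCb sock42 abc]

# The body cannot see $greeting unless it imports it (e.g. with [global])
set scopeCb [makeCallback {chan data} { info exists greeting }]
set seesGreeting [{*}$scopeCb sock42 abc]
```

Here r2 holds the channel name followed by the upper-cased data, and seesGreeting is 0 because the body runs with its own local variables only.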

The options available are:

-blocksize
"line" or an integer (default: "line"). Max # of chars read before each callback.
-errorcommand
command prefix (default: "channel error"). Callback to invoke if an error occurs on the channel. Called with channel, errorCode and error message args.
-eofcommand
command prefix (default: "chan close"). Callback to invoke when the channel reaches eof. Responsible for closing the channel.

As an example, here is a simple echo server that can handle multiple simultaneous connections:

 proc accept {sock addr port} {
     puts "CONNECT: $sock $addr\:$port"
     chan configure $sock -buffering line
     channel iterate $sock {sock line} { puts $sock $line }
     # or just: channel iterate $sock puts, as puts takes the right args
 }
 socket -server accept 9000
 vwait forever   ;# enter the event loop (needed in tclsh; wish already runs one)

That's it! We can add disconnection logging easily, just change the channel iterate to:

 channel iterate $sock -eofcommand cleanup {sock line} { puts $sock $line }

and add:

 proc cleanup sock {
     puts "DISCONNECT: $sock"
     close $sock
 }

Comments and suggestions very much welcome. In particular, it would be helpful if someone could check through the logic to make sure I have actually dealt with the various error conditions correctly. Possible future enhancements:

  • Add support for synchronous iteration too (i.e., basically just a read loop without using the event loop). Maybe make default, with a -async option?
  • Related to above; support for blocking channels (currently all chans are configured -blocking 0).
  • Should the "params body" form actually create a simple closure (as in simple closures and objects), to ease quoting issues? (Might then also add a -environment option to specify the environment dict).
  • Anything else? Other commands people would like to see?
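
As a rough illustration of the first enhancement (synchronous iteration), a plain read loop might look like the sketch below. This is an assumption about what such a command could be, not part of the package: iterateSync and collect are hypothetical names, only the line-oriented case is handled, and file tempfile requires Tcl 8.6.

```tcl
# Hypothetical synchronous variant: a blocking read loop, no event loop.
proc iterateSync {chan command} {
    chan configure $chan -blocking 1
    # [chan gets] returns -1 once the end of the channel is reached
    while {[chan gets $chan line] >= 0} {
        uplevel #0 [list {*}$command $chan $line]
    }
    chan close $chan
}

# Demo: write three lines to a temporary file, then iterate over them.
set fd [file tempfile path]
puts $fd "alpha\nbeta\ngamma"
close $fd

set ::seen {}
proc collect {chan line} { lappend ::seen $line }

iterateSync [open $path] collect
file delete $path
```

After the call returns, ::seen contains the three lines in order and the channel has been closed.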

APN 20101026 Neat, but perhaps it would be better to disable the read event handler before invoking the user callback and re-enable it once the user callback returns. Otherwise, unexpected behaviour might result if the callback enters the event loop (something as simple as a puts to tkcon) while more input data is pending in the channel.
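
A minimal sketch of that suggestion follows. It is not the package's ReadChan: guardedRead and slowCallback are hypothetical names, only line mode is handled, and file tempfile assumes Tcl 8.6. The callback deliberately calls update to simulate re-entering the event loop.

```tcl
# Hypothetical handler: suspend the readable event while the callback runs.
proc guardedRead {chan command} {
    if {[chan gets $chan line] < 0} {
        if {[chan eof $chan]} {
            chan event $chan readable {}
            chan close $chan
            set ::done 1
        }
        return   ;# otherwise just blocked: wait for more data
    }
    set handler [chan event $chan readable]   ;# remember the current script
    chan event $chan readable {}              ;# no re-entry during callback
    set rc [catch { uplevel #0 [list {*}$command $chan $line] } result opts]
    # Re-enable only if the callback left the channel open
    if {$chan in [chan names]} {
        chan event $chan readable $handler
    }
    return -options $opts $result
}

set ::depth 0
set ::maxDepth 0
set ::lines {}
proc slowCallback {chan line} {
    incr ::depth
    if {$::depth > $::maxDepth} { set ::maxDepth $::depth }
    lappend ::lines $line
    update   ;# re-enters the event loop, as a puts to tkcon might
    incr ::depth -1
}

set fd [file tempfile path]
puts $fd "one\ntwo\nthree"
close $fd

set in [open $path]
chan configure $in -blocking 0
chan event $in readable [list guardedRead $in slowCallback]
vwait ::done
file delete $path
```

Because the handler is removed while slowCallback runs, the nested update cannot trigger another read, so the callback is never entered recursively and the lines arrive in order.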


NEM slightly later: added a "with" command, which is a slight variation on using. Example:

 channel with [open somefile.txt] fd { puts [read $fd] }

I re-arranged the args slightly so that the channel comes before the varName. This is partly with an eye towards TOOTification -- if channel is first arg of all methods, then you can sugar an object-like interface:

 proc channel: {chan method args} {
     uplevel 1 [linsert $args 0 ::channel $method $chan]
 }
 proc def {name = cmd args} { interp alias {} $name {} $cmd {*}$args }
 def out = channel: [socket localhost 9000]
 out configure -blocking 0 -buffering line
 out puts "Enter text:"
 out iterate {sock line} {
     puts $line
     puts $sock [gets stdin]
 }
 vwait forever

Also changed the ensemble to delegate unknown commands to chan, as you can see from the above.
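
The delegation mechanism can be seen in isolation with a toy ensemble. This sketch uses a hypothetical demo namespace that forwards unknown subcommands to string rather than chan, so that it runs without opening any channel. The -unknown handler returns a replacement command prefix, and Tcl appends the remaining arguments to it.

```tcl
namespace eval demo {
    namespace export hello
    namespace ensemble create -unknown ::demo::unknown

    proc hello {} { return "hello from demo" }

    # Called with the ensemble name, the unknown subcommand and its arguments.
    # The returned list replaces "demo <subcmd>" in the original call.
    proc unknown {ensemble subcmd args} { list ::string $subcmd }
}

set a [demo hello]          ;# handled by the ensemble itself
set b [demo toupper abc]    ;# delegated to [string toupper abc]
set c [demo length abc]     ;# delegated to [string length abc]
```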


SEH 20120502 -- If an eofcommand other than the default is specified that doesn't happen to close the channel, the command goes into an infinite loop, since a channel that has reached eof is still considered readable according to the manual. So the readable fileevent fires endlessly. I added a couple of lines to remove the readable fileevent if an error or eof is encountered (bumped version to 0.3).
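
The effect of that fix can be sketched with a stand-alone handler. This is an illustration rather than the package code: onReadable is a hypothetical name, the eof action deliberately leaves the channel open (as a custom -eofcommand might), and file tempfile assumes Tcl 8.6.

```tcl
# Hypothetical handler that stops listening once eof is seen, without closing.
proc onReadable {chan} {
    incr ::fired
    if {[chan gets $chan line] < 0 && [chan eof $chan]} {
        # Remove the handler first: a channel at eof stays "readable", so
        # leaving it in place would make this proc fire again and again.
        chan event $chan readable {}
        set ::atEof 1
    }
}

set fd [file tempfile path]
puts $fd "only line"
close $fd

set in [open $path]
chan configure $in -blocking 0
set ::fired 0
chan event $in readable [list onReadable $in]
vwait ::atEof

set firedAtEof $::fired
update
update
set firedLater $::fired   ;# unchanged: the handler is gone

set stillOpen [expr {$in in [chan names]}]
close $in
file delete $path
```

The channel is still open after eof, yet further trips through the event loop no longer invoke the handler.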


Code

 # channel.tcl --
 #
 #       General high-level channel utilities to complement Tcl's [chan]
 #       ensemble.
 #
 # Copyright (c) 2006 Neil Madden.
 #
 # License: http://www.cs.nott.ac.uk/~nem/license.terms
 #
 
 package require Tcl     8.5
 package provide channel 0.4
 
 namespace eval channel {
     namespace export iterate error with
     namespace ensemble create -unknown ::channel::unknown

     # delegate unknown commands to the ::chan ensemble
     proc unknown {_ cmd args} { list ::chan $cmd }
 
     # channel iterate chan ?options..? [cmd | params body]
     #
     #       Arranges for a callback to be invoked for every block of data read
     #       from the channel. The callback can be given as either a command
     #       prefix list or as a pair of arguments specifying the parameters and
     #       body of an anonymous procedure to be invoked. In either case, the
     #       callback will be invoked for each block of data read from the
     #       channel with the channel and the data read as arguments. Note that
     #       the callback is executed in a new scope, rather than in the caller's
     #       scope, so variables defined outside of the body will not be
     #       visible when the body runs.
     #
     # OPTIONS
     #       -blocksize      "line" or an integer > 0 [default: "line"]
     #                       Maximum number of characters to read before each
     #                       callback. If the value is "line" then [chan gets] is
     #                       used rather than [chan read].
     #       -errorcommand   cmd [default "channel error"]
     #                       Callback to invoke if an error occurs on the
     #                       channel. Called with the channel, the errorCode and
     #                       the error message of the error as arguments.
     #       -eofcommand     cmd [default "chan close"]
     #                       Callback to invoke when the end of the channel is
     #                       reached.
     #
     # RETURNS
     #       The channel.
     #
     # SIDE-EFFECTS
     #       Sets up a channel event handler to invoke the callback. Other
     #       side-effects may be caused by various callbacks. Configures the
     #       channel to be non-blocking.
     #
     proc iterate {chan args} {
         set usage "channel iterate chan ?options..? (cmd | params body)"
         if {[llength $args] < 1} {
             return -code error "wrong # args: should be \"$usage\""
         }
         set options {
             -blocksize          "line"
             -errorcommand       ::channel::error
             -eofcommand         {::chan close}
         }
         if {[llength $args] % 2 == 0} {
             if {[llength $args] < 2} {
                 return -code error "wrong # args: should be \"$usage\""
             }
             set params  [lindex $args end-1]
             set body    [lindex $args end]
             set ns      [uplevel 1 { namespace current }]
             set command [list ::apply [list $params $body $ns]]
             set args [lrange $args 0 end-2]
         } else {
             set command [lindex $args end]
             set args [lrange $args 0 end-1]
         }
         dict for {option value} $args {
             if {![dict exists $options $option]} {
                 set ops [CommaJoin [dict keys $options]]
                 return -code error "invalid option \"$option\": must be $ops"
             }
             dict set options $option $value
         }
         chan configure $chan -blocking 0
         chan event $chan readable [list [my ReadChan] $chan $command $options]
         return $chan
     }
 
     # channel error chan code message --
     #
     #       Default callback for handling errors on a channel. This callback
     #       simply closes the channel and then rethrows the error.
     #
     proc error {chan code message} {
         catch { chan close $chan }
         return -code error -errorcode $code $message
     }

     # channel with chan varName body --
     #
     #      Assigns the varName with the channel given and then evaluates the
     #      body in the caller's scope. Once the body has completed the channel
     #      is closed (even in the case of error).
     #
     proc with {chan varName body} {
         upvar 1 $varName var
         set var $chan
         set rc [catch { uplevel 1 $body } result ops]
         catch { chan close $chan }
         # TODO: do we need to process -level specially?
         return -options $ops $result
     }
 
     interp alias {} [namespace current]::my {} namespace which -command
 
     # CommaJoin list --
     # 
     #       Creates a human-readable version of the list, by separating elements
     #       with commas. The last two elements are separated by ", or " if there
     #       are more than 2 elements in total, and by " or " if there are
     #       exactly two elements.
     #
     proc CommaJoin list {
         if {[llength $list] < 2} {
             return $list
         } elseif {[llength $list] == 2} {
             return [join $list " or "]
         } else {
             set start [join [lrange $list 0 end-1] ", "]
             # [concat] trims each argument, which would yield "a, b , or c"
             return "$start, or [lindex $list end]"
         }
     }
 
     # ReadChan chan command options --
     #
     #       Main channel readable event handler. Invokes appropriate callbacks
     #       according to the condition of the channel.
     #
     proc ReadChan {chan command options} {
         set size [dict get $options -blocksize]
         if {$size eq "line"} {
             set status [catch { gets $chan } data]
         } else {
             set status [catch { read $chan $size } data]
         }
         if {$status != 0} {
             # Error occurred
             chan event $chan readable {}
             invoke [dict get $options -errorcommand] $chan $::errorCode $data
         } elseif {[chan eof $chan]} {
             chan event $chan readable {}
             invoke [dict get $options -eofcommand] $chan
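         } elseif {[chan blocked $chan] && $data eq ""} {
             # No data available yet, just return. A short [chan read] also
             # reports blocked, but its partial data falls through to the
             # callback below instead of being dropped.
             return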
         } else {
             # OK
             invoke $command $chan $data
         }
     }
     proc invoke {cmd args} { uplevel #0 $cmd $args }
 }