Updated 2018-04-08 00:52:06 by AMG

AMG: The async package manages asynchronous communication with any number of child processes.

Example

coroutine xxx apply {{} {
    set id1 [async::exec sort]
    chan puts $id1 b\na\nc
    set id2 [async::exec sort]
    async::close -direction stdin $id1
    chan puts $id2 z\na\nc
    async::close -direction stdin $id2
    chan puts [async::get $id2]
    chan puts [async::read -limit 2 -dropNewline $id2]
    chan puts [async::read -dropNewline $id1]
    async::close $id2
    async::close $id1
    set ::end 1
}}
vwait end

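The above prints "a c a b c", one character per line: "a" and "c" come from $id2 (one line via [async::get], then a two-character block via [async::read -limit 2 -dropNewline]), and "a b c" is the sorted output of $id1. The remaining "z" from $id2 is still unread when that channel is closed, so it is discarded.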

Commands

[async::exec]

Executes a child process command. The returned channel identifier can be used to asynchronously communicate with the child process via the [async::*] and selected [chan *] commands. The valid [chan] subcommands are: puts, flush, copy (recommend outputChan only), configure, push, and pop. The channel is configured to be nonblocking with binary translation and line buffering. Please do not change the blocking mode. To block, instead omit the -noWait switch to [async::get] and [async::read].
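
As a rough illustration of the technique behind this bundling (not the package itself), the sketch below uses only core Tcl 8.6 to route a child's stderr into a separate pipe with [chan pipe] and the 2>@ redirection, much as the Code section does. The sh command line and the variable names are assumptions made for the demonstration, and it uses blocking reads for brevity, which the package avoids.

```tcl
package require Tcl 8.6

# Pipe that will carry the child's stderr.
lassign [chan pipe] errRead errWrite

# Assumption: a POSIX sh is on PATH.  The child writes one line to each stream.
set io [open |[list sh -c {echo out; echo err >&2} 2>@ $errWrite] r]

# The child holds its own copy of the write end, so close ours; otherwise
# reading errRead would never see end of file.
chan close $errWrite

set out [chan read $io]       ;# blocks until the child closes its stdout
set err [chan read $errRead]  ;# blocks until the child closes its stderr
chan close $io
chan close $errRead

puts [list $out $err]
```

Keeping stderr on its own channel is what makes modes such as -stderr tag and -stderr throw possible, since the two streams never get interleaved before the caller sees them.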

[async::channels]

Returns a list of all currently open asynchronous channel IDs.

[async::close]

Closes a currently open asynchronous channel. A channel may be partially closed, for example to send the sort(1) program EOF on its stdin after which it will start writing results to its stdout. Unread data will be discarded, possibly except for stderr. If stderr is being closed and -stderr ignore was not used, any unread stderr data is thrown with ASYNC STDERR.

async::close ?-direction all|stdin|stdout|stderr? ?-stderr ignore|throw? id
-direction all: Fully closes channel. The default.
-direction stdin: Closes stdin component of channel.
-direction stdout: Closes stdout component of channel.
-direction stderr: Closes stderr component of channel.
-stderr ignore: Ignores any unread data on stderr.
-stderr throw: Throws any stderr data with ASYNC STDERR. The default.
id: The asynchronous channel identifier returned by [async::exec].
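
The partial close is what lets a filter such as sort(1) see end of input while its output stays readable. A minimal sketch of the underlying core Tcl 8.6 call, [chan close] with a direction, follows. It is blocking and bypasses the async package entirely, and the variable names are only illustrative.

```tcl
package require Tcl 8.6

# Open sort(1) for both writing (its stdin) and reading (its stdout).
set io [open |[list sort] r+]
chan puts $io "b\na\nc"

# Half-close: flushes pending output and sends EOF to sort's stdin only.
chan close $io write

# sort emits its result once it has seen EOF; this blocking read collects it.
set sorted [chan read $io]
chan close $io

puts $sorted
```

[async::close -direction stdin] wraps the same half-close and additionally records that the stdin direction is no longer open.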

[async::get]

Gets one line of text from the stdout and/or stderr of an asynchronous child process, according to the value of the -stderr switch, which defaults to -stderr throw. The trailing newline is stripped, unless the -keepNewline switch is used. Yields until the requested data is available, unless the -noWait switch is used. The result is returned or written into a variable, in which case the number of characters read is returned. In the case of -noWait with a variable, -1 is returned rather than 0 if the requested data is not immediately available. At end of file, the last line of stdout or stderr is considered complete even if it does not end in a newline. If called at end of file after all data has already been read and returned, ASYNC EOF is thrown.

async::get ?-noWait? ?-keepNewline? ?-variable varName? ?-stderr mode? id
-noWait: Returns immediately rather than yield if data is not available.
-keepNewline: Does not strip the trailing newline.
-variable varName: Name of variable into which to write the result.
-stderr ignore: Ignores stderr and reads only stdout.
-stderr merge: Reads from either stderr or stdout, whichever is available.
-stderr tag: Like merge, except the return value is a tagged list.
-stderr read: Ignores stdout and reads only stderr.
-stderr throw: Returns stdout normally and throws stderr with ASYNC STDERR.
id: The asynchronous channel identifier returned by [async::exec].
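
A typical pattern is to call [async::get] in a loop and treat ASYNC EOF as the normal end of the loop. The sketch below assumes the Code section of this page has been saved as async.tcl in the current directory (a hypothetical file name) and that sort(1) is available. The coroutine name and the result variable are illustrative.

```tcl
package require Tcl 8.6

# Assumption: the Code section of this page was saved as ./async.tcl.
source async.tcl

coroutine reader apply {{} {
    set id [async::exec sort]
    chan puts $id "pear\napple\nfig"
    async::close -direction stdin $id

    # Collect lines until the package signals end of file.
    set lines {}
    try {
        while {1} {
            lappend lines [async::get $id]
        }
    } trap {ASYNC EOF} {} {
        # All output has been consumed; nothing more to do here.
    }

    async::close $id
    set ::result $lines
}}

# [async::get] yields, so the event loop must run for the coroutine to resume.
vwait result
puts $result
```

Because [async::get] yields while waiting, the loop has to run inside a coroutine with the event loop being serviced, here by vwait.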

[async::read]

Gets a block of data from the stdout and/or stderr of an asynchronous child process, according to the value of the -stderr switch, which defaults to -stderr throw. Yields until the requested data is available, unless the -noWait switch is used. If called at end of file after all data has already been read and returned, ASYNC EOF is thrown.

async::read ?-noWait? ?-dropNewline? ?-limit size? ?-stderr mode? id
-noWait: Returns immediately rather than yield if data is not available.
-dropNewline: Strips the trailing newline.
-limit size: Maximum number of characters to read and return.
-stderr ignore: Ignores stderr and reads only stdout.
-stderr merge: Reads from either stderr or stdout, whichever is available.
-stderr tag: Like merge, except the return value is a tagged list.
-stderr read: Ignores stdout and reads only stderr.
-stderr throw: Returns stdout normally and throws stderr with ASYNC STDERR.
id: The asynchronous channel identifier returned by [async::exec].
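
To see why the example at the top of this page prints "c" for [async::read -limit 2 -dropNewline], it helps to look at how a buffered block is split. The following sketch mirrors the string operations used in the Code section on a hand-written buffer. The buffer contents and variable names are assumptions chosen to match that example after its first line has been consumed.

```tcl
# Pretend stdout buffer: what remains of sort's output after "a\n" was taken.
set buffer "c\nz\n"
set limit 2

# Characters beyond the limit stay buffered for the next call.
set rest [string range $buffer $limit end]

# The first $limit characters form the block to return.
set block [string replace $buffer $limit end]

# -dropNewline removes one trailing newline from the block, if present.
regsub {\n$} $block {} block

puts [list $block $rest]
```

Note that the limit counts the newline as a character, so -limit 2 yields "c" plus its newline before -dropNewline is applied.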

[async::fill]

Reads all available data from stdout and stderr into internal buffers. The current coroutine yields until at least one character is successfully read or end of file is reached. This procedure need not be called by user code.
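
The waiting step can be pictured with a small core Tcl 8.6 sketch that is independent of the package: register the current coroutine as the readable handler, yield, then remove the handler once resumed. The helper name waitReadable, the coroutine name, and the echo child are assumptions made for illustration.

```tcl
package require Tcl 8.6

# Suspend the calling coroutine until the channel becomes readable.
proc waitReadable {chan} {
    chan event $chan readable [info coroutine]
    yield
    chan event $chan readable {}
}

coroutine demo apply {{} {
    # Assumption: an echo executable is on PATH.
    set io [open |[list echo ready] r]
    chan configure $io -blocking 0

    set data {}
    while {![chan eof $io]} {
        waitReadable $io
        append data [chan read $io]
    }
    chan close $io
    set ::done $data
}}

# Run the event loop until the coroutine has collected everything.
vwait done
puts $done
```

[async::fill] follows the same shape, but registers handlers on both the stdout and stderr channels so that data on either one resumes the coroutine.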

[async::flow]

Reads all immediately available data from stdout or stderr into internal buffers. This procedure need not be called by user code. Returns the number of characters that were read, 0 if no data was available, or -1 if at end of file or the requested direction was locally closed.

Discussion

AMG: I'm a little hesitant about a few design choices.

Is it really the right thing to have separate [async::get] and [async::read] commands? I did this to mirror [chan gets] and [chan read], but then my switch syntax diverged considerably. There's so much code overlap, I could combine the two, but perhaps it's simpler to use if they are kept separate.

Do I really want to have the block size, line termination mode, etc. apply to stderr when using -stderr throw? Or should I have -stderr throw always just throw all available stderr text just the way it appears?

AMG: It might be nice to allow [async::get] and [async::read] to take more than one id argument so they'll return data from whichever is first available. To be useful, this would require an expansion of the tagged list return format, or else the caller wouldn't be able to tell where the data is coming from.

Another way to get some of the same capability would be a command to report which channels are readable, optionally waiting for one to be readable. Then get data from whichever channel ID it returns.

Having timeouts or being interrupted by other kinds of events could be cool too. Just call the coroutine from wherever. However, this would require establishing a protocol by which the coroutine is invoked so that it knows what kind of event occurred and how to handle it and report it to the original caller. See also: Wibble's "icc" system.

AMG: Maybe this can be included in tcllib someday.

PYK 2018-03-24: Doesn't tcllib::coroutine already cover precisely this territory?

AMG: Maybe. I didn't have access to Tcllib (or the Internet) in the environment for which I needed this code.

AMG: Actually, no. I just had a look at tcllib::coroutine and saw that it does not bundle stdin, stdout, and stderr the way this async package does, making it an ill fit for separately managing stdout and stderr of a child process. Aside from that, it comes very close.

AMG: There needs to be a way to import channels created externally, e.g. from sockets or regular file opens, or even just stdin/stdout. Alternately, or additionally, file and network open commands can be provided. Right now it's only possible to talk to child processes, short of manually editing the ::async::Channels dict.

Code

# async.tcl
# Andy Goth <[email protected]>

# Require [dict], [throw], [chan pipe], [chan close dir], and [coroutine].
package require Tcl 8.6

# Create namespace.
namespace eval ::async {}

# Asynchronous channel internal data.
set ::async::Channels {}

# async::exec --
# Executes a child process command.  The returned channel identifier can be used
# to asynchronously communicate with the child process via the [async::*] and
# selected [chan *] commands.  The valid [chan] subcommands are: puts, flush,
# copy (recommend outputChan only), configure, push, and pop.  The channel is
# configured to be nonblocking with binary translation and line buffering.
# Please do not change the blocking mode.  To block, instead omit the -noWait
# switch to [async::get] and [async::read].
proc ::async::exec {args} {
    variable Channels

    # Create a pipe to collect stderr from the child process.
    lassign [chan pipe] chanStderr chanStderrWrite

    # Execute the child process.
    set chanInOut [open |[list {*}$args 2>@ $chanStderrWrite] a+]
    chan close $chanStderrWrite

    # Configure stdin/stdout and stderr channels.
    chan configure $chanInOut -blocking 0 -translation binary -buffering line
    chan configure $chanStderr -blocking 0 -translation binary -buffering line

    # Initialize the channel buffers.
    dict set Channels $chanInOut open {stdin {} stdout {} stderr {}}
    dict set Channels $chanInOut chan stdout $chanInOut
    dict set Channels $chanInOut chan stderr $chanStderr
    dict set Channels $chanInOut buf {stdout {} stderr {}}
    dict set Channels $chanInOut eof {flag 0 message {}}

    # Return the input/output channel to the caller.
    return $chanInOut
}

# async::channels --
# Returns a list of all currently open asynchronous channel IDs.
proc ::async::channels {} {
    variable Channels
    dict keys $Channels
}

# async::close --
# Closes a currently open asynchronous channel.  A channel may be partially
# closed, for example to send the sort(1) program EOF on its stdin after which
# it will start writing results to its stdout.  Unread data will be discarded,
# possibly except for stderr.  If stderr is being closed and -stderr ignore was
# not used, any unread stderr data is thrown with ASYNC STDERR.
#
# async::close ?-direction all|stdin|stdout|stderr? ?-stderr ignore|throw? id
# -direction all: Fully closes channel.  The default.
# -direction stdin: Closes stdin component of channel.
# -direction stdout: Closes stdout component of channel.
# -direction stderr: Closes stderr component of channel.
# -stderr ignore: Ignores any unread data on stderr.
# -stderr throw: Throws any stderr data with ASYNC STDERR.  The default.
# id: The asynchronous channel identifier returned by [async::exec].
proc ::async::close {args} {
    variable Channels

    # Parse arguments.
    set direction all
    set stderrMode throw
    while {[llength $args]} {
        set args [lassign $args arg]
        if {[info exists id]} {
            return -code error "wrong # args: should be \"async::close\
                    ?-direction all|stdin|stdout|stderr?\
                    ?-stderr ignore|throw? id\""
        } elseif {$arg eq "-direction"} {
            if {![llength $args]
             || [lindex $args 0] ni {all stdin stdout stderr}} {
                return -code error "-direction switch must be followed by:\
                        all, stdin, stdout, or stderr"
            }
            set args [lassign $args direction]
        } elseif {$arg eq "-stderr"} {
            if {![llength $args]
             || [lindex $args 0] ni {ignore throw}} {
                return -code error "-stderr switch must be followed by:\
                        ignore or throw"
            }
            set args [lassign $args stderrMode]
        } elseif {![dict exists $Channels $arg]} {
            return -code error "not an open asynchronous channel: $arg"
        } else {
            set id $arg
        }
    }

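    # When closing stderr and using -stderr throw, collect unread stderr.  This
    # is done before [dict with] so that the data just read by [flow] is not
    # hidden behind stale copies of the channel status variables.
    if {$stderrMode eq "throw" && $direction in {all stderr}} {
        flow $id stderr
        set stderrData [dict get $Channels $id buf stderr]
    }

    # Get access to the channel status variables.
    dict with Channels $id {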

        if {$direction eq "all"} {
            # If requested, close all open channels.
            if {[dict exists $open stderr]} {
                chan close [dict get $chan stderr]
            }
            if {[dict exists $open stdin] || [dict exists $open stdout]} {
                chan close [dict get $chan stdout]
            }
            dict unset Channels $id
        } elseif {![dict exists $open $direction]} {
            # Check for double close.
            return -code error "direction \"$direction\" already closed for\
                    asynchronous channel: $id"
        } else {
            # Close only the requested direction.
            if {$direction eq "stdin"} {
                chan close [dict get $chan stdout] write
            } elseif {$direction eq "stdout"} {
                chan close [dict get $chan stdout] read
            } else {
                chan close [dict get $chan stderr]
            }

            # Clean up the asynchronous channel data as appropriate.
            dict unset open $direction
            if {![dict size $open]} {
                dict unset Channels $id
            }
        }

        # Throw any unread stderr data.
        if {[info exists stderrData] && $stderrData ne {}} {
            throw {ASYNC STDERR} $stderrData
        }
    }
}

# async::get --
# Gets one line of text from the stdout and/or stderr of an asynchronous child
# process, according to the value of the -stderr switch, which defaults to
# -stderr throw.  The trailing newline is stripped, unless the -keepNewline
# switch is used.  Yields until the requested data is available, unless the
# -noWait switch is used.  The result is returned or written into a variable, in
# which case the number of characters read is returned.  In the case of -noWait
# with a variable, -1 is returned rather than 0 if the requested data is not
# immediately available.  At end of file, the last line of stdout or stderr is
# considered complete even if it does not end in a newline.  If called at end of
# file after all data has already been read and returned, ASYNC EOF is thrown.
#
# async::get ?-noWait? ?-keepNewline? ?-variable varName? ?-stderr mode? id
# -noWait: Returns immediately rather than yield if data is not available.
# -keepNewline: Does not strip the trailing newline.
# -variable varName: Name of variable into which to write the result.
# -stderr ignore: Ignores stderr and reads only stdout.
# -stderr merge: Reads from either stderr or stdout, whichever is available.
# -stderr tag: Like merge, except the return value is a tagged list.
# -stderr read: Ignores stdout and reads only stderr.
# -stderr throw: Returns stdout normally and throws stderr with ASYNC STDERR.
# id: The asynchronous channel identifier returned by [async::exec].
proc ::async::get {args} {
    variable Channels

    # Parse arguments.
    set stderrMode throw
    while {[llength $args]} {
        set args [lassign $args arg]
        if {[info exists id]} {
            return -code error "wrong # args: should be \"async::get\
                    ?-noWait? ?-keepNewline? ?-variable varName?\
                    ?-stderr ignore|merge|tag|read|throw? id\""
        } elseif {$arg eq "-noWait"} {
            set noWait {}
        } elseif {$arg eq "-keepNewline"} {
            set keepNewline {}
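        } elseif {$arg eq "-variable"} {
            # The variable name is the argument following the switch.
            if {![llength $args]} {
                return -code error "-variable switch must be followed by\
                        a variable name"
            }
            set args [lassign $args varName]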
        } elseif {$arg eq "-stderr"} {
            if {![llength $args]
             || [lindex $args 0] ni {ignore merge tag read throw}} {
                return -code error "-stderr switch must be followed by\
                        ignore, merge, tag, read, or throw"
            }
            set args [lassign $args stderrMode]
        } elseif {![dict exists $Channels $arg]} {
            return -code error "not an open asynchronous channel: $arg"
        } else {
            set id $arg
        }
    }

    # Confirm the requested directions are open.
    dict with Channels $id open {
        if {![info exists stdout] && ![info exists stderr]} {
            return -code error "both stdout and stderr closed for\
                    asynchronous channel: $id"
        } elseif {$stderrMode eq "ignore" && ![info exists stdout]} {
            return -code error "stdout closed for asynchronous channel: $id"
        } elseif {$stderrMode eq "read" && ![info exists stderr]} {
            return -code error "stderr closed for asynchronous channel: $id"
        }
    }

    # Loop until a complete line is available.  Return if -noWait was given and
    # both stdout and stderr are empty, incomplete, or unwanted.  At end of
    # file, the remainder of the buffer is considered to be complete even if it
    # does not end in newline.
    set rest {}
    while {1} {
        # First try reading from stderr, then stdout.
        foreach {dir skipMode} {stderr ignore stdout read} {
            # Check if this direction is open, not being ignored, and has a
            # complete line or has an incomplete line at end of file.
            set line [dict get $Channels $id buf $dir]
            if {$stderrMode ne $skipMode
             && [dict exists $Channels $id open $dir]
             && ([regexp {([^\n]*\n)(.*)} $line _ line rest]
              || ([dict get $Channels $id eof flag] && $line ne {}))} {
                # Remove the data from the buffer.
                dict set Channels $id buf $dir $rest

                # Unless -keepNewline, strip trailing newline.
                if {![info exists keepNewline]} {
                    regsub {\n$} $line {} line
                }

                # If -stderr throw (the default) and a complete line was read
                # from stderr, throw it rather than return it.
                if {$dir eq "stderr" && $stderrMode eq "throw"} {
                    throw {ASYNC STDERR} $line
                }

                # Build the result.  With -stderr tag, the result is a list
                # consisting of "stdout" or "stderr" followed by the received
                # data.  Otherwise, the result is the data.  -stderr merge is
                # the only case where the caller won't be able to tell where the
                # data came from.
                if {$stderrMode eq "tag"} {
                    set result [list $dir $line]
                } else {
                    set result $line
                }

                # Give the result to the caller.
                if {[info exists varName]} {
                    uplevel 1 [list set $varName $result]
                    return [string length $line]
                } else {
                    return $result
                }
            }
        }

        # At this point, stderr and stdout are both unwanted or unavailable.
        if {[dict get $Channels $id eof flag]} {
            # Throw an exception if at end of file.
            throw {ASYNC EOF} [dict get $Channels $id eof message]
        } elseif {![info exists noWait]} {
            # Fill the stdout/stderr buffers and retry if not -noWait.
            fill $id
        } elseif {[info exists varName]} {
            # If -noWait with a variable, set it empty and return -1.
            uplevel 1 [list set $varName {}]
            return -1
        } else {
            # If -noWait without a variable, return empty.
            return {}
        }
    }
}

# async::read --
# Gets a block of data from the stdout and/or stderr of an asynchronous child
# process, according to the value of the -stderr switch, which defaults to
# -stderr throw.  Yields until the requested data is available, unless the
# -noWait switch is used.  If called at end of file after all data has already
# been read and returned, ASYNC EOF is thrown.
#
# async::read ?-noWait? ?-dropNewline? ?-limit size? ?-stderr mode? id
# -noWait: Returns immediately rather than yield if data is not available.
# -dropNewline: Strips the trailing newline.
# -limit size: Maximum number of characters to read and return.
# -stderr ignore: Ignores stderr and reads only stdout.
# -stderr merge: Reads from either stderr or stdout, whichever is available.
# -stderr tag: Like merge, except the return value is a tagged list.
# -stderr read: Ignores stdout and reads only stderr.
# -stderr throw: Returns stdout normally and throws stderr with ASYNC STDERR.
# id: The asynchronous channel identifier returned by [async::exec].
proc ::async::read {args} {
    variable Channels

    # Parse arguments.
    set stderrMode throw
    while {[llength $args]} {
        set args [lassign $args arg]
        if {[info exists id]} {
            return -code error "wrong # args: should be \"async::read\
                    ?-noWait? ?-dropNewline? ?-limit size?\
                    ?-stderr ignore|merge|tag|read|throw? id\""
        } elseif {$arg eq "-noWait"} {
            set noWait {}
        } elseif {$arg eq "-dropNewline"} {
            set dropNewline {}
        } elseif {$arg eq "-limit"} {
            if {![llength $args]
             || ![string is entier [lindex $args 0]] || [lindex $args 0] <= 0} {
                return -code error "-limit switch must be followed by\
                        a positive integer"
            }
            set args [lassign $args limit]
        } elseif {$arg eq "-stderr"} {
            if {![llength $args]
             || [lindex $args 0] ni {ignore merge tag read throw}} {
                return -code error "-stderr switch must be followed by\
                        ignore, merge, tag, read, or throw"
            }
            set args [lassign $args stderrMode]
        } elseif {![dict exists $Channels $arg]} {
            return -code error "not an open asynchronous channel: $arg"
        } else {
            set id $arg
        }
    }

    # Confirm the requested directions are open.
    dict with Channels $id open {
        if {![info exists stdout] && ![info exists stderr]} {
            return -code error "both stdout and stderr closed for\
                    asynchronous channel: $id"
        } elseif {$stderrMode eq "ignore" && ![info exists stdout]} {
            return -code error "stdout closed for asynchronous channel: $id"
        } elseif {$stderrMode eq "read" && ![info exists stderr]} {
            return -code error "stderr closed for asynchronous channel: $id"
        }
    }

    # Loop until a complete block is available.  Return if -noWait was given
    # and both stdout and stderr are empty, incomplete, or unwanted.  At end
    # of file, the remainder of the buffer is considered to be complete even
    # if its size is less than -limit value.
    while {1} {
        # First try reading from stderr, then stdout.
        foreach {dir skipMode} {stderr ignore stdout read} {
            # Check if this direction is open, not being ignored, and has a
            # complete block or has an incomplete block at end of file.
            set block [dict get $Channels $id buf $dir]
            if {$stderrMode ne $skipMode
             && [dict exists $Channels $id open $dir]
             && (![info exists limit] || [dict get $Channels $id eof flag]
               ? $block ne {} : [string length $block] >= $limit)} {
                # Remove the data from the buffer.
                if {[info exists limit]} {
                    dict set Channels $id buf $dir\
                            [string range $block $limit end]
                    set block [string replace $block $limit end]
                } else {
                    dict set Channels $id buf $dir {}
                }

                # If -dropNewline, strip trailing newline.
                if {[info exists dropNewline]} {
                    regsub {\n$} $block {} block
                }

                # If -stderr throw (the default) and a complete block was
                # read from stderr, throw it rather than return it.
                if {$dir eq "stderr" && $stderrMode eq "throw"} {
                    throw {ASYNC STDERR} $block
                }

                # Return the result.  With -stderr tag, the result is a list
                # consisting of "stdout" or "stderr" followed by the received
                # data.  Otherwise, the result is the data.  -stderr merge
                # is the only case where the caller won't be able to tell
                # where the data came from.
                if {$stderrMode eq "tag"} {
                    return [list $dir $block]
                } else {
                    return $block
                }
            }
        }

        # At this point, stderr and stdout are both unwanted or unavailable.
        if {[dict get $Channels $id eof flag]} {
            # Throw an exception if at end of file.
            throw {ASYNC EOF} [dict get $Channels $id eof message]
        } elseif {![info exists noWait]} {
            # Fill the stdout/stderr buffers and retry if not -noWait.
            fill $id
        } else {
            # If -noWait, return empty.
            return {}
        }
    }
}

# async::fill --
# Reads all available data from stdout and stderr into internal buffers.  The
# current coroutine yields until at least one character is successfully read or
# end of file is reached.  This procedure need not be called by user code.
proc ::async::fill {id} {
    variable Channels

    # Don't attempt to read when already at end of file or when both stdout
    # and stderr have been closed.
    if {![dict get $Channels $id eof flag]
     && ([dict exists $Channels $id open stdout]
      || [dict exists $Channels $id open stderr])} {
        # Loop until data could be read.
        while {![flow $id stdout] && ![flow $id stderr]} {
            # Schedule this coroutine to be resumed when there is data.
            foreach dir {stdout stderr} {
                if {[dict exists $Channels $id open $dir]} {
                    chan event [dict get $Channels $id chan $dir] readable\
                            [info coroutine]
                }
            }

            # Wait until at least one channel is readable.
            yield

            # Remove the scheduled channel event handlers.
            foreach dir {stdout stderr} {
                if {[dict exists $Channels $id open $dir]} {
                    chan event [dict get $Channels $id chan $dir] readable {}
                }
            }
        }
    }
}

# async::flow --
# Reads all immediately available data from stdout or stderr into internal
# buffers.  This procedure need not be called by user code.  Returns the number
# of characters that were read, 0 if no data was available, or -1 if at end of
# file or the requested direction was locally closed.
proc ::async::flow {id direction} {
    variable Channels

    # Get access to the channel status variables.
    dict with Channels $id {
        if {[dict get $eof flag] || ![dict exists $open $direction]} {
            # Already at end of file, or the requested direction was closed.
            return -1
        } elseif {[catch {chan read [dict get $chan $direction]} data]
               || ($data eq {} && [chan eof [dict get $chan $direction]])} {
            # At end of file, or an error occurred.
            dict set eof flag 1
            if {$data eq {}} {
                dict set eof message "end of file"
            } else {
                dict set eof message $data
            }
            return -1
        } else {
            # Not at end of file.  Possibly got some data.  Append to buffer.
            dict append buf $direction $data
            return [string length $data]
        }
    }
}

# vim: set sts=4 sw=4 tw=80 et ft=tcl: