Version 26 of pipeline

Updated 2019-02-18 08:58:16 by AMG

A pipeline is a series of processes where each process consumes the output of the prior process and produces output for the next.

Description

In shells, the connected processes are individual programs connected by their standard streams. Shell pipelines are a mainstay of traditional Unix programming.

here is a shell pipeline example:

grep MyName logfile | sort -u -f | wc -l

In it, grep reads logfile, producing on stdout only those lines containing the string MyName. This output is passed, to the stdin of a sort, which sorts the resulting lines, deduplicates them without regard to case, and writes them to its standard output. wc reads this output and produces on its own output a count of the number of lines.

In Tcl, shell pipelines can be constructed using exec or BLT's bgexec.

Pipelines can be constructed mechanisms other than standard channels to pass data between the components. callbacks, coroutines, traces, and bind are among the tools used to create pipelines.

A Tcl Connection To a Single Process

In Tcl, open can be used to execute another program and connect both the standard input and output of that program. This is problematic in some other scripting languages, e.g., Python, because they do not internally manage buffers for the streams, delegating that rather complex task to the script programmer. Tcl, however, makes this easy:

set channel [open |external_command r+]

Tcl opens an external process, and connects to the stdin and stdout of $external_command. Reading and writing to stdin and stdout are with the usual Tcl I/O commands: gets, puts, and read. (It may be necessary to flush resp. fflush to prevent deadlocks caused by buffering).

This makes for exceptionally handy glue for many situations.


This transcript of an interaction session illustrates a simple pipeline with Unix's bc executable:

% set channel [open |bc r+]
file3
% puts $channel "1234567890 * 987654321"
% flush $channel
% puts [gets $channel]
1219326311126352690

with winnt cmd:

% set channel [open |cmd r+]
file3
% gets $channel
% gets $channel
% puts $channel "hostname"
% flush $channel
% gets $channel
% gets $channel
% puts [gets $channel]
% close $channel

Can someone look at this example and explain where the writer went wrong, if anywhere?

# Goal - to eventually return the filetype of a list of files;

set aid [open [list |file --brief --files-from -] r+]
fconfigure $aid -buffering line
fileevent $aid readable {puts [gets $aid]}
puts $aid {/win/d/movies/mp3/en/Giganten CD1/track01.cdda.ogg}
puts $aid /etc/motd
vwait forever

CL's guess: put

flush $aid

after the puts-s.

A Pipeline Problem

AMG: Tcl is a great language for producing filters, but a limitation in its I/O library severely hampers its ability to use external filters.

set chan [open |[list cat -nE] a+]
puts $chan hello
flush $chan
puts [gets $chan]
# Output: "1 hello$"

This works, but I shouldn't have to do that flush. Instead I should be able to close the output portion of $chan, forcing a flush and causing cat to detect on its input, and then continue to read from the input portion of $chan until cat closes its output.

This problem makes it impossible to use external filters such as tac [L1 ] which don't output anything until receiving EOF. Also one usually must know in advance the number of lines or characters the filter will produce, because relatively few filters write EOF before reading EOF.

NEM: See TIP 332 [L2 ] that added exactly this ability:

close $chan write

AMG: Some filters operate on more than just stdin and stdout, for example multitee [L3 ]. Is there any ability to write to or read from child process file descriptors other than 0, 1, and 2 (stdin, stdout, and stderr, respectively)? Demultiplexing stdout and stderr can be done with [chan pipe], but what about nonstandard file descriptors?

pipeline package

AMG: I have written a pipeline package to facilitate pipeline-oriented programming. argparse is required.

Documentation forthcoming. I'm considering putting this in Tcllib after I put it through its paces.

Examples

This sample pipeline strips leading spaces, converts everything to uppercase, then prints to stdout.

package require pipeline
set pipeline [pipeline::new {regsub {^ *}} {command string toupper} echo]
$pipeline flow "  this\n    text\n"
$pipeline flow "       has\n   indents\n"
$pipeline destroy

The output is:

THIS
TEXT
HAS
INDENTS

Code

pipeline.tcl

package require Tcl 8.6
package require argparse
package provide pipeline 0.1

# Create a namespace for pipeline commands.
namespace eval ::pipeline {}

# ::pipeline::Coroutine --
# Creates a uniquely named pipeline coroutine command and returns its name.
proc ::pipeline::Coroutine {args} {
    for {set i [llength [info commands ::pipeline::Coro*]]}\
            {[info commands [set coro ::pipeline::Coro$i]] ne {}} {incr i} {}
    coroutine $coro {*}$args
    return $coro
}

# ::pipeline::Run --
# Core pipeline processing logic.
proc ::pipeline::Run {coros args} {
    # Loop until the input queue is completely drained.  Filters may append
    # chunks to the the input queue, so this loop may repeat.
    set outChunks {}
    while {$args ne {}} {
        # Transfer the pipeline inputs to the first filter inputs.
        set pipeChunks $args
        set args {}

        # Progress through the pipeline, one filter at a time.
        foreach coro $coros {
            # Loop through all chunks currently present in the pipeline.
            foreach inChunk $pipeChunks[set pipeChunks {}] {
                # Invoke the filter, and process its output chunks.
                foreach outChunk [$coro {*}$inChunk] {
                    # Fill in omitted output elements with defaults.
                    if {[llength $outChunk] < 3} {
                        lappend outChunk {*}[lrange $inChunk\
                                [llength $outChunk] 2]
                    }

                    # Extract the output chunk fields.
                    lassign $outChunk data meta flush restart

                    # Put the output chunk into the pipeline, to be the input to
                    # the next filter, or append it to the input queue to later
                    # be an input to the first filter.
                    if {$restart ne {} && $restart} {
                        lappend args $outChunk
                    } else {
                        lappend pipeChunks $outChunk
                    }
                }

                # If this input chunk commands flush, ensure the last output
                # chunk arising from this input chunk also commands flush,
                # creating an empty chunk if needed.
                if {[lindex $inChunk 2]} {
                    if {$pipeChunks ne {}} {
                        lset pipeChunks end 2 1
                    } else {
                        set pipeChunks {{{} {} 1}}
                    }
                }
            }
        }

        # Collect the outputs of the last filter in the pipeline.
        lappend outChunks {*}$pipeChunks
    }

    # Return all output chunks.
    return $outChunks
}

# ::pipeline::new --
# Creates a pipeline containing the specified filter command prefixes and
# returns the command name used to execute the pipeline.
#
# The returned command accepts the following arguments:
# - destroy   Destroys the pipeline and cleans up all associated resources
# - flow      Feeds input data through the pipeline and returns the output data
# - get       Gets buffered output data accumulated by prior calls to [put]
# - peek      Gets buffered output data without clearing the output buffer
# - put       Feeds input data through the pipeline and buffers the output data
# - run       Feeds raw chunks through the pipeline and returns raw chunk output
#
# The [flow] and [put] methods accept the following additional arguments:
# -meta META  Arbitary metadata to associate with the input data
# -flush      Commands pipeline flush
# data        Input data, may be empty string
#
# Each argument to the [run] method is a raw chunk, which is a three-element
# list that will be used as the arguments to the first filter coroutine.
#
# A pipeline is a linear sequence of zero or more filters.  Each filter operates
# on the output of its predecessor, with the first filter operating on the input
# to the pipeline.  The output of the last filter is the output of the pipeline.
#
# Each filter is a command prefix to invoke on each pipeline input chunk.  If a
# filter command name is in the ::pipeline or :: (global) namespace, it need not
# be fully qualified.  The [apply] command enables defining anonymous filters.
#
# Pipelines are implemented in terms of coroutines.  The pipeline as a whole is
# a coroutine, and each filter in the pipeline is a coroutine.  Coroutines are
# automatically created by [pipeline::new], and filter commands need not create
# coroutines for themselves.  The first time the filter command is invoked, it
# must yield after completing initialization, and the yielded value is ignored.
# When the filter command returns, its coroutine is automatically destroyed.
#
# Filter coroutines are given the following three arguments:
# - Current input data chunk
# - Arbitrary metadata associated with the input chunk
# - 1 if the pipeline is being flushed, 0 if not
#
# If a filter command is passed zero arguments, the filter must clean up any
# resources it allocated, then return.
#
# When a filter coroutine is invoked, it must yield a list containing zero or
# more output chunks.  Each output chunk is a list containing the following:
# - Current output data chunk
# - Arbitrary metadata associated with the output chunk
# - 1 if flush is being commanded, 0 if not
# - 1 if the chunk is to be fed back as new pipeline input, 0 if not
#
# Output chunks may omit elements and in fact may be empty lists.  The missing
# elements are replaced with the corresponding elements from the input chunk.
# If the restart element is omitted, it defaults to 0.  Consequently, a filter
# that passes its input through unmodified may simply yield {{}}, i.e. a list
# containing only an empty list.  If a filter yields {}, i.e. empty list, then
# all pipeline data is discarded and further filters are not invoked.
#
# Pipeline execution continues until all filter coroutines have been invoked on
# all input chunks.  If any filter output chunks contain the restart flag, the
# entire pipeline sequence will be executed again, repeating until none of the
# filters output any chunks containing the restart flag.
proc ::pipeline::new {args} {
    # Create a coroutine command for each filter in the pipeline, as well as a
    # coroutine for the pipeline as a whole.  Return its command name.
    Coroutine apply {{coros} {
        # Loop until the destroy method is invoked.
        set ret {}
        set buffer {}
        while {1} {
            # Yield the last result, then get the next method and its arguments.
            set args [lassign [yieldto return -level 0 $ret[set ret {}]] method]

            # Perform method name resolution.
            set method [tcl::prefix match -message method\
                    {destroy flow get peek put run} $method]

            # Several methods do not allow arguments.
            if {$method in {destroy get peek} && $args ne {}} {
                return -code error "wrong # args: should be\
                        \"[info coroutine] $method\""
            }

            # Invoke the method.
            switch $method {
            destroy {
                foreach coro $coros {
                    $coro
                }
                break
            } flow {
                argparse {{-meta= -default {}} {-flush -boolean} data}
                foreach chunk [Run $coros [list $data $meta $flush]] {
                    append buffer [lindex $chunk 0]
                }
                set ret $buffer
                set buffer {}
            } get {
                set ret $buffer
                set buffer {}
            } peek {
                set ret $buffer
            } put {
                argparse {{-meta= -default {}} {-flush -boolean} data}
                foreach chunk [Run $coros [list $data $meta $flush]] {
                    append buffer [lindex $chunk 0]
                }
            } run {
                set ret [Run $coros {*}$args]
            }}
        }
    } ::pipeline} [lmap filter $args {Coroutine {*}$filter}]
}

# ::pipeline::buffer --
# Pipeline filter adapter procedure.  The pipeline data is divided into chunks
# according to a delimiter defined via regular expression.  The data matched by
# the regular expression is included at the end of each chunk, except for the
# last chunk which may be incomplete.  The pipeline filter command prefix being
# adapted is invoked for each chunk, which is a list containing the following:
#
# - Current input data chunk
# - Arbitrary metadata associated with the input chunk
# - 1 if the pipeline is being flushed, 0 if not
# - Buffered characters preceding this chunk, or empty for the first chunk
# - Accumulated data since the prior delimiter, excluding the current delimiter
# - Delimiter appearing at the end of the buffer, or empty string if incomplete
#
# When flush is commanded, the buffer is emptied after being passed to the
# command prefix, even if the buffer is incomplete.
#
# The return values of the pipeline command are merged if possible but remain
# distinct chunks when they command flush or have varying metadata.
#
# This command accepts the following arguments:
#
# -delim PAT   Buffer delimiter regular expression, default \n
# -separate    Disables output merging
# command ...  Filter command and arguments being adapted
#
# It is an error for the delimiter regular expression to match empty string.
proc ::pipeline::buffer {args} {
    argparse -boolean {
        {-delim= -default {\n}}
        -separate
        -multi
        args*!
    }

    # Avoid infinite loops by rejecting patterns matching empty string.
    if {[regexp $delim {}]} {
        return -code error "delimiter pattern matches empty string: $delim"
    }

    # Create a coroutine for the adapted command.
    set coros [list [Coroutine {*}$args]]

    # Loop over each input chunk.
    set ret {}
    set buffer {}
    while {[set input [yieldto return -level 0 $ret]] ne {}} {
        lassign $input data meta flush

        # Concatenate the existing buffer with the new input data, then divide
        # into complete chunks, each chunk ending with the delimiter pattern.
        set inputs {}
        while {[regexp -indices -- $delim [set concat $buffer$data] match]} {
            set len [expr {[lindex $match 1] - [string length $buffer]}]
            lappend inputs [list [string range $data 0 $len] $meta 0 $buffer\
                    [string range $concat 0 [expr {[lindex $match 0] - 1}]]\
                    [string range $concat {*}$match]]
            set data [string replace $data 0 $len]
            set buffer {}
        }

        # Buffer leftover data, and put it into an incomplete chunk.  Create an
        # empty chunk if there are no chunks but meta or flush are used.
        if {$data ne {} || ($inputs eq {} && ($meta ne {} || $flush))} {
            lappend inputs [list $data $meta 0 $buffer [append buffer $data] {}]
        }

        # If flush is active, enable it in the last chunk, and empty the buffer.
        if {$flush} {
            lset inputs end 2 1
            set buffer {}
        }

        # Run the filter on each chunk, and collect the new outputs.
        set ret [Run $coros {*}$inputs]

        # Unless separation is enabled, merge consecutive chunks that have the
        # same metadata.  Two chunks cannot be merged if the first one commands
        # flush but the second does not.
        if {!$separate} {
            set i 0
            set j 1
            while {$j < [llength $ret]} {
                if {[lindex $ret $i 1] eq [lindex $ret $j 1]
                 && (![lindex $ret $i 2] || [lindex $ret $j 2])} {
                    lset ret $i 0 [lindex $ret $i 0][lindex $ret $j 0]
                    lset ret $i 2 [lindex $ret $j 2]
                    set ret [lreplace $ret $j $j]
                } else {
                    incr i
                    incr j
                }
            }
        }
    }

    # Let the wrapped filter clean up and terminate.
    [lindex $coros 0]
}

# ::pipeline::command --
# Filter procedure for use with [pipeline::new].  Sends pipeline data to the
# designated command prefix, which is resolved in the :: (global) namespace.  If
# this filter is wrapped with [pipeline::buffer], and flush is not commanded,
# only complete buffers are sent.  Otherwise, chunks are sent as soon as they
# are received.
#
# If the -raw switch is used, the command receives and returns raw chunks rather
# than output data.  The chunk is sent as three or six arguments, depending on
# whether or not this filter is wrapped with [pipeline::buffer].  The arguments
# and return values are the same as are passed to and received from normal
# filter coroutine commands.
#
# If the -observe switch is used, the command's return value is ignored, and the
# pipeline filter's output is always equal to its input.  Otherwise, the return
# value is used as the filter output.
proc ::pipeline::command {args} {
    argparse -boolean {-raw -observe command*!}
    set ret {}
    while {[set input [yieldto return -level 0 $ret]] ne {}} {
        lassign $input data meta flush prior buffer complete

        # Conditionally invoke the command and collect its output.
        set ret [if {$raw} {
            # If raw mode is enabled, pass the raw chunk to the command and
            # receive its raw chunk list output.
            namespace eval :: [list {*}$command {*}$input]
        } elseif {$buffer eq {}} {
            # If [pipeline::buffer] is not being used, or if there is no
            # data, call the command on the unbuffered data chunk.
            list [list [namespace eval :: [list {*}$command $data]]]
        } elseif {$flush || $complete ne {}} {
            # If [pipeline::buffer] is being used and flush is commanded or
            # this is a complete buffer, call the command on the buffer with
            # the delimiter appended.
            list [list [namespace eval :: [list {*}$command $buffer$complete]]]
        } else {
            # If [pipeline::buffer] is being used but flush is not commanded
            # and this is an incomplete buffer, output nothing and stop.
        }]

        # If observe mode is enabled, ignore the output.
        if {$observe} {
            set ret {{}}
        }
    }
}

# ::pipeline::echo --
# Filter procedure for use with [pipeline::new].  Echoes input data to a given
# channel (defaulting to stdout) then passes it through unmodified.  If this
# filter is wrapped using [pipeline::buffer], and flush is not commanded, only
# complete buffers are echoed, with the delimiter appended.  Otherwise, chunks
# are echoed as soon as they are received.  The output channel is flushed after
# every write.
proc ::pipeline::echo {{chan stdout}} {
    command -observe apply {{chan data} {
        chan puts -nonewline $chan $data
        chan flush $chan
    }} $chan
}

# ::pipeline::regsub --
# Filter procedure for use with [pipeline::new].  Applies [regsub] filtering to
# each complete buffer flowing through the pipeline.
#
# The initial arguments alternate between regular expressions and replacements.
# If an odd number of arguments are given, the final replacement is assumed to
# be empty string.  Additionally, any standard [regsub] switches may be used.
#
# Regular expressions and replacements cannot begin with "-".  One possible
# workaround is to instead begin with "\-".
#
# Regular expression matching and substitution are not applied to the delimiter,
# which is newline by default.  The delimiter can be changed using the -delim
# switch.  See the documentation for [pipeline::buffer] for more information.
#
# If the -erase switch is used, at least one regular expression substitution
# succeeded, and the result is an empty buffer, it is removed in full, and no
# delimiter is appended.  This mode allows [pipeline::regsub] to be used to
# delete entire lines of input, rather than make them be blank lines.
proc ::pipeline::regsub {args} {
    argparse -normalize -pass regsubArgs {
        {-start=    -pass regsubArgs}
        {-erase     -boolean}
        {-delim=    -pass bufferArgs}
        expReps*!
    }
    buffer {*}$bufferArgs command -raw apply {{regsubArgs erase expReps
            data meta flush prior buffer complete} {
        if {$complete ne {} || $flush} {
            # Apply regular expression substitutions.
            foreach {exp rep} $expReps {
                ::regsub {*}$regsubArgs $exp $buffer $rep buffer
            }

            # Append the delimiter unless the buffer is being erased.
            if {!$erase || $buffer ne {}} {
                append buffer $complete
            }

            # Yield any output that may have been obtained.
            if {$buffer ne {}} {
                list [list $buffer]
            }
        }
    }} $regsubArgs $erase $expReps
}

# ::pipeline::trimTrailingSpace --
# Filter procedure for use with [pipeline::new].  Trims trailing whitespace from
# each bufer.  The default delimiter is newline but can be changed with -delim.
# See the documentation for [pipeline::buffer] for more information.
proc ::pipeline::trimTrailingSpace {args} {
    buffer {*}$args command -raw apply {{data meta flush
            prior buffer complete} {
        # Find the last non-whitespace character in the current chunk.
        set output {}
        if {[regexp -indices {.*[^ \f\n\r\t\v]} $buffer end]} {
            # Find the last non-whitespace character preceding the current
            # chunk.  This was the last character that was output before.
            if {[regexp -indices {.*[^ \f\n\r\t\v]} $prior start]} {
                set start [expr {[lindex $start 1] + 1}]
            } else {
                set start 0
            }

            # Output all characters since the previous output for this buffer
            # through the final non-whitespace character in the current chunk.
            append output [string range $buffer $start [lindex $end 1]]
        }

        # If this is a complete buffer, append the delimiter to the output.
        append output $complete

        # Yield any output that may have been obtained.
        if {$output ne {}} {
            list [list $output]
        }
    }}
}

# ::pipeline::squeezeBlank --
# Filter procedure for use with [pipeline::new].  Removes blank buffers at the
# beginning of output and collapses consecutive blank buffers into one.  Blank
# buffers at the end of output cannot be removed since this would delay output
# of partial buffers.  The default delimiter is newline but can be changed with
# -delim.  See the documentation for [pipeline::buffer] for more information.
proc ::pipeline::squeezeBlank {args} {
    buffer {*}$args apply {{} {
        set ret {}
        set blank 1
        while {[set input [yieldto return -level 0 $ret]] ne {}} {
            lassign $input data meta flush pior buffer complete
            if {$buffer eq {} && $blank} {
                set ret {}
            } else {
                set ret {{}}
            }
            set blank [expr {$buffer eq {}}]
        }
    }}
}

# ::pipeline::removeFixed --
# Filter procedure for use with [pipeline::new].  Removes buffers that exactly
# match a literal pattern string, which does not include the delimiter.  The
# -prefix switch also removes buffers that begin with the pattern string.
#
# Unlike [pipeline::regsub], this procedure does not delay output until the
# delimiter is encountered.  Buffering only happens in event of a prefix match.
#
# If a flush occurs in the middle of a partial buffer, it will be output as-is,
# even though it could potentially be followed by characters that would make it
# match the removal pattern.
#
# The default delimiter is newline but can be changed with -delim.  See the
# documentation for [pipeline::buffer] for more information.
proc ::pipeline::removeFixed {args} {
    argparse {
        {-prefix    -boolean}
        {-delim=    -pass bufferArgs}
        pattern
    }
    buffer {*}$bufferArgs command -raw apply {{prefix pattern
            data meta flush prior buffer complete} {
        # Determine the match prefix length.
        if {$prefix} {
            set len [string length $pattern]
        } elseif {$complete eq {}} {
            set len [string length $buffer]
        } else {
            set len -1
        }

        if {[string equal -length $len $buffer $pattern]} {
            # Discard or delay the input if the buffer is complete and
            # exactly matches the pattern, or is incomplete and is a prefix
            # of the pattern, or if prefix matching is enabled and the
            # pattern is a prefix of the buffer.
        } elseif {$prior ne {} && [string equal\
                -length [string length $prior] $prior $pattern]} {
            # If the partial buffer was previously discarded, now output it
            # in full because it ultimately ended up not matching.
            list [list $buffer$complete]
        } else {
            # Otherwise, output the chunk immediately.
            list {}
        }
    }} $prefix $pattern
}

# ::pipeline::tee --
# Filter procedure for use with [pipeline::new].  Tees one pipeline off another,
# connecting the output of the current pipeline at the current point to the
# input of the other pipeline, without affecting the data flowing through the
# current pipeline.  If nothing will call [pipeline::get] on the other pipeline,
# it is best that it contain the [pipeline::discard] filter to avoid unbounded
# growth of its output buffer.
proc ::pipeline::tee {pipeline} {
    command -raw -observe apply {{pipeline data meta flush} {
        $pipeline put -meta $meta {*}[if {$flush} {list -flush}] $data
    }} $pipeline
}

# ::pipeline::splice --
# Filter procedure for use with [pipeline::new].  Splices one pipeline into
# another, connecting the output of the current pipeline at the current point to
# the input of the other pipeline, and vice versa.
proc ::pipeline::splice {pipeline} {
    command -raw apply {{pipeline args} {$pipeline run $args}} $pipeline
}

# ::pipeline::discard --
# Filter procedure for use with [pipeline::new].  Discards all input.  The
# pipeline is ended immediately unless flush is commanded, in which case any
# subsequent filters (there probably won't be any) are executed with no input.
# This filter is useful in combination with [pipeline::tee] to terminate a teed
# pipeline on which [pipeline::get] will never be called.
proc ::pipeline::discard {} {
    command -raw apply {{args} {}}
}

pkgIndex.tcl

package ifneeded pipeline 0.1 [list source [file join $dir pipeline.tcl]]

See Also

filter
a pipeline whose components massage the data
glue
often implies pipelines
How Tcl is special
Concepts of Architectural Design for Tcl Applications
Scripted Wrappers for Legacy Applications, Cameron Laird and Kathryn Soraiz, 2001-03-16
client/server with fileevent
Pipe servers in C from Tcl
VFS, exec and command pipelines
Inventory of IPC methods
While (classic) MacOS supports no Tcl pipelines, there are generalizations that apply there and elsewhere.
named pipe
Pipeline programming
SS implements a value pipeline, while Brian Theado implements a "command pipeline"
Commands pipe
more implementations of the "value pipeline" from pipeline programming