# pipeline.tcl --
#
# Streaming data pipeline infrastructure
#
# Copyright (C) 2019 Andy Goth <[email protected]>
# See the file "license.terms" for information on usage and redistribution
# of this file, and for a DISCLAIMER OF ALL WARRANTIES.
package require Tcl 8.6
package require argparse
package provide pipeline 0.3
# Create a namespace for pipeline commands.
namespace eval ::pipeline {}
# ::pipeline::new --
# Creates a pipeline containing the specified filter command prefixes and
# returns the command name used to execute the pipeline.
#
# The returned command accepts the following method name arguments:
#
# destroy Destroy the pipeline and cleans up all associated resources
# flow Feed input data through the pipeline and returns the output data
# get Get buffered output data accumulated by prior calls to [put]
# peek Get buffered output data without clearing the output buffer
# put Feed input data through the pipeline and buffers the output data
# run Feed raw chunks through the pipeline and returns raw chunk output
#
# The [flow] and [put] methods accept the following additional arguments:
#
# -meta META Arbitary metadata to associate with the input data
# -flush Commands pipeline flush
# data Input data, may be empty string
#
# Each argument to the [run] method is a raw chunk, which is a three-element
# list that will be used as the arguments to the first filter coroutine.
#
# A pipeline is a linear sequence of zero or more filters. Each filter operates
# on the output of its predecessor, with the first filter operating on the input
# to the pipeline. The output of the last filter is the output of the pipeline.
#
# Each filter is a command prefix to invoke on each pipeline input chunk. If a
# filter command name is in the ::pipeline or :: (global) namespace, it need not
# be fully qualified. The [pipeline::loop] command is useful to define filters.
#
# Pipelines are implemented in terms of coroutines. The pipeline as a whole is
# a coroutine, and each filter in the pipeline is a coroutine. Coroutines are
# automatically created by [pipeline::new], and filter commands need not create
# coroutines for themselves. The first time the filter command is invoked, it
# must yield after completing initialization, and the yielded value is ignored.
# When the filter command returns, its coroutine is automatically destroyed.
#
# Aside from the first time the filter command is invoked (described above),
# filter coroutines are given the following three arguments:
#
# - Current input data chunk
# - Arbitrary metadata associated with the input chunk
# - 1 if the pipeline is being flushed, 0 if not
#
# If a filter command is passed zero arguments, the filter must clean up any
# resources it allocated, then return.
#
# When a filter coroutine is invoked, it must yield a list containing zero or
# more output chunks. Each output chunk is a list containing the following:
#
# - Current output data chunk
# - Arbitrary metadata associated with the output chunk
# - 1 if flush is being commanded, 0 if not
# - 1 if the chunk is to be fed back as new pipeline input, 0 if not
#
# Output chunks may omit any number of elements. The omitted elements will be
# replaced with the corresponding elements from the input chunk. If restart is
# omitted, it defaults to 0. A filter that passes its input through unmodified
# may simply yield {{}}, i.e. a list containing only an empty list. If a filter
# yields {}, i.e. empty list, then all pipeline data is discarded and further
# filters are not invoked.
#
# Pipeline execution continues until all filter coroutines have been invoked on
# all input chunks. If any filter output chunks contain the restart flag, the
# entire pipeline sequence will be executed again, repeating until none of the
# filters output any chunks containing the restart flag.
proc ::pipeline::new {args} {
# Coroutine creation helper routine.
set coroNew {apply {{args} {
set i [llength [info commands ::pipeline::Coro*]]
while {[info commands [set coro ::pipeline::Coro$i]] ne {}} {
incr i
}
coroutine $coro {*}$args
return $coro
} ::pipeline}}
# Create a coroutine command for each filter in the pipeline, as well as a
# coroutine for the pipeline as a whole. Return its command name.
{*}$coroNew apply {{coros} {
# Pipeline execution core.
set run {apply {{coros args} {
# Loop until the input queue is completely drained. Filters may
# append chunks to the the input queue, so this loop may repeat.
set inChunks $args
set outChunks {}
while {$inChunks ne {}} {
# Transfer the pipeline inputs to the first filter inputs.
set pipeChunks $inChunks
set inChunks {}
# Progress through the pipeline, one filter at a time.
foreach coro $coros {
# Loop through all chunks currently in the pipeline.
foreach inChunk $pipeChunks[set pipeChunks {}] {
# Invoke the filter, and process its output chunks.
foreach outChunk [$coro {*}$inChunk] {
# Fill in omitted output elements with defaults.
if {[llength $outChunk] < 3} {
lappend outChunk {*}[lrange $inChunk\
[llength $outChunk] 2]
}
# Let the output chunk be the input to the next
# filter or to the first filter on the next pass.
if {[llength $outChunk] >= 4
&& [lindex $outChunk 3]} {
lappend inChunks $outChunk
} else {
lappend pipeChunks $outChunk
}
}
# If this input chunk commands flush, ensure the last
# output chunk arising from this input chunk also
# commands flush, creating an empty chunk if needed.
if {[lindex $inChunk 2]} {
if {$pipeChunks ne {}} {
lset pipeChunks end 2 1
} else {
set pipeChunks {{{} {} 1}}
}
}
}
}
# Collect the outputs of the last filter in the pipeline.
lappend outChunks {*}$pipeChunks
}
return $outChunks
}}}
# Loop until the destroy method is invoked.
set out {}
set buffer {}
while {1} {
# Yield the last result, then get the next method and its arguments.
set args [lassign [yieldto return -level 0 $out[set out {}]] method]
# Perform method name resolution.
set method [tcl::prefix match -message method\
{destroy flow get peek put run} $method]
# Several methods do not allow arguments.
if {$method in {destroy get peek} && $args ne {}} {
return -code error "wrong # args: should be\
\"[info coroutine] $method\""
}
# Invoke the method.
switch $method {
destroy {
foreach coro $coros {
$coro
}
break
} flow {
argparse -boolean {{-meta= -default {}} -flush data}
foreach chunk [{*}$run $coros [list $data $meta $flush]] {
append buffer [lindex $chunk 0]
}
set out $buffer
set buffer {}
} get {
set out $buffer
set buffer {}
} peek {
set out $buffer
} put {
argparse -boolean {{-meta= -default {}} -flush data}
foreach chunk [{*}$run $coros [list $data $meta $flush]] {
append buffer [lindex $chunk 0]
}
} run {
set out [{*}$run $coros {*}$args]
}}
}
} ::pipeline} [lmap filter $args {{*}$coroNew {*}$filter}]
}
# ::pipeline::procLoop --
# Creates a named pipeline filter using the [pipeline::loop] command.
proc ::pipeline::procLoop {name args} {
uplevel 1 [list interp alias {} $name {} ::pipeline::loop {*}$args]
}
# ::pipeline::loop --
# Pipeline main loop skeleton, suitable for implementing pipeline filters. The
# following arguments are accepted:
#
# -params PL [argparse] parameter definition list
# -init SCR Initialization script before beginning loop
# -command Positional arguments form a command prefix rather than a script
# -observe Do not modify pipeline data, ignoring return value or out variable
# -result Script result is used directly as the output chunk list
# -buffer Wait until delimiter is encountered before invoking command
# -raw Command operates on raw chunks rather than processed data
# -partial Run script for partial buffers as well as complete buffers
# -separate Disable buffered output merging
# -delim PAT Buffer delimiter regular expression, default \n
# -trim Strip buffer delimiter from the command argument
# SCRIPT Main loop body script or command prefix (multiple arguments)
# ARGS ... Arguments to bind to the parameter list, requires -params
#
# As an alternative to writing a full [proc] or [apply] script, the -params and
# -init switches may be used to set up the context in which the loop runs. If
# -params is used, [pipeline::loop] accepts additional arguments following the
# script argument. These arguments are parsed according to the [argparse]
# parameter list which is given as the argument to -params. The -init switch
# supplies a custom initialization script to evaluate after processing -params
# and before beginning the loop. -params and -init conflict with -command.
#
# The -params and -init switches are processed before all other arguments. As a
# result, their side effects (e.g. setting variables) will still occur even if
# there is an error in processing the remaining arguments.
#
# If the loopArgs variable is created by -params or -init, it is used as a list
# of additional arguments to pass to [pipeline::loop], which will be processed
# after -params and -init and before all other arguments. If a variable named
# loopArgs exists at the moment [pipeline:loop] is called, it will be unset
# before processing arguments. After processing -params and -init, loopArgs
# will be restored to its initial state.
#
# If -buffer is used, the pipeline data is divided into chunks according to the
# delimiter defined by the -delim regular expression. The data matched by the
# regular expression is included at the end of each chunk, except for the last
# chunk which may be incomplete. When flush is commanded, the buffer is emptied
# after being passed to the script or command prefix, even if incomplete.
#
# -buffer causes the script or command to only be executed when the buffer is
# complete or flush is commanded, unless -partial is used, in which case the
# script or command is executed for every chunk. When the script or command is
# not executed, subsequent filters in the pipeline are not executed either.
#
# It is an error for the -delim regular expression to match empty string.
#
# If -command is not used, the script argument is executed for each chunk that
# flows through the pipeline. If -buffer is used (and -partial is not), the
# script is instead only executed for complete buffers and when flush is
# commanded. The script may interact with the following variables:
#
# input Input chunk from the pipeline executive
# out Output chunk list to yield to the pipeline executive
# data Current input chunk data
# meta Arbitrary metadata associated with the input chunk
# flush 1 if pipeline is being flushed, 0 if not
#
# Additional variables are available when -buffer is used:
#
# prior Buffered data preceding this chunk, or empty for the first
# buffer All data since the last delimiter, excluding the current delimiter
# complete Delimiter string if complete, empty string if buffer is incomplete
#
# The script may also freely access any other caller variables. This allows the
# script to maintain state between iterations.
#
# If -result is used, the script result is automatically stored into the out
# variable. The script need only evaluate to the output chunk list. If the
# script uses [return], the return value will be stored into the out variable.
#
# At the start of each pass through the loop, the out variable defaults to {{}}.
# If -result is not used and the script does not modify this variable, the input
# will pass through to the output unmodified. If the script does modify out, it
# is used as a list of output chunks. See [pipeline::new] for details on the
# format and behavior of output chunk lists.
#
# Changing the input variable affects the default values that will be filled
# into omitted fields in the out variable. As a special case, if -buffer is
# used without -partial, the default value for the first element of the out
# variable is not the first element of the input variable, but rather the
# concatenation of the buffer and complete variables.
#
# If -command is used, the script argument is instead a sequence of one or more
# arguments forming a command prefix to which the input data will be appended.
# The choice of command arguments is determined by -raw, -buffer, and -trim.
#
# If -raw is not used, the command return value is the output data. If -buffer
# is not used, the command argument is the input chunk data. If -buffer is
# used, the command argument is all data buffered since the last delimiter. If
# -trim is used, the delimiter is not included in the argument but will be
# appended to the return value.
#
# If -raw is used, the command return value is a list of zero or more output
# chunks. The command argument is a three- or six-element list. The first
# three elements are data, meta, and flush, and (if -buffer is used) the next
# three are prior, buffer, and complete. See above for details.
#
# If -observe is used, the out variable, script result, or command return value
# is ignored, and the pipeline filter's output is equal to its input. This also
# prevents -buffer from pausing the pipeline when the buffer is incomplete.
#
# The output chunks of the script or command will be merged if possible, though
# they will remain distinct chunks when they command flush or have varying
# metadata or restart flags. -separate may be used to disable merging.
proc ::pipeline::loop {args} {
# If [pipeline::loop] is the top level of the coroutine, recursively invoke
# itself one time so that the [upvar] and [uplevel] commands store the
# variables for the caller-supplied scripts in this stack frame, avoiding
# conflict with [pipeline:loop]'s own variables.
if {[info level] == 1} {
unset args
return [{*}[info level 0]]
}
# Parameter definition list.
set definition {
-params=
-init=
{-command -forbid {params init}}
-observe
{-result -forbid {command observe}}
{-buffer -key bufferMode}
{-raw -require command}
{-partial -require buffer}
-separate
{-delim= -require buffer -default {\n}}
{-trim -require {buffer command} -forbid raw}
script
extra*
}
# Unset loopArgs and make a local backup.
upvar 1 loopArgs loopArgs
if {[array exists loopArgs]} {
array set loopArgsBackup [array get loopArgs]
} elseif {[info exists loopArgs]} {
set loopArgsBackup $loopArgs
}
unset -nocomplain loopArgs
try {
# Parse arguments.
argparse -boolean $definition
# Evaluate -params and -init if supplied.
if {[info exists params]} {
uplevel 1 [list argparse $params $extra]
}
if {[info exists init]} {
uplevel 1 $init
}
# If the loopArgs variable was created by -params or -init, prepend it
# to the argument list and parse again.
if {[info exists loopArgs]} {
set args [linsert $args 0 {*}$loopArgs]
argparse -boolean $definition
}
} finally {
# Restore loopArgs to its original state, even if an error occurred.
unset -nocomplain loopArgs
if {[array exists loopArgsBackup]} {
array set loopArgs [array get loopArgsBackup]
} elseif {[info exists loopArgsBackup]} {
set loopArgs $loopArgsBackup
}
}
# Perform some additional argument validation.
if {$bufferMode && [regexp $delim {}]} {
return -code error "delimiter pattern matches empty string: $delim"
} elseif {$extra ne {} && !$command && ![info exists params]} {
return -code error "too many arguments"
}
# Bind the script to the inputs and outputs. If -command is used, convert
# the command prefix to a script, potentially modified by -buffer, -raw, and
# -trim. Otherwise, precede the script with code to expand the input to
# separate variables.
if {$command} {
# Combine command name and arguments.
lappend script {*}$extra
append script " "
if {$raw} {
append script {$input}
} elseif {!$bufferMode} {
append script {[lindex $input 0]}
} elseif {$trim} {
append script {[lindex $input 4]}
} else {
append script {[lindex $input 4][lindex $input 5]}
}
# Store the command return value into the out variable.
if {!$observe} {
set script \[$script\]
if {$trim} {
append script {[lindex $input 5]}
}
if {!$raw} {
set script "\[list \[list $script\]\]"
}
set script "set out $script"
}
} else {
# If -result is used, store the script result into the out variable.
# Intercept both "ok" (normal result) and "return" codes.
if {$result} {
set script [list try $script on ok out {} on return out {}]
}
# Load the data into script variables.
set vars {data meta flush}
if {$bufferMode} {
lappend vars prior buffer complete
}
set script "lassign \$input $vars\n$script"
}
# Unless -separate is used, plan to merge consecutive chunks having the same
# metadata and restart flag. Two chunks cannot be merged if the first one
# commands flush but the second does not.
if {$separate} {
set merge {apply {{out} {return $out}}}
} else {
set merge {apply {{out} {
set i 0
set j 1
while {$j < [llength $out]} {
if {[lindex $out $i 1] eq [lindex $out $j 1]
&& (![lindex $out $i 2] || [lindex $out $j 2])
&& ([llength [lindex $out $i]] >= 4 && [lindex $out $i 3])
== ([llength [lindex $out $j]] >= 4 && [lindex $out $j 3])} {
lset out $i 0 [lindex $out $i 0][lindex $out $j 0]
lset out $i 2 [lindex $out $j 2]
set out [lreplace $out $j $j]
} else {
incr i
incr j
}
}
return $out
}}}
}
# Get access to caller input and output variables.
upvar 1 input input out scriptOut
if {$bufferMode} {
# Loop until the pipeline is destroyed.
set out {}
set buffer {}
while {[set input [yieldto return -level 0 $out]] ne {}} {
# Concatenate the buffer with the new input data, then divide into
# complete chunks, each chunk ending with the delimiter pattern.
lassign $input data meta flush
set in {}
while {[regexp -indices -- $delim [set str $buffer$data] match]} {
set len [expr {[lindex $match 1] - [string length $buffer]}]
lappend in [list [string range $data 0 $len] $meta 0 $buffer\
[string range $str 0 [expr {[lindex $match 0] - 1}]]\
[string range $str {*}$match]]
set data [string replace $data 0 $len]
set buffer {}
}
# Buffer leftover data, and put it into an incomplete chunk. Create
# an empty chunk if there are no chunks but meta or flush are used.
if {$data ne {} || ($in eq {} && ($meta ne {} || $flush))} {
lappend in [list $data $meta 0 $buffer [append buffer $data] {}]
}
# On flush, enable flush in the last chunk, and empty the buffer.
if {$flush} {
lset in end 2 1
set buffer {}
}
if {$observe} {
# In observation mode, simply run the script and ignore output.
set out {{}}
foreach input $in {
if {$partial || [lindex $input 2]
|| [lindex $input 5] ne {}} {
uplevel 1 $script
}
}
} else {
# Run the script body for each input chunk and collect output
# chunks. When -partial is not used, flush is not commanded,
# and the buffer is incomplete, do not run the script.
set out {}
foreach input $in {
if {$partial || [lindex $input 2]
|| [lindex $input 5] ne {}} {
# Run the loop body script.
set scriptOut {{}}
uplevel 1 $script
# Fill in omitted output elements with defaults.
foreach output $scriptOut {
if {!$partial && ![llength $output]} {
set output [list [uplevel 1 {
string cat $buffer $complete
}]]
}
if {[llength $output] < 3} {
lappend output {*}[lrange $input\
[llength $output] 2]
}
lappend out $output
}
}
}
set out [{*}$merge $out]
}
}
} else {
# For unbuffered mode, far less processing is required.
set scriptOut {}
while {[set input [yieldto return -level 0 $scriptOut]] ne {}} {
set scriptOut {{}}
uplevel 1 $script
set scriptOut [{*}$merge $scriptOut]
}
}
}
# ::pipeline::fork --
# Filter procedure for use with [pipeline::new]. Defines anonymous pipelines
# within the context of a parent pipeline. Each input chunk is used as the
# input to the first filter of each nested pipeline. The output of this filter
# is the output of the final filter of the first nested pipeline, and the
# outputs of the other nested pipelines are discarded. Each [pipeline::fork]
# argument is a list of pipeline filter command prefixes.
proc ::pipeline::fork {args} {
if {$args eq {}} {
discard
} else {
set first [pipeline::new {*}[lindex $args 0]]
set rest [lmap arg [lrange $args 1 end] {pipeline::new {*}$arg}]
loop {
set out [$first run $input]
foreach coro $rest {
$coro run $input
}
}
}
}
# ::pipeline::filter --
# Filter procedure for use with [pipeline::new]. Passes or discards chunks for
# which a filter criteria script evaluates to true or false, respectively.
#
# The criteria script has access to all the same variables as the script
# argument to [pipeline::loop], including the additional variables provided by
# the -buffer switch. The criteria script may also access variables set by the
# initial variable dict, the initialization script, or previous iterations of
# the criteria script.
#
# The following arguments are accepted:
#
# -vars VARS Dict mapping from variable names and initial values
# -setup SCR Initialization script
# -expr Script is instead a Tcl math [expr] expression
# -buffer Wait until delimiter is encountered before evaluating script
# -partial Evaluate script for partial buffers as well as complete buffers
# -delim PAT Buffer delimiter regular expression, default \n
# script Script to evaluate for each chunk
::pipeline::procLoop ::pipeline::filter -params {
{-vars= -default {}}
{-setup= -default {}}
{-expr -boolean}
{-buffer -pass loopArgs}
{-partial -pass loopArgs}
{-delim? -pass loopArgs}
test
} -init {
if {$expr} {
set test [list expr $test]
}
dict with vars {}
eval $setup
} {
if {![eval $test]} {
set out {}
}
}
# ::pipeline::echo --
# Filter procedure for use with [pipeline::new]. Echoes input data to a given
# channel (defaulting to stdout) then passes it through unmodified. If this
# filter is wrapped using [pipeline::buffer], and flush is not commanded, only
# complete buffers are echoed, with the delimiter appended. Otherwise, chunks
# are echoed as soon as they are received. The output channel is flushed after
# every write.
::pipeline::procLoop ::pipeline::echo -params {
{-buffer -pass loopArgs}
{-delim= -pass loopArgs -require buffer}
{chan? -default stdout}
} -observe {
chan puts -nonewline $chan $data
chan flush $chan
}
# ::pipeline::regsub --
# Filter procedure for use with [pipeline::new]. Applies [regsub] filtering to
# each complete buffer flowing through the pipeline.
#
# The initial arguments alternate between regular expressions and replacements.
# If an odd number of arguments are given, the final replacement is assumed to
# be empty string. Additionally, any standard [regsub] switches may be used.
#
# Regular expressions and replacements cannot begin with "-". One possible
# workaround is to instead begin with "\-". Another is to precede the regular
# expression and replacement arguments with the special "--" switch.
#
# Regular expression matching and substitution are not applied to the delimiter,
# which is newline by default. The delimiter can be changed using the -delim
# switch. See the documentation for [pipeline::buffer] for more information.
#
# If the -erase switch is used, at least one regular expression substitution
# succeeded, and the result is an empty buffer, it is removed in full, and no
# delimiter is appended. This mode allows [pipeline::regsub] to be used to
# delete entire lines of input, rather than make them be blank lines.
::pipeline::procLoop ::pipeline::regsub -params {
{{} -normalize -boolean -pass regsubArgs}
{-start= -pass regsubArgs}
-erase
{-delim= -pass loopArgs}
expReps*!
} -buffer -result {
# Apply regular expression substitutions.
foreach {exp rep} $expReps {
::regsub {*}$regsubArgs $exp $buffer $rep buffer
}
# Append the delimiter unless the buffer is being erased.
if {!$erase || $buffer ne {}} {
append buffer $complete
}
# Yield any output that may have been obtained.
if {$buffer ne {}} {
list [list $buffer]
}
}
# ::pipeline::trimTrailingSpace --
# Filter procedure for use with [pipeline::new]. Trims trailing whitespace from
# each buffer. The default delimiter is newline but can be changed with -delim.
# See the documentation for [pipeline::buffer] for more information.
::pipeline::procLoop ::pipeline::trimTrailingSpace -params {
{-delim= -pass loopArgs}
} -buffer -partial -result {
# Find the last non-whitespace character in the current chunk.
set output {}
if {[regexp -indices {.*[^ \f\n\r\t\v]} $buffer end]} {
# Find the last non-whitespace character preceding the current chunk.
# This was the last character that was output before.
if {[regexp -indices {.*[^ \f\n\r\t\v]} $prior start]} {
set start [expr {[lindex $start 1] + 1}]
} else {
set start 0
}
# Output all characters since the previous output for this buffer
# through the final non-whitespace character in the current chunk.
append output [string range $buffer $start [lindex $end 1]]
}
# If this is a complete buffer, append the delimiter to the output.
append output $complete
# Yield any output that may have been obtained.
if {$output ne {}} {
list [list $output]
}
}
# ::pipeline::squeeze --
# Filter procedure for use with [pipeline::new]. Removes empty buffers at the
# beginning and end of output and collapses consecutive empty buffers into one.
# The default delimiter is newline but can be changed with -delim. See the
# documentation for [pipeline::buffer] for more information.
::pipeline::procLoop ::pipeline::squeeze -params {
{-delim= -pass loopArgs}
} -init {
set empty 1
} -buffer -partial {
if {$buffer eq {} && $complete ne {}} {
# Do not output empty buffers.
set out {}
set empty 1
} elseif {$buffer ne {}} {
# If a non-empty buffer comes after at least one empty buffer which is
# not at the beginning of input, precede the output chunk with the most
# recently observed delimiter. Otherwise, fall back on the default
# behavior which is to pass the chunk through directly.
if {$empty && [info exists delim]} {
set out [list [list $delim] {}]
}
set empty 0
if {$complete ne {}} {
set delim $complete
}
}
}
# ::pipeline::removeFixed --
# Filter procedure for use with [pipeline::new]. Removes buffers that exactly
# match one or more literal pattern strings, which do not include the delimiter.
# The -prefix switch also removes buffers that begin with any pattern string.
#
# Unlike [pipeline::regsub], this procedure does not delay output until the
# delimiter is encountered. Buffering only happens in event of a prefix match.
#
# If a flush occurs in the middle of a partial buffer, it will be output as-is,
# even though it could potentially be followed by characters that would make it
# match the removal pattern.
#
# The default delimiter is newline but can be changed with -delim. See the
# documentation for [pipeline::buffer] for more information.
::pipeline::procLoop ::pipeline::removeFixed -params {
{{} -boolean}
-prefix
{-delim= -pass loopArgs}
patterns*
} -buffer -partial {
foreach pattern $patterns {
# Determine the match prefix length.
if {$prefix && (($complete ne {} || $flush)
|| [string length $pattern] < [string length $buffer])} {
set len [string length $pattern]
} elseif {$complete eq {} && !$flush} {
set len [string length $buffer]
} else {
set len -1
}
# Check for matches against the current and prior buffers.
if {[string equal -length $len $buffer $pattern]} {
# Discard or delay the input if the buffer is complete and exactly
# matches the pattern, or is incomplete and is a prefix of the
# pattern, or if prefix matching is enabled and the pattern is a
# prefix of the buffer.
set out {}
break
} elseif {$prior ne {} && [string equal -length [string length $prior]\
$prior $pattern]} {
# If the partial buffer was previously discarded, provisionally
# output it in full because it ultimately ended up not matching. It
# may yet be discarded if it matches another pattern.
set out [list [list $buffer$complete]]
}
}
}
# ::pipeline::tee --
# Filter procedure for use with [pipeline::new]. Tees one pipeline off another,
# connecting the output of the current pipeline at the current point to the
# input of the other pipeline, without affecting the data flowing through the
# current pipeline. If nothing will call [pipeline::get] on the other pipeline,
# it is best that it contain the [pipeline::discard] filter to avoid unbounded
# growth of its output buffer.
::pipeline::procLoop ::pipeline::tee -params pipeline -observe {
$pipeline put -meta $meta {*}[if {$flush} {list -flush}] $data
}
# ::pipeline::splice --
# Filter procedure for use with [pipeline::new]. Splices one pipeline into
# another, connecting the output of the current pipeline at the current point to
# the input of the other pipeline, and vice versa.
proc ::pipeline::splice {pipeline} {
loop -command -raw $pipeline run
}
# ::pipeline::discard --
# Filter procedure for use with [pipeline::new]. Discards all input. The
# pipeline is ended immediately unless flush is commanded, in which case any
# subsequent filters (there probably won't be any) are executed with no input.
# This filter is useful in combination with [pipeline::tee] to terminate a teed
# pipeline on which [pipeline::get] will never be called.
::pipeline::procLoop ::pipeline::discard -result {}
# vim: set sts=4 sw=4 tw=80 et ft=tcl: