Tcl Source Code

Artifact [e860f9f9f8]
Login

Artifact e860f9f9f812bed9971ab1ec2fd62b954e4a68f3:

Attachment "reflected channel - multi-threaded demo-test.tcl" to ticket [b9bfaf5fdb] added by sebres 2016-07-11 19:18:20.
## Log helper (main thread): prints one line consisting of a timestamp in
## seconds (3 decimals), the current thread id and all arguments joined by " | ".
proc __log {args} {
  set now [expr {[clock milliseconds] / 1000.0}]
  set text [join $args " | "]
  ::puts [format " %.3f | %s | %s" $now [thread::id] $text]
}

## create worker that process real channel:
set wth [thread::create {
  ## init thread:
  ## Log helper (worker thread): prints one line consisting of a timestamp in
  ## seconds (3 decimals), the current thread id and all arguments joined by " | ".
  proc __log {args} {
    set now [expr {[clock milliseconds] / 1000.0}]
    set text [join $args " | "]
    ::puts [format " %.3f | %s | %s" $now [thread::id] $text]
  }

  ## reflected handler of demo reflected channel ...
  namespace eval ::DemoRCHandler {

    proc initialize {ch args} {
      ## "initialize" subcommand of the channel handler: sets up the per-channel
      ## state array _hdl_<ch> in this namespace (eof flag, pending content,
      ## blocking mode, wait-in-worker switch), logs it, and returns the list
      ## of subcommand names.
      __log -WRK " -- channel initialize $ch ... $args"
      namespace upvar [namespace current] _hdl_$ch state
      array set state [list eof 0 content "1st data" blocking 0 wait-in-worker 1]
      __log -WRK " ++ channel [array get state]"
      return [list initialize finalize watch read write configure cget cgetall blocking]
    }
    proc finalize {ch args} {
      ## "finalize" subcommand of the channel handler: logs the final contents
      ## of the per-channel state array, removes it, and returns 0.
      __log -WRK " -- channel finalize $ch ... $args"
      namespace upvar [namespace current] _hdl_$ch state
      __log -WRK " -- channel [array get state]"
      unset -nocomplain state
      return 0
    }
    proc watch {ch args} {
      ## "watch" subcommand of the channel handler: only logs the request,
      ## no event interest is recorded by this demo.
      set line " .. channel watch $ch ... $args"
      __log -WRK $line
    }
    proc configure {ch args} {
      ## "configure" subcommand of the channel handler: applies handler-specific
      ## options, given as option/value pairs, to the per-channel state array:
      ##   -alive v           sets the eof flag to the negation of v
      ##   -wait-in-worker v  stores v as the wait-in-worker switch
      ##   -content v         appends v to the pending content
      ## Any other option name raises an error instead of being silently
      ## ignored, so a misspelled option shows up at the fconfigure call.
      ## Note: every write to the state array also wakes up a [vwait] on that
      ## array (as used by the "read" subcommand of this handler).
      __log -WRK " .. channel configure $ch ... $args"
      upvar [namespace current]::_hdl_$ch hdl
      foreach {n v} $args {
        switch -- $n {
          -alive {
            set hdl(eof) [expr {!$v}]
          }
          -wait-in-worker {
            set hdl(wait-in-worker) $v
          }
          -content {
            append hdl(content) $v
          }
          default {
            return -code error "bad option \"$n\": should be one of -alive, -wait-in-worker or -content"
          }
        }
      }
    }
    proc cget {ch name} {
      ## "cget" subcommand of the channel handler: returns the value of one
      ## handler-specific option from the per-channel state array:
      ##   -alive           negation of the eof flag
      ##   -wait-in-worker  current wait-in-worker switch
      ## Any other option name raises an error (instead of quietly yielding an
      ## empty string), matching the option set reported by "cgetall".
      __log -WRK " .. channel cget $ch ... $name"
      upvar [namespace current]::_hdl_$ch hdl
      switch -- $name {
        -alive {
          return [expr {!$hdl(eof)}]
        }
        -wait-in-worker {
          return $hdl(wait-in-worker)
        }
        default {
          return -code error "bad option \"$name\": should be one of -alive or -wait-in-worker"
        }
      }
    }
    proc cgetall {ch} {
      ## "cgetall" subcommand of the channel handler: returns all readable
      ## handler-specific options as an option/value list.
      __log -WRK " .. channel cgetall $ch ..."
      namespace upvar [namespace current] _hdl_$ch state
      set alive [expr {!$state(eof)}]
      return [list -alive $alive -wait-in-worker $state(wait-in-worker)]
    }
    proc blocking {ch mode} {
      ## "blocking" subcommand of the channel handler: remembers the requested
      ## blocking mode in the per-channel state array and returns it.
      __log -WRK " -- channel blocking $ch ... $mode"
      namespace upvar [namespace current] _hdl_$ch state
      set state(blocking) $mode
      return $state(blocking)
    }
    proc read {ch size} {
      ## "read" subcommand of the channel handler.
      ##   ch   - channel handle, selects the per-channel state array _hdl_<ch>
      ##   size - maximum number of characters to hand out
      ## Returns up to $size characters taken from the pending content; an
      ## empty result is only returned once the eof flag is set. If nothing is
      ## pending and eof is not set, it either waits for a change of the state
      ## array (blocking mode with wait-in-worker enabled) or raises EAGAIN.
      __log -WRK " -> channel read $ch ... $size"
      upvar [namespace current]::_hdl_$ch hdl
      while 1 {
        ## take the next chunk from the pending content and remove it there;
        ## the result is the buffer read (or "" for eof), or EAGAIN is thrown if no data is available currently:
        set buf [string range $hdl(content) 0 [expr {$size-1}]]
        set hdl(content) [string range $hdl(content) $size end]
        ## an empty buffer would signal EOF to the higher layers, so it may only be returned if eof is really set:
        if {![set len [string length $buf]] && !$hdl(eof)} {
          ## ERROR if we don't wait here - if wait-in-worker = 0 ...
          if {$hdl(blocking) && $hdl(wait-in-worker)} {
            ## main channel is in blocking mode, don't return - vwait here until the state array changes
            ## (caution: other requests can be processed in this thread meanwhile, so this does not really block) ...
            vwait [namespace current]::_hdl_$ch
            ## state has changed - retry the read from the top of the loop:
            continue
          }
          ## not eof, retry later:
          __log -WRK " <- channel read $ch ... EAGAIN"
          return -code error EAGAIN
        }
        ## data available, or eof reached (empty buffer); the log line gets a "- EOF" suffix for an empty buffer:
        __log -WRK " <- channel read $ch ... [string length $buf] [if {![string length $buf]} {set _ {- EOF}}]"
        return $buf
      }
    }

    proc handler {args} {
      ## Command prefix registered with [chan create]: the first argument is
      ## used as the name of the command to run (resolved from this namespace),
      ## the remaining arguments are passed on unchanged. Result and errors of
      ## that command become the result of this proc.
      #__log -WRK " -> channel handler ... $args"
      {*}$args
    }

    proc demo-chan-create {} {
      ## Creates a read-only reflected channel served by ::DemoRCHandler::handler
      ## and detaches it from this thread, so that another thread can attach it.
      ## Returns the channel handle, or -1 (after logging the error) on failure.
      set failed [catch {
        set ch [chan create read ::DemoRCHandler::handler]
        thread::detach $ch
      } msg]
      if {$failed} {
        __log ERROR $::errorInfo
        set ch -1
      }
      return $ch
    }

    proc demo-chan-eof {ch content} {
      ## Supplies the last piece of data for channel $ch: replaces the pending
      ## content by $content and sets the eof flag. Errors are logged, not raised.
      set rc [catch {
        __log -WRK " ** last content ... "
        namespace upvar [namespace current] _hdl_$ch state
        set state(content) $content
        set state(eof) 1
      } msg]
      if {$rc} {
        __log ERROR $::errorInfo
      }
    }
  }; ## end of rc demo-namespace.

  ## init end.
  thread::wait
}]

## short pause before the tests start (presumably gives the worker time to
## finish its init script - TODO confirm):
after 10

## Run the read test once per (blocking, wait-in-worker) combination. Each run
## creates a reflected channel in the worker, attaches it here, schedules the
## 2nd content + eof in the worker after 250 ms, and reads until eof.
## On the first failing run the worker is released and the error is re-raised.
foreach {block wait_in_worker} {
  0 0
  1 1 
  1 0
} {
  __log =MTH [string repeat == 40]
  __log =MTH "==== Blocking: $block, Wait-in-worker: $wait_in_worker\t==================="
  __log =MTH [string repeat == 40]
  set ch {}
  set err [catch {
    ## create reflected channel in worker - send it to main thread for test:
    set ch [thread::send $wth [list ::DemoRCHandler::demo-chan-create]]
    thread::attach $ch
    __log =MTH "CREATED ... $ch"
    fconfigure $ch -blocking $block -translation binary -wait-in-worker $wait_in_worker
    
    ## produce 2nd content and eof with latency in worker:
    thread::send -async $wth [list after 250 [list ::DemoRCHandler::demo-chan-eof $ch ", 2nd data"]]

    set data [set buf [read $ch]]
    __log =MTH !!BUF:$buf
    ## if non-blocking - wait for eof:
    if {!$block} {
      while {![eof $ch]} {
        if {[set buf [read $ch]] ne ""} {
          append data $buf
          __log =MTH !!BUF:$buf
        }
        ## dummy wait
        after 50
      }
    }
    ## the test passes only if the delayed 2nd content has arrived as well:
    if {[string first ", 2nd data" $data] != -1} {
      __log =MTH OK**READ:[string length $data]--$data
    } else {
      __log =ERR ERROR**READ:[string length $data]--$data
    }
    
  } msg opt]
  if {$err} {
    __log ERROR $::errorInfo
  }
  __log =MTH "CLOSE ... $ch"
  if {$ch ne {}} {
    ## closing can fail as well (e.g. $ch is the -1 failure value of
    ## demo-chan-create, or the channel was never attached); that must neither
    ## hide the error caught above nor skip the release of the worker below:
    if {[catch {close $ch} cmsg copt]} {
      __log ERROR "close $ch failed: $cmsg"
      ## a failing close is an error of its own if nothing has failed before:
      if {!$err} {
        set err 1
        set msg $cmsg
        set opt $copt
      }
    }
    set ch {}
  }
  if {$err} {
    thread::release $wth; set wth {}
    return {*}$opt $msg
  }
}

__log =MTH [string repeat == 40]
## release the worker, unless a failed run has done that already:
if {$wth ne {}} {thread::release $wth}