## Attachment "reflected channel - multi-threaded demo-test.tcl" to
## ticket [b9bfaf5fdb], added by sebres on 2016-07-11 19:18:20.
## The script calls thread::* commands right away, so load the Thread extension
## explicitly (a no-op when it is already loaded).
package require Thread

## Log helper of the main thread.
## Prints one line to stdout: " <seconds.millis> | <thread id> | <parts>",
## where <parts> are all arguments joined with " | ".
##   args - message parts to log
proc __log {args} {
    ::puts [format " %.3f | %s | %s" [expr {[clock milliseconds] / 1000.0}] [thread::id] [join $args " | "]]
}
## create the worker thread that processes the real channel:
set wth [thread::create {
## init thread:
## Log helper, defined again inside the worker init script.
## Prints " <seconds.millis> | <thread id> | <parts>" to stdout,
## where <parts> are all arguments joined with " | ".
proc __log {args} {
    set stamp [expr {[clock milliseconds] / 1000.0}]
    set text [join $args " | "]
    ::puts [format " %.3f | %s | %s" $stamp [thread::id] $text]
}
## reflected handler of demo reflected channel ...
namespace eval ::DemoRCHandler {
## "initialize" callback of the reflected channel.
## Creates the per-channel state array _hdl_<ch> in this namespace and returns
## the list of handler methods this namespace claims to support.
##   ch   - channel handle
##   args - remaining callback arguments (only logged)
proc initialize {ch args} {
    __log -WRK " -- channel initialize $ch ... $args"
    namespace upvar [namespace current] _hdl_$ch state
    ## initial state: not at eof, some content available, non-blocking,
    ## and waiting for data inside the worker enabled
    array set state [list eof 0 content "1st data" blocking 0 wait-in-worker 1]
    __log -WRK " ++ channel [array get state]"
    return [list initialize finalize watch read write configure cget cgetall blocking]
}
## "finalize" callback of the reflected channel.
## Logs the final per-channel state, removes the state array and returns 0.
##   ch   - channel handle
##   args - remaining callback arguments (only logged)
proc finalize {ch args} {
    __log -WRK " -- channel finalize $ch ... $args"
    namespace upvar [namespace current] _hdl_$ch state
    __log -WRK " -- channel [array get state]"
    ## drop the state; -nocomplain keeps this quiet if it does not exist
    unset -nocomplain state
    return 0
}
## "watch" callback of the reflected channel: only logs the request.
##   ch   - channel handle
##   args - remaining callback arguments (only logged)
proc watch {ch args} {
    set detail " .. channel watch $ch ... $args"
    __log -WRK $detail
}
## "configure" callback of the reflected channel.
## Applies option/value pairs to the per-channel state:
##   -alive v           sets eof to the negation of v
##   -wait-in-worker v  stores v as the wait-in-worker flag
##   -content v         appends v to the buffered content
## Any other option is ignored.
##   ch   - channel handle
##   args - option/value pairs
proc configure {ch args} {
    __log -WRK " .. channel configure $ch ... $args"
    namespace upvar [namespace current] _hdl_$ch state
    foreach {opt val} $args {
        switch -- $opt {
            -alive {
                set state(eof) [expr {!$val}]
            }
            -wait-in-worker {
                set state(wait-in-worker) $val
            }
            -content {
                append state(content) $val
            }
        }
    }
}
## "cget" callback of the reflected channel.
## Returns the value of -alive (negated eof flag) or -wait-in-worker;
## any other option name yields an empty result.
##   ch   - channel handle
##   name - option name
proc cget {ch name} {
    __log -WRK " .. channel cget $ch ... $name"
    namespace upvar [namespace current] _hdl_$ch state
    if {$name eq "-alive"} {
        return [expr {!$state(eof)}]
    }
    if {$name eq "-wait-in-worker"} {
        return $state(wait-in-worker)
    }
    return
}
## "cgetall" callback of the reflected channel.
## Returns a dictionary with the options -alive and -wait-in-worker.
##   ch - channel handle
proc cgetall {ch} {
    __log -WRK " .. channel cgetall $ch ..."
    namespace upvar [namespace current] _hdl_$ch state
    set alive [expr {!$state(eof)}]
    return [dict create -alive $alive -wait-in-worker $state(wait-in-worker)]
}
## "blocking" callback of the reflected channel.
## Remembers the requested blocking mode in the per-channel state and
## returns it.
##   ch   - channel handle
##   mode - new blocking mode
proc blocking {ch mode} {
    __log -WRK " -- channel blocking $ch ... $mode"
    namespace upvar [namespace current] _hdl_$ch state
    return [set state(blocking) $mode]
}
## "read" callback of the reflected channel.
## Hands out up to size characters of the buffered content.
## Returns the data, or an empty string once eof is set and nothing is left.
## Raises the error EAGAIN when no data is available, eof is not set and the
## handler is not supposed to wait (non-blocking, or wait-in-worker is off).
##   ch   - channel handle
##   size - maximum number of characters to return
proc read {ch size} {
    __log -WRK " -> channel read $ch ... $size"
    namespace upvar [namespace current] _hdl_$ch state
    set last [expr {$size - 1}]
    while {1} {
        ## cut the next chunk off the front of the buffered content
        set chunk [string range $state(content) 0 $last]
        set state(content) [string range $state(content) $size end]
        set got [string length $chunk]
        if {$got || $state(eof)} {
            ## either real data, or an empty result that signals eof to the higher layers
            if {$got} {
                set note {}
            } else {
                set note {- EOF}
            }
            __log -WRK " <- channel read $ch ... $got $note"
            return $chunk
        }
        ## ERROR if we don't wait here - if wait-in-worker = 0 ...
        if {$state(blocking) && $state(wait-in-worker)} {
            ## main channel is in blocking mode: don't return, vwait until the state changes
            ## (caution, other requests can be processed in this thread meanwhile, so this
            ## does not really block) and then try again
            vwait [namespace current]::_hdl_$ch
            continue
        }
        ## no data and not eof: the caller has to retry later
        __log -WRK " <- channel read $ch ... EAGAIN"
        return -code error EAGAIN
    }
}
## Command prefix given to "chan create": runs its arguments as a command
## (method name followed by the method arguments) and returns the result.
##   args - command words to execute
proc handler {args} {
    #__log -WRK " -> channel handler ... $args"
    return [{*}$args]
}
## Creates the read-only demo reflected channel in this (worker) thread and
## detaches it, so that another thread can attach it.
## Returns the channel handle, or -1 if creating or detaching failed
## (the error is logged).
proc demo-chan-create {} {
    set ch {}
    if {[catch {
        set ch [chan create {read} ::DemoRCHandler::handler]
        thread::detach $ch
    } msg]} {
        __log ERROR $::errorInfo
        ## if the channel was created but could not be detached, close it here,
        ## otherwise it would stay open in the worker with its handle lost
        if {$ch ne {}} {
            catch {close $ch}
        }
        set ch -1
    }
    set ch
}
## Delivers the last piece of content for a channel and marks it as eof.
## The content is appended, so buffered data that has not been read yet is
## kept. A channel whose state no longer exists (already finalized) is left
## alone, so that no stale state array gets created.
## Errors are logged, never raised.
##   ch      - channel handle
##   content - final data to hand out before eof
proc demo-chan-eof {ch content} {
    if {[catch {
        __log -WRK " ** last content ... "
        upvar [namespace current]::_hdl_$ch hdl
        if {[array exists hdl]} {
            append hdl(content) $content
            set hdl(eof) 1
        } else {
            __log -WRK " ** channel $ch already finalized - ignored"
        }
    } msg]} {
        __log ERROR $::errorInfo
    }
}
}; ## end of rc demo-namespace.
## init end.
thread::wait
}]
## Main test driver.
## For every (blocking, wait-in-worker) combination: let the worker create a
## reflected channel, attach it here, read it until eof and check that the
## second piece of content (produced with latency in the worker) arrived.
## On the first failing scenario the worker is released and the error is returned.

## short pause before the first request
## (presumably to let the worker finish its init script - TODO confirm)
after 10
foreach {block wait_in_worker} {
    0 0
    1 1
    1 0
} {
    __log =MTH [string repeat == 40]
    __log =MTH "==== Blocking: $block, Wait-in-worker: $wait_in_worker\t==================="
    __log =MTH [string repeat == 40]
    set ch {}
    set err [catch {
        ## create reflected channel in worker - send it to main thread for test:
        set ch [thread::send $wth [list ::DemoRCHandler::demo-chan-create]]
        thread::attach $ch
        __log =MTH "CREATED ... $ch"
        fconfigure $ch -blocking $block -translation binary -wait-in-worker $wait_in_worker
        ## produce 2nd content and eof with latency in worker:
        thread::send -async $wth [list after 250 [list ::DemoRCHandler::demo-chan-eof $ch ", 2nd data"]]
        set data [set buf [read $ch]]
        __log =MTH !!BUF:$buf
        ## if non-blocking - wait for eof:
        if {!$block} {
            while {![eof $ch]} {
                if {[set buf [read $ch]] ne ""} {
                    append data $buf
                    __log =MTH !!BUF:$buf
                }
                ## dummy wait
                after 50
            }
        }
        if {[string first ", 2nd data" $data] != -1} {
            __log =MTH OK**READ:[string length $data]--$data
        } else {
            __log =ERR ERROR**READ:[string length $data]--$data
        }
    } msg opt]
    if {$err} {
        __log ERROR $::errorInfo
    }
    __log =MTH "CLOSE ... $ch"
    ## ch may hold something that is not an open channel of this thread (the -1
    ## failure marker of demo-chan-create, or a handle that could not be attached),
    ## so a failing close must neither hide the original error nor skip the
    ## release of the worker below
    if {$ch ne {}} {
        if {[catch {close $ch} closeMsg closeOpt]} {
            __log ERROR "close $ch failed: $closeMsg"
            ## report the close failure only if nothing failed before it
            if {!$err} {
                set err 1
                set msg $closeMsg
                set opt $closeOpt
            }
        }
        set ch {}
    }
    if {$err} {
        thread::release $wth; set wth {}
        return {*}$opt $msg
    }
}
__log =MTH [string repeat == 40]
if {$wth ne {}} {thread::release $wth}