Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Modified the httpd example code Fixes to the cron package to better interact with coroutines from the tool framework. Added support for TWAPI 3.1+ in nettool Replaced bare "puts", "flush", and "close" calls with the chan ensemble versions. Added a TCP based transaction system to udpcluster, as well as a central directory of information per machine. |
---|---|
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
a20eacae85067033ba39ea9b9b1b44e4 |
User & Date: | tne 2016-07-19 15:45:31 |
Context
2016-07-23
| ||
11:06 | New version 2.0 for the cron package. It now provides a "task" ensemble for manipulating schedules and also tracks time internally in milliseconds. It provides a new coroutine aware "sleep" function to pause a script but keep background tasks firing off. cron also now includes all of the functions needed to track and clean up after coroutines and events that have been created by TclOO objects. Tool has been modified to make use of the new features in cron. Replaced the sleep function in udpcluster with the sleep function in cron. Updated the dependencies for processman check-in: d9b48225d8 user: hypnotoad tags: trunk | |
2016-07-19
| ||
15:45 | Modified the httpd example code Fixes to the cron package to better interact with coroutines from the tool framework. Added support for TWAPI 3.1+ in nettool Replaced bare "puts", "flush", and "close" calls with the chan ensemble versions. Added a TCP based transaction system to udpcluster, as well as a central directory of information per machine. check-in: a20eacae85 user: tne tags: trunk | |
15:39 | Merging changes from trunk check-in: b83ef84eec user: tne tags: odie | |
2016-07-12
| ||
07:58 | fumagic {minor change to documentation} check-in: 3525edeeef user: pooryorick tags: trunk | |
Changes
Changes to examples/httpd/httpd.tcl.
︙ | ︙ | |||
12 13 14 15 16 17 18 | ### # This script creates two toplevel domains: # * Hosting the tcllib embedded documentation as static content # * Hosting a local fossil mirror of the tcllib repository ### package require httpd | | | > > > > > > > | > > > > > > | | < < < > | > > > > > < < | < < > < < < < < < < < < < < < < > > > > > > > > > > | > | > > > > < < < > | | | > | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | ### # This script creates two toplevel domains: # * Hosting the tcllib embedded documentation as static content # * Hosting a local fossil mirror of the tcllib repository ### package require httpd tool::class create httpd::content::fossil_root { method content {} { my reset my puts "<HTML><HEAD><TITLE>Local Fossil Repositories</TITLE></HEAD><BODY>" global recipe my puts "<UL>" set dbfiles [exec fossil all list] foreach file [lsort -dictionary $dbfiles] { dict set result [file rootname [file tail $file]] $file } foreach {module dbfile} [lsort -dictionary -stride 2 $result] { my puts "<li><a HREF=/fossil/$module>$module</a>" } my puts {</UL></BODY></HTML>} } } ### # Fossil nodes are actually handoffs to fossil passthrough handlers ### tool::class create httpd::content::fossil_node_scgi { superclass httpd::content::scgi method scgi_info {} { file mkdir ~/tmp set uri [my query_headers get REQUEST_URI] set prefix [my query_headers get prefix] set module [lindex [split $uri /] 2] puts [list *** $uri -> $module] if {![info exists ::fossil_process($module)]} { package require processman package require nettool set port [::nettool::allocate_port 40000] set handle fossil:$port set dbfiles [exec fossil all list] foreach file [lsort -dictionary $dbfiles] { dict set result [file rootname [file tail $file]] $file } set dbfile [dict get $result $module] if {![file exists $dbfile]} { tailcall my error 400 {Not Found} } set server_name [my <server> cget server_name] set mport [my <server> port_listening] set baseurl http://${server_name}:${mport}/fossil/$module set cmd [list fossil server --baseurl $baseurl --port $port --localhost --scgi $dbfile 2>~/tmp/$module.err >~/tmp/$module.log] dict set ::fossil_process($module) repo $dbfile dict set ::fossil_process($module) port $port dict set ::fossil_process($module) handle $handle dict set ::fossil_process($module) cmd $cmd dict set ::fossil_process($module) SCRIPT_NAME $prefix/$module ::puts [list Launching SCGI $module] foreach {field value} $::fossil_process($module) { ::puts [list $field: $value] } } dict with ::fossil_process($module) {} if {![::processman::running $handle]} { set process [::processman::spawn $handle {*}$cmd] my varname paused after 500 } return [list localhost $port $SCRIPT_NAME] } } tool::class create ::docserver::server { superclass ::httpd::server::dispatch ::httpd::server method log args { puts [list {*}$args] } } ::docserver::server create appmain doc_root $DEMOROOT {*}$argv appmain add_uri /tcllib* [list mixin httpd::content::file path [file join $tcllibroot embedded www]] appmain add_uri /fossil {mixin httpd::content::fossil_root} appmain add_uri /fossil/* {mixin httpd::content::fossil_node_scgi} puts [list LISTENING] tool::main |
Changes to modules/cron/cron.tcl.
︙ | ︙ | |||
112 113 114 115 116 117 118 | ### # topic: 1f8d4726623321acc311734c1dadcd8e # description: # Run through our process table and # kick off overdue tasks ### | | | > > > > | 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | ### # topic: 1f8d4726623321acc311734c1dadcd8e # description: # Run through our process table and # kick off overdue tasks ### proc ::cron::runProcesses {{coro 0}} { variable processTable set now [clock seconds] ### # Determine what tasks to run this timestep ### set tasks {} set cancellist {} foreach {process} [lsort -dictionary [array names processTable]] { dict with processTable($process) { if { $scheduled <= $now } { lappend tasks $process if { $frequency <= 0 } { lappend cancellist $process } else { set scheduled [expr {$frequency + $lastrun}] if { $scheduled <= $now } { set scheduled [expr {$frequency + $now}] } } set lastrun $now } set lastevent $now } } foreach task $tasks { dict set processTable($task) lastrun $now doOneEvent $task if {$coro} { yield 0 } } foreach {task} $cancellist { unset -nocomplain processTable($task) } } ### |
︙ | ︙ | |||
202 203 204 205 206 207 208 | ### # Do this forever ### variable processTable variable processing while 1 { set lastevent 0 | | > < < < | | | | < < < < < < < < < < < | < < | > | | < | | < < | < < | 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 | ### # Do this forever ### variable processTable variable processing while 1 { set lastevent 0 runProcesses 1 # Wake me up in 5 minute intervals, just out of principle set now [clock seconds] set nextevent [expr {$now-($now % 300) + 300}] set nexttask {} foreach {process} [lsort -dictionary [array names processTable]] { dict with processTable($process) { if {$scheduled < $nextevent} { set nexttask $process set nextevent $scheduled } set lastevent $now } } set delay [expr {$nextevent-$now}] if {$delay < 0} { yield 0 } else { if {$delay > 120} { set delay [expr {$delay-($delay % 60) + 60}] } yield $delay } } } proc ::cron::wake {} { |
︙ | ︙ |
Changes to modules/httpd/content.tcl.
︙ | ︙ | |||
115 116 117 118 119 120 121 122 123 124 125 126 127 128 | } ### # Output the result or error to the channel # and destroy this object ### method DoOutput {} { chan event $chan writable {} my variable reply_body reply_file reply_chan chan chan configure $chan -translation {binary binary} set headers [my reply_headers dump] if {[dict exists $headers Status:]} { set result "[my EncodeStatus [dict get $headers Status:]]\n" | > | 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | } ### # Output the result or error to the channel # and destroy this object ### method DoOutput {} { my variable chan chan event $chan writable {} my variable reply_body reply_file reply_chan chan chan configure $chan -translation {binary binary} set headers [my reply_headers dump] if {[dict exists $headers Status:]} { set result "[my EncodeStatus [dict get $headers Status:]]\n" |
︙ | ︙ | |||
180 181 182 183 184 185 186 187 188 | if {$sockinfo eq {}} { my error 404 {Not Found} return } lassign $sockinfo scgihost scgiport scgiscript set sock [::socket $scgihost $scgiport] # Add a few headers that SCGI needs my query_headers set SCRIPT_NAME $scgiscript my query_headers set SCGI 1.0 | > > > > > > | | 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | if {$sockinfo eq {}} { my error 404 {Not Found} return } lassign $sockinfo scgihost scgiport scgiscript set sock [::socket $scgihost $scgiport] # Add a few headers that SCGI needs my query_headers set SERVER_NAME [my <server> cget server_name] my query_headers set SCRIPT_NAME $scgiscript my query_headers set SERVER_PORT [my <server> port_listening] set ::env(SCRIPT_NAME) $scgiscript my query_headers set SCGI 1.0 ::puts {HEADERS} foreach {field element} [my query_headers dump] { ::puts [list $field $element] } chan configure $chan -translation binary -blocking 0 -buffering full -buffersize 4096 chan configure $sock -translation binary -blocking 0 -buffering full -buffersize 4096 ### # Convert our query headers into netstring format. Note that # MimeParse as already rigged it such that CONTENT_LENGTH is first # and always populated (even if zero), per SCGI requirements ### |
︙ | ︙ | |||
208 209 210 211 212 213 214 215 | # Wake this object up after the SCGI process starts to respond ### #chan configure $sock -translation {auto crlf} -blocking 0 -buffering line chan event $sock readable [namespace code {my output}] } method DoOutput {} { chan event $chan writable {} | > | > < > > > > > > | | 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 | # Wake this object up after the SCGI process starts to respond ### #chan configure $sock -translation {auto crlf} -blocking 0 -buffering line chan event $sock readable [namespace code {my output}] } method DoOutput {} { my variable chan sock chan event $chan writable {} if {![info exists sock] || [my query_headers getnull HTTP_ERROR] ne {}} { ### # If something croaked internally, handle this page as a normal reply ### next return } set replyhead [my HttpHeaders $sock] puts [list REPLY HEADERS $replyhead] set replydat [my MimeParse $replyhead] ### # Convert the Status: header from the SCGI service to # a standard service reply line from a web server, but # otherwise spit out the rest of the headers verbatim ### if {![dict exists $replydat HTTP_STATUS]} { set status 200 } else { set status [dict get $replydat HTTP_STATUS] } set replybuffer "HTTP/1.1 $status\n" append replybuffer $replyhead chan configure $chan -translation {auto crlf} -blocking 0 -buffering full -buffersize 4096 chan puts $chan $replybuffer ### # Output the body ### chan configure $sock -translation binary -blocking 0 -buffering full -buffersize 4096 |
︙ | ︙ | |||
291 292 293 294 295 296 297 298 | # Wake this object up after the proxied process starts to respond ### chan configure $sock -translation {auto crlf} -blocking 1 -buffering line chan event $sock readable [namespace code {my output}] } method DoOutput {} { chan event $chan writable {} | > | > < | 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 | # Wake this object up after the proxied process starts to respond ### chan configure $sock -translation {auto crlf} -blocking 1 -buffering line chan event $sock readable [namespace code {my output}] } method DoOutput {} { my variable chan sock chan event $chan writable {} if {![info exists sock] || [my query_headers getnull HTTP_ERROR] ne {}} { ### # If something croaked internally, handle this page as a normal reply ### next return } set length 0 chan configure $sock -translation {crlf crlf} -blocking 1 set replystatus [gets $sock] set replyhead [my HttpHeaders $sock] set replydat [my MimeParse $replyhead] ### |
︙ | ︙ |
Changes to modules/httpd/httpd.tcl.
︙ | ︙ | |||
83 84 85 86 87 88 89 | # a flag which will terminate the vwait. # # We do this rather than entering blocking mode to prevent the process # from locking up if it's starved for input. (Or in the case of the test # suite, when we are opening a blocking channel on the other side of the # socket back to ourselves.) ### | | < | > | > | 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | # a flag which will terminate the vwait. # # We do this rather than entering blocking mode to prevent the process # from locking up if it's starved for input. (Or in the case of the test # suite, when we are opening a blocking channel on the other side of the # socket back to ourselves.) ### chan configure $sock -translation {crlf crlf} -blocking 0 -buffering line my variable MimeHeadersSock set MimeHeadersSock($sock) {} set MimeHeadersSock($sock.done) {} chan event $sock readable [namespace code [list my HttpHeaderLine $sock]] vwait [my varname MimeHeadersSock]($sock.done) ### # Return our buffer ### return $MimeHeadersSock($sock) } method HttpHeaderLine {sock} { my variable MimeHeadersSock if {[chan eof $sock]} { # Socket closed... die tailcall my destroy } try { gets $sock line if {$line eq {}} { set [my varname MimeHeadersSock]($sock.done) 1 chan event $sock readable {} } else { append MimeHeadersSock($sock) $line \n } } trap {POSIX EBUSY} {err info} { # Happens... } on error {err info} { puts "ERROR $err" |
︙ | ︙ | |||
192 193 194 195 196 197 198 | } } # Dispatch to the URL implementation. my content } on error {err info} { dict print $info #puts stderr $::errorInfo | | | 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 | } } # Dispatch to the URL implementation. my content } on error {err info} { dict print $info #puts stderr $::errorInfo my error 500 $err [dict get $info -errorinfo] } finally { my output } } dictobj query_headers query_headers { initialize { |
︙ | ︙ | |||
217 218 219 220 221 222 223 | dictobj reply_headers reply_headers { initialize { Content-Type: {text/html; charset=ISO-8859-1} Connection: close } } | | | 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 | dictobj reply_headers reply_headers { initialize { Content-Type: {text/html; charset=ISO-8859-1} Connection: close } } method error {code {msg {}} {errorInfo {}}} { puts [list [self] ERROR $code $msg] my query_headers set HTTP_ERROR $code my reset my variable error_codes set qheaders [my query_headers dump] if {![info exists error_codes($code)]} { set errorstring "Unknown Error Code" |
︙ | ︙ | |||
254 255 256 257 258 259 260 | <p> The server encountered an internal error: <p> <pre>$msg</pre> <p> For deeper understanding: <p> | | | 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 | <p> The server encountered an internal error: <p> <pre>$msg</pre> <p> For deeper understanding: <p> <pre>$errorInfo</pre> " } my puts "</BODY> </HTML>" } |
︙ | ︙ | |||
442 443 444 445 446 447 448 | # 3) By default it will only listen on localhost ### ::tool::define ::httpd::server { option port {default: auto} option myaddr {default: 127.0.0.1} option server_string [list default: [list TclHttpd $::httpd::version]] | > | | 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 | # 3) By default it will only listen on localhost ### ::tool::define ::httpd::server { option port {default: auto} option myaddr {default: 127.0.0.1} option server_string [list default: [list TclHttpd $::httpd::version]] option server_name [list default: [list [info hostname]]] property socket buffersize 32768 property socket translation {auto crlf} property reply_class ::httpd::reply constructor {args} { my configure {*}$args my start |
︙ | ︙ | |||
593 594 595 596 597 598 599 | set port [my cget port] if { $port in {auto {}} } { package require nettool set port [::nettool::allocate_port 8015] } set port_listening $port set myaddr [my cget myaddr] | | | | | 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 | set port [my cget port] if { $port in {auto {}} } { package require nettool set port [::nettool::allocate_port 8015] } set port_listening $port set myaddr [my cget myaddr] puts [list [self] listening on $port $myaddr] if {$myaddr ni {* {}}} { foreach ip $myaddr { lappend socklist [socket -server [namespace code [list my connect]] -myaddr $ip $port] } } else { lappend socklist [socket -server [namespace code [list my connect]] $port] } ::cron::every [self] 120 [namespace code {my CheckTimeout}] |
︙ | ︙ |
Changes to modules/nettool/platform_windows.tcl.
︙ | ︙ | |||
15 16 17 18 19 20 21 | lappend result [string map {- :} $macid] $ipaddr } } } return $result } | < < < < < < < < < < < < < < < < > < < < < < < < < < < | < < < < < < < < < | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | lappend result [string map {- :} $macid] $ipaddr } } } return $result } ### # topic: 57fdc331bc60c7bf2bd3f3214e9a906f ### proc ::nettool::hwaddr_to_ipaddr args { return [::twapi::hwaddr_to_ipaddr {*}$args] } ### # topic: dd2e2c0810cea69909399808f2a68949 # title: Return a list of unique hardware ids ### proc ::nettool::hwid_list {} { # Use the serial number on the hard drive catch {exec {*}[auto_execok vol] c:} voldat set num [lindex [lindex [split $voldat \n] end] end] return 0x[string map {- {}} $num] } if {[info command ::twapi::get_netif_indices] ne {}} { ### # topic: 4b87d977492bd10802bfc0327cd07ac2 # title: Return list of network interfaces ### proc ::nettool::if_list {} { return [::twapi::get_netif_indices] } ### # topic: ac9d6815d47f60d45930f0c8c8ae8f16 # title: Return list of mac numbers for this computer (primary first) ### proc ::nettool::mac_list {} { set result {} |
︙ | ︙ | |||
110 111 112 113 114 115 116 117 118 119 120 121 122 123 | set mask [::ip::maskToInt $netmask] set addri [::ip::toInteger $addr] lappend result [ip::nativeToPrefix [list [expr {$addri & $mask}] $netmask] -ipv4] } } return [lsort -unique $result] } proc ::nettool::status {} { set result {} #dict set result load [::twapi::] set cpus [::twapi::get_processor_count] set usage 0 for {set p 0} {$p < $cpus} {incr p} { | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | set mask [::ip::maskToInt $netmask] set addri [::ip::toInteger $addr] lappend result [ip::nativeToPrefix [list [expr {$addri & $mask}] $netmask] -ipv4] } } return [lsort -unique $result] } } else { if {[info commands ::twapi::get_network_adapters] ne {}} { proc ::nettool::if_list {} { return [::twapi::get_network_adapters] } } if {[info commands ::twapi::get_network_adapter_info] ne {}} { proc ::nettool::mac_list {} { set result {} foreach iface [if_list] { set dat [::twapi::get_network_adapter_info $iface -physicaladdress] set addr [string map {- :} [lindex $dat 1]] if {[string length $addr] eq 0} continue if {[string range $addr 0 5] eq "00:00:"} continue lappend result $addr } return $result } proc ::nettool::network_list {} { set result {} foreach iface [if_list] { set dat [::twapi::get_network_adapter_info $iface -prefixes] foreach kvlist [lindex $dat 1] { if {![dict exists $kvlist -address]} continue if {![dict exists $kvlist -prefixlength]} continue set length [dict get $kvlist -prefixlength] if {$length>31} continue set address [dict get $kvlist -address] if {[string range $address 0 1] eq "ff"} continue lappend result $address/$length } } return [lsort -unique $result] } } } ### # topic: 92ebbfa155883ad41c37d3f843392be4 # title: Return list of broadcast addresses for local networks ### proc ::nettool::broadcast_list {} { set result {} lappend result 127.0.0.1 foreach net [network_list] { if {$net in {224.0.0.0/4 127.0.0.0/8}} continue lappend result [::ip::broadcastAddress $net] } return [lsort -unique -dictionary $result] } ### # topic: 417672d3f31b80d749588365af88baf6 # title: Return list of ip addresses for this computer (primary first) ### set body {} if {[info commands ::twapi::get_ip_addresses] ne {}} { proc ::nettool::ip_list {} { set result [::twapi::get_ip_addresses] ldelete result 127.0.0.1 return $result } } elseif {[info commands ::twapi::get_system_ipaddrs] ne {}} { # They changed commands names on me... proc ::nettool::ip_list {} { set result [::twapi::get_system_ipaddrs -version 4] ldelete result 127.0.0.1 return $result } } proc ::nettool::status {} { set result {} #dict set result load [::twapi::] set cpus [::twapi::get_processor_count] set usage 0 for {set p 0} {$p < $cpus} {incr p} { |
︙ | ︙ |
Changes to modules/tool/index.tcl.
︙ | ︙ | |||
51 52 53 54 55 56 57 | set ::tool::tool_root [file dirname $cwd] ::tool::pathload $cwd { uuid.tcl ensemble.tcl metaclass.tcl event.tcl } $idxfile | | | 51 52 53 54 55 56 57 58 59 | set ::tool::tool_root [file dirname $cwd] ::tool::pathload $cwd { uuid.tcl ensemble.tcl metaclass.tcl event.tcl } $idxfile package provide tool 0.5.5 |
Changes to modules/tool/metaclass.tcl.
︙ | ︙ | |||
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 | } return $result } proc ::tool::object_create objname { foreach varname { object_info object_subscribe } { variable $varname set ${varname}($objname) {} } set object_info($objname) [list class [info object class $objname]] } proc ::tool::object_destroy objname { ::tool::event::generate $objname object_destroy [list objname $objname] foreach varname { object_info object_subscribe } { variable $varname unset -nocomplain ${varname}($objname) } } #------------------------------------------------------------------------- | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 | } return $result } proc ::tool::object_create objname { foreach varname { object_info object_signal object_subscribe object_coroutine } { variable $varname set ${varname}($objname) {} } set object_info($objname) [list class [info object class $objname]] } proc ::tool::object_rename {object newname} { foreach varname { object_info object_signal object_subscribe object_coroutine } { variable $varname if {[info exists ${varname}($object)]} { set ${varname}($newname) [set ${varname}($object)] unset ${varname}($object) } } variable coroutine_object foreach {coro coro_objname} [array get coroutine_object] { if { $object eq $coro_objname } { set coroutine_object($coro) $newname } } rename $object ::[string trimleft $newname] ::tool::event::generate $object object_rename [list newname $newname] } proc ::tool::object_destroy objname { ::tool::event::generate $objname object_destroy [list objname $objname] ::tool::event::cancel $objname * variable coroutine_object foreach {coro coro_objname} [array get coroutine_object] { if { $objname eq $coro_objname } { coroutine_unregister $coro } } foreach varname { object_info object_signal object_subscribe object_coroutine } { variable $varname unset -nocomplain ${varname}($objname) } } #------------------------------------------------------------------------- |
︙ | ︙ |
Changes to modules/tool/pipeline.tcl.
1 2 3 4 5 6 7 | ::namespace eval ::tool::signal {} proc ::tool::coroutine_register {objname coroutine} { variable all_coroutines variable object_coroutines variable coroutine_object # Wake a sleeping main loop | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | ::namespace eval ::tool::signal {} package require coroutine::auto proc ::tool::coroutine_register {objname coroutine} { variable all_coroutines variable object_coroutines variable coroutine_object # Wake a sleeping main loop set ::tool::wake_up 0 set ::tool::rouser ::tool::coroutine_register if {$coroutine in $all_coroutines} { return 1 } lappend all_coroutines $coroutine lappend object_coroutines($objname) $coroutine set coroutine_object($coroutine) $objname |
︙ | ︙ | |||
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | } proc ::tool::do_events {} { # Process coroutines variable all_coroutines variable coroutine_object variable last_event set last_event [clock seconds] set count 0 foreach coro $all_coroutines { if {[info command $coro] eq {}} { #puts "$coro quit" coroutine_unregister $coro continue } #puts [list RUN $coro] try $coro on return {} { # Terminate the coroutine coroutine_unregister $coro } on break {} { # Terminate the coroutine coroutine_unregister $coro | > > > > > > > > | 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | } proc ::tool::do_events {} { # Process coroutines variable all_coroutines variable coroutine_object variable coroutine_busy variable last_event set last_event [clock seconds] set count 0 foreach coro $all_coroutines { if {![info exists coroutine_busy($coro)]} { set coroutine_busy($coro) 0 } # Prevent a stuck coroutine from logjamming the entire event loop if {$coroutine_busy($coro)} continue set coroutine_busy($coro) 1 if {[info command $coro] eq {}} { #puts "$coro quit" coroutine_unregister $coro continue } set deleted 0 #puts [list RUN $coro] try $coro on return {} { # Terminate the coroutine coroutine_unregister $coro } on break {} { # Terminate the coroutine coroutine_unregister $coro |
︙ | ︙ | |||
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | puts *** puts $errorinfo } on continue {result opts} { # Ignore continue if { $result eq "done" } { incr count coroutine_unregister $coro } } on ok {result opts} { if { $result eq "done" } { coroutine_unregister $coro } else { incr count } } } return $count } proc ::tool::Main_Service {} { | > > > > > > > > < > > > > > | > | < > > | > < | > > | | | > < < < < < | > | < < < | > | < | < < < < < < | < < | | < < < < < | | < < < | < < < < < < < < | < | | < | < < < | > > | < < < < < < < < > > > > > > > | 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | puts *** puts $errorinfo } on continue {result opts} { # Ignore continue if { $result eq "done" } { incr count coroutine_unregister $coro set deleted 1 } } on ok {result opts} { if { $result eq "done" } { coroutine_unregister $coro set deleted 1 } else { incr count } } if {$deleted} { unset -nocomplain coroutine_busy($coro) } else { set coroutine_busy($coro) 0 } } return $count } proc ::tool::Main_Service {} { if {[info command ::CRON] eq {}} { coroutine ::CRON ::cron::runTasksCoro } set now [clock seconds] set cron_delay [::CRON] set ::tool::busy 1 set tool_running [::tool::do_events] set ::tool::busy 0 if {$cron_delay==0 || $tool_running>0} { set ::tool::wake_up 0 set ::tool::rouser {Main_Service active} incr ::tool::loops(active) } else { set ::tool::rouser [list Main_Service idle $cron_delay] set ::tool::wake_up [expr {$cron_delay+$now}] incr ::tool::loops(idle) } } proc ::tool::main {} { set ::tool::rouser STARTUP package require cron 1.2 variable event_loops variable last_event variable trace if {[info exists ::tool::main($event_loops)]} { if {$::tool::main($event_loops)} { set last_event -1 set ::tool::wake_up 0 set ::tool::rouser RESTART_EVENT_LOOP update if {$last_event>0} { return } } } ### # Have the cron::wake procedure wake up an idle loop instead # of it's normal run commands in the background ### proc ::cron::wake {} { set ::tool::wake_up 0 set ::tool::rouser ::cron::wake } # Signal for all other MAIN loops to terminate for {set x 0} {$x < $event_loops} {incr x} { set ::tool::main($x) 0 } set ::tool::wake_up 0 update set this [incr event_loops] set ::tool::main($this) 1 set ::tool::wake_up 0 set ::tool::busy 0 while {$::tool::main($this)} { # Call an update just to give the rest of the event loop a chance update incr ::tool::loops(all) if {$::tool::busy==0} { # Kick off a new round of event processing # only if the current round # has completed set panic [after 120000 {puts "Warning: Tool event loop has not responded in 2 minutes" ; set ::tool::rouser PANIC ; set ::tool::busy 0}] after idle ::tool::Main_Service update } if {$::tool::wake_up > 0} { set delay [expr {(${::tool::wake_up}-[clock seconds])*1000}] if {$trace} { puts [list EVENT LOOP WILL WAKE IN [expr {$delay/1000}]s active: $::tool::loops(active) idle: $::tool::loops(idle) busy: $::tool::busy rouser: $::tool::rouser] } set next [after $delay {set ::tool::wake_up 0}] set ::tool::rouser IDLELOOP set ::tool::wake_up 0 vwait ::tool::wake_up after cancel $next } if {${::tool::busy} == 0} { after cancel $panic } } } namespace eval ::tool { variable trace 0 variable event_loops if {![info exists event_loops]} { set event_loops 0 } if {![info exists ::tool::loops]} { array set ::tool::loops { active 0 all 0 idle 0 } } variable all_coroutines if {![info exists all_coroutines]} { set all_coroutines {} } } package provide tool::pipeline 0.1 |
Changes to modules/tool/pkgIndex.tcl.
1 2 3 4 5 6 7 8 9 10 11 | # Tcl package index file, version 1.1 # This file is generated by the "pkg_mkIndex" command # and sourced either when an application starts up or # by a "package unknown" script. It invokes the # "package ifneeded" command to set up package-related # information so that packages will be loaded automatically # in response to "package require" commands. When this # script is sourced, the variable $dir must contain the # full path name of this file's directory. if {![package vsatisfies [package provide Tcl] 8.6]} {return} | | | 1 2 3 4 5 6 7 8 9 10 11 12 | # Tcl package index file, version 1.1 # This file is generated by the "pkg_mkIndex" command # and sourced either when an application starts up or # by a "package unknown" script. It invokes the # "package ifneeded" command to set up package-related # information so that packages will be loaded automatically # in response to "package require" commands. When this # script is sourced, the variable $dir must contain the # full path name of this file's directory. if {![package vsatisfies [package provide Tcl] 8.6]} {return} package ifneeded tool 0.5.5 [list source [file join $dir index.tcl]] |
Changes to modules/udpcluster/pkgIndex.tcl.
1 2 3 | if {![package vsatisfies [package provide Tcl] 8.5]} {return} # Backward compadible alias package ifneeded nameserv::cluster 0.2.5 {package require udpcluster ; package provide nameserv::cluster 0.2.5} | | | 1 2 3 4 | if {![package vsatisfies [package provide Tcl] 8.5]} {return} # Backward compadible alias package ifneeded nameserv::cluster 0.2.5 {package require udpcluster ; package provide nameserv::cluster 0.2.5} package ifneeded udpcluster 0.3.1 [list source [file join $dir udpcluster.tcl]] |
Changes to modules/udpcluster/udpcluster.tcl.
︙ | ︙ | |||
29 30 31 32 33 34 35 | proc ::cluster::broadcast {args} { if {$::cluster::config(debug)} { puts [list $::cluster::local_pid SEND $args] } variable discovery_port listen while {[catch { | < | < < < | 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | proc ::cluster::broadcast {args} { if {$::cluster::config(debug)} { puts [list $::cluster::local_pid SEND $args] } variable discovery_port listen while {[catch { foreach net [::nettool::broadcast_list] { if {$::cluster::config(debug)} { puts [list BROADCAST -> $net $args] } set s [udp_open] udp_conf $s $net $discovery_port chan puts -nonewline $s [list [pid] {*}$args] chan flush $s chan close $s } } error]} { set ::cluster::broadcast_sock {} if {$::cluster::config(debug)} { puts "Broadcast ERR: $error - Reopening Socket" ::cluster::sleep 2000 } else { # Double the delay |
︙ | ︙ | |||
72 73 74 75 76 77 78 79 80 81 82 83 84 85 | set host * } if {$host in {local localhost}} { set host [::cluster::self] } return $service@$host } ### # topic: 3f5f9e197cc9666dd7953d97fef34019 ### proc ::cluster::ipaddr macid { # Convert rawname to a canonical name if {$macid eq [::cluster::self]} { | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | set host * } if {$host in {local localhost}} { set host [::cluster::self] } return $service@$host } proc ::cluster::Directory args { # Fullfill locally switch [lindex $args 0] { alloc_port { return [Get_free_port [lindex $args 1]] } port_busy { return [::nettool::port_busy [lindex $args 1]] } pid { return [pid] } } error "UNKNOWN COMMAND [lindex $args 0]" } proc ::cluster::directory args { ::cluster::listen variable directory_sock if {$directory_sock ne {}} { return [Directory {*}$args] } # We are not acting as the directory, query who is variable directory_port set sock [socket localhost $directory_port] chan configure $sock -translation crlf -buffering line -blocking 1 chan puts $sock $args chan flush $sock update set reply {} while {[chan gets $sock line]>0} { append reply \n $line if {[::info complete $reply]} break } catch {chan close $sock} lassign $reply result errdat return $result {*}$errdat } ### # topic: 3f5f9e197cc9666dd7953d97fef34019 ### proc ::cluster::ipaddr macid { # Convert rawname to a canonical name if {$macid eq [::cluster::self]} { |
︙ | ︙ | |||
101 102 103 104 105 106 107 | ### proc ::cluster::listen {} { variable broadcast_sock if {$broadcast_sock != {}} { return $broadcast_sock } | | < < < < < < < < < < < < < | | > > > > > > > > > > > > > > > > > > > > > > > > > > | 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | ### proc ::cluster::listen {} { variable broadcast_sock if {$broadcast_sock != {}} { return $broadcast_sock } variable discovery_port # Open a local discovery port to catch non-IP traffic variable discovery_group set broadcast_sock [udp_open $discovery_port reuse] fconfigure $broadcast_sock -buffering none -blocking 0 \ -broadcast 1 \ -mcastadd $discovery_group \ -remote [list $discovery_group $discovery_port] chan event $broadcast_sock readable [list [namespace current]::UDPPacket $broadcast_sock] ::cron::every cluster_heartbeat 30 ::cluster::heartbeat variable directory_sock variable directory_pid if {$directory_sock eq {} && $directory_pid eq {}} { variable directory_port # Nobody is acting as the directory. Have this process step on if {![catch {socket -server ::cluster::TCPAccept $directory_port} newsock]} { set directory_sock $newsock set directory_pid [pid] } else { set directory_sock {} set directory_pid {} } } return $broadcast_sock } proc ::cluster::TCPAccept {sock host port} { chan configure $sock -translation {crlf crlf} -buffering line -blocking 1 set packet [chan gets $sock] if {![string is ascii $packet]} return if {![::info complete $packet]} return if {[catch {Directory {*}$packet} reply errdat]} { chan puts $sock [list $reply $errdat] } else { chan puts $sock [list $reply {}] } chan flush $sock chan close $sock } ### # topic: 2a33c825920162b0791e2cdae62e6164 ### proc ::cluster::UDPPacket sock { variable ptpdata set pid [pid] set packet [string trim [read $sock]] |
︙ | ︙ | |||
171 172 173 174 175 176 177 | } } set now [clock seconds] set serviceurl [lindex $packet 2] set serviceinfo [lindex $packet 3] set ::cluster::ping_recv($serviceurl) $now | > | | 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 | } } set now [clock seconds] set serviceurl [lindex $packet 2] set serviceinfo [lindex $packet 3] set ::cluster::ping_recv($serviceurl) $now UDPPortInfo $serviceurl $messagetype $serviceinfo if {[dict exists $serviceinfo pid] && [dict get $serviceinfo pid] eq [pid] } { # Ignore attempts to overwrite locally managed services from the network return } # Always update the IP of the service info dict set ptpdata($serviceurl) ipaddr $ipaddr dict set ptpdata($serviceurl) updated $now |
︙ | ︙ | |||
236 237 238 239 240 241 242 | foreach {url info} [search_local $serviceurl] { broadcast PONG $url $info } } } } | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > | | 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 | foreach {url info} [search_local $serviceurl] { broadcast PONG $url $info } } } } proc ::cluster::UDPPortInfo {serviceurl msgtype newinfo} { variable ptpdata # We only care about port changes on the local machine if {[dict exists $newinfo macid]} { set macid [dict get $newinfo macid] if {$macid ne [::cluster::self]} { return } } elseif {[::info exists ptpdata($serviceurl)] && [dict exists $ptpdata($serviceurl) macid]} { set macid [dict get $ptpdata($serviceurl) macid] if {$macid ne [::cluster::self]} { return } } else { return } set newport {} set oldport {} if {[dict exists $newinfo port]} { set newport [dict get $newinfo port] } if {[::info exists ptpdata($serviceurl)] && [dict exists $ptpdata($serviceurl) port]} { set oldport [dict get $ptpdata($serviceurl) port] } switch -- $msgtype { -SERVICE { if {$oldport ne {}} { ::nettool::release_port $oldport } if {$newport ne {}} { ::nettool::release_port $newport } } default { if {$oldport ne {}} { ::nettool::release_port $oldport } if {$newport ne {}} { ::nettool::claim_port $newport } } } } proc ::cluster::ping {rawname {timeout -1}} { set rcpt [cname $rawname] variable ptpdata set starttime [clock seconds] set ::cluster::ping_recv($rcpt) 0 broadcast PING $rcpt update if {$timeout <= 0} { set timeout $::cluster::config(ping_timeout) } while 1 { if {$::cluster::ping_recv($rcpt)} break if {([clock seconds] - $starttime) > $timeout} { error "Could not locate $rcpt on the network" } broadcast PING $rcpt sleep $::cluster::config(ping_sleep) } if {[::info exists ptpdata($rcpt)]} { return [dict getnull $ptpdata($rcpt) ipaddr] |
︙ | ︙ | |||
320 321 322 323 324 325 326 | } if {$send} { broadcast ~SERVICE $url $local_data($url) update } } | | | < < < < | | < < | > | > > > > > > | 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 | } if {$send} { broadcast ~SERVICE $url $local_data($url) update } } proc ::cluster::Get_free_port {{port 50000}} { if {$port in {{} 0 -1}} { set port 50000 } set conflict 1 while {$conflict} { set conflict 0 set port [::nettool::find_port $port] foreach {url info} [search *@[macid]] { if {[dict exists $info port] && [dict get $info port] eq $port} { incr port set conflict 1 break } } if {$port >= 65336 } { error "All ports consumed" } } ::nettool::claim_port $port return $port } proc ::cluster::get_free_port {{startport 50000}} { return [directory alloc_port $startport] } proc ::cluster::log args { broadcast LOG {*}$args } ### # topic: 2c04e58c7f93798f9a5ed31a7f5779ab |
︙ | ︙ | |||
426 427 428 429 430 431 432 | return } if [catch {::comm::comm send -async $commid $command {*}$args} reply] { puts $stderr "ERR: SEND $service $reply" } } | | > | > > > | > | > > | 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 | return } if [catch {::comm::comm send -async $commid $command {*}$args} reply] { puts $stderr "ERR: SEND $service $reply" } } proc ::cluster::sleep_handle {ms} { set eventid [incr ::cluster::eventcount] set var ::cluster::event($eventid) set ${var} [list [clock seconds] [expr {[clock milliseconds]+$ms}]] after $ms [list set $var -1] return $var } proc ::cluster::sleep ms { set handle [sleep_handle $ms] vwait $handle } ### # topic: c8475e832c912e962f238c61580b669e ### proc ::cluster::search pattern { _Winnow |
︙ | ︙ | |||
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 | local_registry 0 ping_timeout 120 ping_sleep 250 } variable eventcount 0 variable cache {} variable broadcast_sock {} variable cache_maxage 500 variable discovery_port 38573 # Currently an unassigned group in the # Local Network Control Block (224.0.0/24) # See: RFC3692 and http://www.iana.org variable discovery_group 224.0.0.200 variable local_port {} variable local_macid [lindex [::nettool::mac_list] 0] variable local_pid [::uuid::uuid generate] } | > > > > > | | 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 | local_registry 0 ping_timeout 120 ping_sleep 250 } variable eventcount 0 variable cache {} variable broadcast_sock {} variable directory_sock {} variable cache_maxage 500 variable discovery_port 38573 variable directory_port 38574 variable directory_pid {} # Currently an unassigned group in the # Local Network Control Block (224.0.0/24) # See: RFC3692 and http://www.iana.org variable discovery_group 224.0.0.200 variable local_port {} variable local_macid [lindex [::nettool::mac_list] 0] variable local_pid [::uuid::uuid generate] } package provide udpcluster 0.3.1 |
Changes to modules/udpcluster/udpcluster.test.
︙ | ︙ | |||
193 194 195 196 197 198 199 200 201 | # Have a non-existant service fail ### test cluster-comm-5.0 {Service lookup failture} { catch {cluster::resolve foo@bar} pat } {1} #puts $pat testsuiteCleanup return | > > > > > > > > > > > > > > > > > > > > | 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | # Have a non-existant service fail ### test cluster-comm-5.0 {Service lookup failture} { catch {cluster::resolve foo@bar} pat } {1} #puts $pat ### # Test port allocation ### set port [interp eval otherclient { ::cluster::get_free_port 58080 }] # Check that the port is allocated in this thread as well test cluster-port-alloc-1.0 {Port allocation} { ::cluster::directory port_busy $port } 1 set otherport [interp eval otherclient { ::cluster::get_free_port 58080 }] puts [list GET FREE PORT $port $otherport [::cluster::get_free_port 58080]] test cluster-port-alloc-2.0 {Port allocation} { expr {$otherport != $port} } 1 puts "DONE" testsuiteCleanup return |