Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
Comment: | Merge refcounting machinery for ChannelBuffer. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: | 0c1015d94dbe56a9ff99dd07d2de7cdd |
User & Date: | dgp 2014-04-21 19:04:08 |
Context
2014-04-22
| ||
15:09 | Merge the [testchannel transform] fixes. check-in: 9bf970cda4 user: dgp tags: trunk | |
2014-04-21
| ||
20:02 | merge trunk check-in: 58bcc7a29a user: dgp tags: dgp-trunk-read | |
19:04 | Merge refcounting machinery for ChannelBuffer. check-in: 0c1015d94d user: dgp tags: trunk | |
18:55 | Added a refcounting mechanism to ChannelBuffers. Other edits to stop segfaults in tests iocmd-21.2[... check-in: 13886141d8 user: dgp tags: core-8-5-branch | |
2014-04-17
| ||
18:10 | Merge reflected channel improvements. check-in: 3c6eba5c93 user: dgp tags: trunk | |
Changes
Changes to generic/tclIO.c.
︙ | ︙ | |||
158 159 160 161 162 163 164 165 166 167 168 169 170 171 | } CloseCallback; /* * Static functions in this file: */ static ChannelBuffer * AllocChannelBuffer(int length); static void ChannelTimerProc(ClientData clientData); static int CheckChannelErrors(ChannelState *statePtr, int direction); static int CheckForDeadChannel(Tcl_Interp *interp, ChannelState *statePtr); static void CheckForStdChannelsBeingClosed(Tcl_Channel chan); static void CleanupChannelHandlers(Tcl_Interp *interp, | > > | 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | } CloseCallback; /* * Static functions in this file: */ static ChannelBuffer * AllocChannelBuffer(int length); static void PreserveChannelBuffer(ChannelBuffer *bufPtr); static void ReleaseChannelBuffer(ChannelBuffer *bufPtr); static void ChannelTimerProc(ClientData clientData); static int CheckChannelErrors(ChannelState *statePtr, int direction); static int CheckForDeadChannel(Tcl_Interp *interp, ChannelState *statePtr); static void CheckForStdChannelsBeingClosed(Tcl_Channel chan); static void CleanupChannelHandlers(Tcl_Interp *interp, |
︙ | ︙ | |||
383 384 385 386 387 388 389 390 391 392 393 394 395 396 | ChanRead( Channel *chanPtr, char *dst, int dstSize, int *errnoPtr) { if (WillRead(chanPtr) < 0) { return -1; } return chanPtr->typePtr->inputProc(chanPtr->instanceData, dst, dstSize, errnoPtr); } | > | 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 | ChanRead( Channel *chanPtr, char *dst, int dstSize, int *errnoPtr) { if (WillRead(chanPtr) < 0) { *errnoPtr = Tcl_GetErrno(); return -1; } return chanPtr->typePtr->inputProc(chanPtr->instanceData, dst, dstSize, errnoPtr); } |
︙ | ︙ | |||
2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 | n = length + CHANNELBUFFER_HEADER_SIZE + BUFFER_PADDING + BUFFER_PADDING; bufPtr = ckalloc(n); bufPtr->nextAdded = BUFFER_PADDING; bufPtr->nextRemoved = BUFFER_PADDING; bufPtr->bufLength = length + BUFFER_PADDING; bufPtr->nextPtr = NULL; return bufPtr; } /* *---------------------------------------------------------------------- * * RecycleBuffer -- * * Helper function to recycle input and output buffers. Ensures that two | > > > > > > > > > > > > > > > > > > | 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 | n = length + CHANNELBUFFER_HEADER_SIZE + BUFFER_PADDING + BUFFER_PADDING; bufPtr = ckalloc(n); bufPtr->nextAdded = BUFFER_PADDING; bufPtr->nextRemoved = BUFFER_PADDING; bufPtr->bufLength = length + BUFFER_PADDING; bufPtr->nextPtr = NULL; bufPtr->refCount = 1; return bufPtr; } static void PreserveChannelBuffer( ChannelBuffer *bufPtr) { bufPtr->refCount++; } static void ReleaseChannelBuffer( ChannelBuffer *bufPtr) { if (--bufPtr->refCount) { return; } ckfree(bufPtr); } /* *---------------------------------------------------------------------- * * RecycleBuffer -- * * Helper function to recycle input and output buffers. Ensures that two |
︙ | ︙ | |||
2318 2319 2320 2321 2322 2323 2324 | * always. */ { /* * Do we have to free the buffer to the OS? */ if (mustDiscard) { | | | | 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 | * always. */ { /* * Do we have to free the buffer to the OS? */ if (mustDiscard) { ReleaseChannelBuffer(bufPtr); return; } /* * Only save buffers which are at least as big as the requested buffersize * for the channel. This is to honor dynamic changes of the buffersize * made by the user. */ if ((bufPtr->bufLength - BUFFER_PADDING) < statePtr->bufSize) { ReleaseChannelBuffer(bufPtr); return; } /* * Only save buffers for the input queue if the channel is readable. */ |
︙ | ︙ | |||
2364 2365 2366 2367 2368 2369 2370 | } } /* * If we reached this code we return the buffer to the OS. */ | | | 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 | } } /* * If we reached this code we return the buffer to the OS. */ ReleaseChannelBuffer(bufPtr); return; keepBuffer: bufPtr->nextRemoved = BUFFER_PADDING; bufPtr->nextAdded = BUFFER_PADDING; bufPtr->nextPtr = NULL; } |
︙ | ︙ | |||
2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 | break; /* Out of the "while (1)". */ } /* * Produce the output on the channel. */ toWrite = BytesLeft(bufPtr); if (toWrite == 0) { written = 0; } else { written = ChanWrite(chanPtr, RemovePoint(bufPtr), toWrite, &errorCode); } | > | 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 | break; /* Out of the "while (1)". */ } /* * Produce the output on the channel. */ PreserveChannelBuffer(bufPtr); toWrite = BytesLeft(bufPtr); if (toWrite == 0) { written = 0; } else { written = ChanWrite(chanPtr, RemovePoint(bufPtr), toWrite, &errorCode); } |
︙ | ︙ | |||
2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 | if (IsBufferEmpty(bufPtr)) { statePtr->outQueueHead = bufPtr->nextPtr; if (statePtr->outQueueHead == NULL) { statePtr->outQueueTail = NULL; } RecycleBuffer(statePtr, bufPtr, 0); } } /* Closes "while (1)". */ /* * If we wrote some data while flushing in the background, we are done. * We can't finish the background flush until we run out of data and the * channel becomes writable again. This ensures that all of the pending * data has been flushed at the system level. | > | 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 | if (IsBufferEmpty(bufPtr)) { statePtr->outQueueHead = bufPtr->nextPtr; if (statePtr->outQueueHead == NULL) { statePtr->outQueueTail = NULL; } RecycleBuffer(statePtr, bufPtr, 0); } ReleaseChannelBuffer(bufPtr); } /* Closes "while (1)". */ /* * If we wrote some data while flushing in the background, we are done. * We can't finish the background flush until we run out of data and the * channel becomes writable again. This ensures that all of the pending * data has been flushed at the system level. |
︙ | ︙ | |||
2764 2765 2766 2767 2768 2769 2770 | DiscardInputQueued(statePtr, 1); /* * Discard a leftover buffer in the current output buffer field. */ if (statePtr->curOutPtr != NULL) { | | | 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 | DiscardInputQueued(statePtr, 1); /* * Discard a leftover buffer in the current output buffer field. */ if (statePtr->curOutPtr != NULL) { ReleaseChannelBuffer(statePtr->curOutPtr); statePtr->curOutPtr = NULL; } /* * The caller guarantees that there are no more buffers queued for output. */ |
︙ | ︙ | |||
3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 | } } static int WillRead( Channel *chanPtr) { if ((chanPtr->typePtr->seekProc != NULL) && (Tcl_OutputBuffered((Tcl_Channel) chanPtr) > 0)) { if ((chanPtr->state->curOutPtr != NULL) && IsBufferReady(chanPtr->state->curOutPtr)) { SetFlag(chanPtr->state, BUFFER_READY); } if (FlushChannel(NULL, chanPtr, 0) != 0) { | > > > > > | 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 | } } static int WillRead( Channel *chanPtr) { if (chanPtr->typePtr == NULL) { /* Prevent read attempts on a closed channel */ Tcl_SetErrno(EINVAL); return -1; } if ((chanPtr->typePtr->seekProc != NULL) && (Tcl_OutputBuffered((Tcl_Channel) chanPtr) > 0)) { if ((chanPtr->state->curOutPtr != NULL) && IsBufferReady(chanPtr->state->curOutPtr)) { SetFlag(chanPtr->state, BUFFER_READY); } if (FlushChannel(NULL, chanPtr, 0) != 0) { |
︙ | ︙ | |||
4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 | * that we need to stick at the beginning of this buffer. */ memcpy(InsertPoint(bufPtr), safe, (size_t) saved); bufPtr->nextAdded += saved; saved = 0; } dst = InsertPoint(bufPtr); dstLen = SpaceLeft(bufPtr); result = Tcl_UtfToExternal(NULL, encoding, src, srcLimit, statePtr->outputEncodingFlags, &statePtr->outputEncodingState, dst, dstLen + BUFFER_PADDING, &srcRead, &dstWrote, NULL); /* See chan-io-1.[89]. Tcl Bug 506297. */ statePtr->outputEncodingFlags &= ~TCL_ENCODING_START; if ((result != TCL_OK) && (srcRead + dstWrote == 0)) { /* We're reading from invalid/incomplete UTF-8 */ if (total == 0) { Tcl_SetErrno(EINVAL); return -1; } break; } | > > | 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 | * that we need to stick at the beginning of this buffer. */ memcpy(InsertPoint(bufPtr), safe, (size_t) saved); bufPtr->nextAdded += saved; saved = 0; } PreserveChannelBuffer(bufPtr); dst = InsertPoint(bufPtr); dstLen = SpaceLeft(bufPtr); result = Tcl_UtfToExternal(NULL, encoding, src, srcLimit, statePtr->outputEncodingFlags, &statePtr->outputEncodingState, dst, dstLen + BUFFER_PADDING, &srcRead, &dstWrote, NULL); /* See chan-io-1.[89]. Tcl Bug 506297. */ statePtr->outputEncodingFlags &= ~TCL_ENCODING_START; if ((result != TCL_OK) && (srcRead + dstWrote == 0)) { /* We're reading from invalid/incomplete UTF-8 */ ReleaseChannelBuffer(bufPtr); if (total == 0) { Tcl_SetErrno(EINVAL); return -1; } break; } |
︙ | ︙ | |||
4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 | return -1; } flushed += statePtr->bufSize; if (saved == 0 || src[-1] != '\n') { needNlFlush = 0; } } } if ((flushed < total) && (GotFlag(statePtr, CHANNEL_UNBUFFERED) || (needNlFlush && GotFlag(statePtr, CHANNEL_LINEBUFFERED)))) { SetFlag(statePtr, BUFFER_READY); if (FlushChannel(NULL, chanPtr, 0) != 0) { return -1; } | > | 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 | return -1; } flushed += statePtr->bufSize; if (saved == 0 || src[-1] != '\n') { needNlFlush = 0; } } ReleaseChannelBuffer(bufPtr); } if ((flushed < total) && (GotFlag(statePtr, CHANNEL_UNBUFFERED) || (needNlFlush && GotFlag(statePtr, CHANNEL_LINEBUFFERED)))) { SetFlag(statePtr, BUFFER_READY); if (FlushChannel(NULL, chanPtr, 0) != 0) { return -1; } |
︙ | ︙ | |||
6366 6367 6368 6369 6370 6371 6372 | /* * If discardSavedBuffers is nonzero, must also discard any previously * saved buffer in the saveInBufPtr field. */ if (discardSavedBuffers && statePtr->saveInBufPtr != NULL) { | | | 6397 6398 6399 6400 6401 6402 6403 6404 6405 6406 6407 6408 6409 6410 6411 | /* * If discardSavedBuffers is nonzero, must also discard any previously * saved buffer in the saveInBufPtr field. */ if (discardSavedBuffers && statePtr->saveInBufPtr != NULL) { ReleaseChannelBuffer(statePtr->saveInBufPtr); statePtr->saveInBufPtr = NULL; } } /* *--------------------------------------------------------------------------- * |
︙ | ︙ | |||
6459 6460 6461 6462 6463 6464 6465 | * Check the actual buffersize against the requested buffersize. * Buffers which are smaller than requested are squashed. This is done * to honor dynamic changes of the buffersize made by the user. */ if ((bufPtr != NULL) && (bufPtr->bufLength - BUFFER_PADDING < statePtr->bufSize)) { | | | 6490 6491 6492 6493 6494 6495 6496 6497 6498 6499 6500 6501 6502 6503 6504 | * Check the actual buffersize against the requested buffersize. * Buffers which are smaller than requested are squashed. This is done * to honor dynamic changes of the buffersize made by the user. */ if ((bufPtr != NULL) && (bufPtr->bufLength - BUFFER_PADDING < statePtr->bufSize)) { ReleaseChannelBuffer(bufPtr); bufPtr = NULL; } if (bufPtr == NULL) { bufPtr = AllocChannelBuffer(statePtr->bufSize); } bufPtr->nextPtr = NULL; |
︙ | ︙ | |||
6521 6522 6523 6524 6525 6526 6527 6528 6529 6530 6531 6532 6533 6534 6535 6536 6537 6538 | */ nread = -1; result = EWOULDBLOCK; } else #endif /* TCL_IO_TRACK_OS_FOR_DRIVER_WITH_BAD_BLOCKING */ { nread = ChanRead(chanPtr, InsertPoint(bufPtr), toRead, &result); } if (nread > 0) { bufPtr->nextAdded += nread; /* * If we get a short read, signal up that we may be BLOCKED. We should * avoid calling the driver because on some platforms we will block in * the low level reading code even though the channel is set into * nonblocking mode. | > > | 6552 6553 6554 6555 6556 6557 6558 6559 6560 6561 6562 6563 6564 6565 6566 6567 6568 6569 6570 6571 | */ nread = -1; result = EWOULDBLOCK; } else #endif /* TCL_IO_TRACK_OS_FOR_DRIVER_WITH_BAD_BLOCKING */ { PreserveChannelBuffer(bufPtr); nread = ChanRead(chanPtr, InsertPoint(bufPtr), toRead, &result); } if (nread > 0) { result = 0; bufPtr->nextAdded += nread; /* * If we get a short read, signal up that we may be BLOCKED. We should * avoid calling the driver because on some platforms we will block in * the low level reading code even though the channel is set into * nonblocking mode. |
︙ | ︙ | |||
6548 6549 6550 6551 6552 6553 6554 6555 6556 6557 6558 6559 6560 6561 6562 | * [Bug 943274]: We have read the available data, clear flag. */ ResetFlag(statePtr, CHANNEL_HAS_MORE_DATA); } #endif /* TCL_IO_TRACK_OS_FOR_DRIVER_WITH_BAD_BLOCKING */ } else if (nread == 0) { SetFlag(statePtr, CHANNEL_EOF); statePtr->inputEncodingFlags |= TCL_ENCODING_END; } else if (nread < 0) { if ((result == EWOULDBLOCK) || (result == EAGAIN)) { SetFlag(statePtr, CHANNEL_BLOCKED); result = EAGAIN; } Tcl_SetErrno(result); | > < > | | 6581 6582 6583 6584 6585 6586 6587 6588 6589 6590 6591 6592 6593 6594 6595 6596 6597 6598 6599 6600 6601 6602 6603 6604 6605 6606 | * [Bug 943274]: We have read the available data, clear flag. */ ResetFlag(statePtr, CHANNEL_HAS_MORE_DATA); } #endif /* TCL_IO_TRACK_OS_FOR_DRIVER_WITH_BAD_BLOCKING */ } else if (nread == 0) { result = 0; SetFlag(statePtr, CHANNEL_EOF); statePtr->inputEncodingFlags |= TCL_ENCODING_END; } else if (nread < 0) { if ((result == EWOULDBLOCK) || (result == EAGAIN)) { SetFlag(statePtr, CHANNEL_BLOCKED); result = EAGAIN; } Tcl_SetErrno(result); } ReleaseChannelBuffer(bufPtr); return result; } /* *---------------------------------------------------------------------- * * Tcl_Seek -- * |
︙ | ︙ |
Changes to generic/tclIO.h.
︙ | ︙ | |||
32 33 34 35 36 37 38 39 40 41 42 43 44 45 | /* * struct ChannelBuffer: * * Buffers data being sent to or from a channel. */ typedef struct ChannelBuffer { int nextAdded; /* The next position into which a character * will be put in the buffer. */ int nextRemoved; /* Position of next byte to be removed from * the buffer. */ int bufLength; /* How big is the buffer? */ struct ChannelBuffer *nextPtr; /* Next buffer in chain. */ | > | 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | /* * struct ChannelBuffer: * * Buffers data being sent to or from a channel. */ typedef struct ChannelBuffer { int refCount; /* Current uses count */ int nextAdded; /* The next position into which a character * will be put in the buffer. */ int nextRemoved; /* Position of next byte to be removed from * the buffer. */ int bufLength; /* How big is the buffer? */ struct ChannelBuffer *nextPtr; /* Next buffer in chain. */ |
︙ | ︙ |
Changes to generic/tclIOCmd.c.
︙ | ︙ | |||
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 | newline = 1; #endif } } resultPtr = Tcl_NewObj(); Tcl_IncrRefCount(resultPtr); charactersRead = Tcl_ReadChars(chan, resultPtr, toRead, 0); if (charactersRead < 0) { /* * TIP #219. * Capture error messages put by the driver into the bypass area and * put them into the regular interpreter result. Fall back to the * regular message if nothing was found in the bypass. */ if (!TclChanCaughtErrorBypass(interp, chan)) { Tcl_SetObjResult(interp, Tcl_ObjPrintf( "error reading \"%s\": %s", TclGetString(chanObjPtr), Tcl_PosixError(interp))); } Tcl_DecrRefCount(resultPtr); return TCL_ERROR; } /* * If requested, remove the last newline in the channel if at EOF. */ if ((charactersRead > 0) && (newline != 0)) { const char *result; int length; result = TclGetStringFromObj(resultPtr, &length); if (result[length - 1] == '\n') { Tcl_SetObjLength(resultPtr, length - 1); } } Tcl_SetObjResult(interp, resultPtr); Tcl_DecrRefCount(resultPtr); return TCL_OK; } /* *---------------------------------------------------------------------- * | > > > | 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 | newline = 1; #endif } } resultPtr = Tcl_NewObj(); Tcl_IncrRefCount(resultPtr); Tcl_Preserve(chan); charactersRead = Tcl_ReadChars(chan, resultPtr, toRead, 0); if (charactersRead < 0) { /* * TIP #219. * Capture error messages put by the driver into the bypass area and * put them into the regular interpreter result. Fall back to the * regular message if nothing was found in the bypass. 
*/ if (!TclChanCaughtErrorBypass(interp, chan)) { Tcl_SetObjResult(interp, Tcl_ObjPrintf( "error reading \"%s\": %s", TclGetString(chanObjPtr), Tcl_PosixError(interp))); } Tcl_Release(chan); Tcl_DecrRefCount(resultPtr); return TCL_ERROR; } /* * If requested, remove the last newline in the channel if at EOF. */ if ((charactersRead > 0) && (newline != 0)) { const char *result; int length; result = TclGetStringFromObj(resultPtr, &length); if (result[length - 1] == '\n') { Tcl_SetObjLength(resultPtr, length - 1); } } Tcl_SetObjResult(interp, resultPtr); Tcl_Release(chan); Tcl_DecrRefCount(resultPtr); return TCL_OK; } /* *---------------------------------------------------------------------- * |
︙ | ︙ |
Changes to tests/ioCmd.test.
︙ | ︙ | |||
808 809 810 811 812 813 814 815 816 817 818 819 820 821 | set ch [chan create {read write} foo] } -body { list [catch {chan configure $ch -blocking 0} m] $m } -cleanup { close $ch rename foo {} } -match glob -result {1 {*nested eval*}} # --- --- --- --------- --------- --------- # Helper commands to record the arguments to handler methods. # Stored in a script so that the threads and interpreters needing this # code do not need their own copy but can access this variable. | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 | set ch [chan create {read write} foo] } -body { list [catch {chan configure $ch -blocking 0} m] $m } -cleanup { close $ch rename foo {} } -match glob -result {1 {*nested eval*}} test iocmd-21.21 {[close] in [read] segfaults} -setup { proc foo {method chan args} { switch -- $method initialize { return {initialize finalize watch read} } finalize {} watch {} read { close $chan return a } } set ch [chan create read foo] } -body { read $ch 0 } -cleanup { close $ch rename foo {} } -result {} test iocmd-21.22 {[close] in [read] segfaults} -setup { proc foo {method chan args} { switch -- $method initialize { return {initialize finalize watch read} } finalize {} watch {} read { catch {close $chan} return a } } set ch [chan create read foo] } -body { read $ch 1 } -returnCodes error -cleanup { catch {close $ch} rename foo {} } -match glob -result {*invalid argument*} # --- --- --- --------- --------- --------- # Helper commands to record the arguments to handler methods. # Stored in a script so that the threads and interpreters needing this # code do not need their own copy but can access this variable. |
︙ | ︙ |