From fed67f1086887890f23032ec3cec9fbd5e922029 Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Sat, 15 Jul 2017 01:25:39 +0300 Subject: [PATCH] Fixes #6100 --- lib/pure/asyncdispatch.nim | 47 +++++++++---------------------- lib/pure/asyncfutures.nim | 49 +++++++++++++++++++++------------ lib/upcoming/asyncdispatch.nim | 47 +++++++++---------------------- tests/async/tasyncrecursion.nim | 1 - tests/async/tcallbacks.nim | 2 ++ 5 files changed, 60 insertions(+), 86 deletions(-) diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 28b20feaac235..0bf34864c6e1d 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, tables, strutils, times, heapqueue, options, asyncstreams import asyncfutures except callSoon -import nativesockets, net, deques +import nativesockets, net export Port, SocketFlag export asyncfutures, asyncstreams @@ -163,7 +163,6 @@ export asyncfutures, asyncstreams type PDispatcherBase = ref object of RootRef timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]] - callbacks*: Deque[proc ()] proc processTimers(p: PDispatcherBase) {.inline.} = #Process just part if timers at a step @@ -173,11 +172,6 @@ proc processTimers(p: PDispatcherBase) {.inline.} = p.timers.pop().fut.complete() dec count -proc processPendingCallbacks(p: PDispatcherBase) = - while p.callbacks.len > 0: - var cb = p.callbacks.popFirst() - cb() - proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = # If dispatcher has active timers this proc returns the timeout # of the nearest timer. Returns `timeout` otherwise. @@ -189,12 +183,6 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = result = int((timerTimeout - curTime) * 1000) if result < 0: result = 0 -proc callSoon(cbproc: proc ()) {.gcsafe.} - -proc initCallSoonProc = - if asyncfutures.getCallSoonProc().isNil: - asyncfutures.setCallSoonProc(callSoon) - when defined(windows) or defined(nimdoc): import winlean, sets, hashes type @@ -239,15 +227,11 @@ when defined(windows) or defined(nimdoc): result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[AsyncFD]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc setGlobalDispatcher*(disp: PDispatcher) = - if not gDisp.isNil: - assert gDisp.callbacks.len == 0 gDisp = disp - initCallSoonProc() proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: @@ -273,16 +257,18 @@ when defined(windows) or defined(nimdoc): proc hasPendingOperations*(): bool = ## Returns `true` if the global dispatcher has pending operations. let p = getGlobalDispatcher() - p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 + p.handles.len != 0 or p.timers.len != 0 or pendingCallbacks.len != 0 proc poll*(timeout = 500) = ## Waits for completion events and processes them. Raises ``ValueError`` ## if there are no pending operations. - let p = getGlobalDispatcher() - if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: + if not hasPendingOperations(): raise newException(ValueError, "No handles or timers registered in dispatcher.") + # Callback queue processing + processPendingCallbacks() + let p = getGlobalDispatcher() if p.handles.len != 0: let at = p.adjustedTimeout(timeout) var llTimeout = @@ -330,7 +316,7 @@ when defined(windows) or defined(nimdoc): # Timer processing. processTimers(p) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks() var connectExPtr: pointer = nil var acceptExPtr: pointer = nil @@ -936,15 +922,11 @@ else: new result result.selector = newSelector() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc setGlobalDispatcher*(disp: PDispatcher) = - if not gDisp.isNil: - assert gDisp.callbacks.len == 0 gDisp = disp - initCallSoonProc() proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: @@ -1005,14 +987,16 @@ else: proc hasPendingOperations*(): bool = let p = getGlobalDispatcher() - p.selector.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 + p.selector.len != 0 or p.timers.len != 0 or pendingCallbacks.len != 0 proc poll*(timeout = 500) = - let p = getGlobalDispatcher() - if p.selector.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: + if not hasPendingOperations(): raise newException(ValueError, "No handles or timers registered in dispatcher.") + # Callback queue processing + processPendingCallbacks() + let p = getGlobalDispatcher() if p.selector.len > 0: for info in p.selector.select(p.adjustedTimeout(timeout)): let data = PData(info.key.data) @@ -1042,7 +1026,7 @@ else: # Timer processing. processTimers(p) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks() proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = @@ -1337,11 +1321,6 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = return add(result, c) -proc callSoon(cbproc: proc ()) = - ## Schedule `cbproc` to be called as soon as possible. - ## The callback is called when control returns to the event loop. - getGlobalDispatcher().callbacks.addLast(cbproc) - proc runForever*() = ## Begins a never ending global dispatcher poll loop. while true: diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim index a9e97c14c3913..21043c5eaf9fd 100644 --- a/lib/pure/asyncfutures.nim +++ b/lib/pure/asyncfutures.nim @@ -32,27 +32,42 @@ type when not defined(release): var currentID = 0 -var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.} - -proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) = - ## Get current implementation of ``callSoon``. - return callSoonProc - -proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) = - ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized. - callSoonProc = p +var pendingCallbacksUpdated {.threadvar.}: proc() {.gcsafe.} +var pendingCallbacks* {.threadvar.}: seq[proc()] + +proc getPendingCallbacksUpdatedProc*(): (proc() {.gcsafe.}) = + ## Get current implementation of ``pendingCallbacksUpdated``. + return pendingCallbacksUpdated + +proc setPendingCallbacksUpdatedProc*(p: (proc() {.gcsafe.})) = + ## Change current implementation of ``pendingCallbacksUpdated``. + ## This may be called by dispatcher when it is initialized. + pendingCallbacksUpdated = p + +var tmpCallbacks {.threadvar.}: seq[proc() {.gcsafe.}] + +proc processPendingCallbacks*() = + ## Helper for dispatcher implementation to fire all of the pending callbacks. + ## This proc accounts for reentrancy, that is, it doesn't return until all + ## of the callbacks are fired, including those that were added during + ## processing. + if tmpCallbacks.isNil: + tmpCallbacks = @[] + while pendingCallbacks.len != 0: + swap(pendingCallbacks, tmpCallbacks) + for i in 0 ..< tmpCallbacks.len: + tmpCallbacks[i]() + tmpCallbacks[i] = nil + tmpCallbacks.setLen(0) proc callSoon*(cbproc: proc ()) = ## Call ``cbproc`` "soon". ## - ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick. - ## - ## If async dispatcher is not running, ``cbproc`` will be executed immediately. - if callSoonProc.isNil: - # Loop not initialized yet. Call the function directly to allow setup code to use futures. - cbproc() - else: - callSoonProc(cbproc) + ## Adds `cbproc` to list of pending callbacks. It is dispatcher's responsibility + ## to handle them. + pendingCallbacks.safeAdd(cbproc) + if not pendingCallbacksUpdated.isNil: + pendingCallbacksUpdated() template setupFutureBase(fromProc: string) = new(result) diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 4e3b061739d37..fbb917e700eb5 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -12,7 +12,7 @@ include "system/inclrtl" import os, tables, strutils, times, heapqueue, lists, options, asyncstreams import asyncfutures except callSoon -import nativesockets, net, deques +import nativesockets, net export Port, SocketFlag export asyncfutures, asyncstreams @@ -135,7 +135,6 @@ export asyncfutures, asyncstreams type PDispatcherBase = ref object of RootRef timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]] - callbacks: Deque[proc ()] proc processTimers(p: PDispatcherBase) {.inline.} = #Process just part if timers at a step @@ -145,11 +144,6 @@ proc processTimers(p: PDispatcherBase) {.inline.} = p.timers.pop().fut.complete() dec count -proc processPendingCallbacks(p: PDispatcherBase) = - while p.callbacks.len > 0: - var cb = p.callbacks.popFirst() - cb() - proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = # If dispatcher has active timers this proc returns the timeout # of the nearest timer. Returns `timeout` otherwise. @@ -161,12 +155,6 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = result = int((timerTimeout - curTime) * 1000) if result < 0: result = 0 -proc callSoon(cbproc: proc ()) {.gcsafe.} - -proc initCallSoonProc = - if asyncfutures.getCallSoonProc().isNil: - asyncfutures.setCallSoonProc(callSoon) - when defined(windows) or defined(nimdoc): import winlean, sets, hashes type @@ -217,15 +205,11 @@ when defined(windows) or defined(nimdoc): result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[AsyncFD]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc setGlobalDispatcher*(disp: PDispatcher) = - if not gDisp.isNil: - assert gDisp.callbacks.len == 0 gDisp = disp - initCallSoonProc() proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: @@ -251,16 +235,18 @@ when defined(windows) or defined(nimdoc): proc hasPendingOperations*(): bool = ## Returns `true` if the global dispatcher has pending operations. let p = getGlobalDispatcher() - p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 + p.handles.len != 0 or p.timers.len != 0 or pendingCallbacks.len != 0 proc poll*(timeout = 500) = ## Waits for completion events and processes them. Raises ``ValueError`` ## if there are no pending operations. - let p = getGlobalDispatcher() - if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: + if not hasPendingOperations(): raise newException(ValueError, "No handles or timers registered in dispatcher.") + # Callback queue processing + processPendingCallbacks() + let p = getGlobalDispatcher() let at = p.adjustedTimeout(timeout) var llTimeout = if at == -1: winlean.INFINITE @@ -308,7 +294,7 @@ when defined(windows) or defined(nimdoc): # Timer processing. processTimers(p) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks() var acceptEx*: WSAPROC_ACCEPTEX var connectEx*: WSAPROC_CONNECTEX @@ -1086,15 +1072,11 @@ else: new result result.selector = newSelector[AsyncData]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc setGlobalDispatcher*(disp: PDispatcher) = - if not gDisp.isNil: - assert gDisp.callbacks.len == 0 gDisp = disp - initCallSoonProc() proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: @@ -1141,7 +1123,7 @@ else: proc hasPendingOperations*(): bool = let p = getGlobalDispatcher() - not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + not p.selector.isEmpty() or p.timers.len != 0 or pendingCallbacks.len != 0 template processBasicCallbacks(ident, rwlist: untyped) = # Process pending descriptor's and AsyncEvent callbacks. @@ -1210,15 +1192,17 @@ else: proc poll*(timeout = 500) = var keys: array[64, ReadyKey] - let p = getGlobalDispatcher() when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, Event.Vnode} - if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: + if not hasPendingOperations(): raise newException(ValueError, "No handles or timers registered in dispatcher.") + # Callback queue processing + processPendingCallbacks() + let p = getGlobalDispatcher() if not p.selector.isEmpty(): var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) var i = 0 @@ -1259,7 +1243,7 @@ else: # Timer processing. processTimers(p) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks() proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = @@ -1612,11 +1596,6 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} = return add(result, c) -proc callSoon(cbproc: proc ()) = - ## Schedule `cbproc` to be called as soon as possible. - ## The callback is called when control returns to the event loop. - getGlobalDispatcher().callbacks.addLast(cbproc) - proc runForever*() = ## Begins a never ending global dispatcher poll loop. while true: diff --git a/tests/async/tasyncrecursion.nim b/tests/async/tasyncrecursion.nim index 1aeebe9b45eff..54482edab7dfd 100644 --- a/tests/async/tasyncrecursion.nim +++ b/tests/async/tasyncrecursion.nim @@ -17,6 +17,5 @@ proc asyncRecursionTest*(): Future[int] {.async.} = inc(i) when isMainModule: - setGlobalDispatcher(newDispatcher()) var i = waitFor asyncRecursionTest() echo i diff --git a/tests/async/tcallbacks.nim b/tests/async/tcallbacks.nim index 8c08032cd8491..1b09b65d0cf9c 100644 --- a/tests/async/tcallbacks.nim +++ b/tests/async/tcallbacks.nim @@ -18,3 +18,5 @@ let f2: Future[int] = newFuture[int]() f2.addCallback(proc() = echo 4) f2.callback = proc() = echo 5 f2.complete(10) + +processPendingCallbacks()