diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 8345d43e5cd2e..13d20a15fa96c 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, oids, tables, strutils, macros, times, heapqueue -import nativesockets, net +import nativesockets, net, queues export Port, SocketFlag @@ -155,6 +155,9 @@ type when not defined(release): var currentID = 0 + +proc callSoon*(cbproc: proc ()) {.gcsafe.} + proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = ## Creates a new future. ## @@ -257,7 +260,7 @@ proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) = ## passes ``future`` as a param to the callback. future.cb = cb if future.finished: - future.cb() + callSoon(future.cb) proc `callback=`*[T](future: Future[T], cb: proc (future: Future[T]) {.closure,gcsafe.}) = @@ -355,11 +358,17 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = type PDispatcherBase = ref object of RootRef timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]] + dQueue: Queue[proc ()] proc processTimers(p: PDispatcherBase) {.inline.} = while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt: p.timers.pop().fut.complete() +proc processPendingCallbacks(p: PDispatcherBase) = + while p.dQueue.len > 0: + var cb = p.dQueue.dequeue() + cb() + proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = # If dispatcher has active timers this proc returns the timeout # of the nearest timer. Returns `timeout` otherwise. @@ -403,6 +412,7 @@ when defined(windows) or defined(nimdoc): result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[AsyncFD]() result.timers.newHeapQueue() + result.dQueue = initQueue[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -469,6 +479,8 @@ when defined(windows) or defined(nimdoc): # Timer processing. processTimers(p) + # Callback queue processing + processPendingCallbacks(p) var connectExPtr: pointer = nil var acceptExPtr: pointer = nil @@ -930,6 +942,7 @@ else: new result result.selector = newSelector() result.timers.newHeapQueue() + result.dQueue = initQueue[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -1025,7 +1038,10 @@ else: # (e.g. socket disconnected). discard + # Timer processing. processTimers(p) + # Callback queue processing + processPendingCallbacks(p) proc connect*(socket: AsyncFD, address: string, port: Port, domain = AF_INET): Future[void] = @@ -1604,6 +1620,9 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} = return add(result, c) +proc callSoon*(cbproc: proc ()) = + getGlobalDispatcher().dQueue.enqueue(cbproc) + proc runForever*() = ## Begins a never ending global dispatcher poll loop. while true: