Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6100 #6102

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 13 additions & 34 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ include "system/inclrtl"

import os, tables, strutils, times, heapqueue, options, asyncstreams
import asyncfutures except callSoon
import nativesockets, net, deques
import nativesockets, net

export Port, SocketFlag
export asyncfutures, asyncstreams
Expand Down Expand Up @@ -163,7 +163,6 @@ export asyncfutures, asyncstreams
type
PDispatcherBase = ref object of RootRef
timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
callbacks*: Deque[proc ()]

proc processTimers(p: PDispatcherBase) {.inline.} =
#Process just part if timers at a step
Expand All @@ -173,11 +172,6 @@ proc processTimers(p: PDispatcherBase) {.inline.} =
p.timers.pop().fut.complete()
dec count

proc processPendingCallbacks(p: PDispatcherBase) =
while p.callbacks.len > 0:
var cb = p.callbacks.popFirst()
cb()

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
Expand All @@ -189,12 +183,6 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0

proc callSoon(cbproc: proc ()) {.gcsafe.}

proc initCallSoonProc =
if asyncfutures.getCallSoonProc().isNil:
asyncfutures.setCallSoonProc(callSoon)

when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
Expand Down Expand Up @@ -239,15 +227,11 @@ when defined(windows) or defined(nimdoc):
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()

proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
Expand All @@ -273,16 +257,18 @@ when defined(windows) or defined(nimdoc):
proc hasPendingOperations*(): bool =
## Returns `true` if the global dispatcher has pending operations.
let p = getGlobalDispatcher()
p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
p.handles.len != 0 or p.timers.len != 0 or pendingCallbacks.len != 0

proc poll*(timeout = 500) =
## Waits for completion events and processes them. Raises ``ValueError``
## if there are no pending operations.
let p = getGlobalDispatcher()
if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
if not hasPendingOperations():
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

# Callback queue processing
processPendingCallbacks()
let p = getGlobalDispatcher()
if p.handles.len != 0:
let at = p.adjustedTimeout(timeout)
var llTimeout =
Expand Down Expand Up @@ -330,7 +316,7 @@ when defined(windows) or defined(nimdoc):
# Timer processing.
processTimers(p)
# Callback queue processing
processPendingCallbacks(p)
processPendingCallbacks()

var connectExPtr: pointer = nil
var acceptExPtr: pointer = nil
Expand Down Expand Up @@ -936,15 +922,11 @@ else:
new result
result.selector = newSelector()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()

proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
Expand Down Expand Up @@ -1005,14 +987,16 @@ else:

proc hasPendingOperations*(): bool =
let p = getGlobalDispatcher()
p.selector.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
p.selector.len != 0 or p.timers.len != 0 or pendingCallbacks.len != 0

proc poll*(timeout = 500) =
let p = getGlobalDispatcher()
if p.selector.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
if not hasPendingOperations():
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

# Callback queue processing
processPendingCallbacks()
let p = getGlobalDispatcher()
if p.selector.len > 0:
for info in p.selector.select(p.adjustedTimeout(timeout)):
let data = PData(info.key.data)
Expand Down Expand Up @@ -1042,7 +1026,7 @@ else:
# Timer processing.
processTimers(p)
# Callback queue processing
processPendingCallbacks(p)
processPendingCallbacks()

proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): Future[string] =
Expand Down Expand Up @@ -1337,11 +1321,6 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
return
add(result, c)

proc callSoon(cbproc: proc ()) =
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
getGlobalDispatcher().callbacks.addLast(cbproc)

proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
Expand Down
49 changes: 32 additions & 17 deletions lib/pure/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,42 @@ type
when not defined(release):
var currentID = 0

var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}

proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
## Get current implementation of ``callSoon``.
return callSoonProc

proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
callSoonProc = p
var pendingCallbacksUpdated {.threadvar.}: proc() {.gcsafe.}
var pendingCallbacks* {.threadvar.}: seq[proc()]

proc getPendingCallbacksUpdatedProc*(): (proc() {.gcsafe.}) =
## Get current implementation of ``pendingCallbacksUpdated``.
return pendingCallbacksUpdated

proc setPendingCallbacksUpdatedProc*(p: (proc() {.gcsafe.})) =
## Change current implementation of ``pendingCallbacksUpdated``.
## This may be called by dispatcher when it is initialized.
pendingCallbacksUpdated = p

var tmpCallbacks {.threadvar.}: seq[proc() {.gcsafe.}]

proc processPendingCallbacks*() =
## Helper for dispatcher implementation to fire all of the pending callbacks.
## This proc accounts for reentrancy, that is, it doesn't return until all
## of the callbacks are fired, including those that were added during
## processing.
if tmpCallbacks.isNil:
tmpCallbacks = @[]
while pendingCallbacks.len != 0:
swap(pendingCallbacks, tmpCallbacks)
for i in 0 ..< tmpCallbacks.len:
tmpCallbacks[i]()
tmpCallbacks[i] = nil
tmpCallbacks.setLen(0)

proc callSoon*(cbproc: proc ()) =
## Call ``cbproc`` "soon".
##
## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
##
## If async dispatcher is not running, ``cbproc`` will be executed immediately.
if callSoonProc.isNil:
# Loop not initialized yet. Call the function directly to allow setup code to use futures.
cbproc()
else:
callSoonProc(cbproc)
## Adds `cbproc` to list of pending callbacks. It is dispatcher's responsibility
## to handle them.
pendingCallbacks.safeAdd(cbproc)
if not pendingCallbacksUpdated.isNil:
pendingCallbacksUpdated()

template setupFutureBase(fromProc: string) =
new(result)
Expand Down
47 changes: 13 additions & 34 deletions lib/upcoming/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ include "system/inclrtl"
import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
import asyncfutures except callSoon

import nativesockets, net, deques
import nativesockets, net

export Port, SocketFlag
export asyncfutures, asyncstreams
Expand Down Expand Up @@ -135,7 +135,6 @@ export asyncfutures, asyncstreams
type
PDispatcherBase = ref object of RootRef
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
callbacks: Deque[proc ()]

proc processTimers(p: PDispatcherBase) {.inline.} =
#Process just part if timers at a step
Expand All @@ -145,11 +144,6 @@ proc processTimers(p: PDispatcherBase) {.inline.} =
p.timers.pop().fut.complete()
dec count

proc processPendingCallbacks(p: PDispatcherBase) =
while p.callbacks.len > 0:
var cb = p.callbacks.popFirst()
cb()

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
Expand All @@ -161,12 +155,6 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0

proc callSoon(cbproc: proc ()) {.gcsafe.}

proc initCallSoonProc =
if asyncfutures.getCallSoonProc().isNil:
asyncfutures.setCallSoonProc(callSoon)

when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
Expand Down Expand Up @@ -217,15 +205,11 @@ when defined(windows) or defined(nimdoc):
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()

proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
Expand All @@ -251,16 +235,18 @@ when defined(windows) or defined(nimdoc):
proc hasPendingOperations*(): bool =
## Returns `true` if the global dispatcher has pending operations.
let p = getGlobalDispatcher()
p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
p.handles.len != 0 or p.timers.len != 0 or pendingCallbacks.len != 0

proc poll*(timeout = 500) =
## Waits for completion events and processes them. Raises ``ValueError``
## if there are no pending operations.
let p = getGlobalDispatcher()
if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
if not hasPendingOperations():
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

# Callback queue processing
processPendingCallbacks()
let p = getGlobalDispatcher()
let at = p.adjustedTimeout(timeout)
var llTimeout =
if at == -1: winlean.INFINITE
Expand Down Expand Up @@ -308,7 +294,7 @@ when defined(windows) or defined(nimdoc):
# Timer processing.
processTimers(p)
# Callback queue processing
processPendingCallbacks(p)
processPendingCallbacks()

var acceptEx*: WSAPROC_ACCEPTEX
var connectEx*: WSAPROC_CONNECTEX
Expand Down Expand Up @@ -1086,15 +1072,11 @@ else:
new result
result.selector = newSelector[AsyncData]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()

proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
Expand Down Expand Up @@ -1141,7 +1123,7 @@ else:

proc hasPendingOperations*(): bool =
let p = getGlobalDispatcher()
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
not p.selector.isEmpty() or p.timers.len != 0 or pendingCallbacks.len != 0

template processBasicCallbacks(ident, rwlist: untyped) =
# Process pending descriptor's and AsyncEvent callbacks.
Expand Down Expand Up @@ -1210,15 +1192,17 @@ else:
proc poll*(timeout = 500) =
var keys: array[64, ReadyKey]

let p = getGlobalDispatcher()
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}

if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
if not hasPendingOperations():
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

# Callback queue processing
processPendingCallbacks()
let p = getGlobalDispatcher()
if not p.selector.isEmpty():
var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
var i = 0
Expand Down Expand Up @@ -1259,7 +1243,7 @@ else:
# Timer processing.
processTimers(p)
# Callback queue processing
processPendingCallbacks(p)
processPendingCallbacks()

proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): Future[string] =
Expand Down Expand Up @@ -1612,11 +1596,6 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
return
add(result, c)

proc callSoon(cbproc: proc ()) =
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
getGlobalDispatcher().callbacks.addLast(cbproc)

proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
Expand Down
1 change: 0 additions & 1 deletion tests/async/tasyncrecursion.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ proc asyncRecursionTest*(): Future[int] {.async.} =
inc(i)

when isMainModule:
setGlobalDispatcher(newDispatcher())
var i = waitFor asyncRecursionTest()
echo i
2 changes: 2 additions & 0 deletions tests/async/tcallbacks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ let f2: Future[int] = newFuture[int]()
f2.addCallback(proc() = echo 4)
f2.callback = proc() = echo 5
f2.complete(10)

processPendingCallbacks()