Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asyncdispatch: callSoon() implementation. #4159

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ include "system/inclrtl"

import os, oids, tables, strutils, macros, times, heapqueue

import nativesockets, net
import nativesockets, net, queues

export Port, SocketFlag

Expand Down Expand Up @@ -155,6 +155,9 @@ type

when not defined(release):
var currentID = 0

proc callSoon*(cbproc: proc ()) {.gcsafe.}

proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
Expand Down Expand Up @@ -257,7 +260,7 @@ proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
## passes ``future`` as a param to the callback.
future.cb = cb
if future.finished:
future.cb()
callSoon(future.cb)

proc `callback=`*[T](future: Future[T],
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
Expand Down Expand Up @@ -355,11 +358,17 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
type
PDispatcherBase = ref object of RootRef
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
dQueue: Queue[proc ()]

proc processTimers(p: PDispatcherBase) {.inline.} =
while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
p.timers.pop().fut.complete()

proc processPendingCallbacks(p: PDispatcherBase) =
while p.dQueue.len > 0:
var cb = p.dQueue.dequeue()
cb()

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
Expand Down Expand Up @@ -403,6 +412,7 @@ when defined(windows) or defined(nimdoc):
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers.newHeapQueue()
result.dQueue = initQueue[proc ()](64)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
Expand Down Expand Up @@ -469,6 +479,8 @@ when defined(windows) or defined(nimdoc):

# Timer processing.
processTimers(p)
# Callback queue processing
processPendingCallbacks(p)

var connectExPtr: pointer = nil
var acceptExPtr: pointer = nil
Expand Down Expand Up @@ -930,6 +942,7 @@ else:
new result
result.selector = newSelector()
result.timers.newHeapQueue()
result.dQueue = initQueue[proc ()](64)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
Expand Down Expand Up @@ -1025,7 +1038,10 @@ else:
# (e.g. socket disconnected).
discard

# Timer processing.
processTimers(p)
# Callback queue processing
processPendingCallbacks(p)

proc connect*(socket: AsyncFD, address: string, port: Port,
domain = AF_INET): Future[void] =
Expand Down Expand Up @@ -1604,6 +1620,9 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
return
add(result, c)

proc callSoon*(cbproc: proc ()) =
getGlobalDispatcher().dQueue.enqueue(cbproc)

proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
Expand Down