Virtual async events 2 #12372

Closed · wants to merge 7 commits
2 changes: 2 additions & 0 deletions changelog.md
@@ -22,6 +22,8 @@
- `macros.newLit` now works for ref object types.
- `system.writeFile` has been overloaded to also support `openarray[byte]`.
- Added overloaded `strformat.fmt` macro that use specified characters as delimiter instead of '{' and '}'.
- Added a new Async Event implementation: `asyncdispatch.VirtualAsyncEvent`.
- The implementation is designed to enable efficient coordination of async code across threads.

## Library changes

165 changes: 162 additions & 3 deletions lib/pure/asyncdispatch.nim
@@ -169,17 +169,27 @@
include "system/inclrtl"

import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
import options, math, std/monotimes
import options, math, std/monotimes, hashes
import asyncfutures except callSoon

import nativesockets, net, deques

when compileOption("threads"):
import rlocks

export Port, SocketFlag
export asyncfutures except callSoon
export asyncstreams

#{.injectStmt: newGcInvariant().}

template withLockIfThreads(lock, code: untyped) =
when compileOption("threads"):
withRLock lock:
code
else:
code

# TODO: Check if yielded future is nil and throw a more meaningful exception

type
@@ -246,6 +256,12 @@ when defined(windows) or defined(nimdoc):
PDispatcher* = ref object of PDispatcherBase
ioPort: Handle
handles: HashSet[AsyncFD]
vd: VirtualEventDispatcher

VirtualEventDispatcher = ref object
virtualHandles: Table[VirtualFD, VirtualAsyncEvent] # pseudo handles for custom AsyncEvents.
nextVirtualHandle: VirtualFD
virtualMuxHandle: AsyncEvent # all the virtual handles get multiplexed through a single real handle.

CustomOverlapped = object of OVERLAPPED
data*: CompletionData
@@ -267,10 +283,26 @@ when defined(windows) or defined(nimdoc):
pcd: PostCallbackDataPtr
AsyncEvent* = ptr AsyncEventImpl

VirtualFD* = distinct int

VirtualAsyncEventImpl = object
triggered: bool
when compileOption("threads"):
eventLock: RLock
vd: VirtualEventDispatcher
vFD: VirtualFD
cb: Callback
VirtualAsyncEvent* = ptr VirtualAsyncEventImpl

Callback = proc (fd: AsyncFD): bool {.closure, gcsafe.}
NativeCallback = proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.closure,gcsafe.}

proc hash(x: AsyncFD): Hash {.borrow.}
proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
proc hash(x: VirtualFD): Hash {.borrow.}
proc `==`*(x: VirtualFD, y: VirtualFD): bool {.borrow.}

const InvalidVirtualFD = VirtualFD(-1)

proc newDispatcher*(): owned PDispatcher =
## Creates a new Dispatcher instance.
@@ -279,6 +311,8 @@ when defined(windows) or defined(nimdoc):
result.handles = initSet[AsyncFD]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)
result.vd = new VirtualEventDispatcher
result.vd.virtualHandles = initTable[VirtualFD, VirtualAsyncEvent]()

var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher

@@ -924,8 +958,8 @@ when defined(windows) or defined(nimdoc):
## receiving notifications.
registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)

template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
handleCallback) =
proc registerWaitableHandle(p: PDispatcher, hEvent: Handle, flags: DWORD, pcd: PostCallbackDataPtr, timeout: int,
                            handleCallback: NativeCallback) =

Contributor: Nice :)

let handleFD = AsyncFD(hEvent)
pcd.ioPort = p.ioPort
pcd.handleFd = handleFD
@@ -1101,12 +1135,34 @@ else:

AsyncEvent* = distinct SelectEvent

VirtualFD* = distinct int

VirtualAsyncEventImpl = object
triggered: bool
when compileOption("threads"):
eventLock: RLock
vd: VirtualEventDispatcher
vFD: VirtualFD
cb: Callback
VirtualAsyncEvent* = ptr VirtualAsyncEventImpl

PDispatcher* = ref object of PDispatcherBase
selector: Selector[AsyncData]
vd: VirtualEventDispatcher

VirtualEventDispatcher = ref object
virtualHandles: Table[VirtualFD, VirtualAsyncEvent] # pseudo handles for custom AsyncEvents.
nextVirtualHandle: VirtualFD
virtualMuxHandle: AsyncEvent # all the virtual handles get multiplexed through a single real handle.

proc `==`*(x, y: AsyncFD): bool {.borrow.}
proc `==`*(x, y: AsyncEvent): bool {.borrow.}

proc hash(x: VirtualFD): Hash {.borrow.}
proc `==`*(x: VirtualFD, y: VirtualFD): bool {.borrow.}

const InvalidVirtualFD = VirtualFD(-1)

template newAsyncData(): AsyncData =
AsyncData(
readList: newSeqOfCap[Callback](InitCallbackListSize),
@@ -1118,6 +1174,8 @@ else:
result.selector = newSelector[AsyncData]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
result.vd = new VirtualEventDispatcher
result.vd.virtualHandles = initTable[VirtualFD, VirtualAsyncEvent]()

var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher

@@ -1851,6 +1909,107 @@ proc send*(socket: AsyncFD, data: string,

return retFuture

proc close(vd: VirtualEventDispatcher) =
assert vd.virtualHandles.len == 0, "Cannot close Virtual Event Dispatcher. There are still Pending Events."
vd.virtualMuxHandle.unregister()
vd.virtualMuxHandle.close()
vd.virtualMuxHandle = nil
vd.nextVirtualHandle = 0.VirtualFD

proc unregister*(ev: VirtualAsyncEvent) =
## Unregisters event ``ev``.
doAssert(ev.vFD != InvalidVirtualFD, "Event is not registered in the queue!")

let vd = ev.vd
withLockIfThreads ev.eventLock:
ev.vd.virtualHandles.del ev.vFD
ev.vd = nil
ev.cb = nil
ev.vFD = InvalidVirtualFD

if vd.virtualHandles.len == 0:
# lazy close the physical event with the Dispatcher
# The main reason we need this is to make
# hasPendingOperations still work
# without modifying the ioselector code.
vd.close

proc initVirtualEventDispatcher(vd: VirtualEventDispatcher) =
vd.virtualMuxHandle = newAsyncEvent()

proc nativeEventCB(fd: AsyncFD): bool {.gcsafe.} =
# find the virtual events that triggered the physical event and
# add them to the callback list.
# Not very efficient, but requires the least coordination between threads.

proc vcbFactory(ev: VirtualAsyncEvent): proc() {.gcsafe.} =
result =
proc() {.gcSafe.} =
if ev.cb(ev.vfd.AsyncFD):
# the convention is that if the callback returns true,
# we unregister the event.
ev.unregister

let disp = getGlobalDispatcher()
for vfd, ev in vd.virtualHandles:
withLockIfThreads ev.eventLock:
if ev.triggered:
ev.triggered = false
disp.callbacks.addLast(vcbFactory(ev))

# always return false b/c we never want to unregister this event
return false

vd.virtualMuxHandle.addEvent(nativeEventCB)

proc newVirtualAsyncEvent*(): VirtualAsyncEvent =
## Creates a new thread-safe ``VirtualAsyncEvent`` object.
##
  ## The new ``VirtualAsyncEvent`` object is not automatically registered with
  ## a dispatcher, unlike ``AsyncSocket``.
result = cast[VirtualAsyncEvent](allocShared0(sizeof(VirtualAsyncEventImpl)))

Contributor: I guess we can't get around this being shared?

Contributor Author (@rayman22201, Oct 16, 2019): It's meant for multithreading, so no :-P

To be more specific: the trigger proc for VirtualAsyncEvent is meant to be callable from other threads by design, so this is shared memory by design.
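
For illustration, a minimal cross-thread usage sketch (assuming --threads:on; the worker proc and timing below are made up for the example and are not part of this diff):

import asyncdispatch, os

var fired = false
let ev = newVirtualAsyncEvent()

proc onTrigger(fd: AsyncFD): bool {.gcsafe.} =
  fired = true
  true                       # returning true unregisters the event

proc worker(ev: VirtualAsyncEvent) {.thread.} =
  sleep(50)                  # simulate blocking work on another thread
  ev.trigger()               # safe to call from a foreign thread

ev.addEvent(onTrigger)

var t: Thread[VirtualAsyncEvent]
createThread(t, worker, ev)
while not fired:
  poll()                     # the dispatcher thread wakes up when trigger() fires
joinThread(t)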

result.cb = nil
result.vFD = InvalidVirtualFD
when compileOption("threads"):
initRLock result.eventLock

proc trigger*(ev: VirtualAsyncEvent) =
  ## Sets the ``VirtualAsyncEvent`` to the signaled state.
withLockIfThreads ev.eventLock:
if ev.vFD == InvalidVirtualFD:
raise newException(ValueError,
"VirtualAsyncEvent is not registered with a dispatcher.")
ev.triggered = true

# send the signal to wake up the dispatcher thread.
trigger(ev.vd.virtualMuxHandle)

proc addEvent*(ev: VirtualAsyncEvent, cb: Callback) =

Contributor: Argh, I know you're being consistent with the AsyncEvent API here, but it annoys me that this API isn't consistent with the socket API...

In any case, it might be worth adding a signal async proc that works for both VirtualAsyncEvent and AsyncEvent. We aim to avoid callbacks after all...

Contributor Author: This is arguably a lower-level API that you wouldn't use in async-level code. The FlowVar -> Promise integration, for example: a user never actually sees AsyncEvent.

I don't know if it's necessary, but it might be a nice convenience to have some API that wraps an AsyncEvent / VirtualAsyncEvent in a Promise for you.

I'd rather focus on getting the fundamentals in and make that a separate PR, since I consider that sugar.
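
One possible shape for that sugar, sketched here as a hypothetical helper (the name `wait` and its Future-based form are made up for illustration, not part of this PR):

proc wait(ev: VirtualAsyncEvent): Future[void] =
  ## Hypothetical: returns a Future that completes on the first trigger.
  var retFuture = newFuture[void]("wait")
  proc cb(fd: AsyncFD): bool {.gcsafe.} =
    retFuture.complete()
    true                     # unregister after the first trigger
  ev.addEvent(cb)
  return retFuture

Async code could then do `await ev.wait()` instead of registering a callback by hand.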

Contributor: It's important to think about how the API will evolve; remember that once this is released we won't be able to change it in a breaking manner.

  ## Registers callback ``cb`` to be called when ``ev`` is signaled.
doAssert(ev.vFD == InvalidVirtualFD, "Event is already registered in the queue!")

let p = getGlobalDispatcher()

Contributor: Use more descriptive names please.

Suggested change:
-  let p = getGlobalDispatcher()
+  let disp = getGlobalDispatcher()

Contributor Author: Just following the convention that was set when I got here. But sure.

if p.vd.virtualMuxHandle == nil:
# lazy register the physical event with the Dispatcher
# The main reason we need this is to make hasPendingOperations still work
# without modifying the ioselector code.

Comment on lines +1995 to +1996

Contributor: Can you elaborate on this? I don't really understand why this lazy loading is necessary.

Contributor Author (@rayman22201, Oct 16, 2019):

It has to do with hasPendingOperations() and the multiplexer io select object.

The simplest approach is to simply create a singleton AsyncEvent object attached to the VirtualEventDispatcher , which you initialize on creation.
This AsyncEvent essentially lives forever and waits for VirtualEvents to trigger it.

This has one big problem. The lives forever part. The io selector is never closed. It must remain alive. This means the underlying ioselector implementation will always have at least one open io select that it is listening on.

If any code relies on all io selects being closed before continuing, that code will never be able to run. hasPendingOperations() and any code that relies on it is a prime example of code like this.

In #12232 I "fixed" this problem by modifying all of the io_selector implementations and asyncdispatch.nim to treat a single open io select as the same as 0 open io selects (no pending events).

After thinking about it, I realized that this approach has big problems:

  • First, it is a very dramatic and invasive change to the ioselector implementations. That code is foundational, so I want to be very careful with changes that I make there. The changes in Virtual async events #12232 seemed too big, and not safe.

  • Second, It's possible that code might rely on the process actually having all physical kernel io selectors closed (for a graceful shutdown or resource management for example). If we treat 1 open io selector the same as 0, we are essentially lying to the user, and that is not ok IMO.

I don't love lazy loading, because it is non-deterministic, but in this case, I think it's the right tool for the job. This approach is a modular solution to all of the above problems:

  • It works with 0 changes to the underlying ioselector code
  • It works with 0 changes to the core asyncdispatch code.
    • By core I mean no changes to AsyncEvent or any core run loop code such as runOnce()
  • hasPendingOperations() works correctly with 0 changes and without lying to the user
  • All changes are contained within the VirtualEvent additions.

All of that adds up to make this a much safer PR.
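
The observable effect can be sketched like this (a rough illustration that assumes the API added in this diff; drain and hasPendingOperations are the existing asyncdispatch procs):

import asyncdispatch

doAssert not hasPendingOperations()    # nothing registered yet

let ev = newVirtualAsyncEvent()
ev.addEvent(proc (fd: AsyncFD): bool {.gcsafe.} = true)
doAssert hasPendingOperations()        # the mux AsyncEvent was lazily registered

ev.trigger()
drain()                                # runs the callback; returning true unregisters
doAssert not hasPendingOperations()    # last virtual handle gone, mux event closed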

initVirtualEventDispatcher(p.vd)

withLockIfThreads ev.eventLock:
ev.cb = cb
ev.vFD = p.vd.nextVirtualHandle
p.vd.nextVirtualHandle = VirtualFD(int(p.vd.nextVirtualHandle) + 1)
p.vd.virtualHandles[ev.vFD] = ev
ev.vd = p.vd

proc close*(ev: VirtualAsyncEvent) =
## Closes event ``ev``.
  doAssert(ev.vFD == InvalidVirtualFD, "Must unregister Event before you close it!")

Contributor: IIRC for pretty much all other things in asyncdispatch, the close proc will implicitly unregister the FD. Please keep that consistent.

Contributor Author (@rayman22201, Oct 16, 2019):

Actually, this is safer than what happens in other places in asyncdispatch.

If you look at AsyncEvent, the following happens:

It calls close on the underlying ioselector, which closes it at the kernel level, and dealloc's the memory, but it doesn't actually unregister the FD from the select dispatcher:

proc close*(ev: AsyncEvent) =

proc close*[T](s: Selector[T]) =

Maybe that is a bug?

Contributor: sigh

Yeah, it's not consistent with sockets at all then...

Nim/lib/pure/asyncdispatch.nim, lines 1320 to 1334 in 5288b54:

proc closeSocket*(sock: AsyncFD) =
let selector = getGlobalDispatcher().selector
if sock.SocketHandle notin selector:
raise newException(ValueError, "File descriptor not registered.")
let data = selector.getData(sock.SocketHandle)
sock.unregister()
sock.SocketHandle.close()
# We need to unblock the read and write callbacks which could still be
# waiting for the socket to become readable and/or writeable.
for cb in data.readList & data.writeList:
if not cb(sock):
raise newException(
ValueError, "Expecting async operations to stop when fd has closed."
)

when compileOption("threads"):
deinitRLock(ev.eventLock)
deallocShared(cast[pointer](ev))
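
For comparison, the implicit-unregister behaviour the reviewer asks for might look like this (a hypothetical alternative sketch, not what this diff implements):

proc close*(ev: VirtualAsyncEvent) =
  ## Hypothetical variant: closes ``ev``, unregistering it first if needed.
  if ev.vFD != InvalidVirtualFD:
    ev.unregister()
  when compileOption("threads"):
    deinitRLock(ev.eventLock)
  deallocShared(cast[pointer](ev))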

# -- Await Macro
include asyncmacro

13 changes: 13 additions & 0 deletions tests/async/testasyncevent.nim
@@ -0,0 +1,13 @@
discard """
output: '''
event triggered!
'''
"""

import asyncDispatch

let ev = newAsyncEvent()
addEvent(ev, proc(fd: AsyncFD): bool {.gcsafe.} = echo "event triggered!"; true)
ev.trigger()

drain()
25 changes: 25 additions & 0 deletions tests/async/testmanyasynceventsovertime.nim
@@ -0,0 +1,25 @@
discard """
output: '''
triggerCount: 10
'''
"""

import asyncDispatch

var triggerCount = 0
var evs = newSeq[AsyncEvent]()

for i in 0 ..< 10:
var ev = newAsyncEvent()
evs.add(ev)
addEvent(ev, proc(fd: AsyncFD): bool {.gcsafe,closure.} = triggerCount += 1; true)

proc main() {.async.} =
for ev in evs:
ev.trigger()
await sleepAsync(10)
ev.close

asyncCheck main()
drain()
echo "triggerCount: ", triggerCount
21 changes: 21 additions & 0 deletions tests/async/testmanyvirtualasyncevents.nim
@@ -0,0 +1,21 @@
discard """
output: '''
triggerCount: 8000
'''
"""

import asyncDispatch

var triggerCount = 0
var evs = newSeq[VirtualAsyncEvent]()

for i in 0 ..< 8000: # some number way higher than the typical physical fd limit
var ev = newVirtualAsyncEvent()
evs.add(ev)
addEvent(ev, proc(fd: AsyncFD): bool {.gcsafe,closure.} = triggerCount += 1; true)

for ev in evs:
ev.trigger()

drain()
echo "triggerCount: ", triggerCount
28 changes: 28 additions & 0 deletions tests/async/testmanyvirtualasynceventsovertime.nim
@@ -0,0 +1,28 @@
discard """
output: '''
runForever should throw ValueError, this is expected
triggerCount: 100
'''
"""

import asyncDispatch

var triggerCount = 0
var evs = newSeq[VirtualAsyncEvent]()

for i in 0 ..< 100:
var ev = newVirtualAsyncEvent()
evs.add(ev)
addEvent(ev, proc(fd: AsyncFD): bool {.gcsafe,closure.} = triggerCount += 1; true)

proc main() {.async.} =
for ev in evs:
await sleepAsync(10)
ev.trigger()

try:
asyncCheck main()
runForever()
except ValueError:
echo "runForever should throw ValueError, this is expected"
echo "triggerCount: ", triggerCount
13 changes: 13 additions & 0 deletions tests/async/testvirtualasyncevent.nim
@@ -0,0 +1,13 @@
discard """
output: '''
event triggered!
'''
"""

import asyncDispatch

let ev = newVirtualAsyncEvent()
addEvent(ev, proc(fd: AsyncFD): bool {.gcsafe.} = echo "event triggered!"; true)
ev.trigger()

drain()