forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathremotecall.jl
576 lines (473 loc) · 17.4 KB
/
remotecall.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# This file is a part of Julia. License is MIT: https://julialang.org/license
"""
client_refs
Tracks whether a particular `AbstractRemoteRef`
(identified by its RRID) exists on this worker.
The `client_refs` lock is also used to synchronize access to `.refs` and associated `clientset` state.
"""
const client_refs = WeakKeyDict{Any, Void}() # used as a WeakKeySet
abstract type AbstractRemoteRef end
# Write-once remote reference. `where`, `whence` and `id` locate and identify the
# reference; `v` holds the locally cached value (null until one is known).
mutable struct Future <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int
    v::Nullable{Any}

    # Build directly from a 4-tuple without registering the reference.
    # Useful for creating dummy, zeroed-out instances.
    Future(t::Tuple) = new(t[1], t[2], t[3], t[4])

    # Build a reference carrying `v` and route it through `test_existing_ref`,
    # whose result is what the caller receives.
    function Future(w::Int, rrid::RRID, v)
        ref = new(w, rrid.whence, rrid.id, v)
        return test_existing_ref(ref)
    end

    # Same as above, starting with no cached value.
    function Future(w::Int, rrid::RRID)
        return Future(w, rrid, Nullable{Any}())
    end
end
# Remote reference to a channel of type `T`. `where`, `whence` and `id` locate and
# identify the reference.
mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int

    # Build a reference and route it through `test_existing_ref`, whose result is
    # what the caller receives.
    function RemoteChannel{T}(w::Int, rrid::RRID) where T<:AbstractChannel
        ref = new(w, rrid.whence, rrid.id)
        return test_existing_ref(ref)
    end

    # Build directly from a 3-tuple without registering the reference.
    RemoteChannel{T}(t::Tuple) where {T<:AbstractChannel} = new(t[1], t[2], t[3])
end
# Return the instance already tracked in `client_refs` that is equal to `r`, or
# start tracking `r` (and attach its finalizer) when there is none.
function test_existing_ref(r::AbstractRemoteRef)
    existing = getkey(client_refs, r, nothing)

    if existing === nothing
        # First time this reference is seen here: track it.
        client_refs[r] = nothing
        finalizer(finalize_ref, r)
        return r
    end

    @assert r.where > 0
    if isa(r, Future) && isnull(existing.v) && !isnull(r.v)
        # we have recd the value from another source, probably a deserialized ref,
        # send a del_client message
        send_del_client(r)
        existing.v = r.v
    end
    return existing::typeof(r)
end
# Finalizer for remote references: stop tracking `r` in `client_refs`, send a
# del_client message where required, and mark `r` as finalized (`where = 0`).
function finalize_ref(r::AbstractRemoteRef)
    # Handle the case of the finalizer having been called manually: nothing left to do.
    r.where > 0 || return nothing

    if islocked(client_refs)
        # delay finalizer for later, when it's not already locked
        finalizer(finalize_ref, r)
        return nothing
    end

    delete!(client_refs, r)
    if isa(r, RemoteChannel)
        send_del_client(r)
    else
        # send_del_client only if the reference has not been set
        if isnull(r.v)
            send_del_client(r)
        end
        r.v = Nullable{Any}()
    end
    r.where = 0
    return nothing
end
# Convenience constructors taking a process object instead of a pid.
function Future(w::LocalProcess)
    return Future(w.id)
end

function Future(w::Worker)
    return Future(w.id)
end

"""
    Future(pid::Integer=myid())

Create a `Future` on process `pid`.
When `pid` is omitted it defaults to the current process.
"""
function Future(pid::Integer=myid())
    return Future(pid, RRID())
end
"""
RemoteChannel(pid::Integer=myid())
Make a reference to a `Channel{Any}(1)` on process `pid`.
The default `pid` is the current process.
"""
RemoteChannel(pid::Integer=myid()) = RemoteChannel{Channel{Any}}(pid, RRID())
"""
RemoteChannel(f::Function, pid::Integer=myid())
Create references to remote channels of a specific size and type. `f` is a function that
when executed on `pid` must return an implementation of an `AbstractChannel`.
For example, `RemoteChannel(()->Channel{Int}(10), pid)`, will return a reference to a
channel of type `Int` and size 10 on `pid`.
The default `pid` is the current process.
"""
function RemoteChannel(f::Function, pid::Integer=myid())
remotecall_fetch(pid, f, RRID()) do f, rrid
rv=lookup_ref(rrid, f)
RemoteChannel{typeof(rv.c)}(myid(), rrid)
end
end
# Hashing and equality of remote references use only `whence` and `id`;
# `where` does not take part.
function hash(r::AbstractRemoteRef, h::UInt)
    return hash(r.whence, hash(r.id, h))
end

function ==(r::AbstractRemoteRef, s::AbstractRemoteRef)
    return r.whence == s.whence && r.id == s.id
end
"""
Base.remoteref_id(r::AbstractRemoteRef) -> RRID
`Future`s and `RemoteChannel`s are identified by fields:
* `where` - refers to the node where the underlying object/storage
referred to by the reference actually exists.
* `whence` - refers to the node the remote reference was created from.
Note that this is different from the node where the underlying object
referred to actually exists. For example calling `RemoteChannel(2)`
from the master process would result in a `where` value of 2 and
a `whence` value of 1.
* `id` is unique across all references created from the worker specified by `whence`.
Taken together, `whence` and `id` uniquely identify a reference across all workers.
`Base.remoteref_id` is a low-level API which returns a `Base.RRID`
object that wraps `whence` and `id` values of a remote reference.
"""
remoteref_id(r::AbstractRemoteRef) = RRID(r.whence, r.id)
"""
Base.channel_from_id(id) -> c
A low-level API which returns the backing `AbstractChannel` for an `id` returned by
[`remoteref_id`](@ref).
The call is valid only on the node where the backing channel exists.
"""
function channel_from_id(id)
rv = lock(client_refs) do
return get(PGRP.refs, id, false)
end
if rv === false
throw(ErrorException("Local instance of remote reference not found"))
end
return rv.c
end
# Look up `rrid` in the default process group, creating the entry with `f` if needed.
function lookup_ref(rrid::RRID, f=def_rv_channel)
    return lookup_ref(PGRP, rrid, f)
end

# Return the `RemoteValue` stored for `rrid` in `pg.refs`. When there is none yet, a
# new one is built from `invokelatest(f)`, stored, and `rrid.whence` is recorded as
# its first client. The whole operation runs under the `client_refs` lock.
function lookup_ref(pg, rrid, f)
    found = lock(client_refs) do
        existing = get(pg.refs, rrid, false)
        existing === false || return existing

        # first we've heard of this ref
        created = RemoteValue(invokelatest(f))
        pg.refs[rrid] = created
        push!(created.clientset, rrid.whence)
        return created
    end
    return found::RemoteValue
end
"""
isready(rr::Future)
Determine whether a [`Future`](@ref) has a value stored to it.
If the argument `Future` is owned by a different node, this call will block to wait for the answer.
It is recommended to wait for `rr` in a separate task instead
or to use a local [`Channel`](@ref) as a proxy:
c = Channel(1)
@async put!(c, remotecall_fetch(long_computation, p))
isready(c) # will not block
"""
function isready(rr::Future)
!isnull(rr.v) && return true
rid = remoteref_id(rr)
return if rr.where == myid()
isready(lookup_ref(rid).c)
else
remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid)
end
end
"""
isready(rr::RemoteChannel, args...)
Determine whether a [`RemoteChannel`](@ref) has a value stored to it.
Note that this function can cause race conditions, since by the
time you receive its result it may no longer be true. However,
it can be safely used on a [`Future`](@ref) since they are assigned only once.
"""
function isready(rr::RemoteChannel, args...)
rid = remoteref_id(rr)
return if rr.where == myid()
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid)
end
end
# Remove this process from the client set of `rr`.
function del_client(rr::AbstractRemoteRef)
    return del_client(remoteref_id(rr), myid())
end

# Remove `client` from the client set of `id` in the default process group.
function del_client(id, client)
    return del_client(PGRP, id, client)
end

# Remove `client` from the client set of `id` in `pg.refs`; once the set becomes
# empty the entry itself is dropped. Unknown ids are ignored. Runs under the
# `client_refs` lock and always returns `nothing`.
function del_client(pg, id, client)
    lock(client_refs) do
        entry = get(pg.refs, id, false)
        entry === false && return
        delete!(entry.clientset, client)
        isempty(entry.clientset) && delete!(pg.refs, id)
        return
    end
    return nothing
end
# Apply `del_client(id, client)` to every `(id, client)` entry of `pairs`.
function del_clients(pairs::Vector)
    foreach(pairs) do entry
        del_client(entry[1], entry[2])
    end
    return nothing
end
# Notified by `send_add_client` / `send_del_client` after they queue a message for a
# worker. Declared `const` so that the functions reading it see a binding of known
# type rather than an untyped global.
const any_gc_flag = Condition()

# Schedule a task that loops forever: it waits on `any_gc_flag` and then calls
# `flush_gc_msgs()` each time the condition is notified.
function start_gc_msgs_task()
    @schedule while true
        wait(any_gc_flag)
        flush_gc_msgs()
    end
end
# Tell the owner of `rr` that this process no longer holds the reference: handled
# directly when the owner is local, otherwise queued on the owning worker.
function send_del_client(rr)
    owner = rr.where
    owner == myid() && return del_client(rr)

    # process only if a valid worker
    id_in_procs(owner) || return nothing

    w = worker_from_id(owner)
    push!(w.del_msgs, (remoteref_id(rr), myid()))
    w.gcflag = true
    return notify(any_gc_flag)
end
# Record `client` in the client set of the value identified by `id`
# (looked up, or created, via `lookup_ref`) while holding the `client_refs` lock.
function add_client(id, client)
    lock(client_refs) do
        push!(lookup_ref(id).clientset, client)
    end
    return nothing
end
# Apply `add_client(id, client)` to every `(id, client)` entry of `pairs`.
#
# The entries queued by `send_add_client` are 2-tuples `(RRID, pid)`, so the client
# is passed as-is rather than splatted, mirroring `del_clients`.
function add_clients(pairs::Vector)
    for p in pairs
        add_client(p[1], p[2])
    end
end
# Register process `i` as a client of `rr` with the owner of `rr`: handled directly
# when the owner is local, otherwise queued on the owning worker.
function send_add_client(rr::AbstractRemoteRef, i)
    owner = rr.where
    owner == myid() && return add_client(remoteref_id(rr), i)

    # don't need to send add_client if the message is already going
    # to the processor that owns the remote ref. it will add_client
    # itself inside deserialize().
    (i != owner && id_in_procs(owner)) || return nothing

    w = worker_from_id(owner)
    push!(w.add_msgs, (remoteref_id(rr), i))
    w.gcflag = true
    return notify(any_gc_flag)
end
# Return the channel type parameter `T` of a `RemoteChannel{T}`.
function channel_type(rr::RemoteChannel{T}) where {T}
    return T
end
# A `Future` registers the destination as a client only while it has no cached value.
function serialize(s::ClusterSerializer, f::Future)
    return serialize(s, f, isnull(f.v))
end

# A `RemoteChannel` always registers the destination as a client.
function serialize(s::ClusterSerializer, rr::RemoteChannel)
    return serialize(s, rr, true)
end

# Serialize `rr` with the generic method. When `addclient` is true and the
# destination (derived from `s.io`) is not the owner of `rr`, the destination is
# first registered as a client via `send_add_client`.
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
    if addclient
        dest = worker_id_from_socket(s.io)
        if dest !== rr.where
            send_add_client(rr, dest)
        end
    end
    return invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
end
# Read a `Future` and rebuild it through the registering constructor.
function deserialize(s::ClusterSerializer, t::Type{<:Future})
    raw = deserialize_rr(s, t)
    rrid = RRID(raw.whence, raw.id)
    # ctor adds to client_refs table
    return Future(raw.where, rrid, raw.v)
end
# Read a `RemoteChannel` and rebuild it through the registering constructor.
function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel})
    raw = deserialize_rr(s, t)
    rrid = RRID(raw.whence, raw.id)
    # call ctor to make sure this rr gets added to the client_refs table
    return RemoteChannel{channel_type(raw)}(raw.where, rrid)
end
# Deserialize a remote reference with the generic method. If this process owns the
# reference, it is added to the reference's client set here.
function deserialize_rr(s, t)
    ref = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
    # send_add_client() is not executed when the ref is being
    # serialized to where it exists
    ref.where == myid() && add_client(remoteref_id(ref), myid())
    return ref
end
# Future and RemoteChannel are serializable only in a running cluster.
# For serializers other than ClusterSerializer a zeroed-out value is written instead.
function serialize(s::AbstractSerializer, ::Future)
    blank = Future((0, 0, 0, Nullable{Any}()))
    return invoke(serialize, Tuple{AbstractSerializer, Any}, s, blank)
end
# Generic (non-ClusterSerializer) serialization of a `RemoteChannel`: the actual
# reference is ignored and a zeroed-out `RemoteChannel{Channel{Any}}` is written.
function serialize(s::AbstractSerializer, ::RemoteChannel)
    blank = RemoteChannel{Channel{Any}}((0, 0, 0))
    return invoke(serialize, Tuple{AbstractSerializer, Any}, s, blank)
end
# Build a zero-argument callable that invokes `f` on `args`/`kwargs` in a way that
# simulates what would happen if the function were sent elsewhere. When there is
# nothing to pass, `f` itself is returned.
function local_remotecall_thunk(f, args, kwargs)
    needs_wrapper = !(isempty(args) && isempty(kwargs))
    return needs_wrapper ? (() -> f(args...; kwargs...)) : f
end
# Local-process variant: schedule the call here and return the `Future` for it.
function remotecall(f, w::LocalProcess, args...; kwargs...)
    fut = Future(w)
    rid = remoteref_id(fut)
    schedule_call(rid, local_remotecall_thunk(f, args, kwargs))
    return fut
end
# Worker variant: send a `:call` message addressed to a fresh `Future` and return it.
function remotecall(f, w::Worker, args...; kwargs...)
    fut = Future(w)
    header = MsgHeader(remoteref_id(fut))
    send_msg(w, header, CallMsg{:call}(f, args, kwargs))
    return fut
end
"""
remotecall(f, id::Integer, args...; kwargs...) -> Future
Call a function `f` asynchronously on the given arguments on the specified process.
Returns a [`Future`](@ref).
Keyword arguments, if any, are passed through to `f`.
"""
remotecall(f, id::Integer, args...; kwargs...) = remotecall(f, worker_from_id(id), args...; kwargs...)
# Local-process variant: run the call here; a `RemoteException` result is thrown,
# anything else is returned.
function remotecall_fetch(f, w::LocalProcess, args...; kwargs...)
    thunk = local_remotecall_thunk(f, args, kwargs)
    result = run_work_thunk(thunk, false)
    isa(result, RemoteException) && throw(result)
    return result
end
# Worker variant: send a `:call_fetch` message, take the reply from a temporary
# local `RemoteValue`, drop that entry again, then throw the reply if it is a
# `RemoteException` or return it otherwise.
function remotecall_fetch(f, w::Worker, args...; kwargs...)
    # can be weak, because the program will have no way to refer to the Ref
    # itself, it only gets the result.
    reply_id = RRID()
    reply = lookup_ref(reply_id)
    reply.waitingfor = w.id

    header = MsgHeader(RRID(0,0), reply_id)
    send_msg(w, header, CallMsg{:call_fetch}(f, args, kwargs))
    result = take!(reply)

    lock(() -> delete!(PGRP.refs, reply_id), client_refs)

    isa(result, RemoteException) && throw(result)
    return result
end
"""
remotecall_fetch(f, id::Integer, args...; kwargs...)
Perform `fetch(remotecall(...))` in one message.
Keyword arguments, if any, are passed through to `f`.
Any remote exceptions are captured in a
[`RemoteException`](@ref) and thrown.
See also [`fetch`](@ref) and [`remotecall`](@ref).
"""
remotecall_fetch(f, id::Integer, args...; kwargs...) =
remotecall_fetch(f, worker_from_id(id), args...; kwargs...)
# Local-process variant: issue the call and wait on the resulting `Future`.
function remotecall_wait(f, w::LocalProcess, args...; kwargs...)
    return wait(remotecall(f, w, args...; kwargs...))
end
# Worker variant: send a `CallWaitMsg` addressed to a fresh `Future`, block on a
# temporary local `RemoteValue` for the reply, drop that entry again, then throw
# the reply if it is a `RemoteException` or return the `Future` otherwise.
function remotecall_wait(f, w::Worker, args...; kwargs...)
    notify_id = RRID()
    notify_rv = lookup_ref(notify_id)
    notify_rv.waitingfor = w.id

    fut = Future(w)
    header = MsgHeader(remoteref_id(fut), notify_id)
    send_msg(w, header, CallWaitMsg(f, args, kwargs))
    reply = fetch(notify_rv.c)

    lock(() -> delete!(PGRP.refs, notify_id), client_refs)

    if isa(reply, RemoteException)
        throw(reply)
    end
    return fut
end
"""
remotecall_wait(f, id::Integer, args...; kwargs...)
Perform a faster `wait(remotecall(...))` in one message on the `Worker` specified by worker id `id`.
Keyword arguments, if any, are passed through to `f`.
See also [`wait`](@ref) and [`remotecall`](@ref).
"""
remotecall_wait(f, id::Integer, args...; kwargs...) =
remotecall_wait(f, worker_from_id(id), args...; kwargs...)
# the LocalProcess version just performs in local memory what a worker
# does when it gets a :do message.
# same for other messages on LocalProcess.
function remote_do(f, w::LocalProcess, args...; kwargs...)
    schedule(Task(local_remotecall_thunk(f, args, kwargs)))
    return nothing
end
# Worker variant: send a `RemoteDoMsg` with a default header; nothing is returned.
function remote_do(f, w::Worker, args...; kwargs...)
    header = MsgHeader()
    send_msg(w, header, RemoteDoMsg(f, args, kwargs))
    return nothing
end
"""
remote_do(f, id::Integer, args...; kwargs...) -> nothing
Executes `f` on worker `id` asynchronously.
Unlike [`remotecall`](@ref), it does not store the
result of computation, nor is there a way to wait for its completion.
A successful invocation indicates that the request has been accepted for execution on
the remote node.
While consecutive `remotecall`s to the same worker are serialized in the order they are
invoked, the order of executions on the remote worker is undetermined. For example,
`remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)` will serialize the call
to `f1`, followed by `f2` and `f3` in that order. However, it is not guaranteed that `f1`
is executed before `f3` on worker 2.
Any exceptions thrown by `f` are printed to [`STDERR`](@ref) on the remote worker.
Keyword arguments, if any, are passed through to `f`.
"""
remote_do(f, id::Integer, args...; kwargs...) = remote_do(f, worker_from_id(id), args...; kwargs...)
# Have the owner of `rr` call `f(rid, args...)`, where `rid` is the id of `rr`:
# directly when this process is the owner, through `remotecall_fetch` otherwise.
function call_on_owner(f, rr::AbstractRemoteRef, args...)
    rid = remoteref_id(rr)
    rr.where == myid() && return f(rid, args...)
    return remotecall_fetch(f, rr.where, rid, args...)
end
# Block until the value behind `rid` can be fetched. A `RemoteException` value is
# thrown when this process is `callee` and returned otherwise; any other value
# results in `nothing`.
function wait_ref(rid, callee, args...)
    v = fetch_ref(rid, args...)
    isa(v, RemoteException) || return nothing
    myid() == callee && throw(v)
    return v
end
# Wait for a `Future`; returns `r`. Nothing is awaited when a value is already
# cached locally.
function wait(r::Future)
    isnull(r.v) || return r
    call_on_owner(wait_ref, r, myid())
    return r
end

# Wait on a `RemoteChannel` via its owner; returns `r`.
function wait(r::RemoteChannel, args...)
    call_on_owner(wait_ref, r, myid(), args...)
    return r
end
# Return the value of `r`. The first successful call obtains the value from the
# owner, caches it in `r.v` and sends a del_client message for `r`; later calls
# return the cached value.
function fetch(r::Future)
    isnull(r.v) || return get(r.v)
    v = call_on_owner(fetch_ref, r)
    # Wrap explicitly instead of relying on the implicit `convert` performed by the
    # field assignment: when `v` is itself a `Nullable`, conversion to
    # `Nullable{Any}` re-wraps its contents (and keeps a null value null), so the
    # Future would still look unset after its client has been released.
    r.v = Nullable{Any}(v)
    send_del_client(r)
    return v
end
# Fetch from the channel backing `rid` on this process.
function fetch_ref(rid, args...)
    return fetch(lookup_ref(rid).c, args...)
end

# Fetch from a `RemoteChannel` by running `fetch_ref` on its owner.
function fetch(r::RemoteChannel, args...)
    return call_on_owner(fetch_ref, r, args...)
end
"""
fetch(x)
Waits and fetches a value from `x` depending on the type of `x`:
* [`Future`](@ref): Wait for and get the value of a `Future`. The fetched value is cached locally.
Further calls to `fetch` on the same reference return the cached value. If the remote value
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
* [`RemoteChannel`](@ref): Wait for and get the value of a remote reference. Exceptions raised are
same as for a `Future` .
Does not remove the item fetched.
"""
fetch(@nospecialize x) = x
# Readiness of a `RemoteValue` is that of its backing channel `rv.c`.
function isready(rv::RemoteValue, args...)
    return isready(rv.c, args...)
end
"""
put!(rr::Future, v)
Store a value to a [`Future`](@ref) `rr`.
`Future`s are write-once remote references.
A `put!` on an already set `Future` throws an `Exception`.
All asynchronous remote calls return `Future`s and set the
value to the return value of the call upon completion.
"""
function put!(rr::Future, v)
!isnull(rr.v) && error("Future can be set only once")
call_on_owner(put_future, rr, v, myid())
rr.v = v
rr
end
# Owner-side half of `put!` on a `Future`: store `v` under `rid`, refusing a second
# store, then release `callee` as a client.
function put_future(rid, v, callee)
    store = lookup_ref(rid)
    if isready(store)
        error("Future can be set only once")
    end
    put!(store, v)
    # The callee has the value and hence can be removed from the remote store.
    del_client(rid, callee)
    return nothing
end
# Storing into a `RemoteValue` stores into its backing channel `rv.c`.
function put!(rv::RemoteValue, args...)
    return put!(rv.c, args...)
end

# Store `args...` into the value identified by `rid` on this process.
function put_ref(rid, args...)
    put!(lookup_ref(rid), args...)
    return nothing
end
"""
put!(rr::RemoteChannel, args...)
Store a set of values to the [`RemoteChannel`](@ref).
If the channel is full, blocks until space is available.
Returns its first argument.
"""
put!(rr::RemoteChannel, args...) = (call_on_owner(put_ref, rr, args...); rr)
# take! is not supported on Future

# Taking from a `RemoteValue` takes from its backing channel `rv.c`.
function take!(rv::RemoteValue, args...)
    return take!(rv.c, args...)
end

# Take from the value identified by `rid` on this process. A `RemoteException`
# item is thrown when this process is `callee`; otherwise the item is returned.
function take_ref(rid, callee, args...)
    item = take!(lookup_ref(rid), args...)
    if isa(item, RemoteException) && myid() == callee
        throw(item)
    end
    return item
end
"""
take!(rr::RemoteChannel, args...)
Fetch value(s) from a [`RemoteChannel`](@ref) `rr`,
removing the value(s) in the process.
"""
take!(rr::RemoteChannel, args...) = call_on_owner(take_ref, rr, myid(), args...)
# close is not supported on Future

# Close the channel backing `rid` on this process.
function close_ref(rid)
    close(lookup_ref(rid).c)
    return nothing
end

# Close a `RemoteChannel` by running `close_ref` on its owner.
function close(rr::RemoteChannel)
    return call_on_owner(close_ref, rr)
end
# `r[]` on a remote reference is `fetch(r)`.
function getindex(r::RemoteChannel)
    return fetch(r)
end

function getindex(r::Future)
    return fetch(r)
end

# `r[args...]` on a `Future` indexes into the fetched value.
function getindex(r::Future, args...)
    return getindex(fetch(r), args...)
end

# `r[args...]` on a `RemoteChannel`: index into the fetched value when this process
# owns the channel; otherwise perform the `getindex` on the owning process.
function getindex(r::RemoteChannel, args...)
    r.where == myid() || return remotecall_fetch(getindex, r.where, r, args...)
    return getindex(fetch(r), args...)
end