Don't clobber the scheduler #310
Conversation
Every time a … IIRC, one thing that made me a bit nervous when I did the current code was race conditions where the same … |
Can you elaborate on this potential race condition? We shouldn't be able to double-schedule a thunk, and even if we do, it would probably not cause significant harm to the scheduler's internal consistency. |
I can't think of anything specific, just a general worry about non-deterministic bugs from things running in some unexpected order when multiple tasks yield. I suppose double-scheduling a thunk shouldn't happen unless whatever fires off a thunk is threaded. It was probably more of an attempt to keep the moving parts to a minimum. |
That's a fair concern, although I think it's probably not something we need to worry about. At this point, the scheduler should be multitasking-safe, since a global lock is taken during modifications. If I implement an event-driven notification system, it would be built on a thread-safe primitive, and scheduler updates would still be serialized by the scheduler lock. Slow, maybe, but also race-free. |
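For illustration, a minimal sketch (my own names, not Dagger's internals) of what that could look like: scheduler updates serialized by one lock, with a `Threads.Condition` on that same lock acting as the thread-safe, event-driven notification primitive:

```julia
# Sketch only: sched_lock, work_available, ready_queue, enqueue_work! and
# wait_for_work are illustrative names, not part of Dagger's API.
const sched_lock = ReentrantLock()
const work_available = Threads.Condition(sched_lock)
const ready_queue = Int[]              # stand-in for the scheduler's ready set

function enqueue_work!(id::Int)
    lock(sched_lock) do
        push!(ready_queue, id)         # state is only mutated under the lock
        notify(work_available)         # wake any task blocked in wait_for_work
    end
end

function wait_for_work()
    lock(sched_lock) do
        while isempty(ready_queue)
            wait(work_available)       # releases the lock while sleeping, reacquires on wakeup
        end
        popfirst!(ready_queue)
    end
end
```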
d50360b to f2a2689
Codecov Report
@@            Coverage Diff            @@
##           master     #310     +/-  ##
=========================================
  Coverage    0.00%    0.00%
=========================================
  Files          42       42
  Lines        3596     3630     +34
=========================================
- Misses       3596     3630     +34
Continue to review full report at Codecov.
|
b03a61a to 22c235f
I get a reproducible hang on this branch when running … I'll get an MWE later, but here are the benchmark steps: clone https://github.com/krynju/dtable_benchmarks |
Yeah I also get a hang somewhere on a large distributed benchmark. I plan to investigate before I merge. |
It kinda looks like the one we had here #284 |
847044f to 5bd8fec
5bd8fec to 7601fa8
@DrChainsaw I'd appreciate a review of the … |
Hmmm, sometimes I get crashes, but I managed to get my biggest (16 GB) groupby to work once, and the performance is about the same. Just got a smaller data size to crash and throw this:
|
That's very concerning; is there any chance you can run the benchmarks with a debug build of Julia? It might be that we've got a portion of … |
Yeah, performance may be somewhat similar on non-distributed workloads, since work is over-subscribed first and then executed (so you can get cycles of scheduling followed by execution, in which case clobbering isn't a problem). For me, a distributed benchmark of heavy BLAS operations was helped a lot by this, as well as by #165. |
Ah nvm it's usually faster. Sometimes even 2x faster in some longer runs |
logs with debug on:
|
Can one add new … |
Absolutely! That's how the eager scheduler is implemented: as a regular thunk running on a regular scheduler, listening on a channel, and then constructing new … |
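For reference, a rough sketch of that pattern using the dynamic scheduler API that also appears in the tests below (`sch_handle`, `add_thunk!`, `fetch`); the `driver` function and `work_chan` channel are illustrative names, not Dagger internals:

```julia
using Dagger, Distributed

# A long-running "driver" thunk: listen on a channel and keep constructing
# new thunks through the dynamic scheduler handle.
function driver(work_chan::RemoteChannel)
    h = Dagger.Sch.sch_handle()                     # handle into the running scheduler
    results = Any[]
    while true
        item = take!(work_chan)                     # block until the next request arrives
        item === nothing && break                   # treat `nothing` as a stop signal
        id = Dagger.Sch.add_thunk!(identity, h, item)   # construct and submit a new thunk
        push!(results, fetch(h, id))                # wait for its result
    end
    return results
end

# Hypothetical usage: run the driver itself as a regular thunk.
# work_chan = RemoteChannel(() -> Channel{Any}(32))
# job = @async collect(Context(), delayed(driver)(work_chan))
```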
Here is a patched version of the add workers test, which should not be unreliable. I could not get … Let me know if it is acceptable and I'll try to fix the remove workers test too.
Add procs test:
setup = quote
    using Dagger, Distributed
    # blocked is to guarantee that processing is not completed before we add new workers
    # Note: blocked is used in expressions below
    blocked = true
    function testfun(i)
        i < 4 && return myid()
        # Wait for test to do its thing before we proceed
        if blocked
            sleep(0.1) # just so we don't end up overflowing or something while waiting for workers to be added
            # Here we would like to just wait to be rescheduled on another worker (which is not blocked)
            # but this functionality does not exist, so instead we do this weird thing where we reschedule
            # until we end up on a non-blocked worker
            h = Dagger.Sch.sch_handle()
            id = Dagger.Sch.add_thunk!(testfun, h, i)
            return fetch(h, id)
        end
        return myid()
    end
end
@testset "Add new workers" begin
ps = []
try
ps1 = addprocs(2, exeflags="--project")
append!(ps, ps1)
@everywhere vcat(ps1, myid()) $setup
ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)
ctx = Context(ps1)
job = @async collect(ctx, ts)
while !istaskstarted(job)
sleep(0.001)
end
# Will not be added, so they should never appear in output
# TODO: Does not work: add_thunk! seems to create a new context using all available workers :(
#ps2 = addprocs(2, exeflags="--project")
#append!(ps, ps2)
ps3 = addprocs(2, exeflags="--project")
append!(ps, ps3)
@everywhere ps3 $setup
addprocs!(ctx, ps3)
@test length(procs(ctx)) == 4
@everywhere ps3 blocked=false
ps_used = fetch(job)
@test ps_used isa Vector
@test any(p -> p in ps_used, ps1)
@test any(p -> p in ps_used, ps3)
#@test !any(p in ps2, ps_used)
finally
wait(rmprocs(ps))
end
end |
It'd probably be better to call into the scheduler (with … |
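(Presumably this refers to something like the dynamic `exec!` API that the updated test below ends up using: from inside a running thunk, hand the scheduler a small function to run against its own state, e.g. to list the workers it currently knows about. A rough sketch:)

```julia
# Sketch: the callback takes the scheduler-callback signature used in the
# test further down in this thread.
_list_workers(ctx, state, task, tid, _) = procs(ctx)

function workers_known_to_scheduler()
    # Only valid when called from inside a running thunk
    h = Dagger.Sch.sch_handle()
    return Dagger.Sch.exec!(_list_workers, h)    # runs _list_workers inside the scheduler
end
```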
Yes please! |
9bd1328 to a9f3358
I'll update your new test approach to use my suggestion. |
Wonderfully enough, this PR seems to actually make fault handling more robust. |
3764402 to 3f4fb15
I tried a little to fix the remove procs test but I got stuck. It seems like … I also get frequent …
Example setup with logging and longer wait:
setup = quote
    using Dagger, Distributed
    function _list_workers(ctx, state, task, tid, _)
        return procs(ctx)
    end
    # blocked is to guarantee that processing is not completed before we add new workers
    # Note: blocked is used in expressions below
    blocked = true
    function testfun(i)
        i < 4 && return myid()
        # Wait for test to do its thing before we proceed
        if blocked
            sleep(0.5) # just so we don't end up overflowing or something while waiting for workers to be added
            # Here we would like to just wait to be rescheduled on another worker (which is not blocked)
            # but this functionality does not exist, so instead we do this weird thing where we reschedule
            # until we end up on a non-blocked worker
            h = Dagger.Sch.sch_handle()
            wkrs = Dagger.Sch.exec!(_list_workers, h)
            id = if length(wkrs) > 2
                id = Dagger.Sch.add_thunk!(testfun, h, i; single=last(wkrs).pid)
                @info "After adding from wkrs: $id"
                id
            else
                id = Dagger.Sch.add_thunk!(testfun, h, i)
                @info "After adding to all $id"
                id
            end
            return fetch(h, id)
        end
        return myid()
    end
end
end First I ran this: ps = []
ps1 = addprocs(2, exeflags="--project")
append!(ps, ps1)
@everywhere vcat(ps1, myid()) $setup
ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)
ctx = Context(ps1)
job = @async collect(ctx, ts)
while !istaskstarted(job)
sleep(0.001)
end Which prints a steady stream of:
But then after running:
ps3 = addprocs(2, exeflags="--project")
append!(ps, ps3)
@everywhere ps3 $setup
addprocs!(ctx, ps3)
@test length(procs(ctx)) == 4
It just prints the following and then goes silent: …
Which seems to indicate that thunks get stuck somehow. Trying to unblock does nothing:
julia> job
Task (runnable) @0x000000000e9a5b30

julia> @everywhere blocked = false

julia> job
Task (runnable) @0x000000000e9a5b30

julia> @everywhere blocked = false

julia> job
Task (runnable) @0x000000000e9a5b30

Also, if I try to log anything about … |
Ok, I think I found the root cause here: … I suppose the drawback is that there will be one task listening for halts each time this is done, so perhaps refactoring so that … I suppose one would also need/want a mechanism to clean up the listeners when procs are removed (although I dread dealing with the edge cases that this might create). Let me know if you want a PR or a code suggestion for this. I do feel a bit bad for having added more moving parts with the add/remove procs, and I hope it is useful for someone other than me 😟 |
Got this interesting error log, but not sure if it's related.
log:
Activating project at `C:\Users\krynjupc\WS\mgr_benchmark_setup\dtable`
From worker 4: Activating project at `C:\Users\krynjupc\WS\mgr_benchmark_setup\dtable`
From worker 2: Activating project at `C:\Users\krynjupc\WS\mgr_benchmark_setup\dtable`
From worker 3: Activating project at `C:\Users\krynjupc\WS\mgr_benchmark_setup\dtable`
@@@ TABLESIZE: 1600.0 MB
@@@ SAVING TO: results\dtable_bench1640153009.csv
From worker 4: ┌ Error: Error on 4 while connecting to peer 3, exiting
Error in sending dynamic request:
no process with id 4 exists
Stacktrace:
[1] error(s::String)
@ Base .\error.jl:33
[2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
@ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:1094
[3] worker_from_id
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:1086 [inlined]
[4] #remotecall_fetch#158
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:494 [inlined]
[5] remotecall_fetch
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:494 [inlined]
[6] call_on_owner
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:567 [inlined]
[7] take!
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:763 [inlined]
[8] macro expansion
@ C:\Users\krynjupc\.julia\dev\Dagger\src\sch\dynamic.jl:52 [inlined]
[9] (::Dagger.Sch.var"#38#42"{Context, Dagger.Sch.ComputeState, Task, RemoteChannel{Channel{Any}}, RemoteChannel{Channel{Any}}})()
@ Dagger.Sch .\task.jl:466
┌ Error: Fatal error on process 1
│ exception =
│ attempt to send to unknown socket
│ Stacktrace:
│ [1] error(s::String)
│ @ Base .\error.jl:33
│ [2] send_msg_unknown(s::Sockets.TCPSocket, header::Distributed.MsgHeader, msg::Distributed.ResultMsg)
│ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\messages.jl:99
│ [3] send_msg_now(s::Sockets.TCPSocket, header::Distributed.MsgHeader, msg::Distributed.ResultMsg)
│ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\messages.jl:115
│ [4] deliver_result(sock::Sockets.TCPSocket, msg::Symbol, oid::Distributed.RRID, value::Nothing)
│ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:95
│ [5] macro expansion
│ @ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:286 [inlined]
│ [6] (::Distributed.var"#105#107"{Distributed.CallMsg{:call_fetch}, Distributed.MsgHeader, Sockets.TCPSocket})()
│ @ Distributed .\task.jl:466
└ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:99
From worker 4: │ exception =
┌ Error: Fatal error on process 1
│ exception =
│ attempt to send to unknown socket
│ Stacktrace:
│ [1] error(s::String)
│ @ Base .\error.jl:33
│ [2] send_msg_unknown(s::Sockets.TCPSocket, header::Distributed.MsgHeader, msg::Distributed.ResultMsg)
│ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\messages.jl:99
│ [3] send_msg_now(s::Sockets.TCPSocket, header::Distributed.MsgHeader, msg::Distributed.ResultMsg)
│ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\messages.jl:115
│ [4] deliver_result(sock::Sockets.TCPSocket, msg::Symbol, oid::Distributed.RRID, value::Nothing)
│ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:95
│ [5] macro expansion
│ @ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:295 [inlined]
│ [6] (::Distributed.var"#109#111"{Distributed.CallWaitMsg, Distributed.MsgHeader, Sockets.TCPSocket})()
│ @ Distributed .\task.jl:466
└ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:99
From worker 4: │ ConcurrencyViolationError("lock must be held")
Worker 4 terminated. From worker 4: │ Stacktrace:
Error in eager scheduler:
TaskFailedException
nested task error: no process with id 4 exists
Stacktrace:
[1] error(s::String)
@ Base .\error.jl:33
[2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
@ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:1094
[3] worker_from_id
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:1086 [inlined]
[4] remote_do(::Function, ::Int64, ::Dagger.NoOpLog, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:559
[5] remote_do(::Function, ::Int64, ::Dagger.NoOpLog, ::Vararg{Any})
@ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:559
[6] (::Dagger.Sch.var"#117#119"{Context, Set{Dagger.Chunk}, Int64})()
@ Dagger.Sch .\task.jl:466
Stacktrace:
[1] sync_end(c::Channel{Any})
@ Base .\task.jl:424
[2] macro expansion
@ .\task.jl:443 [inlined]
[3] evict_all_chunks!(ctx::Context, to_evict::Set{Dagger.Chunk})
@ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:783
[4] finish_task!(ctx::Context, state::Dagger.Sch.ComputeState, node::Thunk, thunk_failed::Bool)
@ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:778
[5] (::Dagger.Sch.var"#90#96"{Context, Dagger.Sch.ComputeState, OSProc, NamedTuple{(:pressure, :loadavg, :threadtime, :transfer_rate), Tuple{UInt64, Tuple{Float64, Float64, Float64}, UInt64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64})()
@ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:451
[6] lock(f::Dagger.Sch.var"#90#96"{Context, Dagger.Sch.ComputeState, OSProc, NamedTuple{(:pressure, :loadavg, :threadtime, :transfer_rate), Tuple{UInt64, Tuple{Float64, Float64, Float64}, UInt64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64}, l::ReentrantLock)
@ Base .\lock.jl:183
[7] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:407
[8] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger C:\Users\krynjupc\.julia\dev\Dagger\src\compute.jl:31
[9] (::Dagger.Sch.var"#61#62"{Context})()
@ Dagger.Sch .\task.jl:466
From worker 4: │ [1] concurrency_violation()
From worker 4: │ @ Base .\condition.jl:8
From worker 4: │ [2] assert_havelock
From worker 4: │ @ .\condition.jl:25 [inlined]
From worker 4: │ [3] assert_havelock
From worker 4: │ @ .\condition.jl:48 [inlined]
From worker 4: │ [4] assert_havelock
From worker 4: │ @ .\condition.jl:72 [inlined]
From worker 4: │ [5] notify(c::Condition, arg::Any, all::Bool, error::Bool)
From worker 4: │ @ Base .\condition.jl:144
From worker 4: │ [6] #notify#570
From worker 4: │ @ .\condition.jl:142 [inlined]
From worker 4: │ [7] set_worker_state
From worker 4: │ @ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:148 [inlined]
From worker 4: │ [8] Distributed.Worker(id::Int64, r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, manager::Distributed.DefaultClusterManager; version::Nothing, config::WorkerConfig)
From worker 4: │ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:126
From worker 4: │ [9] connect_to_peer(manager::Distributed.DefaultClusterManager, rpid::Int64, wconfig::WorkerConfig)
From worker 4: │ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:356
From worker 4: │ [10] (::Distributed.var"#117#119"{Int64, WorkerConfig})()
From worker 4: │ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:342
From worker 4: │ [11] exec_conn_func(w::Distributed.Worker)
From worker 4: │ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:181
From worker 4: │ [12] (::Distributed.var"#17#20"{Distributed.Worker})()
From worker 4: │ @ Distributed .\task.jl:466
From worker 4: └ @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:362
From worker 3: ErrorException("Cookie read failed. Connection closed by peer.")CapturedException(ErrorException("Cookie read failed. Connection closed by peer."), Any[(error(s::String) at error.jl:33, 1), (process_hdr(s::Sockets.TCPSocket, validate_cookie::Bool) at process_messages.jl:251, 1), (message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) at process_messages.jl:151, 1), (process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) at process_messages.jl:126, 1), ((::Distributed.var"#99#100"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})() at task.jl:466, 1)])
From worker 3: Process(3) - Unknown remote, closing connection.
Unhandled Task ERROR: EOFError: read end of file
Stacktrace:
[1] (::Base.var"#wait_locked#660")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64)
@ Base .\stream.jl:941
[2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64)
@ Base .\stream.jl:950
[3] unsafe_read
@ .\io.jl:751 [inlined]
[4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64)
@ Base .\io.jl:750
[5] read!
@ .\io.jl:752 [inlined]
[6] deserialize_hdr_raw
@ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\messages.jl:167 [inlined]
[7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
@ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:165
[8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
@ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:126
[9] (::Distributed.var"#99#100"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
@ Distributed .\task.jl:466
┌ Error: Error initializing worker OSProc(4)
│ exception =
│ KeyError: key 4 not found
│ Stacktrace:
│ [1] getindex
│ @ .\dict.jl:498 [inlined]
│ [2] (::Dagger.Sch.var"#74#79"{Dagger.Sch.ComputeState, OSProc})()
│ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:280
│ [3] lock(f::Dagger.Sch.var"#74#79"{Dagger.Sch.ComputeState, OSProc}, l::ReentrantLock)
│ @ Base .\lock.jl:183
│ [4] init_proc(state::Dagger.Sch.ComputeState, p::OSProc, log_sink::Dagger.NoOpLog)
│ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:257
│ [5] macro expansion
│ @ C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:364 [inlined]
│ [6] (::Dagger.Sch.var"#88#94"{Context, Dagger.Sch.ComputeState, OSProc})()
│ @ Dagger.Sch .\task.jl:466
└ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:366
From worker 3: ProcessExitedException(4)
From worker 3: Stacktrace:
From worker 3: [1] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
From worker 3: @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:1089
From worker 3: [2] worker_from_id
From worker 3: @ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\cluster.jl:1086 [inlined]
From worker 3: [3] #remotecall_fetch#158
From worker 3: @ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:494 [inlined]
From worker 3: [4] remotecall_fetch
From worker 3: @ C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\remotecall.jl:494 [inlined]
From worker 3: [5] #68
From worker 3: @ C:\Users\krynjupc\.julia\dev\Dagger\src\processor.jl:98 [inlined]
From worker 3: [6] get!(default::Dagger.var"#68#69"{Int64}, h::Dict{Int64, Vector{Dagger.Processor}}, key::Int64)
From worker 3: @ Base .\dict.jl:481
From worker 3: [7] OSProc
From worker 3: @ C:\Users\krynjupc\.julia\dev\Dagger\src\processor.jl:97 [inlined]
From worker 3: [8] evict_chunks!(log_sink::Dagger.NoOpLog, chunks::Set{Dagger.Chunk})
From worker 3: @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:789
From worker 3: [9] invokelatest(::Any, ::Any, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
From worker 3: @ Base .\essentials.jl:731
From worker 3: [10] invokelatest(::Any, ::Any, ::Vararg{Any})
From worker 3: @ Base .\essentials.jl:729
From worker 3: [11] (::Distributed.var"#114#116"{Distributed.RemoteDoMsg})()
From worker 3: @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:301
From worker 3: [12] run_work_thunk(thunk::Distributed.var"#114#116"{Distributed.RemoteDoMsg}, print_error::Bool)
From worker 3: @ Distributed C:\cygwin64\home\krynjupc\julia\usr\share\julia\stdlib\v1.8\Distributed\src\process_messages.jl:63
From worker 3: [13] (::Distributed.var"#113#115"{Distributed.RemoteDoMsg})()
From worker 3: @ Distributed .\task.jl:466
fatal: error thrown and no exception handler available.
From worker 3: InterruptException()
From worker 2: fatal: error thrown and no exception handler available.
From worker 2: InterruptException()
|
This PR is a set of bugfixes and optimizations that aim to improve the scheduler's ability to pump out tasks to workers, and maximize throughput. It will try to fix the following issues:
- … (`do_tasks`), which then (by nature of holding the scheduler lock) hangs the scheduler in `fire_tasks!`, which prevents new tasks from being added to the scheduler and launched. We shouldn't hold the lock during this time, and in the future, we'll want to have a dedicated thread for each worker to receive and launch work from.
- `Context` modifications through a custom `setproperty!` call which will notify any listening tasks that the set of available workers has changed (@DrChainsaw). A rough sketch of this pattern is shown below.

Todo:
- `MemPool.approxsize` returns non-`nothing` for important types
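A minimal sketch of the notify-on-`setproperty!` pattern mentioned above, using a hypothetical `WorkerSet` wrapper rather than Dagger's actual `Context` implementation:

```julia
# Hypothetical illustration: assigning to `procs` through setproperty!
# notifies any task waiting for the set of available workers to change.
mutable struct WorkerSet
    procs::Vector{Int}
    proc_notify::Threads.Condition
end
WorkerSet(procs::Vector{Int}) = WorkerSet(procs, Threads.Condition())

function Base.setproperty!(ws::WorkerSet, name::Symbol, value)
    setfield!(ws, name, convert(fieldtype(WorkerSet, name), value))
    if name === :procs
        lock(ws.proc_notify) do
            notify(ws.proc_notify)        # tell listeners the worker set changed
        end
    end
    return value
end

# A listener (e.g. a scheduler task waiting for new workers) could do:
function wait_for_worker_change(ws::WorkerSet)
    lock(ws.proc_notify) do
        wait(ws.proc_notify)              # woken by the setproperty! above
        return copy(ws.procs)
    end
end

# Usage sketch:
# ws = WorkerSet([2, 3])
# @async @show wait_for_worker_change(ws)
# ws.procs = [2, 3, 4, 5]   # triggers the notification
```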