
Cannot serialize a Thunk when processing an error with Distributed #430

Closed
StevenWhitaker opened this issue Sep 14, 2023 · 3 comments · Fixed by #434

Comments

StevenWhitaker commented Sep 14, 2023

I get the error mentioned in the title with the following example. See also #431 and JuliaParallel/DTables.jl#52, which may be related. I'm filing this issue here because I suspect the problem isn't specific to DTables.jl (but I could be wrong).

Contents of mwe.jl:

using Distributed, DelimitedFiles
nworkers = 1
addprocs(nworkers - nprocs() + 1)

@everywhere using CSV, DTables, DataFrames

file = tempname() * ".csv"
writedlm(file, [1, 2])

# See issue #431 for the purpose of this next line.
DTable(x -> CSV.File(x), [file]; tabletype = DataFrame)

remotecall_fetch(2, file) do f
    d = DTable(x -> CSV.File(x), [f]; tabletype = DataFrame)
    getproperty(d, Symbol("1"))
end

rm(file)

Results:

julia> include("mwe.jl")
Error in eager scheduler:
ArgumentError: Cannot serialize a Thunk
Stacktrace:
  [1] serialize(io::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Dagger.Thunk)
    @ Dagger ~/.julia/packages/Dagger/xGAvM/src/thunk.jl:116
  [2] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [3] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657
  [4] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [5] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657
  [6] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [7] serialize
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657 [inlined]
  [8] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}})
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:205
  [9] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Tuple{Distributed.RRID, Tuple{Bool, Dagger.ThunkFailedException{RemoteException}}, Int64})
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:205
 [10] serialize_msg(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, o::Distributed.CallMsg{:call_fetch})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:78
 [11] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [12] invokelatest
    @ ./essentials.jl:816 [inlined]
 [13] send_msg_(w::Distributed.Worker, header::Distributed.MsgHeader, msg::Distributed.CallMsg{:call_fetch}, now::Bool)
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:181
 [14] send_msg
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [15] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [16] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [17] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [18] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [19] call_on_owner(::Function, ::Future, ::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}}, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:565
 [20] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:681 [inlined]
 [21] macro expansion
    @ ./lock.jl:267 [inlined]
 [22] put!(r::Future, v::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:680
 [23] #put!#73
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:24 [inlined]
 [24] fill_registered_futures!(state::Dagger.Sch.ComputeState, node::Dagger.Thunk, failed::Bool)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:37
 [25] finish_failed!(state::Dagger.Sch.ComputeState, thunk::Dagger.Thunk, origin::Dagger.Thunk)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:149
 [26] set_failed!(state::Dagger.Sch.ComputeState, origin::Dagger.Thunk, thunk::Dagger.Thunk)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:146
 [27] set_failed!
    @ ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:143 [inlined]
 [28] finish_task!(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, node::Dagger.Thunk, thunk_failed::Bool)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:898
 [29] (::Dagger.Sch.var"#91#92"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc, NamedTuple{(:time_pressure, :storage_pressure, :storage_capacity, :loadavg, :threadtime, :gc_allocd, :transfer_rate), Tuple{UInt64, UInt64, UInt64, Tuple{Float64, Float64, Float64}, UInt64, Int64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64})()
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:574
 [30] lock(f::Dagger.Sch.var"#91#92"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc, NamedTuple{(:time_pressure, :storage_pressure, :storage_capacity, :loadavg, :threadtime, :gc_allocd, :transfer_rate), Tuple{UInt64, UInt64, UInt64, Tuple{Float64, Float64, Float64}, UInt64, Int64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64}, l::ReentrantLock)
    @ Base ./lock.jl:229
 [31] scheduler_run(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, d::Dagger.Thunk, options::Dagger.Sch.SchedulerOptions)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:525
 [32] compute_dag(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:449
 [33] compute_dag
    @ ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:414 [inlined]
 [34] compute(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
    @ Dagger ~/.julia/packages/Dagger/xGAvM/src/compute.jl:23
 [35] compute
    @ ~/.julia/packages/Dagger/xGAvM/src/compute.jl:22 [inlined]
 [36] macro expansion
    @ ~/.julia/packages/Dagger/xGAvM/src/sch/eager.jl:28 [inlined]
 [37] (::Dagger.Sch.var"#50#51"{Dagger.Context})()
    @ Dagger.Sch ./threadingconstructs.jl:410
ERROR: LoadError: On worker 2:
SchedulingException (Scheduler exited)
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:54 [inlined]
  [5] DTableColumn
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable_column.jl:21
  [6] DTableColumn
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable_column.jl:35
  [7] DTableColumn
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable_column.jl:37 [inlined]
  [8] getproperty
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:291 [inlined]
  [9] #7
    @ ~/tmp/mwe.jl:15
 [10] #invokelatest#2
    @ ./essentials.jl:819
 [11] invokelatest
    @ ./essentials.jl:816
 [12] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [13] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [14] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [15] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(f::Function, w::Distributed.Worker, args::String; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(f::Function, w::Distributed.Worker, args::String)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(f::Function, id::Int64, args::String)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:13
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:13

(tmp) pkg> st
Status `~/tmp/Project.toml`
  [336ed68f] CSV v0.10.11
  [20c56dc6] DTables v0.4.1
  [a93c6f00] DataFrames v1.6.1
  [8bb1440f] DelimitedFiles v1.9.1
  [8ba89e20] Distributed

Note that if I change line 543 in src/sch/Sch.jl from thunk_failed = true to throw(res), I can see the underlying error. (See JuliaParallel/DTables.jl#52 for the actual error.)

Let me know if I'm missing anything or if I'm doing something wrong.

@jpsamaroo
Member

Ahh, looks like the ThunkFailedException that we throw when reporting a failed task captures a scheduler-internal Thunk object, which it definitely should not be doing. I'll figure out a workaround. Thanks for reporting this!
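The mechanism described above can be reproduced in isolation with only Julia's Serialization standard library. This sketch does not use Dagger: InternalNode, LeakyFailure, PlainFailure and try_serialize are hypothetical names standing in for Dagger's Thunk and ThunkFailedException, and the throwing serialize method only mimics the guard reported at thunk.jl:116 in the trace. In the trace, the error surfaces when put! on a Future forwards a (Bool, ThunkFailedException) tuple to another process. The default serializer then walks every field of the exception and reaches the Thunk. Keeping only plain data, such as an integer ID, in the exception is one way to avoid this. That is an assumption about the general pattern, not a description of the actual fix in #434.

```julia
using Serialization

# Hypothetical stand-in for a scheduler-internal object such as Dagger's Thunk.
# This is NOT Dagger's type; it only mimics a type that refuses serialization.
mutable struct InternalNode
    id::Int
end

# Throwing serialize method, similar in spirit to the guard that raised
# "ArgumentError: Cannot serialize a Thunk" in the trace above.
Serialization.serialize(::Serialization.AbstractSerializer, ::InternalNode) =
    throw(ArgumentError("Cannot serialize an InternalNode"))

# Problematic pattern (hypothetical): the exception holds the internal node.
struct LeakyFailure <: Exception
    node::InternalNode
    msg::String
end

# Safer pattern (hypothetical): the exception keeps only plain data.
struct PlainFailure <: Exception
    node_id::Int
    msg::String
end

# Serialize `x` into an in-memory buffer, as a remote put! must do before
# sending a value to another process. Returns the buffer, or the caught error.
function try_serialize(x)
    buf = IOBuffer()
    try
        serialize(buf, x)
        return buf
    catch err
        return err
    end
end

node = InternalNode(42)

# The default serializer recurses into every field and hits the guard.
leaky = try_serialize((false, LeakyFailure(node, "task failed")))
println(leaky)  # ArgumentError("Cannot serialize an InternalNode")

# Only an Int and a String need serializing here, so this round-trips.
plain = try_serialize((false, PlainFailure(node.id, "task failed")))
seekstart(plain)
ok, restored = deserialize(plain)
println(restored.node_id)  # 42
```

Running it prints the ArgumentError for the exception that holds the internal node, while the exception that stores only the ID serializes and deserializes cleanly.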

@bolognam

Julian, do you think #435 is related to this issue?

@jpsamaroo
Member

That may be an unrelated Dagger bug; give me some time to investigate.
