Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw CapturedExceptions from asyncmap to avoid dropping the stacktrace #42105

Merged
merged 9 commits into from
Jan 8, 2022

Conversation

ericphanson
Copy link
Contributor

@ericphanson ericphanson commented Sep 3, 2021

Edit: see #42105 (comment)


Original OP:

Currently, we don't get stacktraces from errors that occur inside a function being asyncmap'd. We get the exceptions but not the stacktrace (#41030). This PR just adds the print statement from #36709 (comment).

A simple example:

julia> f(i) = (sleep(i); i == 5 && error("5"))
f (generic function with 1 method)

julia> asyncmap(f, 1:5) # master
ERROR: 5
Stacktrace:
 [1] (::Base.var"#881#883")(x::Task)
   @ Base ./asyncmap.jl:177
 [2] foreach(f::Base.var"#881#883", itr::Vector{Any})
   @ Base ./abstractarray.jl:2606
 [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:177
 [4] wrap_n_exec_twice
   @ ./asyncmap.jl:153 [inlined]
 [5] async_usemap(f::typeof(f), c::UnitRange{Int64}; ntasks::Int64, batch_size::Nothing)
   @ Base ./asyncmap.jl:103
 [6] #asyncmap#865
   @ ./asyncmap.jl:81 [inlined]
 [7] asyncmap(f::Function, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:81
 [8] top-level scope
   @ REPL[4]:1

julia> function Base.start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing) # monkeypatch
           t = @async begin
               retval = nothing

               try
                   if isa(batch_size, Number)
                       while isopen(chnl)
                           # The mapping function expects an array of input args, as it processes
                           # elements in a batch.
                           batch_collection=Any[]
                           n = 0
                           for exec_data in chnl
                               push!(batch_collection, exec_data)
                               n += 1
                               (n == batch_size) && break
                           end
                           if n > 0
                               exec_func(batch_collection)
                           end
                       end
                   else
                       for exec_data in chnl
                           exec_func(exec_data...)
                       end
                   end
               catch e
                   close(chnl)
                   Base.display_error(stderr, Base.catch_stack())
                   retval = e
               end
               retval
           end
           push!(worker_tasks, t)
       end

julia> asyncmap(f, 1:5) # post-patch
ERROR: 5
Stacktrace:
 [1] error(s::String)
   @ Base ./error.jl:33
 [2] f
   @ ./REPL[3]:1 [inlined]
 [3] (::Base.var"#871#876"{typeof(f)})(r::Base.RefValue{Any}, args::Tuple{Int64})
   @ Base ./asyncmap.jl:100
 [4] macro expansion
   @ ./REPL[5]:23 [inlined]
 [5] (::var"#1#2"{Base.var"#871#876"{typeof(f)}, Channel{Any}, Nothing})()
   @ Main ./task.jl:411
ERROR: 5
Stacktrace:
 [1] (::Base.var"#881#883")(x::Task)
   @ Base ./asyncmap.jl:177
 [2] foreach(f::Base.var"#881#883", itr::Vector{Any})
   @ Base ./abstractarray.jl:2606
 [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:177
 [4] wrap_n_exec_twice
   @ ./asyncmap.jl:153 [inlined]
 [5] async_usemap(f::typeof(f), c::UnitRange{Int64}; ntasks::Int64, batch_size::Nothing)
   @ Base ./asyncmap.jl:103
 [6] #asyncmap#865
   @ ./asyncmap.jl:81 [inlined]
 [7] asyncmap(f::Function, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:81
 [8] top-level scope
   @ REPL[6]:1

The stacktrace isn't so interesting in this example, but since pmap uses asyncmap under the hood, this comes up there too. For me, I was working on concurrency with AWS.jl and was missing a retry statement somewhere, so my job failed after several hours without a usable stacktrace-- very frustating. So I very much think we should print the stacktrace.

@ericphanson
Copy link
Contributor Author

Huh, I'm pretty confused. The test that is failing is https://github.com/JuliaLang/julia/blame/027071f0fe1a4313ddba5a0909b6f851dff96452/stdlib/Distributed/test/distributed_exec.jl#L594-L601 which looks very related. However: that test fails for me locally on 1.7-beta3, even though the code hasn't changed in 4 years!

❯ julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.0-beta3 (2021-07-07)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using Test, Distributed

julia> let error_thrown = false
                  try
                      pmap(x -> x == 50 ? error("foobar") : x, 1:100)
                  catch e
                      @test e.captured.ex.msg == "foobar"
                      error_thrown = true
                  end
                  @test error_thrown
              end
Error During Test at REPL[2]:5
  Test threw exception
  Expression: e.captured.ex.msg == "foobar"
  type ErrorException has no field captured
  Stacktrace:
   [1] getproperty(x::ErrorException, f::Symbol)
     @ Base ./Base.jl:42
   [2] macro expansion
     @ /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Test/src/Test.jl:445 [inlined]
   [3] top-level scope
     @ REPL[2]:5

  caused by: foobar
  Stacktrace:
    [1] (::Base.var"#881#883")(x::Task)
      @ Base ./asyncmap.jl:177
    [2] foreach(f::Base.var"#881#883", itr::Vector{Any})
      @ Base ./abstractarray.jl:2606
    [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
      @ Base ./asyncmap.jl:177
    [4] wrap_n_exec_twice
      @ ./asyncmap.jl:153 [inlined]
    [5] #async_usemap#866
      @ ./asyncmap.jl:103 [inlined]
    [6] #asyncmap#865
      @ ./asyncmap.jl:81 [inlined]
    [7] pmap(f::Function, p::WorkerPool, c::UnitRange{Int64}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
      @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:126
    [8] pmap(f::Function, p::WorkerPool, c::UnitRange{Int64})
      @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:101
    [9] pmap(f::Function, c::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
      @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:156
   [10] pmap(f::Function, c::UnitRange{Int64})
      @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:156
   [11] top-level scope
      @ REPL[2]:3
   [12] eval
      @ ./boot.jl:373 [inlined]
   [13] eval_user_input(ast::Any, backend::REPL.REPLBackend)
      @ REPL /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/REPL/src/REPL.jl:150
   [14] repl_backend_loop(backend::REPL.REPLBackend)
      @ REPL /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/REPL/src/REPL.jl:241
   [15] start_repl_backend(backend::REPL.REPLBackend, consumer::Any)
      @ REPL /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/REPL/src/REPL.jl:226
   [16] run_repl(repl::REPL.AbstractREPL, consumer::Any; backend_on_current_task::Bool)
      @ REPL /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/REPL/src/REPL.jl:359
   [17] run_repl(repl::REPL.AbstractREPL, consumer::Any)
      @ REPL /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/REPL/src/REPL.jl:346
   [18] (::Base.var"#919#921"{Bool, Bool, Bool})(REPL::Module)
      @ Base ./client.jl:394
   [19] #invokelatest#2
      @ ./essentials.jl:716 [inlined]
   [20] invokelatest
      @ ./essentials.jl:714 [inlined]
   [21] run_main_repl(interactive::Bool, quiet::Bool, banner::Bool, history_file::Bool, color_set::Bool)
      @ Base ./client.jl:379
   [22] exec_options(opts::Base.JLOptions)
      @ Base ./client.jl:309
   [23] _start()
      @ Base ./client.jl:495
ERROR: There was an error during testing

caused by: foobar
Stacktrace:
  [1] (::Base.var"#881#883")(x::Task)
    @ Base ./asyncmap.jl:177
  [2] foreach(f::Base.var"#881#883", itr::Vector{Any})
    @ Base ./abstractarray.jl:2606
  [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
    @ Base ./asyncmap.jl:177
  [4] wrap_n_exec_twice
    @ ./asyncmap.jl:153 [inlined]
  [5] #async_usemap#866
    @ ./asyncmap.jl:103 [inlined]
  [6] #asyncmap#865
    @ ./asyncmap.jl:81 [inlined]
  [7] pmap(f::Function, p::WorkerPool, c::UnitRange{Int64}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
    @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:126
  [8] pmap(f::Function, p::WorkerPool, c::UnitRange{Int64})
    @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:101
  [9] pmap(f::Function, c::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:156
 [10] pmap(f::Function, c::UnitRange{Int64})
    @ Distributed /Users/sabae/src/julia/usr/share/julia/stdlib/v1.7/Distributed/src/pmap.jl:156
 [11] top-level scope
    @ REPL[2]:3

@rofinn
Copy link
Contributor

rofinn commented Oct 26, 2021

Yeah, I also ran into this while debugging some asyncmap code. Thankfully @iamed2 magically knew where to look, so I wasn't stuck searching through all the packages in my environment to find the underlying issue.

@kshyatt kshyatt added error messages Better, more actionable error messages parallelism Parallel or distributed computation labels Nov 16, 2021
@ericphanson
Copy link
Contributor Author

ericphanson commented Dec 28, 2021

Just printing the error isn't ideal, since it prints at a different time than the actual exception is thrown, and it could trigger multiple times (if errors occur on multiple tasks). Now, I've updated it to just use a CapturedException. The same example looks like this:

julia> f(i) = (sleep(i); i == 5 && error("5"))
f (generic function with 1 method)

julia> asyncmap(f, 1:5) # latest version of this PR
ERROR: 5
Stacktrace:
 [1] error(s::String)
   @ Base ./error.jl:33
 [2] f
   @ ./REPL[3]:1 [inlined]
 [3] (::Base.var"#885#890"{typeof(f)})(r::Base.RefValue{Any}, args::Tuple{Int64})
   @ Base ./asyncmap.jl:100
 [4] macro expansion
   @ ./asyncmap.jl:234 [inlined]
 [5] (::Base.var"#901#902"{Base.var"#885#890"{typeof(f)}, Channel{Any}, Nothing})()
   @ Base ./task.jl:411
Stacktrace:
 [1] (::Base.var"#895#897")(x::Task)
   @ Base ./asyncmap.jl:177
 [2] foreach(f::Base.var"#895#897", itr::Vector{Any})
   @ Base ./abstractarray.jl:2694
 [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:177
 [4] wrap_n_exec_twice
   @ ./asyncmap.jl:153 [inlined]
 [5] #async_usemap#880
   @ ./asyncmap.jl:103 [inlined]
 [6] #asyncmap#879
   @ ./asyncmap.jl:81 [inlined]
 [7] asyncmap(f::Function, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:81
 [8] top-level scope
   @ REPL[4]:1

I think instead of throwing on the first error, we could collect them all and use a CompositeException, which might be even better.

It looks like some tests failed because we are throwing a different exception here than before. I'll update them when I get a chance.

@ericphanson
Copy link
Contributor Author

This fix (wrapping in a CapturedException) means that of course the exceptions generated by asyncmap will be different. This affects code built on asyncmap like pmap as well. This means there could be the risk of breaking code that is counting on unwrapping the exception in a particular way, or getting a RemoteException instead of a CapturedException (since those two tests fail).

However, @kolia has told me he has seen issues with stacktraces being eaten by pmap, so I think returning a CapturedException is good so that we don't drop stacktraces- i.e. this might be fixing that issue too.

@ericphanson ericphanson changed the title Print stacktraces from asyncmap'd tasks Throw CapturedExceptions from asyncmap to avoid dropping the stacktrace Dec 29, 2021
@ericphanson
Copy link
Contributor Author

all tests passed on c896ad2 (except trailing whitespace), and with a whitespace only change in 4a40e09, the trailing whitespace checked passed, but now one of the test runs has failed (buildbot/tester_linux64), and the rest are pending. I don't understand the failure at all (no tests ran?). So anyway I think this should count as green CI up to spurious failures 😄.

```julia
julia> f(i) = (sleep(i); i == 5 && error("5"))
f (generic function with 1 method)

julia> asyncmap(f, 1:5) # master
ERROR: 5
Stacktrace:
 [1] (::Base.var"JuliaLang#881#883")(x::Task)
   @ Base ./asyncmap.jl:177
 [2] foreach(f::Base.var"JuliaLang#881#883", itr::Vector{Any})
   @ Base ./abstractarray.jl:2606
 [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:177
 [4] wrap_n_exec_twice
   @ ./asyncmap.jl:153 [inlined]
 [5] async_usemap(f::typeof(f), c::UnitRange{Int64}; ntasks::Int64, batch_size::Nothing)
   @ Base ./asyncmap.jl:103
 [6] #asyncmap#865
   @ ./asyncmap.jl:81 [inlined]
 [7] asyncmap(f::Function, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:81
 [8] top-level scope
   @ REPL[4]:1

julia> function Base.start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing) # monkeypatch
           t = @async begin
               retval = nothing

               try
                   if isa(batch_size, Number)
                       while isopen(chnl)
                           # The mapping function expects an array of input args, as it processes
                           # elements in a batch.
                           batch_collection=Any[]
                           n = 0
                           for exec_data in chnl
                               push!(batch_collection, exec_data)
                               n += 1
                               (n == batch_size) && break
                           end
                           if n > 0
                               exec_func(batch_collection)
                           end
                       end
                   else
                       for exec_data in chnl
                           exec_func(exec_data...)
                       end
                   end
               catch e
                   close(chnl)
                   Base.display_error(stderr, Base.catch_stack())
                   retval = e
               end
               retval
           end
           push!(worker_tasks, t)
       end

julia> function Base.start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
           t = @async begin
               retval = nothing

               try
                   if isa(batch_size, Number)
                       while isopen(chnl)
                           # The mapping function expects an array of input args, as it processes
                           # elements in a batch.
julia> function Base.start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
           t = @async begin
               retval = nothing

               try
                   if isa(batch_size, Number)
                       while isopen(chnl)
                           # The mapping function expects an array of input args, as it processes
                           # elements in a batch.
julia> asyncmap(f, 1:5) # post-patch
ERROR: 5
Stacktrace:
 [1] error(s::String)
   @ Base ./error.jl:33
 [2] f
   @ ./REPL[3]:1 [inlined]
 [3] (::Base.var"JuliaLang#871#876"{typeof(f)})(r::Base.RefValue{Any}, args::Tuple{Int64})
   @ Base ./asyncmap.jl:100
 [4] macro expansion
   @ ./REPL[5]:23 [inlined]
 [5] (::var"JuliaLang#1#2"{Base.var"JuliaLang#871#876"{typeof(f)}, Channel{Any}, Nothing})()
   @ Main ./task.jl:411
ERROR: 5
Stacktrace:
 [1] (::Base.var"JuliaLang#881#883")(x::Task)
   @ Base ./asyncmap.jl:177
 [2] foreach(f::Base.var"JuliaLang#881#883", itr::Vector{Any})
   @ Base ./abstractarray.jl:2606
 [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:177
 [4] wrap_n_exec_twice
   @ ./asyncmap.jl:153 [inlined]
 [5] async_usemap(f::typeof(f), c::UnitRange{Int64}; ntasks::Int64, batch_size::Nothing)
   @ Base ./asyncmap.jl:103
 [6] #asyncmap#865
   @ ./asyncmap.jl:81 [inlined]
 [7] asyncmap(f::Function, c::UnitRange{Int64})
   @ Base ./asyncmap.jl:81
 [8] top-level scope
   @ REPL[6]:1
```
Comment on lines 237 to +239
catch e
close(chnl)
retval = e
retval = capture_exception(e, catch_backtrace())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we potentially make use of the exception stack here now?

Suggested change
catch e
close(chnl)
retval = e
retval = capture_exception(e, catch_backtrace())
catch
close(chnl)
retval = Base.ExceptionStack(Any[capture_exception(ex...) for ex in current_exceptions()])
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking a look. I think there should be a way to use ExceptionStacks here but it’s a bit subtle. I think we’d need more changes in Distributed to facilitate this, since code there is expecting RemoteExceptions to be thrown and to handle them specially. The current approach works around that by continuing to throw them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
error messages Better, more actionable error messages parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants