Skip to content

Commit cf52a34

Browse files
samoconnoramitmurthy
authored andcommitted
@catch, retry, partition, asyncmap and pmap (#15409)
* @catch, retry, partition, asyncmap and refactored pmap (#15409 and #14843)
1 parent 1b4618e commit cf52a34

22 files changed

+740
-335
lines changed

NEWS.md

+6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ Breaking changes
5858
is now divided among the fields `code`, `slotnames`, `slottypes`, `slotflags`,
5959
`gensymtypes`, `rettype`, `nargs`, and `isva` in the `LambdaInfo` type ([#15609]).
6060

61+
* `pmap` keyword arguments `err_retry=true` and `err_stop=false` are deprecated.
62+
`pmap` no longer retries or returns `Exception` objects in the result collection.
63+
`pmap(retry(f), c)` or `pmap(@catch(f), c)` can be used instead.
64+
([#15409](https://github.com/JuliaLang/julia/pull/15409#discussion_r57494701)).
65+
66+
6167
Library improvements
6268
--------------------
6369

base/asyncmap.jl

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
# This file is a part of Julia. License is MIT: http://julialang.org/license
2+
3+
4+
"""
5+
AsyncCollector(f, results, c...; ntasks=100) -> iterator
6+
7+
Apply f to each element of c using at most 100 asynchronous tasks.
8+
For multiple collection arguments, apply f elementwise.
9+
Output is collected into "results".
10+
11+
Note: `next(::AsyncCollector, state) -> (nothing, state)`
12+
13+
Note: `for task in AsyncCollector(f, results, c...) end` is equivalent to
14+
`map!(f, results, c...)`.
15+
"""
16+
type AsyncCollector
17+
f
18+
results
19+
enumerator::Enumerate
20+
ntasks::Int
21+
end
22+
23+
function AsyncCollector(f, results, c...; ntasks=0)
24+
if ntasks == 0
25+
ntasks = 100
26+
end
27+
AsyncCollector(f, results, enumerate(zip(c...)), ntasks)
28+
end
29+
30+
31+
type AsyncCollectorState
32+
enum_state
33+
active_count::Int
34+
task_done::Condition
35+
done::Bool
36+
end
37+
38+
39+
# Busy if the maximum number of concurrent tasks is running.
40+
function isbusy(itr::AsyncCollector, state::AsyncCollectorState)
41+
state.active_count == itr.ntasks
42+
end
43+
44+
45+
# Wait for @async task to end.
46+
wait(state::AsyncCollectorState) = wait(state.task_done)
47+
48+
49+
# Open a @sync block and initialise iterator state.
50+
function start(itr::AsyncCollector)
51+
sync_begin()
52+
AsyncCollectorState(start(itr.enumerator), 0, Condition(), false)
53+
end
54+
55+
# Close @sync block when iterator is done.
56+
function done(itr::AsyncCollector, state::AsyncCollectorState)
57+
if !state.done && done(itr.enumerator, state.enum_state)
58+
state.done = true
59+
sync_end()
60+
end
61+
return state.done
62+
end
63+
64+
function next(itr::AsyncCollector, state::AsyncCollectorState)
65+
66+
# Wait if the maximum number of concurrent tasks are already running...
67+
while isbusy(itr, state)
68+
wait(state)
69+
end
70+
71+
# Get index and mapped function arguments from enumeration iterator...
72+
(i, args), state.enum_state = next(itr.enumerator, state.enum_state)
73+
74+
# Execute function call and save result asynchronously...
75+
@async begin
76+
itr.results[i] = itr.f(args...)
77+
state.active_count -= 1
78+
notify(state.task_done, nothing)
79+
end
80+
81+
# Count number of concurrent tasks...
82+
state.active_count += 1
83+
84+
return (nothing, state)
85+
end
86+
87+
88+
89+
"""
90+
AsyncGenerator(f, c...; ntasks=100) -> iterator
91+
92+
Apply f to each element of c using at most 100 asynchronous tasks.
93+
For multiple collection arguments, apply f elementwise.
94+
Results are returned by the iterator as they become available.
95+
Note: `collect(AsyncGenerator(f, c...; ntasks=1))` is equivalent to
96+
`map(f, c...)`.
97+
"""
98+
type AsyncGenerator
99+
collector::AsyncCollector
100+
end
101+
102+
function AsyncGenerator(f, c...; ntasks=0)
103+
AsyncGenerator(AsyncCollector(f, Dict{Int,Any}(), c...; ntasks=ntasks))
104+
end
105+
106+
107+
type AsyncGeneratorState
108+
i::Int
109+
async_state::AsyncCollectorState
110+
end
111+
112+
113+
start(itr::AsyncGenerator) = AsyncGeneratorState(0, start(itr.collector))
114+
115+
# Done when source async collector is done and all results have been consumed.
116+
function done(itr::AsyncGenerator, state::AsyncGeneratorState)
117+
done(itr.collector, state.async_state) && isempty(itr.collector.results)
118+
end
119+
120+
# Pump the source async collector if it is not already busy...
121+
function pump_source(itr::AsyncGenerator, state::AsyncGeneratorState)
122+
if !isbusy(itr.collector, state.async_state) &&
123+
!done(itr.collector, state.async_state)
124+
ignored, state.async_state = next(itr.collector, state.async_state)
125+
return true
126+
else
127+
return false
128+
end
129+
end
130+
131+
function next(itr::AsyncGenerator, state::AsyncGeneratorState)
132+
133+
state.i += 1
134+
135+
results = itr.collector.results
136+
while !haskey(results, state.i)
137+
138+
# Wait for results to become available...
139+
if !pump_source(itr,state) && !haskey(results, state.i)
140+
wait(state.async_state)
141+
end
142+
end
143+
r = results[state.i]
144+
delete!(results, state.i)
145+
146+
return (r, state)
147+
end
148+
149+
iteratorsize(::Type{AsyncGenerator}) = SizeUnknown()
150+
151+
152+
"""
153+
asyncgenerate(f, c...) -> iterator
154+
155+
Apply `@async f` to each element of `c`.
156+
157+
For multiple collection arguments, apply f elementwise.
158+
159+
Results are returned in order as they become available.
160+
"""
161+
asyncgenerate(f, c...) = AsyncGenerator(f, c...)
162+
163+
164+
"""
165+
asyncmap(f, c...) -> collection
166+
167+
Transform collection `c` by applying `@async f` to each element.
168+
169+
For multiple collection arguments, apply f elementwise.
170+
"""
171+
asyncmap(f, c...) = collect(asyncgenerate(f, c...))
172+
173+
174+
"""
175+
asyncmap!(f, c)
176+
177+
In-place version of `asyncmap()`.
178+
"""
179+
asyncmap!(f, c) = (for x in AsyncCollector(f, c, c) end; c)
180+
181+
182+
"""
183+
asyncmap!(f, results, c...)
184+
185+
Like `asyncmap()`, but stores output in `results` rather returning a collection.
186+
"""
187+
asyncmap!(f, r, c1, c...) = (for x in AsyncCollector(f, r, c1, c...) end; r)

base/deprecated.jl

+28
Original file line numberDiff line numberDiff line change
@@ -1003,5 +1003,33 @@ export call
10031003
# 1933
10041004
@deprecate_binding SingleAsyncWork AsyncCondition
10051005

1006+
10061007
# #12872
10071008
@deprecate istext istextmime
1009+
1010+
#15409
1011+
function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)
1012+
1013+
if err_retry != nothing
1014+
depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap)
1015+
if err_retry == true
1016+
f = retry(f)
1017+
end
1018+
end
1019+
1020+
if err_stop != nothing
1021+
depwarn("err_stop is deprecated, use pmap(@catch(f), c...).", :pmap)
1022+
if err_stop == false
1023+
f = @catch(f)
1024+
end
1025+
end
1026+
1027+
if pids == nothing
1028+
p = default_worker_pool()
1029+
else
1030+
depwarn("pids is deprecated, use pmap(::WorkerPool, f, c...).", :pmap)
1031+
p = WorkerPool(pids)
1032+
end
1033+
1034+
return pmap(p, f, c...)
1035+
end

base/docs/helpdb/Base.jl

-15
Original file line numberDiff line numberDiff line change
@@ -241,21 +241,6 @@ If `types` is specified, returns an array of methods whose types match.
241241
"""
242242
methods
243243

244-
"""
245-
pmap(f, lsts...; err_retry=true, err_stop=false, pids=workers())
246-
247-
Transform collections `lsts` by applying `f` to each element in parallel. (Note that
248-
`f` must be made available to all worker processes; see [Code Availability and Loading Packages](:ref:`Code Availability and Loading Packages <man-parallel-computing-code-availability>`)
249-
for details.) If `nprocs() > 1`, the calling process will be dedicated to assigning tasks.
250-
All other available processes will be used as parallel workers, or on the processes
251-
specified by `pids`.
252-
253-
If `err_retry` is `true`, it retries a failed application of `f` on a different worker. If
254-
`err_stop` is `true`, it takes precedence over the value of `err_retry` and `pmap` stops
255-
execution on the first error.
256-
"""
257-
pmap
258-
259244
"""
260245
workers()
261246

base/error.jl

+60
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,63 @@ macro assert(ex, msgs...)
4949
end
5050
:($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg)))
5151
end
52+
53+
54+
"""
55+
retry(f, [condition]; n=3; max_delay=10) -> Function
56+
57+
Returns a lambda that retries function `f` up to `n` times in the
58+
event of an exception. If `condition` is a `Type` then retry only
59+
for exceptions of that type. If `condition` is a function
60+
`cond(::Exception) -> Bool` then retry only if it is true.
61+
62+
# Examples
63+
```julia
64+
retry(http_get, e -> e.status == "503")(url)
65+
retry(read, UVError)(io)
66+
```
67+
"""
68+
function retry(f::Function, condition::Function=e->true;
69+
n::Int=3, max_delay::Int=10)
70+
71+
(args...) -> begin
72+
delay = 0.05
73+
for i = 1:n
74+
try
75+
return f(args...)
76+
catch e
77+
if i == n || try condition(e) end != true
78+
rethrow(e)
79+
end
80+
end
81+
sleep(delay * (0.8 + (rand() * 0.4)))
82+
delay = min(max_delay, delay * 5)
83+
end
84+
end
85+
end
86+
87+
retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...)
88+
89+
90+
"""
91+
@catch(f) -> Function
92+
93+
Returns a lambda that executes `f` and returns either the result of `f` or
94+
an `Exception` thrown by `f`.
95+
96+
# Examples
97+
```julia
98+
julia> r = @catch(length)([1,2,3])
99+
3
100+
101+
julia> r = @catch(length)()
102+
MethodError(length,())
103+
104+
julia> typeof(r)
105+
MethodError
106+
```
107+
"""
108+
catchf(f) = (args...) -> try f(args...) catch ex; ex end
109+
macro catch(f)
110+
esc(:(Base.catchf($f)))
111+
end

base/exports.jl

+5
Original file line numberDiff line numberDiff line change
@@ -1045,9 +1045,11 @@ export
10451045
# errors
10461046
assert,
10471047
backtrace,
1048+
@catch,
10481049
catch_backtrace,
10491050
error,
10501051
rethrow,
1052+
retry,
10511053
systemerror,
10521054

10531055
# stack traces
@@ -1211,7 +1213,9 @@ export
12111213

12121214
# multiprocessing
12131215
addprocs,
1216+
asyncmap,
12141217
ClusterManager,
1218+
default_worker_pool,
12151219
fetch,
12161220
init_worker,
12171221
interrupt,
@@ -1233,6 +1237,7 @@ export
12331237
timedwait,
12341238
wait,
12351239
workers,
1240+
WorkerPool,
12361241

12371242
# multimedia I/O
12381243
Display,

base/generator.jl

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ function next(g::Generator, s)
2222
g.f(v), s2
2323
end
2424

25+
26+
"""
27+
generate(f, c...) -> iterator
28+
29+
Return an iterator applying `f` to each element of `c`.
30+
For multiple collection arguments, apply f elementwise.
31+
"""
32+
generate(f, c...) = Generator(f, c...)
33+
34+
2535
## iterator traits
2636

2737
abstract IteratorSize

0 commit comments

Comments
 (0)