From 52f9cb63ccc689efe7bddfaf92e61726f2cc1d0d Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Mon, 23 May 2016 12:07:07 +0530 Subject: [PATCH 1/4] update NEWS.md for pmap changes --- NEWS.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/NEWS.md b/NEWS.md index 5d6e48b217c7d..d7d00d3115876 100644 --- a/NEWS.md +++ b/NEWS.md @@ -69,9 +69,8 @@ Breaking changes in a `MethodError`. ([#6190]) * `pmap` keyword arguments `err_retry=true` and `err_stop=false` are deprecated. - `pmap` no longer retries or returns `Exception` objects in the result collection. - `pmap(retry(f), c)` or `pmap(@catch(f), c)` can be used instead. - ([#15409](https://github.com/JuliaLang/julia/pull/15409#discussion_r57494701)). + Action to be taken on errors can be specified via the `on_error` keyword argument. + Retry is specified via `retry_n`, `retry_on` and `retry_max_delay`. * `reshape` is now defined to always share data with the original array. If a reshaped copy is needed, use `copy(reshape(a))` or `copy!` to a new array of From 12d6b46c2ac602a278d87984ecdbb363771aaea9 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Mon, 23 May 2016 12:18:44 +0530 Subject: [PATCH 2/4] minor update to `retry` --- base/error.jl | 8 +++++--- doc/stdlib/base.rst | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/base/error.jl b/base/error.jl index 7e2578719dbbc..0bb913b609d74 100644 --- a/base/error.jl +++ b/base/error.jl @@ -50,12 +50,13 @@ macro assert(ex, msgs...) :($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg))) end +# NOTE: Please keep the constant values specified below in sync with the doc string const DEFAULT_RETRY_N = 1 const DEFAULT_RETRY_ON = e->true const DEFAULT_RETRY_MAX_DELAY = 10.0 """ - retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function + retry(f, [retry_on]; n=1, max_delay=10.0) -> Function Returns a lambda that retries function `f` up to `n` times in the event of an exception. If `retry_on` is a `Type` then retry only @@ -83,8 +84,9 @@ function retry(f::Function, retry_on::Function=DEFAULT_RETRY_ON; n=DEFAULT_RETRY rethrow(e) end end - sleep(delay) - delay = min(max_delay, delay * (0.8 + (rand() * 0.4)) * 5) + delay = min(max_delay, delay) + sleep(delay * (0.8 + (rand() * 0.2))) + delay = delay * 5 end end end diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index e4aeea4d14470..125b378847b04 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -1210,7 +1210,7 @@ Errors An error occurred when running a module's ``__init__`` function. The actual error thrown is available in the ``.error`` field. -.. function:: retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function +.. function:: retry(f, [retry_on]; n=1, max_delay=10.0) -> Function .. Docstring generated from Julia source From dd9d495d0f49d0986e668d5928ee93ec9ab8f224 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Mon, 23 May 2016 12:23:18 +0530 Subject: [PATCH 3/4] closes #16322 --- base/pmap.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/pmap.jl b/base/pmap.jl index 0504d1ddd473a..0270ee0293f93 100644 --- a/base/pmap.jl +++ b/base/pmap.jl @@ -65,7 +65,7 @@ end pgenerate(p::WorkerPool, f, c1, c...; kwargs...) = pgenerate(p, a->f(a...), zip(c1, c...); kwargs...) -pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c...; kwargs...) +pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c; kwargs...) pgenerate(f, c1, c...; kwargs...) 
= pgenerate(a->f(a...), zip(c1, c...); kwargs...)
 
 function wrap_on_error(f, on_error; capture_data=false)

From 730617a9deb883f179877ae9523285d89011af60 Mon Sep 17 00:00:00 2001
From: Amit Murthy
Date: Mon, 30 May 2016 17:01:18 +0530
Subject: [PATCH 4/4] move keyword args support to pmap from pgenerate. pgenerate auto selects batch size.

---
 base/pmap.jl | 131 +++++++++++++++++++++++++++------------------------
 1 file changed, 69 insertions(+), 62 deletions(-)

diff --git a/base/pmap.jl b/base/pmap.jl
index 0270ee0293f93..7c273e6813ceb 100644
--- a/base/pmap.jl
+++ b/base/pmap.jl
@@ -6,7 +6,7 @@ type BatchProcessingError <: Exception
 end
 
 """
-    pgenerate([::WorkerPool], f, c...) -> (iterator, process_batch_errors)
+    pgenerate([::WorkerPool], f, c...) -> iterator
 
 Apply `f` to each element of `c` in parallel using available
 workers and tasks.
@@ -19,10 +19,62 @@ Note that `f` must be made available to all worker processes; see
 and Loading Packages `)
 for details.
 """
-function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
-                   retry_n=0,
-                   retry_max_delay=DEFAULT_RETRY_MAX_DELAY,
-                   retry_on=DEFAULT_RETRY_ON)
+function pgenerate(p::WorkerPool, f, c)
+    if length(p) == 0
+        return AsyncGenerator(f, c)
+    end
+    batches = batchsplit(c, min_batch_count = length(p) * 3)
+    return flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches))
+end
+pgenerate(p::WorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...))
+pgenerate(f, c) = pgenerate(default_worker_pool(), f, c)
+pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))
+
+"""
+    pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
+
+Transform collection `c` by applying `f` to each element using available
+workers and tasks.
+
+For multiple collection arguments, apply `f` elementwise.
+
+Note that `f` must be made available to all worker processes; see
+[Code Availability and Loading Packages](:ref:`Code Availability
+and Loading Packages `)
+for details.
+
+If a worker pool is not specified, all available workers, i.e., the default worker pool,
+are used.
+
+By default, `pmap` distributes the computation over all specified workers. To use only the
+local process and distribute over tasks, specify `distributed=false`. This is equivalent to `asyncmap`.
+
+`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes
+greater than 1, the collection is split into multiple batches, which are distributed across
+workers. Each such batch is processed in parallel via tasks in each worker. The specified
+`batch_size` is an upper limit; the actual batch size may be smaller and is calculated
+depending on the number of available workers and the length of the collection.
+
+Any error stops `pmap` from processing the remainder of the collection. To override this behavior
+you can specify an error handling function via the `on_error` keyword argument, which takes the
+exception as its single argument. The function can stop processing by rethrowing the error, or
+continue by returning any value, which is then returned inline with the results to the caller.
+
+Failed computations can also be retried via `retry_on`, `retry_n` and `retry_max_delay`, which are
+passed through to `retry` as the arguments `retry_on`, `n` and `max_delay` respectively. If batching
+is specified and an entire batch fails, all items in the batch are retried.
+ +The following are equivalent: + +* `pmap(f, c; distributed=false)` and `asyncmap(f,c)` +* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)` +* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` +""" +function pmap(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing, + retry_n=0, + retry_max_delay=DEFAULT_RETRY_MAX_DELAY, + retry_on=DEFAULT_RETRY_ON) + f_orig = f # Don't do remote calls if there are no workers. if (length(p) == 0) || (length(p) == 1 && fetch(p.channel) == myid()) distributed = false @@ -45,7 +97,7 @@ function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error if on_error != nothing f = wrap_on_error(f, on_error) end - return (AsyncGenerator(f, c), nothing) + return collect(AsyncGenerator(f, c)) else batches = batchsplit(c, min_batch_count = length(p) * 3, max_batch_size = batch_size) @@ -56,17 +108,21 @@ function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error # to ensure that we do not call mapped function on the same element more than retry_n. # This guarantee is not possible in case of worker death / network errors, wherein # we will retry the entire batch on a new worker. - f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true) + if (on_error != nothing) || (retry_n > 0) + f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true) + end f = wrap_batch(f, p, on_error) - return (flatten(AsyncGenerator(f, batches)), - (p, f, results)->process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay)) + results = collect(flatten(AsyncGenerator(f, batches))) + if (on_error != nothing) || (retry_n > 0) + process_batch_errors!(p, f_orig, results, on_error, retry_on, retry_n, retry_max_delay) + end + return results end end -pgenerate(p::WorkerPool, f, c1, c...; kwargs...) = pgenerate(p, a->f(a...), zip(c1, c...); kwargs...) - -pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c; kwargs...) -pgenerate(f, c1, c...; kwargs...) = pgenerate(a->f(a...), zip(c1, c...); kwargs...) +pmap(p::WorkerPool, f, c1, c...; kwargs...) = pmap(p, a->f(a...), zip(c1, c...); kwargs...) +pmap(f, c; kwargs...) = pmap(default_worker_pool(), f, c; kwargs...) +pmap(f, c1, c...; kwargs...) = pmap(a->f(a...), zip(c1, c...); kwargs...) function wrap_on_error(f, on_error; capture_data=false) return x -> begin @@ -101,55 +157,6 @@ end asyncmap_batch(f) = batch -> asyncmap(f, batch) -""" - pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection - -Transform collection `c` by applying `f` to each element using available -workers and tasks. - -For multiple collection arguments, apply f elementwise. - -Note that `f` must be made available to all worker processes; see -[Code Availability and Loading Packages](:ref:`Code Availability -and Loading Packages `) -for details. - -If a worker pool is not specified, all available workers, i.e., the default worker pool -is used. - -By default, `pmap` distributes the computation over all specified workers. To use only the -local process and distribute over tasks, specify `distributed=false`. This is equivalent to `asyncmap`. - -`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes -greater than 1, the collection is split into multiple batches, which are distributed across -workers. 
Each such batch is processed in parallel via tasks in each worker. The specified -`batch_size` is an upper limit, the actual size of batches may be smaller and is calculated -depending on the number of workers available and length of the collection. - -Any error stops pmap from processing the remainder of the collection. To override this behavior -you can specify an error handling function via argument `on_error` which takes in a single argument, i.e., -the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value -which is then returned inline with the results to the caller. - -Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through -to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails, -all items in the batch are retried. - -The following are equivalent: - -* `pmap(f, c; distributed=false)` and `asyncmap(f,c)` -* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)` -* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` -""" -function pmap(p::WorkerPool, f, c...; kwargs...) - results_iter, process_errors! = pgenerate(p, f, c...; kwargs...) - results = collect(results_iter) - if isa(process_errors!, Function) - process_errors!(p, f, results) - end - results -end - function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay) # Handle all the ones in error in another pmap, with batch size set to 1 if (on_error != nothing) || (retry_n > 0)
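
A minimal usage sketch of the keyword arguments introduced by [PATCH 4/4], based on the docstring above. It is not part of the patch: the worker count, collections and mapped functions below are illustrative assumptions, and the snippet presumes the patched `pmap` is in place.

# Illustrative only: exercising the new pmap keyword arguments.
addprocs(2)                                   # assume two local worker processes

# Batch mode: split 1:100 into batches of at most 10 items per remote call.
squares = pmap(x -> x^2, 1:100; batch_size=10)

# Error handling: return the exception inline instead of aborting, and retry each
# failed element once before reporting it.
maybe = pmap(x -> x == 3 ? error("boom") : x, 1:4; on_error=e -> e, retry_n=1)

# Tasks only, no remote calls; per the docstring this is equivalent to asyncmap(f, c).
doubled = pmap(x -> 2x, 1:10; distributed=false)

With `on_error=e -> e` the failing element's slot holds the exception object, mirroring the `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` equivalence listed in the docstring.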