
Concurrency limiting #557

Merged
cisaacstern merged 9 commits into main from limit-concurrency on Aug 15, 2023
Conversation

@cisaacstern (Member) commented Aug 5, 2023

WIP, will close #45 and #389 when complete.

Prioritizing this because it is blocking leap-stc/data-management#36, and also because it's been a long-standing feature request to unblock many different recipes. The implementation uses GroupByKey, adapted from an example provided by @alxmrs in the linked issue, with these being the key couple of lines:

https://github.com/google/weather-tools/blob/bb2274d2be4ed51a9a6cc3bc79a8be110b250509/weather_mv/loader_pipeline/util.py#L380-L381

@cisaacstern cisaacstern marked this pull request as ready for review August 10, 2023 04:32
@cisaacstern cisaacstern requested review from alxmrs and rabernat August 10, 2023 04:32
@cisaacstern (Member Author) commented Aug 10, 2023

This appears to work in its current form. The climsim recipes I deployed with OpenURLWithFSSpec(max_concurrency=20) via leap-stc/data-management#44 have now been running for over a day without erroring out:

[Screenshot, 2023-08-09 9:33 PM: job still running without errors]

Indeed, following a quick spike to 20 at the start of the job, the number of workers has been locked at 10 for most of this time (with 2 vCPUs per worker, 10 workers means 20 vCPUs, so maybe this means two concurrency groups are running per worker?):

[Screenshot, 2023-08-09 9:47 PM: worker count holding steady at 10]

At any rate, I'm very encouraged by this (slow and steady) caching progress, as compared to previous deployment attempts of these recipes, wherein:

  • No concurrency limit + unrestricted workers: crashed in ~20 minutes with aiohttp connection errors.
  • No concurrency limit + max_num_workers=50: crashed in ~1 hr with aiohttp connection errors. (This was just an experiment; even if it had worked, it's not a viable alternative, because max_num_workers applies to all pipeline stages, whereas this PR allows us to limit concurrency per stage; see the usage sketch just below this list for the contrast.)
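
For context, a hedged usage sketch of the difference between the two knobs: max_num_workers is a pipeline-wide option consumed by the runner, while the max_concurrency argument used in these deployments applies only to the transform it is passed to. The FilePattern and URLs below are made up purely for illustration.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec


def make_url(time: int) -> str:
    # placeholder URL scheme, purely illustrative
    return f"https://example.com/data/{time}.nc"


pattern = FilePattern(make_url, ConcatDim("time", list(range(10))))

# Pipeline-wide worker cap (a Dataflow option): throttles every stage.
options = PipelineOptions(max_num_workers=50)

with beam.Pipeline(options=options) as p:
    (
        p
        | beam.Create(pattern.items())           # (index, url) pairs
        | OpenURLWithFSSpec(max_concurrency=20)  # per-stage limit, as in the ClimSim deployments
        # downstream stages can still scale out to the full worker count
    )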

In the day plus of running, the climsim-highres-mli recipe has cached a little over 100k files (~10 TB, at ~100 MB/file):

import gcsfs

gcs = gcsfs.GCSFileSystem()
# list the cache bucket and keep only the climsim-highres-mli ("mli") files
cache_mli = [f for f in gcs.ls("gs://leap-scratch/data-library/cache") if "mli" in f]
len(cache_mli)  # -> 100238

I'm working on an integration test which will hopefully be able to capture this behavior in a (much) smaller pipeline. In the meantime, this work is unit tested here, so I thought I'd open it up for review to see what others think.

I've made the concurrency limiter here a standalone PTransform which can be wrapped by other, more specific transforms, as demonstrated for OpenURLWithFSSpec. The approach is inspired by the above-linked code in weather-tools (thanks for that reference, @alxmrs!), but is obviously a bit simpler, insofar as it only does concurrency limiting (no rate limiting/latency between invocations). In exchange for being less general than the weather-tools example, I think we gain readability with this implementation, which IMHO is easier to grok than the (much more general and fully featured!) weather-tools implementation.
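
For readers following along, here is a rough, hypothetical sketch of the GroupByKey pattern described above; the class name, signature, and bucketing detail are illustrative and not necessarily the exact code in this PR's diff.

import apache_beam as beam


class LimitConcurrency(beam.PTransform):
    """Illustrative standalone concurrency limiter (not the merged code).

    Each keyed element is assigned to one of `max_concurrency` buckets; after a
    GroupByKey, every bucket is processed serially, so at most `max_concurrency`
    calls to `func` run in parallel across the stage.
    """

    def __init__(self, func, max_concurrency: int, **kwargs):
        self.func = func
        self.max_concurrency = max_concurrency
        self.kwargs = kwargs

    def _apply_serially(self, bucketed):
        # bucketed is (bucket_id, iterable of (key, item)); iterate the group in order
        _, keyed_items = bucketed
        for key, item in keyed_items:
            yield key, self.func(item, **self.kwargs)

    def expand(self, pcoll):
        return (
            pcoll
            # NOTE: hash() is for illustration; a deterministic hash would be safer across workers
            | "Bucket" >> beam.Map(
                lambda kv, n: (hash(kv[0]) % n, kv), n=self.max_concurrency
            )
            | "Group" >> beam.GroupByKey()
            | "Apply serially" >> beam.FlatMap(self._apply_serially)
        )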

My feeling is that merging some version of what's here is the easiest way to move forward with blocked recipes, and that if we need something more general down the line, we can address that later. That being said, if others see it another way, I'm all ears. Thanks in advance for taking a look!

@cisaacstern (Member Author) commented:
A few additional notes/thoughts:

  • I am curious if there is any approach better than trial-and-error for tuning the selection of a max concurrency value. I semi-arbitrarily chose max_concurrency=20 as a conservative value for the ClimSim deployments above, after observing the max_num_workers=50 experiment fail.

  • Because the ongoing ClimSim jobs (my first real-world test of this logic), are still in the caching phase, I have not yet definitively confirmed that subsequent stages will scale back out to unrestricted numbers of workers. There is no reason I can think of that this won't happen, but I suppose there's a small chance a reshuffle may be needed to trigger it? I will report back here with what I see in the ClimSim jobs, and also I'd like to see if it's possible to capture this scale-out as part of a forthcoming integration test.

  • I wonder if it's worthwhile to add any backoff/retry logic for each call to open_url. In practice, this might be easiest (and most general) to implement as backoff/retries of the result = func(...) call here (in OpenURLWithFSSpec, this func is open_url):

    for inner_item in arg:
        key, item = inner_item
        result = func(item, **kwargs)
        yield key, result

    My thinking here is that a given concurrency group could be very large (in the current ClimSim jobs, each concurrency group is roughly ~10k elements). If there is an intermittent networking issue causing a single call to open_url to error out, perhaps the most graceful way to recover is to back off and retry at the worker-code level? My concern is that if we leave it to the runner to retry (IIUC Dataflow, e.g., has built-in retry routines), the entire concurrency group might be iterated over from the beginning, which seems undesirable. A rough sketch of this idea is just below this list.
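
To make that concrete, here is a purely illustrative sketch of worker-level backoff/retry wrapped around the per-item call; the helper name and retry parameters are made up, not part of this PR.

import random
import time


def call_with_retries(func, item, *, max_retries: int = 3, base_delay: float = 1.0, **kwargs):
    # Retry func(item, **kwargs) with exponential backoff plus jitter, so that a
    # transient network error does not force the runner to re-process an entire
    # (potentially ~10k-element) concurrency group from the beginning.
    for attempt in range(max_retries + 1):
        try:
            return func(item, **kwargs)
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

The inner loop quoted above would then become something like:

    for key, item in arg:
        yield key, call_with_retries(func, item, **kwargs)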

@rabernat (Contributor) left a comment:


Overall this looks great!

A few nits on type hints and function signature.

@@ -56,7 +57,7 @@

 # TODO: replace with beam.MapTuple?
-def _add_keys(func):
+def _add_keys(func: Callable) -> Callable:
@alxmrs (Contributor) commented:

A Callable of what?

@cisaacstern (Member Author) replied:

Touché, @alxmrs, that was a cunning nerd snipe. 😆

To find the answer, I just went down the (interesting, not-too-deep) rabbit hole of PEP612, and arrived at 5ebb0f7.

Do those hints look plausible/intelligible to you?
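
In case it helps future readers, here is a hedged sketch of what PEP 612-style hints for a key-preserving wrapper like _add_keys could look like; the real annotations live in 5ebb0f7 and may differ in detail.

# Requires Python 3.10+; on older versions, import ParamSpec and Concatenate
# from typing_extensions instead.
from typing import Callable, Concatenate, ParamSpec, Tuple, TypeVar

P = ParamSpec("P")  # func's remaining parameters
R = TypeVar("R")    # func's return type
K = TypeVar("K")    # the key carried alongside each element
T = TypeVar("T")    # the element type func actually operates on


def _add_keys(
    func: Callable[Concatenate[T, P], R],
) -> Callable[Concatenate[Tuple[K, T], P], Tuple[K, R]]:
    # Hypothetical wrapper: accept a (key, value) pair, forward only the value
    # to func, and re-attach the key to the result.
    def wrapper(kv: Tuple[K, T], *args: P.args, **kwargs: P.kwargs) -> Tuple[K, R]:
        key, value = kv
        return key, func(value, *args, **kwargs)

    return wrapper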

@cisaacstern (Member Author) commented:

I feel like these annotations are correct! To keep things moving, I'm going to merge. @alxmrs, if you catch anything out of place, feel free to let me know and I'll fix it in a follow-on.

Thanks for encouraging more specificity here, I think this really helps with clarity. 😃

@cisaacstern cisaacstern merged commit 2739f22 into main Aug 15, 2023
@cisaacstern cisaacstern deleted the limit-concurrency branch August 15, 2023 19:39
Development
Successfully merging this pull request may close these issues: limit concurrency for input downloads.

3 participants