Fix CombineReference merge ordering #689
Conversation
Converted to a draft for now, pending a bit of discussion. I feel pretty confident that the path I'm on is superior in terms of readability and code simplicity. This is one of those cases where the change seems so right that I'm kind of doubting whether I entirely understand why it was the way it was before. Also, I need to spend some time on tests; I don't think so many should be failing given the small surface area of the changes.
@moradology this is great, I think you're totally on the right track here. I am to "blame" for `precombine_inputs`, and I think I did that in a hurry, perhaps without even full awareness at the time that `flatmap` was an available option, so not for a particularly well-informed reason.
@cisaacstern Awesome. I think we've verified that this does work. Now to produce some meaningful tests that demonstrate the failure and the current path's success. I always find it hard not to second-guess when the elephant is so big and I can touch so little of it at any one time 😅
Does this feel ready, @moradology? @abarciauskas-bgse wrote a nice test case in #692 that I think we should add to this PR.
```python
)
| "Group by buckets for ordering" >> beam.GroupByKey()
| "Distributed reduce" >> beam.MapTuple(lambda k, refs: self.to_mzz(refs).translate())
| "Assign global key for collecting to executor" >> beam.Map(lambda ref: (None, ref))
```
As we talked about: this will route all work to one worker for the final reduce, but we need to think (across runners) about how we'd identify and right-size that worker in terms of resources to handle this. So I'll need to find out what Flink says here.
Nice 🥳 Looked over the code again (especially `CombineReferences`) and have the following comments:
- I like how `CombineReferences.expand` got much more readable and clear on its steps.
- `CombineReferences.bucket_by_position` is still opaque to me even though I know what it's attempting to do.
- I'd request we write some `parametrize` tests to explain the inputs and outputs of `bucket_by_position` clearly, and use one of those in the docstring so folks can understand it immediately. Though now that I mention it, `bucket_by_position` is hard to test by itself b/c it's a class function (this PR #697 shows how we can do it as a class function fine).
Should be able to break it out for testing. May come in handy again?
Ha, spoke too soon: took a quick stab at it myself to help elucidate my understanding @ #697
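For concreteness, here is a minimal sketch of what such a parametrized test could look like, assuming `bucket_by_position` is factored out as a module-level function (the stand-in implementation and its equal-width bucketing math are illustrative assumptions, not this PR's actual code):

```python
# Illustrative only: a stand-in bucket_by_position with assumed equal-width
# bucketing, so the parametrized cases below are self-contained and runnable.
import pytest


def bucket_by_position(indexed_ref, min_max_count, num_buckets=4):
    """Map an index into one of num_buckets equal-width buckets (assumed math)."""
    idx, ref = indexed_ref
    global_min, global_max, _count = min_max_count
    range_size = (global_max - global_min + 1) / num_buckets
    return (int((idx - global_min) // range_size), (idx, ref))


@pytest.mark.parametrize(
    "idx, expected_bucket",
    [(0, 0), (2, 0), (3, 1), (9, 3)],  # indices 0..9 split across 4 buckets
)
def test_bucket_by_position(idx, expected_bucket):
    bucket, _ = bucket_by_position((idx, {"refs": {}}), (0, 9, 10))
    assert bucket == expected_bucket
```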
"""Assigns a bucket based on index position to help preserve data order.""" | ||
idx = indexed_references[0] | ||
ref = indexed_references[1] | ||
global_min, global_max, global_count = global_position_min_max_count |
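For readers following along, here is one plausible way the helper could continue from those lines; the `num_buckets` knob and the floor-division bucketing are assumptions for illustration, not necessarily the PR's actual math:

```python
def bucket_by_position(indexed_references, global_position_min_max_count, num_buckets=4):
    """Assigns a bucket based on index position to help preserve data order.

    Sketch only: equal-width buckets over [global_min, global_max];
    num_buckets and the rounding scheme are assumed, not taken from the PR.
    """
    idx, ref = indexed_references
    global_min, global_max, global_count = global_position_min_max_count
    # Width of each bucket across the full index range.
    range_size = (global_max - global_min + 1) / num_buckets
    # Contiguous indices land in contiguous buckets, preserving order.
    bucket = int((idx - global_min) // range_size)
    return (bucket, (idx, ref))
```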
Isn't the assumption of `bucket_by_position` that we are always dealing with contiguous data? If true, then wouldn't `count` always be the same as the `max` position?
Just over here trying to see if we can simplify the last couple of operations and leave out `range_size`.
If we assume that the index starts at 0, `count` would be `max + 1`, but that quibble aside you're right (if the assumption holds true). Might be sensible to just eat the very slight count cost though, to keep it more general and perhaps to sanity-check results (e.g. by comparing `max - min` to `count`)?
Okay, right, `max + 1`. Maybe we just `assert` the assumption somewhere, either here or in `MinMaxCountCombiner`?
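A minimal sketch of that sanity check, assuming 0-based contiguous indices (whether it lives here or in `MinMaxCountCombiner` is the open question):

```python
# Hedged sketch: for contiguous indices, the count of positions must equal
# max - min + 1; fail loudly if the contiguity assumption is violated.
global_min, global_max, global_count = global_position_min_max_count
assert global_count == global_max - global_min + 1, (
    f"non-contiguous index range: min={global_min}, "
    f"max={global_max}, count={global_count}"
)
```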
| "Distributed reduce" >> beam.MapTuple(lambda k, refs: self.to_mzz(refs).translate()) | ||
| "Assign global key for collecting to executor" >> beam.Map(lambda ref: (None, ref)) | ||
| "Group globally" >> beam.GroupByKey() | ||
| "Global reduce" >> beam.MapTuple(lambda k, refs: self.global_combine_refs(refs)) |
So, just for my own understanding, what I think is happening is this (the shape of the whole flow is sketched below):
1. In the case of grib, merge multiple references so we start from a flat list of references.
2. The next map operation buckets the references while preserving their order, such that each bucket will have a set of references ordered by the last `concat_dim` dimension.
3. The first `GroupByKey` ensures the order of the buckets is preserved.
4. Create a `MultiZarrToZarr` ref for each bucket (`to_mzz`).
5. This part I'm less clear about: from what I can tell we're assigning a new key to each group, but is each group a group of single references or merged references?
6. Merge everything and write to the target.
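Condensing the diff fragments quoted in this thread, the overall shape of the transform is roughly as follows; this is a paraphrase of the `CombineReferences.expand` body (the first stage name is assumed), not the verbatim code:

```python
# Paraphrased from the diff fragments above; lives inside
# CombineReferences.expand, so `self` and the input PCollections are context.
refs = (
    indexed_references
    | "Bucket to preserve order" >> beam.Map(
        self.bucket_by_position,
        global_position_min_max_count=beam.pvalue.AsSingleton(min_max_count_positions),
    )
    | "Group by buckets for ordering" >> beam.GroupByKey()
    | "Distributed reduce" >> beam.MapTuple(lambda k, refs: self.to_mzz(refs).translate())
    | "Assign global key for collecting to executor" >> beam.Map(lambda ref: (None, ref))
    | "Group globally" >> beam.GroupByKey()
    | "Global reduce" >> beam.MapTuple(lambda k, refs: self.global_combine_refs(refs))
)
```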
The global reduce here is kind of weird, I agree. I would love a `beam.Collect` or something. So it goes. What we have to do, so far as I can tell, is assign the same key to all records (`None` here) and then `beam.GroupByKey` if we want to ensure that they are realized together on one executor.
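A tiny, self-contained illustration of that idiom (toy data, not this PR's types):

```python
# Key everything with None, GroupByKey to materialize the whole collection
# as one iterable on a single worker, then reduce it there.
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([3, 1, 4, 1, 5])
        | "Key with None" >> beam.Map(lambda x: (None, x))
        | "Collect on one executor" >> beam.GroupByKey()
        | "Global reduce" >> beam.MapTuple(lambda _, xs: sum(xs))
        | beam.Map(print)  # -> 14
    )
```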
> Merge everything and write to the target.

Just want to point out that nothing is being written in `global_combine_refs`; it's just setting up an `fsspec` target for the next transform, which does the writing.
> assign the same key to all records (None here) and then beam.GroupByKey if we want to ensure that they are realized together on one executor

Thanks, that is a super helpful explanation. In that case, how are we assured that the final combine is done in order on the last single executor?
Experiments I've run locally w/ `multiprocessing` appear to demonstrate that the list fed to `MultiZarrToZarr` can have any ordering at all, so long as there exists some ordering of the elements that would be contiguous. It is when you have separate `mzz` calls that you run into the risk of one of the sub-lists not having contiguity among its would-be merged references.

Messing with ordering here should illustrate the behavior: https://gist.github.com/moradology/0e207a7ea4afd7251b0df604ac10958e#file-wtf-py-L69-L71
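In the spirit of that gist, a hedged sketch of the experiment: shuffle a complete (hence contiguity-recoverable) set of reference dicts before a single `MultiZarrToZarr` call. Here `make_reference`, the `files` list, and the dimension names are placeholders, not code from this PR:

```python
# Sketch of the ordering experiment: the full set of refs is contiguous,
# but the list order is scrambled before a single MultiZarrToZarr call.
import random

from kerchunk.combine import MultiZarrToZarr

ref_dicts = [make_reference(f) for f in files]  # hypothetical per-file kerchunk refs
random.shuffle(ref_dicts)  # any order works if some contiguous ordering exists

combined = MultiZarrToZarr(
    ref_dicts,
    concat_dims=["time"],           # placeholder concat dim
    identical_dims=["lat", "lon"],  # placeholder identical dims
).translate()
```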
```python
global_position_min_max_count=beam.pvalue.AsSingleton(min_max_count_positions),
)
| "Group by buckets for ordering" >> beam.GroupByKey()
| "Distributed reduce" >> beam.MapTuple(lambda k, refs: self.to_mzz(refs).translate())
```
I'm still curious whether we know that it's faster to do this intermediate `to_mzz` call. If it is, then I'm fine with leaving this part in, but it seems to add complexity since we have to re-group by key.
I don't have specific analysis in the Beam context at hand, but it appears to parallelize pretty well from what I've seen: fsspec/kerchunk#200 (comment)

At this point the shuffle isn't so bad; we're mostly just passing around some JSON, so I would expect the groupby to be pretty snappy. We could, and perhaps should, consider moving the groupbykey logic up to the point of Index/URL `PCollection` creation, where the shuffle would be extremely minimal, but that may require a bit of thought in terms of what kind of flexibility we want there. At some point, I'll wager, we will have to come up with a few alternative data localization strategies, perhaps even to get performant pyramiding.

A surprisingly relevant video that I love (speaking of indexing data to appropriately localize it for distributed work): https://www.youtube.com/watch?v=2e8QJBkCwvo
That video is 💯
And damned relevant for building indexes of arbitrary precision (e.g. if you're trying to query data for processing 'zoom levels' on a pyramid)! Glad you liked it. It is one of my favorites:
https://opensource.googleblog.com/2008/08/uzaygezen-multi-dimensional-indexing.html
https://link.springer.com/referenceworkentry/10.1007/978-0-387-39940-9_350
https://tildesites.bowdoin.edu/~ltoma/teaching/cs3225-GIS/fall17/Lectures/gis-spaceFC.pdf
https://www.grin.com/document/322063?lang=en
https://github.com/locationtech/sfcurve
And some prior art in roughly our space:
https://github.com/locationtech/geotrellis/blob/af71a4ae027bfeaa9a930bc086ca98186e701953/store/src/main/scala/geotrellis/store/index/hilbert/HilbertSpatialKeyIndex.scala#L49
`precombine_inputs` is kind of awkward, and it is really only used (so far as I can tell) to deal with the fact that GRIB2 reads will produce a list of Zarr refs. This could be simpler. We could just flatmap over the list of lists, producing a list of references that can be accumulated, shipped to the driver, and then combined via `MultiZarrToZarr` all at once.

`FlatMap` is gone, as it is convenient to keep grib references grouped under their key until `CombineReferences` is called. Instead, `MapTuple` is used and we hang onto keys. Keys are used to construct batches that keep neighboring data on the same worker, and merging is handled much more consciously than is possible in a `beam.CombineGlobally`.

This also happens to be much closer to the strategy described as 'tree reduction' in the kerchunk docs and implemented in their `auto_dask` method, so maybe it will resolve some very weird bugs that currently appear to be associated with the (bad) assumption that the `MultiZarrToZarr` merger has the properties of associativity and idempotence. Maybe 🤞? We now have testing (courtesy of @abarciauskas-bgse) which verifies correctness.
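For intuition, here is a toy sketch of the tree-reduction shape (generic Python, not kerchunk's actual `auto_dask` code): merge contiguous batches first, then merge the batch results, rather than folding references together in an order that silently assumes associativity.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def tree_reduce(items: List[T], merge: Callable[[List[T]], T], batch_size: int) -> T:
    """Repeatedly merge contiguous batches until a single item remains."""
    while len(items) > 1:
        items = [
            merge(items[i : i + batch_size])
            for i in range(0, len(items), batch_size)
        ]
    return items[0]


# e.g., with kerchunk (hypothetical call, arguments elided):
# merged = tree_reduce(refs, lambda batch: MultiZarrToZarr(batch, ...).translate(), 8)
```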
This also supersedes #688, as it resolves the item pluralization issue @abarciauskas-bgse identified. Incorporates #691.