Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Labels In DetermineSchema #591

Merged
merged 2 commits into from
Sep 19, 2023
Merged

Added Labels In DetermineSchema #591

merged 2 commits into from
Sep 19, 2023

Conversation

DarshanSP19
Copy link

@DarshanSP19 DarshanSP19 commented Aug 31, 2023

  • While using more than two concat Dimension apache-beam is throwing below Error.
RuntimeError: A transform with label "DetermineSchema/_NestDim" already exists in the pipeline. To apply a transform with a specified label write pvalue | "label" >> transform

@cisaacstern
Copy link
Member

cisaacstern commented Aug 31, 2023

Thanks @DarshanSP19! Looking good.

It would be great to add a test before merging.

Could you share the code which caused the error you reported in #402? Or if you prefer not to share the full code, perhaps you could share a pseudo-code version of what you were running that prompted your bug report in #402.

Once I see the use case that this PR fixes, I can recommend how to add it to the tests. 🙏

@DarshanSP19
Copy link
Author

DarshanSP19 commented Sep 1, 2023

Hey @cisaacstern I have tried with below snippets.

concat_dim = ConcatDim("time", date_range)
merge_dim = MergeDim("chunk", chunks)
pressure_levels = MergeDim("pressure", pressure_level)

def make_path(time: datetime.datetime, chunk: str, pressure: str) -> str:
    if chunk in pressure_level_chunk_data:
        return f"gs://gcp-public-data-arco-era5/raw/date-variable-pressure_level/{time.year:04d}/{time.month:02d}/{time.day:02d}/{chunk}/{pressure}.nc"
    else:
        return f"gs://gcp-public-data-arco-era5/raw/date-variable-single_level/{time.year:04d}/{time.month:02d}/{time.day:02d}/{chunk}/surface.nc"

pattern = FilePattern(make_path, merge_dim, concat_dim, pressure_levels,  file_type='netcdf3')

with beam.Pipeline(argv=pipeline_args) as p:
    pressure_level_schema = (
        p 
        | "Create" >> beam.Create(pattern.items())
        | "OpenURLWithFSSpec" >> OpenURLWithFSSpec()
        | "OpenDatasets" >> OpenWithXarray(file_type=pattern.file_type, copy_to_local=True, load=True)
        | "DetermineSchema" >> DetermineSchema(combine_dims=pattern.combine_dim_keys)
        | "PrepareZarrTarget" >> PrepareZarrTarget(known_args.output)
    )

Also the data is publicly available here.

@cisaacstern
Copy link
Member

👋 @DarshanSP19 thanks for your patience on this! On further thought I think this is simple enough to defer testing to a separate PR. I've opened #624 to track that. Thanks for this fix!

@cisaacstern cisaacstern merged commit c769d27 into pangeo-forge:main Sep 19, 2023
@DarshanSP19 DarshanSP19 deleted the unlabeled-stages-issues branch September 19, 2023 16:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants