Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making listing lazy in DatasetQuery #976

Merged
merged 9 commits into from
Mar 26, 2025
Merged

Making listing lazy in DatasetQuery #976

merged 9 commits into from
Mar 26, 2025

Conversation

ilongin
Copy link
Contributor

@ilongin ilongin commented Mar 17, 2025

Before we had listing process happening in .from_storage() method itself which meant it wasn't lazy.
Idea was to move it to DatasetQuery.apply_steps() instead.

Fixes: #317

Copy link

codecov bot commented Mar 17, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 88.13%. Comparing base (71d87f2) to head (de8dcbf).
Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #976      +/-   ##
==========================================
+ Coverage   88.09%   88.13%   +0.03%     
==========================================
  Files         145      145              
  Lines       12279    12294      +15     
  Branches     1699     1703       +4     
==========================================
+ Hits        10817    10835      +18     
+ Misses       1045     1043       -2     
+ Partials      417      416       -1     
Flag Coverage Δ
datachain 88.05% <100.00%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@skshetry
Copy link
Member

@ilongin, do we have some caching here? I believe the chain would rerun when executed?
Eg:

dc = DataChain.from_storage(...)
dc.exec() # runs once
dc.exec() # will it rerun the generator function again?

Copy link
Contributor

@dreadatour dreadatour left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me overall, thank you, this is a great improvement!

Couple comments below and I also haven't seen tests for the case described in the PR (making listing lazy). If I missed one, sorry, if not, should we consider to add one?

# not setting query step yet as listing dataset might not exist at
# this point
self.list_ds_name = name
elif fallback_to_studio and is_token_set():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but is_token_set here looks odd and raises questions.

We may want to import it as:

from datachain.remote.studio import is_token_set as is_studio_token_set

above, for example, just for the better readability of the code here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, agreed cc @amritghimire ... it is still not a good idea to have Studio exposed this way

ideally it should be just get_dataset, inside it it should be deciding on fallback

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's push really really hard to keep studio contained, it is important ... in the same way as for example using DC itself for the implementations (e.g. I wonder if from_storage can be done via map or gen and thus in a lazy way)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Listing is already done with gen but we cannot just append the rest of the chain to that part as we want to cache listing at some point, i.e call save() on it and if we call it the middle of the chain it's not lazy any more. It needs to happen in the save() of the dataset when we apply other steps.
So we could do

def from_ storage():
    return (
        cls.from_records(DEFAULT_FILE_RECORDS)
           .gen(list_bucket(,,,))
           .save(list_ds_name, listing=True) 
    )

ds = DataChain.from_storage("s3://ldb-public").filter(...).map(...).save("my_dataset")

This is similar as it was before this PR but it's not lazy and to make it lazy we need to add some step in DatasetQuery as there we start to apply steps.
Ideal solution would be to move all those steps and apply_step function from DatasetQuery to DataChain as there is no point for main logic to be there IMO and maybe even remove DatasetQuery alltogether but that's whole another topic.

@@ -1097,26 +1091,43 @@ def __init__(
self.temp_table_names: list[str] = []
self.dependencies: set[DatasetDependencyType] = set()
self.table = self.get_table()
self.starting_step: StartingStep
self.query_step: Optional[QueryStep] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wonder if we can think of better name for query_step attribute. I have a lot of questions lower in the code and keep confusing about this name. Should we, may be, keep starting_step name as it is more verbose? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I was also thinking between starting_step and query_step .. anyway, returned to starting_step for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I really think it will be more verbose 🙏

Comment on lines 1206 to 1209
if self.list_fn:
self.list_fn()

if self.list_ds_name:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why do we need to pass list_ds_name? Can list_fn returns the listed dataset name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list_fn is optional so if it's not defined, we still need list_ds_name to create that starting step which we couldn't create in constructor as there listing maybe hasn't happened yet so listing dataset could not exist. We also cannot use self.name as that one is present only for "attached" chains which are those that are pointing to whole dataset and on any modification method call (e.g .filter()) it is removed as chain becomes "unattached"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thank you so much for explanation 🙏

Copy link

cloudflare-workers-and-pages bot commented Mar 19, 2025

Deploying datachain-documentation with  Cloudflare Pages  Cloudflare Pages

Latest commit: de8dcbf
Status: ✅  Deploy successful!
Preview URL: https://078b6742.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-317-lazy-listing.datachain-documentation.pages.dev

View logs

@ilongin
Copy link
Contributor Author

ilongin commented Mar 19, 2025

@ilongin, do we have some caching here? I believe the chain would rerun when executed? Eg:

dc = DataChain.from_storage(...)
dc.exec() # runs once
dc.exec() # will it rerun the generator function again?

Yes, chain will rerun in this example, together with listing if it is needed (listing doesn't exist yet or update flag is provided) so I would say no caching.

Copy link
Contributor

@dreadatour dreadatour left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me 👍❤️

@ilongin ilongin requested a review from shcheklein March 20, 2025 11:10
@@ -1180,11 +1191,24 @@ def c(self, column: Union[C, str]) -> "ColumnClause[Any]":
col.table = self.table
return col

def set_listing_pre_step(self, list_fn: Callable) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: does it have to have a "listing" in its name? or is it a general mechanism that can be applied even in the future for other steps?

essentially, we can make it less listing specific here and do before_apply_callback or something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to be. I thought of making this generic but didn't see any current need for it atm so decided to make it explicit. Also, this is all in internal DatasetQuery. If we would need something like before_apply_callback we would need to add public method in DataChain as well. This is all easy to add if needed in future.

Copy link
Member

@shcheklein shcheklein Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of making this generic but didn't see any current need for it atm so decided to make it explicit.

it might confuse people in the future (it looks specific, and I would be looking why it is specific and if I can reuse it for something else wasting some time).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or it might even lead for someone duplicating it

@@ -153,7 +153,7 @@ def _list_dataset_name(uri: str) -> str:
return name

dogs_uri = f"{src_uri}/dogs"
DataChain.from_storage(dogs_uri, session=session)
DataChain.from_storage(dogs_uri, session=session).exec()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't really ever test lazyness though? not sure it's that important, but just a small note ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, we don't explicitly. I think current tests are good enough though.

@ilongin ilongin requested a review from shcheklein March 24, 2025 08:23
@ilongin ilongin merged commit eed7148 into main Mar 26, 2025
35 checks passed
@ilongin ilongin deleted the ilongin/317-lazy-listing branch March 26, 2025 16:10
def apply_steps(self) -> QueryGenerator:
"""
Apply the steps in the query and return the resulting
sqlalchemy.SelectBase.
"""
for fn in self.before_steps:
fn()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this I saw a caveat, that fn seems to be called every time a step is performed since we don't clear the before steps at any time. So, whenever I try to use the collect or chain, I am getting the query to refetch the table instead.

Copy link
Contributor Author

@ilongin ilongin Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expected and it's how it was before when listing was lazy (before we refactored it using DataChain higher level functions). Listing was always done when someone would apply steps if update flag is used

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but it seems to run every time I run chain.collect() or chain.count() . As you can see in the test test_from_storage_multiple_uris_cache in #994 , it is called every time for chains.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chain.collect() applies steps every time it's called

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my question was should we rerun listing every time collect is called when update is passed? Or once should suffice?

)
.settings(prefetch=0)
.gen(
list_bucket(list_uri, cache, client_config=client_config),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be called everytime I use the datachain to apply steps. Should'nt this be applied only once?

Copy link
Contributor Author

@ilongin ilongin Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be called every time you apply steps. The whole idea is for user to apply steps only once anyway as it's very expensive operation.

@@ -95,24 +94,28 @@ def from_storage(
dc.signals_schema = dc.signals_schema.mutate({f"{object_name}": file_type})
return dc

dc = from_dataset(list_ds_name, session=session, settings=settings)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling from_dataset when list_ds_exists is false also doesn't seem right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lower level code (DatasetQuery) is aware of listing being lazy so this is ok. We will start chain with listing dataset and the fact it doesn't exists yet is just the nature of it's "laziness"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we could get dataset not found error when the ist_ds_name doesn't exist

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make .get_storage() caching listing lazy
5 participants