
Subclassing: Can expand the graph but not "remove" steps #2252

Open
dotKokott opened this issue Feb 9, 2025 · 0 comments

dotKokott commented Feb 9, 2025

Very excited for Flow subclassing to arrive! I noticed a little issue:

The original example from PR #2086 works (after moving BaseFlow into its own file):

```python
from metaflow import FlowSpec, step


class BaseFlow(FlowSpec):
    @step
    def start(self):
        print("this is the start")
        self.next(self.step1)

    @step
    def step1(self):
        print("base step 1")
        self.next(self.end)

    @step
    def end(self):
        print("base step end.")


class SubFlow(BaseFlow):
    @step
    def step1(self):
        print("sub step 1")
        self.next(self.step2)

    @step
    def step2(self):
        print("sub step 2")
        self.next(self.end)


if __name__ == "__main__":
    SubFlow()
```

Here SubFlow modifies the graph by expanding it: it overrides step1 and adds an extra step, step2.
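To illustrate why expanding the graph works, here is a minimal toy model. It is not Metaflow's implementation; `ToyFlowSpec` and `build_graph` are hypothetical names, and the assumption being modelled is that the graph is built from every step visible on the subclass, inherited or overridden (approximated here with Python's `dir()`, which walks the whole class hierarchy).

```python
# Toy model, NOT Metaflow's implementation: steps are collected from the
# subclass and all of its base classes, then each step's self.next() is recorded.

def step(func):
    # Tag the function so build_graph() can recognise it as a step.
    func.is_step = True
    return func


class ToyFlowSpec:
    def next(self, *targets, **kwargs):
        # Record target step names; extra arguments such as foreach= are ignored.
        self._transitions = [t.__name__ for t in targets]


def build_graph(flow_cls):
    """Map each step visible on flow_cls, inherited or not, to its successors."""
    graph = {}
    for name in dir(flow_cls):  # dir() includes attributes from base classes
        attr = getattr(flow_cls, name)
        if callable(attr) and getattr(attr, "is_step", False):
            flow = flow_cls()
            flow._transitions = []
            attr(flow)  # run the step body once to capture its self.next() call
            graph[name] = flow._transitions
    return graph


class BaseFlow(ToyFlowSpec):
    @step
    def start(self):
        self.next(self.step1)

    @step
    def step1(self):
        self.next(self.end)

    @step
    def end(self):
        pass


class SubFlow(BaseFlow):
    @step
    def step1(self):  # overrides BaseFlow.step1
        self.next(self.step2)

    @step
    def step2(self):  # new step that only exists on the subclass
        self.next(self.end)


print(build_graph(SubFlow))
# → {'end': [], 'start': ['step1'], 'step1': ['step2'], 'step2': ['end']}
```

Because the override of step1 replaces the base version, every inherited step is still pointed to by some other step, so the expanded graph stays connected.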

However, I have a use case where I would like to rename a step:

```python
from metaflow import FlowSpec, step


class BaseFlow(FlowSpec):
    @step
    def start(self):
        print("this is the start")
        self.next(self.process)

    @step
    def process(self):
        print("base process")
        self.next(self.end)

    @step
    def end(self):
        print("base step end.")


class ForEachFlow(BaseFlow):
    @step
    def start(self):
        print("this is the start")
        self.items = [1, 2, 3]
        # Fan out over self.items into a step with a new name instead of process.
        self.next(self.process_chunk, foreach="items")

    @step
    def process_chunk(self):
        print("processing chunk")
        self.next(self.end)


if __name__ == "__main__":
    ForEachFlow()
```

In this case, the graph validator reports that process is unreachable:

    Step process is unreachable from the start step. Add self.next(process) in another step or remove process.
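The same toy model reproduces the error. Again, this is a hypothetical sketch rather than Metaflow's code: it assumes the subclass graph contains every inherited step and checks reachability from `start` with a depth-first search (`unreachable_steps` is an illustrative name).

```python
# Toy model, NOT Metaflow's implementation: steps are collected from every
# class in the hierarchy (via dir()) and checked for reachability from "start".

def step(func):
    # Tag the function so build_graph() can recognise it as a step.
    func.is_step = True
    return func


class ToyFlowSpec:
    def next(self, *targets, **kwargs):
        # Record target step names; extra arguments such as foreach= are ignored.
        self._transitions = [t.__name__ for t in targets]


def build_graph(flow_cls):
    """Map each step visible on flow_cls, inherited or not, to its successors."""
    graph = {}
    for name in dir(flow_cls):
        attr = getattr(flow_cls, name)
        if callable(attr) and getattr(attr, "is_step", False):
            flow = flow_cls()
            flow._transitions = []
            attr(flow)  # run the step body once to capture its self.next() call
            graph[name] = flow._transitions
    return graph


def unreachable_steps(graph):
    """Return the steps that cannot be reached from "start" (depth-first search)."""
    seen, stack = set(), ["start"]
    while stack:
        name = stack.pop()
        if name not in seen:
            seen.add(name)
            stack.extend(graph[name])
    return sorted(set(graph) - seen)


class BaseFlow(ToyFlowSpec):
    @step
    def start(self):
        self.next(self.process)

    @step
    def process(self):
        self.next(self.end)

    @step
    def end(self):
        pass


class ForEachFlow(BaseFlow):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process_chunk, foreach="items")

    @step
    def process_chunk(self):
        self.next(self.end)


# BaseFlow.process is still inherited, but no step transitions to it anymore.
print(unreachable_steps(build_graph(ForEachFlow)))  # → ['process']
```

In this model, overriding start removes the only edge into process, while process itself is still inherited, which matches the validator message above.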

So it seems that I can add steps to the graph, but I cannot remove steps defined in BaseFlow. Is this intentional?

I can of course work around this, either by keeping the name process for my fan-out step, or by fanning out after process so as not to skip a step defined in the base flow.
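The first workaround can be checked in the same hypothetical toy model: if the fan-out step overrides process instead of introducing process_chunk, every step is reachable again. Note that the toy model only checks reachability; in real Metaflow a foreach split must also be closed by a join step before end, which this sketch deliberately ignores.

```python
# Toy model, NOT Metaflow's implementation. It checks reachability only and
# ignores Metaflow's requirement that a foreach be closed by a join step.

def step(func):
    # Tag the function so build_graph() can recognise it as a step.
    func.is_step = True
    return func


class ToyFlowSpec:
    def next(self, *targets, **kwargs):
        # Record target step names; extra arguments such as foreach= are ignored.
        self._transitions = [t.__name__ for t in targets]


def build_graph(flow_cls):
    """Map each step visible on flow_cls, inherited or not, to its successors."""
    graph = {}
    for name in dir(flow_cls):
        attr = getattr(flow_cls, name)
        if callable(attr) and getattr(attr, "is_step", False):
            flow = flow_cls()
            flow._transitions = []
            attr(flow)  # run the step body once to capture its self.next() call
            graph[name] = flow._transitions
    return graph


def unreachable_steps(graph):
    """Return the steps that cannot be reached from "start" (depth-first search)."""
    seen, stack = set(), ["start"]
    while stack:
        name = stack.pop()
        if name not in seen:
            seen.add(name)
            stack.extend(graph[name])
    return sorted(set(graph) - seen)


class BaseFlow(ToyFlowSpec):
    @step
    def start(self):
        self.next(self.process)

    @step
    def process(self):
        self.next(self.end)

    @step
    def end(self):
        pass


class ForEachFlow(BaseFlow):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")

    @step
    def process(self):  # overrides BaseFlow.process rather than adding a new name
        self.next(self.end)


print(unreachable_steps(build_graph(ForEachFlow)))  # → []
```

The trade-off is that the subclass is tied to the base class's step names, which is exactly the constraint this issue asks about.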

My use case here is to provide a set of BaseFlows for both training and data processing that hide a lot of complexity with regard to chunking, logging, etc.
