feature: support argo retry #2278

Open
dhpikolo opened this issue Feb 19, 2025 · 0 comments · May be fixed by #2277
dhpikolo commented Feb 19, 2025

It seems that argo retry does not work as expected when a workflow is triggered from Metaflow with retry enabled.

Current Behaviour

Argo retry restarts failed tasks and resets the retry count to 0, overwriting the artifacts of previous attempts. The downstream task reads its input artifacts from the latest attempt and fails because artifacts are missing: they were recorded in the now-overwritten attempt 0 but are absent in the latest attempt.
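
One way to confirm this is a minimal sketch using the Metaflow client; the pathspec placeholders are hypothetical and need the real IDs from the Argo run:

from metaflow import Task, namespace

namespace(None)  # look the task up regardless of the current namespace
task = Task("HelloWorld/<run-id>/print_df/<task-id>")  # substitute real IDs
# iterating a Task yields its DataArtifact objects;
# 'new_df' is absent once attempt 0 has been overwritten
print([artifact.id for artifact in task])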

Desired Behaviour

Ideally, hitting the argo retry button should create a new attempt rather than overwriting the old one.

Metaflow ChatRoom Discussion

Solution Proposal

Infer the retry_count from the flow datastore (example: S3). The retry_count would be the latest done attempt number + 1. This way, artifacts are not overwritten on retry: the restarted run adds artifacts as if it were a new attempt. A sketch of the idea follows below.
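
A minimal sketch of the inference, assuming an S3 datastore where each completed task attempt leaves a marker whose key ends in "<attempt>.DONE". The marker naming and layout are assumptions for illustration, not Metaflow's documented internals:

import re
import boto3

def infer_retry_count(bucket: str, task_prefix: str) -> int:
    """Return latest completed attempt + 1, or 0 if no attempt finished."""
    s3 = boto3.client("s3")
    done_marker = re.compile(r"(\d+)\.DONE$")  # assumed marker naming
    latest = -1
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=task_prefix):
        for obj in page.get("Contents", []):
            match = done_marker.search(obj["Key"])
            if match:
                latest = max(latest, int(match.group(1)))
    return latest + 1

With something like this, a task restarted by argo retry would write its artifacts under a fresh attempt number instead of clobbering attempt 0.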

Steps to reproduce:

Run:

python hello_world.py argo-workflows create
python hello_world.py argo-workflows trigger

with the following hello_world.py:
import pandas as pd
from metaflow import (
    FlowSpec,
    Parameter,
    card,
    current,
    environment,
    project,
    retry,
    step,
)

ENV_VARS = {
    "FAIL_FLOW": "YES",
}

@project(name="dummy_project")
class HelloWorld(FlowSpec):
    force_error = Parameter("force-error", type=bool, default=False)

    @card
    @step
    def start(self):
        self.next(self.print_df)

    @retry  # Metaflow retry enabled; every attempt fails while FAIL_FLOW is "YES"
    @card
    @environment(vars=ENV_VARS)
    @step
    def print_df(self):
        import os

        if os.environ["FAIL_FLOW"] == "YES":
            raise Exception(f"Fail now, current retry count: {current.retry_count}")
        else:
            self.new_df = pd.DataFrame(
                {
                    "city": [1, 2, 3],
                    "country": ["de", "de", "at"],
                    "order_id": [5, 6, 7],
                    "score": [0.5, 0.6, 0.7],
                }
            )
        # this step passes on retry once FAIL_FLOW is no longer "YES"
        print(f"count is = {current.retry_count}")
        self.next(self.end)

    @card
    @step
    def end(self):
        print(f"the new df is: {self.new_df}")

if __name__ == "__main__":
    HelloWorld()
  • The Argo workflow runs with Metaflow retry enabled and fails in all retries, due to the error induced via the environment variable (FAIL_FLOW=YES).


  • Node print_df passes after a fix in the backend (or after updating the workflow with the right env-var value); then hit the argo retry button for that node (a CLI equivalent is sketched after the traceback below).

  • The end step fails with a missing artifact: new_df is an artifact from the parent step, created when the parent node succeeded on retry.

    print(f"the new df is: {self.new_df}")
                            ^^^^^^^^^^^
  File "/app/metaflow/metaflow/flowspec.py", line 250, in __getattr__
    raise AttributeError("Flow %s has no attribute '%s'" % (self.name, name))
AttributeError: Flow HelloWorld has no attribute 'new_df'
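
For reference, the node-level retry done via the UI button corresponds roughly to the following CLI invocations (flag usage assumed from the Argo CLI help; verify against your Argo version):

argo retry <workflow-name>
argo retry <workflow-name> --restart-successful --node-field-selector displayName=print_df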


@dhpikolo dhpikolo linked a pull request Feb 19, 2025 that will close this issue