Support retrying flows on crashed state #10211

@rbanick

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar request and didn't find it.
  • I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

I’ve set up an Automation to re-run flows that crash. This nicely catches random crashes but goes haywire when an error inherent to the flow causes crashes — the flow will re-run and crash ad infinitum.

At the suggestion of @WillRaphaelson on the Prefect Slack Community I attempted to use the retries parameter in a flow as an alternate mechanism for retrying that respects limits.

from prefect import flow

@flow(
    name="my-flow",
    retries=3,
)
def my_flow():
    ...  # stuff

I then deployed the flow and deliberately crashed it* to test the retry functionality.

Unfortunately, the retries parameter did not cause retries on the flow when it crashed. If I turned on the relevant automation to trigger retries, the retries parameter did not limit the automation — I observed 5 retries before I canceled it.

Environment
Python 3.10.9
Prefect 2.20
Mac OS X 13.4.1

*My Prefect deployments are set up to install any packages specified with the EXTRA_PIP_PACKAGES environment variable prior to runtime. I induced an artificial crash of the flow by specifying {"env.EXTRA_PIP_PACKAGES": "not a package"} in the relevant deployment's Infrastructure Overrides.

Describe the proposed behavior

Ideally, setting retries=n would cause a flow to retry not only on Failed but also on Crashed, up to a maximum of n times.
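To make the requested semantics concrete, here is a minimal sketch in plain Python. It does not use or describe Prefect's internals; `RETRYABLE_STATES`, `should_retry`, and `count_attempts` are hypothetical names chosen for illustration, and the state names are plain strings.

```python
from typing import Iterable

# Hypothetical: terminal states that would consume the retry budget
# under the proposed behavior (today only Failed triggers a retry).
RETRYABLE_STATES = frozenset({"Failed", "Crashed"})


def should_retry(state_name: str, run_count: int, retries: int) -> bool:
    """Return True if a run that ended in `state_name` should be retried.

    `run_count` is the number of attempts made so far (1 after the first run).
    """
    return state_name in RETRYABLE_STATES and run_count <= retries


def count_attempts(outcomes: Iterable[str], retries: int) -> int:
    """Walk a sequence of terminal states and return how many attempts run."""
    attempts = 0
    for state_name in outcomes:
        attempts += 1
        if not should_retry(state_name, attempts, retries):
            break
    return attempts
```

With retries=3, a flow that crashes every time would run once plus three retries and then stop, rather than re-running indefinitely.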

A bonus would be to have this retry functionality play nicely with Automation retries, just in case.

An alternate path would be to introduce a limits parameter on the Automation trigger. But handling this within a flow and not an automation is preferable, because it keeps the configuration closer to the source code and because Prefect limits the total number of Automations available to most users.
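The automation-side alternative could look roughly like the sketch below: a cap on how many times one original flow run may be re-run. This is an assumption about how such a limit might behave, not an existing Prefect feature; `RerunLimiter`, `limit`, and `allow_rerun` are hypothetical names.

```python
from collections import defaultdict


class RerunLimiter:
    """Hypothetical cap on how many times an automation may re-run one flow run."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        # Maps the original run ID to the number of re-runs already triggered.
        self._reruns = defaultdict(int)

    def allow_rerun(self, original_run_id: str) -> bool:
        """Record and allow a re-run unless the cap has been reached."""
        if self._reruns[original_run_id] >= self.limit:
            return False
        self._reruns[original_run_id] += 1
        return True
```

Keying the counter on the original run, rather than on each re-run, is what stops the crash and re-run cycle from resetting its own budget.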

Example Use

The use of retries in the code below (copied from above) should work.

from prefect import flow

@flow(
    name="my-flow",
    retries=3,
)
def my_flow():
    ...  # stuff

Additional context

No response

Metadata

Labels

enhancement: An improvement of an existing feature
