
Op retries

Assets vs ops

If you are just getting started with Dagster, we strongly recommend you use assets rather than ops to build your data pipelines. The ops documentation is for Dagster users who need to manage existing ops, or who have complex use cases.

When an exception occurs during op execution, Dagster provides tools to retry that op within the same job run.

Relevant APIs

| Name | Description |
| --- | --- |
| RetryRequested | An exception that can be raised from the body of an op to request a retry |
| RetryPolicy | A declarative policy, attached to an op or job, that requests a retry whenever an exception is raised |
| Backoff | Modifies the delay between retries based on the attempt number |
| Jitter | Adds a random modification to the delay between retries |

Overview

In Dagster, code is executed within an op. Sometimes this code can fail for transient reasons, and the desired behavior is to retry and run the function again.

Dagster provides both declarative RetryPolicy objects and manual RetryRequested exceptions to enable this behavior.
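To make "retry on exception" concrete, here is a framework-free sketch of the idea. It is a simplified model, not Dagster's executor: the helper run_with_retries and its max_retries argument are assumptions for illustration, following the reading that a policy with max_retries=N allows up to N + 1 attempts in total. Delays, events, and run storage are deliberately left out.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], max_retries: int) -> T:
    """Call fn, retrying on any exception up to max_retries extra times.

    Hypothetical helper: a simplified model of retry-on-exception,
    not Dagster's implementation.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1  # allow another attempt


# Usage: a callable that fails twice, then succeeds on the third call.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = run_with_retries(flaky, max_retries=3)
print(result, calls["count"])  # prints ok 3
```

With max_retries=1 the same callable would exhaust its retries after two failed calls and re-raise the ConnectionError.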

Using op retries

Here we start with an op that fails intermittently. Without a retry policy, any failure forces us to re-run the whole job.

```python
from dagster import op

@op
def problematic():
    fails_sometimes()  # placeholder for code that fails intermittently
```
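The examples never define fails_sometimes, so running them locally requires some stand-in. One purely hypothetical version raises a made-up TransientError on a configurable fraction of calls, using a seeded random generator so the failure pattern is reproducible. The names TransientError and failure_rate are assumptions, not part of Dagster.

```python
import random

# Hypothetical stand-in for the undefined fails_sometimes() helper.
# A seeded generator makes the failure pattern reproducible across runs.
_rng = random.Random(42)


class TransientError(Exception):
    """Hypothetical error type representing a transient failure."""


def fails_sometimes(failure_rate: float = 0.5) -> None:
    """Raise TransientError on roughly `failure_rate` of calls."""
    if not 0.0 <= failure_rate <= 1.0:
        raise ValueError("failure_rate must be between 0 and 1")
    if _rng.random() < failure_rate:
        raise TransientError("simulated transient failure")
```

Called with no arguments, as in the ops here, it fails about half the time.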

RetryPolicy

To get this op to retry when an exception occurs, we can attach a RetryPolicy.

```python
from dagster import RetryPolicy, op

@op(retry_policy=RetryPolicy())
def better():
    fails_sometimes()
```

This improves the situation, but we may need additional configuration to control how many times to retry and/or how long to wait between each retry.

```python
from dagster import Backoff, Jitter, RetryPolicy, op

@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    )
)
def even_better():
    fails_sometimes()
```
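To get a feel for how long the retries of even_better might wait, the sketch below models a base delay that doubles with each attempt, optionally scaled by a random "plus-minus" factor. The exact formula Dagster uses internally is not stated here, so treat backoff_delay and its doubling rule as assumptions that show the shape of exponential backoff with jitter, not Dagster's actual numbers.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a delay in seconds before retry number `attempt` (1-based).

    Assumed model: exponential backoff doubles the base delay per attempt,
    and plus-minus jitter scales it by a random factor between 0 and 2,
    so the delay stays non-negative and averages the unjittered value.
    """
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    delay = base_delay * 2 ** (attempt - 1)
    if jitter:
        if rng is None:
            rng = random.Random()
        delay *= rng.uniform(0.0, 2.0)
    return delay


# Schedule for max_retries=3, delay=0.2, without jitter:
schedule = [round(backoff_delay(n, 0.2), 3) for n in range(1, 4)]
print(schedule)  # prints [0.2, 0.4, 0.8]
```

Jitter matters when many ops fail at once: randomizing the delay keeps them from all retrying at the same instant.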

In addition to setting the policy directly on the op definition, you can set it on a specific invocation of an op, or on a @job, where it applies to all ops the job contains.

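```python
from dagster import RetryPolicy, job

default_policy = RetryPolicy(max_retries=1)
flakey_op_policy = RetryPolicy(max_retries=10)


@job(op_retry_policy=default_policy)
def default_and_override_job():
    # Override the job-wide default for this one invocation of problematic
    problematic.with_retry_policy(flakey_op_policy)()
```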
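The precedence among these three places can be read as "most specific wins": a policy on the invocation takes priority over one on the op definition, which takes priority over the job's op_retry_policy default. The sketch below encodes that lookup order with a hypothetical Policy dataclass and resolve_policy function; it illustrates the ordering and is not Dagster's code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Policy:
    """Hypothetical stand-in for a retry policy; only max_retries is modeled."""

    max_retries: int


def resolve_policy(
    invocation: Optional[Policy],
    definition: Optional[Policy],
    job_default: Optional[Policy],
) -> Optional[Policy]:
    """Return the most specific policy that is set, or None if none is."""
    for candidate in (invocation, definition, job_default):
        if candidate is not None:
            return candidate
    return None


default_policy = Policy(max_retries=1)
flakey_op_policy = Policy(max_retries=10)

# problematic has no policy on its definition, but its invocation does:
print(resolve_policy(flakey_op_policy, None, default_policy).max_retries)  # prints 10
# Any other op in the job falls back to the job-wide default:
print(resolve_policy(None, None, default_policy).max_retries)  # prints 1
```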

RetryRequested

In more nuanced situations, we may need to run code to decide whether to retry. For this, we can raise a RetryRequested exception manually.

```python
from dagster import RetryRequested, op

@op
def manual():
    try:
        fails_sometimes()
    except Exception as e:
        if should_retry(e):  # should_retry is a user-defined predicate
            raise RetryRequested(max_retries=1, seconds_to_wait=1) from e
        else:
            raise
```

Using raise ... from e ensures that Dagster captures the original exception's information.
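The manual op relies on a should_retry predicate that the text leaves undefined. One hypothetical version retries only errors that are usually transient, such as timeouts and dropped connections, and lets everything else fail immediately. The snippet also shows what raise ... from e does in plain Python: the original error is stored on the new exception's __cause__ attribute. ManualRetry is a local stand-in for RetryRequested, assumed here only so the example runs without Dagster installed.

```python
# Hypothetical predicate: treat timeouts and connection problems as
# transient (worth retrying) and everything else as a real bug.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)


def should_retry(error: BaseException) -> bool:
    return isinstance(error, TRANSIENT_ERRORS)


class ManualRetry(Exception):
    """Local stand-in for dagster's RetryRequested, used only for this demo."""

    def __init__(self, max_retries: int = 1, seconds_to_wait: float = 0) -> None:
        super().__init__(f"retry requested (max_retries={max_retries})")
        self.max_retries = max_retries
        self.seconds_to_wait = seconds_to_wait


def body(error: Exception) -> None:
    """Mirror the structure of the manual op, raising `error` as the failure."""
    try:
        raise error
    except Exception as e:
        if should_retry(e):
            raise ManualRetry(max_retries=1, seconds_to_wait=1) from e
        raise


try:
    body(ConnectionError("connection reset"))
except ManualRetry as exc:
    # `from e` chains the original error onto the new exception.
    print(type(exc.__cause__).__name__)  # prints ConnectionError
```

A ValueError passed to body is not retried: it is re-raised unchanged by the bare raise.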