Op jobs
Looking to materialize asset definitions instead of ops? Check out the asset jobs documentation.
Jobs are the main unit of execution and monitoring in Dagster. An op job executes a graph of ops.
Op jobs can be launched in a few different ways:
- Manually from the Dagster UI
- At fixed intervals, by schedules
- When external changes occur, using sensors
Relevant APIs
Name | Description |
---|---|
@dg.job | The decorator used to define a job. |
JobDefinition | A job definition. Jobs are the main unit of execution and monitoring in Dagster. Typically constructed using the @dg.job decorator. |
Creating op jobs
Op jobs can be created:
Using the @job decorator
The simplest way to create an op job is to use the @dg.job
decorator.
Within the decorated function body, you can use function calls to indicate the dependency structure between the ops/graphs. This allows you to explicitly define dependencies between ops when you define the job.
In this example, the add_one
op depends on the return_five
op's output. Because this data dependency exists, the add_one
op executes after return_five
runs successfully and emits the required output:
from dagster import job, op
@op
def return_five():
return 5
@op
def add_one(arg):
return arg + 1
@job
def do_stuff():
add_one(return_five())
When defining an op job, you can provide any of the following:
- Resources
- Configuration
- Hooks
- Tags and other metadata
- An executor
From a graph
Creating op jobs from a graph can be useful when you want to define inter-op dependencies before binding them to resources, configuration, executors, and other environment-specific features. This approach to op job creation allows you to customize graphs for each environment by plugging in configuration and services specific to that environment.
You can model this by building multiple op jobs that use the same underlying graph of ops. The graph represents the logical core of data transformation, and the configuration and resources on each op job customize the behavior of that job for its environment.
To do this, define a graph using the @dg.graph
decorator:
from dagster import graph, op, ConfigurableResource
class Server(ConfigurableResource):
def ping_server(self): ...
@op
def interact_with_server(server: Server):
server.ping_server()
@graph
def do_stuff():
interact_with_server()
Then build op jobs from it using the GraphDefinition
method:
from dagster import ResourceDefinition
prod_server = ResourceDefinition.mock_resource()
local_server = ResourceDefinition.mock_resource()
prod_job = do_stuff.to_job(resource_defs={"server": prod_server}, name="do_stuff_prod")
local_job = do_stuff.to_job(
resource_defs={"server": local_server}, name="do_stuff_local"
)
to_job
accepts the same arguments as the @dg.job
decorator, such as providing resources, configuration, etc.
Configuring op jobs
Ops and resources can accept configuration that determines how they behave. By default, configuration is supplied at the time an op job is launched.
When constructing an op job, you can customize how that configuration will be satisfied by passing a value to the config
parameter of the GraphDefinition.to_job
method or the @dg.job
decorator.
The options are discussed below:
Hardcoded configuration
You can supply a RunConfig
object or raw config dictionary. The supplied config will be used to configure the op job whenever it's launched. It will show up in the Dagster UI Launchpad and can be overridden.
from dagster import Config, OpExecutionContext, RunConfig, job, op
class DoSomethingConfig(Config):
config_param: str
@op
def do_something(context: OpExecutionContext, config: DoSomethingConfig):
context.log.info("config_param: " + config.config_param)
default_config = RunConfig(
ops={"do_something": DoSomethingConfig(config_param="stuff")}
)
@job(config=default_config)
def do_it_all_with_default_config():
do_something()
Partitioned configuration
Supplying a PartitionedConfig
will create a partitioned op job. This defines a discrete set of partitions along with a function for generating config for a partition. Op job runs can be configured by selecting a partition.
Refer to the Partitioning ops documentation for more info and examples.
Config mapping
Supplying a ConfigMapping
allows you to expose a narrower config interface to the op job.
Instead of needing to configure every op and resource individually when launching the op job, you can supply a smaller number of values to the outer config. The ConfigMapping
will then translate it into config for all the job's ops and resources.
from dagster import Config, OpExecutionContext, RunConfig, config_mapping, job, op
class DoSomethingConfig(Config):
config_param: str
@op
def do_something(context: OpExecutionContext, config: DoSomethingConfig) -> None:
context.log.info("config_param: " + config.config_param)
class SimplifiedConfig(Config):
simplified_param: str
@config_mapping
def simplified_config(val: SimplifiedConfig) -> RunConfig:
return RunConfig(
ops={"do_something": DoSomethingConfig(config_param=val.simplified_param)}
)
@job(config=simplified_config)
def do_it_all_with_simplified_config():
do_something()
Making op jobs available to Dagster tools
You make jobs available to the UI, GraphQL, and command line by including them in a Definitions
object at the top level of Python module or file. The tool loads that module as a code location. If you include schedules or sensors, the code location will automatically include jobs that those schedules or sensors target.
from dagster import Definitions, job
@job
def do_it_all(): ...
defs = Definitions(jobs=[do_it_all])
Testing op jobs
Dagster has built-in support for testing, including separating business logic from environments and setting explicit expectations on uncontrollable inputs. Refer to the testing documentation for more info and examples.
Executing op jobs
You can run an op job in a variety of ways:
- In the Python process where it's defined
- Via the command line
- Via the GraphQL API
- In the UI. The UI centers on jobs, making it a one-stop shop - you can manually kick off runs for an op job and view all historical runs.
Refer to the Job execution guide for more info and examples.