Airlift (dagster-airlift)
Core (dagster_airlift.core)
AirflowInstance
- class dagster_airlift.core.AirflowInstance
A class that represents a running Airflow instance and provides methods for interacting with its REST API.
Parameters:
- auth_backend (AirflowAuthBackend) – The authentication backend to use when making requests to the Airflow instance.
- name (str) – The name of the Airflow instance. This will be prefixed to any assets automatically created using this instance.
- batch_task_instance_limit (int) – The number of task instances to query at a time when fetching task instances. Defaults to 100.
- batch_dag_runs_limit (int) – The number of dag runs to query at a time when fetching dag runs. Defaults to 100.
- get_run_state
Given a run ID of an Airflow dag, return the state of that run.
Parameters:
- dag_id (str) – The dag id.
- run_id (str) – The run id.
Returns: The state of the run. Will be one of the states defined by Airflow.
Return type: str
- trigger_dag
Trigger a dag run for the given dag_id.
Does not wait for the run to finish. To wait for the run to complete, use wait_for_run_completion().
Parameters:
- dag_id (str) – The dag id to trigger.
- logical_date (Optional[datetime.datetime]) – The Airflow logical_date to use for the dag run. If not provided, the current time will be used. Previously known as execution_date in Airflow; find more information in the Airflow docs: https://airflow.apache.org/docs/apache-airflow/stable/faq.html#what-does-execution-date-mean
Returns: The dag run id.
Return type: str
- wait_for_run_completion
Given a run ID of an Airflow dag, wait for that run to reach a completed state.
Parameters:
- dag_id (str) – The dag id.
- run_id (str) – The run id.
- timeout (int) – The number of seconds to wait before timing out.
Returns: None
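Example: a minimal sketch that triggers a dag run, waits for it, and reads its state, assuming an already-constructed AirflowInstance named af_instance (see the AirflowBasicAuthBackend example below); the dag id here is hypothetical.
# Hypothetical dag id; af_instance is an AirflowInstance configured elsewhere.
run_id = af_instance.trigger_dag(dag_id="example_dag")
# Block until the run reaches a completed state (timeout in seconds).
af_instance.wait_for_run_completion(dag_id="example_dag", run_id=run_id, timeout=300)
# Read back the final state reported by Airflow.
state = af_instance.get_run_state(dag_id="example_dag", run_id=run_id)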
- class dagster_airlift.core.AirflowAuthBackend
An abstract class that represents an authentication backend for an Airflow instance.
Requires two methods to be implemented by subclasses:
- get_session: Returns a requests.Session object that can be used to make requests to the Airflow instance, and handles authentication.
- get_webserver_url: Returns the base URL of the Airflow webserver.
The dagster-airlift package provides the following default implementations:
- dagster_airlift.core.AirflowBasicAuthBackend: An authentication backend that uses Airflow’s basic auth to authenticate with the Airflow instance.
- dagster_airlift.mwaa.MwaaSessionAuthBackend: An authentication backend that uses AWS MWAA’s web login token to authenticate with the Airflow instance (requires dagster-airlift[mwaa]).
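For illustration, a minimal custom backend might look like the sketch below; the class name, the token scheme, and the exact method signatures are assumptions based on the description above, not part of the documented API.
import requests

from dagster_airlift.core import AirflowAuthBackend

class BearerTokenAuthBackend(AirflowAuthBackend):
    # Hypothetical backend that attaches a static bearer token to every request.
    def __init__(self, webserver_url: str, token: str):
        self._webserver_url = webserver_url
        self._token = token

    def get_session(self) -> requests.Session:
        # Return a requests.Session that handles authentication for API calls.
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {self._token}"})
        return session

    def get_webserver_url(self) -> str:
        # Return the base URL of the Airflow webserver.
        return self._webserver_url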
- class dagster_airlift.core.AirflowBasicAuthBackend
A dagster_airlift.core.AirflowAuthBackend that authenticates using basic auth.
Parameters:
- webserver_url (str) – The URL of the webserver.
- username (str) – The username to authenticate with.
- password (str) – The password to authenticate with.
Examples:
Creating an AirflowInstance using this backend.
from dagster_airlift.core import AirflowInstance, AirflowBasicAuthBackend
af_instance = AirflowInstance(
    name="my-instance",
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="https://my-webserver-hostname",
        username="my-username",
        password="my-password"
    )
)
Assets & Definitions
- dagster_airlift.core.build_defs_from_airflow_instance
Builds a dagster.Definitions object from an Airflow instance.
For every DAG in the Airflow instance, this function will create a Dagster asset for the DAG with an asset key instance_name/dag/dag_id. It will also create a sensor that polls the Airflow instance for DAG runs and emits Dagster events for each successful run.
An optional defs argument can be provided, where the user can pass in a dagster.Definitions object containing assets which are mapped to Airflow DAGs and tasks. These assets will be enriched with metadata from the Airflow instance, and placed upstream of the automatically generated DAG assets.
An optional event_transformer_fn can be provided, which allows the user to modify the Dagster events produced by the sensor. The function takes the Dagster events produced by the sensor and returns a sequence of Dagster events.
An optional dag_selector_fn can be provided, which allows the user to filter which DAGs assets are created for. The function takes a dagster_airlift.core.serialization.serialized_data.DagInfo object and returns a boolean indicating whether the DAG should be included.
Parameters:
- airflow_instance (AirflowInstance) – The Airflow instance to build assets and the sensor from.
- defs (Optional[Definitions]) – A dagster.Definitions object containing assets that are mapped to Airflow DAGs and tasks.
- sensor_minimum_interval_seconds (int) – The minimum interval in seconds between sensor runs.
- event_transformer_fn (DagsterEventTransformerFn) – A function that allows for modifying the Dagster events produced by the sensor.
- dag_selector_fn (Optional[DagSelectorFn]) – A function that allows for filtering which DAGs assets are created for.
Returns: A dagster.Definitions object containing the assets and sensor.
Return type: Definitions
Examples:
Building a dagster.Definitions object from an Airflow instance.
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    build_defs_from_airflow_instance,
)

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
    ),
    name=AIRFLOW_INSTANCE_NAME,
)

defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)
Providing task-mapped assets to the function.
from dagster import AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)

...

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,  # same as above
    defs=Definitions(
        assets=assets_with_task_mappings(
            dag_id="rebuild_iris_models",
            task_mappings={
                "my_task": [AssetSpec("my_first_asset"), AssetSpec("my_second_asset")],
            },
        ),
    ),
)
Providing a custom event transformer function.
from typing import Sequence

from dagster import Definitions, SensorEvaluationContext
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    AssetEvent,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
    AirflowDefinitionsData,
)

...

def add_tags_to_events(
    context: SensorEvaluationContext,
    defs_data: AirflowDefinitionsData,
    events: Sequence[AssetEvent]
) -> Sequence[AssetEvent]:
    altered_events = []
    for event in events:
        altered_events.append(event._replace(tags={"my_tag": "my_value"}))
    return altered_events

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,  # same as above
    event_transformer_fn=add_tags_to_events,
)
Filtering which DAGs assets are created for.
from dagster import Definitions
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    AssetEvent,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
    DagInfo,
)

...

def only_include_dag(dag_info: DagInfo) -> bool:
    return dag_info.dag_id == "my_dag_id"

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,  # same as above
    dag_selector_fn=only_include_dag,
)
Mapping Dagster assets to Airflow tasks/dags:
- dagster_airlift.core.assets_with_task_mappings
Modify assets to be associated with a particular task in Airlift tooling.
Used in concert with build_defs_from_airflow_instance to observe an Airflow instance, monitor the tasks that are associated with the assets, and keep their materialization histories up to date.
Concretely this adds metadata to all asset specs in the provided definitions with the provided dag_id and task_id. The dag_id comes from the dag_id argument; the task_id comes from the key of the provided task_mappings dictionary. There is a single metadata key “airlift/task-mapping” that is used to store this information. It is a list of dictionaries with keys “dag_id” and “task_id”.
Example:
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
defs = Definitions(
    assets=assets_with_task_mappings(
        dag_id="dag_one",
        task_mappings={
            "task_one": [asset_one],
            "task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
        },
    )
)
- dagster_airlift.core.assets_with_dag_mappings
Modify assets to be associated with a particular dag in Airlift tooling.
Used in concert with build_defs_from_airflow_instance to observe an Airflow instance, monitor the dags that are associated with the assets, and keep their materialization histories up to date.
In contrast with assets_with_task_mappings, which maps assets on a per-task basis, this is used in concert with proxying_to_dagster for dag-level mappings, where an entire dag is migrated at once.
Concretely this adds metadata to all asset specs in the provided definitions with the provided dag_id. The dag_id comes from the key of the provided dag_mappings dictionary. There is a single metadata key “airlift/dag-mapping” that is used to store this information. It is a list of strings, where each string is a dag_id which the asset is associated with.
Example:
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_dag_mappings
@asset
def asset_one() -> None: ...
defs = Definitions(
    assets=assets_with_dag_mappings(
        dag_mappings={
            "dag_one": [asset_one],
            "dag_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
        },
    )
)
- dagster_airlift.core.assets_with_multiple_task_mappings
Given an asset or assets definition, return a new asset or assets definition with metadata that indicates that it is targeted by multiple Airflow tasks. An example of this would be separate weekly and daily dags that each contain a task targeting the same asset.
from dagster import Definitions, AssetSpec, asset
from dagster_airlift.core import (
    assets_with_multiple_task_mappings,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
# Asset maps to a single task.
@asset
def other_asset(): ...
# Asset maps to a physical entity which is produced by two different airflow tasks.
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,
    defs=Definitions(
        assets=[
            *assets_with_task_mappings(
                dag_id="other_dag",
                task_mappings={
                    "task1": [other_asset]
                },
            ),
            *assets_with_multiple_task_mappings(
                assets=[scheduled_twice],
                task_handles=[
                    {"dag_id": "weekly_dag", "task_id": "task1"},
                    {"dag_id": "daily_dag", "task_id": "task1"},
                ],
            ),
        ]
    ),
)