The Orchestration Problem — Why Cron Breaks at Scale
Every data platform eventually outgrows cron. Cron runs scripts in isolation, provides no dependency graph, offers no retry logic, emits no structured observability, and has no concept of data freshness or upstream health. The moment you need task B to run only after task A succeeds — or need to backfill three months of data — cron becomes a liability.
Workflow orchestrators solve this by modelling pipelines as directed acyclic graphs (DAGs) of tasks with explicit dependencies, retry policies, scheduling semantics, and centralised logs. Two tools dominate the modern data stack: Apache Airflow, the incumbent (2.10+; started at Airbnb in 2014 and open-sourced in 2015), and Dagster, the challenger (1.x; open-sourced in 2019). They share the same problem statement but differ fundamentally in how they model the world.
Airflow thinks in tasks — units of work arranged into a DAG that runs on a schedule. Dagster thinks in assets — data artefacts (tables, files, ML models) that have producers and consumers. This is not a superficial difference; it shapes testing, observability, partitioning, and the entire developer experience.
Apache Airflow Architecture — Scheduler, Executor, and the DAG Model
Airflow's architecture has five core components. The Scheduler continuously parses all DAG files, determines which task instances are ready to run, and enqueues them. The Executor (LocalExecutor, CeleryExecutor, or KubernetesExecutor) takes queued tasks and dispatches them to workers. The Webserver serves the UI and REST API. The Metadata Database (PostgreSQL or MySQL) is the single source of truth for task state. The DAG folder (local filesystem or remote sync) contains Python files that define DAG objects.
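The division of labour between scheduler, executor, and metadata database can be modelled in a toy loop. In this sketch a dict stands in for the metadata database, the scheduler queues every task instance whose upstream tasks have all succeeded, and the executor drains the queue and records results. All names are hypothetical, and the model deliberately omits DAG parsing, retries, and parallelism; it is not how Airflow is implemented internally.

```python
from collections import deque

# Toy "metadata database": task_id -> state, plus the DAG's upstream edges
metadata_db = {"extract": "none", "validate": "none", "transform": "none", "load": "none"}
upstream = {"extract": [], "validate": ["extract"], "transform": ["validate"], "load": ["transform"]}


def scheduler_tick(db: dict[str, str], queue: deque) -> None:
    """Queue every task instance whose upstream tasks have all succeeded."""
    for task_id, state in db.items():
        if state == "none" and all(db[up] == "success" for up in upstream[task_id]):
            db[task_id] = "queued"
            queue.append(task_id)


def executor_run(db: dict[str, str], queue: deque, log: list[str]) -> None:
    """Drain the queue; a real executor hands each task to a worker or a pod."""
    while queue:
        task_id = queue.popleft()
        db[task_id] = "running"
        log.append(task_id)  # the task's actual work would happen here
        db[task_id] = "success"


queue: deque = deque()
run_log: list[str] = []
for _ in range(10):  # bounded here; a real scheduler loops forever
    scheduler_tick(metadata_db, queue)
    executor_run(metadata_db, queue, run_log)
    if all(state == "success" for state in metadata_db.values()):
        break
print(run_log)  # ['extract', 'validate', 'transform', 'load']
```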
# Airflow DAG: ingest raw events, transform, write to warehouse
# airflow/dags/user_events_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup


# Hypothetical task callables: replace these stubs with your own implementations
def extract_events_from_api(date: str) -> None: ...
def validate_event_schema() -> None: ...
def join_user_profiles() -> None: ...
def trigger_dbt_run() -> None: ...


default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,  # lengthen the delay on each retry
    "email_on_failure": True,           # requires SMTP to be configured
    "email": ["data-oncall@company.com"],
}

with DAG(
    dag_id="user_events_pipeline",
    default_args=default_args,
    schedule="0 3 * * *",             # daily at 03:00 (UTC unless a timezone is set)
    start_date=datetime(2026, 1, 1),
    catchup=False,                    # do NOT backfill missed runs
    max_active_runs=1,                # prevent concurrent runs
    tags=["events", "warehouse"],
) as dag:
    with TaskGroup("extract") as extract_group:
        extract_events = PythonOperator(
            task_id="extract_raw_events",
            python_callable=extract_events_from_api,
            op_kwargs={"date": "{{ ds }}"},  # Jinja templating for the run date
        )
        validate_schema = PythonOperator(
            task_id="validate_schema",
            python_callable=validate_event_schema,
        )
        extract_events >> validate_schema

    with TaskGroup("transform") as transform_group:
        # SQLExecuteQueryOperator (common.sql provider) supersedes the deprecated PostgresOperator
        deduplicate = SQLExecuteQueryOperator(
            task_id="deduplicate_events",
            conn_id="warehouse_prod",
            sql="sql/deduplicate_events.sql",  # template file, resolved relative to the DAG file
            parameters={"run_date": "{{ ds }}"},
        )
        enrich_users = PythonOperator(
            task_id="enrich_with_user_profiles",
            python_callable=join_user_profiles,
        )
        deduplicate >> enrich_users

    load_warehouse = SQLExecuteQueryOperator(
        task_id="load_to_warehouse",
        conn_id="warehouse_prod",
        sql="sql/load_fact_events.sql",
    )
    notify_downstream = PythonOperator(
        task_id="notify_downstream_jobs",
        python_callable=trigger_dbt_run,
    )

    extract_group >> transform_group >> load_warehouse >> notify_downstream
Airflow's TaskFlow API (Airflow 2.0+) simplifies Python-heavy DAGs by removing XCom boilerplate: functions decorated with @task automatically pass their return values to downstream tasks via XCom. Despite this improvement, the fundamental abstraction remains a task-centric execution graph rather than a data-centric asset graph.
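The XCom hand-off that @task automates can be shown with a toy model. The sketch below is not Airflow code: the hypothetical task decorator turns each call into a deferred reference (loosely analogous to Airflow's XComArg), and run() executes upstream references first, storing each return value in an in-memory dict keyed by (run_id, task_id) that plays the role of the XCom table.

```python
from typing import Any, Callable

XCOM: dict[tuple[str, str], Any] = {}  # stands in for Airflow's XCom table


class TaskRef:
    """Placeholder for a task's future return value."""

    def __init__(self, task_id: str, fn: Callable[..., Any], args: tuple):
        self.task_id, self.fn, self.args = task_id, fn, args


def task(fn: Callable[..., Any]) -> Callable[..., TaskRef]:
    """Hypothetical decorator: calling the function records a task instead of running it."""
    def build(*args: Any) -> TaskRef:
        return TaskRef(fn.__name__, fn, args)
    return build


def run(ref: TaskRef, run_id: str) -> Any:
    """Run upstream refs first, then push this task's return value to the store."""
    key = (run_id, ref.task_id)
    if key not in XCOM:
        resolved = [run(a, run_id) if isinstance(a, TaskRef) else a for a in ref.args]
        XCOM[key] = ref.fn(*resolved)  # "push"
    return XCOM[key]                   # "pull" for the downstream caller


@task
def extract() -> list[dict]:
    return [{"user_id": 1, "event_id": "a"}, {"user_id": 1, "event_id": "a"}]


@task
def deduplicate(rows: list[dict]) -> list[dict]:
    return [dict(t) for t in {tuple(sorted(r.items())) for r in rows}]


@task
def count(rows: list[dict]) -> int:
    return len(rows)


result_ref = count(deduplicate(extract()))   # wires the graph, runs nothing
print(run(result_ref, run_id="2026-01-01"))  # 1
```

As in Airflow, calling the decorated functions only wires the graph; nothing executes until a run starts, and every value that crosses a task boundary is stored and read back, which is why real XCom values must be serialisable and should stay small.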
Dagster Architecture — Software-Defined Assets, Ops, and the Asset Graph
Dagster's core abstraction is the Software-Defined Asset (SDA) — a Python function decorated with @asset that describes how to produce a specific data artefact (a database table, a Parquet file, a trained model). An asset has a key (its identity), optional upstream dependencies (other assets), metadata (description, owners, tags), and optionally a freshness policy that defines how stale the asset is allowed to become before triggering a materialisation.
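At its simplest, a freshness policy is a timestamp comparison: an asset becomes stale once its last materialisation is older than the allowed lag. The sketch below models only that comparison with the standard library; is_stale is a hypothetical helper, and Dagster's own evaluation (which can reason about when upstream data was produced, not just when the asset last ran) is richer.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(last_materialized: Optional[datetime], maximum_lag_minutes: int, now: datetime) -> bool:
    """True if the asset was never materialised or is older than the allowed lag."""
    if last_materialized is None:
        return True
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)


now = datetime(2026, 5, 15, 12, 0, tzinfo=timezone.utc)
print(is_stale(now - timedelta(minutes=90), 120, now))   # False: within the 2-hour lag
print(is_stale(now - timedelta(minutes=150), 120, now))  # True: lag exceeded
print(is_stale(None, 120, now))                          # True: never materialised
```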
Dagster's runtime consists of the Dagster daemon (which runs schedules, sensors, and the run queue), the Dagster webserver (UI and GraphQL API), and one or more code locations (gRPC servers that load your Python definitions). The Dagster instance persists run history and the event log in its storage backend, typically PostgreSQL in production. In Airflow, the scheduler continuously parses DAG files itself, a common source of import errors and slow scheduling; Dagster instead loads user code in isolated gRPC servers, so a broken import in one code location does not take down the webserver or the daemon.
# Dagster Software-Defined Assets: same pipeline as the Airflow example above
# pipelines/user_events.py
from typing import Optional

import pandas as pd
from dagster import (
    AssetExecutionContext, AssetIn, AutoMaterializePolicy, ConfigurableResource,
    FreshnessPolicy, MetadataValue, Output, asset,
)

# WarehouseResource is shown in the Resources section below; this module path is hypothetical
from pipelines.resources import WarehouseResource


class EventsApiResource(ConfigurableResource):
    """Hypothetical client for the upstream events API."""
    base_url: str

    def fetch_events(self, date: Optional[str] = None, region: Optional[str] = None) -> pd.DataFrame:
        raise NotImplementedError("call your events API here")


# Asset 1: extract raw events from the upstream API
@asset(
    group_name="user_events",
    description="Raw click events ingested from the events API",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=120),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def raw_events(context: AssetExecutionContext, events_api: EventsApiResource) -> pd.DataFrame:
    # This asset is not partitioned, so only read partition_key when one exists
    date = context.partition_key if context.has_partition_key else None
    df = events_api.fetch_events(date=date)
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "preview": MetadataValue.md(df.head(5).to_markdown()),  # to_markdown needs `tabulate`
    })
    return df


# Asset 2: deduplicated events, depends on raw_events
@asset(
    group_name="user_events",
    ins={"raw_events": AssetIn()},
    description="Events with duplicates removed by (user_id, event_id)",
)
def deduplicated_events(raw_events: pd.DataFrame) -> pd.DataFrame:
    return raw_events.drop_duplicates(subset=["user_id", "event_id"])


# Asset 3: enriched events, depends on deduplicated_events
@asset(
    group_name="user_events",
    ins={"deduplicated_events": AssetIn()},
    description="Events enriched with user tier and region from profiles table",
)
def enriched_events(
    deduplicated_events: pd.DataFrame,
    warehouse: WarehouseResource,
) -> Output[pd.DataFrame]:
    profiles = warehouse.fetch_user_profiles()
    enriched = deduplicated_events.merge(profiles, on="user_id", how="left")
    return Output(
        value=enriched,
        metadata={"enriched_row_count": MetadataValue.int(len(enriched))},
    )


# Asset 4: warehouse fact table, the final materialised destination
@asset(
    group_name="user_events",
    ins={"enriched_events": AssetIn()},
    description="fact_events table in the warehouse, daily grain",
)
def fact_events(
    enriched_events: pd.DataFrame,
    warehouse: WarehouseResource,
) -> None:
    # Returns None: the asset writes to the warehouse itself, so no IO manager output
    warehouse.upsert_fact_events(enriched_events)
The key difference from Airflow is immediately visible: Dagster assets are Python functions that accept other assets as arguments by name. The dependency graph is derived from function signatures: raw_events flows into deduplicated_events because the latter accepts a parameter named raw_events (the explicit ins={...: AssetIn()} mappings above are optional when parameter names match asset keys). There is no explicit task wiring syntax like Airflow's >> operator. Dagster's UI renders this asset graph as data lineage across your entire platform, including dbt models, Fivetran syncs, and ML training jobs, in a single view.
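The mechanism of deriving edges from parameter names can be reproduced with the standard library's inspect module. The following is a simplified, hypothetical model of the idea rather than Dagster's implementation: toy_asset registers functions by name, build_graph turns parameters that match another asset's name into upstream edges, and materialize_all runs the graph in topological order, passing each result in by keyword.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable

ASSETS: dict[str, Callable[..., Any]] = {}


def toy_asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical stand-in for @asset: register the function under its name."""
    ASSETS[fn.__name__] = fn
    return fn


def build_graph() -> dict[str, set[str]]:
    """Upstream edges are the parameters whose names match another asset."""
    return {
        name: {p for p in inspect.signature(fn).parameters if p in ASSETS}
        for name, fn in ASSETS.items()
    }


def materialize_all() -> dict[str, Any]:
    """Run every asset in dependency order, injecting upstream results by name."""
    results: dict[str, Any] = {}
    for name in TopologicalSorter(build_graph()).static_order():
        fn = ASSETS[name]
        kwargs = {p: results[p] for p in inspect.signature(fn).parameters if p in ASSETS}
        results[name] = fn(**kwargs)
    return results


@toy_asset
def raw_events() -> list[tuple[int, str]]:
    return [(1, "a"), (1, "a"), (2, "b")]


@toy_asset
def deduplicated_events(raw_events: list[tuple[int, str]]) -> list[tuple[int, str]]:
    return sorted(set(raw_events))


@toy_asset
def event_count(deduplicated_events: list[tuple[int, str]]) -> int:
    return len(deduplicated_events)


print(build_graph())
# {'raw_events': set(), 'deduplicated_events': {'raw_events'}, 'event_count': {'deduplicated_events'}}
print(materialize_all()["event_count"])  # 2
```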
Partitioned Assets and Incremental Processing
Backfills and incremental processing are a major pain point in Airflow. To backfill a date range, you either set catchup=True and let the scheduler replay missed runs, run the airflow dags backfill CLI command, or manually trigger DAG runs with specific date parameters. The semantics are fragile: date parameterisation relies on Jinja templates in operator arguments, and the relationship between a run and the data partition it covers is implicit.
Dagster makes partitioning a first-class concept. Partitioned assets declare a PartitionsDefinition that maps every materialisation to a specific partition key (a date, a region string, or a custom key). The Dagster UI shows which partitions are materialised, which are stale, and which have failed — making backfilling a visual, multi-select operation rather than a CLI incantation.
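The bookkeeping behind partitions and backfills is simple set arithmetic: daily partitions are an ordered list of date keys, a backfill launches the keys that are wanted but not yet materialised, and a multi-dimensional definition is the cross product of its dimensions. A standard-library sketch with hypothetical helper names:

```python
from datetime import date, timedelta
from itertools import product


def daily_keys(start: date, end: date) -> list[str]:
    """All daily partition keys from start to end inclusive, as ISO date strings."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


def missing_partitions(wanted: list[str], materialized: set[str]) -> list[str]:
    """Partitions a backfill would launch: wanted but not yet materialised."""
    return [key for key in wanted if key not in materialized]


keys = daily_keys(date(2026, 1, 1), date(2026, 1, 5))
done = {"2026-01-01", "2026-01-02", "2026-01-04"}
print(missing_partitions(keys, done))  # ['2026-01-03', '2026-01-05']

# Multi-dimensional partitions: every (date, region) combination is its own key
regions = ["us-east", "eu-west", "ap-south"]
multi = list(product(keys, regions))
print(len(multi))  # 15
print(multi[0])    # ('2026-01-01', 'us-east')
```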
# Dagster partitioned assets: daily time partitions + backfill support
import pandas as pd
from dagster import (
    AssetExecutionContext, DailyPartitionsDefinition, MultiPartitionsDefinition,
    StaticPartitionsDefinition, asset, materialize,
)
# EventsApiResource is the (hypothetical) events API resource from the previous example

# Daily partitions from 2026-01-01 onwards
DAILY = DailyPartitionsDefinition(start_date="2026-01-01")


@asset(
    partitions_def=DAILY,
    group_name="user_events",
    description="Raw events for a single calendar day",
)
def raw_events_partitioned(context: AssetExecutionContext, events_api: EventsApiResource) -> pd.DataFrame:
    # context.partition_key is the date string, e.g. "2026-05-15"
    date = context.partition_key
    df = events_api.fetch_events(date=date)
    context.log.info(f"Fetched {len(df)} events for {date}")
    return df


@asset(
    partitions_def=DAILY,
    group_name="user_events",
)
def deduplicated_events_partitioned(raw_events_partitioned: pd.DataFrame) -> pd.DataFrame:
    # Same partitions_def upstream and downstream: partition 2026-05-15 reads partition 2026-05-15
    return raw_events_partitioned.drop_duplicates(subset=["user_id", "event_id"])


# Multi-dimensional partitions: date × region
REGIONS = StaticPartitionsDefinition(["us-east", "eu-west", "ap-south"])
DATE_REGION = MultiPartitionsDefinition({"date": DAILY, "region": REGIONS})


@asset(
    partitions_def=DATE_REGION,
    group_name="regional_events",
)
def regional_events(context: AssetExecutionContext, events_api: EventsApiResource) -> pd.DataFrame:
    keys = context.partition_key.keys_by_dimension  # MultiPartitionKey -> {"date": ..., "region": ...}
    return events_api.fetch_events(date=keys["date"], region=keys["region"])


# Backfill a range from the UI (select partitions, then materialise) or from the CLI:
# dagster asset materialize --select raw_events_partitioned \
#     --partition-range 2026-01-01..2026-05-31
#
# Or materialise a single partition from Python:
# materialize([raw_events_partitioned], partition_key="2026-05-15",
#             resources={"events_api": ...})
Resources and IO Managers vs Airflow Connections and Hooks
Airflow manages external system credentials via Connections stored in the metadata database (or environment variables with the AIRFLOW_CONN_* prefix). Operators access connections via Hooks — thin wrappers around the connection that provide a client object. This works, but connections are opaque config bags; testing operators that use them requires mocking hooks or using real credentials.
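For reference, an environment-defined Airflow connection is a single URI stored in a variable named AIRFLOW_CONN_ followed by the upper-cased conn_id. The sketch below splits such a URI with urllib to show what a hook ultimately receives; parse_conn_uri and the credentials are hypothetical, and Airflow's own Connection class handles more cases (extras in the query string, JSON-formatted values).

```python
import os
from urllib.parse import unquote, urlsplit


def parse_conn_uri(conn_id: str) -> dict:
    """Read AIRFLOW_CONN_<CONN_ID> from the environment and split it into fields."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),  # "%40" decodes to "@"
        "schema": parts.path.lstrip("/"),
    }


# Hypothetical connection for the warehouse_prod conn_id used in the DAG above
os.environ["AIRFLOW_CONN_WAREHOUSE_PROD"] = (
    "postgres://etl_user:s%40cret@warehouse.prod.internal:5432/analytics"
)
print(parse_conn_uri("warehouse_prod"))
```

Everything arrives as untyped strings, so an operator test has to supply a real URI or mock the hook, which is the friction described above.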
Dagster's Resources are typed Python objects that encapsulate external clients. Assets and ops declare them as type-annotated function parameters, and Dagster injects the matching instance at runtime from the resources mapping passed to Definitions (or to materialize() in tests). Resources can be swapped between environments (production, test, development) without touching asset code, which is critical for testability.
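The injection pattern itself can be sketched without Dagster: declare what a function needs as a typed parameter, then let a runner supply a matching object by name. WarehouseClient, FakeWarehouse, and run_with_resources below are hypothetical; the point is that swapping a production client for an in-memory fake changes only the resources mapping, never the asset function.

```python
import inspect
from typing import Any, Callable, Protocol


class WarehouseClient(Protocol):
    def fetch_user_tiers(self) -> dict[int, str]: ...


class FakeWarehouse:
    """In-memory stand-in for tests; satisfies WarehouseClient structurally."""

    def fetch_user_tiers(self) -> dict[int, str]:
        return {1: "pro", 2: "free"}


def enriched_events(events: list[int], warehouse: WarehouseClient) -> list[tuple[int, str]]:
    tiers = warehouse.fetch_user_tiers()
    return [(user_id, tiers.get(user_id, "unknown")) for user_id in events]


def run_with_resources(fn: Callable[..., Any], resources: dict[str, Any], **inputs: Any) -> Any:
    """Inject every parameter whose name matches a resource key; pass the rest as inputs."""
    params = inspect.signature(fn).parameters
    injected = {name: res for name, res in resources.items() if name in params}
    return fn(**inputs, **injected)


test_resources = {"warehouse": FakeWarehouse()}
print(run_with_resources(enriched_events, test_resources, events=[1, 2, 3]))
# [(1, 'pro'), (2, 'free'), (3, 'unknown')]
```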
# Dagster Resources: type-safe, injectable, testable
import boto3
import pandas as pd
from dagster import ConfigurableResource, Definitions, EnvVar
from sqlalchemy import create_engine
from sqlalchemy.engine import URL


# Define a typed resource for the data warehouse
class WarehouseResource(ConfigurableResource):
    host: str
    port: int = 5432
    database: str
    username: str
    password: str  # supplied as EnvVar("WAREHOUSE_PASSWORD") in Definitions below

    def get_engine(self):
        # URL.create escapes special characters in the password
        url = URL.create(
            "postgresql+psycopg2",
            username=self.username, password=self.password,
            host=self.host, port=self.port, database=self.database,
        )
        return create_engine(url, pool_pre_ping=True)

    def fetch_user_profiles(self) -> pd.DataFrame:
        with self.get_engine().connect() as conn:
            return pd.read_sql("SELECT * FROM dim_users", conn)

    def upsert_fact_events(self, df: pd.DataFrame) -> None:
        # Appends rows; a true upsert would need INSERT ... ON CONFLICT
        with self.get_engine().begin() as conn:
            df.to_sql("fact_events", conn, if_exists="append", index=False,
                      method="multi", chunksize=5000)


# Define a resource for S3 with optional role assumption
class S3BucketResource(ConfigurableResource):
    bucket_name: str
    region: str = "eu-west-1"
    role_arn: str = ""  # optional: assume a role for cross-account access

    def get_client(self):
        if self.role_arn:
            sts = boto3.client("sts")
            creds = sts.assume_role(RoleArn=self.role_arn, RoleSessionName="dagster")["Credentials"]
            return boto3.client(
                "s3", region_name=self.region,
                aws_access_key_id=creds["AccessKeyId"],
                aws_secret_access_key=creds["SecretAccessKey"],
                aws_session_token=creds["SessionToken"],
            )
        return boto3.client("s3", region_name=self.region)


# Wire resources to your Definitions object (assets from pipelines/user_events.py)
defs = Definitions(
    assets=[raw_events, deduplicated_events, enriched_events, fact_events],
    resources={
        # Production: the password is read from the environment at runtime
        "warehouse": WarehouseResource(
            host="warehouse.prod.internal",
            database="analytics",
            username="dagster_writer",
            password=EnvVar("WAREHOUSE_PASSWORD"),
        ),
        "events_api": EventsApiResource(...),  # hypothetical resource required by raw_events
        "s3_bucket": S3BucketResource(bucket_name="company-datalake"),
    },
)


# For tests: call the asset directly with in-memory data, no resources needed
def test_deduplicated_events():
    df = pd.DataFrame({"user_id": [1, 1, 2], "event_id": ["a", "a", "b"]})
    result = deduplicated_events(df)
    assert len(result) == 2  # duplicate (1, "a") removed
Dagster's IO Managers extend the resource concept to storage. An IO Manager handles how an asset's output value is written to and read from persistent storage, decoupling the asset's business logic from the storage layer entirely. Swapping from local Parquet to S3 to a database table is a configuration change, not a code change.
# Dagster IO Managers: decouple storage from computation
import os

import pandas as pd
from dagster import ConfigurableIOManager, Definitions, InputContext, OutputContext


class ParquetIOManager(ConfigurableIOManager):
    """Stores each asset as a local Parquet file (requires pyarrow or fastparquet)."""
    base_path: str

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        path = self._get_path(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        obj.to_parquet(path, index=False)
        context.add_output_metadata({"path": path, "rows": len(obj)})

    def load_input(self, context: InputContext) -> pd.DataFrame:
        # Read the file that the upstream asset's handle_output wrote
        return pd.read_parquet(self._get_path(context.upstream_output))

    def _get_path(self, context) -> str:
        # Asset key ["a", "b"] -> <base_path>/a/b.parquet
        keys = list(context.asset_key.path)
        return os.path.join(self.base_path, *keys[:-1], f"{keys[-1]}.parquet")


# Production: dagster-aws ships an S3 IO manager (it stores pickled objects, not Parquet)
from dagster_aws.s3 import S3PickleIOManager, S3Resource

defs = Definitions(
    assets=[raw_events, deduplicated_events, enriched_events],
    resources={
        # Development
        "io_manager": ParquetIOManager(base_path="/tmp/dagster-dev"),
        # Production (switch here, no asset code changes):
        # "io_manager": S3PickleIOManager(
        #     s3_resource=S3Resource(),
        #     s3_bucket="company-datalake",
        #     s3_prefix="dagster-assets",
        # ),
        # These assets also need the "events_api" and "warehouse" resources shown earlier
    },
)
Testing — Dagster's Built-In Testability vs Airflow Patterns
Testing Airflow DAGs has historically been painful. The standard approaches are dag.test() (Airflow 2.5+) and the older task.execute(context) pattern; both run task code in-process, but they need an initialised Airflow metadata database or extensive mocking. DAG import validation (loading a DagBag and asserting there are no import errors or dependency cycles) is straightforward, but testing operator logic with injected test data and verifying output correctness requires significant mocking infrastructure.
Dagster assets are regular Python functions. Because their dependencies are injected via function parameters, testing is straightforward — pass mock inputs and assert on the output. The materialize() function runs a set of assets in-process without any running Dagster infrastructure, making unit and integration tests fast and dependency-free.
# Dagster testing: assets are plain functions, resources are swappable
from unittest.mock import MagicMock

import pandas as pd
from dagster import build_asset_context, materialize
# The assets, WarehouseResource and ParquetIOManager come from the previous examples


# Test 1: unit test; pure function, no infrastructure needed
def test_deduplicated_events_removes_duplicates():
    raw = pd.DataFrame({
        "user_id": [1, 1, 2, 3],
        "event_id": ["a", "a", "b", "c"],
        "ts": ["2026-01-01", "2026-01-01", "2026-01-01", "2026-01-01"],
    })
    result = deduplicated_events(raw)
    assert len(result) == 3
    assert result["event_id"].tolist() == ["a", "b", "c"]


# Test 2: asset with resource injection; pass a mock resource directly
def test_fact_events_calls_upsert():
    mock_warehouse = MagicMock(spec=WarehouseResource)
    enriched = pd.DataFrame({"user_id": [1], "event_id": ["a"], "tier": ["pro"]})
    fact_events(enriched, warehouse=mock_warehouse)
    mock_warehouse.upsert_fact_events.assert_called_once()


# Test 3: materialize() runs the asset graph in-process with fake resources.
# Plain mocks (not ConfigurableResource subclasses) keep call records on the object the test holds.
def test_full_pipeline_with_fake_resources(tmp_path):
    fake_warehouse = MagicMock()
    fake_warehouse.fetch_user_profiles.return_value = pd.DataFrame({"user_id": [1], "tier": ["pro"]})
    fake_events_api = MagicMock()
    fake_events_api.fetch_events.return_value = pd.DataFrame({
        "user_id": [1, 1], "event_id": ["x", "x"],
    })
    result = materialize(
        assets=[raw_events, deduplicated_events, enriched_events, fact_events],
        resources={
            "warehouse": fake_warehouse,
            "events_api": fake_events_api,
            "io_manager": ParquetIOManager(base_path=str(tmp_path)),  # pytest temp dir
        },
    )
    assert result.success
    upserted = fake_warehouse.upsert_fact_events.call_args.args[0]
    assert len(upserted) == 1  # one unique event after dedup


# Test 4: partitioned asset test
def test_partitioned_asset_uses_partition_key():
    fake_api = MagicMock()
    fake_api.fetch_events.return_value = pd.DataFrame({"user_id": [1], "event_id": ["z"]})
    ctx = build_asset_context(partition_key="2026-05-01")
    result = raw_events_partitioned(ctx, events_api=fake_api)
    fake_api.fetch_events.assert_called_once_with(date="2026-05-01")
    assert len(result) == 1
Note
dag.test() does not require a running scheduler, webserver, or executor: it runs every task of a DAG in a single Python process against the configured metadata database (a local SQLite database is enough), which is a significant improvement. However, Dagster's testing model is structurally simpler because assets are plain Python functions whose inputs and resources are injected as parameters; most assets need no context object at all, and when one is needed, build_asset_context() constructs it in a single call.
dbt Integration — Assets vs BashOperator
dbt is a nearly universal component of the modern data stack. Integrating it with an orchestrator determines whether your dbt models appear in your lineage graph or are a black box.
The most common Airflow dbt integrations are a BashOperator that shells out to dbt run, or Astronomer Cosmos (the astronomer-cosmos package), which translates each dbt model into an Airflow task and supports dependency-aware selection. Cosmos is a major improvement over a plain BashOperator, but dbt models still appear as tasks in an Airflow DAG rather than as nodes in a unified cross-system asset graph.
Dagster's dagster-dbt integration loads every dbt model as a native Dagster asset, making dbt models first-class citizens in the asset graph alongside upstream ingestion assets and downstream ML assets. Freshness policies, metadata, and partitioning apply uniformly across dbt and non-dbt assets.
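dbt models can become first-class assets because dbt already records its dependency graph in target/manifest.json: its nodes mapping is keyed by unique_id, and each entry carries a resource_type, a name, and a depends_on.nodes list of upstream unique_ids. The sketch below derives model-level edges from a trimmed, hand-written manifest fragment (the project and model names are hypothetical); dagster-dbt does considerably more, such as mapping dbt sources onto upstream asset keys.

```python
from graphlib import TopologicalSorter

# Trimmed manifest.json fragment for a hypothetical project "analytics"
manifest = {
    "nodes": {
        "model.analytics.stg_events": {
            "resource_type": "model", "name": "stg_events",
            "depends_on": {"nodes": ["source.analytics.raw.events"]},
        },
        "model.analytics.fct_events": {
            "resource_type": "model", "name": "fct_events",
            "depends_on": {"nodes": ["model.analytics.stg_events"]},
        },
        "test.analytics.not_null_fct_events_user_id": {
            "resource_type": "test", "name": "not_null_fct_events_user_id",
            "depends_on": {"nodes": ["model.analytics.fct_events"]},
        },
    },
}


def model_graph(manifest: dict) -> dict[str, set[str]]:
    """Map each model name to the names of its upstream models and source tables."""
    graph: dict[str, set[str]] = {}
    for node in manifest["nodes"].values():
        if node["resource_type"] != "model":  # skip tests, seeds, snapshots, ...
            continue
        graph[node["name"]] = {uid.split(".")[-1] for uid in node["depends_on"]["nodes"]}
    return graph


graph = model_graph(manifest)
print(graph)  # {'stg_events': {'events'}, 'fct_events': {'stg_events'}}
print(list(TopologicalSorter(graph).static_order()))  # ['events', 'stg_events', 'fct_events']
```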
# Dagster + dbt: dbt models as Software-Defined Assets
# pipelines/dbt_assets.py
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Load dbt project metadata at definition time
dbt_project = DbtProject(
    project_dir="/opt/dagster/dbt",
    profiles_dir="/opt/dagster/dbt/profiles",
)
dbt_project.prepare_if_dev()  # builds manifest.json when running under `dagster dev`


# @dbt_assets generates one Dagster asset per selected dbt model,
# wiring upstream/downstream dependencies from the dbt graph automatically
@dbt_assets(manifest=dbt_project.manifest_path, select="tag:production")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The selection lives in the decorator; passing context lets dbt.cli() add the matching --select
    yield from dbt.cli(["run"], context=context).stream()


# Now combine with non-dbt assets in a single Definitions
defs = Definitions(
    assets=[
        raw_events,         # upstream ingestion asset
        my_dbt_assets,      # all selected dbt models as Dagster assets
        ml_training_asset,  # hypothetical downstream ML asset that reads a dbt model
    ],
    resources={
        "dbt": DbtCliResource(project_dir=dbt_project),
        "warehouse": WarehouseResource(...),
        "events_api": EventsApiResource(...),  # hypothetical resource required by raw_events
    },
)
# The Dagster UI now shows a unified lineage graph:
# raw_events → [dbt: stg_events] → [dbt: fct_events] → ml_training_asset
# Freshness policies, observability, and backfills span the entire graph.
Deployment — Kubernetes, Astronomer, and Dagster Cloud
Both Airflow and Dagster have well-supported Kubernetes deployment paths. The operational profiles differ significantly.
Airflow on Kubernetes with the official Helm chart is the standard self-hosted path. The KubernetesExecutor spawns a fresh pod per task — providing isolation and resource limits per task — but adds pod startup latency (5–30 seconds). The CeleryExecutor uses a pool of persistent workers (faster task startup, harder to scale down). Astronomer Astro provides a managed Airflow cloud with auto-scaling, Astro Runtime (Airflow + providers), and a developer CLI.
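Because the KubernetesExecutor pays pod start-up latency per task, what accumulates is the overhead along the longest chain of dependent tasks rather than the total task count, since parallel branches start their pods concurrently. A back-of-the-envelope sketch, assuming a fixed per-pod start-up cost at each end of the 5 to 30 second range and ignoring scheduling and queueing delays, applied to the six-task chain from the Airflow example (task ids shown without TaskGroup prefixes):

```python
from functools import lru_cache

# Upstream edges of the example DAG (task -> tasks it waits for)
upstream = {
    "extract_raw_events": [],
    "validate_schema": ["extract_raw_events"],
    "deduplicate_events": ["validate_schema"],
    "enrich_with_user_profiles": ["deduplicate_events"],
    "load_to_warehouse": ["enrich_with_user_profiles"],
    "notify_downstream_jobs": ["load_to_warehouse"],
}


@lru_cache(maxsize=None)
def chain_length(task: str) -> int:
    """Number of tasks on the longest dependency chain ending at `task`."""
    return 1 + max((chain_length(up) for up in upstream[task]), default=0)


critical_path = max(chain_length(t) for t in upstream)
for startup_s in (5, 30):
    print(f"{critical_path} sequential pods x {startup_s}s = {critical_path * startup_s}s of start-up overhead")
```

For this fully sequential DAG that is 30 to 180 seconds per run; a wide, shallow DAG with the same number of tasks would pay far less.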
# Airflow Helm chart: production values (values.yaml)
# helm install airflow apache-airflow/airflow -f values.yaml
executor: KubernetesExecutor
scheduler:
  replicas: 2              # active-active HA: schedulers coordinate through DB row locks
  resources:
    requests: { cpu: "500m", memory: "2Gi" }
    limits: { cpu: "1", memory: "4Gi" }
webserver:
  resources:
    requests: { cpu: "200m", memory: "1Gi" }
# workers: with KubernetesExecutor each task runs in its own pod,
# so no persistent worker deployment needs configuring here
postgresql:
  enabled: false           # use external managed PostgreSQL (RDS, Cloud SQL)
data:
  # Name of a K8s Secret whose "connection" key holds the full metadata DB URI,
  # e.g. postgresql://airflow:<password>@airflow-postgres.internal:5432/airflow
  metadataSecretName: airflow-metadata-db
dags:
  gitSync:
    enabled: true
    repo: git@github.com:company/airflow-dags.git
    branch: main
    subPath: dags/
    sshKeySecret: airflow-git-secret
config:
  core:
    max_active_tasks_per_dag: 32
    parallelism: 64
    dag_file_processor_timeout: 120
  scheduler:
    min_file_process_interval: 30   # reparse each DAG file at most every 30s
  kubernetes_executor:
    worker_pods_creation_batch_size: 16
    delete_worker_pods: true        # clean up completed task pods
    worker_container_repository: my-registry/airflow-worker
    worker_container_tag: 2.10.0
# Dagster on Kubernetes: Helm chart values (dagster/values.yaml)
# helm install dagster dagster/dagster -f values.yaml
dagsterWebserver:
  replicaCount: 2
  resources:
    requests: { cpu: "250m", memory: "512Mi" }
dagsterDaemon:
  resources:
    requests: { cpu: "250m", memory: "512Mi" }
  runCoordinator:
    enabled: true
    type: QueuedRunCoordinator
    config:
      queuedRunCoordinator:
        maxConcurrentRuns: 25
runLauncher:
  type: K8sRunLauncher            # each Dagster run = isolated K8s Job
  config:
    k8sRunLauncher:
      jobNamespace: dagster-runs
      imagePullPolicy: Always
postgresql:
  enabled: false                  # use an external managed PostgreSQL
  postgresqlHost: dagster-postgres.internal
  postgresqlDatabase: dagster
  postgresqlUsername: dagster
  postgresqlPassword: ${DAGSTER_DB_PASSWORD}  # placeholder: Helm does not expand env vars, pass it with --set
dagster-user-deployments:
  enabled: true
  # Each code location is a separate gRPC server deployment
  deployments:
    - name: data-pipelines
      image:
        repository: my-registry/dagster-pipelines
        tag: latest               # pin an immutable tag in production
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - -m
        - pipelines.definitions   # Python module path to the Definitions object
      port: 3030
      resources:
        requests: { cpu: "250m", memory: "512Mi" }
      envSecrets:
        - name: dagster-secrets   # injects WAREHOUSE_PASSWORD, etc.
Observability — Asset Lineage, Metadata, and Run History
Airflow's UI excels at task-centric observability: the grid view shows run history per task, the graph view shows the DAG topology, and the calendar view highlights patterns in run success/failure over time. Logs are centralised and searchable. The Airflow REST API enables programmatic DAG triggering and monitoring. However, Airflow's observability is pipeline-scoped — it answers "how did this run go?" not "how fresh is this table?" or "what upstream changes caused this downstream failure?"
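As an example of programmatic triggering, Airflow 2's stable REST API creates a DAG run via POST /api/v1/dags/{dag_id}/dagRuns. The sketch below only builds the request with urllib and never sends it; the host, credentials, and conf payload are hypothetical, HTTP basic auth assumes the basic_auth API backend is enabled, and Airflow 3 exposes a different (v2) API.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url: str, dag_id: str, user: str, password: str,
                          logical_date: str, conf: dict) -> urllib.request.Request:
    """Build (but do not send) a request that creates a new DAG run."""
    body = json.dumps({"logical_date": logical_date, "conf": conf}).encode()
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


req = build_trigger_request(
    "http://airflow.internal:8080", "user_events_pipeline",
    "api_user", "api_password",
    logical_date="2026-05-15T03:00:00+00:00", conf={"source": "manual"},
)
print(req.get_method(), req.full_url)
# urllib.request.urlopen(req) would send it and return the created DAG run as JSON
```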
Dagster's UI is asset-centric. The Asset Catalog shows every asset with its materialisation history, metadata, owners, tags, and current freshness status. The Global Asset Graph shows the full cross-pipeline lineage from raw source to final ML model. Asset checks, similar to dbt tests but at the orchestrator level, run after materialisation; checks declared with blocking=True prevent downstream assets from materialising when they fail.
# Dagster asset checks: validate asset quality after materialisation
from dagster import AssetCheckResult, AssetCheckSeverity, Definitions, asset_check
from sqlalchemy import text
# fact_events and WarehouseResource come from the earlier examples


@asset_check(asset=fact_events, blocking=True, description="Ensure no null user_ids in fact table")
def check_no_null_user_ids(context, warehouse: WarehouseResource) -> AssetCheckResult:
    with warehouse.get_engine().connect() as conn:
        null_count = conn.execute(
            text("SELECT COUNT(*) FROM fact_events WHERE user_id IS NULL")
        ).scalar()
    if null_count > 0:
        return AssetCheckResult(
            passed=False,
            severity=AssetCheckSeverity.ERROR,  # blocking: downstream assets will not run
            metadata={"null_user_id_count": null_count},
            description=f"Found {null_count} rows with null user_id",
        )
    return AssetCheckResult(passed=True)


@asset_check(asset=fact_events, description="Row count for the latest day is within expected range")
def check_daily_row_count(context, warehouse: WarehouseResource) -> AssetCheckResult:
    # fact_events is not partitioned, so check the most recent event_date in the table
    with warehouse.get_engine().connect() as conn:
        count = conn.execute(
            text(
                "SELECT COUNT(*) FROM fact_events "
                "WHERE event_date = (SELECT MAX(event_date) FROM fact_events)"
            )
        ).scalar()
    in_range = 10_000 <= count <= 10_000_000
    return AssetCheckResult(
        passed=in_range,
        severity=AssetCheckSeverity.WARN,  # only relevant when the check fails
        metadata={"row_count": count},
    )


# Register checks in Definitions; they run automatically after materialisation
defs = Definitions(
    assets=[fact_events, ...],
    asset_checks=[check_no_null_user_ids, check_daily_row_count],
    resources={...},
)
Decision Framework — When to Choose Airflow vs Dagster
The right choice depends on your existing stack, team expertise, workflow complexity, and operational investment tolerance.
# Decision matrix: Airflow vs Dagster
CHOOSE Apache Airflow when:
✓ Existing investment: large DAG library, team Airflow expertise, Astronomer contract
✓ Task diversity: ETL, dbt, Spark submit, sensor-based triggers, HTTP calls, ML training
— Airflow's provider ecosystem (800+ operators) is unmatched
✓ Non-data workflows: Airflow is general-purpose; use it for infra automation,
report generation, or anything beyond pure data transformation
✓ Mature managed offering: Astronomer Astro, MWAA (AWS), Cloud Composer (GCP)
have years of production hardening and enterprise support
✓ Cross-team familiarity: most data engineers know Airflow; hiring is easier
✓ Simple scheduling: straightforward cron schedules without complex
partition management or freshness policies
CHOOSE Dagster when:
✓ Asset-centric data platform: you think in tables/files/models, not task runs;
you want a unified lineage graph from ingestion through dbt to ML
✓ Testability is a first-class requirement: teams that write unit tests for
data assets, or use TDD, will find Dagster's model dramatically simpler
✓ Partition-heavy pipelines: incremental processing with backfill UI,
multi-dimensional partitions (date × region × model), partition-aware assets
✓ dbt-centric stack: dagster-dbt gives best-in-class dbt integration with
full model-level asset visibility and cross-project lineage
✓ Freshness-driven scheduling: auto-materialisation policies let Dagster
materialise assets automatically when their upstream assets are updated,
without explicit schedule definitions
✓ Greenfield data platform: no migration cost; start with the better model
✓ Strong Python typing: Dagster's API is fully typed; mypy/pyright work
across assets, resources, and configs
PREFER AIRFLOW despite Dagster advantages when:
✗ Large existing DAG library (100+ DAGs): migration cost is real;
Dagster has migration tools but it requires asset modelling work
✗ Astronomer contract: switch cost exceeds benefit until renewal
✗ Team expertise gap: retraining + new mental model has a cost
MIGRATION PATH (Airflow → Dagster):
1. Run both in parallel; migrate one pipeline group at a time
2. Use Dagster's migration tooling: dagster-airlift (the successor to the
deprecated dagster-airflow package) lets Dagster first observe existing
Airflow DAGs and then move their tasks over incrementally
3. Prioritise pipelines with heavy partitioning or dbt as early wins
4. Keep complex sensor-based or cross-system DAGs in Airflow longer
Note
Prefect Cloud's managed tier is generous, and Prefect's deployment model (work pools) is simpler than Airflow's executor architecture. Prefect lacks Dagster's asset graph and first-class dbt integration, but it is a strong fit for ML pipeline orchestration and for teams that want minimal infrastructure overhead.