Tags: Dagster, Apache Airflow, Data Engineering, Orchestration, Python, Data Pipelines, Workflow, dbt

Dagster vs Airflow — Choosing the Right Data Orchestrator for Modern Data Stacks

A comprehensive comparison of Dagster and Apache Airflow for data orchestration: Dagster's Software-Defined Assets model vs Airflow's task-centric DAG approach, asset lineage and freshness policies for data-aware scheduling, Ops and ConfigurableResources vs Operators and Connections, IO Managers for storage-agnostic asset materialisation, partition-based incremental processing with DailyPartitionsDefinition and MultiPartitionsDefinition, Dagster's built-in unit testing via materialize() and build_asset_context() vs Airflow dag.test(), the dagster-dbt integration for first-class dbt model lineage across the asset graph, asset checks for post-materialisation data quality validation, Kubernetes deployment with K8sRunLauncher vs KubernetesExecutor Helm chart, Astronomer Astro vs Dagster Cloud hybrid deployment options, and a decision framework covering existing stack investment, partition complexity, testability requirements, and migration cost.

2026-06-01

The Orchestration Problem — Why Cron Breaks at Scale

Every data platform eventually outgrows cron. Cron runs scripts in isolation, provides no dependency graph, offers no retry logic, emits no structured observability, and has no concept of data freshness or upstream health. The moment you need task B to run only after task A succeeds — or need to backfill three months of data — cron becomes a liability.

Workflow orchestrators solve this by modelling pipelines as directed acyclic graphs (DAGs) of tasks with explicit dependencies, retry policies, scheduling semantics, and centralised logs. Two tools dominate the modern data stack: Apache Airflow, the incumbent (created at Airbnb in 2014, open-sourced in 2015, now at 2.10+), and Dagster, the challenger (open-sourced in 2019, now at 1.x). They share the same problem statement but differ fundamentally in how they model the world.
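
To make the DAG idea concrete, here is a minimal sketch using only Python's standard library. The task names are hypothetical and the code is not taken from either tool; it only shows how an explicit dependency map yields a safe execution order.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key lists the tasks it depends on.
dependencies = {
    "extract": set(),
    "validate": {"extract"},
    "transform": {"validate"},
    "load": {"transform"},
    "report": {"load"},
}

def execution_order(deps):
    """Return one valid run order in which every task follows its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())

order = execution_order(dependencies)
print(order)
```

A cyclic dependency map makes `static_order()` raise `graphlib.CycleError`, which is the "acyclic" part of the DAG contract that cron never checks.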

Airflow thinks in tasks — units of work arranged into a DAG that runs on a schedule. Dagster thinks in assets — data artefacts (tables, files, ML models) that have producers and consumers. This is not a superficial difference; it shapes testing, observability, partitioning, and the entire developer experience.

Apache Airflow Architecture — Scheduler, Executor, and the DAG Model

Airflow's architecture has five core components. The Scheduler continuously parses all DAG files, determines which task instances are ready to run, and enqueues them. The Executor (LocalExecutor, CeleryExecutor, or KubernetesExecutor) takes queued tasks and dispatches them to workers. The Webserver serves the UI and REST API. The Metadata Database (PostgreSQL or MySQL) is the single source of truth for task state. The DAG folder (local filesystem or remote sync) contains Python files that define DAG objects.

# Airflow DAG — ingest raw events, transform, write to warehouse
# airflow/dags/user_events_pipeline.py

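from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# NOTE: PostgresOperator is deprecated in recent postgres provider releases;
# SQLExecuteQueryOperator (common.sql provider) is the suggested replacement.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup

# The python_callable functions used below (extract_events_from_api,
# validate_event_schema, join_user_profiles, trigger_dbt_run) are assumed
# to be defined or imported elsewhere in the project.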

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "email_on_failure": True,
    "email": ["data-oncall@company.com"],
}

with DAG(
    dag_id="user_events_pipeline",
    default_args=default_args,
    schedule="0 3 * * *",        # daily at 03:00 UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,                # do NOT backfill missed runs
    max_active_runs=1,            # prevent concurrent runs
    tags=["events", "warehouse"],
) as dag:

    with TaskGroup("extract") as extract_group:
        extract_events = PythonOperator(
            task_id="extract_raw_events",
            python_callable=extract_events_from_api,
            op_kwargs={"date": "{{ ds }}"},   # Jinja templating for run date
        )
        validate_schema = PythonOperator(
            task_id="validate_schema",
            python_callable=validate_event_schema,
        )
        extract_events >> validate_schema

    with TaskGroup("transform") as transform_group:
        deduplicate = PostgresOperator(
            task_id="deduplicate_events",
            postgres_conn_id="warehouse_prod",
            sql="sql/deduplicate_events.sql",
            parameters={"run_date": "{{ ds }}"},
        )
        enrich_users = PythonOperator(
            task_id="enrich_with_user_profiles",
            python_callable=join_user_profiles,
        )
        deduplicate >> enrich_users

    load_warehouse = PostgresOperator(
        task_id="load_to_warehouse",
        postgres_conn_id="warehouse_prod",
        sql="sql/load_fact_events.sql",
    )

    notify_downstream = PythonOperator(
        task_id="notify_downstream_jobs",
        python_callable=trigger_dbt_run,
    )

    extract_group >> transform_group >> load_warehouse >> notify_downstream

Airflow's TaskFlow API (Airflow 2.0+) simplifies Python-heavy DAGs by removing boilerplate XCom serialisation — functions decorated with @task automatically pass return values to downstream tasks via XCom. Despite this improvement, the fundamental abstraction remains a task-centric execution graph rather than a data-centric asset graph.
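
The idea behind TaskFlow can be illustrated with a toy model in plain Python. This is not Airflow's implementation: the `task` decorator and `xcom_store` dict below are hypothetical stand-ins, and unlike Airflow (where calling a decorated function only builds the graph and values are resolved at run time) the functions here execute eagerly.

```python
import functools

# Stand-in for Airflow's XCom table: task name -> return value.
xcom_store = {}

def task(fn):
    """Toy decorator: run the function and record its return value under its name."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        xcom_store[fn.__name__] = result
        return result
    return wrapper

@task
def extract():
    return [3, 1, 2]

@task
def transform(values):
    return sorted(values)

@task
def load(values):
    return len(values)

# Return values flow downstream as ordinary arguments; nothing is pushed or pulled by hand.
loaded = load(transform(extract()))
```

The point of the sketch is the ergonomics: the author writes plain function calls, and the hand-off between tasks is recorded as a side effect of the decorator.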

Note

Airflow's Datasets API (2.4+) adds dataset-aware scheduling, allowing a DAG to trigger when another DAG produces a named dataset. This is a step toward asset-centric thinking, but it lacks the full lineage graph, freshness policies, partitioning semantics, and IO Manager system that Dagster provides natively. Think of it as a cross-DAG trigger mechanism, not a replacement for asset modelling.

Dagster Architecture — Software-Defined Assets, Ops, and the Asset Graph

Dagster's core abstraction is the Software-Defined Asset (SDA) — a Python function decorated with @asset that describes how to produce a specific data artefact (a database table, a Parquet file, a trained model). An asset has a key (its identity), optional upstream dependencies (other assets), metadata (description, owners, tags), and optionally a freshness policy that defines how stale the asset is allowed to become before triggering a materialisation.
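
A freshness policy boils down to a staleness test. The sketch below is a deliberate simplification in plain Python: it treats lag as the time since the asset's last materialisation, whereas Dagster's own evaluation also considers upstream data. The function name `is_stale` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def is_stale(last_materialized, maximum_lag_minutes, now):
    """True when the newest materialisation is older than the allowed lag."""
    if last_materialized is None:  # never materialised counts as stale
        return True
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)

now = datetime(2026, 6, 1, 12, 0, tzinfo=timezone.utc)

# Materialised 90 minutes ago with a 120-minute budget: still fresh.
recent = is_stale(now - timedelta(minutes=90), 120, now)

# Materialised 150 minutes ago: over budget, so a new materialisation is due.
overdue = is_stale(now - timedelta(minutes=150), 120, now)
```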

Dagster's runtime consists of a Dagster Daemon (scheduler and sensor loop), a Dagster Webserver (UI + GraphQL API), and a Code Location (a gRPC server loading your Python definitions). The Instance stores all run history in a PostgreSQL database and event log. Unlike Airflow, where the scheduler continuously parses DAG files (a common source of import errors and slow scheduling), Dagster loads code via an isolated gRPC server, decoupling code execution from the webserver.

# Dagster Software-Defined Assets — same pipeline as the Airflow example above
# pipelines/user_events.py

import pandas as pd
from dagster import (
    asset, AssetIn, Output, MetadataValue,
    FreshnessPolicy, AutoMaterializePolicy,
)
# EventsApiResource and WarehouseResource are ConfigurableResource classes
# assumed to be defined elsewhere (WarehouseResource is shown later in this post).

# Asset 1: extract raw events from upstream API
@asset(
    group_name="user_events",
    description="Raw click events ingested from the events API",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=120),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def raw_events(context, events_api: EventsApiResource) -> pd.DataFrame:
    # partition_key raises on unpartitioned runs, so guard the access
    date = context.partition_key if context.has_partition_key else None
    df = events_api.fetch_events(date=date)
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "preview": MetadataValue.md(df.head(5).to_markdown()),
    })
    return df

# Asset 2: deduplicated events — depends on raw_events
@asset(
    group_name="user_events",
    ins={"raw_events": AssetIn()},
    description="Events with duplicates removed by (user_id, event_id)",
)
def deduplicated_events(raw_events: pd.DataFrame) -> pd.DataFrame:
    return raw_events.drop_duplicates(subset=["user_id", "event_id"])

# Asset 3: enriched events — depends on deduplicated_events
@asset(
    group_name="user_events",
    ins={"deduplicated_events": AssetIn()},
    description="Events enriched with user tier and region from profiles table",
)
def enriched_events(
    deduplicated_events: pd.DataFrame,
    warehouse: WarehouseResource,
) -> Output:
    profiles = warehouse.fetch_user_profiles()
    enriched = deduplicated_events.merge(profiles, on="user_id", how="left")
    return Output(
        value=enriched,
        metadata={"enriched_row_count": MetadataValue.int(len(enriched))},
    )

# Asset 4: warehouse fact table — final materialised destination
@asset(
    group_name="user_events",
    ins={"enriched_events": AssetIn()},
    description="fact_events table in the warehouse, daily grain",
)
def fact_events(
    enriched_events: pd.DataFrame,
    warehouse: WarehouseResource,
) -> None:
    warehouse.upsert_fact_events(enriched_events)

The key difference from Airflow is immediately visible: Dagster assets are Python functions that accept other assets as arguments by name. The dependency graph is derived from function signatures: raw_events flows into deduplicated_events because the latter accepts it as a parameter. There is no explicit task wiring syntax like Airflow's >> operator. The asset graph in Dagster's UI shows the data lineage across your entire pipeline (including dbt models, Fivetran syncs, and ML training jobs) in a single view.
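
The signature-driven wiring can be reproduced in a few lines of standard-library Python. This is a toy sketch, not Dagster's implementation: `build_graph` and `materialize_all` are hypothetical helpers, and there is no cycle detection, storage, or metadata.

```python
import inspect

def raw_events():
    return [{"user_id": 1, "event_id": "a"}, {"user_id": 1, "event_id": "a"}]

def deduplicated_events(raw_events):
    seen, unique = set(), []
    for row in raw_events:
        key = (row["user_id"], row["event_id"])
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique

def event_count(deduplicated_events):
    return len(deduplicated_events)

def build_graph(functions):
    """Map each function name to its parameter names, i.e. its upstream assets."""
    return {fn.__name__: list(inspect.signature(fn).parameters) for fn in functions}

def materialize_all(functions):
    """Compute every function once, resolving inputs by parameter name."""
    by_name = {fn.__name__: fn for fn in functions}
    graph = build_graph(functions)
    results = {}

    def resolve(name):
        if name not in results:
            inputs = {dep: resolve(dep) for dep in graph[name]}
            results[name] = by_name[name](**inputs)
        return results[name]

    for name in by_name:
        resolve(name)
    return results

graph = build_graph([raw_events, deduplicated_events, event_count])
# Declaration order does not matter; the graph decides what runs first.
results = materialize_all([event_count, deduplicated_events, raw_events])
```

Because the graph is derived rather than declared, renaming a parameter is enough to rewire a dependency, which is the trade-off to keep in mind when refactoring.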

Partitioned Assets and Incremental Processing

Backfills and incremental processing are a major pain point in Airflow. To backfill a date range, you set catchup=True and let the scheduler replay missed runs, run the airflow dags backfill CLI command, or manually trigger DAG runs with specific date parameters. The semantics are fragile: date parameterisation relies on Jinja templates in operator arguments, and the relationship between a run and the data partition it covers is implicit.

Dagster makes partitioning a first-class concept. Partitioned assets declare a PartitionsDefinition that maps every materialisation to a specific partition key (a date, a region string, or a custom key). The Dagster UI shows which partitions are materialised, which are stale, and which have failed — making backfilling a visual, multi-select operation rather than a CLI incantation.

# Dagster partitioned assets — daily time partitions + backfill support
import pandas as pd
from dagster import (
    asset, DailyPartitionsDefinition, AssetExecutionContext,
    MultiPartitionsDefinition, StaticPartitionsDefinition,
)

# Daily partitions from 2026-01-01 onwards
DAILY = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(
    partitions_def=DAILY,
    group_name="user_events",
    description="Raw events for a single calendar day",
)
def raw_events_partitioned(context: AssetExecutionContext, events_api: EventsApiResource):
    # context.partition_key is the date string e.g. "2026-05-15"
    date = context.partition_key
    df = events_api.fetch_events(date=date)
    context.log.info(f"Fetched {len(df)} events for {date}")
    return df

@asset(
    partitions_def=DAILY,
    group_name="user_events",
)
def deduplicated_events_partitioned(raw_events_partitioned: pd.DataFrame) -> pd.DataFrame:
    return raw_events_partitioned.drop_duplicates(subset=["user_id", "event_id"])

# Multi-dimensional partitions: date × region
REGIONS = StaticPartitionsDefinition(["us-east", "eu-west", "ap-south"])
DATE_REGION = MultiPartitionsDefinition({"date": DAILY, "region": REGIONS})

@asset(
    partitions_def=DATE_REGION,
    group_name="regional_events",
)
def regional_events(context: AssetExecutionContext, events_api: EventsApiResource):
    keys = context.partition_key.keys_by_dimension
    date = keys["date"]
    region = keys["region"]
    return events_api.fetch_events(date=date, region=region)

# Backfill a range from the CLI or UI:
#   dagster asset materialize --select raw_events_partitioned
#     --partition-range 2026-01-01..2026-05-31
#
# Or from Python:
from dagster import instance_for_test, materialize
# materialize([raw_events_partitioned], partition_key="2026-05-15")

Note

Airflow's data_interval_start / data_interval_end macros provide similar semantics to Dagster partition keys in time-based DAGs. The crucial difference is visibility: Airflow shows a list of DAG runs with states, but offers no per-partition materialisation map. Dagster's partition health view shows a colour-coded grid where green means materialised, yellow means stale, and red means failed — for every partition, at a glance.
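
The per-partition map is easy to picture as data. The following standard-library sketch is not Dagster's API: the helper names and the status strings are hypothetical, and it only illustrates how a partition grid turns "what needs backfilling?" into a simple lookup.

```python
from datetime import date, timedelta
from itertools import product

def daily_keys(start, end):
    """ISO date strings for every day from start to end, inclusive."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]

def multi_keys(dates, regions):
    """Cartesian product of two dimensions, as (date, region) tuples."""
    return list(product(dates, regions))

def missing_partitions(all_keys, status):
    """Keys that have no successful materialisation recorded."""
    return [key for key in all_keys if status.get(key) != "materialised"]

dates = daily_keys(date(2026, 1, 1), date(2026, 1, 4))

# Hypothetical run history: one failure, one day never attempted.
status = {
    "2026-01-01": "materialised",
    "2026-01-02": "failed",
    "2026-01-03": "materialised",
}

to_backfill = missing_partitions(dates, status)
grid = multi_keys(dates, ["us-east", "eu-west", "ap-south"])
```

With a date × region grid the same lookup applies to tuples, which is why multi-dimensional backfills stay tractable when partitions are explicit.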

Resources and IO Managers vs Airflow Connections and Hooks

Airflow manages external system credentials via Connections stored in the metadata database (or environment variables with the AIRFLOW_CONN_* prefix). Operators access connections via Hooks — thin wrappers around the connection that provide a client object. This works, but connections are opaque config bags; testing operators that use them requires mocking hooks or using real credentials.
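
An environment-variable connection is just a URI keyed by the upper-cased connection id. The sketch below parses one with the standard library to show what an "opaque config bag" contains. It is not Airflow's own parser: `connection_from_env`, the credentials, and the host are made up for illustration.

```python
import os
from urllib.parse import unquote, urlsplit

def connection_from_env(conn_id, environ=os.environ):
    """Parse AIRFLOW_CONN_<CONN_ID> (a URI) into its component fields."""
    uri = environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "schema": parts.path.lstrip("/") or None,
    }

# Hypothetical environment; special characters in the password are percent-encoded.
env = {
    "AIRFLOW_CONN_WAREHOUSE_PROD":
        "postgres://airflow_reader:s3cr%40t@warehouse.prod.internal:5432/analytics",
}
conn = connection_from_env("warehouse_prod", env)
```

Every field comes back as an untyped string or integer, which is the contrast with the typed resource classes discussed next.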

Dagster's Resources are typed Python objects that encapsulate external clients. They are declared as function parameters on assets and ops, and injected at runtime from a resource configuration dict. Resources can be swapped between environments (production vs test vs development) without touching asset code — critical for testability.

# Dagster Resources — type-safe, injectable, testable
import boto3
import pandas as pd
from dagster import ConfigurableResource, EnvVar
from sqlalchemy import create_engine, text

# Define a typed resource for the data warehouse
class WarehouseResource(ConfigurableResource):
    host: str
    port: int = 5432
    database: str
    username: str
    password: str = EnvVar("WAREHOUSE_PASSWORD")  # reads from env at runtime

    def get_engine(self):
        url = (
            f"postgresql+psycopg2://{self.username}:{self.password}"
            f"@{self.host}:{self.port}/{self.database}"
        )
        return create_engine(url, pool_pre_ping=True)

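    def fetch_user_profiles(self):
        with self.get_engine().connect() as conn:
            # wrap raw SQL in text() so it works with SQLAlchemy 2.x connections
            return pd.read_sql(text("SELECT * FROM dim_users"), conn)

    def upsert_fact_events(self, df: pd.DataFrame) -> None:
        # NOTE: to_sql(if_exists="append") only inserts; a true upsert would need
        # an INSERT ... ON CONFLICT statement keyed on the table's primary key.
        with self.get_engine().begin() as conn:
            df.to_sql("fact_events", conn, if_exists="append", index=False,
                      method="multi", chunksize=5000)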

# Define a resource for S3 with role assumption
class S3BucketResource(ConfigurableResource):
    bucket_name: str
    region: str = "eu-west-1"
    role_arn: str = ""   # optional: assume role for cross-account access

    def get_client(self):
        if self.role_arn:
            sts = boto3.client("sts")
            creds = sts.assume_role(RoleArn=self.role_arn, RoleSessionName="dagster")
            return boto3.client("s3", region_name=self.region,
                aws_access_key_id=creds["Credentials"]["AccessKeyId"],
                aws_secret_access_key=creds["Credentials"]["SecretAccessKey"],
                aws_session_token=creds["Credentials"]["SessionToken"])
        return boto3.client("s3", region_name=self.region)

# Wire resources to your Definitions object
from dagster import Definitions

defs = Definitions(
    assets=[raw_events, deduplicated_events, enriched_events, fact_events],
    resources={
        # Production: reads credentials from environment
        "warehouse": WarehouseResource(
            host="warehouse.prod.internal",
            database="analytics",
            username="dagster_writer",
            password=EnvVar("WAREHOUSE_PASSWORD"),  # resolved from the environment at runtime
        ),
        "s3_bucket": S3BucketResource(bucket_name="company-datalake"),
    },
)

# For tests: swap resources with in-memory fakes
from dagster import build_asset_context

def test_deduplicated_events():
    df = pd.DataFrame({"user_id": [1, 1, 2], "event_id": ["a", "a", "b"]})
    result = deduplicated_events(df)
    assert len(result) == 2   # duplicate (1, "a") removed

Dagster's IO Managers extend the resource concept to storage. An IO Manager handles how an asset's output value is written to and read from persistent storage — decoupling the asset's business logic from the storage layer entirely. Swapping from local Parquet to S3 to a database table is a configuration change, not a code change.

# Dagster IO Managers — decouple storage from computation
from dagster import IOManager, OutputContext, InputContext, io_manager
import pandas as pd
import os

class ParquetIOManager(IOManager):
    """Stores assets as Parquet files. Swap for S3ParquetIOManager in prod."""

    def __init__(self, base_path: str):
        self.base_path = base_path

    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        path = self._get_path(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        obj.to_parquet(path, index=False)
        context.add_output_metadata({"path": path, "rows": len(obj)})

    def load_input(self, context: InputContext) -> pd.DataFrame:
        path = self._get_path(context.upstream_output)
        return pd.read_parquet(path)

    def _get_path(self, context) -> str:
        keys = list(context.asset_key.path)
        return os.path.join(self.base_path, *keys[:-1], f"{keys[-1]}.parquet")

# Production: use dagster-aws S3 IO Manager
from dagster_aws.s3 import s3_pickle_io_manager, S3Resource

defs = Definitions(
    assets=[raw_events, deduplicated_events, enriched_events],
    resources={
        # Development
        "io_manager": ParquetIOManager(base_path="/tmp/dagster-dev"),

        # Production (switch here, no asset code changes)
        # "io_manager": s3_pickle_io_manager.configured({
        #     "s3_bucket": "company-datalake",
        #     "s3_prefix": "dagster-assets",
        # }),
    },
)
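
The same separation of storage from computation can be shown without Dagster at all. The sketch below uses only the standard library; the `Storage` protocol and both backends are hypothetical names chosen for illustration, and JSON stands in for Parquet to avoid extra dependencies.

```python
import json
import os
import tempfile
from typing import Any, Protocol

class Storage(Protocol):
    """What the pipeline needs from a backend: save a value, load it back."""
    def save(self, key: str, value: Any) -> None: ...
    def load(self, key: str) -> Any: ...

class InMemoryStorage:
    def __init__(self):
        self._data = {}

    def save(self, key, value):
        self._data[key] = value

    def load(self, key):
        return self._data[key]

class JsonFileStorage:
    def __init__(self, base_path):
        self.base_path = base_path

    def _path(self, key):
        return os.path.join(self.base_path, f"{key}.json")

    def save(self, key, value):
        os.makedirs(self.base_path, exist_ok=True)
        with open(self._path(key), "w", encoding="utf-8") as fh:
            json.dump(value, fh)

    def load(self, key):
        with open(self._path(key), encoding="utf-8") as fh:
            return json.load(fh)

def run_pipeline(storage: Storage):
    """Business logic only talks to the Storage interface, never to a backend."""
    storage.save("raw_events", [1, 1, 2, 3])
    deduped = sorted(set(storage.load("raw_events")))
    storage.save("deduplicated_events", deduped)
    return storage.load("deduplicated_events")

memory_result = run_pipeline(InMemoryStorage())
with tempfile.TemporaryDirectory() as tmp:
    file_result = run_pipeline(JsonFileStorage(tmp))
```

`run_pipeline` is identical in both calls; only the object passed in changes, which mirrors the "configuration change, not a code change" claim above.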

Testing — Dagster's Built-In Testability vs Airflow Patterns

Testing Airflow DAGs is historically painful. The standard approach is dag.test() (Airflow 2.5+) or the older task.execute(context) pattern — both run task code in-process but require a running Airflow metadata database or extensive mocking. DAG import validation (checking for syntax errors and import cycles) is straightforward, but testing operator logic with injected test data and verifying output correctness requires significant mocking infrastructure.

Dagster assets are regular Python functions. Because their dependencies are injected via function parameters, testing is straightforward — pass mock inputs and assert on the output. The materialize() function runs a set of assets in-process without any running Dagster infrastructure, making unit and integration tests fast and dependency-free.

# Dagster testing — assets are plain functions, resources are swappable
import pytest
import pandas as pd
from dagster import materialize, build_asset_context, instance_for_test
from unittest.mock import MagicMock, patch

# Test 1: unit test — pure function, no infrastructure needed
def test_deduplicated_events_removes_duplicates():
    raw = pd.DataFrame({
        "user_id":  [1, 1, 2, 3],
        "event_id": ["a", "a", "b", "c"],
        "ts":       ["2026-01-01", "2026-01-01", "2026-01-01", "2026-01-01"],
    })
    result = deduplicated_events(raw)
    assert len(result) == 3
    assert result["event_id"].tolist() == ["a", "b", "c"]

# Test 2: asset with resource injection — use a mock resource
def test_fact_events_calls_upsert():
    mock_warehouse = MagicMock(spec=WarehouseResource)
    enriched = pd.DataFrame({"user_id": [1], "event_id": ["a"], "tier": ["pro"]})
    fact_events(enriched, warehouse=mock_warehouse)
    mock_warehouse.upsert_fact_events.assert_called_once()

# Test 3: materialize() — run asset graph in-process with fake resources
def test_full_pipeline_with_fake_resources():
    class FakeWarehouse(WarehouseResource):
        host: str = "fake"
        database: str = "fake"
        username: str = "fake"
        upserted: list = []

        def fetch_user_profiles(self):
            return pd.DataFrame({"user_id": [1], "tier": ["pro"]})

        def upsert_fact_events(self, df):
            self.upserted.extend(df.to_dict("records"))

    fake_events_api = MagicMock()
    fake_events_api.fetch_events.return_value = pd.DataFrame({
        "user_id": [1, 1], "event_id": ["x", "x"],
    })

    fake_warehouse = FakeWarehouse()

    result = materialize(
        assets=[raw_events, deduplicated_events, enriched_events, fact_events],
        resources={
            "warehouse": fake_warehouse,
            "events_api": fake_events_api,
            "io_manager": ParquetIOManager(base_path="/tmp/test"),
        },
    )

    assert result.success
    assert len(fake_warehouse.upserted) == 1  # one unique event after dedup

# Test 4: partitioned asset test
def test_partitioned_asset_uses_partition_key():
    fake_api = MagicMock()
    fake_api.fetch_events.return_value = pd.DataFrame({"user_id": [1], "event_id": ["z"]})

    ctx = build_asset_context(partition_key="2026-05-01")
    result = raw_events_partitioned(ctx, events_api=fake_api)

    fake_api.fetch_events.assert_called_once_with(date="2026-05-01")
    assert len(result) == 1

Note

In Airflow 2.7+, dag.test() does not require a running database (it uses an in-memory SQLite instance), which is a significant improvement. However, Dagster's testing model is structurally simpler because assets are plain functions by design: most tests need no context object at all, only dependency injection via function parameters, and build_asset_context() covers the cases that do (such as partition keys).

dbt Integration — Assets vs BashOperator

dbt is a nearly universal component of the modern data stack. Integrating it with an orchestrator determines whether your dbt models appear in your lineage graph or are a black box.

Airflow's most common dbt integration runs dbt run via a BashOperator or the Astronomer Cosmos provider (which translates each dbt model into an Airflow task and supports dependency-aware selection). Cosmos is a major improvement over plain BashOperator, but dbt models still appear as tasks in an Airflow DAG — they are not integrated into a unified cross-system asset graph.

Dagster's dagster-dbt integration loads every dbt model as a native Dagster asset, making dbt models first-class citizens in the asset graph alongside upstream ingestion assets and downstream ML assets. Freshness policies, metadata, and partitioning apply uniformly across dbt and non-dbt assets.

# Dagster + dbt — dbt models as Software-Defined Assets
# pipelines/dbt_assets.py

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets, DbtProject

# Load dbt project metadata at definition time
dbt_project = DbtProject(
    project_dir="/opt/dagster/dbt",
    profiles_dir="/opt/dagster/dbt/profiles",
)

# @dbt_assets generates one Dagster asset per dbt model, automatically
# wiring upstream/downstream dependencies from dbt graph
@dbt_assets(manifest=dbt_project.manifest_path, select="tag:production")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Selection lives on the decorator so the asset set and the dbt run agree
    yield from dbt.cli(["run"], context=context).stream()

# Now combine with non-dbt assets in a single Definitions
from dagster import Definitions

defs = Definitions(
    assets=[
        raw_events,           # upstream ingestion asset
        my_dbt_assets,        # all dbt models as Dagster assets
        ml_training_asset,    # downstream ML — depends on dbt model output
    ],
    resources={
        "dbt": DbtCliResource(project_dir=dbt_project),
        "warehouse": WarehouseResource(...),
    },
)

# The Dagster UI now shows a unified lineage graph:
# raw_events → [dbt: stg_events] → [dbt: fct_events] → ml_training_asset
# Freshness policies, observability, and backfills span the entire graph.
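
Model-level lineage is possible because dbt writes its dependency graph to manifest.json. The sketch below reads that structure with plain Python. The manifest here is a hand-written fragment with hypothetical project and model names, and `lineage_from_manifest` is an illustrative helper, not part of dagster-dbt.

```python
def lineage_from_manifest(manifest):
    """Map each dbt model name to the names of the nodes it depends on."""
    def short(unique_id):
        # unique ids look like "model.<project>.<name>"; keep the last segment
        return unique_id.split(".")[-1]

    graph = {}
    for unique_id, node in manifest["nodes"].items():
        if node["resource_type"] != "model":
            continue  # skip tests, seeds, snapshots
        graph[short(unique_id)] = [short(dep) for dep in node["depends_on"]["nodes"]]
    return graph

# Hand-written fragment in the shape of dbt's manifest.json (names are made up).
manifest = {
    "nodes": {
        "model.analytics.stg_events": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.analytics.raw.raw_events"]},
        },
        "model.analytics.fct_events": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.analytics.stg_events"]},
        },
        "test.analytics.not_null_fct_events_user_id": {
            "resource_type": "test",
            "depends_on": {"nodes": ["model.analytics.fct_events"]},
        },
    }
}

lineage = lineage_from_manifest(manifest)
```

An orchestrator that only shells out to dbt never sees this graph, which is why a plain BashOperator run appears as a single opaque task.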

Deployment — Kubernetes, Astronomer, and Dagster Cloud

Both Airflow and Dagster have well-supported Kubernetes deployment paths. The operational profiles differ significantly.

Airflow on Kubernetes with the official Helm chart is the standard self-hosted path. The KubernetesExecutor spawns a fresh pod per task — providing isolation and resource limits per task — but adds pod startup latency (5–30 seconds). The CeleryExecutor uses a pool of persistent workers (faster task startup, harder to scale down). Astronomer Astro provides a managed Airflow cloud with auto-scaling, Astro Runtime (Airflow + providers), and a developer CLI.
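
A quick calculation shows when pod startup latency matters. The task durations below are hypothetical, and the 30-second startup is the upper end of the range quoted above.

```python
def startup_overhead(task_seconds, startup_seconds):
    """Fraction of a task's wall-clock time spent waiting for its pod to start."""
    return startup_seconds / (task_seconds + startup_seconds)

# A 10-second task behind a 30-second pod start spends most of its time waiting.
short_task = startup_overhead(task_seconds=10, startup_seconds=30)

# A 30-minute task barely notices the same startup cost.
long_task = startup_overhead(task_seconds=1800, startup_seconds=30)
```

For DAGs made of many short tasks the overhead dominates, which is the usual argument for persistent Celery workers; for long-running tasks the isolation of a pod per task comes almost for free.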

# Airflow Helm chart — production values (values.yaml)
# helm install airflow apache-airflow/airflow -f values.yaml

executor: KubernetesExecutor

scheduler:
  resources:
    requests: { cpu: "500m", memory: "2Gi" }
    limits:   { cpu: "1",    memory: "4Gi" }
  replicas: 2   # active-active HA (Airflow 2 schedulers run concurrently)

webserver:
  resources:
    requests: { cpu: "200m", memory: "1Gi" }

workers:
  # KubernetesExecutor: workers are pods spawned per task
  # No persistent worker deployment needed

postgresql:
  enabled: false   # use external managed PostgreSQL (RDS, Cloud SQL)

data:
  metadataConnection:
    user: airflow
    pass: ${AIRFLOW_DB_PASSWORD}   # placeholder: Helm does not expand ${...}; in practice point data.metadataSecretName at a K8s Secret
    host: airflow-postgres.internal
    db: airflow
    port: 5432

dags:
  gitSync:
    enabled: true
    repo: git@github.com:company/airflow-dags.git
    branch: main
    subPath: dags/
    sshKeySecret: airflow-git-secret

config:
  core:
    max_active_tasks_per_dag: 32
    parallelism: 64
  scheduler:
    dag_file_processor_timeout: 120
    min_file_process_interval: 30   # reparse DAGs every 30s
  kubernetes_executor:
    worker_pods_creation_batch_size: 16
    delete_worker_pods: true        # clean up completed task pods
    worker_container_repository: my-registry/airflow-worker
    worker_container_tag: 2.10.0

The equivalent Dagster deployment uses the official dagster/dagster Helm chart:

# Dagster on Kubernetes: Helm chart values (dagster/values.yaml)
# helm install dagster dagster/dagster -f values.yaml

# Key names follow the chart's values.yaml; verify against your chart version.
dagsterWebserver:
  replicaCount: 2
  resources:
    requests: { cpu: "250m", memory: "512Mi" }

dagsterDaemon:
  resources:
    requests: { cpu: "250m", memory: "512Mi" }
  runCoordinator:
    enabled: true
    type: QueuedRunCoordinator
    config:
      queuedRunCoordinator:
        maxConcurrentRuns: 25

runLauncher:
  type: K8sRunLauncher    # each Dagster run = isolated K8s Job
  config:
    k8sRunLauncher:
      jobNamespace: dagster-runs
      imagePullPolicy: Always

postgresql:
  enabled: false
  postgresqlHost: dagster-postgres.internal
  postgresqlDatabase: dagster
  postgresqlUsername: dagster
  postgresqlPassword: ${DAGSTER_DB_PASSWORD}   # placeholder: Helm does not expand ${...}

dagster-user-deployments:
  enabled: true
  # Each code location is a separate gRPC server deployment
  deployments:
    - name: data-pipelines
      image:
        repository: my-registry/dagster-pipelines
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - -m
        - pipelines.definitions   # Python module path to Definitions object
      port: 3030
      resources:
        requests: { cpu: "250m", memory: "512Mi" }
      envSecrets:
        - name: dagster-secrets    # injects WAREHOUSE_PASSWORD, etc.

Note

Dagster Cloud (dagster.io/cloud, since rebranded as Dagster+) offers a hybrid model: the control plane (UI, scheduler, event log) runs in Dagster's cloud, while your code runs in your own Kubernetes cluster or cloud account via a lightweight agent. This eliminates database and webserver management without the vendor lock-in of running execution in Dagster's cloud. The Serverless tier runs execution in Dagster's infrastructure as well, suitable for smaller teams without Kubernetes expertise.

Observability — Asset Lineage, Metadata, and Run History

Airflow's UI excels at task-centric observability: the grid view shows run history per task, the graph view shows the DAG topology, and the calendar view highlights patterns in run success/failure over time. Logs are centralised and searchable. The Airflow REST API enables programmatic DAG triggering and monitoring. However, Airflow's observability is pipeline-scoped — it answers "how did this run go?" not "how fresh is this table?" or "what upstream changes caused this downstream failure?"

Dagster's UI is asset-centric. The Asset Catalog shows every asset with its materialisation history, metadata, owners, tags, and current freshness status. The Global Asset Graph shows the full cross-pipeline lineage from raw source to final ML model. Asset checks, similar to dbt tests but at the orchestrator level, run after materialisation and, when declared with blocking=True, can stop downstream assets from running on failure.

# Dagster asset checks — validate asset quality after materialisation
from dagster import asset_check, AssetCheckResult, AssetCheckSeverity, Definitions
from sqlalchemy import text

@asset_check(asset=fact_events, description="Ensure no null user_ids in fact table")
def check_no_null_user_ids(context, warehouse: WarehouseResource) -> AssetCheckResult:
    with warehouse.get_engine().connect() as conn:
        null_count = conn.execute(
            text("SELECT COUNT(*) FROM fact_events WHERE user_id IS NULL")
        ).scalar()

    if null_count > 0:
        return AssetCheckResult(
            passed=False,
            severity=AssetCheckSeverity.ERROR,
            metadata={"null_user_id_count": null_count},
            description=f"Found {null_count} rows with null user_id",
        )
    return AssetCheckResult(passed=True)

@asset_check(asset=fact_events, description="Row count is within expected range")
def check_daily_row_count(context, warehouse: WarehouseResource) -> AssetCheckResult:
    partition = context.partition_key
    with warehouse.get_engine().connect() as conn:
        count = conn.execute(
            text("SELECT COUNT(*) FROM fact_events WHERE event_date = :d"),
            {"d": partition},
        ).scalar()

    in_range = 10_000 <= count <= 10_000_000
    return AssetCheckResult(
        passed=in_range,
        severity=AssetCheckSeverity.WARN,   # out-of-range counts warn rather than fail the run
        metadata={"row_count": count, "partition": partition},
    )

# Register checks in Definitions — they run automatically after materialisation
defs = Definitions(
    assets=[fact_events, ...],
    asset_checks=[check_no_null_user_ids, check_daily_row_count],
    resources={...},
)
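
How severity and blocking interact can be sketched in plain Python. The gating rule assumed here (only a failed check that is both blocking and ERROR-severity holds back downstream work) is a simplification to verify against the Dagster documentation; `CheckOutcome` and the helper functions are hypothetical.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CheckOutcome:
    name: str
    passed: bool
    severity: str = "ERROR"   # "WARN" or "ERROR"
    blocking: bool = False

def check_row_count(count, low=10_000, high=10_000_000):
    """Volume anomalies are worth a warning but should not stop the pipeline."""
    return CheckOutcome("row_count_in_range", low <= count <= high, severity="WARN")

def check_no_nulls(null_count):
    """Null keys corrupt downstream joins, so this check is blocking."""
    return CheckOutcome("no_null_user_ids", null_count == 0,
                        severity="ERROR", blocking=True)

def downstream_may_run(outcomes):
    """Assumed rule: hold downstream only on a failed, blocking, ERROR check."""
    return not any(
        o.blocking and not o.passed and o.severity == "ERROR" for o in outcomes
    )

healthy = downstream_may_run([check_row_count(50_000), check_no_nulls(0)])
low_volume = downstream_may_run([check_row_count(12), check_no_nulls(0)])
null_keys = downstream_may_run([check_row_count(50_000), check_no_nulls(3)])
```

Separating "is the data suspicious?" from "should the pipeline stop?" keeps noisy volume checks from paging anyone while still protecting consumers from structurally broken data.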

Decision Framework — When to Choose Airflow vs Dagster

The right choice depends on your existing stack, team expertise, workflow complexity, and operational investment tolerance.

# Decision matrix: Airflow vs Dagster

CHOOSE Apache Airflow when:
  ✓ Existing investment: large DAG library, team Airflow expertise, Astronomer contract
  ✓ Task diversity: ETL, dbt, Spark submit, sensor-based triggers, HTTP calls, ML training
       — Airflow's provider ecosystem (800+ operators) is unmatched
  ✓ Non-data workflows: Airflow is general-purpose; use it for infra automation,
       report generation, or anything beyond pure data transformation
  ✓ Mature managed offering: Astronomer Astro, MWAA (AWS), Cloud Composer (GCP)
       have years of production hardening and enterprise support
  ✓ Cross-team familiarity: most data engineers know Airflow; hiring is easier
  ✓ Simple scheduling: straightforward cron schedules without complex
       partition management or freshness policies

CHOOSE Dagster when:
  ✓ Asset-centric data platform: you think in tables/files/models, not task runs;
       you want a unified lineage graph from ingestion through dbt to ML
  ✓ Testability is a first-class requirement: teams that write unit tests for
       data assets, or use TDD, will find Dagster's model dramatically simpler
  ✓ Partition-heavy pipelines: incremental processing with backfill UI,
       multi-dimensional partitions (date × region × model), partition-aware assets
  ✓ dbt-centric stack: dagster-dbt gives best-in-class dbt integration with
       full model-level asset visibility and cross-project lineage
  ✓ Freshness-driven scheduling: auto-materialisation policies let Dagster
       materialise assets automatically when upstream assets are updated
       or a freshness policy is about to be violated,
       without explicit schedule definitions
  ✓ Greenfield data platform: no migration cost; start with the better model
  ✓ Strong Python typing: Dagster's API is fully typed; mypy/pyright work
       across assets, resources, and configs

PREFER AIRFLOW despite Dagster advantages when:
  ✗ Large existing DAG library (100+ DAGs): migration cost is real;
       Dagster has migration tools but it requires asset modelling work
  ✗ Astronomer contract: switch cost exceeds benefit until renewal
  ✗ Team expertise gap: retraining + new mental model has a cost

MIGRATION PATH (Airflow → Dagster):
  1. Run both in parallel; migrate one pipeline group at a time
  2. Use Dagster's Airflow migration tooling (the dagster-airflow package can
     load simple DAGs as Dagster jobs; dagster-airlift supports incremental migration)
  3. Prioritise pipelines with heavy partitioning or dbt as early wins
  4. Keep complex sensor-based or cross-system DAGs in Airflow longer

Note

Prefect 3 is a third option worth evaluating for teams that want a task-centric model with better testability than Airflow and a lower migration barrier than Dagster. Prefect's flow and task model is Python-native (decorators on plain functions), its Prefect Cloud managed tier is generous, and its deployment model (work pools) is simpler than Airflow's executor architecture. It lacks Dagster's asset graph and first-class dbt integration, but is a strong fit for ML pipeline orchestration and teams that want minimal infrastructure overhead.
