Why the Choice Matters
Every data pipeline encodes a latency contract: how quickly must consumers see new data after it is produced? That question — not technology familiarity, not vendor preference — should drive the architecture decision. Get it wrong in the direction of over-engineering and you spend months building a Flink cluster for a use case where a nightly dbt run would have been fine. Get it wrong in the other direction and your fraud detection system is running 6-hour batch jobs.
The three main options each occupy a different point on the latency-complexity curve: dbt incremental models for batch (minutes to hours), Spark Structured Streaming for micro-batch (seconds to minutes), and Apache Flink for true streaming (milliseconds to seconds). Understanding each option deeply — not just its marketing — is the prerequisite for choosing correctly.
Batch (dbt + warehouse)
Data is collected over a time window, transformed as a unit, and made available downstream. Latency is bounded by schedule frequency — typically 15 minutes to 24 hours. Operational complexity is low: SQL-based, version-controlled, CI/CD-friendly. The right choice for reporting, analytics, and any use case where freshness matters in hours rather than seconds.
Micro-batch (Spark Structured Streaming)
Spark processes data in small batches triggered on a schedule (every few seconds to minutes). The API is identical to batch Spark — readStream/writeStream wraps a DataFrame pipeline. Watermarks handle late data. Output modes (append, complete, update) control what gets written. Suitable for dashboards requiring near-real-time refresh, session aggregations, and ETL pipelines that must handle late-arriving events.
True streaming (Apache Flink)
Flink processes events one at a time as they arrive, with millisecond latency. Its event-time processing model — where time is derived from the data, not the system clock — is the most rigorous way to handle out-of-order and late-arriving events. Stateful operations (windows, joins, aggregations) keep their state in a pluggable state backend (commonly RocksDB) and are protected by distributed checkpoints. The cost is significantly higher operational complexity.
Batch Processing with dbt: Incremental Models
dbt does not run a processing engine — it orchestrates SQL transformations in your data warehouse (BigQuery, Snowflake, Redshift, DuckDB). Its value in the batch processing conversation is the incremental materialization: instead of rebuilding a table from scratch on every run, dbt transforms only the rows that arrived since the last run. This is the batch processing primitive that keeps warehouse compute costs under control at scale.
The incremental strategy determines how new rows merge with existing data. The merge strategy handles upserts (new events update existing records), append adds rows without touching existing ones, delete+insert deletes rows matching the unique key and reinserts them, and insert_overwrite replaces whole partitions. Choose based on whether your source data is append-only or mutable.
-- models/marts/orders/fct_orders_daily.sql
-- dbt incremental model: process only new orders since last run
{{
config(
materialized = 'incremental',
unique_key = 'order_id',
incremental_strategy = 'merge',
on_schema_change = 'append_new_columns',
-- Partitioning enables pruning: incremental runs scan only recent partitions (BigQuery cost optimization)
partition_by = {
"field": "order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by = ['customer_id', 'status']
)
}}
with source_orders as (
select
order_id,
customer_id,
status,
total_amount_cents,
currency,
date(created_at) as order_date,
created_at,
updated_at
from {{ source('raw', 'orders') }}
{% if is_incremental() %}
-- On incremental runs: only process rows updated since last run
-- "this" resolves to this model's own table for comparison
where updated_at > (
select coalesce(max(updated_at), timestamp '1970-01-01')
from {{ this }}
)
{% endif %}
),
enriched as (
select
o.order_id,
o.customer_id,
o.status,
-- Convert to standard currency using exchange rates
case
when o.currency = 'USD' then o.total_amount_cents
when o.currency = 'EUR' then round(o.total_amount_cents * fx.usd_rate)
else o.total_amount_cents
end as total_amount_usd_cents,
o.order_date,
o.created_at,
o.updated_at,
-- Add derived fields
date_diff(current_date, o.order_date, day) as days_since_order,
-- Caution: on incremental runs this window sees only the current batch,
-- so the sequence does not span the customer's full order history
row_number() over (
partition by o.customer_id
order by o.created_at
) as customer_order_sequence
from source_orders o
left join {{ ref('dim_fx_rates') }} fx
on o.currency = fx.from_currency
and o.order_date = fx.rate_date
)
select * from enriched
Note
The is_incremental() macro evaluates to false during the first run (full refresh) and true on subsequent incremental runs. Always test your incremental filter logic with dbt run --full-refresh after schema changes — incremental models can silently miss rows if the filter predicate doesn't account for all update paths in the source table. See the dbt incremental model documentation for partition-based strategies and the merge_exclude_columns config for large merge operations.
Scheduling dbt runs via dbt Cloud, Airflow, or Dagster determines the minimum latency of your batch pipeline. A 15-minute schedule means consumers see data that is up to 15 minutes old plus transformation time. For most analytical use cases — daily reporting, weekly aggregations, BI dashboards — this is entirely acceptable and dramatically simpler than any streaming alternative.
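As an illustration, here is a minimal sketch of a 15-minute schedule as an Airflow DAG (assumes Airflow 2.4+; the DAG id, project path, and model name are placeholders, not from any specific deployment):
# Airflow DAG: run the incremental model every 15 minutes
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dbt_orders_incremental",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(minutes=15),  # lower bound on pipeline latency
    catchup=False,
    max_active_runs=1,  # avoid overlapping incremental runs
) as dag:
    dbt_run = BashOperator(
        task_id="dbt_run_incremental",
        bash_command="cd /opt/dbt/analytics && dbt run --select fct_orders_daily",
    )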
Micro-Batch with Spark Structured Streaming
Spark Structured Streaming builds a continuous processing model on top of Spark's DataFrame API. The key insight is that a streaming DataFrame is treated identically to a static one — the same transformations, joins, and aggregations work on both. Under the hood, Spark executes the query as a sequence of small batch jobs triggered on a fixed interval (processingTime), as a single pass over all currently available data (once, superseded by availableNow in Spark 3.3+), or continuously (continuous, still experimental).
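A minimal, self-contained sketch of the trigger options, using Spark's built-in rate source (only one trigger would be chosen per query in practice):
# Trigger modes for a Structured Streaming query
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TriggerDemo").getOrCreate()
stream = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

query = (
    stream.writeStream
    .format("console")
    .trigger(processingTime="30 seconds")  # micro-batch on a fixed interval
    # .trigger(availableNow=True)          # drain available data, then stop
    # .trigger(continuous="1 second")      # experimental continuous mode
    .start()
)
query.awaitTermination()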
Watermarks are the mechanism for handling late data. When you define a watermark on an event-time column, Spark tracks the maximum observed event time and drops state (and late events) older than max_event_time - watermark_delay. Without watermarks, stateful operations like windowed aggregations accumulate state indefinitely, eventually causing out-of-memory errors.
# Spark Structured Streaming: Kafka source, event-time windowing, Delta sink
# PySpark — requires pyspark, delta-spark packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, from_json, window, count, sum as spark_sum,
avg, current_timestamp, expr
)
from pyspark.sql.types import (
StructType, StructField, StringType,
LongType, DoubleType, TimestampType
)
spark = (
    SparkSession.builder
    .appName("OrderStreamProcessor")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
# ── Schema definition ─────────────────────────────────────────────────────────
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("amount_cents", LongType(), nullable=False),
StructField("status", StringType(), nullable=True),
StructField("event_time", TimestampType(), nullable=False), # producer timestamp
])
# ── Kafka source ──────────────────────────────────────────────────────────────
raw_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "orders.events")
.option("startingOffsets", "latest")
# Kafka consumer tuning
.option("maxOffsetsPerTrigger", 100_000) # cap records per micro-batch
.option("kafka.group.id", "spark-order-processor")
.option("failOnDataLoss", "false") # continue on Kafka retention gaps
.load()
)
# ── Parse JSON payload ────────────────────────────────────────────────────────
parsed = (
raw_stream
.select(
col("offset"),
col("partition"),
col("timestamp").alias("kafka_ingest_time"),
from_json(col("value").cast("string"), order_schema).alias("data")
)
.select("offset", "partition", "kafka_ingest_time", "data.*")
# Drop malformed records (nulls from failed JSON parse)
.where(col("order_id").isNotNull())
)
# ── Watermark: tolerate up to 10 minutes of late arrival ─────────────────────
# Spark will:
# - Track max(event_time) seen so far
# - Maintain state for windows ending within max(event_time) - 10 minutes
# - Drop events and state older than the watermark threshold
watermarked = parsed.withWatermark("event_time", "10 minutes")
# ── Windowed aggregation: 5-minute window, sliding every 1 minute ────────────
order_metrics = (
watermarked
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"), # sliding window
col("status")
)
.agg(
count("order_id").alias("order_count"),
spark_sum("amount_cents").alias("total_amount_cents"),
avg("amount_cents").alias("avg_amount_cents"),
)
.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("status"),
col("order_count"),
col("total_amount_cents"),
col("avg_amount_cents"),
current_timestamp().alias("processed_at"),
)
)
# ── Write to Delta Lake ───────────────────────────────────────────────────────
# Output modes:
# - append: only write new rows (safe for non-aggregated streams)
# - complete: rewrite the entire result table each trigger (small result sets only)
# - update: write only changed rows (requires watermark for aggregations)
query = (
order_metrics
.writeStream
.format("delta")
.outputMode("update") # only emit changed aggregation rows
.option("checkpointLocation", "s3://my-data/checkpoints/order-metrics/")
.option("path", "s3://my-data/streaming/order_metrics/")
.trigger(processingTime="30 seconds") # micro-batch interval
# Partitioning for efficient downstream queries
.partitionBy("status")
.start()
)
query.awaitTermination()
Note
The checkpoint location stores source offsets and operator state; on restart, Spark resumes from the checkpoint and ignores changed source options (including the startingOffsets value). If you must change the query logic, use a new checkpoint location and plan for state migration. See the Spark checkpointing documentation for the list of changes that require a fresh checkpoint versus those that are safe to make in place.
Output Modes and When to Use Each
The output mode controls what rows Spark writes to the sink on each trigger. Choosing the wrong mode causes either silent data loss or unbounded sink growth. The rules are: use append for non-aggregated streams and aggregations with watermarks where you only care about final results. Use update for aggregations where intermediate results are useful (dashboards, real-time counters). Use complete only for small result sets that must be fully rewritten — it does not scale.
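For contrast with the update-mode sink above, here is a brief sketch of the final-results-only variant: the same aggregation in append mode, reusing the watermarked DataFrame from the earlier pipeline (the S3 paths are illustrative):
# Append mode: each window is written exactly once, after the watermark
# passes the window end; rows arriving later than the watermark are dropped
from pyspark.sql.functions import col, count, window

final_only = (
    watermarked
    .groupBy(window(col("event_time"), "5 minutes"), col("status"))
    .agg(count("order_id").alias("order_count"))
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://my-data/checkpoints/final-metrics/")
    .option("path", "s3://my-data/streaming/final_metrics/")
    .start()
)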
True Streaming with Apache Flink
Apache Flink is a stateful stream processing framework designed for exactly-once semantics, event-time correctness, and millisecond latency at high throughput. Unlike Spark's micro-batch model, Flink processes records individually as they arrive. Its execution model — TaskManagers executing parallel operator instances connected by network buffers — is designed for continuous, long-running streaming jobs rather than batch execution.
Flink's event-time processing model is its most important differentiator. Every record carries a timestamp extracted by a TimestampAssigner, and watermarks — special records injected into the stream — advance the event-time clock. When a watermark with timestamp T arrives at an operator, Flink triggers all windows ending at or before T and treats events behind the watermark as late, dropping them unless allowed lateness or a side output is configured. This makes Flink's output deterministic with respect to event time, regardless of network delays or consumer lag.
// Apache Flink: DataStream API with event-time watermarks and keyed windows
// Java — requires flink-streaming-java, flink-connector-kafka, flink-statebackend-rocksdb
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
public class OrderMetricsPipeline {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// ── Checkpoint configuration (exactly-once semantics) ──────────────
// Checkpoints are the distributed snapshots that allow recovery
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE
);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(120_000);
// Retain checkpoints on cancellation for manual recovery
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// State backend: RocksDB for large state (vs HashMapStateBackend for small state)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// ── Kafka source ───────────────────────────────────────────────────
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("orders.events")
.setGroupId("flink-order-processor")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> rawStream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(), // we assign watermarks below
"Kafka Orders Source"
);
// ── Parse JSON and assign event-time watermarks ────────────────────
DataStream<OrderEvent> orderStream = rawStream
.map(json -> OrderEvent.fromJson(json))
.filter(event -> event != null) // drop parse failures
// Watermark strategy: bound out-of-orderness to 30 seconds
// Events more than 30 seconds late are dropped
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner(
(event, recordTimestamp) -> event.getEventTimeMs()
)
// Idle source handling: advance watermark even if no events arrive
// (prevents downstream windows from stalling)
.withIdleness(Duration.ofMinutes(1))
);
// ── Keyed window aggregation ───────────────────────────────────────
// Key by status, compute 5-minute tumbling windows
// Side-output tag for events too late even for allowedLateness; the
// anonymous subclass preserves the element type for Flink's serializer
final OutputTag<OrderEvent> lateOutputTag =
    new OutputTag<OrderEvent>("late-orders") {};
DataStream<OrderWindowMetric> metrics = orderStream
.keyBy(OrderEvent::getStatus)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
// Allow late events up to 1 minute after the window closes
// (they update the window result, triggering a late firing)
.allowedLateness(Time.minutes(1))
// Side output: collect elements dropped as too late for debugging
.sideOutputLateData(lateOutputTag)
.aggregate(new OrderMetricsAggregator());
// ── Sink: write to Kafka output topic ─────────────────────────────
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("orders.metrics.5min")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// Exactly-once delivery: requires Kafka transactions
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-order-metrics")
.build();
metrics.map(OrderWindowMetric::toJson).sinkTo(kafkaSink);
// Execute the pipeline
env.execute("Order Metrics Pipeline");
}
}
// ── Aggregate function: accumulates order stats per window ────────────────────
class OrderMetricsAggregator
implements AggregateFunction<OrderEvent, OrderAccumulator, OrderWindowMetric> {
@Override
public OrderAccumulator createAccumulator() {
return new OrderAccumulator(0L, 0L, 0L);
}
@Override
public OrderAccumulator add(OrderEvent event, OrderAccumulator acc) {
return new OrderAccumulator(
acc.count + 1,
acc.totalAmountCents + event.getAmountCents(),
acc.maxAmountCents < event.getAmountCents()
? event.getAmountCents() : acc.maxAmountCents
);
}
@Override
public OrderWindowMetric getResult(OrderAccumulator acc) {
return new OrderWindowMetric(
acc.count,
acc.totalAmountCents,
acc.count > 0 ? acc.totalAmountCents / acc.count : 0,
acc.maxAmountCents
);
}
@Override
public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
return new OrderAccumulator(
a.count + b.count,
a.totalAmountCents + b.totalAmountCents,
Math.max(a.maxAmountCents, b.maxAmountCents)
);
}
}
Note
Exactly-once is not a single switch: the Kafka source must start from a consistent position (an OffsetsInitializer with committed offsets), checkpointing must be enabled, and the Kafka sink must use transactional delivery (DeliveryGuarantee.EXACTLY_ONCE). Exactly-once across Kafka and a database sink requires a two-phase commit sink implementation or idempotent writes. See the Flink Kafka connector documentation for the full configuration matrix.
Flink SQL: The Lower-Barrier Entry Point
Flink SQL provides a SQL interface over the DataStream API, making Flink accessible to data engineers who know SQL but not Java. It supports tumbling, hopping, and session windows via the TUMBLE(), HOP(), and SESSION() window functions (newer releases also offer windowing table-valued functions), watermark declarations in the DDL, and connectors to Kafka, JDBC, Iceberg, and Delta Lake.
-- Flink SQL: streaming aggregation over a Kafka source
-- Run via Flink SQL Client or embedded in a Python/Java job
-- ── Create Kafka source table ──────────────────────────────────────────────
CREATE TABLE orders_raw (
order_id STRING,
customer_id STRING,
amount_cents BIGINT,
status STRING,
event_time TIMESTAMP(3), -- event timestamp from message payload
proc_time AS PROCTIME(), -- processing-time attribute (used by the lookup join below)
-- Declare watermark: tolerate 30-second late arrivals
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders.events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-sql-processor',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
-- ── Create output table (Kafka sink) ──────────────────────────────────────
CREATE TABLE order_metrics_5min (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
status STRING,
order_count BIGINT,
total_amount BIGINT,
avg_amount DOUBLE,
PRIMARY KEY (window_start, window_end, status) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders.metrics.5min',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- ── Streaming INSERT: windowed aggregation ────────────────────────────────
INSERT INTO order_metrics_5min
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
status,
COUNT(order_id) AS order_count,
SUM(amount_cents) AS total_amount,
AVG(CAST(amount_cents AS DOUBLE)) AS avg_amount
FROM orders_raw
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTE),
status;
-- ── Lookup join: enrich stream with a slowly-changing dimension ───────────
-- The JDBC connector supports processing-time lookup joins: each order is
-- enriched with the dimension row current at processing time (cached per the
-- lookup.cache settings). Event-time temporal joins, which look up the
-- version valid when the event occurred, require a versioned table (e.g., a
-- CDC changelog) rather than a plain JDBC table.
CREATE TABLE dim_customers (
customer_id STRING,
tier STRING,
region STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db:5432/analytics',
'table-name' = 'dim_customers',
'username' = 'flink_reader',
'password' = '${DIM_DB_PASSWORD}',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '5 min'
);
SELECT
o.order_id,
o.amount_cents,
o.event_time,
c.tier,
c.region
FROM orders_raw AS o
LEFT JOIN dim_customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;
The Lambda and Kappa Architectures
As streaming and batch pipelines came to co-exist in production, two competing architecture patterns emerged to reconcile them. Understanding both helps teams avoid the traps each carries.
Lambda Architecture
Lambda runs two parallel pipelines: a batch layer (Spark batch, dbt) that reprocesses all historical data on a schedule to produce accurate results, and a speed layer (Flink, Spark Streaming) that produces low-latency but potentially approximate results for recent data. Consumers merge results from both layers. The problem: you maintain two codebases producing the same result by different means. Any business logic change must be applied to both. Operational complexity is roughly double. Use Lambda only when you genuinely need both batch reprocessing guarantees AND sub-second latency — rare in practice.
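The serving-layer merge is often as simple as partitioning queries by a cutoff timestamp. A hypothetical sketch (the query helper, table names, and batch cadence are illustrative, not from any specific system):
# Serving layer for Lambda: batch results are authoritative up to the last
# completed batch run; the speed layer covers only the remaining gap
from datetime import datetime, timedelta

def merged_order_metrics(query, now: datetime):
    batch_cutoff = now - timedelta(hours=6)  # illustrative batch cadence
    batch_rows = query(
        "SELECT * FROM batch_order_metrics WHERE window_start < %s",
        (batch_cutoff,),
    )
    speed_rows = query(
        "SELECT * FROM speed_order_metrics WHERE window_start >= %s",
        (batch_cutoff,),
    )
    return batch_rows + speed_rows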
Kappa Architecture
Kappa eliminates the batch layer entirely. All processing — historical reprocessing and real-time — runs through the same streaming pipeline. When you need to reprocess historical data, you replay the Kafka topic from the beginning with a new consumer group. This works if your Kafka topic retains sufficient history (or you use a replayable log like Apache Iceberg change files). The tradeoff: streaming engines are harder to debug, test, and iterate on than SQL-based batch tools. Teams often underestimate this operational cost.
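A minimal sketch of a Kappa-style replay using confluent-kafka: a fresh consumer group with auto.offset.reset set to earliest re-reads the topic from the start of retention (the group name and the process() handler are illustrative):
# Replay orders.events from the beginning of retention under a new group id
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "orders-reprocess-v2",  # new group => no committed offsets
    "auto.offset.reset": "earliest",    # so consumption starts at the oldest offset
})
consumer.subscribe(["orders.events"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue  # no message within the poll timeout
    if msg.error():
        raise RuntimeError(msg.error())
    process(msg.value())  # hypothetical reprocessing function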
The Pragmatic Hybrid
Most production teams end up with a hybrid that doesn't fit cleanly into either label: streaming for ingestion and real-time signals (Flink or Spark Streaming writing to Delta/Iceberg), batch for analytics transformations and reporting (dbt over the same lakehouse tables). The two systems share storage but serve different consumers. This is not Lambda — there is no merging of results — it is simply using the right tool for each job on a shared storage layer.
Decision Framework: Concrete Rules
The following decision rules are opinionated but grounded in production experience. Use them as a starting point, not a substitute for understanding your specific latency, cost, and operational constraints.
# Streaming vs Batch decision rules
# Apply in order — first matching rule wins
# ── Use dbt batch (scheduled SQL) when: ─────────────────────────────────────
# 1. Freshness requirement is > 15 minutes
# Examples: daily reporting, weekly aggregations, overnight ETL
# 2. Source is a relational database (Postgres, MySQL) or data warehouse
# 3. Primary consumers are BI tools, dashboards, or data analysts
# 4. Team is SQL-fluent but not JVM/Python streaming engineers
# 5. Data volume is moderate (< 100M rows/run is comfortable for most warehouses)
# 6. You need full historical reprocessing with schema evolution
# ── Use Spark Structured Streaming when: ────────────────────────────────────
# 1. Freshness requirement is 30 seconds – 15 minutes
# Examples: near-real-time dashboards, alerting on aggregated metrics
# 2. Source is Kafka, Kinesis, or Delta/Iceberg change files
# 3. You already run Spark (Databricks, EMR, GKE with Spark Operator)
# 4. Team knows PySpark DataFrames (lower learning curve than Flink)
# 5. You need windowed aggregations with watermarks and can tolerate
# micro-batch latency (30s trigger is the practical minimum for most sinks)
# 6. Exactly-once is required but sub-second latency is NOT required
# ── Use Apache Flink when: ───────────────────────────────────────────────────
# 1. Freshness requirement is < 10 seconds
# Examples: fraud detection, real-time recommendation, live leaderboards
# 2. You need true event-time processing with precise watermark control
# (out-of-order events from mobile apps, IoT sensors, distributed producers)
# 3. Complex stateful computations: pattern matching (CEP), stream-stream joins,
# iterative graph processing, temporal table lookups
# 4. High throughput: > 1M events/second per job
# 5. You can afford the operational overhead: JVM tuning, state backend config,
# checkpoint sizing, backpressure monitoring, savepoint management
# 6. Exactly-once end-to-end is a hard requirement
# ── Red flags for each option ────────────────────────────────────────────────
# dbt red flags:
# - Source produces CDC events (use Debezium → Kafka → streaming instead)
# - Latency requirement is < 5 minutes and rising
# - Need to react to individual events (not aggregations)
# Spark Streaming red flags:
# - Trigger interval < 10 seconds (micro-batch overhead starts hurting)
# - Need complex event pattern detection (use Flink CEP)
# - Team has no existing Spark infrastructure
# Flink red flags:
# - Team has never run a JVM streaming system in production
# - Use case is analytics/reporting (overkill; use dbt)
# - Budget is tight (Flink clusters are expensive to run and maintain)
Production Patterns: Monitoring, Backpressure, and Checkpointing
Streaming systems fail in ways that batch systems do not: consumer lag accumulates silently, backpressure cascades through operator chains, state stores grow unboundedly, and checkpoint failures cause job restarts that replay duplicate events if exactly-once is not configured correctly. The following patterns address the most common production failure modes.
Kafka Consumer Lag Monitoring
Consumer lag — the difference between the latest Kafka offset and the last committed offset — is the primary health metric for any streaming pipeline. A pipeline processing in real-time has near-zero lag. Lag growing monotonically means the consumer cannot keep up with the producer. Lag spiking and recovering means the consumer keeps up on average but falls behind during bursts. Alert on both patterns.
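The alert rules below assume kafka-exporter is scraping lag into Prometheus. Where that is not yet deployed, lag can also be computed directly; a sketch using confluent-kafka, with broker, group, and topic names matching the earlier examples:
# Compute total lag for a consumer group by comparing its committed offsets
# against each partition's high watermark
from confluent_kafka import Consumer, TopicPartition

# Using the monitored group id (without subscribing) lets us read its
# committed offsets; auto-commit stays off so we never interfere with it
consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "flink-order-processor",
    "enable.auto.commit": False,
})

topic = "orders.events"
metadata = consumer.list_topics(topic, timeout=10)
partitions = [TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

total_lag = 0
for tp in consumer.committed(partitions, timeout=10):
    _, high = consumer.get_watermark_offsets(tp, timeout=10)
    if tp.offset >= 0:  # negative offset means nothing committed yet
        total_lag += high - tp.offset

print(f"lag for flink-order-processor on {topic}: {total_lag}")
consumer.close()
Note that Flink commits offsets to Kafka only on checkpoint completion, so this measurement trails the job's true position by up to one checkpoint interval.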
# Prometheus metrics for Kafka consumer lag (via kafka-exporter or Confluent Control Center)
# Alert: consumer group falling behind
- alert: KafkaConsumerLagHigh
expr: |
kafka_consumergroup_lag{
consumergroup="flink-order-processor",
topic="orders.events"
} > 100000
for: 5m
labels:
severity: error
team: data-engineering
annotations:
summary: "Kafka consumer lag > 100k messages on orders.events"
description: |
Consumer group flink-order-processor is {{ $value }} messages behind.
Either the Flink job is unhealthy, under-resourced, or producer throughput
has spiked. Check Flink TaskManager logs and checkpoint metrics.
runbook_url: "https://runbooks.internal/kafka-consumer-lag"
# Alert: lag growing continuously for 15 minutes (indicates real bottleneck)
- alert: KafkaConsumerLagGrowing
expr: |
deriv(
kafka_consumergroup_lag{consumergroup="flink-order-processor"}[5m]
) > 500
for: 15m
labels:
severity: critical
annotations:
summary: "Kafka consumer lag increasing at > 500 msgs/sec for 15 minutes"
# ── Flink job metrics (via the Flink Prometheus reporter) ────────────────────
# Flink exposes metrics at :9249/metrics when configured with
#   metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
#
# Checkpoint duration (should stay well under the checkpoint interval):
#   flink_taskmanager_job_task_checkpointAlignmentTime   # time waiting for barriers
#   flink_jobmanager_job_lastCheckpointDuration          # total checkpoint time
# Backpressure (milliseconds per second spent backpressured; 1000 = fully blocked):
#   flink_taskmanager_job_task_backPressuredTimeMsPerSecond
# Watermark position (compare with wall clock to derive watermark age):
#   flink_taskmanager_job_task_currentInputWatermark
# Alert: Flink checkpoint failing
- alert: FlinkCheckpointFailing
expr: |
increase(flink_jobmanager_job_numberOfFailedCheckpoints[5m]) > 0
for: 2m
labels:
severity: critical
annotations:
summary: "Flink checkpoints failing — data loss risk on job restart"
Checkpoint Sizing and State Backend Selection
Flink state backends determine where operator state is stored during execution and how checkpoints are taken. The HashMapStateBackend stores state in JVM heap — fast but limited by available memory, unsuitable for large state. The EmbeddedRocksDBStateBackend stores state on local SSD via RocksDB with incremental checkpointing — suitable for state sizes up to several hundred GB per TaskManager, at the cost of slower state access (disk I/O vs heap).
# flink-conf.yaml — production checkpoint and state configuration
# ── State Backend ─────────────────────────────────────────────────────────────
state.backend: rocksdb
state.backend.incremental: true # incremental checkpoints (much faster)
state.checkpoints.dir: s3://my-data/flink-checkpoints/
state.savepoints.dir: s3://my-data/flink-savepoints/
# ── Checkpoint tuning ─────────────────────────────────────────────────────────
execution.checkpointing.interval: 60s
execution.checkpointing.min-pause: 30s # min gap between checkpoints
execution.checkpointing.timeout: 120s # fail checkpoint if not done in 2 min
execution.checkpointing.max-concurrent-checkpoints: 1
# Keep last 3 checkpoints on cancellation for manual recovery
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Number of completed checkpoints to retain on HDFS/S3
state.checkpoints.num-retained: 3
# ── RocksDB tuning (for high-throughput keyed state) ─────────────────────────
state.backend.rocksdb.memory.managed: true # use Flink's managed memory
state.backend.rocksdb.memory.fixed-per-slot: 256 mb
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
# Increase write buffer for write-heavy workloads
state.backend.rocksdb.writebuffer.size: 64 mb
state.backend.rocksdb.writebuffer.count: 4
# ── TaskManager resources ─────────────────────────────────────────────────────
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 4g
taskmanager.memory.managed.fraction: 0.4 # 40% of TM memory for RocksDB
# ── Restart strategy ─────────────────────────────────────────────────────────
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.max-backoff: 5min
restart-strategy.exponential-delay.reset-backoff-threshold: 10min
Summary: At a Glance
The three options occupy fundamentally different positions on the spectrum from operational simplicity to real-time capability. None is universally better — the right choice is determined by latency requirements, team capabilities, and operational budget.
dbt + Warehouse
Latency: minutes to hours. Complexity: low (SQL, version-controlled, CI/CD-friendly). Cost: warehouse query cost only. Best for: analytics, reporting, BI, data products consumed by analysts. Not suitable for: anything requiring sub-minute freshness or event-by-event processing.
Spark Structured Streaming
Latency: 30 seconds to 5 minutes (practical minimum). Complexity: medium (PySpark DataFrames, checkpoint management, watermark tuning). Cost: Spark cluster compute (Databricks, EMR, GKE). Best for: near-real-time dashboards, event aggregations, teams already on Spark. Not suitable for: sub-10-second latency, complex event pattern detection.
Apache Flink
Latency: milliseconds to seconds. Complexity: high (JVM ops, RocksDB tuning, checkpoint sizing, savepoint management, backpressure diagnosis). Cost: dedicated cluster, significant engineering investment. Best for: fraud detection, real-time personalization, IoT event processing, any use case demanding true event-time correctness and sub-second latency. Not suitable for: teams without JVM streaming experience, analytical workloads.