Why Kafka Streams?
Most stream processing frameworks require you to operate a separate cluster — Flink JobManagers, Spark executors, YARN ResourceManagers. Kafka Streams takes a different approach: it is a Java library that runs inside your application process. There is no cluster to provision. Your application instances are the stream processing nodes, and Kafka itself handles coordination via consumer groups.
This embedded model is not a limitation — it is a deliberate design for operational simplicity. A Kafka Streams application can scale from a single JVM instance on a laptop to a fleet of hundreds of instances in Kubernetes, with Kafka automatically rebalancing partitions across them. You deploy it exactly like any other microservice: build a JAR, run it, scale horizontally.
For teams already running Kafka, adding Kafka Streams means adding a library dependency — not a new infrastructure component. Compare this to Apache Flink, which delivers higher throughput for complex graph topologies but demands a dedicated cluster with its own monitoring, upgrade cycles, and operational runbooks. Kafka Streams is the right choice when your team wants exactly-once stateful processing without the operational overhead of a second distributed system.
No separate cluster
Runs as a library inside your app. Deploy it like any microservice — no JobManager, no YARN, no additional infra.
Exactly-once semantics
Transactional state updates and output writes. Enable with a single config flag — no custom checkpointing logic.
Elastic scaling
Add instances and Kafka rebalances partitions automatically. Scale down the same way. No manual partition reassignment.
Kafka Streams vs Flink vs Spark Streaming
| Property | Kafka Streams | Apache Flink | Spark Structured Streaming |
|---|---|---|---|
| Deployment | Library — runs in your JVM | Dedicated Flink cluster | Spark cluster (YARN/k8s) |
| State backend | RocksDB (local, Kafka-backed) | RocksDB / Heap (S3-backed) | Executor memory / HDFS |
| Exactly-once | Yes (Kafka sources/sinks only) | Yes (any source/sink) | Yes (with idempotent sinks) |
| Latency | Low (ms-level, record-at-a-time) | Very low (sub-ms pipelines) | Higher (micro-batch) |
| Operational complexity | Low — no additional infra | High — cluster management | High — Spark cluster |
| Data sources | Kafka only | Kafka, Kinesis, files, JDBC | Kafka, files, Delta, Iceberg |
| Best for | Kafka-native microservice pipelines | Complex event processing at scale | Batch + streaming unification |
Core Abstractions: KStream, KTable, and GlobalKTable
Kafka Streams models data as three primary abstractions. Understanding when to use each is the foundation of correct topology design.
A KStream represents an unbounded sequence of records — each record is an independent event. A KTable represents a changelog stream where each record is an upsert: the latest value for a key replaces any previous value. A GlobalKTable is a KTable whose contents are fully replicated to every instance in the application — useful for small reference data that needs to be joined without co-partitioning constraints.
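The difference between the stream and table readings of the same topic can be modelled in plain Java. The sketch below does not use the Kafka Streams API; `StreamTableDuality` and `Event` are hypothetical names chosen for illustration, and the null-value "tombstone" delete follows the usual changelog convention.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API): the same keyed records
// read once as a stream and once as a table.
final class StreamTableDuality {

    // Hypothetical record type: a key plus a value (null value = delete).
    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    // Stream view: append-only, every record is kept as an independent fact.
    static List<Event> asStream(List<Event> records) {
        return new ArrayList<>(records);
    }

    // Table view: each record is an upsert; a null value removes the key.
    static Map<String, String> asTable(List<Event> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Event e : records) {
            if (e.value == null) {
                table.remove(e.key);
            } else {
                table.put(e.key, e.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> prices = List.of(
            new Event("sku-1", "9.99"),
            new Event("sku-2", "4.50"),
            new Event("sku-1", "8.99"));
        System.out.println(asStream(prices).size()); // 3 records in the stream view
        System.out.println(asTable(prices));         // {sku-1=8.99, sku-2=4.50}
    }
}
```

The stream view answers "what happened?", while the table view answers "what is true now?". A GlobalKTable is the same table view, copied in full to every instance.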
// Maven dependency (pom.xml)
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.7.0</version>
// </dependency>
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
// ── Application bootstrap ─────────────────────────────────────────────
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// ── KStream: raw event stream from the "orders" topic ────────────────
KStream<String, String> orderStream = builder.stream("orders");
// ── KTable: latest product price per product_id key ──────────────────
// Every new record for a key overwrites the previous value
KTable<String, String> priceTable = builder.table(
"product-prices",
Materialized.as("price-store") // name the state store
);
// ── GlobalKTable: small reference data replicated to all instances ────
// No co-partitioning requirement for joins; every instance holds a full
// local copy of the table in its state store (RocksDB by default).
GlobalKTable<String, String> regionTable = builder.globalTable(
"regions",
Materialized.as("region-store")
);
// Build and start the topology
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Graceful shutdown on JVM exit
Runtime.getRuntime().addShutdownHook(
new Thread(streams::close)
);
Note
The mental model for KStream vs KTable maps directly to event-driven architecture semantics: a KStream is like a log of facts (immutable, append-only events), while a KTable is like a database table (mutable, keyed state). For a deeper exploration of event-driven patterns, see our article on event-driven architecture with Kafka and Schema Registry.
Stateful Aggregations: Count, Reduce, and Aggregate
Kafka Streams maintains aggregation state in local RocksDB stores that are continuously backed up to compacted Kafka changelog topics. This means state survives application restarts — on recovery, instances replay the changelog to restore their local store before resuming processing.
The three aggregation operators are count() (count records per key), reduce() (combine records of the same type into one), and aggregate() (the most general form — accumulate any state type per key).
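To see why aggregate() is the most general of the three, it can help to model them without Kafka. In the sketch below (plain Java; `AggregationModel` and `Adder` are hypothetical names, and records are simple map entries), count and reduce are both written as special cases of one generic aggregate function.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Plain-Java model (not the Kafka Streams API) of per-key aggregation.
final class AggregationModel {

    // Same shape as an aggregate() adder: (key, value, aggregate) -> aggregate.
    interface Adder<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // General form: one accumulator of any type A per key.
    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         Adder<K, V, A> adder) {
        Map<K, A> store = new LinkedHashMap<>();
        for (Map.Entry<K, V> r : records) {
            A current = store.containsKey(r.getKey())
                ? store.get(r.getKey())
                : initializer.get();
            store.put(r.getKey(), adder.apply(r.getKey(), r.getValue(), current));
        }
        return store;
    }

    // count = aggregate with a Long accumulator that ignores the value.
    static <K, V> Map<K, Long> count(List<Map.Entry<K, V>> records) {
        return AggregationModel.<K, V, Long>aggregate(
            records, () -> 0L, (k, v, n) -> n + 1);
    }

    // reduce = aggregate whose accumulator has the value's own type;
    // the first value seen for a key becomes the initial accumulator.
    static <K, V> Map<K, V> reduce(List<Map.Entry<K, V>> records,
                                   BinaryOperator<V> reducer) {
        return AggregationModel.<K, V, V>aggregate(
            records, () -> null,
            (k, v, acc) -> acc == null ? v : reducer.apply(acc, v));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> orders = List.of(
            Map.entry("c1", 10.0), Map.entry("c2", 5.0), Map.entry("c1", 2.5));
        System.out.println(count(orders));               // {c1=2, c2=1}
        System.out.println(reduce(orders, Double::sum)); // {c1=12.5, c2=5.0}
    }
}
```

In the real library the per-key map is a state store and every update is also written to the changelog topic, but the accumulation logic has the same shape.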
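import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;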
// ── Count orders per customer ─────────────────────────────────────────
KStream<String, OrderEvent> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), orderEventSerde)
);
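// Materialized.as(String) has three type parameters (key, value, store type),
// so an explicit type witness must name all three.
KTable<String, Long> orderCountPerCustomer = orders
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("order-count-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));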
orderCountPerCustomer.toStream().to(
"order-counts-output",
Produced.with(Serdes.String(), Serdes.Long())
);
// ── Reduce: running total revenue per customer ────────────────────────
KTable<String, Double> revenuePerCustomer = orders
.mapValues(order -> order.getAmount())
.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
.reduce(Double::sum,
Materialized.as("revenue-store"));
// ── Aggregate: complex accumulator — track per-customer stats ─────────
// CustomerStats is a POJO: { orderCount, totalRevenue, lastOrderTs }
KTable<String, CustomerStats> customerStats = orders
.groupByKey()
.aggregate(
CustomerStats::new, // initialiser
(customerId, order, stats) -> { // adder
stats.incrementCount();
stats.addRevenue(order.getAmount());
stats.setLastOrderTs(order.getTimestamp());
return stats;
},
    // Three type parameters: key, value, and store type
    Materialized.<String, CustomerStats, KeyValueStore<Bytes, byte[]>>as("customer-stats-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(customerStatsSerde)
);
Windowed Aggregations: Tumbling, Hopping, Sliding, and Session Windows
Time-bounded aggregations group records into windows before aggregating. Kafka Streams supports four window types covering the full range of time-series use cases.
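Before looking at the DSL calls, the arithmetic behind fixed-size windows may be worth spelling out. The following plain-Java sketch (hypothetical class `WindowAssignment`, not library code) computes which tumbling or hopping windows a timestamp falls into, and whether a late record is still accepted given a grace period. It assumes windows are half-open intervals [start, start + size) aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of fixed-size window assignment; all times in epoch ms.
final class WindowAssignment {

    // Start times of every window [start, start + sizeMs) that contains
    // timestampMs, for windows that advance by advanceMs.
    // Tumbling windows are the special case advanceMs == sizeMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // A record for a window is accepted while the window end plus the grace
    // period is still ahead of stream time (the highest timestamp seen so far).
    static boolean isAccepted(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return windowStartMs + sizeMs + graceMs > streamTimeMs;
    }

    public static void main(String[] args) {
        long fiveMin = 300_000L;
        long tenMin = 600_000L;
        long sevenMin = 420_000L;
        // Tumbling 5-minute windows: exactly one window per record
        System.out.println(windowStartsFor(sevenMin, fiveMin, fiveMin)); // [300000]
        // Hopping 10-minute windows with a 5-minute hop: two windows per record
        System.out.println(windowStartsFor(sevenMin, tenMin, fiveMin));  // [0, 300000]
        // Window [0, 5 min) at stream time 5 min: closed without grace,
        // still open with a 1-minute grace period
        System.out.println(isAccepted(0L, fiveMin, 0L, fiveMin));      // false
        System.out.println(isAccepted(0L, fiveMin, 60_000L, fiveMin)); // true
    }
}
```

Sliding and session windows do not fit this formula because their boundaries depend on the record timestamps themselves rather than on a fixed grid.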
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
// ── Tumbling windows: fixed, non-overlapping 5-minute buckets ─────────
// Every record belongs to exactly one window.
KTable<Windowed<String>, Long> tumblingOrderCounts = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("tumbling-order-counts-store"));
// ── Hopping windows: overlapping — 10-min window, 5-min hop ──────────
// A record belongs to multiple windows (overlap = window_size - hop).
KTable<Windowed<String>, Long> hoppingCounts = orders
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(5))
)
.count(Materialized.as("hopping-order-counts-store"));
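// ── Sliding windows: include all records within a time difference ─────
// Windows are aligned to record timestamps, not to a fixed grid: a new
// window is created whenever a record enters or drops out of the
// timeDifference span. Both window bounds are inclusive.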
KTable<Windowed<String>, Long> slidingCounts = orders
.groupByKey()
.windowedBy(
SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
)
.count(Materialized.as("sliding-order-counts-store"));
// ── Session windows: activity-based — gap closes inactive sessions ────
// Records within inactivityGap of each other are in the same session.
KTable<Windowed<String>, Long> sessionCounts = orders
.groupByKey()
.windowedBy(
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30))
)
.count(Materialized.as("session-order-counts-store"));
// ── Output windowed results with window boundaries in the key ─────────
tumblingOrderCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key() + "@" +
windowedKey.window().startTime() + "/" +
windowedKey.window().endTime(),
count
))
.to("windowed-order-counts-output",
Produced.with(Serdes.String(), Serdes.Long()));
Note
WithNoGrace variants discard late-arriving records immediately. Use ofSizeAndGrace(windowSize, gracePeriod) to accept records that arrive up to gracePeriod after the window closes. Longer grace periods increase result accuracy but delay downstream output and increase state store size. A grace period of 1–5% of the window size is a common starting point.
Joins: Stream-Stream, Stream-Table, and Stream-GlobalKTable
Kafka Streams supports three join patterns, each with different co-partitioning requirements and temporal semantics. Choosing the wrong join type is one of the most common sources of bugs and performance problems in Kafka Streams applications.
KStream-KStream Join (Co-partitioning Required)
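A stream-stream join correlates records from two topics within a time window. Both topics must have the same number of partitions and the same partitioning strategy; this is the co-partitioning requirement. Kafka Streams checks the partition counts at runtime, during partition assignment, and throws a TopologyException if they differ. It cannot verify that both topics were written with the same partitioner, so that part remains your responsibility.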
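A small plain-Java sketch shows why equal partition counts matter. It assumes a simplified partitioner (String.hashCode() modulo the partition count) as a stand-in; Kafka's default partitioner hashes the serialized key with murmur2 instead, but the modulo step has the same consequence. `CoPartitioning` is a hypothetical name.

```java
// Plain-Java illustration of the co-partitioning requirement.
final class CoPartitioning {

    // Simplified stand-in for a partitioner. Assumption: String.hashCode()
    // is used only to keep the sketch dependency-free.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A join task only sees partition N of both topics, so matching records
    // meet only if the same key maps to the same partition number on each side.
    static boolean recordsMeet(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }

    public static void main(String[] args) {
        // "k".hashCode() is 107
        System.out.println(partitionFor("k", 6));   // 5
        System.out.println(partitionFor("k", 8));   // 3
        System.out.println(recordsMeet("k", 6, 6)); // true
        System.out.println(recordsMeet("k", 6, 8)); // false
    }
}
```

With 6 and 8 partitions the two records for key "k" would be processed by different tasks, and the join would silently miss the match.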
// ── KStream-KStream join: correlate orders with payments ─────────────
// Both topics must be co-partitioned (same partition count, same key)
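// Consumed.with(...) sets the serdes explicitly; the String default value
// serde from the bootstrap config cannot deserialize these event types.
KStream<String, OrderEvent> orderStream = builder.stream(
    "orders", Consumed.with(Serdes.String(), orderEventSerde));
KStream<String, PaymentEvent> paymentStream = builder.stream(
    "payments", Consumed.with(Serdes.String(), paymentEventSerde));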
// Join within a 10-minute window: match orders with their payment
KStream<String, EnrichedOrder> enriched = orderStream.join(
paymentStream,
(order, payment) -> EnrichedOrder.builder()
.orderId(order.getOrderId())
.customerId(order.getCustomerId())
.amount(order.getAmount())
.paymentMethod(payment.getMethod())
.paymentStatus(payment.getStatus())
.build(),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
StreamJoined.with(Serdes.String(), orderEventSerde, paymentEventSerde)
);
enriched.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde));
// ── Left join: emit order even if no matching payment arrives ─────────
KStream<String, EnrichedOrder> leftJoined = orderStream.leftJoin(
paymentStream,
(order, payment) -> EnrichedOrder.builder()
.orderId(order.getOrderId())
.paymentStatus(payment != null ? payment.getStatus() : "PENDING")
.build(),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
StreamJoined.with(Serdes.String(), orderEventSerde, paymentEventSerde)
);
KStream-KTable Join (Enrichment Pattern)
The most common join pattern: enrich a stream of events with slowly-changing reference data stored in a KTable. The KTable lookup is local — no network round-trip, no external database call. Co-partitioning is still required, but the join is not time-windowed: each stream record is joined with the current KTable value for its key.
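The "current value" semantics can be made concrete with a plain-Java model. The sketch below is not the Kafka Streams API; `StreamTableJoinModel` and `Input` are hypothetical names. It processes table updates and stream records in arrival order and shows that only stream records trigger output, each joined against whatever the table holds at that moment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of inner stream-table join semantics.
final class StreamTableJoinModel {

    // Hypothetical input: either a table update or a stream record.
    static final class Input {
        final boolean tableUpdate;
        final String key;
        final String value;
        private Input(boolean tableUpdate, String key, String value) {
            this.tableUpdate = tableUpdate;
            this.key = key;
            this.value = value;
        }
        static Input table(String key, String value) { return new Input(true, key, value); }
        static Input stream(String key, String value) { return new Input(false, key, value); }
    }

    static List<String> innerJoin(List<Input> inputsInArrivalOrder) {
        Map<String, String> table = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Input in : inputsInArrivalOrder) {
            if (in.tableUpdate) {
                table.put(in.key, in.value); // table updates never emit output
                continue;
            }
            String current = table.get(in.key);
            if (current != null) {           // inner join: no match, no output
                output.add(in.key + ":" + in.value + "+" + current);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> joined = innerJoin(List.of(
            Input.stream("c1", "order-1"),  // no profile yet: dropped
            Input.table("c1", "gold"),
            Input.stream("c1", "order-2"),  // joins with "gold"
            Input.table("c1", "platinum"),  // does not re-emit order-2
            Input.stream("c1", "order-3")));
        System.out.println(joined); // [c1:order-2+gold, c1:order-3+platinum]
    }
}
```

Two consequences follow: a stream record that arrives before its table entry is lost in an inner join, and later table updates never revise results that were already emitted.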
// ── KStream-KTable join: enrich orders with customer profile ─────────
// Both "orders" and "customer-profiles" must have the same partition count.
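KStream<String, OrderEvent> orderStream = builder.stream(
    "orders", Consumed.with(Serdes.String(), orderEventSerde));
KTable<String, CustomerProfile> customerTable = builder.table(
    "customer-profiles", Consumed.with(Serdes.String(), customerProfileSerde));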
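// leftJoin rather than join: an inner join drops orders that have no
// matching customer, so its joiner would never see customer == null.
KStream<String, EnrichedOrder> enriched = orderStream.leftJoin(
    customerTable,
    (order, customer) -> {
        if (customer == null) {
            // Customer not yet in KTable: handle gracefully
            return EnrichedOrder.withUnknownCustomer(order);
        }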
return EnrichedOrder.builder()
.orderId(order.getOrderId())
.customerId(order.getCustomerId())
.customerTier(customer.getTier())
.customerRegion(customer.getRegion())
.amount(order.getAmount())
.build();
},
Joined.with(Serdes.String(), orderEventSerde, customerProfileSerde)
);
KStream-GlobalKTable Join (No Co-partitioning Required)
When reference data is small enough to replicate to every instance (typically under a few hundred MB), use a GlobalKTable. Every instance gets the complete table — no co-partitioning constraints, and the join key can be derived from the stream value rather than being restricted to the record key.
// ── KStream-GlobalKTable join: add region name from a small lookup ────
// GlobalKTable is fully replicated — no co-partitioning requirement.
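KStream<String, OrderEvent> orderStream = builder.stream(
    "orders", Consumed.with(Serdes.String(), orderEventSerde));
// regionInfoSerde: assumed to be a Serde<RegionInfo> defined like the other serdes
GlobalKTable<String, RegionInfo> regionTable = builder.globalTable(
    "regions", Consumed.with(Serdes.String(), regionInfoSerde));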
KStream<String, EnrichedOrder> withRegion = orderStream.join(
regionTable,
// Key extractor: derive the join key FROM the stream value
(orderKey, orderValue) -> orderValue.getRegionCode(),
(order, region) -> EnrichedOrder.builder()
.orderId(order.getOrderId())
.regionName(region.getDisplayName())
.currency(region.getCurrency())
.build()
);
State Stores: RocksDB, In-Memory, and Custom Stores
Kafka Streams persists state in local state stores. The default implementation uses RocksDB — an embedded LSM-tree key-value engine from Facebook that handles datasets larger than available RAM by spilling to disk. Every persistent store is backed by a Kafka changelog topic, enabling recovery and replication to standby instances.
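import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
import java.time.Duration;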
// ── Persistent RocksDB store (default) ───────────────────────────────
// Materialized.as() names the store so it can be queried via IQ
KTable<String, Long> counts = orders
    .groupByKey()
    // Materialized.as(String) needs all three type parameters spelled out
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("persistent-count-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        // RocksDB is the default, no extra config needed
    );
// ── In-memory store (no persistence, fastest) ─────────────────────────
// Use when state is small, recovery time is acceptable, or for testing.
KTable<String, Long> inMemoryCounts = orders
.groupByKey()
.count(Materialized.<String, Long>as(
Stores.inMemoryKeyValueStore("memory-count-store"))
);
// ── Punctuator: scheduled processing on a state store ────────────────
// Emit alerts for customers who have not ordered in 24 hours.
class OrderMonitorProcessor
implements Processor<String, OrderEvent, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> lastOrderStore;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
this.lastOrderStore =
context.getStateStore("last-order-timestamp-store");
// Schedule a check every 60 seconds (wall-clock time)
context.schedule(
Duration.ofSeconds(60),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
long cutoff = timestamp - Duration.ofHours(24).toMillis();
try (KeyValueIterator<String, Long> it =
lastOrderStore.all()) {
while (it.hasNext()) {
KeyValue<String, Long> kv = it.next();
if (kv.value < cutoff) {
context.forward(
new Record<>(kv.key, "INACTIVE", timestamp)
);
}
}
}
}
);
}
@Override
public void process(Record<String, OrderEvent> record) {
lastOrderStore.put(record.key(), record.timestamp());
}
}
Note
For read-heavy workloads, size the RocksDB block cache to at least 256 MB per store instance using a custom RocksDBConfigSetter. Cold-cache reads from disk can be 10-100× slower than cache hits.
Interactive Queries: Serving State Over a REST API
Kafka Streams exposes a queryable state store API called Interactive Queries (IQ). Instead of materialising aggregation results back to a Kafka topic and consuming them from a database, your application instances can serve state directly over HTTP — turning each Kafka Streams instance into a distributed key-value lookup service.
Because state is partitioned across instances (each instance owns the state for its assigned partitions), a query for a specific key must be routed to the instance that owns that partition. Kafka Streams provides metadata APIs to discover the owner, enabling a proxy or redirect pattern.
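The routing decision itself is small. The sketch below models it in plain Java under stated assumptions: `QueryRouter` and `Endpoint` are hypothetical names, the owner endpoint is whatever the metadata lookup returned, and the `/state/{key}` path is only an example REST layout.

```java
// Plain-Java sketch of the proxy-or-serve-locally decision for Interactive Queries.
final class QueryRouter {

    // Hypothetical host:port pair standing in for the owner metadata.
    static final class Endpoint {
        final String host;
        final int port;
        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
        boolean sameAs(Endpoint other) {
            return host.equals(other.host) && port == other.port;
        }
    }

    // Returns "LOCAL" when this instance owns the key, otherwise the URL
    // of the owning instance that the request should be forwarded to.
    static String route(Endpoint self, Endpoint owner, String key) {
        if (self.sameAs(owner)) {
            return "LOCAL";
        }
        return "http://" + owner.host + ":" + owner.port + "/state/" + key;
    }

    public static void main(String[] args) {
        Endpoint self = new Endpoint("host1.internal", 8080);
        System.out.println(route(self, new Endpoint("host1.internal", 8080), "customer-42"));
        // LOCAL
        System.out.println(route(self, new Endpoint("host2.internal", 8080), "customer-42"));
        // http://host2.internal:8080/state/customer-42
    }
}
```

A real service also has to handle the window during a rebalance when ownership is changing, for example by retrying or by reading from a standby.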
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.*;
// ── Query local state store directly ─────────────────────────────────
ReadOnlyKeyValueStore<String, Long> countStore = streams.store(
StoreQueryParameters.fromNameAndType(
"persistent-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Look up a specific key: a point lookup in the local store, no network hop
Long orderCount = countStore.get("customer-42");
// Range scan: iterate keys from "a" to "m"
try (KeyValueIterator<String, Long> it = countStore.range("a", "m")) {
while (it.hasNext()) {
KeyValue<String, Long> kv = it.next();
System.out.println(kv.key + " -> " + kv.value);
}
}
// ── Query windowed state stores ───────────────────────────────────────
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
StoreQueryParameters.fromNameAndType(
"tumbling-order-counts-store",
QueryableStoreTypes.windowStore()
)
);
Instant from = Instant.now().minus(Duration.ofMinutes(10));
Instant to = Instant.now();
// Fetch all windows for a key in the time range
try (WindowStoreIterator<Long> it =
windowStore.fetch("customer-42", from, to)) {
while (it.hasNext()) {
KeyValue<Long, Long> kv = it.next(); // key = window start ms
System.out.println("Window " + kv.key + ": " + kv.value);
}
}
// ── Discover which instance owns a key (for routing) ─────────────────
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"persistent-count-store",
"customer-42",
Serdes.String().serializer()
);
String ownerHost = metadata.activeHost().host();
int ownerPort = metadata.activeHost().port();
// If this instance is not the owner, proxy the request:
// HttpClient.get("http://" + ownerHost + ":" + ownerPort + "/state/customer-42")
Configure standby replicas to enable fast failover. With num.standby.replicas=1, Kafka Streams maintains a warm copy of each state partition on a second instance. If the active instance fails, the standby can take over without replaying the full changelog, typically within seconds rather than minutes.
// ── Configure Interactive Queries endpoint for peer discovery ────────
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1.internal:8080");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// ── Query standby replicas (stale but always available) ───────────────
ReadOnlyKeyValueStore<String, Long> staleStore = streams.store(
StoreQueryParameters
.fromNameAndType("persistent-count-store",
QueryableStoreTypes.keyValueStore())
.enableStaleStores() // allow reads from standby replicas
);
Exactly-Once Semantics
Kafka Streams achieves exactly-once processing via Kafka's transactional producer API. When processing.guarantee=exactly_once_v2 is enabled, each commit atomically writes three things in a single Kafka transaction: consumed offsets, state store changelog records, and output topic records. If the instance crashes mid-transaction, the transaction is aborted and its records are never visible to consumers that read with isolation.level=read_committed.
// ── Enable exactly-once semantics (EOS v2, brokers 2.5+) ─────────────
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);
// EOS v2 requirements:
// - Kafka brokers 2.5+
// - Kafka Streams 3.0+ for the exactly_once_v2 setting
//   (earlier releases exposed the same mode as exactly_once_beta)
// - Idempotent producer is automatically enabled
// - One transactional producer per stream thread
//   (the original EOS implementation used one producer per task)
// ── Performance cost of EOS ──────────────────────────────────────────
// EOS adds ~20-40% latency overhead vs at-least-once due to:
// 1. Transactional producer initialization (one per stream thread)
// 2. Commit waits for all replicas (acks=all is forced)
// 3. Consumer isolation: downstream consumers with
//    isolation.level=read_committed skip aborted records
// ── Committed offset fencing (zombie protection) ──────────────────────
// EOS v2 uses epoch-based fencing — if a rebooted instance tries to
// produce under an old epoch, the broker rejects it. This prevents
// "zombie writers" from corrupting state during rebalances.
// ── When to use at-least-once instead ────────────────────────────────
// Use at-least-once (default) when:
// - Downstream processing is idempotent (safe to replay)
// - Low latency is more important than strict correctness
// - Throughput is the primary constraint
// - Your pipeline has non-Kafka sinks (EOS only covers Kafka end-to-end)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.AT_LEAST_ONCE); // default
Error Handling and Dead Letter Queues
Production Kafka Streams applications must handle two categories of errors: deserialization failures (poison pill records that cannot be parsed) and production failures (transient or permanent write errors to output topics). Unhandled exceptions in either category will crash the stream thread, triggering a rebalance.
The standard pattern is to route unparseable records to a dead letter topic (DLT) rather than crashing, preserving the raw bytes for later inspection and replay. This is the same DLQ pattern used in Change Data Capture pipelines with Debezium where connector errors must not block the main pipeline.
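Stripped of Kafka specifics, the dead letter pattern is "catch the parse failure, keep the raw input with its origin, and carry on". The plain-Java sketch below illustrates that control flow; `DeadLetterRouting`, `DeadLetter`, and `Result` are hypothetical names, and integer parsing stands in for real deserialization.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of dead-letter routing: bad records are set aside with
// enough metadata to inspect and replay them, and processing continues.
final class DeadLetterRouting {

    static final class DeadLetter {
        final int sourceOffset;  // where the record came from
        final String rawValue;   // untouched original payload
        final String error;      // why it failed
        DeadLetter(int sourceOffset, String rawValue, String error) {
            this.sourceOffset = sourceOffset;
            this.rawValue = rawValue;
            this.error = error;
        }
    }

    static final class Result {
        final List<Integer> parsed = new ArrayList<>();
        final List<DeadLetter> deadLetters = new ArrayList<>();
    }

    static Result process(List<String> rawRecords) {
        Result result = new Result();
        for (int offset = 0; offset < rawRecords.size(); offset++) {
            String raw = rawRecords.get(offset);
            try {
                result.parsed.add(Integer.parseInt(raw.trim()));
            } catch (NumberFormatException e) {
                // Poison pill: record it and keep going instead of crashing
                result.deadLetters.add(
                    new DeadLetter(offset, raw, String.valueOf(e.getMessage())));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = process(List.of("10", "oops", "30"));
        System.out.println(r.parsed);                          // [10, 30]
        System.out.println(r.deadLetters.get(0).sourceOffset); // 1
        System.out.println(r.deadLetters.get(0).rawValue);     // oops
    }
}
```

Keeping the raw payload unchanged is what makes later replay possible once the producer-side bug or schema mismatch is fixed.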
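import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.*;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.nio.charset.StandardCharsets;
import java.util.*;
// ── Custom DeserializationExceptionHandler ────────────────────────────
// Called when a record's key or value cannot be deserialized.
public class DeadLetterDeserializationHandler
        implements DeserializationExceptionHandler {
    private static final String DLQ_TOPIC = "dead-letter-orders";
    private Producer<byte[], byte[]> dlqProducer;

    // Kafka Streams instantiates the handler reflectively and passes the
    // application config here, so this is where the DLQ producer is created.
    @Override
    public void configure(Map<String, ?> configs) {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        dlqProducer = new KafkaProducer<>(producerProps);
    }

    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
        // Preserve where the record came from and why it failed
        List<Header> headers = Arrays.asList(
            header("source-topic", record.topic()),
            header("source-partition", String.valueOf(record.partition())),
            header("source-offset", String.valueOf(record.offset())),
            // getMessage() may be null; String.valueOf avoids an NPE
            header("error-message", String.valueOf(exception.getMessage()))
        );
        // Forward the raw bytes to the dead letter topic
        dlqProducer.send(new ProducerRecord<>(
            DLQ_TOPIC, null, record.timestamp(),
            record.key(), record.value(), headers));
        // CONTINUE: skip this record and keep processing
        // FAIL: crash the stream thread (use for critical schema violations)
        return DeserializationHandlerResponse.CONTINUE;
    }

    private static Header header(String name, String value) {
        return new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8));
    }
}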
// ── Register the handler in StreamsConfig ─────────────────────────────
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeadLetterDeserializationHandler.class);
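// ── ProductionExceptionHandler: handle write failures ─────────────────
// Needs org.slf4j.Logger, org.slf4j.LoggerFactory, and
// org.apache.kafka.common.errors.RecordTooLargeException on the import list.
public class RetryableProductionExceptionHandler
        implements ProductionExceptionHandler {
    private static final Logger log =
        LoggerFactory.getLogger(RetryableProductionExceptionHandler.class);

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Log and skip: this record can never succeed
            log.error("Record too large, dropping: {}", record.topic());
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        // All other errors: fail the stream thread. The producer has already
        // exhausted its own retries by the time this handler is called.
        return ProductionExceptionHandlerResponse.FAIL;
    }
}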
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
RetryableProductionExceptionHandler.class);
Testing with TopologyTestDriver
Kafka Streams provides TopologyTestDriver — an in-process test harness that simulates Kafka topics and advances wall-clock and event-time without a real broker. Unit tests run in milliseconds and can verify aggregation logic, join behavior, and windowed output deterministically.
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.junit.jupiter.api.*;
import java.time.Instant;
class OrderAggregationTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, OrderEvent> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
@BeforeEach
void setup() {
StreamsBuilder builder = new StreamsBuilder();
// Build the same topology as production
KTable<String, Long> counts = buildOrderCountTopology(builder);
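// Explicit serdes: no default value serde is configured in this test
counts.toStream().to("order-counts-output",
    Produced.with(Serdes.String(), Serdes.Long()));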
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
testDriver = new TopologyTestDriver(builder.build(), props);
inputTopic = testDriver.createInputTopic(
"orders",
Serdes.String().serializer(),
orderEventSerde.serializer()
);
outputTopic = testDriver.createOutputTopic(
"order-counts-output",
Serdes.String().deserializer(),
Serdes.Long().deserializer()
);
}
@Test
void countIncrementsPerCustomer() {
Instant t0 = Instant.parse("2026-06-14T10:00:00Z");
inputTopic.pipeInput("customer-1",
new OrderEvent("order-1", "customer-1", 99.99), t0);
inputTopic.pipeInput("customer-1",
new OrderEvent("order-2", "customer-1", 49.99), t0.plusSeconds(30));
inputTopic.pipeInput("customer-2",
new OrderEvent("order-3", "customer-2", 199.99), t0.plusSeconds(60));
var output = outputTopic.readKeyValuesToMap();
Assertions.assertEquals(2L, output.get("customer-1"));
Assertions.assertEquals(1L, output.get("customer-2"));
}
@Test
void tumblingWindowProducesWindowedCount() {
// Advance event time to trigger window close
Instant t0 = Instant.parse("2026-06-14T10:00:00Z");
inputTopic.pipeInput("customer-1", orderEvent(), t0);
inputTopic.pipeInput("customer-1", orderEvent(), t0.plusSeconds(200));
inputTopic.pipeInput("customer-1", orderEvent(), t0.plusSeconds(400));
// Third record crosses 5-minute window boundary — first window closes.
var records = outputTopic.readKeyValuesToList();
Assertions.assertFalse(records.isEmpty());
}
@AfterEach
void tearDown() {
testDriver.close();
}
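}
Note
TopologyTestDriver advances event time (stream time) according to the timestamps you pipe into TestInputTopic. A window closes once a record arrives whose timestamp is past the window's end plus its grace period. Without suppression, a windowed aggregation emits an updated result for every input record. If the topology uses suppress(Suppressed.untilWindowCloses(...)) to emit only final results, always pipe a record beyond the window boundary in windowed tests; otherwise the window never closes and no output is emitted during the test.
Production Tuning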
Most Kafka Streams performance problems fall into three categories: insufficient stream threads for the partition count, excessive state store cache flushing due to small cache allocation, and slow RocksDB reads due to undersized block cache. The following settings address all three, along with observability configuration for Prometheus and JMX.
For broader context on streaming performance trade-offs vs batch processing, see our guide on stream processing vs batch: when each approach wins.
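The thread and cache sizing rules reduce to simple arithmetic. The sketch below (plain Java, hypothetical class `StreamsSizing`) encodes two of them: cap threads at the smaller of cores and partitions, and remember that, per Kafka's documentation, the record cache is a single per-instance budget divided evenly among stream threads.

```java
// Plain-Java helper for back-of-the-envelope Kafka Streams sizing.
final class StreamsSizing {

    // Threads beyond the partition count would sit idle; threads beyond
    // the core count compete for CPU. Always keep at least one thread.
    static int streamThreads(int availableCores, int inputPartitions) {
        return Math.max(1, Math.min(availableCores, inputPartitions));
    }

    // The record cache is configured per instance and split evenly
    // across that instance's stream threads.
    static long cacheBytesPerThread(long totalCacheBytes, int streamThreads) {
        return totalCacheBytes / streamThreads;
    }

    public static void main(String[] args) {
        int threads = streamThreads(8, 4);
        System.out.println(threads); // 4
        long total = 100L * 1024 * 1024; // 100 MB for the whole instance
        System.out.println(cacheBytesPerThread(total, threads)); // 26214400 (25 MB)
    }
}
```

To give every thread a target amount of cache, multiply that target by the thread count when setting the instance-wide value.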
// ── Thread and parallelism ────────────────────────────────────────────
// num.stream.threads: one thread processes one or more partitions.
// Rule: set to min(available_cores, topic_partition_count).
// Each thread runs its own tasks independently — no sharing.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// ── State store cache ─────────────────────────────────────────────────
// cache.max.bytes.buffering: total record-cache memory for one application
// instance, divided evenly across its stream threads.
// (Newer releases deprecate it in favor of statestore.cache.max.bytes.)
// Larger cache = fewer flushes = fewer changelog records = higher throughput.
// Cost: downstream updates are emitted later, on commit or cache eviction.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
    100 * 1024 * 1024L); // 100 MB per instance = 25 MB per thread with 4 threads
// ── Commit interval ───────────────────────────────────────────────────
// How often offsets and state are committed.
// Defaults: 30000 ms with at-least-once, 100 ms with exactly-once.
// Shorter interval = less reprocessing after a crash, higher overhead.
// 100 ms favors low latency; use 5000 ms or more for throughput-heavy pipelines.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// ── Standby replicas ──────────────────────────────────────────────────
// Each state partition is maintained on N additional instances.
// Failover without full changelog replay (seconds, not minutes).
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// ── RocksDB block cache tuning via RocksDBConfigSetter ───────────────
public class LargeBlockCacheConfigSetter implements RocksDBConfigSetter {
    // 256 MB block cache per store. Held as a field so close() can free
    // the native memory behind it.
    private final Cache cache = new LRUCache(256 * 1024 * 1024L);

    @Override
    public void setConfig(String storeName, Options options,
                          Map<String, Object> configs) {
        // Reuse the existing table config to keep Kafka Streams' defaults
        BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024); // 16 KB blocks
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(64 * 1024 * 1024L); // 64 MB memtable
        options.setMaxWriteBufferNumber(3);
    }

    @Override
    public void close(String storeName, Options options) {
        // RocksDB objects hold native memory; release the cache explicitly
        cache.close();
    }
}
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
LargeBlockCacheConfigSetter.class);
# ── Prometheus JMX Exporter for Kafka Streams metrics ────────────────
# Add to JVM startup args:
# -javaagent:/opt/jmx_prometheus_javaagent.jar=9090:/opt/kafka-streams-config.yml
# kafka-streams-config.yml
lowercaseOutputName: true
rules:
  # Attribute names are hyphenated (e.g. commit-latency-avg), so the final
  # capture group allows "-" as well as word characters.
  # Stream thread metrics
  - pattern: 'kafka.streams<type=stream-thread-metrics, thread-id=(.*)><>([\w-]+)'
    name: kafka_streams_thread_$2
    labels:
      thread_id: "$1"
  # State store metrics (MBean type stream-state-metrics; the store is
  # identified by a <store-type>-id=<store-name> tag)
  - pattern: 'kafka.streams<type=stream-state-metrics, thread-id=(.*), task-id=(.*), (.+)-id=(.*)><>([\w-]+)'
    name: kafka_streams_store_$5
    labels:
      thread_id: "$1"
      task_id: "$2"
      store_name: "$4"
  # Task-level metrics
  - pattern: 'kafka.streams<type=stream-task-metrics, thread-id=(.*), task-id=(.*)><>([\w-]+)'
    name: kafka_streams_task_$3
    labels:
      thread_id: "$1"
      task_id: "$2"
# Key metrics to alert on:
# kafka_streams_thread_commit_latency_avg > 1000ms → commit is slow
# kafka_streams_store_put_rate decreasing → throughput drop
# kafka_streams_task_process_rate → records/sec per task
# kafka_streams_thread_poll_records_avg > 500 → consumer lag building
Kafka Streams Production Checklist
Set num.stream.threads equal to min(CPU cores, partition count) — one thread handles one or more tasks; additional threads above partition count add no benefit
Set cache.max.bytes.buffering to at least 64 MB per stream thread, multiplied by the thread count: the setting is a per-instance budget shared by all threads, and the 10 MB default causes excessive changelog flushes on high-throughput topologies
Enable exactly_once_v2 only when your business requirements demand it — EOS adds 20-40% latency overhead and requires Kafka 2.5+ brokers
Configure num.standby.replicas=1 for any production stateful topology — warm standby eliminates minutes-long state rebuild on failover
Implement a DeserializationExceptionHandler that routes poison pills to a dead letter topic — never let a single bad record crash the stream thread
Verify co-partitioning before deploying stream-stream or stream-table joins: Kafka Streams throws TopologyException when partition counts differ, but only during partition assignment, after your Kubernetes pod is already running
Tune RocksDB block cache size with a custom RocksDBConfigSetter — 256 MB per store is a safe starting point for high-read workloads
Set APPLICATION_SERVER_CONFIG to expose the host:port for Interactive Queries — without it, peer metadata discovery returns empty results
Use TopologyTestDriver for every topology — test windowed closes by piping a record past the window boundary, not just within it
Monitor commit_latency_avg in JMX/Prometheus — values above 500 ms indicate EOS commit contention or rebalancing storms