Apache Kafka Reference
Kafka core concepts, producer/consumer configuration, consumer groups, exactly-once semantics, kafka-topics and kafka-console commands, and the production tuning parameters that actually matter — covering Kafka 3.x with KRaft mode.
Core concepts
# Topics, partitions, and offsets
# - A topic is a log, split into N partitions
# - Each partition is an ordered, immutable sequence of records
# - Each record has an offset (monotonically increasing integer, per-partition)
# - Consumers track their position (offset) per partition
# Producer → Broker → Consumer flow:
# 1. Producer writes record to topic partition (round-robin or by key hash)
# 2. Record stored on disk (ordered append-only log)
# 3. Consumer reads records from its committed offset position
# 4. Consumer commits offset after processing (at-least-once default)
# Replication:
# - Each partition has a leader replica + N follower replicas
# - Producers always write to the leader; consumers fetch from the leader by default (follower fetching possible since Kafka 2.4)
# - Followers pull records from the leader to stay in sync
# - ISR (In-Sync Replicas) = replicas caught up to leader
# Retention (NOT consumption):
# Records are NOT deleted when consumed — they're deleted by retention policy:
# log.retention.hours=168 (7 days, default)
# log.retention.bytes=1073741824 (1GB per-partition cap; default is -1 = no size limit)
# Consumers can re-read from any offset (within retention window)
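The retention rule above can be sketched as a toy model (plain Python, no Kafka client; offsets and timestamps are made up): records age out of the log regardless of whether anyone consumed them.

```python
# Toy model: retention deletes records by age, never by consumption.
RETENTION_MS = 7 * 24 * 3600 * 1000  # log.retention.hours=168 (7 days)

def apply_retention(log, now_ms):
    """Drop records older than the retention window; consumption plays no part."""
    return [r for r in log if now_ms - r["ts"] <= RETENTION_MS]

now = 1_000_000_000_000
log = [
    {"offset": 0, "ts": now - 8 * 24 * 3600 * 1000},  # 8 days old -> expired
    {"offset": 1, "ts": now - 1 * 24 * 3600 * 1000},  # 1 day old  -> kept
]
remaining = apply_retention(log, now)
print([r["offset"] for r in remaining])  # [1] — offset 0 aged out, offset 1 still re-readable
```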
# Consumer groups:
# - Each partition is assigned to exactly one consumer in the group (a consumer may own several partitions)
# - If consumers > partitions, some consumers are idle
# - Multiple groups can independently consume the same topic
# - Offset committed per group, not per consumer
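The assignment rules above can be illustrated with a toy round-robin assignor (illustrative only; Kafka's real assignors — range, sticky, cooperative — are more involved):

```python
def assign_round_robin(partitions, consumers):
    """Toy assignment: each partition goes to exactly one consumer in the group;
    consumers beyond the partition count end up with nothing (idle)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 6 partitions, 2 consumers -> 3 partitions each
print(assign_round_robin(list(range(6)), ["c1", "c2"]))
# 2 partitions, 3 consumers -> c3 sits idle
print(assign_round_robin(list(range(2)), ["c1", "c2", "c3"]))
```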
# Kafka 3.x KRaft mode:
# - Eliminates ZooKeeper dependency
# - Simpler operations, faster metadata operations
# - Production-ready since Kafka 3.3
kafka-topics — topic management
# Create topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic my-events \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 # 7 days
# List topics
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Describe topic (partitions, replicas, ISR)
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-events
# Alter topic (add partitions — cannot reduce!)
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic my-events --partitions 12
# Update config
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name my-events \
--add-config retention.ms=86400000
# Delete topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-events
# Check partition/replica health
kafka-topics.sh --bootstrap-server localhost:9092 --describe \
--under-replicated-partitions # show partitions with replicas behind leader
# How many partitions?
# Rule of thumb: target throughput / single-partition throughput
# For ordering: use keys to route to same partition (default partitioner: murmur2(key) % partitions)
# For parallelism: more partitions = more consumers = more throughput
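The rule of thumb above is simple arithmetic; the throughput numbers below are illustrative — measure your own per-partition throughput first:

```python
import math

def partitions_needed(target_mb_s, per_partition_mb_s):
    """Rule of thumb: target throughput / measured single-partition throughput."""
    return math.ceil(target_mb_s / per_partition_mb_s)

# e.g. 300 MB/s target, 25 MB/s measured per partition -> 12 partitions
print(partitions_needed(300, 25))  # 12
```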
Producer configuration
# Key producer config parameters
# acks — durability vs latency trade-off
acks=0 # fire-and-forget (fastest, possible data loss)
acks=1 # leader acknowledges (default before Kafka 3.0; can lose data if leader fails before followers catch up)
acks=all # all ISR replicas acknowledge (strongest; default since Kafka 3.0, recommended for production)
# combine with min.insync.replicas=2 on the broker
# retries and idempotence
retries=2147483647 # max retries (default since Kafka 2.1)
enable.idempotence=true # prevents duplicate messages on retry (default since Kafka 3.0)
# requires acks=all, retries>0
# max.in.flight.requests.per.connection
max.in.flight.requests.per.connection=5 # with idempotent producer — safe for ordering
max.in.flight.requests.per.connection=1 # without idempotent — only way to guarantee order
# Batching — key for throughput
linger.ms=5 # wait up to 5ms to accumulate records into a batch
batch.size=16384 # max batch size in bytes (default 16KB)
# increase to 65536 (64KB) for high-throughput
# Compression
compression.type=lz4 # lz4 is best balance of speed/compression
# snappy: fast, less compression
# gzip: best compression, slower
# zstd: best compression ratio of all (Kafka 2.1+)
# buffer.memory — total memory for unsent records
buffer.memory=33554432 # 32MB default
# If producer sends faster than broker accepts: block for max.block.ms then throw
# Timeouts
request.timeout.ms=30000 # how long to wait for broker ack
delivery.timeout.ms=120000 # total time for a record (including retries)
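The idempotence prerequisites noted above (acks=all, retries > 0) can be sanity-checked with a small sketch — a plain dict of the documented property names, no client library or broker involved; real clients enforce equivalent checks internally:

```python
def check_idempotence_prereqs(cfg):
    """Illustrative validator for the rule above: enable.idempotence
    requires acks=all and retries > 0. Kafka clients apply equivalent
    checks (and sensible defaults) themselves."""
    if cfg.get("enable.idempotence"):
        if cfg.get("acks") != "all":
            raise ValueError("enable.idempotence requires acks=all")
        if cfg.get("retries", 0) <= 0:
            raise ValueError("enable.idempotence requires retries > 0")
    return cfg

producer_config = check_idempotence_prereqs({
    "acks": "all",
    "retries": 2147483647,
    "enable.idempotence": True,
    "linger.ms": 5,
    "batch.size": 65536,          # 64KB for high throughput
    "compression.type": "lz4",
})
print("config ok")
```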
Consumer configuration
# Key consumer config parameters
# auto.offset.reset — what to do when no committed offset exists
auto.offset.reset=earliest # start from beginning of topic (good for new consumer groups)
auto.offset.reset=latest # start from newest records (good for live consumers)
auto.offset.reset=none # throw exception (use when offset MUST exist)
# enable.auto.commit — dangerous in many use cases
enable.auto.commit=true # commits offset automatically every auto.commit.interval.ms
# risk: record consumed but processing fails after commit = LOST
enable.auto.commit=false # manually commit after successful processing (at-least-once)
# consumer.commitSync() after each record/batch
# Polling
max.poll.records=500 # max records per poll() call (tune for your processing time)
max.poll.interval.ms=300000 # if consumer doesn't poll within this window, it's considered dead
# increase if batch processing takes long
session.timeout.ms=45000 # heartbeat timeout — consumer removed from group if no heartbeat
heartbeat.interval.ms=3000 # send heartbeat this often
# fetch sizes
fetch.min.bytes=1 # minimum data to wait for (increase for throughput)
fetch.max.wait.ms=500 # max wait if fetch.min.bytes not available
# Consumer group rebalance
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor # incremental rebalance (Kafka 2.4+)
# avoids stop-the-world rebalances
# CooperativeStickyAssignor: preferred for production
# RangeAssignor: the classic default (eager protocol, stop-the-world)
# RoundRobinAssignor: even distribution but non-sticky
# Manual offset management:
consumer.seek(partition, offset) # jump to specific offset
consumer.seekToBeginning(partitions) # replay from start
consumer.seekToEnd(partitions) # skip to latest
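Why manual commit gives at-least-once can be shown with a toy in-memory sketch (no Kafka client — the "partition" is just a list): a crash after processing but before committing means the record is reprocessed on restart.

```python
# In-memory stand-in for a partition — not a Kafka API.
partition = ["a", "b", "c"]
committed = 0          # last committed offset (next record to read)
processed = []

def poll(offset):
    """Return (offset, record) pairs starting at the committed position."""
    return list(enumerate(partition))[offset:]

# First run: process "a" and "b", but crash before committing "b"
for offset, record in poll(committed):
    processed.append(record)
    if record == "a":
        committed = offset + 1   # commitSync() after "a"
    if record == "b":
        break                    # crash: "b" processed but NOT committed

# Restart: resume from the last committed offset -> "b" is processed twice
for offset, record in poll(committed):
    processed.append(record)
    committed = offset + 1

print(processed)  # ['a', 'b', 'b', 'c'] — a duplicate, but nothing lost
```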
Consumer group and offset management
# Consumer group commands
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Describe group — shows lag per partition
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --describe
# Output: TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID
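LAG in that output is simply LOG-END-OFFSET minus CURRENT-OFFSET per partition; summing across partitions gives a whole-group figure. A quick sketch with made-up numbers:

```python
rows = [
    # (topic, partition, current_offset, log_end_offset)
    ("my-events", 0, 1500, 1520),
    ("my-events", 1, 1480, 1480),
    ("my-events", 2, 1490, 1530),
]
lags = {p: end - cur for (_, p, cur, end) in rows}
print(lags)                 # {0: 20, 1: 0, 2: 40}
print(sum(lags.values()))   # 60 records behind in total
```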
# Reset offsets (consumer must be stopped!)
# Reset to earliest (replay all)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --topic my-events \
--reset-offsets --to-earliest --execute
# Reset to specific offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --topic my-events \
--reset-offsets --to-offset 1000 --execute
# Reset to specific timestamp
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --topic my-events \
--reset-offsets --to-datetime 2026-03-14T00:00:00.000 --execute
# Delete consumer group (must be inactive)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --delete
# Monitor lag in real-time (custom approach using kafka-consumer-groups):
watch -n 5 'kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --describe | grep -v "^$"'
Exactly-once semantics (EOS)
# Three delivery guarantees:
# at-most-once: messages may be lost, never duplicated (fastest)
# at-least-once: messages never lost, may be duplicated (default)
# exactly-once: messages never lost, never duplicated (most expensive)
# Idempotent producer (prevents duplicates on retry within a session)
enable.idempotence=true
acks=all
retries=2147483647
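The mechanism behind idempotence can be sketched as broker-side dedup: each batch carries a producer id and sequence number, a retry resends the same sequence, and the broker drops what it already appended. This is a simplification — real brokers track sequences per partition and require gapless increments.

```python
log, last_seq = [], {}

def append(producer_id, seq, record):
    """Toy broker: ignore any sequence number already seen from this producer."""
    if last_seq.get(producer_id, -1) >= seq:
        return "duplicate-ignored"
    last_seq[producer_id] = seq
    log.append(record)
    return "appended"

print(append("p1", 0, "order-42"))   # appended
print(append("p1", 0, "order-42"))   # duplicate-ignored (retry after a lost ack)
print(append("p1", 1, "order-43"))   # appended
print(log)                           # ['order-42', 'order-43'] — no duplicate
```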
# Transactional producer (exactly-once across partitions/topics)
transactional.id=my-app-transaction-1 # stable and unique per logical producer instance (reused across restarts so zombie producers get fenced)
# Java example:
props.put("transactional.id", "my-app-1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("output-topic", key, value));
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata); // atomic commit
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close(); // another producer with same transactional.id took over
} catch (KafkaException e) {
producer.abortTransaction();
}
# Consumer side: only read committed transactions
isolation.level=read_committed # default: read_uncommitted (sees all records)
# Transaction coordinator timeout
transaction.timeout.ms=60000 # abort transaction if not committed within 1 min
kafka-console and debugging tools
# Produce test messages
echo "test message" | kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic my-topic
# Interactive producer with key:value
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic my-topic \
--property parse.key=true \
--property key.separator=:
# Type: mykey:myvalue[Enter]
# Consume messages
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning \
--max-messages 10
# With key
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--property print.key=true \
--property key.separator=" : " \
--from-beginning
# kcat (kafkacat) — faster and more flexible
# Consume (mode -C)
kcat -b localhost:9092 -t my-topic -C -o beginning
kcat -b localhost:9092 -t my-topic -C -o -10 # last 10 messages
kcat -b localhost:9092 -t my-topic -C -f '%k : %s\n' # key : value format
# Produce from file
cat events.json | kcat -b localhost:9092 -t my-topic -P
# Metadata
kcat -b localhost:9092 -L -t my-topic # list partitions, replicas, ISR
# Check broker status
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# Check topic offsets
kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic my-topic
# (pre-3.0: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ...)
Production configuration and tuning
# Broker key settings (server.properties)
# Replication safety
default.replication.factor=3 # 3 replicas per partition (prod minimum)
min.insync.replicas=2 # producer acks=all requires 2 replicas in sync
# allows 1 broker to be down without data loss
unclean.leader.election.enable=false # NEVER elect out-of-sync replica as leader
# prevents data loss at the cost of availability
# Retention
log.retention.hours=168 # 7 days (default)
log.retention.bytes=107374182400 # 100GB per partition cap
log.segment.bytes=1073741824 # 1GB segment files (roll to new file)
log.cleanup.policy=delete # delete old segments
# or for compact topics (keep only latest value per key):
log.cleanup.policy=compact
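Compaction semantics in a toy sketch: only the most recent value per key survives, and a null value (tombstone) deletes the key. Real compaction runs segment-by-segment and leaves recent segments untouched.

```python
log = [("user1", "v1"), ("user2", "v1"), ("user1", "v2"), ("user2", None)]

latest = {}
for key, value in log:
    latest[key] = value          # later records win
compacted = {k: v for k, v in latest.items() if v is not None}  # None = tombstone

print(compacted)  # {'user1': 'v2'} — user2 removed via tombstone
```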
# Performance
num.network.threads=8 # threads for network I/O
num.io.threads=16 # threads for disk I/O
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600 # 100MB max request size
# Log flush (usually leave at OS defaults — fsync too often = slow)
log.flush.interval.messages=10000 # fsync every 10K messages
log.flush.interval.ms=1000 # or every 1 second
# Better: rely on OS page cache + replication for durability, not fsync
# JVM heap (KAFKA_HEAP_OPTS env var)
# 6-8 GB for brokers in production
# Keep under 32GB so the JVM can still use compressed object pointers (compressed oops)