issue 117apr 27mmxxvi
est. 2017
Sun, 27 Apr 2026
vol. IX · no. 117
PapersAdda
placement intelligence, since 2017
640+ briefs · 24 campuses · by reservation
verified offers · sourced from r/developersIndia
razorpay₹65.00 LPA· iit-d · sde-1google₹54.00 LPA· iiit-h · swe-imicrosoft₹49.50 LPA· iit-b · sdeatlassian₹38.00 LPA· nit-w · sde-1amazon₹44.20 LPA· bits-p · sde-1uber₹42.00 LPA· iit-kgp · sde-1razorpay₹65.00 LPA· iit-d · sde-1google₹54.00 LPA· iiit-h · swe-imicrosoft₹49.50 LPA· iit-b · sdeatlassian₹38.00 LPA· nit-w · sde-1amazon₹44.20 LPA· bits-p · sde-1uber₹42.00 LPA· iit-kgp · sde-1

Kafka Interview Questions 2026: 25 Answers with Code

25 min read
Interview Questions
Updated: 8 Jun 2026
Aditya Sharma
Aditya's Edit

PapersAdda 2026 Placement Cycle

By Aditya Sharma·Founder & Editor, PapersAdda

What changed in 2026 drives

Mass-recruiter offer letters are flatter for 2026 batch - the 4-5 LPA ASE band has barely budged in three years while inflation eats real wages. Premium tracks (Digital, Pro, Elite, Specialist) are still where the differential lives, and they are entirely test-driven. If you are aiming higher than the default offer, the coding round is not optional pageantry - it is the entire interview.

What I'd actually study for this

  • 01Two solid coding-round answers (1 medium-hard DSA each, with edge-case discussion) > five half-baked ones
  • 02One real project you can defend end-to-end - file paths, design decisions, and what you would change
  • 03One DBMS schema you actually built (not a textbook ER diagram), with at least 3 join-heavy queries written from memory
  • 04Three behavioural STAR stories: failure recovered, conflict handled, ownership taken

Where most candidates trip up

The single biggest mistake is treating company-specific guides as primary prep and DSA as secondary. It is the opposite. Mass recruiters use the test as a filter, but premium tracks at every IT services company use coding to allocate offer band. Spend 70% of prep time on DSA + system fundamentals, 20% on company-specific patterns, 10% on HR rehearsal. Reverse that ratio and you collect the default offer.

Editorial commentary by Aditya Sharma · written for PapersAdda · not generated, not aggregated.

Apache Kafka is the backbone of event-driven architectures and real-time data pipelines at every major tech company. Data engineering, backend, and MLOps interviews increasingly include Kafka questions on topics, partitions, consumer groups, and exactly-once semantics. This guide covers 25 Kafka interview questions with full answers and code examples.

PapersAdda's take: Candidates report that partition-to-consumer-group relationship questions are the most common Kafka interview filter: "If you have 6 partitions and 8 consumers in the same group, what happens?" Getting this wrong eliminates candidates immediately. Consumer lag monitoring and delivery guarantee tradeoffs are the next tier. Confirm the specific Kafka version and use case expected on the official company careers portal before you prepare.

Related articles: Apache Spark Interview Questions 2026 | Airflow Interview Questions 2026 | Data Engineering Interview Questions 2026 | MLOps Interview Questions 2026 | Hadoop Interview Questions 2026


EASY: Core Kafka Concepts (Questions 1-8)

Q1. What is Apache Kafka? What problems does it solve?

Kafka is a distributed event streaming platform -- a durable, high-throughput, ordered log. It solves three problems traditional messaging systems cannot:

  1. Decoupling: producers and consumers are independent. A producer sends to a topic; any number of consumers read at their own pace.
  2. Durability and replay: messages are stored on disk and can be replayed by new or re-processing consumers.
  3. Scale: millions of messages/second, horizontal scaling via partitions.
FeatureTraditional QueueApache Kafka
Message retentionDeleted after consumptionRetained for configurable period
Consumer countOne consumer per messageN consumer groups, each gets full copy
ReplayNot supportedAny consumer can replay from any offset
ThroughputLimitedMillions of msgs/sec
OrderingQueue-widePer-partition ordering

Q2. Explain the Kafka architecture: topics, partitions, brokers, and ZooKeeper/KRaft.

Kafka Cluster:
  Brokers (e.g., 3 nodes):
    Each broker holds some partitions
    One broker is the controller (elects partition leaders)

  Topics:
    Logical category (e.g., "orders", "user-events")
    Divided into N partitions

  Partitions:
    Ordered, immutable sequence of records
    Each record has an offset (0, 1, 2, ...)
    Replicated across brokers (replication factor R)
    One replica is the "leader" (handles reads/writes)
    Others are "followers" (ISR = in-sync replicas)

  ZooKeeper (Kafka < 2.8): external coordination service for cluster metadata
  KRaft (Kafka 2.8+, default in 3.x): embedded Raft consensus, no ZooKeeper needed
# Python example: basic producer
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

# Produce a message to the 'orders' topic
producer.produce(
    topic='orders',
    key='user_123',    # determines partition (hash of key % num_partitions)
    value='{"order_id": 456, "amount": 1500}',
    callback=delivery_callback
)
producer.flush()  # wait for all in-flight messages to be delivered

Q3. How does Kafka partitioning work? How is the partition for a message determined?

from confluent_kafka import Producer
import hashlib

# Partition selection:
# 1. Explicit partition: producer.produce(topic, partition=2, ...)
# 2. Key-based (most common): hash(key) % num_partitions
# 3. Round-robin (null key): messages distributed evenly

# Key-based partitioning ensures all messages with the same key
# go to the SAME partition -- guaranteeing order for that key

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# All orders for user_123 go to same partition (guaranteed order)
for order_id in range(10):
    producer.produce(
        topic='orders',
        key='user_123',  # same key -> same partition
        value=f'{{"order_id": {order_id}, "amount": 100}}'
    )

# Different users may go to different partitions (parallel processing)
for user_id in range(100):
    producer.produce(
        topic='orders',
        key=str(user_id),  # different keys -> different partitions
        value=f'{{"user_id": {user_id}}}'
    )

producer.flush()

# Manual partition selection (avoid unless you have a specific reason)
producer.produce(topic='orders', partition=0, value='specific partition message')

# Partition count determines max parallelism:
# 6 partitions -> max 6 consumers in a group process in parallel

Q4. What is a consumer group? How do partitions get assigned to consumers?

from confluent_kafka import Consumer

# Consumer group: group of consumers sharing a topic's partitions
# Each partition is assigned to EXACTLY ONE consumer in the group
# Multiple groups can read the same topic independently

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-consumer-group',      # all consumers with same group.id share
    'auto.offset.reset': 'earliest',
})

consumer.subscribe(['orders'])

# Partition assignment rules:
# - 6 partitions, 3 consumers: each consumer gets 2 partitions
# - 6 partitions, 6 consumers: each consumer gets 1 partition
# - 6 partitions, 8 consumers: 6 consumers active (1 partition each), 2 IDLE
#   (IMPORTANT: having MORE consumers than partitions wastes resources)
# - 6 partitions, 2 consumers: one gets 3 partitions, one gets 3 partitions

# Rebalance: when consumers join or leave, partition assignments are rebalanced
# During rebalance: consumption is PAUSED (all consumers stop)

# Cooperative rebalance (Kafka 2.4+): only reassigned partitions are paused
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'fast-group',
    'partition.assignment.strategy': 'cooperative-sticky',  # minimize rebalance disruption
})

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
        else:
            print(f"Consumed: topic={msg.topic()}, partition={msg.partition()}, "
                  f"offset={msg.offset()}, key={msg.key()}, value={msg.value()}")
finally:
    consumer.close()

Q5. What is an offset in Kafka? How does offset management work?

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False,   # MANUAL offset commit (recommended for production)
    'auto.offset.reset': 'earliest',
})

consumer.subscribe(['orders'])

# Offset: sequential integer position within a partition
# Kafka stores committed offsets in __consumer_offsets internal topic
# On restart, consumer resumes from last committed offset

# Manual commit (after successful processing)
msgs = consumer.poll(timeout=5.0)
if msgs and not msgs.error():
    # Process the message
    process_message(msgs.value())
    # Commit AFTER successful processing
    consumer.commit(message=msgs)  # commit this specific offset

# Auto-commit (simpler but risky)
consumer_auto = Consumer({
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,  # commit every 5 seconds
    # RISK: message may be consumed but not processed if crash happens before 5s
})

# Seek to specific offset (for replay or recovery)
tp = TopicPartition('orders', partition=0, offset=1000)
consumer.seek(tp)  # next poll will start from offset 1000

# Seek to beginning (replay all messages)
consumer.assign([TopicPartition('orders', 0)])
consumer.seek_to_beginning([TopicPartition('orders', 0)])

# Seek to timestamp (replay from a specific time)
# consumer.offsets_for_times([TopicPartition('orders', 0, int(ts_ms))])

Q6. What is the difference between at-most-once, at-least-once, and exactly-once?

GuaranteeDescriptionWhen message is lostWhen message is duplicated
At-most-onceCommit before processingIf consumer crashes after commit, before processNever
At-least-onceCommit after processingNeverIf consumer crashes after process, before commit
Exactly-onceTransactional commitNeverNever
from confluent_kafka import Consumer, Producer

# AT-MOST-ONCE: commit before processing (can lose messages)
consumer_amo = Consumer({'enable.auto.commit': True})  # auto-commit is at-most-once risk

# AT-LEAST-ONCE: commit after processing (default recommendation)
# Safe for idempotent operations
consumer = Consumer({
    'enable.auto.commit': False,
    'group.id': 'my-group',
})
msg = consumer.poll(1.0)
if msg:
    result = process_message(msg.value())  # process first
    consumer.commit(msg)                   # then commit

# EXACTLY-ONCE: use Kafka transactions
# Consumer reads + Producer writes + offset commit in one atomic transaction
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-producer',
})
producer.init_transactions()
producer.begin_transaction()
try:
    # Read from input topic
    msgs = consumer.consume(10)
    for msg in msgs:
        # Process and produce to output topic (same transaction)
        producer.produce('processed_orders', value=transform(msg.value()))

    # Commit consumer offsets as part of the transaction
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )
    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise

Q7. What is replication in Kafka? What is the ISR?

Replication factor = N: each partition has N replicas across N brokers
One replica is the LEADER, others are FOLLOWERS

ISR (In-Sync Replicas): replicas that are fully caught up with the leader
  - Follower is in-sync if it fetched the latest offset within replica.lag.time.max.ms
  - Out-of-sync followers are removed from ISR

acks configuration (producer):
  acks=0: fire and forget -- fastest, possible data loss
  acks=1: leader acknowledges -- loss if leader fails before replication
  acks=all (or -1): all ISR replicas acknowledge -- no data loss, slowest

min.insync.replicas (broker):
  Minimum ISR members required to accept a write when acks=all
  With replication_factor=3, min.insync.replicas=2:
  - Up to 1 follower can be down, writes still succeed
  - If 2+ replicas fail, producer gets NotEnoughReplicas error
from confluent_kafka import Producer

# Durability-first configuration
durable_producer = Producer({
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'acks': 'all',                    # wait for all ISR replicas
    'enable.idempotence': True,       # prevent duplicate messages on retry
    'retries': 2147483647,            # retry forever until max.block.ms
    'max.in.flight.requests.per.connection': 5,  # works with idempotence=True
    'delivery.timeout.ms': 120000,    # 2-minute delivery timeout
})

Q8. What is consumer lag? How do you monitor it?

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import AdminClient

# Consumer lag = latest offset in partition - current committed offset for consumer group
# High lag means consumer is falling behind producers

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

def get_consumer_lag(topic, group_id):
    """Compute per-partition consumer lag."""
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': f'__lag_check_{group_id}',
    })

    # Get total partitions
    metadata = admin.list_topics(topic)
    partitions = [
        TopicPartition(topic, p)
        for p in metadata.topics[topic].partitions.keys()
    ]

    # Get latest offsets (high watermark)
    high_offsets = {}
    for tp in partitions:
        lo, hi = consumer.get_watermark_offsets(tp)
        high_offsets[tp.partition] = hi

    # Get committed offsets for the consumer group
    committed = admin.list_consumer_group_offsets([
        {'group.id': group_id, 'topics': [{'topic': topic}]}
    ])
    consumer.close()
    return high_offsets

# Production lag monitoring: use Kafka Manager, Confluent Control Center, or Prometheus
# kafka_consumer_group_lag metric in JMX

# Alert thresholds (example):
# WARNING: lag > 10,000 messages
# CRITICAL: lag growing for > 5 minutes consecutively
# ACTION: scale up consumers or optimize processing logic

# Auto-scaling based on lag (KEDA with Kafka trigger):
# Deploy KEDA with Kafka scaler -- scales Kubernetes deployments based on lag

MEDIUM: Advanced Kafka Operations (Questions 9-18)

Q9. What is Kafka's log compaction? When should you use it?

Normal retention: messages deleted after retention.ms (e.g., 7 days) or retention.bytes
Log compaction: for each key, only the LATEST value is retained

Use cases for compaction:
  - User profile updates: keep latest profile per user_id
  - Configuration changes: keep current config value per key
  - Database changelog (CDC): keep latest row state per primary key
  - Event sourcing: maintain latest state per entity
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Create a compacted topic
compacted_topic = NewTopic(
    'user-profiles',
    num_partitions=6,
    replication_factor=3,
    config={
        'cleanup.policy': 'compact',
        'min.cleanable.dirty.ratio': '0.5',  # trigger compaction when 50% dirty
        'segment.ms': '86400000',            # compact daily
        'delete.retention.ms': '86400000',   # keep tombstones (null-value deletes) for 1 day
    }
)
admin.create_topics([compacted_topic])

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Update user profile -- old record for same key will be compacted away
producer.produce('user-profiles', key='user_123', value='{"name": "Alice", "plan": "premium"}')
producer.produce('user-profiles', key='user_123', value='{"name": "Alice", "plan": "enterprise"}')
# After compaction: only the last record for user_123 is retained

# Tombstone: send null value to "delete" a key
producer.produce('user-profiles', key='user_999', value=None)  # marks for deletion
producer.flush()

Q10. How does Kafka achieve high throughput? What are the key design decisions?

Kafka achieves high throughput through several architectural decisions:

1. Sequential disk I/O (append-only log)

  • Writing to end of file is extremely fast on HDD and SSD.
  • Reads are also sequential (consumers advance monotonically).
  • No random seeks needed.

2. Zero-copy file transfer

Normal transfer: disk -> kernel buffer -> user space -> kernel socket buffer -> network
Zero-copy (sendfile): disk -> kernel buffer -> network (bypasses user space)

3. Message batching

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'batch.size': 1048576,          # 1MB batch
    'linger.ms': 5,                 # wait 5ms to fill batch before sending
    'compression.type': 'lz4',     # compress batches (3-5x size reduction)
    'buffer.memory': 67108864,     # 64MB producer buffer
})

4. Partitioned parallel processing

  • N partitions = N parallel producers and N parallel consumers.
  • Brokers handle partitions independently.

5. Consumer fetch efficiency

consumer = Consumer({
    'fetch.min.bytes': 65536,       # minimum 64KB before returning
    'fetch.max.wait.ms': 500,       # or return after 500ms
    'max.partition.fetch.bytes': 1048576,  # 1MB per partition per fetch
})

Q11. What is idempotent production in Kafka?

from confluent_kafka import Producer

# Problem: producer retries can cause duplicate messages
# Network failure after broker receives message but before ack reaches producer
# Producer retries -> duplicate message in partition

# Idempotent producer: exactly-once per session
# Each producer has a unique producer ID (PID)
# Each message has a sequence number
# Broker deduplicates based on (PID, partition, sequence_number)

idempotent_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,
    # Automatically sets: acks=all, max.in.flight.requests.per.connection=5, retries=MAX_INT
})

idempotent_producer.produce('orders', key='user_1', value='order_data')
idempotent_producer.flush()

# Guarantees: exactly-once per producer session, per partition
# Idempotence is WITHIN a session -- if producer restarts, new PID, duplicate possible
# For cross-session exactly-once: use transactions (Q6)

# Enable idempotence implications:
# - acks=all is required (forced automatically)
# - Slightly higher latency due to stronger delivery guarantee
# - Use for any producer where duplicates would cause problems

Q12. What is Kafka Streams? How does it compare to Spark Structured Streaming?

# Kafka Streams: client library embedded in the application
# No separate cluster needed; runs inside your JVM/Python process
# Low latency, stateful stream processing, exactly-once

# KafkaStreams topology in Python (using faust as Python DSL over Kafka Streams concepts)
import faust

app = faust.App('order-processor', broker='kafka://localhost:9092')

# Define topics
orders_topic = app.topic('orders', value_type=dict)
metrics_topic = app.topic('order-metrics', value_type=dict)

# Stateful stream processing: count orders per user
user_order_count = app.Table('user-order-count', default=int)

@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        user_id = order['user_id']
        user_order_count[user_id] += 1

        # Emit to metrics topic
        await metrics_topic.send(
            key=str(user_id),
            value={'user_id': user_id, 'count': user_order_count[user_id]}
        )

# Kafka Streams vs Spark Structured Streaming:
# | Property | Kafka Streams | Spark Structured Streaming |
# |---|---|---|
# | Deployment | Embedded in app | Separate cluster |
# | Latency | Milliseconds | Seconds (micro-batch) |
# | Scale | App-level scale | Cluster-level (100s of nodes) |
# | State | Local RocksDB | Spark memory + shuffle |
# | Language | Java/Scala (Faust for Python) | Python/Scala/Java/R |
# | Use when | Low-latency, simple topologies | Large-scale, complex analytics |

Q13. What is Kafka Connect? When do you use it over a custom producer?

// Kafka Connect: framework for streaming data between Kafka and external systems
// No code needed for standard integrations; just configuration

// JDBC Source Connector: stream database changes to Kafka
{
    "name": "postgres-source-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://db:5432/mydb",
        "connection.user": "kafkauser",
        "connection.password": "secret",
        "mode": "timestamp",
        "timestamp.column.name": "updated_at",
        "table.whitelist": "orders,users",
        "topic.prefix": "db.",
        "poll.interval.ms": "5000"
    }
}
// S3 Sink Connector: write Kafka topic to S3 (Parquet or JSON)
{
    "name": "s3-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "4",
        "topics": "orders,user-events",
        "s3.region": "ap-south-1",
        "s3.bucket.name": "my-kafka-archive",
        "s3.part.size": "5242880",
        "flush.size": "1000",
        "rotate.interval.ms": "300000",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
        "parquet.codec": "snappy"
    }
}

Use Kafka Connect over custom producer when:

  • Standard integration (MySQL CDC, S3, HDFS, Elasticsearch, BigQuery)
  • Restart/failure handling is needed (Connect handles offset management)
  • Parallelism via tasks is needed
  • Schema registry integration required

Use custom producer when:

  • Non-standard data transformation needed
  • Complex business logic in the pipeline
  • Tight control over message format

Q14. How does the Schema Registry work with Kafka?

from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Schema Registry: centralized store for Avro/JSON/Protobuf schemas
# Enables schema evolution without breaking consumers

schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

# Define Avro schema
order_schema = """
{
    "type": "record",
    "name": "Order",
    "namespace": "com.example",
    "fields": [
        {"name": "order_id", "type": "int"},
        {"name": "user_id", "type": "int"},
        {"name": "amount", "type": "double"},
        {"name": "status", "type": "string"},
        {"name": "discount", "type": ["null", "double"], "default": null}
    ]
}
"""

avro_serializer = AvroSerializer(schema_registry_client, order_schema)

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'value.serializer': avro_serializer,
})

order = {"order_id": 1, "user_id": 100, "amount": 1500.0, "status": "completed", "discount": None}
producer.produce(
    topic='orders',
    value=order,
    on_delivery=lambda err, msg: print(f"Delivered: {msg.offset()}" if not err else f"Error: {err}")
)
producer.flush()

# Schema evolution rules:
# BACKWARD compatible: new schema can read old data (add optional field with default)
# FORWARD compatible: old schema can read new data (remove optional field)
# FULL compatible: both directions
# Adding "discount" field with default null = BACKWARD compatible change

Q15. What are Kafka Streams state stores? How do they handle failure?

Kafka Streams state stores: local storage for stateful operations
  - RocksDB (default): persistent key-value store on local disk
  - In-memory: faster, no persistence (for non-critical state)

Stateful operations that need state stores:
  - count(), aggregate(), reduce() with groupBy()
  - Windowed aggregations (tumbling, hopping, sliding windows)
  - KTable: table abstraction backed by a changelog topic
  - Joins between KStream and KTable

Fault tolerance:
  - State store backed by a Kafka changelog topic (compacted)
  - On failure/restart: state restored from changelog topic
  - Standby replicas: num.standby.replicas copies pre-warm state for fast failover
# Faust example: windowed word count with state
import faust
from datetime import timedelta

app = faust.App('word-count', broker='kafka://localhost:9092')
text_topic = app.topic('text', value_type=str)
word_counts = app.Table('word-counts', default=int)

@app.agent(text_topic)
async def count_words(texts):
    async for text in texts:
        for word in text.lower().split():
            word_counts[word] += 1

# Windowed table: count per word per 1-hour window
windowed_counts = app.Table(
    'hourly-word-counts',
    default=int
).tumbling(timedelta(hours=1), expires=timedelta(hours=24))

@app.agent(text_topic)
async def windowed_count(texts):
    async for text in texts:
        for word in text.lower().split():
            windowed_counts[word] += 1  # automatically scoped to current window

Q16. How do you handle message ordering guarantees in Kafka?

from confluent_kafka import Producer, Consumer

# Kafka ordering guarantee: WITHIN a partition, messages are strictly ordered
# ACROSS partitions: NO ordering guarantee

# Ensure order for a specific entity: use the entity ID as the key
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# All events for order_id=123 go to same partition (same key)
# Order: PLACED -> PAYMENT -> SHIPPED -> DELIVERED
for event in ['PLACED', 'PAYMENT', 'SHIPPED', 'DELIVERED']:
    producer.produce(
        topic='order-events',
        key='order_123',          # same key -> same partition -> ordered
        value=f'{{"order_id": 123, "status": "{event}"}}'
    )
producer.flush()

# Pitfall: max.in.flight.requests.per.connection > 1 can cause reordering on retry
# Fix: enable.idempotence=True (handles ordering correctly with up to 5 in-flight)
ordered_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,  # ensures ordering even with retries
    'max.in.flight.requests.per.connection': 5,  # safe with idempotence
})

# Consumer side: process messages from single partition for ordering guarantee
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
})
consumer.subscribe(['order-events'])

# Messages are consumed in partition order
# If you have 3 partitions and 3 consumers, each consumer sees ordered events
# for the orders assigned to their partition

Q17. How do you implement a dead letter queue (DLQ) pattern in Kafka?

from confluent_kafka import Consumer, Producer
import json
import traceback

def process_message(msg_value: dict) -> dict:
    """Business logic that may fail."""
    if msg_value.get("amount") < 0:
        raise ValueError(f"Invalid negative amount: {msg_value['amount']}")
    return {"processed": True, **msg_value}

def run_consumer_with_dlq(input_topic: str, output_topic: str, dlq_topic: str):
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'order-processor-dlq',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest',
    })
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    consumer.subscribe([input_topic])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                continue

            try:
                value = json.loads(msg.value())
                result = process_message(value)
                producer.produce(output_topic, key=msg.key(), value=json.dumps(result))
                consumer.commit(msg)

            except Exception as e:
                # Route to DLQ with error metadata
                dlq_payload = {
                    "original_topic": msg.topic(),
                    "original_partition": msg.partition(),
                    "original_offset": msg.offset(),
                    "original_value": msg.value().decode('utf-8'),
                    "error_type": type(e).__name__,
                    "error_message": str(e),
                    "traceback": traceback.format_exc()
                }
                producer.produce(dlq_topic, key=msg.key(), value=json.dumps(dlq_payload))
                producer.flush()
                consumer.commit(msg)  # commit to avoid infinite retry loop

    finally:
        consumer.close()

Q18. What is Kafka's retention policy? How do you tune it?

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Retention settings per topic
retention_configs = {
    # Time-based retention
    'retention.ms': str(7 * 24 * 3600 * 1000),  # 7 days (default)

    # Size-based retention (triggers whichever comes first)
    'retention.bytes': str(100 * 1024 * 1024 * 1024),  # 100GB per partition

    # Segment size (smaller segments = faster cleanup, more overhead)
    'segment.bytes': str(1 * 1024 * 1024 * 1024),  # 1GB segments

    # Segment time (create new segment even if not full)
    'segment.ms': str(24 * 3600 * 1000),  # daily segments

    # Cleanup policy
    'cleanup.policy': 'delete',   # or 'compact' (see Q9) or 'delete,compact'
}

# Alter topic retention
resource = ConfigResource('topic', 'orders')
admin.alter_configs({resource: retention_configs})

# Guideline per use case:
# Transactional events (orders, payments): 7-30 days
# User events (clicks, page views): 3-7 days
# ML feature streaming: 90 days (for offline retraining)
# CDC / compacted topics: indefinite (only latest value kept)

# Cost optimization: use tiered storage for long retention
# Active data on local SSDs, older data offloaded to S3/GCS automatically

HARD: Production Kafka (Questions 19-25)

Q19. How do you partition a topic for optimal performance?

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Partition count determines:
# 1. Max parallelism (consumers in a group)
# 2. Ordering granularity (order within partition, not across)
# 3. Resource overhead per partition (file descriptors, memory)

# Heuristic: start with max(3x cores across brokers, expected consumer count)
# Rule of thumb: 3 brokers x 4 cores = 12 partitions minimum
# For high-throughput: 24-100 partitions

# Create topic with optimal configuration
orders_topic = NewTopic(
    'orders',
    num_partitions=24,       # high throughput, 24 consumer threads max
    replication_factor=3,    # survive 2 broker failures
    config={
        'min.insync.replicas': '2',      # require 2 replicas in sync
        'unclean.leader.election.enable': 'false',  # no data loss on failover
        'compression.type': 'lz4',       # fast compression, good ratio
        'retention.ms': str(7 * 24 * 3600 * 1000),
        'max.message.bytes': str(10 * 1024 * 1024),  # 10MB max message
    }
)
admin.create_topics([orders_topic])

# Partition count increase: allowed (only increase, never decrease)
# Changing partition count: existing keyed messages may end up on different partition
# Critical: if order-by-key matters, increasing partitions changes routing

# Benchmark target: each partition handles ~10MB/s write throughput
# At 100MB/s producer throughput: 10 partitions minimum

Q20. How do you implement Kafka in a microservices event-driven architecture?

# Event-driven architecture: services communicate via Kafka topics
# Each service publishes events to its own topic
# Subscribers listen to relevant topics independently

# Pattern: Order service lifecycle
# OrderService publishes to: orders.placed, orders.updated
# InventoryService consumes: orders.placed -> allocates stock
# NotificationService consumes: orders.placed, orders.updated -> sends emails
# AnalyticsService consumes: all topics -> updates dashboards

from confluent_kafka import Producer, Consumer
from dataclasses import dataclass, asdict
import json
from datetime import datetime

@dataclass
class OrderPlacedEvent:
    event_type: str = "ORDER_PLACED"
    order_id: int = 0
    user_id: int = 0
    items: list = None
    total_amount: float = 0.0
    timestamp: str = ""

class OrderService:
    def __init__(self):
        self.producer = Producer({'bootstrap.servers': 'localhost:9092',
                                   'enable.idempotence': True})

    def place_order(self, user_id: int, items: list) -> dict:
        order = {"order_id": 12345, "user_id": user_id, "items": items,
                 "total": sum(i['price'] for i in items)}
        # Publish event to Kafka
        event = OrderPlacedEvent(
            order_id=order["order_id"],
            user_id=user_id,
            items=items,
            total_amount=order["total"],
            timestamp=datetime.now().isoformat()
        )
        self.producer.produce(
            topic="orders.placed",
            key=str(order["order_id"]),  # same order always same partition
            value=json.dumps(asdict(event))
        )
        self.producer.flush()
        return order

class InventoryService:
    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'inventory-service',
            'enable.auto.commit': False,
        })
        self.consumer.subscribe(['orders.placed'])

    def run(self):
        while True:
            msg = self.consumer.poll(1.0)
            if msg and not msg.error():
                event = json.loads(msg.value())
                self.allocate_stock(event)
                self.consumer.commit(msg)

    def allocate_stock(self, event: dict):
        for item in event['items']:
            print(f"Allocating {item['quantity']} of product {item['product_id']}")

Q21. How do you monitor a production Kafka cluster?

# Key metrics to monitor (via JMX, Prometheus, Datadog, Grafana)

# BROKER METRICS:
# - UnderReplicatedPartitions: partitions with fewer than expected ISR replicas
#   Alert if > 0 for more than 1 minute
# - ActiveControllerCount: should be exactly 1 across cluster
# - OfflinePartitionsCount: should be 0 always
# - RequestHandlerAvgIdlePercent: below 20% = broker overloaded
# - BytesInPerSec / BytesOutPerSec: track throughput trends
# - NetworkProcessorAvgIdlePercent: below 30% = network thread bottleneck

# CONSUMER METRICS:
# - consumer_lag: lag per (group, topic, partition)
#   Alert if lag > threshold OR lag is growing consistently

# PRODUCER METRICS:
# - record-error-rate: should be 0 with idempotent producer
# - request-latency-avg: p99 latency for produce requests

# Prometheus + Kafka Exporter setup
import subprocess

# kafka-exporter exposes Kafka metrics in Prometheus format
# http://kafka-exporter:9308/metrics

# Alert examples (Prometheus AlertManager):
alerts = {
    "KafkaConsumerLagHigh": {
        "condition": "kafka_consumer_group_lag > 50000",
        "for": "5m",
        "severity": "warning"
    },
    "KafkaConsumerLagCritical": {
        "condition": "kafka_consumer_group_lag > 500000",
        "for": "2m",
        "severity": "critical"
    },
    "KafkaUnderReplicatedPartitions": {
        "condition": "kafka_topic_partition_under_replicated_partition > 0",
        "for": "1m",
        "severity": "critical"
    },
    "KafkaOfflinePartitions": {
        "condition": "kafka_controller_stats_offline_partitions_count > 0",
        "for": "0m",
        "severity": "critical"
    }
}

Q22. What is Kafka's compaction and cleanup.policy=delete,compact?

from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# cleanup.policy=delete: standard time/size-based retention
# cleanup.policy=compact: keep only latest value per key
# cleanup.policy=delete,compact: BOTH -- compact old segments + delete by retention

# Use case for delete,compact:
# User preferences topic:
# - Want latest preference per user (compact)
# - But also want to delete users who haven't updated in 1 year (delete)

# Configuration example
config = {
    'cleanup.policy': 'delete,compact',
    'retention.ms': str(365 * 24 * 3600 * 1000),  # 1 year
    'min.cleanable.dirty.ratio': '0.5',
    'delete.retention.ms': str(24 * 3600 * 1000),  # keep tombstones 1 day
    'segment.ms': str(7 * 24 * 3600 * 1000),       # weekly segments
}

# How compaction works:
# Log segments are divided into "clean" (already compacted) and "dirty" (not yet)
# Log cleaner thread picks segments with dirty_ratio > min.cleanable.dirty.ratio
# Creates new cleaned segment with only latest value per key
# Old segment is deleted after cleaning completes

# Tombstone: key with null value signals "delete this key during compaction"
# delete.retention.ms: how long tombstones are kept before being physically deleted
# Allows consumers with lag to see the deletion before it's purged

Q23. How do you implement a Kafka consumer with graceful shutdown?

from confluent_kafka import Consumer, KafkaException
import signal
import threading
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GracefulKafkaConsumer:
    def __init__(self, topics: list, group_id: str):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': group_id,
            'enable.auto.commit': False,
            'max.poll.interval.ms': 300000,  # 5 min processing timeout
            'session.timeout.ms': 30000,
        })
        self.consumer.subscribe(topics)
        self._shutdown = threading.Event()

        # Register signal handlers
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, sig, frame):
        logger.info(f"Received signal {sig}, initiating graceful shutdown...")
        self._shutdown.set()

    def run(self):
        try:
            while not self._shutdown.is_set():
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaException._PARTITION_EOF:
                        logger.debug(f"Reached end of partition: {msg.topic()}/{msg.partition()}")
                    else:
                        logger.error(f"Kafka error: {msg.error()}")
                    continue

                try:
                    self.process(msg)
                    self.consumer.commit(msg)
                except Exception as e:
                    logger.error(f"Processing error: {e}", exc_info=True)
                    # Decide: DLQ or retry or skip

        except Exception as e:
            logger.error(f"Consumer error: {e}", exc_info=True)
        finally:
            logger.info("Closing consumer...")
            self.consumer.close()  # commits pending offsets, triggers rebalance
            logger.info("Consumer closed gracefully")

    def process(self, msg):
        # Override this in subclass
        logger.info(f"Processing: {msg.value().decode('utf-8')[:100]}")

Q24. What are Kafka quotas? How do you prevent a single producer from overloading the cluster?

from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Kafka quotas: rate limits per client, per user, or per user+client
# Prevents a single high-volume producer from starving others
# Quota types: producer_byte_rate, consumer_byte_rate, request_percentage

# Set quota via kafka-configs.sh (CLI):
# kafka-configs.sh --bootstrap-server localhost:9092 --alter \
#   --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' \
#   --entity-type clients --entity-name my-heavy-producer

# Programmatic quota management (via JMX or Admin API)
# Admin API for quota description (Kafka 2.6+):
quota_filter = {'entity_type': 'client-id', 'entity_name': 'my-producer'}

# When quota is exceeded:
# - Broker throttles the client (delays responses)
# - Producer sees increased latency but no errors (if backoff is set correctly)
# - Consumer sees delayed fetch responses

# Producer configuration to handle throttling gracefully:
from confluent_kafka import Producer

throttle_aware_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'delivery.timeout.ms': 120000,   # allow 2 minutes for throttled delivery
    'request.timeout.ms': 30000,
    'retries': 2147483647,
    'retry.backoff.ms': 1000,
})

# Monitor quota throttle rate:
# kafka.server:type=ClientQuotaManager,user=your_user,client-id=your_client
# throttle-time metrics indicate when quota is being enforced

Q25. Design a real-time click attribution pipeline using Kafka.

Business problem: attribute user purchases to the last ad click within 30 minutes
Scale: 50M events/day (clicks + purchases)

Architecture:

  Input topics:
    ad-clicks: (user_id, ad_id, campaign_id, click_time, session_id)
    purchases: (user_id, order_id, amount, purchase_time)

  Stream processing (Kafka Streams / Faust):

    Step 1: Key both streams by user_id
      clicks-by-user: KeyedStream(user_id, ClickEvent)
      purchases-by-user: KeyedStream(user_id, PurchaseEvent)

    Step 2: State store for recent clicks
      For each user, store: last 30 minutes of click events
      Use windowed KTable with 30-minute retention

    Step 3: Enrich purchases with last click
      On each PurchaseEvent:
        1. Lookup state store: latest click for user_id within last 30 min
        2. If found: emit AttributedPurchase(purchase + click metadata)
        3. If not found: emit AttributedPurchase(purchase, attribution=None)

    Step 4: Publish to attributed-purchases topic

  Output topic:
    attributed-purchases: (order_id, user_id, amount, ad_id, campaign_id, click_time)

  Sinks:
    - BigQuery/Snowflake: for reporting and ROI analysis
    - Redis: real-time campaign budget tracker

  Fault tolerance:
    - State stores backed by Kafka changelog topics (compacted)
    - Exactly-once via Kafka transactions
    - Checkpointing every 30 seconds

  Scale:
    - 6 partitions by user_id: deterministic routing for both streams
    - 6 consumer threads process in parallel
    - RocksDB state store per partition: handles 100K user states per thread

FAQ

Q: How many Kafka partitions should I start with? A: A common starting point is max(consumer_count * 3, 12) for medium-throughput topics. More partitions increase parallelism but also increase overhead (file descriptors, leader elections, memory). Start conservatively and increase if throughput requires it. Candidates from public preparation resources confirm that partition design questions appear in every senior Kafka interview.

Q: What is the difference between a Kafka topic and a queue? A: A traditional queue delivers each message to exactly one consumer and deletes it after consumption. A Kafka topic retains messages for a configurable period and supports multiple independent consumer groups, each getting the full message stream. Kafka is designed for event streaming and replay; queues are designed for task distribution.

Q: What should I know about Kafka for a data engineering interview? A: Understand partition-consumer group mechanics, offset management (at-least-once vs exactly-once), consumer lag monitoring, and when to use Kafka vs a traditional queue. Production patterns (DLQ, graceful shutdown, quota management) differentiate senior candidates. Confirm the specific Kafka use case and version expected on the official company careers portal before your interview.

Methodology applied to this articlelast verified 8 Jun 2026
Sources used
Public exam-pattern documents, official recruiter pages, and verified candidate reports on r/developersIndia and LinkedIn.
Verification window
Page last edited 8 Jun 2026 by Aditya Sharma. Numbers and patterns sanity-checked against the most recent 2026 cycle drives we tracked.
What we did NOT do
  • No fabricated salary numbers or success rates. If we quote a range, it's sourced.
  • No noun-substituted templates. This article was not generated by swapping company names in a stock prompt.
  • No paid placements, sponsored coaching links, or affiliate-shilled course pushes.
Verification policy: /editorial-standards/. Found something incorrect? Submit a correction - we respond within 48 hours.

Explore this topic cluster

More resources in Interview Questions

Use the category hub to browse similar questions, exam patterns, salary guides, and preparation resources related to this topic.

Paid contributor programme

Sat this this year? Share your story, earn ₹500.

First-person experience reports help future candidates prep smarter. We pay verified contributors ₹500 via UPI per accepted story - with byline.

Submit your story →

Ready to practice?

Take a free timed mock test

Put what you learned into practice. Our mock tests match the 2026 pattern with timer, navigator, reveal, and score breakdown. No signup.

Start Free Mock Test →

Related Articles

More from PapersAdda

Share this guide: