Kafka how-to
An end-to-end checklist for wiring Kafka into a service with jEAP Messaging. For the basic setup see Getting started.
Checklist
- Define the message types per your business requirements.
- Request the Kafka topics for the cluster (usually one topic per event, although several events can share a topic), following the naming conventions (see Kafka topics & client configuration).
- Define the type descriptor and Avro schema and publish them in the Message Type Registry on a feature branch.
- Set up an Error Handling Service (don't forget to request the roles it needs for authorization).
- Add jeap-messaging-infrastructure-kafka and the generated message-type dependencies.
- Import the Kafka cluster certificates and the schema-registry root certificate into the truststore.
- Declare the consumer/producer message contracts via annotations.
- Implement the producer and consumer; apply the idempotent-receiver pattern on the consumer.
- Define the Kafka configuration in the service.
- Develop and test locally and on dev; follow the idempotence guidelines.
- Once the schema is stable, merge the registry branch to master via a pull request.
Local development with Kafka
A trimmed docker-compose.yml with a KRaft-mode broker, a schema registry and a Postgres database for the
error-handling service:
services:
  broker:
    image: confluentinc/cp-kafka:latest
    hostname: broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093
      KAFKA_LISTENERS: SASL_PLAINTEXT://broker:9092,CONTROLLER://broker:9093
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: SASL_PLAINTEXT://broker:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  errorhandling-db:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: errorhandling
      POSTGRES_USER: errorhandling
      POSTGRES_PASSWORD: errorhandling
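To connect a client to this local setup, its settings have to match the advertised listener and the SASL mechanism from the compose file. The following is a minimal sketch using plain Kafka client property keys, not the jEAP-specific configuration properties (see Kafka topics & client configuration for those). The class name LocalKafkaClientConfig and the username and password parameters are hypothetical; a matching SCRAM user is assumed to exist on the local broker.

```java
import java.util.Properties;

// Sketch only: plain Kafka client properties that mirror the local compose setup.
final class LocalKafkaClientConfig {

    // username and password are hypothetical; use the SCRAM user created on your local broker.
    static Properties localClientProperties(String username, String password) {
        Properties props = new Properties();
        // Matches KAFKA_ADVERTISED_LISTENERS of the broker service.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        // Matches KAFKA_SASL_ENABLED_MECHANISMS of the broker service.
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        // Read by the Confluent Avro serializers; matches the published schema-registry port.
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }
}
```

In a jEAP service these values would normally be set through the service's Kafka configuration rather than built by hand; the sketch only shows which values must line up with the compose file.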
Acknowledge and commit
jeap-messaging sets enable.auto.commit=false and AckMode=MANUAL, so a consumer must always call
ack.acknowledge() at the end of message handling. On failure, the jEAP error handler sends the message
to the error topic and then acknowledges it. See Consuming messages.
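The "process first, acknowledge last" rule can be sketched as follows. To keep the example compilable on its own, Acknowledgment here is a hypothetical local stand-in for Spring Kafka's Acknowledgment interface, and AckAtEndHandler is an illustrative name, not a jEAP class.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for Spring Kafka's Acknowledgment so the sketch compiles without dependencies.
interface Acknowledgment {
    void acknowledge();
}

// Illustrative wrapper: runs the business logic and acknowledges only after it succeeded.
final class AckAtEndHandler<T> {
    private final Consumer<T> businessLogic;

    AckAtEndHandler(Consumer<T> businessLogic) {
        this.businessLogic = businessLogic;
    }

    void onMessage(T message, Acknowledgment ack) {
        // If this throws, acknowledge() below is never reached and the exception
        // propagates to the caller (in a jEAP service: to the configured error handler).
        businessLogic.accept(message);
        // Last statement of the handler: the message is fully processed.
        ack.acknowledge();
    }
}
```

Acknowledging before the business logic has finished would let a crash in between lose the message, which is why the call belongs at the very end.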
Idempotent consumer
At-least-once delivery means a consumer may receive a record more than once, for example when the
error handler resubmits it. Use the idempotenceId in the message identity to deduplicate; see the
idempotent message handler.
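The deduplication idea can be illustrated with a minimal sketch. It is not the jEAP idempotent message handler: IdempotentReceiver is a hypothetical class, and the in-memory set is an assumption made for brevity. A real service would need a durable store for processed ids, ideally updated in the same transaction as the business change.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative only: remembers processed idempotence ids in memory and skips duplicates.
final class IdempotentReceiver<T> {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentReceiver(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the message was processed, false if it was skipped as a duplicate.
    boolean handle(String idempotenceId, T message) {
        // add() returns false when the id is already present, i.e. the message was seen before.
        if (!processedIds.add(idempotenceId)) {
            return false;
        }
        try {
            delegate.accept(message);
            return true;
        } catch (RuntimeException e) {
            // Processing failed: forget the id so that a redelivery can retry.
            processedIds.remove(idempotenceId);
            throw e;
        }
    }
}
```

Calling handle twice with the same idempotenceId invokes the delegate once; the second call returns false without side effects.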
The two registries
The Message Type Registry is the design-time home of the schema; the Kafka Schema Registry is the runtime home, and jeap-messaging registers schemas there automatically and transparently. See Message Type Registry and Message evolution.