
Consuming messages

A consumer is a Spring @KafkaListener method taking the typed message and an Acknowledgment. For the default cluster, omit containerFactory; for a non-default cluster set containerFactory = "<clusterName>KafkaListenerContainerFactory" (see Configuration reference).
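As a small illustration of the naming pattern above, the sketch below builds the container factory bean name for a cluster. The cluster name "aws" and the class and method names are hypothetical, chosen only for this example; only the suffix comes from the text above.

```java
// Minimal sketch: derives the listener container factory bean name
// from a cluster name, following the pattern
// "<clusterName>KafkaListenerContainerFactory".
class ContainerFactoryNames {

    // Suffix taken from the naming pattern described above.
    static final String SUFFIX = "KafkaListenerContainerFactory";

    // Hypothetical helper, not part of jeap-messaging.
    static String containerFactoryFor(String clusterName) {
        return clusterName + SUFFIX;
    }

    public static void main(String[] args) {
        // "aws" is an assumed cluster name used only for illustration.
        System.out.println(containerFactoryFor("aws"));
    }
}
```

Because annotation attributes must be compile-time constants, a listener for this assumed cluster would carry the resulting name as a literal, for example containerFactory = "awsKafkaListenerContainerFactory".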

A listener method

jeap-messaging sets enable.auto.commit=false and AckMode=MANUAL. You MUST always acknowledge with ack.acknowledge() at the END of processing, that is, once the message has been either processed successfully or handed to the error handler. NEVER acknowledge before processing completes, or messages may be lost.

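import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
class DeclarationConsumer {

    @KafkaListener(topics = "jme-messaging-declaration-created")
    void consume(JmeDeclarationCreatedEvent event, Acknowledgment ack) {
        // ... process the event ...
        ack.acknowledge(); // acknowledge last, once processing is complete
    }
}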

Make consumers idempotent

Delivery is at-least-once, so duplicates are possible. Make consumers idempotent. The automatic approach is the idempotent message handler. The manual alternative is to persist and check the message's idempotence id:

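String idempotenceId = event.getIdentity().getIdempotenceId();
if (alreadyProcessed(idempotenceId)) {
    // Duplicate delivery: skip processing but still acknowledge.
    ack.acknowledge();
    return;
}
// ... process the event and record idempotenceId as processed ...
ack.acknowledge();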
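The manual check can be sketched end to end as follows. This is a minimal, framework-free illustration, not the jeap-messaging implementation: Ack is a hypothetical stand-in for Spring Kafka's Acknowledgment, ProcessedMessages is a hypothetical in-memory store (a real consumer would persist the ids, as described above), and the idempotence id is passed in directly instead of being read from the event.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Spring Kafka's Acknowledgment,
// so that the sketch compiles without Spring on the classpath.
interface Ack {
    void acknowledge();
}

// Hypothetical in-memory store of processed idempotence ids.
// A real consumer would persist these ids instead.
class ProcessedMessages {
    private final Set<String> ids = ConcurrentHashMap.newKeySet();

    boolean alreadyProcessed(String idempotenceId) {
        return ids.contains(idempotenceId);
    }

    void markProcessed(String idempotenceId) {
        ids.add(idempotenceId);
    }
}

class IdempotentConsumerSketch {
    private final ProcessedMessages processed = new ProcessedMessages();

    // Counts actual processing runs; for illustration only.
    int handledCount = 0;

    void consume(String idempotenceId, Ack ack) {
        if (processed.alreadyProcessed(idempotenceId)) {
            // Duplicate delivery: skip processing but still acknowledge.
            ack.acknowledge();
            return;
        }
        handledCount++; // ... process the event ...
        processed.markProcessed(idempotenceId);
        // Acknowledge last, once processing is complete.
        ack.acknowledge();
    }
}
```

Delivering the same id twice runs the processing step once but acknowledges both deliveries, so the duplicate is not redelivered.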

Consumer contract required

A consumer contract is REQUIRED (see Message contracts). The consumeWithoutContractAllowed property must be false in production. When listening to a topic that carries several event types, set silentIgnoreWithoutContract=true to suppress the error logged for types you don't process (see Configuration reference).

Overriding the deserialized type

Set the listener properties specific.avro.value.type / specific.avro.key.type to deserialize into a specific (compatible) type — used for self-message schema evolution (see Message evolution):

@KafkaListener(topics = TOPIC_NAME,
        properties = {"specific.avro.value.type=ch.admin.bit.jme.test.JmeSimpleTestV2Event"})
public void consume(JmeSimpleTestV2Event event, Acknowledgment ack) { ... }

Accessing the message key

To access message keys in the consumer, set expose-message-key-to-consumer=true (see Configuration reference).