Skip to main content

Idempotent message handler

Because Kafka delivery is at-least-once, a consumer may receive the same message more than once. The jeap-messaging-idempotence module provides automatic idempotent handling so that an already-processed message is skipped. See Choosing dependencies for the artifact.

@IdempotentMessageHandler

@IdempotentMessageHandler is a method-level, runtime-retained annotation. Put it on a handler method whose first argument is the Message. An AOP aspect (IdempotentMessageHandlerAspect) wraps the method:

  • it derives the key from message.getIdentity().getIdempotenceId();
  • it derives a context from the message type name (message.getType().getName()) with a trailing version stripped (pattern (.*)(V[0-9]+)(Event|Command), so e.g. FooV2Event → context FooEvent);
  • it creates an IdempotentProcessing record via createIfNotExists. If the record is newly created, the method proceeds; if the record already exists, the method is skipped and returns null.

Requires a transaction

The aspect requires an active transaction — without one it throws IllegalStateException. Annotate the handler (or a surrounding method) with @Transactional. On a concurrent create (DataIntegrityViolationException / PessimisticLockingFailureException) it throws IdempotentMessageHandlerExecutionSkippedException.

@Component
class DeclarationConsumer {

@Transactional
@IdempotentMessageHandler
@KafkaListener(topics = "jme-messaging-declaration-created")
void consume(JmeDeclarationCreatedEvent event, Acknowledgment ack) {
// ... process the event ...
ack.acknowledge();
}
}

Storage

Storage is JPA-backed. The identity IdempotentProcessingIdentity is {idempotence_id, idempotence_id_context}. A datasource and the backing table are required.

Housekeeping

Old records are deleted on a schedule, coordinated via ShedLock.

PropertyDefaultDescription
jeap.messaging.idempotent-processing.houseKeepingSchedule0 0 4 * * *Cron expression for the housekeeping job
jeap.messaging.idempotent-processing.idempotentProcessingRetentionDuration30dRetention duration (30 days) before old records are deleted
jeap.messaging.idempotent-message-handler.adviceOrderlowest precedenceOrder of the AOP advice — lowest precedence runs nearest to the method

Relationship to other mechanisms

This relies on the idempotenceId in the message identity (see Message types). For a manual alternative, persist and check the idempotenceId yourself (see Consuming messages).