
Configuration reference

All properties use the prefix jeap.messaging.kafka. Cluster-specific options live under jeap.messaging.kafka.cluster.<name>.*; for backwards compatibility most of them can also be set directly under jeap.messaging.kafka.* (this does not apply to the AWS-specific properties).

For Kafka client tuning (consumer/producer/topic) via spring.kafka.*, see Kafka topics & client configuration.

Clusters

Since version 6, jEAP Messaging can connect to more than one Kafka cluster (mainly for hybrid-cloud use cases). Each cluster is configured under a freely chosen name. Avoid special characters, as the name is used for Spring bean names.

jeap:
  messaging:
    kafka:
      cluster:
        bit:
          bootstrapServers: ...
          default-cluster: true # optional; the first cluster is the default if none is marked
        aws:
          bootstrapServers: ...

Producers select a non-default cluster with a @Qualifier("<name>") on the injected KafkaTemplate; consumers select it with containerFactory = "<name>KafkaListenerContainerFactory" on @KafkaListener. The default cluster needs neither.

| Name | Default | Description |
| --- | --- | --- |
| cluster.<name>.bootstrapServers | localhost:9092 | Kafka bootstrap servers (host:port) |
| cluster.<name>.consumerBootstrapServers | | Override bootstrap servers for consumers (since 4.2.0) |
| cluster.<name>.producerBootstrapServers | | Override bootstrap servers for producers (since 4.2.0) |
| cluster.<name>.adminClientBootstrapServers | | Override bootstrap servers for admin clients (since 4.2.0) |
| cluster.<name>.default-producer-cluster-override | false | Make this cluster the default producer cluster for migration scenarios (mirroring). Affects which KafkaTemplate and DefaultKafkaProducerFactory beans are marked @Primary |
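
As a sketch of how these properties might combine in a mirroring migration, the fragment below keeps bit as the default cluster and moves producers to aws. All host names are placeholders, and splitting the consumer and producer endpoints is an assumption made purely for illustration.

```yaml
jeap:
  messaging:
    kafka:
      cluster:
        bit:
          bootstrapServers: bit-kafka.example.org:9092   # placeholder host
          default-cluster: true
        aws:
          bootstrapServers: aws-kafka.example.org:9092   # placeholder host
          # Optional per-role overrides (since 4.2.0); placeholder hosts
          consumerBootstrapServers: aws-kafka-read.example.org:9092
          producerBootstrapServers: aws-kafka-write.example.org:9092
          # Makes aws the default producer cluster during the migration
          default-producer-cluster-override: true
```

With a setup like this, consumers would still default to bit, while an unqualified KafkaTemplate would be expected to target aws.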

Core

| Name | Default | Description |
| --- | --- | --- |
| useSchemaRegistry | true | Use a schema registry. If false, an internal mock Confluent schema registry is used |
| autoRegisterSchema | true | Automatically register new schemas before first send (uses TopicRecordNameStrategy) |
| expose-message-key-to-consumer | false | Make message keys available to the consumer (true) or suppress them (false) |
| systemName | | Name of the sending system; required to generate MessageProcessingFailedEvent messages |
| serviceName | ${spring.application.name} | Name of the sending service; used in generated MessageProcessingFailedEvent messages |
| errorTopicName | | Error topic for MessageProcessingFailedEvent messages (also per cluster: cluster.<name>.errorTopicName) |
| errorServiceRetryIntervalMs | 5000 | Retry interval for sending a MessageProcessingFailedEvent |
| errorServiceRetryAttempts | 5 | Number of attempts to send a MessageProcessingFailedEvent before the application terminates |
| embedded | | Force (true) or disable (false) the automatic EmbeddedKafka test configuration |
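
A minimal sketch of the error-handling properties in context. The system, service, and topic names are hypothetical; the retry values simply restate the defaults from the table.

```yaml
jeap:
  messaging:
    kafka:
      systemName: my-system                 # hypothetical system name
      serviceName: my-service               # hypothetical; defaults to ${spring.application.name}
      errorTopicName: my-system-errors      # hypothetical topic name
      errorServiceRetryIntervalMs: 5000     # default
      errorServiceRetryAttempts: 5          # default
      cluster:
        aws:
          # Per-cluster error topic (hypothetical name)
          errorTopicName: my-system-errors-aws
```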

Contracts

| Name | Default | Description |
| --- | --- | --- |
| publishWithoutContractAllowed | false | Allow producing without a contract. Must be false in production |
| consumeWithoutContractAllowed | false | Allow consuming without a contract. Must be false in production |
| silentIgnoreWithoutContract | false | Suppress the error logged when a message without a contract is received (e.g. when listening to a topic carrying several event types) |
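
For local experiments, the contract checks could be relaxed in a profile-specific file. The sketch below assumes a Spring profile file named application-local.yml, which is a hypothetical name; these settings must stay false in production.

```yaml
# application-local.yml (hypothetical profile file, not for production)
jeap:
  messaging:
    kafka:
      publishWithoutContractAllowed: true
      consumeWithoutContractAllowed: true
```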

See Message contracts.

Encryption

| Name | Default | Description |
| --- | --- | --- |
| messageTypeEncryptionDisabled | false | Disable encryption for message types that declare an encryptionKeyId in their producer contract. Useful in tests, so that no jEAP-Crypto instance is needed. Forbidden on acceptance and production |

See Encrypting messages.

Schema registry & Kafka authentication

The properties for the Confluent Schema Registry with SASL authentication, the AWS Glue Schema Registry, and AWS MSK IAM authentication are documented on their own dedicated pages.

Overriding the deserialized type

Since 7.2.0 (Confluent) the deserialized key/value type can be set per listener with the specific.avro.key.type / specific.avro.value.type properties. This is needed for self-messages during schema evolution:


@KafkaListener(topics = TOPIC_NAME,
        properties = {"specific.avro.value.type=ch.admin.bit.jme.test.JmeSimpleTestV2Event"})
public void consume(JmeSimpleTestV2Event event, Acknowledgment ack) { ... }