bootique-kafka
Integration of the Kafka client and Kafka Streams with Bootique.
Dependencies
Include the Bootique BOM, then add bootique-kafka-client (and bootique-kafka-streams if you use streams):
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.bootique.bom</groupId>
                <artifactId>bootique-bom</artifactId>
                <version>2.0.M1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    ...
    <!-- If using Producer and/or Consumer -->
    <dependency>
        <groupId>io.bootique.kafka</groupId>
        <artifactId>bootique-kafka-client</artifactId>
    </dependency>
    <!-- If using streams -->
    <dependency>
        <groupId>io.bootique.kafka</groupId>
        <artifactId>bootique-kafka-streams</artifactId>
    </dependency>
Producer/Consumer Configuration
Configure the parameters in YAML. Practically all of these settings can be overridden when obtaining a specific Producer or Consumer instance via io.bootique.kafka.client.KafkaClientFactory, so this is just a collection of defaults for the most typical Producer or Consumer:
    kafkaclient:
      # Any number of named clusters, each with a comma-separated list of bootstrap Kafka servers.
      clusters:
        cluster1: 127.0.0.1:9092
        cluster2: host1:9092,host2:9092
      consumer:
        autoCommit: true
        autoCommitInterval: "200ms"
        defaultGroup: myappgroup
        sessionTimeout: "2s"
      producer:
        acks: all # either "all" or the minimum number of acks
        retries: 1
        batchSize: 16384
        linger: "1ms"
        bufferMemory: 33554432
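To make these defaults more concrete, here is a minimal sketch of how such values could be translated into the standard Kafka client property names (bootstrap.servers, group.id, enable.auto.commit, and so on). The class KafkaDefaultsSketch and its methods are hypothetical and exist only for illustration. The mapping is an assumption about how settings like these usually correspond to Kafka properties, not a description of how Bootique implements it.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of bootique-kafka.
class KafkaDefaultsSketch {

    // Parses duration strings such as "200ms" or "2s".
    // Assumption: only the "ms" and "s" suffixes are handled in this sketch.
    static Duration parseDuration(String text) {
        String t = text.trim();
        if (t.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(t.substring(0, t.length() - 2).trim()));
        }
        if (t.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(t.substring(0, t.length() - 1).trim()));
        }
        throw new IllegalArgumentException("Unsupported duration: " + text);
    }

    // Maps the "consumer" block onto standard Kafka consumer property names.
    static Map<String, String> consumerProperties(
            String bootstrapServers, String group, boolean autoCommit,
            String autoCommitInterval, String sessionTimeout) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", group);
        props.put("enable.auto.commit", String.valueOf(autoCommit));
        props.put("auto.commit.interval.ms",
                String.valueOf(parseDuration(autoCommitInterval).toMillis()));
        props.put("session.timeout.ms",
                String.valueOf(parseDuration(sessionTimeout).toMillis()));
        return props;
    }

    // Maps the "producer" block onto standard Kafka producer property names.
    static Map<String, String> producerProperties(
            String bootstrapServers, String acks, int retries,
            int batchSize, String linger, long bufferMemory) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", acks);
        props.put("retries", String.valueOf(retries));
        props.put("batch.size", String.valueOf(batchSize));
        props.put("linger.ms", String.valueOf(parseDuration(linger).toMillis()));
        props.put("buffer.memory", String.valueOf(bufferMemory));
        return props;
    }

    public static void main(String[] args) {
        // Named clusters, as in the "clusters" block of the YAML.
        Map<String, String> clusters = new LinkedHashMap<>();
        clusters.put("cluster1", "127.0.0.1:9092");
        clusters.put("cluster2", "host1:9092,host2:9092");

        System.out.println(consumerProperties(
                clusters.get("cluster1"), "myappgroup", true, "200ms", "2s"));
        System.out.println(producerProperties(
                clusters.get("cluster2"), "all", 1, 16384, "1ms", 33554432L));
    }
}
```

The two maps use only plain strings, so the sketch runs without a Kafka dependency. In a real application the factories build the clients for you and this translation is not something you write by hand.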
Now you can inject the producer and consumer factories and create any number of producers and consumers. Producer example:
    @Inject
    KafkaProducerFactory factory;

    public void runProducer() {
        Producer<byte[], String> producer = factory
            .charValueProducer()
            .cluster("cluster2")
            .create();

        producer.send(new ProducerRecord<>("mytopic", "Hi!"));

        // close if there's nothing else to send
        producer.close();
    }
Consumer example:
    @Inject
    KafkaConsumerFactory factory;

    public void runConsumer() {
        KafkaConsumerRunner<byte[], String> consumer = factory
            .charValueConsumer()
            .cluster("cluster1")
            .group("somegroup")
            .topic("mytopic")
            .pollInterval(Duration.ofSeconds(1))
            .create();

        for (ConsumerRecord<byte[], String> r : consumer) {
            System.out.println(r.topic() + "_" + r.partition() + "_" + r.offset() + ": " + r.value());
        }

        consumer.close();
    }
Streams Configuration
TODO