// UserEventPublisher.java

package com.markstickel.keycloak.kafka.user;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jboss.logging.Logger;
import org.keycloak.models.UserModel;

/**
 * Publishes user events to a Kafka topic.
 *
 * <p>Each instance owns a single Kafka {@link Producer} that is created in the
 * constructor and configured with a {@link StringSerializer} for keys and
 * {@code UserModelSerializer} for values. Call {@link #close()} (or use the
 * instance in a try-with-resources statement) to release the producer.
 */
public class UserEventPublisher implements AutoCloseable {

    private static final Logger logger = Logger.getLogger(UserEventPublisher.class);

    /** Client id reported to the Kafka brokers by the underlying producer. */
    private static final String CLIENT_ID = "UserProducer";

    private final Producer<String, UserModel> producer;
    private final String topic;

    /**
     * Creates a publisher and immediately instantiates its Kafka producer.
     *
     * @param kafkaBootstrapUrl value used for the producer's {@code bootstrap.servers} setting
     * @param topic             name of the topic every event is published to
     */
    public UserEventPublisher(String kafkaBootstrapUrl, String topic) {
        this.producer = createProducer(kafkaBootstrapUrl);
        this.topic = topic;
    }

    /**
     * Publishes the given event and blocks until the send has completed.
     *
     * <p>The record is keyed by {@code object.getId()} and carries
     * {@code object.getModel()} as its value.
     *
     * @param object the event to publish
     * @throws ExecutionException   if the send failed; the cause is the underlying error
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *                              send to complete
     */
    public void publishCreate(UserEvent object) throws ExecutionException, InterruptedException {
        logger.infof("Publishing user event on Kafka: %s", object.getModel());
        ProducerRecord<String, UserModel> record =
                new ProducerRecord<>(topic, object.getId(), object.getModel());
        // Waiting on the returned future already blocks until this record has been
        // acknowledged (or has failed), so no additional flush() is required here.
        producer.send(record).get();
        logger.info("Event was published");
    }

    /**
     * Flushes any records still buffered by the producer and then closes it.
     * The publisher must not be used after this call.
     */
    @Override
    public void close() {
        producer.flush();
        producer.close();
    }

    /**
     * Builds the Kafka producer used by this publisher.
     *
     * @param kafkaBootstrapUrl value used for the producer's {@code bootstrap.servers} setting
     * @return a new producer with String keys and {@code UserModel} values
     */
    private Producer<String, UserModel> createProducer(String kafkaBootstrapUrl) {
        Properties props = new Properties();
        logger.infof("Setting property %s: %s", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapUrl);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapUrl);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserModelSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

}