KafkaPublisherEventListenerProviderFactory.java

package com.markstickel.keycloak.kafka;

import com.markstickel.keycloak.kafka.user.UserEventPublisher;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.jboss.logging.Logger;
import org.keycloak.Config.Scope;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.ServerInfoAwareProviderFactory;

public class KafkaPublisherEventListenerProviderFactory implements EventListenerProviderFactory,
                                                                   ServerInfoAwareProviderFactory {

    private static final Logger logger = Logger.getLogger(KafkaPublisherEventListenerProviderFactory.class);
    private static final String KAFKA_TOPIC = "KAFKA_TOPIC";
    private static final String KAFKA_BOOTSTRAP_SERVER = "KAFKA_BOOTSTRAP_SERVER";

    private UserEventPublisher userEventPublisher;

    private String kafkaBootstrapUrl;
    private String topic;

    @Override
    public KafkaPublisherEventListenerProvider create(KeycloakSession session) {
        return new KafkaPublisherEventListenerProvider(userEventPublisher, session);
    }

    @Override
    public void init(Scope scope) {
        this.topic = Optional.ofNullable(System.getenv(KAFKA_TOPIC)).orElse("abcd");
        this.kafkaBootstrapUrl = Optional.ofNullable(System.getenv(KAFKA_BOOTSTRAP_SERVER)).orElse("popfizzclink:9092");
        logger.info("Initiating Kafka publisher. Bootstrap URL is " + kafkaBootstrapUrl + "; topic to publish on is " + topic);
        userEventPublisher = new UserEventPublisher(kafkaBootstrapUrl, topic);
    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
        logger.info("factory postInit");
    }

    @Override
    public void close() {
        userEventPublisher.close();
    }

    @Override
    public String getId() {
        return "kafka-event-publisher";
    }

    @Override
    public Map<String, String> getOperationalInfo() {
        Map<String, String> opInfo = new HashMap<>();
        opInfo.put("kafkaBootstrapUrl", this.kafkaBootstrapUrl);
        opInfo.put("topic", this.topic);
        return opInfo;
    }
}