KafkaPublisherEventListenerProvider.java

package com.markstickel.keycloak.kafka;

import com.markstickel.keycloak.kafka.user.UserEvent;
import com.markstickel.keycloak.kafka.user.UserEventPublisher;
import java.util.concurrent.ExecutionException;
import org.jboss.logging.Logger;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.AdminEvent;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserProvider;

public class KafkaPublisherEventListenerProvider
        implements EventListenerProvider {

    private static final Logger logger = Logger.getLogger(KafkaPublisherEventListenerProvider.class);

    private final UserEventPublisher kafkaPublisher;

    private final UserProvider userProvider;

    private final RealmModel realmModel;

    public KafkaPublisherEventListenerProvider(UserEventPublisher kafkaPublisher, KeycloakSession session) {
        this.kafkaPublisher = kafkaPublisher;
        this.userProvider = session.getProvider(UserProvider.class);
        this.realmModel = session.getContext().getRealm();
    }

    @Override
    public void onEvent(Event event) {
        if (EventType.REGISTER.equals(event.getType())) {
            UserModel userModel = userProvider.getUserById(realmModel, event.getUserId());
            logger.debug("A new user registered: " + userModel.getEmail());
            UserEvent userEvent = new UserEvent();
            userEvent.setId(event.getId());
            userEvent.setModel(userModel);
            try {
                kafkaPublisher.publishCreate(userEvent);
            } catch (ExecutionException | InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    @Override
    public void onEvent(AdminEvent event, boolean b) {
    }

    @Override
    public void close() {
    }
}