/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

/**
 * {@link EventHandler} implementation backed by two queues and a
 * {@link DefaultBackgroundThread}.
 *
 * <p>Callers hand {@link ApplicationEvent}s to the background thread through
 * {@link #add(ApplicationEvent)} and read {@link BackgroundEvent}s back through
 * {@link #poll()}. Every constructor starts the background thread before returning;
 * {@link #close()} closes it.
 */
public class DefaultEventHandler implements EventHandler {
    private static final String METRIC_GRP_PREFIX = "consumer";
    // Fixed value passed as the fourth NetworkClient constructor argument.
    // NOTE(review): presumably the max in-flight requests per connection — confirm
    // against the NetworkClient constructor signature.
    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 100;

    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final DefaultBackgroundThread backgroundThread;

    /**
     * Creates a handler that uses {@link Time#SYSTEM} and two fresh unbounded
     * {@link LinkedBlockingQueue}s, then builds the network stack and starts the
     * background thread.
     */
    public DefaultEventHandler(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, SubscriptionState subscriptionState, ApiVersions apiVersions, Metrics metrics, ClusterResourceListeners clusterResourceListeners, Sensor fetcherThrottleTimeSensor) {
        this(Time.SYSTEM, config, groupRebalanceConfig, logContext, new LinkedBlockingQueue<ApplicationEvent>(), new LinkedBlockingQueue<BackgroundEvent>(), subscriptionState, apiVersions, metrics, clusterResourceListeners, fetcherThrottleTimeSensor);
    }

    /**
     * Creates a handler on the supplied queues: bootstraps the consumer metadata,
     * builds a {@link Selector} and {@link NetworkClient} from {@code config}, and
     * starts a new {@link DefaultBackgroundThread} that uses them.
     *
     * @param applicationEventQueue queue that {@link #add(ApplicationEvent)} writes to
     * @param backgroundEventQueue  queue that {@link #poll()} reads from
     */
    public DefaultEventHandler(Time time, ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue, SubscriptionState subscriptionState, ApiVersions apiVersions, Metrics metrics, ClusterResourceListeners clusterResourceListeners, Sensor fetcherThrottleTimeSensor) {
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        ConsumerMetadata metadata = this.bootstrapMetadata(logContext, clusterResourceListeners, config, subscriptionState);
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
        Selector selector = new Selector(config.getLong("connections.max.idle.ms"), metrics, time, METRIC_GRP_PREFIX, channelBuilder, logContext);
        NetworkClient networkClient = new NetworkClient(
                selector,
                metadata,
                config.getString("client.id"),
                MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                config.getLong("reconnect.backoff.ms"),
                config.getLong("reconnect.backoff.max.ms"),
                config.getInt("send.buffer.bytes"),
                config.getInt("receive.buffer.bytes"),
                config.getInt("request.timeout.ms"),
                config.getLong("socket.connection.setup.timeout.ms"),
                config.getLong("socket.connection.setup.timeout.max.ms"),
                time,
                true,
                apiVersions,
                fetcherThrottleTimeSensor,
                logContext);
        this.backgroundThread = new DefaultBackgroundThread(time, config, groupRebalanceConfig, logContext, this.applicationEventQueue, this.backgroundEventQueue, metadata, networkClient);
        this.backgroundThread.start();
    }

    /**
     * Package-private constructor taking ready-made metadata and network client;
     * creates and starts the background thread on top of them.
     */
    DefaultEventHandler(Time time, ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue, ConsumerMetadata metadata, KafkaClient networkClient) {
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        this.backgroundThread = new DefaultBackgroundThread(time, config, groupRebalanceConfig, logContext, this.applicationEventQueue, this.backgroundEventQueue, metadata, networkClient);
        this.backgroundThread.start();
    }

    /**
     * Package-private constructor taking an already constructed background thread,
     * which is started here.
     */
    DefaultEventHandler(DefaultBackgroundThread backgroundThread, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue) {
        this.backgroundThread = backgroundThread;
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        backgroundThread.start();
    }

    /**
     * Removes and returns the next background event without blocking.
     *
     * @return the head of the background event queue, or an empty {@link Optional}
     *         if the queue is currently empty
     */
    @Override
    public Optional<BackgroundEvent> poll() {
        return Optional.ofNullable(this.backgroundEventQueue.poll());
    }

    /**
     * @return {@code true} if no background events are currently queued
     */
    @Override
    public boolean isEmpty() {
        return this.backgroundEventQueue.isEmpty();
    }

    /**
     * Enqueues an application event and then wakes the background thread.
     *
     * <p>The event is enqueued <em>before</em> the wakeup so that a background thread
     * woken by this call is guaranteed to find the event in the queue. Waking first
     * would let the thread observe an empty queue and go back to waiting before the
     * event is added.
     *
     * @param event the event to hand to the background thread
     * @return the result of {@link BlockingQueue#add(Object)} on the application queue
     */
    @Override
    public boolean add(ApplicationEvent event) {
        boolean added = this.applicationEventQueue.add(event);
        this.backgroundThread.wakeup();
        return added;
    }

    /**
     * Builds the {@link ConsumerMetadata} from {@code config} and bootstraps it with
     * the addresses parsed from {@code bootstrap.servers}.
     */
    private ConsumerMetadata bootstrapMetadata(LogContext logContext, ClusterResourceListeners clusterResourceListeners, ConsumerConfig config, SubscriptionState subscriptions) {
        // The metadata takes the inverse of the "exclude" setting.
        boolean includeInternalTopics = !config.getBoolean("exclude.internal.topics");
        ConsumerMetadata metadata = new ConsumerMetadata(
                config.getLong("retry.backoff.ms"),
                config.getLong("metadata.max.age.ms"),
                includeInternalTopics,
                config.getBoolean("allow.auto.create.topics"),
                subscriptions,
                logContext,
                clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
        metadata.bootstrap(addresses);
        return metadata;
    }

    /**
     * Closes the background thread.
     *
     * @throws RuntimeException wrapping any exception thrown while closing the thread
     */
    @Override
    public void close() {
        try {
            this.backgroundThread.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

