/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ad.transport;

import java.io.IOException;
import java.util.Locale;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ml.SingleStreamModelIdMapper;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

public class RCFPollingTransportAction
extends HandledTransportAction<RCFPollingRequest, RCFPollingResponse> {
    private static final Logger LOG = LogManager.getLogger(RCFPollingTransportAction.class);
    static final String NO_NODE_FOUND_MSG = "Cannot find model hosting node";
    static final String FAIL_TO_GET_RCF_UPDATE_MSG = "Cannot find hosted model or related checkpoint";
    private final TransportService transportService;
    private final ModelManager modelManager;
    private final HashRing hashRing;
    private final TransportRequestOptions option;
    private final ClusterService clusterService;

    @Inject
    public RCFPollingTransportAction(ActionFilters actionFilters, TransportService transportService, Settings settings, ModelManager modelManager, HashRing hashRing, ClusterService clusterService) {
        super(RCFPollingAction.NAME, transportService, actionFilters, RCFPollingRequest::new);
        this.transportService = transportService;
        this.modelManager = modelManager;
        this.hashRing = hashRing;
        this.option = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout((TimeValue)AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings)).build();
        this.clusterService = clusterService;
    }

    protected void doExecute(Task task, RCFPollingRequest request, final ActionListener<RCFPollingResponse> listener) {
        String adID = request.getAdID();
        String rcfModelID = SingleStreamModelIdMapper.getRcfModelId(adID, 0);
        Optional<DiscoveryNode> rcfNode = this.hashRing.getOwningNodeWithSameLocalAdVersionForRealtimeAD(rcfModelID);
        if (!rcfNode.isPresent()) {
            listener.onFailure((Exception)new AnomalyDetectionException(adID, NO_NODE_FOUND_MSG));
            return;
        }
        String rcfNodeId = rcfNode.get().getId();
        DiscoveryNode localNode = this.clusterService.localNode();
        if (localNode.getId().equals(rcfNodeId)) {
            this.modelManager.getTotalUpdates(rcfModelID, adID, (ActionListener<Long>)ActionListener.wrap(totalUpdates -> listener.onResponse((Object)new RCFPollingResponse((long)totalUpdates)), e -> listener.onFailure((Exception)new AnomalyDetectionException(adID, FAIL_TO_GET_RCF_UPDATE_MSG, (Throwable)e))));
        } else if (request.remoteAddress() == null) {
            LOG.info("Sending RCF polling request to {} for model {}", (Object)rcfNodeId, (Object)rcfModelID);
            try {
                this.transportService.sendRequest(rcfNode.get(), RCFPollingAction.NAME, (TransportRequest)request, this.option, (TransportResponseHandler)new TransportResponseHandler<RCFPollingResponse>(){

                    public RCFPollingResponse read(StreamInput in) throws IOException {
                        return new RCFPollingResponse(in);
                    }

                    public void handleResponse(RCFPollingResponse response) {
                        listener.onResponse((Object)response);
                    }

                    public void handleException(TransportException exp) {
                        listener.onFailure((Exception)exp);
                    }

                    public String executor() {
                        return "same";
                    }
                });
            }
            catch (Exception e2) {
                LOG.error(String.format(Locale.ROOT, "Fail to poll RCF models for {}", adID), (Throwable)e2);
                listener.onFailure((Exception)new AnomalyDetectionException(adID, FAIL_TO_GET_RCF_UPDATE_MSG, e2));
            }
        } else {
            LOG.error("Fail to poll rcf for model {} due to an unexpected bug.", (Object)rcfModelID);
            listener.onFailure((Exception)new AnomalyDetectionException(adID, NO_NODE_FOUND_MSG));
        }
    }
}

