/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.action.search;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.PitSearchContextIdForNode;
import org.opensearch.action.search.PitService;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchUtils;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.UpdatePitContextRequest;
import org.opensearch.action.search.UpdatePitContextResponse;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;

public class CreatePitController {
    private final SearchTransportService searchTransportService;
    private final ClusterService clusterService;
    private final TransportSearchAction transportSearchAction;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final PitService pitService;
    private static final Logger logger = LogManager.getLogger(CreatePitController.class);
    public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting("point_in_time.init.keep_alive", TimeValue.timeValueSeconds((long)30L), Setting.Property.NodeScope);

    @Inject
    public CreatePitController(SearchTransportService searchTransportService, ClusterService clusterService, TransportSearchAction transportSearchAction, NamedWriteableRegistry namedWriteableRegistry, PitService pitService) {
        this.searchTransportService = searchTransportService;
        this.clusterService = clusterService;
        this.transportSearchAction = transportSearchAction;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.pitService = pitService;
    }

    public void executeCreatePit(CreatePitRequest request, Task task, StepListener<SearchResponse> createPitListener, ActionListener<CreatePitResponse> updatePitIdListener) {
        SearchRequest searchRequest = new SearchRequest(request.getIndices());
        searchRequest.preference(request.getPreference());
        searchRequest.routing(request.getRouting());
        searchRequest.indicesOptions(request.getIndicesOptions());
        searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation());
        Task searchTask = searchRequest.createTask(task.getId(), task.getType(), task.getAction(), task.getParentTaskId(), Collections.emptyMap());
        searchRequest.setCcsMinimizeRoundtrips(false);
        this.executeCreatePit(searchTask, searchRequest, createPitListener);
        createPitListener.whenComplete((CheckedConsumer<SearchResponse, Exception>)((CheckedConsumer)searchResponse -> this.executeUpdatePitId(request, searchRequest, (SearchResponse)searchResponse, updatePitIdListener)), arg_0 -> updatePitIdListener.onFailure(arg_0));
    }

    void executeCreatePit(Task task, SearchRequest searchRequest, StepListener<SearchResponse> createPitListener) {
        logger.debug(() -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", (Object)Arrays.toString(searchRequest.indices())));
        this.transportSearchAction.executeRequest(task, searchRequest, "create_pit", true, new TransportSearchAction.SinglePhaseSearchAction(){

            @Override
            public void executeOnShardTarget(SearchTask searchTask, SearchShardTarget target, Transport.Connection connection, ActionListener<SearchPhaseResult> searchPhaseResultActionListener) {
                CreatePitController.this.searchTransportService.createPitContext(connection, new TransportCreatePitAction.CreateReaderContextRequest(target.getShardId(), PIT_INIT_KEEP_ALIVE.get(CreatePitController.this.clusterService.getSettings())), searchTask, (ActionListener<TransportCreatePitAction.CreateReaderContextResponse>)ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse((Object)r), arg_0 -> searchPhaseResultActionListener.onFailure(arg_0)));
            }
        }, (ActionListener<SearchResponse>)createPitListener);
    }

    void executeUpdatePitId(CreatePitRequest request, SearchRequest searchRequest, SearchResponse searchResponse, ActionListener<CreatePitResponse> updatePitIdListener) {
        logger.debug(() -> new ParameterizedMessage("Updating PIT context with PIT ID [{}], creation time and keep alive", (Object)searchResponse.pointInTimeId()));
        long relativeStartNanos = System.nanoTime();
        TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        long creationTime = timeProvider.getAbsoluteStartMillis();
        CreatePitResponse createPITResponse = new CreatePitResponse(searchResponse.pointInTimeId(), creationTime, searchResponse.getTotalShards(), searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(), searchResponse.getFailedShards(), searchResponse.getShardFailures());
        SearchContextId contextId = SearchContextId.decode(this.namedWriteableRegistry, createPITResponse.getId());
        StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = this.getConnectionLookupListener(contextId);
        lookupListener.whenComplete((CheckedConsumer<BiFunction<String, String, DiscoveryNode>, Exception>)((CheckedConsumer)nodelookup -> {
            ActionListener<UpdatePitContextResponse> groupedActionListener = this.getGroupedListener(updatePitIdListener, createPITResponse, contextId.shards().size(), contextId.shards().values());
            for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
                DiscoveryNode node = (DiscoveryNode)nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
                if (node == null) {
                    node = this.clusterService.state().getNodes().get(entry.getValue().getNode());
                }
                if (node == null) {
                    logger.error(() -> new ParameterizedMessage("Create pit update phase for PIT ID [{}] failed because node [{}] not found", (Object)searchResponse.pointInTimeId(), (Object)((SearchContextIdForNode)entry.getValue()).getNode()));
                    groupedActionListener.onFailure((Exception)new OpenSearchException("Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed because node[" + entry.getValue().getNode() + "] not found", new Object[0]));
                    return;
                }
                try {
                    Transport.Connection connection = this.searchTransportService.getConnection(entry.getValue().getClusterAlias(), node);
                    this.searchTransportService.updatePitContext(connection, new UpdatePitContextRequest(entry.getValue().getSearchContextId(), createPITResponse.getId(), request.getKeepAlive().millis(), creationTime), groupedActionListener);
                }
                catch (Exception e) {
                    String nodeName = node.getName();
                    logger.error(() -> new ParameterizedMessage("Create pit update phase failed for PIT ID [{}] on node [{}]", (Object)searchResponse.pointInTimeId(), (Object)nodeName), (Throwable)e);
                    groupedActionListener.onFailure((Exception)new OpenSearchException("Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]", (Throwable)e, new Object[0]));
                }
            }
        }), arg_0 -> updatePitIdListener.onFailure(arg_0));
    }

    private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(SearchContextId contextId) {
        ClusterState state = this.clusterService.state();
        Set<String> clusters = contextId.shards().values().stream().filter(ctx -> !Strings.isEmpty((CharSequence)ctx.getClusterAlias())).map(SearchContextIdForNode::getClusterAlias).collect(Collectors.toSet());
        return (StepListener)SearchUtils.getConnectionLookupListener(this.searchTransportService.getRemoteClusterService(), state, clusters);
    }

    private ActionListener<UpdatePitContextResponse> getGroupedListener(final ActionListener<CreatePitResponse> updatePitIdListener, final CreatePitResponse createPITResponse, int size, final Collection<SearchContextIdForNode> contexts) {
        return new GroupedActionListener<UpdatePitContextResponse>(new ActionListener<Collection<UpdatePitContextResponse>>(){

            public void onResponse(Collection<UpdatePitContextResponse> responses) {
                updatePitIdListener.onResponse((Object)createPITResponse);
            }

            public void onFailure(Exception e) {
                CreatePitController.this.cleanupContexts(contexts, createPITResponse.getId());
                updatePitIdListener.onFailure(e);
            }
        }, size);
    }

    private void cleanupContexts(Collection<SearchContextIdForNode> contexts, String pitId) {
        ActionListener<DeletePitResponse> deleteListener = new ActionListener<DeletePitResponse>(){

            public void onResponse(DeletePitResponse response) {
                StringBuilder failedPitsStringBuilder = new StringBuilder();
                response.getDeletePitResults().stream().filter(r -> !r.isSuccessful()).forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(","));
                logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", (Object)failedPitsStringBuilder.toString()));
                if (logger.isDebugEnabled()) {
                    StringBuilder successfulPitsStringBuilder = new StringBuilder();
                    response.getDeletePitResults().stream().filter(r -> r.isSuccessful()).forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(","));
                    logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", (Object)successfulPitsStringBuilder.toString()));
                }
            }

            public void onFailure(Exception e) {
                logger.error("Cleaning up PIT contexts failed ", (Throwable)e);
            }
        };
        HashMap<String, List<PitSearchContextIdForNode>> nodeToContextsMap = new HashMap<String, List<PitSearchContextIdForNode>>();
        for (SearchContextIdForNode context : contexts) {
            List contextIdsForNode = nodeToContextsMap.getOrDefault(context.getNode(), new ArrayList());
            contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context));
            nodeToContextsMap.put(context.getNode(), contextIdsForNode);
        }
        this.pitService.deletePitContexts(nodeToContextsMap, deleteListener);
    }
}

