/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.performanceanalyzer.writer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.config.PluginSettings;
import org.opensearch.performanceanalyzer.commons.event_process.Event;
import org.opensearch.performanceanalyzer.commons.event_process.EventLogFileHandler;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;

public class EventLogQueueProcessor {
    private static final Logger LOG = LogManager.getLogger(EventLogQueueProcessor.class);
    private final ScheduledExecutorService writerExecutor = Executors.newScheduledThreadPool(1);
    private final int filesCleanupPeriodicityMillis = PluginSettings.instance().getMetricsDeletionInterval();
    private final EventLogFileHandler eventLogFileHandler;
    private final long initialDelayMillis;
    private final long purgePeriodicityMillis;
    private final PerformanceAnalyzerController controller;
    private long lastCleanupTimeBucket;
    private long lastTimeBucket;

    public EventLogQueueProcessor(EventLogFileHandler eventLogFileHandler, long initialDelayMillis, long purgePeriodicityMillis, PerformanceAnalyzerController controller) {
        this.eventLogFileHandler = eventLogFileHandler;
        this.initialDelayMillis = initialDelayMillis;
        this.purgePeriodicityMillis = purgePeriodicityMillis;
        this.lastCleanupTimeBucket = 0L;
        this.lastTimeBucket = 0L;
        this.controller = controller;
    }

    public void scheduleExecutor() {
        try {
            this.eventLogFileHandler.deleteAllFiles();
        }
        catch (Exception ex) {
            LOG.error("Unable to cleanup lingering files from previous plugin run.", (Throwable)ex);
        }
        this.lastCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval((long)System.currentTimeMillis());
        ScheduledFuture<?> futureHandle = this.writerExecutor.scheduleAtFixedRate(this::purgeQueueAndPersist, this.initialDelayMillis, this.purgePeriodicityMillis, TimeUnit.MILLISECONDS);
        new Thread(() -> {
            try {
                futureHandle.get();
            }
            catch (InterruptedException e) {
                LOG.error("Scheduled execution was interrupted", (Throwable)e);
            }
            catch (CancellationException e) {
                LOG.warn("Watcher thread has been cancelled", (Throwable)e);
            }
            catch (ExecutionException e) {
                LOG.error("QueuePurger interrupted. Caused by ", e.getCause());
            }
        }).start();
    }

    public void purgeQueueAndPersist() {
        if (PerformanceAnalyzerConfigAction.getInstance() == null) {
            return;
        }
        if (!this.controller.isPerformanceAnalyzerEnabled()) {
            if (PerformanceAnalyzerMetrics.metricQueue.size() > 0) {
                ArrayList metrics = new ArrayList();
                PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics);
                LOG.info("Performance Analyzer no longer enabled. Drained thequeue to remove stale data.");
            }
            return;
        }
        LOG.debug("Starting to purge the queue.");
        ArrayList metrics = new ArrayList();
        PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics);
        LOG.debug("Queue draining successful.");
        long currentTimeMillis = System.currentTimeMillis();
        long timeBucket = PerformanceAnalyzerMetrics.getTimeInterval((long)currentTimeMillis, (int)5000) - 5000L;
        long nextTimeBucket = timeBucket + 5000L;
        ArrayList<Event> currMetrics = new ArrayList<Event>();
        ArrayList<Event> nextMetrics = new ArrayList<Event>();
        for (Event entry : metrics) {
            if (entry.epoch == timeBucket) {
                currMetrics.add(entry);
                continue;
            }
            if (entry.epoch == nextTimeBucket) {
                nextMetrics.add(entry);
                continue;
            }
            StatsCollector.instance().logException(StatExceptionCode.STALE_METRICS);
        }
        LOG.debug("Start serializing and writing to file.");
        this.writeAndRotate(currMetrics, timeBucket, currentTimeMillis);
        if (!nextMetrics.isEmpty()) {
            this.eventLogFileHandler.writeTmpFile(nextMetrics, nextTimeBucket);
        }
        LOG.debug("Writing to disk complete.");
        this.cleanup();
    }

    private void cleanup() {
        long currCleanupTimeBucket;
        if (this.lastCleanupTimeBucket != 0L && (currCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval((long)(System.currentTimeMillis() - (long)this.filesCleanupPeriodicityMillis))) - this.lastCleanupTimeBucket > (long)this.filesCleanupPeriodicityMillis) {
            List filesForCleanup = LongStream.range(this.lastCleanupTimeBucket, currCleanupTimeBucket).filter(timeMillis -> timeMillis % 5000L == 0L).mapToObj(String::valueOf).collect(Collectors.toList());
            this.eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup));
            this.lastCleanupTimeBucket = currCleanupTimeBucket;
        }
    }

    private void writeAndRotate(List<Event> currMetrics, long currTimeBucket, long currentTime) {
        if (this.lastTimeBucket != 0L && this.lastTimeBucket != currTimeBucket) {
            this.eventLogFileHandler.renameFromTmp(this.lastTimeBucket);
        }
        if (!currMetrics.isEmpty()) {
            this.eventLogFileHandler.writeTmpFile(currMetrics, currTimeBucket);
        }
        this.lastTimeBucket = currTimeBucket;
    }
}

