/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputSplitCacheLoadTask;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InputFormatCacheLoader
extends CacheLoader {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(InputFormatCacheLoader.class);
    private final InputFormat<RowData, InputSplit> initialInputFormat;
    private final GenericRowDataKeySelector keySelector;
    private final RowDataSerializer cacheEntriesSerializer;
    private volatile transient List<InputSplitCacheLoadTask> cacheLoadTasks;
    private transient Configuration parameters;

    public InputFormatCacheLoader(InputFormat<RowData, ?> initialInputFormat, GenericRowDataKeySelector keySelector, RowDataSerializer cacheEntriesSerializer) {
        this.initialInputFormat = initialInputFormat;
        this.keySelector = keySelector;
        this.cacheEntriesSerializer = cacheEntriesSerializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.parameters = parameters;
        this.initialInputFormat.configure(parameters);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void reloadCache() throws Exception {
        InputSplit[] inputSplits = this.createInputSplits();
        int numSplits = inputSplits.length;
        ConcurrentHashMap newCache = new ConcurrentHashMap(16, 0.75f, this.getConcurrencyLevel(numSplits));
        this.cacheLoadTasks = Arrays.stream(inputSplits).map(split -> this.createCacheLoadTask((InputSplit)split, newCache)).collect(Collectors.toList());
        if (this.isStopped) {
            return;
        }
        ExecutorService cacheLoadTaskService = null;
        try {
            if (numSplits > 1) {
                int numThreads = this.getConcurrencyLevel(numSplits);
                ExecutorService finalCacheLoadTaskService = cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
                List futures = this.cacheLoadTasks.stream().map(finalCacheLoadTaskService::submit).collect(Collectors.toList());
                for (Future future : futures) {
                    future.get();
                }
            } else {
                this.cacheLoadTasks.get(0).run();
            }
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            if (cacheLoadTaskService != null) {
                cacheLoadTaskService.shutdownNow();
            }
        }
        this.cache = newCache;
    }

    @Override
    public void close() throws Exception {
        this.isStopped = true;
        if (this.cacheLoadTasks != null) {
            this.cacheLoadTasks.forEach(InputSplitCacheLoadTask::stopRunning);
        }
        super.close();
    }

    private InputSplitCacheLoadTask createCacheLoadTask(InputSplit inputSplit, ConcurrentHashMap<RowData, Collection<RowData>> newCache) {
        try {
            InputFormat<RowData, InputSplit> inputFormat = InstantiationUtil.clone(this.initialInputFormat);
            inputFormat.configure(this.parameters);
            return new InputSplitCacheLoadTask(newCache, this.keySelector.copy(), this.cacheEntriesSerializer, inputFormat, inputSplit);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create InputFormatCacheLoadTask", e);
        }
    }

    private InputSplit[] createInputSplits() throws IOException {
        Object[] inputSplits = this.initialInputFormat.createInputSplits(1);
        if (LOG.isDebugEnabled()) {
            LOG.debug("InputFormat created {} InputSplits: {}", (Object)inputSplits.length, (Object)Arrays.deepToString(inputSplits));
        }
        Preconditions.checkState(inputSplits.length >= 1, "InputFormat must provide at least one input split to load data into the lookup 'FULL' cache.");
        return inputSplits;
    }

    private int getConcurrencyLevel(int numSplits) {
        int numOfCores = Runtime.getRuntime().availableProcessors();
        return Math.min(numSplits, numOfCores);
    }
}

