/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table.stream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.file.table.stream.PartitionCommitPredicate;
import org.apache.flink.connector.file.table.stream.PartitionCommitTrigger;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.StringUtils;

/**
 * Partition commit trigger driven by processing time.
 *
 * <p>Each pending partition is remembered together with the processing time at which it was
 * first added. Whether a pending partition can be committed is decided by the supplied
 * {@link PartitionCommitPredicate}. The pending partitions are written to operator list state
 * on every snapshot and read back when the trigger is created for a restored job.
 */
@Internal
public class ProcTimeCommitTrigger
implements PartitionCommitTrigger {
    /** Descriptor of the list state holding the map "partition -> creation processing time". */
    private static final ListStateDescriptor<Map<String, Long>> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>("pending-partitions-with-time", new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE));
    private final ListState<Map<String, Long>> pendingPartitionsState;
    /** Partitions not yet reported as committable, mapped to the time they were first added. */
    private final Map<String, Long> pendingPartitions;
    private final ProcessingTimeService procTimeService;
    private final PartitionCommitPredicate partitionCommitPredicate;

    /**
     * Creates the trigger and, when restoring, reloads the pending partitions from state.
     *
     * @param isRestored whether previously snapshotted pending partitions should be loaded
     * @param stateStore store from which the pending-partitions list state is obtained
     * @param procTimeService source of the current processing time
     * @param partitionCommitPredicate predicate deciding whether a partition is committable
     * @throws Exception if the list state cannot be obtained or read
     */
    public ProcTimeCommitTrigger(boolean isRestored, OperatorStateStore stateStore, ProcessingTimeService procTimeService, PartitionCommitPredicate partitionCommitPredicate) throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        this.pendingPartitions = new HashMap<>();
        if (isRestored) {
            // The restored list is not guaranteed to hold exactly one map: it may be empty or
            // contain several maps (for example when operator list state is redistributed
            // after a change of parallelism). Merge whatever is there instead of blindly
            // taking the first element.
            Iterable<Map<String, Long>> restoredSnapshots = this.pendingPartitionsState.get();
            if (restoredSnapshots != null) {
                for (Map<String, Long> snapshot : restoredSnapshots) {
                    if (snapshot == null) {
                        continue;
                    }
                    for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
                        // Keep the earliest creation time, consistent with the
                        // putIfAbsent behavior of addPartition.
                        this.pendingPartitions.merge(entry.getKey(), entry.getValue(), Long::min);
                    }
                }
            }
        }
        this.procTimeService = procTimeService;
        this.partitionCommitPredicate = partitionCommitPredicate;
    }

    /**
     * Registers a partition as pending, stamped with the current processing time.
     *
     * <p>Null or whitespace-only partitions are ignored. A partition that is already pending
     * keeps its original creation time.
     *
     * @param partition the partition to register
     */
    @Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.putIfAbsent(partition, this.procTimeService.getCurrentProcessingTime());
        }
    }

    /**
     * Returns the pending partitions accepted by the commit predicate and removes them from
     * the pending set.
     *
     * @param checkpointId the checkpoint id; not used by this trigger
     * @return the partitions that are committable now, possibly empty
     */
    @Override
    public List<String> committablePartitions(long checkpointId) {
        List<String> needCommit = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iter = this.pendingPartitions.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            long creationTime = entry.getValue();
            PartitionCommitPredicate.PredicateContext predicateContext = this.createPredicateContext(entry.getKey(), creationTime);
            if (this.partitionCommitPredicate.isPartitionCommittable(predicateContext)) {
                needCommit.add(entry.getKey());
                // Remove through the iterator so the map can be modified while iterating.
                iter.remove();
            }
        }
        return needCommit;
    }

    /**
     * Builds the context handed to the commit predicate for a single partition.
     *
     * @param partition the partition under evaluation
     * @param createProcTime the processing time recorded when the partition was first added
     * @return a context exposing the partition, its creation time and the current processing
     *     time; {@code currentWatermark()} is unsupported and throws
     */
    private PartitionCommitPredicate.PredicateContext createPredicateContext(final String partition, final long createProcTime) {
        return new PartitionCommitPredicate.PredicateContext(){

            @Override
            public String partition() {
                return partition;
            }

            @Override
            public long createProcTime() {
                return createProcTime;
            }

            @Override
            public long currentProcTime() {
                // Read on every call rather than captured once when the context is created.
                return ProcTimeCommitTrigger.this.procTimeService.getCurrentProcessingTime();
            }

            @Override
            public long currentWatermark() {
                throw new UnsupportedOperationException("Method currentWatermark isn't supported in ProcTimeCommitTrigger.");
            }
        };
    }

    /**
     * Replaces the content of the list state with a copy of the current pending partitions.
     *
     * @param checkpointId the checkpoint id; not used by this trigger
     * @param watermark the current watermark; not used by this trigger
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        this.pendingPartitionsState.clear();
        // Store a copy so later changes to pendingPartitions do not affect the stored map.
        this.pendingPartitionsState.add(new HashMap<>(this.pendingPartitions));
    }

    /**
     * Returns every pending partition, regardless of the commit predicate, and clears the
     * pending set.
     *
     * @return all partitions that were still pending
     */
    @Override
    public List<String> endInput() {
        List<String> partitions = new ArrayList<>(this.pendingPartitions.keySet());
        this.pendingPartitions.clear();
        return partitions;
    }
}

