/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;

public class HsSelectiveSpillingStrategy
implements HsSpillingStrategy {
    private final float spillBufferRatio;
    private final float spillThreshold;

    public HsSelectiveSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.spillThreshold = hybridShuffleConfiguration.getSelectiveStrategySpillThreshold();
        this.spillBufferRatio = hybridShuffleConfiguration.getSelectiveStrategySpillBufferRatio();
    }

    @Override
    public Optional<HsSpillingStrategy.Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
        return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Override
    public Optional<HsSpillingStrategy.Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer) {
        return Optional.of(HsSpillingStrategy.Decision.builder().addBufferToRelease(consumedBuffer).build());
    }

    @Override
    public Optional<HsSpillingStrategy.Decision> onMemoryUsageChanged(int numTotalRequestedBuffers, int currentPoolSize) {
        return (float)numTotalRequestedBuffers < (float)currentPoolSize * this.spillThreshold ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.empty();
    }

    @Override
    public HsSpillingStrategy.Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider) {
        if ((float)spillingInfoProvider.getNumTotalRequestedBuffers() < (float)spillingInfoProvider.getPoolSize() * this.spillThreshold) {
            return HsSpillingStrategy.Decision.NO_ACTION;
        }
        int spillNum = (int)((float)spillingInfoProvider.getPoolSize() * this.spillBufferRatio);
        TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionToBuffers = new TreeMap<Integer, Deque<BufferIndexAndChannel>>();
        for (int channel = 0; channel < spillingInfoProvider.getNumSubpartitions(); ++channel) {
            subpartitionToBuffers.put(channel, spillingInfoProvider.getBuffersInOrder(channel, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED));
        }
        TreeMap<Integer, List<BufferIndexAndChannel>> subpartitionToHighPriorityBuffers = HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder(spillingInfoProvider.getNextBufferIndexToConsume(), subpartitionToBuffers, spillNum);
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        subpartitionToHighPriorityBuffers.forEach((subpartitionId, buffers) -> {
            builder.addBufferToSpill((int)subpartitionId, (List<BufferIndexAndChannel>)buffers);
            builder.addBufferToRelease((int)subpartitionId, (List<BufferIndexAndChannel>)buffers);
        });
        return builder.build();
    }

    @Override
    public HsSpillingStrategy.Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); ++subpartitionId) {
            builder.addBufferToSpill(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.ALL)).addBufferToRelease(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.ALL, HsSpillingInfoProvider.ConsumeStatus.ALL));
        }
        return builder.build();
    }
}

