/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Raises "package distributed" notifications for the queue items whose offsets are
 * reported as processed by a {@link TopologyViewDiff}.
 *
 * <p>For every such item an OSGi event is sent through the {@link EventAdmin}. When the
 * configured event topic is not blank, a {@link PackageDistributedMessage} is additionally
 * handed to a sender created for that topic. The offset of the last item for which an event
 * was sent is tracked per publisher agent and can be written to a {@link LocalStore} via
 * {@link #storeLastDistributedOffset()}.
 */
@ParametersAreNonnullByDefault
public class PackageDistributedNotifier
implements TopologyChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);

    /** Key under which the last raised event offset is kept in the {@link LocalStore}. */
    public static final String STORE_TYPE_OFFSETS = "lastRaisedEventOffset";

    /** Queue item property holding the offset of the record the item was created from. */
    private static final String PROPERTY_RECORD_OFFSET = "recordOffset";

    /** Offset of the last item for which an event was sent, keyed by publisher agent name. */
    private final ConcurrentMap<String, Long> lastDistributedOffsets = new ConcurrentHashMap<>();

    /** Lazily created stores, keyed by publisher agent name. */
    private final ConcurrentMap<String, LocalStore> localStores = new ConcurrentHashMap<>();

    private final EventAdmin eventAdmin;
    private final PubQueueProvider pubQueueCacheService;
    private final MessagingProvider messagingProvider;
    private final Topics topics;
    private final ResourceResolverFactory resolverFactory;

    /** Only assigned when {@link #sendMsg} is {@code true}. */
    private Consumer<PackageDistributedMessage> sender;
    private final boolean sendMsg;
    private final boolean ensureEvent;

    /**
     * Creates the notifier and, if an event topic is configured, the sender for that topic.
     *
     * @param eventAdmin used to send the package distributed events
     * @param pubQueueCacheService provides the offset queues the processed items are read from
     * @param messagingProvider creates the message sender and supplies the server URI
     * @param topics supplies the event topic and the package topic
     * @param resolverFactory passed to every {@link LocalStore} created by this instance
     * @param ensureEvent when {@code true}, the last distributed offset is also taken into
     *        account when choosing the offset from which the queue is requested
     */
    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueCacheService, MessagingProvider messagingProvider, Topics topics, ResourceResolverFactory resolverFactory, boolean ensureEvent) {
        this.eventAdmin = eventAdmin;
        this.pubQueueCacheService = pubQueueCacheService;
        this.messagingProvider = messagingProvider;
        this.topics = topics;
        this.resolverFactory = resolverFactory;
        this.ensureEvent = ensureEvent;
        this.sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
        if (this.sendMsg) {
            this.sender = messagingProvider.createSender(topics.getEventTopic());
        }
        LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
    }

    /**
     * Sends notifications for all offsets the given diff reports as processed.
     *
     * @param diffView the topology change to process
     */
    @Override
    public void changed(TopologyViewDiff diffView) {
        diffView.getProcessedOffsets().forEach(this::processOffsets);
    }

    /**
     * Notifies about every processed offset of one publisher agent that still has an item
     * in the offset queue. Does nothing when the supplied stream is empty.
     *
     * @param pubAgentName name of the publisher agent
     * @param offsets supplier of the processed offsets; the first element is used as the
     *        lowest offset when requesting the queue
     */
    private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
        OptionalLong firstOffset = offsets.get().findFirst();
        if (!firstOffset.isPresent()) {
            // Nothing was processed for this agent; an unguarded getAsLong() would throw
            // and prevent the remaining agents of the same diff from being handled.
            return;
        }
        long minOffset = firstOffset.getAsLong();
        if (ensureEvent) {
            long lastDistributedOffset = lastDistributedOffsets.computeIfAbsent(pubAgentName, this::getLastStoredDistributedOffset);
            minOffset = Math.min(minOffset, lastDistributedOffset);
        }
        OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
        offsets.get()
            .mapToObj(offsetQueue::getItem)
            .filter(Objects::nonNull)
            .forEach(item -> notifyDistributed(pubAgentName, item));
    }

    /**
     * Loads the persisted last distributed offset of the given agent, creating its store on
     * first use.
     *
     * @param pubAgentName name of the publisher agent
     * @return the stored offset, or {@link Long#MAX_VALUE} as the load default
     */
    private long getLastStoredDistributedOffset(String pubAgentName) {
        return localStores.computeIfAbsent(pubAgentName, this::newLocalStore).load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
    }

    /**
     * Creates the store for the given agent, named after the escaped server URI and package
     * topic.
     *
     * @param pubAgentName name of the publisher agent
     * @return a new store instance
     */
    private LocalStore newLocalStore(String pubAgentName) {
        String packageNodeName = DistributionSubscriber.escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
        return new LocalStore(resolverFactory, packageNodeName, pubAgentName);
    }

    /**
     * Writes the tracked last distributed offset of every known store, skipping stores whose
     * persisted value already matches. A failure to store is logged and does not stop the
     * remaining stores from being processed.
     */
    protected void storeLastDistributedOffset() {
        for (Map.Entry<String, LocalStore> localStoreEntry : localStores.entrySet()) {
            String pubAgentName = localStoreEntry.getKey();
            LocalStore localStore = localStoreEntry.getValue();
            long lastDistributedOffset = lastDistributedOffsets.getOrDefault(pubAgentName, Long.MAX_VALUE);
            long lastStoredOffset = localStore.load(STORE_TYPE_OFFSETS, Long.MAX_VALUE).longValue();
            if (lastDistributedOffset == lastStoredOffset) {
                continue;
            }
            try {
                localStore.store(STORE_TYPE_OFFSETS, lastDistributedOffset);
                LOG.info("The offset={} has been stored for the pubAgentName={}", lastDistributedOffset, pubAgentName);
            }
            catch (Exception e) {
                LOG.warn("Exception when storing the last distributed offset in the repository", e);
            }
        }
    }

    /**
     * Sends the distributed event for one queue item and, if an event topic is configured,
     * the corresponding message.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the item that has been distributed
     */
    protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
        LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId());
        sendEvt(pubAgentName, queueItem);
        if (sendMsg) {
            sendMsg(pubAgentName, queueItem);
        }
    }

    /**
     * Builds and sends the package distributed message; any exception is logged and
     * swallowed.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the item that has been distributed
     */
    private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
        try {
            PackageDistributedMessage msg = createDistributedMessage(pubAgentName, queueItem);
            sender.accept(msg);
        }
        catch (Exception e) {
            LOG.warn("Exception when sending package distributed message for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
        }
    }

    /**
     * Maps a queue item to a message carrying its package id, record offset, paths and deep
     * paths.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the item that has been distributed
     * @return the message to send
     */
    private PackageDistributedMessage createDistributedMessage(String pubAgentName, DistributionQueueItem queueItem) {
        return PackageDistributedMessage.builder()
            .pubAgentName(pubAgentName)
            .packageId(queueItem.getPackageId())
            .offset(((Long) queueItem.get(PROPERTY_RECORD_OFFSET)).longValue())
            .paths((String[]) queueItem.get("request.paths"))
            .deepPaths((String[]) queueItem.get("request.deepPaths"))
            .build();
    }

    /**
     * Sends the package distributed event and, once it was sent, records the item's offset
     * as the agent's last distributed offset; any exception is logged and swallowed.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the item that has been distributed
     */
    private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
        try {
            Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
            eventAdmin.sendEvent(distributed);
            // Only reached when sendEvent did not throw, so the tracked offset never
            // moves for an item whose event could not be sent.
            lastDistributedOffsets.put(pubAgentName, (Long) queueItem.getOrDefault(PROPERTY_RECORD_OFFSET, Long.MAX_VALUE));
        }
        catch (Exception e) {
            LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
        }
    }
}

