/*
 * Decompiled with CFR 0.152.
 */
package com.atomiccache.cassandra.impl.cache;

import com.atomiccache.api.cache.IAtomicCache;
import com.atomiccache.api.cache.IAtomicCacheFetch;
import com.atomiccache.api.serializer.IAtomicCacheSerializer;
import com.atomiccache.cassandra.impl.cache.dao.ICassandraDao;
import com.atomiccache.cassandra.impl.cache.model.AtomicCacheRecordState;
import com.atomiccache.cassandra.impl.cache.model.InsertResult;
import com.atomiccache.cassandra.impl.cache.model.SelectResult;
import com.atomiccache.cassandra.impl.cache.model.UpdateStartFetchingResult;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class AtomicCacheCassandraImpl<K, V>
implements IAtomicCache<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(AtomicCacheCassandraImpl.class);
    private static final int INITIAL_GUARD = 3;
    private final IAtomicCacheSerializer<K, V> serializer;
    private final ICassandraDao dao;
    private final Duration maxLockWait;
    private final Duration maxLockFallbackTime;
    private final Duration sleepBetweenChecks;

    public AtomicCacheCassandraImpl(IAtomicCacheSerializer<K, V> serializer, ICassandraDao dao, Duration maxLockWait, Duration maxLockFallbackTime, Duration sleepBetweenChecks) {
        this.serializer = serializer;
        this.dao = dao;
        this.maxLockWait = maxLockWait;
        this.maxLockFallbackTime = maxLockFallbackTime;
        this.sleepBetweenChecks = sleepBetweenChecks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public V getOrFetch(@NonNull K aKey, @NonNull IAtomicCacheFetch<K, V> aFetch) {
        String keyText = this.serializer.serializeKey(aKey);
        MDC.put((String)"key", (String)keyText);
        try {
            LOG.debug("getOrFetch()");
            InsertResult insertResult = this.dao.insert(keyText, Instant.now(), UUID.randomUUID());
            if (insertResult.hasFetchedValue()) {
                Object object = this.serializer.deserializeValueFromText(insertResult.getValue());
                return (V)object;
            }
            if (insertResult.isWasApplied()) {
                V v = this.fetchResult(aKey, aFetch);
                return v;
            }
            UUID previousFetchId = insertResult.getFetchId();
            if (Instant.now().minus(this.maxLockFallbackTime).isAfter(insertResult.getStartFetchingAt())) {
                LOG.debug("Started fetching time is tool old {}", (Object)insertResult.getStartFetchingAt());
                V v = this.fetchResultAgain(aKey, previousFetchId, aFetch, 3);
                return v;
            }
            if (insertResult.getState() == AtomicCacheRecordState.ERROR) {
                V v = this.fetchResultAgain(aKey, previousFetchId, aFetch, 3);
                return v;
            }
            V v = this.fetchLoop(aKey, aFetch, keyText, previousFetchId, 3);
            return v;
        }
        finally {
            MDC.remove((String)"key");
        }
    }

    private V fetchLoop(K aKey, IAtomicCacheFetch<K, V> aFetch, String keyText, UUID previousFetchId, int aGuard) {
        LOG.atDebug().addKeyValue("previous_fetch_id", (Object)previousFetchId).addKeyValue("guard", (Object)aGuard).log("fetchInCycle()");
        long endTime = System.currentTimeMillis() + this.maxLockWait.toMillis();
        while (System.currentTimeMillis() <= endTime) {
            SelectResult selectResult = this.dao.select(keyText).orElseThrow(() -> new IllegalStateException("No any results found for key " + keyText));
            if (selectResult.getState() == AtomicCacheRecordState.FETCHED) {
                return (V)this.serializer.deserializeValueFromText(selectResult.getValue());
            }
            if (selectResult.getState() == AtomicCacheRecordState.ERROR) {
                return this.fetchResultAgain(aKey, previousFetchId, aFetch, aGuard);
            }
            try {
                Thread.sleep(this.sleepBetweenChecks.toMillis());
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for fetch result. Skipping.");
            }
        }
        return this.fetchResultAgain(aKey, previousFetchId, aFetch, aGuard);
    }

    private V fetchResultAgain(K aKey, UUID aFetchId, @NonNull IAtomicCacheFetch<K, V> aFetch, int aGuard) {
        LOG.atDebug().addKeyValue("aFetchId", (Object)aFetchId).addKeyValue("guard", (Object)aGuard).log("fetchResultAgain()");
        if (--aGuard <= 0) {
            throw new IllegalStateException("Guard exhausted");
        }
        String keyText = this.serializer.serializeKey(aKey);
        UpdateStartFetchingResult result = this.dao.updateStartedFetchingAt(keyText, Instant.now(), aFetchId, UUID.randomUUID());
        if (!result.isApplied()) {
            return this.fetchLoop(aKey, aFetch, keyText, result.getFetchId(), aGuard);
        }
        return this.fetchResult(aKey, aFetch);
    }

    private V fetchResult(K aKey, IAtomicCacheFetch<K, V> aFetch) {
        Object value;
        LOG.atDebug().log("fetchResult()");
        String keyText = this.serializer.serializeKey(aKey);
        try {
            LOG.debug("Fetching value ...");
            value = aFetch.fetchValue(aKey);
            LOG.debug("Fetched value {}", value);
        }
        catch (Exception e) {
            this.dao.setError(keyText, "Cannot fetch value: " + e);
            throw new IllegalStateException("Cannot fetch value for key " + aKey, e);
        }
        if (value == null) {
            this.dao.setError(keyText, "fetchValue() returned null value");
            throw new IllegalStateException("fetchValue() returned null value for key " + aKey);
        }
        this.dao.setValue(keyText, this.serializer.serializeValue(value));
        return (V)value;
    }
}

