/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.ExceptionUtils;
import org.apache.kafka.streams.state.internals.FilteredCacheIterator;
import org.apache.kafka.streams.state.internals.HasNextCondition;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreIterator;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SegmentedCacheFunction;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CachingWindowStore
extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]>
implements WindowStore<Bytes, byte[]>,
CachedStateStore<byte[], byte[]> {
    // Used to warn when a key-range fetch is called with keyFrom > keyTo.
    private static final Logger LOG = LoggerFactory.getLogger(CachingWindowStore.class);
    // Window size handed to WindowKeySchema.fromStoreBytesKey and to the merged key-value iterators.
    private final long windowSize;
    // Translates between store keys and cache keys; built from keySchema and the segment interval.
    private final SegmentedCacheFunction cacheFunction;
    // Key schema used for range bounds, has-next conditions and segment timestamps.
    private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
    // Name of this store's namespace in the thread cache: "<taskId>-<store name>"; assigned in init().
    private String cacheName;
    // When true, the previous value is looked up and included in the Change forwarded on flush.
    private boolean sendOldValues;
    // Processor context giving access to the thread cache and record metadata; assigned in init().
    private InternalProcessorContext<?, ?> internalContext;
    // Bytes/byte[] serdes bound to the changelog topic; created in init().
    private StateSerdes<Bytes, byte[]> bytesSerdes;
    // Optional listener notified for each flushed dirty entry; stays null until setFlushListener() is called.
    private CacheFlushListener<byte[], byte[]> flushListener;
    // Largest segment timestamp seen by put(); starts at -1 and bounds the segments CacheIteratorWrapper scans.
    private final AtomicLong maxObservedTimestamp;

    /**
     * Creates a caching layer in front of {@code underlying}.
     *
     * @param underlying      the wrapped window store that receives flushed entries
     * @param windowSize      window size used when decoding windowed keys
     * @param segmentInterval segment interval passed to the {@link SegmentedCacheFunction}
     */
    CachingWindowStore(WindowStore<Bytes, byte[]> underlying, long windowSize, long segmentInterval) {
        super(underlying);
        // -1 marks "nothing has been put yet".
        maxObservedTimestamp = new AtomicLong(-1L);
        cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
        this.windowSize = windowSize;
    }

    /**
     * Initializes the caching layer and then the wrapped store.
     *
     * <p>Resolves the changelog topic, captures the internal processor context, builds the
     * bytes serdes, derives the cache name as {@code "<taskId>-<store name>"} and registers a
     * flush listener that writes every dirty cache entry through
     * {@link #putAndMaybeForward}.
     *
     * @param stateStoreContext the context this store is initialized with
     * @param root              the root store, passed on unchanged to the superclass
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore root) {
        String changelogTopic = ProcessorContextUtils.changelogFor(stateStoreContext, name(), Boolean.TRUE);
        internalContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
        // Diamond instead of a raw StateSerdes so the assignment is type-checked.
        bytesSerdes = new StateSerdes<>(changelogTopic, Serdes.Bytes(), Serdes.ByteArray());
        cacheName = internalContext.taskId() + "-" + name();
        internalContext.registerCacheFlushListener(cacheName, entries -> {
            for (ThreadCache.DirtyEntry dirtyEntry : entries) {
                putAndMaybeForward(dirtyEntry, internalContext);
            }
        });
        super.init(stateStoreContext, root);
    }

    /**
     * Writes one dirty cache entry to the wrapped store and, if a flush listener is registered,
     * forwards the change to it.
     *
     * <p>While the write and the forward run, the processor context's record context is
     * replaced by the one captured with the cache entry; the previous record context is always
     * restored afterwards, even if an exception is thrown.
     *
     * <p>With a listener registered, the old value is read from the wrapped store when the new
     * value is {@code null} or when old values were requested. If both the new and the old value
     * are {@code null}, nothing is written or forwarded.
     *
     * @param entry   the dirty cache entry being flushed
     * @param context the processor context whose record context is temporarily swapped
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext<?, ?> context) {
        byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
        Windowed<Bytes> windowedKeyBytes = WindowKeySchema.fromStoreBytesKey(binaryWindowKey, windowSize);
        long windowStartTimestamp = windowedKeyBytes.window().start();
        Bytes binaryKey = windowedKeyBytes.key();

        // Read the listener once so the decision below and the later apply() use the same instance.
        CacheFlushListener<byte[], byte[]> listener = flushListener;
        byte[] rawNewValue = entry.newValue();
        byte[] rawOldValue = null;
        if (listener != null) {
            if (rawNewValue == null || sendOldValues) {
                rawOldValue = wrapped().fetch(binaryKey, windowStartTimestamp);
            }
            if (rawNewValue == null && rawOldValue == null) {
                // Neither a new nor an old value: skip both the store write and the forward.
                return;
            }
        }

        ProcessorRecordContext previous = context.recordContext();
        try {
            context.setRecordContext(entry.entry().context());
            wrapped().put(binaryKey, rawNewValue, windowStartTimestamp);
            if (listener != null) {
                listener.apply(new Record<byte[], Change<byte[]>>(
                    binaryWindowKey,
                    new Change<byte[]>(rawNewValue, sendOldValues ? rawOldValue : null),
                    entry.entry().context().timestamp(),
                    entry.entry().context().headers()));
            }
        } finally {
            context.setRecordContext(previous);
        }
    }

    /**
     * Registers the listener to notify when dirty cache entries are flushed.
     *
     * @param flushListener the listener to notify; replaces any previously set listener
     * @param sendOldValues whether the previous value should be included in forwarded changes
     * @return always {@code true}
     */
    @Override
    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> flushListener, boolean sendOldValues) {
        this.sendOldValues = sendOldValues;
        this.flushListener = flushListener;
        return true;
    }

    /**
     * Stores a value in the cache under the windowed key and records the segment timestamp.
     *
     * <p>The entry is marked dirty and carries the record metadata currently held by the
     * processor context. The wrapped store is not written here.
     *
     * @param key                  the record key
     * @param value                the serialized value
     * @param windowStartTimestamp the start timestamp of the window
     */
    @Override
    public synchronized void put(Bytes key, byte[] value, long windowStartTimestamp) {
        validateStoreOpen();
        Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
        LRUCacheEntry entry = new LRUCacheEntry(
            value,
            internalContext.headers(),
            true,
            internalContext.offset(),
            internalContext.timestamp(),
            internalContext.partition(),
            internalContext.topic(),
            internalContext.recordContext().sourceRawKey(),
            internalContext.recordContext().sourceRawValue());
        internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry);

        // Keep the high-water mark that bounds segment scans in CacheIteratorWrapper.
        long segmentTimestamp = keySchema.segmentTimestamp(keyBytes);
        long previousMax = maxObservedTimestamp.get();
        maxObservedTimestamp.set(Math.max(segmentTimestamp, previousMax));
    }

    /**
     * Looks up the value for a key in a single window.
     *
     * <p>The cache is consulted first; if there is no cache or no cached entry for the key,
     * the lookup is delegated to the wrapped store.
     *
     * @param key       the record key
     * @param timestamp the window start timestamp
     * @return the cached entry's value if an entry is cached, otherwise whatever the wrapped store returns
     */
    @Override
    public byte[] fetch(Bytes key, long timestamp) {
        validateStoreOpen();
        Bytes storeKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
        Bytes cacheKey = cacheFunction.cacheKey(storeKey);
        if (internalContext.cache() != null) {
            LRUCacheEntry cached = internalContext.cache().get(cacheName, cacheKey);
            if (cached != null) {
                return cached.value();
            }
        }
        return wrapped().fetch(key, timestamp);
    }

    /**
     * Returns a forward iterator over all windows of {@code key} in the given time range,
     * merging cached entries with the wrapped store's results.
     *
     * <p>If no cache is available, the wrapped store's iterator is returned directly. For a
     * persistent wrapped store the cache is scanned segment by segment through
     * {@link CacheIteratorWrapper}; otherwise a single cache range scan is used.
     *
     * @param key      the record key
     * @param timeFrom lower time bound passed to the store and the cache filter
     * @param timeTo   upper time bound passed to the store and the cache filter
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public synchronized WindowStoreIterator<byte[]> fetch(Bytes key, long timeFrom, long timeTo) {
        validateStoreOpen();
        WindowStoreIterator<byte[]> underlyingIterator = wrapped().fetch(key, timeFrom, timeTo);
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }

        // Typed by the shared interface: the two branches produce different concrete iterators.
        PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
        if (wrapped().persistent()) {
            cacheIterator = new CacheIteratorWrapper(key, timeFrom, timeTo, true);
        } else {
            cacheIterator = cache.range(
                cacheName,
                cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)),
                cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)));
        }

        HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo, true);
        FilteredCacheIterator filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true);
    }

    /**
     * Returns a backward iterator over all windows of {@code key} in the given time range,
     * merging cached entries with the wrapped store's results.
     *
     * <p>If no cache is available, the wrapped store's iterator is returned directly. For a
     * persistent wrapped store the cache is scanned segment by segment through
     * {@link CacheIteratorWrapper}; otherwise a single reverse cache range scan is used.
     *
     * @param key      the record key
     * @param timeFrom lower time bound passed to the store and the cache filter
     * @param timeTo   upper time bound passed to the store and the cache filter
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public synchronized WindowStoreIterator<byte[]> backwardFetch(Bytes key, long timeFrom, long timeTo) {
        validateStoreOpen();
        WindowStoreIterator<byte[]> underlyingIterator = wrapped().backwardFetch(key, timeFrom, timeTo);
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }

        // Typed by the shared interface: the two branches produce different concrete iterators.
        PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
        if (wrapped().persistent()) {
            cacheIterator = new CacheIteratorWrapper(key, timeFrom, timeTo, false);
        } else {
            cacheIterator = cache.reverseRange(
                cacheName,
                cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)),
                cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)));
        }

        HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo, false);
        FilteredCacheIterator filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, false);
    }

    /**
     * Returns a forward iterator over all windowed entries whose key lies between
     * {@code keyFrom} and {@code keyTo} and whose window lies in the given time range, merging
     * cached entries with the wrapped store's results.
     *
     * <p>A {@code null} bound leaves that side of the key range open. If both bounds are
     * non-null and {@code keyFrom} compares greater than {@code keyTo}, a warning is logged and
     * an empty iterator is returned. If no cache is available, the wrapped store's iterator is
     * returned directly.
     *
     * @param keyFrom  lower key bound, or {@code null} for no lower bound
     * @param keyTo    upper key bound, or {@code null} for no upper bound
     * @param timeFrom lower time bound passed to the store and the cache filter
     * @param timeTo   upper time bound passed to the store and the cache filter
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo) {
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }

        // Typed by the shared interface: the two branches produce different concrete iterators.
        PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
        if (wrapped().persistent()) {
            cacheIterator = new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, true);
        } else {
            Bytes lowerCacheKey = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom));
            Bytes upperCacheKey = keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo));
            cacheIterator = cache.range(cacheName, lowerCacheKey, upperCacheKey);
        }

        HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo, true);
        FilteredCacheIterator filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction, true);
    }

    /**
     * Returns a backward iterator over all windowed entries whose key lies between
     * {@code keyFrom} and {@code keyTo} and whose window lies in the given time range, merging
     * cached entries with the wrapped store's results.
     *
     * <p>A {@code null} bound leaves that side of the key range open. If both bounds are
     * non-null and {@code keyFrom} compares greater than {@code keyTo}, a warning is logged and
     * an empty iterator is returned. If no cache is available, the wrapped store's iterator is
     * returned directly.
     *
     * @param keyFrom  lower key bound, or {@code null} for no lower bound
     * @param keyTo    upper key bound, or {@code null} for no upper bound
     * @param timeFrom lower time bound passed to the store and the cache filter
     * @param timeTo   upper time bound passed to the store and the cache filter
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo) {
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }

        // Typed by the shared interface: the two branches produce different concrete iterators.
        PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
        if (wrapped().persistent()) {
            cacheIterator = new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false);
        } else {
            Bytes lowerCacheKey = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom));
            Bytes upperCacheKey = keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo));
            cacheIterator = cache.reverseRange(cacheName, lowerCacheKey, upperCacheKey);
        }

        HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo, false);
        FilteredCacheIterator filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction, false);
    }

    /**
     * Returns a forward iterator over every entry whose window lies in the given time range,
     * merging cached entries with the wrapped store's results.
     *
     * <p>If no cache is available, the wrapped store's iterator is returned directly, matching
     * the behavior of the {@code fetch} overloads.
     *
     * @param timeFrom lower time bound passed to the store and the cache filter
     * @param timeTo   upper time bound passed to the store and the cache filter
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long timeFrom, long timeTo) {
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetchAll(timeFrom, timeTo);
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(cacheName);
        HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo, true);
        FilteredCacheIterator filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction, true);
    }

    /**
     * Returns a backward iterator over every entry whose window lies in the given time range,
     * merging cached entries with the wrapped store's results.
     *
     * <p>If no cache is available, the wrapped store's iterator is returned directly, matching
     * the behavior of the {@code backwardFetch} overloads.
     *
     * @param timeFrom lower time bound passed to the store and the cache filter
     * @param timeTo   upper time bound passed to the store and the cache filter
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long timeFrom, long timeTo) {
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().backwardFetchAll(timeFrom, timeTo);
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.reverseAll(cacheName);
        HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo, false);
        FilteredCacheIterator filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction, false);
    }

    /**
     * Returns a forward iterator over every entry, merging all cached entries of this store
     * with the wrapped store's results.
     *
     * <p>If no cache is available, the wrapped store's iterator is returned directly, matching
     * the behavior of the {@code fetch} overloads.
     *
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().all();
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(cacheName);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            cacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction, true);
    }

    /**
     * Returns a backward iterator over every entry, merging all cached entries of this store
     * with the wrapped store's results.
     *
     * <p>If no cache is available, the wrapped store's iterator is returned directly, matching
     * the behavior of the {@code backwardFetch} overloads.
     *
     * @return an iterator over the merged cache and store contents
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().backwardAll();
        ThreadCache cache = internalContext.cache();
        if (cache == null) {
            return underlyingIterator;
        }
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.reverseAll(cacheName);
        return new MergedSortedCacheWindowStoreKeyValueIterator(
            cacheIterator, underlyingIterator, bytesSerdes, windowSize, cacheFunction, false);
    }

    /**
     * Flushes this store's cache namespace and then flushes the wrapped store.
     */
    @Override
    public synchronized void flush() {
        ThreadCache threadCache = internalContext.cache();
        threadCache.flush(cacheName);
        wrapped().flush();
    }

    /**
     * Flushes this store's cache namespace without flushing the wrapped store.
     */
    @Override
    public void flushCache() {
        ThreadCache threadCache = internalContext.cache();
        threadCache.flush(cacheName);
    }

    /**
     * Clears this store's cache namespace.
     */
    @Override
    public void clearCache() {
        ThreadCache threadCache = internalContext.cache();
        threadCache.clear(cacheName);
    }

    /**
     * Closes the store in three steps: flush the cache namespace, close the cache namespace,
     * close the wrapped store.
     *
     * <p>The steps are handed to {@code ExceptionUtils.executeAll}; if it reports any runtime
     * exceptions, they are rethrown together via {@code ExceptionUtils.throwSuppressed}.
     */
    @Override
    public synchronized void close() {
        Runnable flushStep = () -> internalContext.cache().flush(cacheName);
        Runnable closeCacheStep = () -> internalContext.cache().close(cacheName);
        Runnable closeWrappedStep = wrapped()::close;
        Runnable[] steps = new Runnable[] {flushStep, closeCacheStep, closeWrappedStep};

        LinkedList<RuntimeException> suppressed = ExceptionUtils.executeAll(steps);
        if (suppressed.isEmpty()) {
            return;
        }
        ExceptionUtils.throwSuppressed("Caught an exception while closing caching window store for store " + name(), suppressed);
    }

    private class CacheIteratorWrapper
    implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
        private final long segmentInterval;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final long timeTo;
        private final boolean forward;
        private long lastSegmentId;
        private long currentSegmentId;
        private Bytes cacheKeyFrom;
        private Bytes cacheKeyTo;
        private ThreadCache.MemoryLRUCacheBytesIterator current;

        private CacheIteratorWrapper(Bytes key, long timeFrom, long timeTo, boolean forward) {
            this(key, key, timeFrom, timeTo, forward);
        }

        private CacheIteratorWrapper(Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo, boolean forward) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.timeTo = timeTo;
            this.forward = forward;
            this.segmentInterval = CachingWindowStore.this.cacheFunction.getSegmentInterval();
            if (forward) {
                this.lastSegmentId = CachingWindowStore.this.cacheFunction.segmentId(Math.min(timeTo, CachingWindowStore.this.maxObservedTimestamp.get()));
                this.currentSegmentId = CachingWindowStore.this.cacheFunction.segmentId(timeFrom);
                this.setCacheKeyRange(timeFrom, this.currentSegmentLastTime());
                this.current = CachingWindowStore.this.internalContext.cache().range(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
            } else {
                this.currentSegmentId = CachingWindowStore.this.cacheFunction.segmentId(Math.min(timeTo, CachingWindowStore.this.maxObservedTimestamp.get()));
                this.lastSegmentId = CachingWindowStore.this.cacheFunction.segmentId(timeFrom);
                this.setCacheKeyRange(this.currentSegmentBeginTime(), Math.min(timeTo, CachingWindowStore.this.maxObservedTimestamp.get()));
                this.current = CachingWindowStore.this.internalContext.cache().reverseRange(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
            }
        }

        @Override
        public boolean hasNext() {
            if (this.current == null) {
                return false;
            }
            if (this.current.hasNext()) {
                return true;
            }
            while (!this.current.hasNext()) {
                this.getNextSegmentIterator();
                if (this.current != null) continue;
                return false;
            }
            return true;
        }

        @Override
        public Bytes peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.peekNextKey();
        }

        @Override
        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.peekNext();
        }

        @Override
        public KeyValue<Bytes, LRUCacheEntry> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.next();
        }

        @Override
        public void close() {
            this.current.close();
        }

        private long currentSegmentBeginTime() {
            return this.currentSegmentId * this.segmentInterval;
        }

        private long currentSegmentLastTime() {
            return Math.min(this.timeTo, this.currentSegmentBeginTime() + this.segmentInterval - 1L);
        }

        private void getNextSegmentIterator() {
            if (this.forward) {
                ++this.currentSegmentId;
                this.lastSegmentId = CachingWindowStore.this.cacheFunction.segmentId(Math.min(this.timeTo, CachingWindowStore.this.maxObservedTimestamp.get()));
                if (this.currentSegmentId > this.lastSegmentId) {
                    this.current = null;
                    return;
                }
                this.setCacheKeyRange(this.currentSegmentBeginTime(), this.currentSegmentLastTime());
                this.current.close();
                this.current = CachingWindowStore.this.internalContext.cache().range(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
            } else {
                --this.currentSegmentId;
                if (this.currentSegmentId < this.lastSegmentId) {
                    this.current = null;
                    return;
                }
                this.setCacheKeyRange(this.currentSegmentBeginTime(), this.currentSegmentLastTime());
                this.current.close();
                this.current = CachingWindowStore.this.internalContext.cache().reverseRange(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
            }
        }

        private void setCacheKeyRange(long lowerRangeEndTime, long upperRangeEndTime) {
            if (CachingWindowStore.this.cacheFunction.segmentId(lowerRangeEndTime) != CachingWindowStore.this.cacheFunction.segmentId(upperRangeEndTime)) {
                throw new IllegalStateException("Error iterating over segments: segment interval has changed");
            }
            if (this.keyFrom != null && this.keyFrom.equals((Object)this.keyTo)) {
                this.cacheKeyFrom = CachingWindowStore.this.cacheFunction.cacheKey(this.segmentLowerRangeFixedSize(this.keyFrom, lowerRangeEndTime));
                this.cacheKeyTo = CachingWindowStore.this.cacheFunction.cacheKey(this.segmentUpperRangeFixedSize(this.keyTo, upperRangeEndTime));
            } else {
                this.cacheKeyFrom = this.keyFrom == null ? null : CachingWindowStore.this.cacheFunction.cacheKey(CachingWindowStore.this.keySchema.lowerRange(this.keyFrom, lowerRangeEndTime), this.currentSegmentId);
                this.cacheKeyTo = this.keyTo == null ? null : CachingWindowStore.this.cacheFunction.cacheKey(CachingWindowStore.this.keySchema.upperRange(this.keyTo, this.timeTo), this.currentSegmentId);
            }
        }

        private Bytes segmentLowerRangeFixedSize(Bytes key, long segmentBeginTime) {
            return WindowKeySchema.toStoreKeyBinary(key, Math.max(0L, segmentBeginTime), 0);
        }

        private Bytes segmentUpperRangeFixedSize(Bytes key, long segmentEndTime) {
            return WindowKeySchema.toStoreKeyBinary(key, segmentEndTime, Integer.MAX_VALUE);
        }
    }
}

