/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.connector.source.lookup.cache.trigger;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.util.Preconditions;

public class PeriodicCacheReloadTrigger
implements CacheReloadTrigger {
    private static final long serialVersionUID = 1L;
    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;
    private transient ScheduledExecutorService scheduledExecutor;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        Preconditions.checkArgument(!reloadInterval.isNegative() && !reloadInterval.isZero(), "Reload interval must be greater than zero.");
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @VisibleForTesting
    PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode, ScheduledExecutorService scheduledExecutor) {
        this(reloadInterval, scheduleMode);
        this.scheduledExecutor = scheduledExecutor;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }
        switch (this.scheduleMode) {
            case FIXED_RATE: {
                this.scheduledExecutor.scheduleAtFixedRate(context::triggerReload, 0L, this.reloadInterval.toMillis(), TimeUnit.MILLISECONDS);
                break;
            }
            case FIXED_DELAY: {
                this.scheduledExecutor.scheduleWithFixedDelay(() -> {
                    try {
                        context.triggerReload().get();
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Uncaught exception during the reload", e);
                    }
                }, 0L, this.reloadInterval.toMillis(), TimeUnit.MILLISECONDS);
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unrecognized schedule mode \"%s\"", new Object[]{this.scheduleMode}));
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdownNow();
        }
    }

    public static PeriodicCacheReloadTrigger fromConfig(ReadableConfig config) {
        Preconditions.checkArgument(config.get(LookupOptions.CACHE_TYPE) == LookupOptions.LookupCacheType.FULL, "'%s' should be '%s' in order to build a Periodic cache reload trigger.", new Object[]{LookupOptions.CACHE_TYPE.key(), LookupOptions.LookupCacheType.FULL});
        Preconditions.checkArgument(config.get(LookupOptions.FULL_CACHE_RELOAD_STRATEGY) == LookupOptions.ReloadStrategy.PERIODIC, "'%s' should be '%s' in order to build a Periodic cache reload trigger.", new Object[]{LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key(), LookupOptions.ReloadStrategy.PERIODIC});
        Preconditions.checkArgument(config.getOptional(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL).isPresent(), "Missing '%s' in the configuration. This option is required to build Periodic cache reload trigger.", LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key());
        return new PeriodicCacheReloadTrigger(config.get(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL), config.get(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE));
    }

    public static enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE;

    }
}

