diff --git a/pom.xml b/pom.xml index 1dfc501..d9f0a6c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,8 +5,8 @@ 0.1.19-SNAPSHOT - 0.1.12 - 1.7.14 + 0.1.34 + 1.7.15 21.0 3.0.2 @@ -14,6 +14,7 @@ 1.2.3 3.6.1 1.21 + 3.14.0 3.0.0-M3 1.6.8 @@ -132,6 +133,13 @@ ${jmh.version} test + + + org.assertj + assertj-core + ${assert.version} + test + diff --git a/src/main/java/com/github/phantomthief/failover/backoff/BackOff.java b/src/main/java/com/github/phantomthief/failover/backoff/BackOff.java new file mode 100644 index 0000000..3916911 --- /dev/null +++ b/src/main/java/com/github/phantomthief/failover/backoff/BackOff.java @@ -0,0 +1,38 @@ +package com.github.phantomthief.failover.backoff; + +/** + * Provide a {@link BackOffExecution} that indicates the rate at which + * an operation should be retried. + * + *

Users of this interface are expected to use it like this: + * + *

+ * BackOffExecution exec = backOff.start();
+ *
+ * // In the operation recovery/retry loop:
+ * long waitInterval = exec.nextBackOff();
+ * if (waitInterval == BackOffExecution.STOP) {
+ *     // do not retry operation
+ * }
+ * else {
+ *     // sleep, e.g. Thread.sleep(waitInterval)
+ *     // retry operation
+ * }
+ * }
+ * + * Once the underlying operation has completed successfully, + * the execution instance can be simply discarded. + * + * @see BackOffExecution + */ +@FunctionalInterface +public interface BackOff { + + /** + * Start a new back off execution. + * + * @return a fresh {@link BackOffExecution} ready to be used + */ + BackOffExecution start(); + +} \ No newline at end of file diff --git a/src/main/java/com/github/phantomthief/failover/backoff/BackOffExecution.java b/src/main/java/com/github/phantomthief/failover/backoff/BackOffExecution.java new file mode 100644 index 0000000..3e9588a --- /dev/null +++ b/src/main/java/com/github/phantomthief/failover/backoff/BackOffExecution.java @@ -0,0 +1,26 @@ +package com.github.phantomthief.failover.backoff; + +/** + * Represent a particular back-off execution. + * + *

Implementations do not need to be thread safe. + * + * @see BackOff + */ +@FunctionalInterface +public interface BackOffExecution { + + /** + * Return value of {@link #nextBackOff()} that indicates that the operation + * should not be retried. + */ + long STOP = -1; + + /** + * Return the number of milliseconds to wait before retrying the operation + * or {@link #STOP} ({@value #STOP}) to indicate that no further attempt + * should be made for the operation. + */ + long nextBackOff(); + +} \ No newline at end of file diff --git a/src/main/java/com/github/phantomthief/failover/backoff/ExponentialBackOff.java b/src/main/java/com/github/phantomthief/failover/backoff/ExponentialBackOff.java new file mode 100644 index 0000000..9a46c09 --- /dev/null +++ b/src/main/java/com/github/phantomthief/failover/backoff/ExponentialBackOff.java @@ -0,0 +1,207 @@ +package com.github.phantomthief.failover.backoff; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Implementation of {@link BackOff} that increases the back off period for each + * retry attempt. When the interval has reached the {@link #setMaxInterval(long) + * max interval}, it is no longer increased. Stops retrying once the + * {@link #setMaxElapsedTime(long) max elapsed time} has been reached. + * + *

Example: The default interval is {@value #DEFAULT_INITIAL_INTERVAL} ms, + * the default multiplier is {@value #DEFAULT_MULTIPLIER}, and the default max + * interval is {@value #DEFAULT_MAX_INTERVAL}. For 10 attempts the sequence will be + * as follows: + * + *

+ * request#     back off
+ *
+ *  1              2000
+ *  2              3000
+ *  3              4500
+ *  4              6750
+ *  5             10125
+ *  6             15187
+ *  7             22780
+ *  8             30000
+ *  9             30000
+ * 10             30000
+ * 
+ * + *

Note that the default max elapsed time is {@link Long#MAX_VALUE}. Use + * {@link #setMaxElapsedTime(long)} to limit the maximum length of time + * that an instance should accumulate before returning + * {@link BackOffExecution#STOP}. + */ +public class ExponentialBackOff implements BackOff { + + /** + * The default initial interval. + */ + public static final long DEFAULT_INITIAL_INTERVAL = 2000L; + + /** + * The default multiplier (increases the interval by 50%). + */ + public static final double DEFAULT_MULTIPLIER = 1.5; + + /** + * The default maximum back off time. + */ + public static final long DEFAULT_MAX_INTERVAL = 30000L; + + /** + * The default maximum elapsed time. + */ + public static final long DEFAULT_MAX_ELAPSED_TIME = Long.MAX_VALUE; + + + private long initialInterval = DEFAULT_INITIAL_INTERVAL; + + private double multiplier = DEFAULT_MULTIPLIER; + + private long maxInterval = DEFAULT_MAX_INTERVAL; + + private long maxElapsedTime = DEFAULT_MAX_ELAPSED_TIME; + + + /** + * Create an instance with the default settings. + * + * @see #DEFAULT_INITIAL_INTERVAL + * @see #DEFAULT_MULTIPLIER + * @see #DEFAULT_MAX_INTERVAL + * @see #DEFAULT_MAX_ELAPSED_TIME + */ + public ExponentialBackOff() { + } + + /** + * Create an instance with the supplied settings. + * + * @param initialInterval the initial interval in milliseconds + * @param multiplier the multiplier (should be greater than or equal to 1) + */ + public ExponentialBackOff(long initialInterval, double multiplier) { + checkMultiplier(multiplier); + this.initialInterval = initialInterval; + this.multiplier = multiplier; + } + + /** + * The initial interval in milliseconds. + */ + public void setInitialInterval(long initialInterval) { + this.initialInterval = initialInterval; + } + + /** + * Return the initial interval in milliseconds. + */ + public long getInitialInterval() { + return this.initialInterval; + } + + /** + * The value to multiply the current interval by for each retry attempt. + */ + public void setMultiplier(double multiplier) { + checkMultiplier(multiplier); + this.multiplier = multiplier; + } + + /** + * Return the value to multiply the current interval by for each retry attempt. + */ + public double getMultiplier() { + return this.multiplier; + } + + /** + * The maximum back off time. + */ + public void setMaxInterval(long maxInterval) { + this.maxInterval = maxInterval; + } + + /** + * Return the maximum back off time. + */ + public long getMaxInterval() { + return this.maxInterval; + } + + /** + * The maximum elapsed time in milliseconds after which a call to + * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}. + */ + public void setMaxElapsedTime(long maxElapsedTime) { + this.maxElapsedTime = maxElapsedTime; + } + + /** + * Return the maximum elapsed time in milliseconds after which a call to + * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}. + */ + public long getMaxElapsedTime() { + return this.maxElapsedTime; + } + + @Override + public BackOffExecution start() { + return new ExponentialBackOffExecution(); + } + + private void checkMultiplier(double multiplier) { + checkArgument(multiplier >= 1, "Invalid multiplier '" + multiplier + "'. Should be greater than " + + "or equal to 1. A multiplier of 1 is equivalent to a fixed interval."); + } + + private class ExponentialBackOffExecution implements BackOffExecution { + + private long currentInterval = -1; + + private long currentElapsedTime = 0; + + @Override + public long nextBackOff() { + if (this.currentElapsedTime >= maxElapsedTime) { + return STOP; + } + + long nextInterval = computeNextInterval(); + this.currentElapsedTime += nextInterval; + return nextInterval; + } + + private long computeNextInterval() { + long maxInterval = getMaxInterval(); + if (this.currentInterval >= maxInterval) { + return maxInterval; + } else if (this.currentInterval < 0) { + long initialInterval = getInitialInterval(); + this.currentInterval = (initialInterval < maxInterval + ? initialInterval : maxInterval); + } else { + this.currentInterval = multiplyInterval(maxInterval); + } + return this.currentInterval; + } + + private long multiplyInterval(long maxInterval) { + long i = this.currentInterval; + i *= getMultiplier(); + return (i > maxInterval ? maxInterval : i); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ExponentialBackOff{"); + sb.append("currentInterval=").append(this.currentInterval < 0 ? "n/a" : this.currentInterval + "ms"); + sb.append(", multiplier=").append(getMultiplier()); + sb.append('}'); + return sb.toString(); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/github/phantomthief/failover/backoff/FixedBackOff.java b/src/main/java/com/github/phantomthief/failover/backoff/FixedBackOff.java new file mode 100644 index 0000000..121e819 --- /dev/null +++ b/src/main/java/com/github/phantomthief/failover/backoff/FixedBackOff.java @@ -0,0 +1,104 @@ +package com.github.phantomthief.failover.backoff; + +/** + * A simple {@link BackOff} implementation that provides a fixed interval + * between two attempts and a maximum number of retries. + */ +public class FixedBackOff implements BackOff { + + /** + * The default recovery interval: 5000 ms = 5 seconds. + */ + public static final long DEFAULT_INTERVAL = 5000; + + /** + * Constant value indicating an unlimited number of attempts. + */ + public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE; + + private long interval = DEFAULT_INTERVAL; + + private long maxAttempts = UNLIMITED_ATTEMPTS; + + + /** + * Create an instance with an interval of {@value #DEFAULT_INTERVAL} + * ms and an unlimited number of attempts. + */ + public FixedBackOff() { + } + + /** + * Create an instance. + * + * @param interval the interval between two attempts + * @param maxAttempts the maximum number of attempts + */ + public FixedBackOff(long interval, long maxAttempts) { + this.interval = interval; + this.maxAttempts = maxAttempts; + } + + /** + * Set the interval between two attempts in milliseconds. + */ + public void setInterval(long interval) { + this.interval = interval; + } + + /** + * Return the interval between two attempts in milliseconds. + */ + public long getInterval() { + return this.interval; + } + + /** + * Set the maximum number of attempts in milliseconds. + */ + public void setMaxAttempts(long maxAttempts) { + this.maxAttempts = maxAttempts; + } + + /** + * Return the maximum number of attempts in milliseconds. + */ + public long getMaxAttempts() { + return this.maxAttempts; + } + + @Override + public BackOffExecution start() { + return new FixedBackOffExecution(); + } + + + private class FixedBackOffExecution implements BackOffExecution { + + private long currentAttempts = 0; + + @Override + public long nextBackOff() { + this.currentAttempts++; + if (this.currentAttempts <= getMaxAttempts()) { + return getInterval(); + } else { + return STOP; + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("FixedBackOff{"); + sb.append("interval=").append(FixedBackOff.this.interval); + String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE + ? "unlimited" + : String.valueOf(FixedBackOff.this.maxAttempts); + sb.append(", currentAttempts=").append(this.currentAttempts); + sb.append(", maxAttempts=").append(attemptValue); + sb.append('}'); + return sb.toString(); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/github/phantomthief/failover/backoff/package-info.java b/src/main/java/com/github/phantomthief/failover/backoff/package-info.java new file mode 100644 index 0000000..0db908a --- /dev/null +++ b/src/main/java/com/github/phantomthief/failover/backoff/package-info.java @@ -0,0 +1,9 @@ +/** + * Copied from spring-core + * + * Write the code. Change the world. + * + * @author trang + * Created on 2019-11-09. + */ +package com.github.phantomthief.failover.backoff; \ No newline at end of file diff --git a/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java b/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java index 62b0a6a..d0c6dfa 100644 --- a/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java +++ b/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java @@ -9,7 +9,9 @@ import javax.annotation.CheckReturnValue; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import com.github.phantomthief.failover.backoff.BackOff; import com.github.phantomthief.util.ThrowableFunction; import com.github.phantomthief.util.ThrowablePredicate; @@ -96,12 +98,21 @@ public GenericWeightFailoverBuilder checkDuration(long time, TimeUnit unit) { @CheckReturnValue @Nonnull - public GenericWeightFailoverBuilder - checker(@Nonnull ThrowableFunction failChecker) { + public GenericWeightFailoverBuilder checker( + @Nonnull ThrowableFunction failChecker) { builder.checker(failChecker); return this; } + @CheckReturnValue + @Nonnull + public GenericWeightFailoverBuilder checker( + @Nonnull ThrowableFunction failChecker, + @Nullable BackOff backOff) { + builder.checker(failChecker, backOff); + return this; + } + @CheckReturnValue @Nonnull public GenericWeightFailoverBuilder checker( @@ -111,6 +122,16 @@ public GenericWeightFailoverBuilder checker( return this; } + @CheckReturnValue + @Nonnull + public GenericWeightFailoverBuilder checker( + @Nonnull ThrowablePredicate failChecker, + @Nonnegative double recoveredInitRate, + @Nullable BackOff backOff) { + builder.checker(failChecker, recoveredInitRate, backOff); + return this; + } + @CheckReturnValue @Nonnull public GenericWeightFailoverBuilder filter(Predicate filter) { diff --git a/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java b/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java index 55faf7a..0eb09a8 100644 --- a/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java +++ b/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java @@ -1,8 +1,13 @@ package com.github.phantomthief.failover.impl; import static com.github.phantomthief.tuple.Tuple.tuple; +import static com.github.phantomthief.util.MoreFunctions.catching; +import static com.github.phantomthief.util.MoreFunctions.runCatching; +import static com.github.phantomthief.util.MoreFunctions.runWithThreadName; +import static com.github.phantomthief.util.MoreFunctions.supplyWithThreadName; import static com.github.phantomthief.util.MoreSuppliers.lazy; import static com.google.common.primitives.Ints.constrainToRange; +import static com.google.common.util.concurrent.Futures.addCallback; import static java.lang.Integer.MAX_VALUE; import static java.lang.Math.max; import static java.lang.Math.min; @@ -21,20 +26,30 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; import java.util.function.IntUnaryOperator; import java.util.function.Predicate; +import java.util.function.ToDoubleFunction; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.slf4j.Logger; import com.github.phantomthief.failover.Failover; +import com.github.phantomthief.failover.backoff.BackOff; +import com.github.phantomthief.failover.backoff.BackOffExecution; import com.github.phantomthief.failover.util.SharedCheckExecutorHolder; import com.github.phantomthief.tuple.TwoTuple; import com.github.phantomthief.util.MoreSuppliers.CloseableSupplier; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; /** * 默认权重记录 @@ -47,14 +62,19 @@ public class WeightFailover implements Failover, Closeable { private static final Logger logger = getLogger(WeightFailover.class); + private final String name; + private final IntUnaryOperator failReduceWeight; private final IntUnaryOperator successIncreaseWeight; private final ConcurrentMap initWeightMap; private final ConcurrentMap currentWeightMap; private final CloseableSupplier> recoveryFuture; + private final ConcurrentLinkedQueue recoveryTasks; + private final ConcurrentMap runningRecoveryTasks; private final Consumer onMinWeight; private final int minWeight; + private final ToDoubleFunction checker; /** * {@code null} if this feature is off. @@ -69,6 +89,7 @@ public class WeightFailover implements Failover, Closeable { private volatile boolean closed; WeightFailover(WeightFailoverBuilder builder) { + this.name = builder.name; this.minWeight = builder.minWeight; this.failReduceWeight = builder.failReduceWeight; this.successIncreaseWeight = builder.successIncreaseWeight; @@ -77,48 +98,110 @@ public class WeightFailover implements Failover, Closeable { this.onMinWeight = builder.onMinWeight; this.weightOnMissingNode = builder.weightOnMissingNode; this.filter = builder.filter; - this.recoveryFuture = lazy( - () -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> { - if (closed) { - tryCloseRecoveryScheduler(); - return; + this.checker = builder.checker; + this.recoveryTasks = new ConcurrentLinkedQueue<>(); + this.runningRecoveryTasks = new ConcurrentHashMap<>(); + this.recoveryFuture = builder.backOff == null ? initDefaultChecker(builder) : initBackOffChecker(builder); + } + + private CloseableSupplier> initDefaultChecker(WeightFailoverBuilder builder) { + return lazy(() -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> { + if (closed) { + tryCloseRecoveryScheduler(); + return; + } + Thread currentThread = Thread.currentThread(); + String origName = currentThread.getName(); + if (builder.name != null) { + currentThread.setName(origName + "-[" + builder.name + "]"); + } + try { + Map recoveredObjects = new HashMap<>(); + this.currentWeightMap.forEach((obj, weight) -> { + if (weight == 0) { + double recoverRate = builder.checker.applyAsDouble(obj); + if (recoverRate > 0) { + recoveredObjects.put(obj, recoverRate); + } + } + }); + if (!recoveredObjects.isEmpty()) { + logger.info("found recovered objects:{}", recoveredObjects); + } + recoveredObjects.forEach((recovered, rate) -> { + Integer initWeight = initWeightMap.get(recovered); + if (initWeight == null) { + throw new IllegalStateException("obj:" + recovered); } - Thread currentThread = Thread.currentThread(); - String origName = currentThread.getName(); - if (builder.name != null) { - currentThread.setName(origName + "-[" + builder.name + "]"); + int recoveredWeight = constrainToRange((int) (initWeight * rate), 1, + initWeight); + currentWeightMap.put(recovered, recoveredWeight); + if (builder.onRecovered != null) { + builder.onRecovered.accept(recovered); } - try { - Map recoveredObjects = new HashMap<>(); - this.currentWeightMap.forEach((obj, weight) -> { - if (weight == 0) { - double recoverRate = builder.checker.applyAsDouble(obj); - if (recoverRate > 0) { - recoveredObjects.put(obj, recoverRate); + }); + } catch (Throwable e) { + logger.error("", e); + } finally { + currentThread.setName(origName); + } + }, builder.checkDuration, builder.checkDuration, MILLISECONDS)); + } + + private CloseableSupplier> initBackOffChecker(WeightFailoverBuilder builder) { + return lazy(() -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> { + if (closed) { + tryCloseRecoveryScheduler(); + return; + } + // 为权重为 0 的资源创建健康检查任务,放入待检查队列 + this.currentWeightMap.entrySet().stream() + .filter(entry -> entry.getValue() == 0) + .forEach((entry -> recoveryTasks.offer(new RecoveryTask(entry.getKey(), builder.backOff)))); + // 如果队列不为空则将其转发到 scheduler + if (!recoveryTasks.isEmpty()) { + while (true) { + RecoveryTask task = recoveryTasks.poll(); + if (task == null) { + break; + } + // 如果健康检查耗时超过 checkDuration,可能导致对同一资源发起多次 hc 请求,这里规避一下 + if (runningRecoveryTasks.containsKey(task.getObj())) { + continue; + } + T obj = task.getObj(); + // 新建的 task 将其放入 running 队列中 + runningRecoveryTasks.put(obj, task); + // 与外层共用一个线程池,貌似也没什么不妥... + ListenableFuture future = + SharedCheckExecutorHolder.getInstance().schedule(task, task.nextBackOff(), MILLISECONDS); + addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Integer recoveredWeight) { + runWithThreadName(origin -> origin + "-[" + name + "]", () -> runCatching(() -> { + // 不管成不成功执行完毕先将 task 删除 + runningRecoveryTasks.remove(obj); + // 如果检查失败再次将其放入待检查队列,delay 时间顺延到下一周期 + if (recoveredWeight == null || recoveredWeight == 0) { + recoveryTasks.offer(task); + } else { + currentWeightMap.put(obj, recoveredWeight); + if (builder.onRecovered != null) { + builder.onRecovered.accept(obj); + } } - } - }); - if (!recoveredObjects.isEmpty()) { - logger.info("found recovered objects:{}", recoveredObjects); + })); } - recoveredObjects.forEach((recovered, rate) -> { - Integer initWeight = initWeightMap.get(recovered); - if (initWeight == null) { - throw new IllegalStateException("obj:" + recovered); - } - int recoveredWeight = constrainToRange((int) (initWeight * rate), 1, - initWeight); - currentWeightMap.put(recovered, recoveredWeight); - if (builder.onRecovered != null) { - builder.onRecovered.accept(recovered); - } - }); - } catch (Throwable e) { - logger.error("", e); - } finally { - currentThread.setName(origName); - } - }, builder.checkDuration, builder.checkDuration, MILLISECONDS)); + + @Override + public void onFailure(@Nonnull Throwable t) { + // 不管成不成功执行完毕先将 task 删除 + runningRecoveryTasks.remove(obj); + } + }); + } + } + }, builder.checkDuration, builder.checkDuration, MILLISECONDS)); } /** @@ -319,4 +402,41 @@ public Set getFailed() { public String toString() { return "WeightFailover [" + initWeightMap + "]" + "@" + Integer.toHexString(hashCode()); } + + class RecoveryTask implements Callable { + + private final T obj; + private final BackOffExecution backOff; + + RecoveryTask(T obj, BackOff backOff) { + this.obj = obj; + this.backOff = backOff.start(); + } + + @Override + public Integer call() throws Exception { + return supplyWithThreadName(origin -> origin + "-[" + name + "]", () -> catching(() -> { + double recoverRate = checker.applyAsDouble(obj); + if (recoverRate > 0) { + Integer initWeight = initWeightMap.get(obj); + if (initWeight == null) { + throw new IllegalStateException("obj:" + obj); + } + return constrainToRange((int) (initWeight * recoverRate), 1, initWeight); + } + return 0; + })); + } + + @Nonnull + T getObj() { + return obj; + } + + long nextBackOff() { + return backOff.nextBackOff(); + } + + } + } diff --git a/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java b/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java index d3f0951..9a3d4e6 100644 --- a/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java +++ b/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java @@ -19,9 +19,11 @@ import javax.annotation.CheckReturnValue; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.slf4j.Logger; +import com.github.phantomthief.failover.backoff.BackOff; import com.github.phantomthief.util.ThrowableFunction; import com.github.phantomthief.util.ThrowablePredicate; @@ -39,13 +41,14 @@ public class WeightFailoverBuilder { Map initWeightMap; ToDoubleFunction checker; + BackOff backOff; long checkDuration; Consumer onMinWeight; Consumer onRecovered; int minWeight = 0; Integer weightOnMissingNode; String name; - + Predicate filter = alwaysTrue(); @CheckReturnValue @@ -148,8 +151,18 @@ public WeightFailoverBuilder filter(@Nonnull Predicate filter) { @SuppressWarnings("unchecked") @CheckReturnValue @Nonnull - public WeightFailoverBuilder - checker(@Nonnull ThrowableFunction failChecker) { + public WeightFailoverBuilder checker( + @Nonnull ThrowableFunction failChecker) { + checkNotNull(failChecker); + return checker(failChecker, null); + } + + @SuppressWarnings("unchecked") + @CheckReturnValue + @Nonnull + public WeightFailoverBuilder checker( + @Nonnull ThrowableFunction failChecker, + @Nullable BackOff backOff) { checkNotNull(failChecker); WeightFailoverBuilder thisBuilder = (WeightFailoverBuilder) this; thisBuilder.checker = t -> { @@ -160,6 +173,7 @@ public WeightFailoverBuilder filter(@Nonnull Predicate filter) { return 0; } }; + thisBuilder.backOff = backOff; return thisBuilder; } @@ -171,7 +185,19 @@ public WeightFailoverBuilder checker( @Nonnegative double recoveredInitRate) { checkArgument(recoveredInitRate >= 0 && recoveredInitRate <= 1); checkNotNull(failChecker); - return checker(it -> failChecker.test(it) ? recoveredInitRate : 0); + return checker(failChecker, recoveredInitRate, null); + } + + @SuppressWarnings("unchecked") + @CheckReturnValue + @Nonnull + public WeightFailoverBuilder checker( + @Nonnull ThrowablePredicate failChecker, + @Nonnegative double recoveredInitRate, + @Nullable BackOff backOff) { + checkArgument(recoveredInitRate >= 0 && recoveredInitRate <= 1); + checkNotNull(failChecker); + return checker(it -> failChecker.test(it) ? recoveredInitRate : 0, backOff); } @Nonnull diff --git a/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java b/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java index 426f69a..8dd923e 100644 --- a/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java +++ b/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java @@ -1,11 +1,14 @@ package com.github.phantomthief.failover.util; +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static java.lang.Thread.MIN_PRIORITY; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import javax.annotation.Nonnull; + +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -15,26 +18,32 @@ public class SharedCheckExecutorHolder { private static final int THREAD_COUNT = 10; - public static ScheduledExecutorService getInstance() { + public static ListeningScheduledExecutorService getInstance() { return LazyHolder.INSTANCE; } private static class LazyHolder { - private static final ScheduledExecutorService INSTANCE = new ScheduledThreadPoolExecutor( - THREAD_COUNT, - new ThreadFactoryBuilder().setNameFormat("scheduled-failover-recovery-check-%d") - .setPriority(MIN_PRIORITY) - .setDaemon(true) // - .build()) { - - public void shutdown() { - throw new UnsupportedOperationException(); - } - - public List shutdownNow() { - throw new UnsupportedOperationException(); - } - }; + private static final ListeningScheduledExecutorService INSTANCE = + listeningDecorator(new ScheduledThreadPoolExecutor(THREAD_COUNT, + new ThreadFactoryBuilder() + .setNameFormat("scheduled-failover-recovery-check-%d") + .setPriority(MIN_PRIORITY) + .setDaemon(true) + .build()) { + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + }); + } -} + +} \ No newline at end of file diff --git a/src/test/java/com/github/phantomthief/failover/backoff/ExponentialBackOffTests.java b/src/test/java/com/github/phantomthief/failover/backoff/ExponentialBackOffTests.java new file mode 100644 index 0000000..9aa8ab5 --- /dev/null +++ b/src/test/java/com/github/phantomthief/failover/backoff/ExponentialBackOffTests.java @@ -0,0 +1,115 @@ +package com.github.phantomthief.failover.backoff; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; + +import org.junit.jupiter.api.Test; + +/** + * ExponentialBackOffTests + */ +class ExponentialBackOffTests { + + @Test + void defaultInstance() { + ExponentialBackOff backOff = new ExponentialBackOff(); + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(2000L); + assertThat(execution.nextBackOff()).isEqualTo(3000L); + assertThat(execution.nextBackOff()).isEqualTo(4500L); + } + + @Test + void simpleIncrease() { + ExponentialBackOff backOff = new ExponentialBackOff(100L, 2.0); + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(100L); + assertThat(execution.nextBackOff()).isEqualTo(200L); + assertThat(execution.nextBackOff()).isEqualTo(400L); + assertThat(execution.nextBackOff()).isEqualTo(800L); + } + + @Test + void fixedIncrease() { + ExponentialBackOff backOff = new ExponentialBackOff(100L, 1.0); + backOff.setMaxElapsedTime(300L); + + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(100L); + assertThat(execution.nextBackOff()).isEqualTo(100L); + assertThat(execution.nextBackOff()).isEqualTo(100L); + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void maxIntervalReached() { + ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0); + backOff.setMaxInterval(4000L); + + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(2000L); + assertThat(execution.nextBackOff()).isEqualTo(4000L); + // max reached + assertThat(execution.nextBackOff()).isEqualTo(4000L); + assertThat(execution.nextBackOff()).isEqualTo(4000L); + } + + @Test + void maxAttemptsReached() { + ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0); + backOff.setMaxElapsedTime(4000L); + + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(2000L); + assertThat(execution.nextBackOff()).isEqualTo(4000L); + // > 4 sec wait in total + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void startReturnDifferentInstances() { + ExponentialBackOff backOff = new ExponentialBackOff(); + backOff.setInitialInterval(2000L); + backOff.setMultiplier(2.0); + backOff.setMaxElapsedTime(4000L); + + BackOffExecution execution = backOff.start(); + BackOffExecution execution2 = backOff.start(); + + assertThat(execution.nextBackOff()).isEqualTo(2000L); + assertThat(execution2.nextBackOff()).isEqualTo(2000L); + assertThat(execution.nextBackOff()).isEqualTo(4000L); + assertThat(execution2.nextBackOff()).isEqualTo(4000L); + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + assertThat(execution2.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void invalidInterval() { + ExponentialBackOff backOff = new ExponentialBackOff(); + assertThatIllegalArgumentException().isThrownBy(() -> + backOff.setMultiplier(0.9)); + } + + @Test + void maxIntervalReachedImmediately() { + ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0); + backOff.setMaxInterval(50L); + + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(50L); + assertThat(execution.nextBackOff()).isEqualTo(50L); + } + + @Test + void toStringContent() { + ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0); + BackOffExecution execution = backOff.start(); + assertThat(execution.toString()).isEqualTo("ExponentialBackOff{currentInterval=n/a, multiplier=2.0}"); + execution.nextBackOff(); + assertThat(execution.toString()).isEqualTo("ExponentialBackOff{currentInterval=2000ms, multiplier=2.0}"); + execution.nextBackOff(); + assertThat(execution.toString()).isEqualTo("ExponentialBackOff{currentInterval=4000ms, multiplier=2.0}"); + } + +} diff --git a/src/test/java/com/github/phantomthief/failover/backoff/FixedBackOffTests.java b/src/test/java/com/github/phantomthief/failover/backoff/FixedBackOffTests.java new file mode 100644 index 0000000..3791333 --- /dev/null +++ b/src/test/java/com/github/phantomthief/failover/backoff/FixedBackOffTests.java @@ -0,0 +1,73 @@ +package com.github.phantomthief.failover.backoff; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +/** + * FixedBackOffTests + */ +class FixedBackOffTests { + + @Test + void defaultInstance() { + FixedBackOff backOff = new FixedBackOff(); + BackOffExecution execution = backOff.start(); + for (int i = 0; i < 100; i++) { + assertThat(execution.nextBackOff()).isEqualTo(FixedBackOff.DEFAULT_INTERVAL); + } + } + + @Test + void noAttemptAtAll() { + FixedBackOff backOff = new FixedBackOff(100L, 0L); + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void maxAttemptsReached() { + FixedBackOff backOff = new FixedBackOff(200L, 2); + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(200L); + assertThat(execution.nextBackOff()).isEqualTo(200L); + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void startReturnDifferentInstances() { + FixedBackOff backOff = new FixedBackOff(100L, 1); + BackOffExecution execution = backOff.start(); + BackOffExecution execution2 = backOff.start(); + + assertThat(execution.nextBackOff()).isEqualTo(100L); + assertThat(execution2.nextBackOff()).isEqualTo(100L); + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + assertThat(execution2.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void liveUpdate() { + FixedBackOff backOff = new FixedBackOff(100L, 1); + BackOffExecution execution = backOff.start(); + assertThat(execution.nextBackOff()).isEqualTo(100L); + + backOff.setInterval(200L); + backOff.setMaxAttempts(2); + + assertThat(execution.nextBackOff()).isEqualTo(200L); + assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP); + } + + @Test + void toStringContent() { + FixedBackOff backOff = new FixedBackOff(200L, 10); + BackOffExecution execution = backOff.start(); + assertThat(execution.toString()).isEqualTo("FixedBackOff{interval=200, currentAttempts=0, maxAttempts=10}"); + execution.nextBackOff(); + assertThat(execution.toString()).isEqualTo("FixedBackOff{interval=200, currentAttempts=1, maxAttempts=10}"); + execution.nextBackOff(); + assertThat(execution.toString()).isEqualTo("FixedBackOff{interval=200, currentAttempts=2, maxAttempts=10}"); + } + +} diff --git a/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java b/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java index c07487a..b7090f8 100644 --- a/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java +++ b/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; import com.github.phantomthief.failover.Failover; +import com.github.phantomthief.failover.backoff.FixedBackOff; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multiset; @@ -168,6 +169,49 @@ void testRateRecover2() { assertEquals(1, failover.currentWeight("s2")); } + @Test + void testBackOffRecover() { + WeightFailover failover = WeightFailover. newGenericBuilder() + .checker(it -> true, 0.00001, new FixedBackOff(500, 3)) + .checkDuration(200, MILLISECONDS) + .build(of("s1", "s2"), 100); + assertEquals(100, failover.currentWeight("s1")); + assertEquals(100, failover.currentWeight("s2")); + failover.down("s2"); + assertEquals(0, failover.currentWeight("s2")); + sleepUninterruptibly(1, SECONDS); + assertEquals(1, failover.currentWeight("s2")); + } + + @Test + void testBackOffRecover2() { + WeightFailover failover = WeightFailover. newGenericBuilder() + .checker(it -> true, 0.00001, new FixedBackOff(5000, 3)) + .checkDuration(200, MILLISECONDS) + .build(of("s1", "s2"), 100); + assertEquals(100, failover.currentWeight("s1")); + assertEquals(100, failover.currentWeight("s2")); + failover.down("s2"); + assertEquals(0, failover.currentWeight("s2")); + sleepUninterruptibly(1, SECONDS); + assertEquals(0, failover.currentWeight("s2")); + } + + @Test + void testBackOffRecoverException() { + WeightFailover failover = WeightFailover. newGenericBuilder() + .name("test") + .checker(it -> { throw new RuntimeException(); }, new FixedBackOff(500, 3)) + .checkDuration(200, MILLISECONDS) + .build(of("s1", "s2"), 100); + assertEquals(100, failover.currentWeight("s1")); + assertEquals(100, failover.currentWeight("s2")); + failover.down("s2"); + assertEquals(0, failover.currentWeight("s2")); + sleepUninterruptibly(1, SECONDS); + assertEquals(0, failover.currentWeight("s2")); + } + @Test void testPerf() { Map map = IntStream.range(1, 10).boxed()