diff --git a/src/main/java/com/github/phantomthief/failover/backoff/BackOff.java b/src/main/java/com/github/phantomthief/failover/backoff/BackOff.java
new file mode 100644
index 0000000..3916911
--- /dev/null
+++ b/src/main/java/com/github/phantomthief/failover/backoff/BackOff.java
@@ -0,0 +1,38 @@
+package com.github.phantomthief.failover.backoff;
+
+/**
+ * Provide a {@link BackOffExecution} that indicates the rate at which
+ * an operation should be retried.
+ *
+ * Users of this interface are expected to use it like this:
+ *
+ *
+ * BackOffExecution exec = backOff.start();
+ *
+ * // In the operation recovery/retry loop:
+ * long waitInterval = exec.nextBackOff();
+ * if (waitInterval == BackOffExecution.STOP) {
+ * // do not retry operation
+ * }
+ * else {
+ * // sleep, e.g. Thread.sleep(waitInterval)
+ * // retry operation
+ * }
+ * }
+ *
+ * Once the underlying operation has completed successfully,
+ * the execution instance can be simply discarded.
+ *
+ * @see BackOffExecution
+ */
+@FunctionalInterface
+public interface BackOff {
+
+ /**
+ * Start a new back off execution.
+ *
+ * @return a fresh {@link BackOffExecution} ready to be used
+ */
+ BackOffExecution start();
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/phantomthief/failover/backoff/BackOffExecution.java b/src/main/java/com/github/phantomthief/failover/backoff/BackOffExecution.java
new file mode 100644
index 0000000..3e9588a
--- /dev/null
+++ b/src/main/java/com/github/phantomthief/failover/backoff/BackOffExecution.java
@@ -0,0 +1,26 @@
+package com.github.phantomthief.failover.backoff;
+
+/**
+ * Represent a particular back-off execution.
+ *
+ * Implementations do not need to be thread safe.
+ *
+ * @see BackOff
+ */
+@FunctionalInterface
+public interface BackOffExecution {
+
+ /**
+ * Return value of {@link #nextBackOff()} that indicates that the operation
+ * should not be retried.
+ */
+ long STOP = -1;
+
+ /**
+ * Return the number of milliseconds to wait before retrying the operation
+ * or {@link #STOP} ({@value #STOP}) to indicate that no further attempt
+ * should be made for the operation.
+ */
+ long nextBackOff();
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/phantomthief/failover/backoff/ExponentialBackOff.java b/src/main/java/com/github/phantomthief/failover/backoff/ExponentialBackOff.java
new file mode 100644
index 0000000..9a46c09
--- /dev/null
+++ b/src/main/java/com/github/phantomthief/failover/backoff/ExponentialBackOff.java
@@ -0,0 +1,207 @@
+package com.github.phantomthief.failover.backoff;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Implementation of {@link BackOff} that increases the back off period for each
+ * retry attempt. When the interval has reached the {@link #setMaxInterval(long)
+ * max interval}, it is no longer increased. Stops retrying once the
+ * {@link #setMaxElapsedTime(long) max elapsed time} has been reached.
+ *
+ *
Example: The default interval is {@value #DEFAULT_INITIAL_INTERVAL} ms,
+ * the default multiplier is {@value #DEFAULT_MULTIPLIER}, and the default max
+ * interval is {@value #DEFAULT_MAX_INTERVAL}. For 10 attempts the sequence will be
+ * as follows:
+ *
+ *
+ * request# back off
+ *
+ * 1 2000
+ * 2 3000
+ * 3 4500
+ * 4 6750
+ * 5 10125
+ * 6 15187
+ * 7 22780
+ * 8 30000
+ * 9 30000
+ * 10 30000
+ *
+ *
+ * Note that the default max elapsed time is {@link Long#MAX_VALUE}. Use
+ * {@link #setMaxElapsedTime(long)} to limit the maximum length of time
+ * that an instance should accumulate before returning
+ * {@link BackOffExecution#STOP}.
+ */
+public class ExponentialBackOff implements BackOff {
+
+ /**
+ * The default initial interval.
+ */
+ public static final long DEFAULT_INITIAL_INTERVAL = 2000L;
+
+ /**
+ * The default multiplier (increases the interval by 50%).
+ */
+ public static final double DEFAULT_MULTIPLIER = 1.5;
+
+ /**
+ * The default maximum back off time.
+ */
+ public static final long DEFAULT_MAX_INTERVAL = 30000L;
+
+ /**
+ * The default maximum elapsed time.
+ */
+ public static final long DEFAULT_MAX_ELAPSED_TIME = Long.MAX_VALUE;
+
+
+ private long initialInterval = DEFAULT_INITIAL_INTERVAL;
+
+ private double multiplier = DEFAULT_MULTIPLIER;
+
+ private long maxInterval = DEFAULT_MAX_INTERVAL;
+
+ private long maxElapsedTime = DEFAULT_MAX_ELAPSED_TIME;
+
+
+ /**
+ * Create an instance with the default settings.
+ *
+ * @see #DEFAULT_INITIAL_INTERVAL
+ * @see #DEFAULT_MULTIPLIER
+ * @see #DEFAULT_MAX_INTERVAL
+ * @see #DEFAULT_MAX_ELAPSED_TIME
+ */
+ public ExponentialBackOff() {
+ }
+
+ /**
+ * Create an instance with the supplied settings.
+ *
+ * @param initialInterval the initial interval in milliseconds
+ * @param multiplier the multiplier (should be greater than or equal to 1)
+ */
+ public ExponentialBackOff(long initialInterval, double multiplier) {
+ checkMultiplier(multiplier);
+ this.initialInterval = initialInterval;
+ this.multiplier = multiplier;
+ }
+
+ /**
+ * The initial interval in milliseconds.
+ */
+ public void setInitialInterval(long initialInterval) {
+ this.initialInterval = initialInterval;
+ }
+
+ /**
+ * Return the initial interval in milliseconds.
+ */
+ public long getInitialInterval() {
+ return this.initialInterval;
+ }
+
+ /**
+ * The value to multiply the current interval by for each retry attempt.
+ */
+ public void setMultiplier(double multiplier) {
+ checkMultiplier(multiplier);
+ this.multiplier = multiplier;
+ }
+
+ /**
+ * Return the value to multiply the current interval by for each retry attempt.
+ */
+ public double getMultiplier() {
+ return this.multiplier;
+ }
+
+ /**
+ * The maximum back off time.
+ */
+ public void setMaxInterval(long maxInterval) {
+ this.maxInterval = maxInterval;
+ }
+
+ /**
+ * Return the maximum back off time.
+ */
+ public long getMaxInterval() {
+ return this.maxInterval;
+ }
+
+ /**
+ * The maximum elapsed time in milliseconds after which a call to
+ * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}.
+ */
+ public void setMaxElapsedTime(long maxElapsedTime) {
+ this.maxElapsedTime = maxElapsedTime;
+ }
+
+ /**
+ * Return the maximum elapsed time in milliseconds after which a call to
+ * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}.
+ */
+ public long getMaxElapsedTime() {
+ return this.maxElapsedTime;
+ }
+
+ @Override
+ public BackOffExecution start() {
+ return new ExponentialBackOffExecution();
+ }
+
+ private void checkMultiplier(double multiplier) {
+ checkArgument(multiplier >= 1, "Invalid multiplier '" + multiplier + "'. Should be greater than " +
+ "or equal to 1. A multiplier of 1 is equivalent to a fixed interval.");
+ }
+
+ private class ExponentialBackOffExecution implements BackOffExecution {
+
+ private long currentInterval = -1;
+
+ private long currentElapsedTime = 0;
+
+ @Override
+ public long nextBackOff() {
+ if (this.currentElapsedTime >= maxElapsedTime) {
+ return STOP;
+ }
+
+ long nextInterval = computeNextInterval();
+ this.currentElapsedTime += nextInterval;
+ return nextInterval;
+ }
+
+ private long computeNextInterval() {
+ long maxInterval = getMaxInterval();
+ if (this.currentInterval >= maxInterval) {
+ return maxInterval;
+ } else if (this.currentInterval < 0) {
+ long initialInterval = getInitialInterval();
+ this.currentInterval = (initialInterval < maxInterval
+ ? initialInterval : maxInterval);
+ } else {
+ this.currentInterval = multiplyInterval(maxInterval);
+ }
+ return this.currentInterval;
+ }
+
+ private long multiplyInterval(long maxInterval) {
+ long i = this.currentInterval;
+ i *= getMultiplier();
+ return (i > maxInterval ? maxInterval : i);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ExponentialBackOff{");
+ sb.append("currentInterval=").append(this.currentInterval < 0 ? "n/a" : this.currentInterval + "ms");
+ sb.append(", multiplier=").append(getMultiplier());
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/phantomthief/failover/backoff/FixedBackOff.java b/src/main/java/com/github/phantomthief/failover/backoff/FixedBackOff.java
new file mode 100644
index 0000000..121e819
--- /dev/null
+++ b/src/main/java/com/github/phantomthief/failover/backoff/FixedBackOff.java
@@ -0,0 +1,104 @@
+package com.github.phantomthief.failover.backoff;
+
+/**
+ * A simple {@link BackOff} implementation that provides a fixed interval
+ * between two attempts and a maximum number of retries.
+ */
+public class FixedBackOff implements BackOff {
+
+ /**
+ * The default recovery interval: 5000 ms = 5 seconds.
+ */
+ public static final long DEFAULT_INTERVAL = 5000;
+
+ /**
+ * Constant value indicating an unlimited number of attempts.
+ */
+ public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;
+
+ private long interval = DEFAULT_INTERVAL;
+
+ private long maxAttempts = UNLIMITED_ATTEMPTS;
+
+
+ /**
+ * Create an instance with an interval of {@value #DEFAULT_INTERVAL}
+ * ms and an unlimited number of attempts.
+ */
+ public FixedBackOff() {
+ }
+
+ /**
+ * Create an instance.
+ *
+ * @param interval the interval between two attempts
+ * @param maxAttempts the maximum number of attempts
+ */
+ public FixedBackOff(long interval, long maxAttempts) {
+ this.interval = interval;
+ this.maxAttempts = maxAttempts;
+ }
+
+ /**
+ * Set the interval between two attempts in milliseconds.
+ */
+ public void setInterval(long interval) {
+ this.interval = interval;
+ }
+
+ /**
+ * Return the interval between two attempts in milliseconds.
+ */
+ public long getInterval() {
+ return this.interval;
+ }
+
+ /**
+ * Set the maximum number of attempts in milliseconds.
+ */
+ public void setMaxAttempts(long maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ }
+
+ /**
+ * Return the maximum number of attempts in milliseconds.
+ */
+ public long getMaxAttempts() {
+ return this.maxAttempts;
+ }
+
+ @Override
+ public BackOffExecution start() {
+ return new FixedBackOffExecution();
+ }
+
+
+ private class FixedBackOffExecution implements BackOffExecution {
+
+ private long currentAttempts = 0;
+
+ @Override
+ public long nextBackOff() {
+ this.currentAttempts++;
+ if (this.currentAttempts <= getMaxAttempts()) {
+ return getInterval();
+ } else {
+ return STOP;
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("FixedBackOff{");
+ sb.append("interval=").append(FixedBackOff.this.interval);
+ String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE
+ ? "unlimited"
+ : String.valueOf(FixedBackOff.this.maxAttempts);
+ sb.append(", currentAttempts=").append(this.currentAttempts);
+ sb.append(", maxAttempts=").append(attemptValue);
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/phantomthief/failover/backoff/package-info.java b/src/main/java/com/github/phantomthief/failover/backoff/package-info.java
new file mode 100644
index 0000000..0db908a
--- /dev/null
+++ b/src/main/java/com/github/phantomthief/failover/backoff/package-info.java
@@ -0,0 +1,9 @@
+/**
+ * Copied from spring-core
+ *
+ * Write the code. Change the world.
+ *
+ * @author trang
+ * Created on 2019-11-09.
+ */
+package com.github.phantomthief.failover.backoff;
\ No newline at end of file
diff --git a/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java b/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java
index 62b0a6a..d0c6dfa 100644
--- a/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java
+++ b/src/main/java/com/github/phantomthief/failover/impl/GenericWeightFailoverBuilder.java
@@ -9,7 +9,9 @@
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.github.phantomthief.failover.backoff.BackOff;
import com.github.phantomthief.util.ThrowableFunction;
import com.github.phantomthief.util.ThrowablePredicate;
@@ -96,12 +98,21 @@ public GenericWeightFailoverBuilder checkDuration(long time, TimeUnit unit) {
@CheckReturnValue
@Nonnull
- public GenericWeightFailoverBuilder
- checker(@Nonnull ThrowableFunction super E, Double, Throwable> failChecker) {
+ public GenericWeightFailoverBuilder checker(
+ @Nonnull ThrowableFunction super E, Double, Throwable> failChecker) {
builder.checker(failChecker);
return this;
}
+ @CheckReturnValue
+ @Nonnull
+ public GenericWeightFailoverBuilder checker(
+ @Nonnull ThrowableFunction super E, Double, Throwable> failChecker,
+ @Nullable BackOff backOff) {
+ builder.checker(failChecker, backOff);
+ return this;
+ }
+
@CheckReturnValue
@Nonnull
public GenericWeightFailoverBuilder checker(
@@ -111,6 +122,16 @@ public GenericWeightFailoverBuilder checker(
return this;
}
+ @CheckReturnValue
+ @Nonnull
+ public GenericWeightFailoverBuilder checker(
+ @Nonnull ThrowablePredicate super E, Throwable> failChecker,
+ @Nonnegative double recoveredInitRate,
+ @Nullable BackOff backOff) {
+ builder.checker(failChecker, recoveredInitRate, backOff);
+ return this;
+ }
+
@CheckReturnValue
@Nonnull
public GenericWeightFailoverBuilder filter(Predicate filter) {
diff --git a/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java b/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java
index 55faf7a..0eb09a8 100644
--- a/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java
+++ b/src/main/java/com/github/phantomthief/failover/impl/WeightFailover.java
@@ -1,8 +1,13 @@
package com.github.phantomthief.failover.impl;
import static com.github.phantomthief.tuple.Tuple.tuple;
+import static com.github.phantomthief.util.MoreFunctions.catching;
+import static com.github.phantomthief.util.MoreFunctions.runCatching;
+import static com.github.phantomthief.util.MoreFunctions.runWithThreadName;
+import static com.github.phantomthief.util.MoreFunctions.supplyWithThreadName;
import static com.github.phantomthief.util.MoreSuppliers.lazy;
import static com.google.common.primitives.Ints.constrainToRange;
+import static com.google.common.util.concurrent.Futures.addCallback;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -21,20 +26,30 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.function.Predicate;
+import java.util.function.ToDoubleFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.slf4j.Logger;
import com.github.phantomthief.failover.Failover;
+import com.github.phantomthief.failover.backoff.BackOff;
+import com.github.phantomthief.failover.backoff.BackOffExecution;
import com.github.phantomthief.failover.util.SharedCheckExecutorHolder;
import com.github.phantomthief.tuple.TwoTuple;
import com.github.phantomthief.util.MoreSuppliers.CloseableSupplier;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* 默认权重记录
@@ -47,14 +62,19 @@ public class WeightFailover implements Failover, Closeable {
private static final Logger logger = getLogger(WeightFailover.class);
+ private final String name;
+
private final IntUnaryOperator failReduceWeight;
private final IntUnaryOperator successIncreaseWeight;
private final ConcurrentMap initWeightMap;
private final ConcurrentMap currentWeightMap;
private final CloseableSupplier> recoveryFuture;
+ private final ConcurrentLinkedQueue recoveryTasks;
+ private final ConcurrentMap runningRecoveryTasks;
private final Consumer onMinWeight;
private final int minWeight;
+ private final ToDoubleFunction checker;
/**
* {@code null} if this feature is off.
@@ -69,6 +89,7 @@ public class WeightFailover implements Failover, Closeable {
private volatile boolean closed;
WeightFailover(WeightFailoverBuilder builder) {
+ this.name = builder.name;
this.minWeight = builder.minWeight;
this.failReduceWeight = builder.failReduceWeight;
this.successIncreaseWeight = builder.successIncreaseWeight;
@@ -77,48 +98,110 @@ public class WeightFailover implements Failover, Closeable {
this.onMinWeight = builder.onMinWeight;
this.weightOnMissingNode = builder.weightOnMissingNode;
this.filter = builder.filter;
- this.recoveryFuture = lazy(
- () -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> {
- if (closed) {
- tryCloseRecoveryScheduler();
- return;
+ this.checker = builder.checker;
+ this.recoveryTasks = new ConcurrentLinkedQueue<>();
+ this.runningRecoveryTasks = new ConcurrentHashMap<>();
+ this.recoveryFuture = builder.backOff == null ? initDefaultChecker(builder) : initBackOffChecker(builder);
+ }
+
+ private CloseableSupplier> initDefaultChecker(WeightFailoverBuilder builder) {
+ return lazy(() -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> {
+ if (closed) {
+ tryCloseRecoveryScheduler();
+ return;
+ }
+ Thread currentThread = Thread.currentThread();
+ String origName = currentThread.getName();
+ if (builder.name != null) {
+ currentThread.setName(origName + "-[" + builder.name + "]");
+ }
+ try {
+ Map recoveredObjects = new HashMap<>();
+ this.currentWeightMap.forEach((obj, weight) -> {
+ if (weight == 0) {
+ double recoverRate = builder.checker.applyAsDouble(obj);
+ if (recoverRate > 0) {
+ recoveredObjects.put(obj, recoverRate);
+ }
+ }
+ });
+ if (!recoveredObjects.isEmpty()) {
+ logger.info("found recovered objects:{}", recoveredObjects);
+ }
+ recoveredObjects.forEach((recovered, rate) -> {
+ Integer initWeight = initWeightMap.get(recovered);
+ if (initWeight == null) {
+ throw new IllegalStateException("obj:" + recovered);
}
- Thread currentThread = Thread.currentThread();
- String origName = currentThread.getName();
- if (builder.name != null) {
- currentThread.setName(origName + "-[" + builder.name + "]");
+ int recoveredWeight = constrainToRange((int) (initWeight * rate), 1,
+ initWeight);
+ currentWeightMap.put(recovered, recoveredWeight);
+ if (builder.onRecovered != null) {
+ builder.onRecovered.accept(recovered);
}
- try {
- Map recoveredObjects = new HashMap<>();
- this.currentWeightMap.forEach((obj, weight) -> {
- if (weight == 0) {
- double recoverRate = builder.checker.applyAsDouble(obj);
- if (recoverRate > 0) {
- recoveredObjects.put(obj, recoverRate);
+ });
+ } catch (Throwable e) {
+ logger.error("", e);
+ } finally {
+ currentThread.setName(origName);
+ }
+ }, builder.checkDuration, builder.checkDuration, MILLISECONDS));
+ }
+
+ private CloseableSupplier> initBackOffChecker(WeightFailoverBuilder builder) {
+ return lazy(() -> SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(() -> {
+ if (closed) {
+ tryCloseRecoveryScheduler();
+ return;
+ }
+ // 为权重为 0 的资源创建健康检查任务,放入待检查队列
+ this.currentWeightMap.entrySet().stream()
+ .filter(entry -> entry.getValue() == 0)
+ .forEach((entry -> recoveryTasks.offer(new RecoveryTask(entry.getKey(), builder.backOff))));
+ // 如果队列不为空则将其转发到 scheduler
+ if (!recoveryTasks.isEmpty()) {
+ while (true) {
+ RecoveryTask task = recoveryTasks.poll();
+ if (task == null) {
+ break;
+ }
+ // 如果健康检查耗时超过 checkDuration,可能导致对同一资源发起多次 hc 请求,这里规避一下
+ if (runningRecoveryTasks.containsKey(task.getObj())) {
+ continue;
+ }
+ T obj = task.getObj();
+ // 新建的 task 将其放入 running 队列中
+ runningRecoveryTasks.put(obj, task);
+ // 与外层共用一个线程池,貌似也没什么不妥...
+ ListenableFuture future =
+ SharedCheckExecutorHolder.getInstance().schedule(task, task.nextBackOff(), MILLISECONDS);
+ addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable Integer recoveredWeight) {
+ runWithThreadName(origin -> origin + "-[" + name + "]", () -> runCatching(() -> {
+ // 不管成不成功执行完毕先将 task 删除
+ runningRecoveryTasks.remove(obj);
+ // 如果检查失败再次将其放入待检查队列,delay 时间顺延到下一周期
+ if (recoveredWeight == null || recoveredWeight == 0) {
+ recoveryTasks.offer(task);
+ } else {
+ currentWeightMap.put(obj, recoveredWeight);
+ if (builder.onRecovered != null) {
+ builder.onRecovered.accept(obj);
+ }
}
- }
- });
- if (!recoveredObjects.isEmpty()) {
- logger.info("found recovered objects:{}", recoveredObjects);
+ }));
}
- recoveredObjects.forEach((recovered, rate) -> {
- Integer initWeight = initWeightMap.get(recovered);
- if (initWeight == null) {
- throw new IllegalStateException("obj:" + recovered);
- }
- int recoveredWeight = constrainToRange((int) (initWeight * rate), 1,
- initWeight);
- currentWeightMap.put(recovered, recoveredWeight);
- if (builder.onRecovered != null) {
- builder.onRecovered.accept(recovered);
- }
- });
- } catch (Throwable e) {
- logger.error("", e);
- } finally {
- currentThread.setName(origName);
- }
- }, builder.checkDuration, builder.checkDuration, MILLISECONDS));
+
+ @Override
+ public void onFailure(@Nonnull Throwable t) {
+ // 不管成不成功执行完毕先将 task 删除
+ runningRecoveryTasks.remove(obj);
+ }
+ });
+ }
+ }
+ }, builder.checkDuration, builder.checkDuration, MILLISECONDS));
}
/**
@@ -319,4 +402,41 @@ public Set getFailed() {
public String toString() {
return "WeightFailover [" + initWeightMap + "]" + "@" + Integer.toHexString(hashCode());
}
+
+ class RecoveryTask implements Callable {
+
+ private final T obj;
+ private final BackOffExecution backOff;
+
+ RecoveryTask(T obj, BackOff backOff) {
+ this.obj = obj;
+ this.backOff = backOff.start();
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ return supplyWithThreadName(origin -> origin + "-[" + name + "]", () -> catching(() -> {
+ double recoverRate = checker.applyAsDouble(obj);
+ if (recoverRate > 0) {
+ Integer initWeight = initWeightMap.get(obj);
+ if (initWeight == null) {
+ throw new IllegalStateException("obj:" + obj);
+ }
+ return constrainToRange((int) (initWeight * recoverRate), 1, initWeight);
+ }
+ return 0;
+ }));
+ }
+
+ @Nonnull
+ T getObj() {
+ return obj;
+ }
+
+ long nextBackOff() {
+ return backOff.nextBackOff();
+ }
+
+ }
+
}
diff --git a/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java b/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java
index d3f0951..9a3d4e6 100644
--- a/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java
+++ b/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverBuilder.java
@@ -19,9 +19,11 @@
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.slf4j.Logger;
+import com.github.phantomthief.failover.backoff.BackOff;
import com.github.phantomthief.util.ThrowableFunction;
import com.github.phantomthief.util.ThrowablePredicate;
@@ -39,13 +41,14 @@ public class WeightFailoverBuilder {
Map initWeightMap;
ToDoubleFunction checker;
+ BackOff backOff;
long checkDuration;
Consumer onMinWeight;
Consumer onRecovered;
int minWeight = 0;
Integer weightOnMissingNode;
String name;
-
+
Predicate filter = alwaysTrue();
@CheckReturnValue
@@ -148,8 +151,18 @@ public WeightFailoverBuilder filter(@Nonnull Predicate filter) {
@SuppressWarnings("unchecked")
@CheckReturnValue
@Nonnull
- public WeightFailoverBuilder
- checker(@Nonnull ThrowableFunction super E, Double, Throwable> failChecker) {
+ public WeightFailoverBuilder checker(
+ @Nonnull ThrowableFunction super E, Double, Throwable> failChecker) {
+ checkNotNull(failChecker);
+ return checker(failChecker, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @CheckReturnValue
+ @Nonnull
+ public WeightFailoverBuilder checker(
+ @Nonnull ThrowableFunction super E, Double, Throwable> failChecker,
+ @Nullable BackOff backOff) {
checkNotNull(failChecker);
WeightFailoverBuilder thisBuilder = (WeightFailoverBuilder) this;
thisBuilder.checker = t -> {
@@ -160,6 +173,7 @@ public WeightFailoverBuilder filter(@Nonnull Predicate filter) {
return 0;
}
};
+ thisBuilder.backOff = backOff;
return thisBuilder;
}
@@ -171,7 +185,19 @@ public WeightFailoverBuilder checker(
@Nonnegative double recoveredInitRate) {
checkArgument(recoveredInitRate >= 0 && recoveredInitRate <= 1);
checkNotNull(failChecker);
- return checker(it -> failChecker.test(it) ? recoveredInitRate : 0);
+ return checker(failChecker, recoveredInitRate, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @CheckReturnValue
+ @Nonnull
+ public WeightFailoverBuilder checker(
+ @Nonnull ThrowablePredicate super E, Throwable> failChecker,
+ @Nonnegative double recoveredInitRate,
+ @Nullable BackOff backOff) {
+ checkArgument(recoveredInitRate >= 0 && recoveredInitRate <= 1);
+ checkNotNull(failChecker);
+ return checker(it -> failChecker.test(it) ? recoveredInitRate : 0, backOff);
}
@Nonnull
diff --git a/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java b/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java
index 426f69a..8dd923e 100644
--- a/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java
+++ b/src/main/java/com/github/phantomthief/failover/util/SharedCheckExecutorHolder.java
@@ -1,11 +1,14 @@
package com.github.phantomthief.failover.util;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.lang.Thread.MIN_PRIORITY;
import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import javax.annotation.Nonnull;
+
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -15,26 +18,32 @@ public class SharedCheckExecutorHolder {
private static final int THREAD_COUNT = 10;
- public static ScheduledExecutorService getInstance() {
+ public static ListeningScheduledExecutorService getInstance() {
return LazyHolder.INSTANCE;
}
private static class LazyHolder {
- private static final ScheduledExecutorService INSTANCE = new ScheduledThreadPoolExecutor(
- THREAD_COUNT,
- new ThreadFactoryBuilder().setNameFormat("scheduled-failover-recovery-check-%d")
- .setPriority(MIN_PRIORITY)
- .setDaemon(true) //
- .build()) {
-
- public void shutdown() {
- throw new UnsupportedOperationException();
- }
-
- public List shutdownNow() {
- throw new UnsupportedOperationException();
- }
- };
+ private static final ListeningScheduledExecutorService INSTANCE =
+ listeningDecorator(new ScheduledThreadPoolExecutor(THREAD_COUNT,
+ new ThreadFactoryBuilder()
+ .setNameFormat("scheduled-failover-recovery-check-%d")
+ .setPriority(MIN_PRIORITY)
+ .setDaemon(true)
+ .build()) {
+
+ @Override
+ public void shutdown() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nonnull
+ @Override
+ public List shutdownNow() {
+ throw new UnsupportedOperationException();
+ }
+ });
+
}
-}
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/github/phantomthief/failover/backoff/ExponentialBackOffTests.java b/src/test/java/com/github/phantomthief/failover/backoff/ExponentialBackOffTests.java
new file mode 100644
index 0000000..9aa8ab5
--- /dev/null
+++ b/src/test/java/com/github/phantomthief/failover/backoff/ExponentialBackOffTests.java
@@ -0,0 +1,115 @@
+package com.github.phantomthief.failover.backoff;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * ExponentialBackOffTests
+ */
+class ExponentialBackOffTests {
+
+ @Test
+ void defaultInstance() {
+ ExponentialBackOff backOff = new ExponentialBackOff();
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(2000L);
+ assertThat(execution.nextBackOff()).isEqualTo(3000L);
+ assertThat(execution.nextBackOff()).isEqualTo(4500L);
+ }
+
+ @Test
+ void simpleIncrease() {
+ ExponentialBackOff backOff = new ExponentialBackOff(100L, 2.0);
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(100L);
+ assertThat(execution.nextBackOff()).isEqualTo(200L);
+ assertThat(execution.nextBackOff()).isEqualTo(400L);
+ assertThat(execution.nextBackOff()).isEqualTo(800L);
+ }
+
+ @Test
+ void fixedIncrease() {
+ ExponentialBackOff backOff = new ExponentialBackOff(100L, 1.0);
+ backOff.setMaxElapsedTime(300L);
+
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(100L);
+ assertThat(execution.nextBackOff()).isEqualTo(100L);
+ assertThat(execution.nextBackOff()).isEqualTo(100L);
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void maxIntervalReached() {
+ ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0);
+ backOff.setMaxInterval(4000L);
+
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(2000L);
+ assertThat(execution.nextBackOff()).isEqualTo(4000L);
+ // max reached
+ assertThat(execution.nextBackOff()).isEqualTo(4000L);
+ assertThat(execution.nextBackOff()).isEqualTo(4000L);
+ }
+
+ @Test
+ void maxAttemptsReached() {
+ ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0);
+ backOff.setMaxElapsedTime(4000L);
+
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(2000L);
+ assertThat(execution.nextBackOff()).isEqualTo(4000L);
+ // > 4 sec wait in total
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void startReturnDifferentInstances() {
+ ExponentialBackOff backOff = new ExponentialBackOff();
+ backOff.setInitialInterval(2000L);
+ backOff.setMultiplier(2.0);
+ backOff.setMaxElapsedTime(4000L);
+
+ BackOffExecution execution = backOff.start();
+ BackOffExecution execution2 = backOff.start();
+
+ assertThat(execution.nextBackOff()).isEqualTo(2000L);
+ assertThat(execution2.nextBackOff()).isEqualTo(2000L);
+ assertThat(execution.nextBackOff()).isEqualTo(4000L);
+ assertThat(execution2.nextBackOff()).isEqualTo(4000L);
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ assertThat(execution2.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void invalidInterval() {
+ ExponentialBackOff backOff = new ExponentialBackOff();
+ assertThatIllegalArgumentException().isThrownBy(() ->
+ backOff.setMultiplier(0.9));
+ }
+
+ @Test
+ void maxIntervalReachedImmediately() {
+ ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
+ backOff.setMaxInterval(50L);
+
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(50L);
+ assertThat(execution.nextBackOff()).isEqualTo(50L);
+ }
+
+ @Test
+ void toStringContent() {
+ ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0);
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.toString()).isEqualTo("ExponentialBackOff{currentInterval=n/a, multiplier=2.0}");
+ execution.nextBackOff();
+ assertThat(execution.toString()).isEqualTo("ExponentialBackOff{currentInterval=2000ms, multiplier=2.0}");
+ execution.nextBackOff();
+ assertThat(execution.toString()).isEqualTo("ExponentialBackOff{currentInterval=4000ms, multiplier=2.0}");
+ }
+
+}
diff --git a/src/test/java/com/github/phantomthief/failover/backoff/FixedBackOffTests.java b/src/test/java/com/github/phantomthief/failover/backoff/FixedBackOffTests.java
new file mode 100644
index 0000000..3791333
--- /dev/null
+++ b/src/test/java/com/github/phantomthief/failover/backoff/FixedBackOffTests.java
@@ -0,0 +1,73 @@
+package com.github.phantomthief.failover.backoff;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * FixedBackOffTests
+ */
+class FixedBackOffTests {
+
+ @Test
+ void defaultInstance() {
+ FixedBackOff backOff = new FixedBackOff();
+ BackOffExecution execution = backOff.start();
+ for (int i = 0; i < 100; i++) {
+ assertThat(execution.nextBackOff()).isEqualTo(FixedBackOff.DEFAULT_INTERVAL);
+ }
+ }
+
+ @Test
+ void noAttemptAtAll() {
+ FixedBackOff backOff = new FixedBackOff(100L, 0L);
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void maxAttemptsReached() {
+ FixedBackOff backOff = new FixedBackOff(200L, 2);
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(200L);
+ assertThat(execution.nextBackOff()).isEqualTo(200L);
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void startReturnDifferentInstances() {
+ FixedBackOff backOff = new FixedBackOff(100L, 1);
+ BackOffExecution execution = backOff.start();
+ BackOffExecution execution2 = backOff.start();
+
+ assertThat(execution.nextBackOff()).isEqualTo(100L);
+ assertThat(execution2.nextBackOff()).isEqualTo(100L);
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ assertThat(execution2.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void liveUpdate() {
+ FixedBackOff backOff = new FixedBackOff(100L, 1);
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.nextBackOff()).isEqualTo(100L);
+
+ backOff.setInterval(200L);
+ backOff.setMaxAttempts(2);
+
+ assertThat(execution.nextBackOff()).isEqualTo(200L);
+ assertThat(execution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
+ }
+
+ @Test
+ void toStringContent() {
+ FixedBackOff backOff = new FixedBackOff(200L, 10);
+ BackOffExecution execution = backOff.start();
+ assertThat(execution.toString()).isEqualTo("FixedBackOff{interval=200, currentAttempts=0, maxAttempts=10}");
+ execution.nextBackOff();
+ assertThat(execution.toString()).isEqualTo("FixedBackOff{interval=200, currentAttempts=1, maxAttempts=10}");
+ execution.nextBackOff();
+ assertThat(execution.toString()).isEqualTo("FixedBackOff{interval=200, currentAttempts=2, maxAttempts=10}");
+ }
+
+}
diff --git a/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java b/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java
index c07487a..b7090f8 100644
--- a/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java
+++ b/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverTest.java
@@ -28,6 +28,7 @@
import org.junit.jupiter.api.Test;
import com.github.phantomthief.failover.Failover;
+import com.github.phantomthief.failover.backoff.FixedBackOff;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multiset;
@@ -168,6 +169,49 @@ void testRateRecover2() {
assertEquals(1, failover.currentWeight("s2"));
}
+ @Test
+ void testBackOffRecover() {
+ WeightFailover failover = WeightFailover. newGenericBuilder()
+ .checker(it -> true, 0.00001, new FixedBackOff(500, 3))
+ .checkDuration(200, MILLISECONDS)
+ .build(of("s1", "s2"), 100);
+ assertEquals(100, failover.currentWeight("s1"));
+ assertEquals(100, failover.currentWeight("s2"));
+ failover.down("s2");
+ assertEquals(0, failover.currentWeight("s2"));
+ sleepUninterruptibly(1, SECONDS);
+ assertEquals(1, failover.currentWeight("s2"));
+ }
+
+ @Test
+ void testBackOffRecover2() {
+ WeightFailover failover = WeightFailover. newGenericBuilder()
+ .checker(it -> true, 0.00001, new FixedBackOff(5000, 3))
+ .checkDuration(200, MILLISECONDS)
+ .build(of("s1", "s2"), 100);
+ assertEquals(100, failover.currentWeight("s1"));
+ assertEquals(100, failover.currentWeight("s2"));
+ failover.down("s2");
+ assertEquals(0, failover.currentWeight("s2"));
+ sleepUninterruptibly(1, SECONDS);
+ assertEquals(0, failover.currentWeight("s2"));
+ }
+
+ @Test
+ void testBackOffRecoverException() {
+ WeightFailover failover = WeightFailover. newGenericBuilder()
+ .name("test")
+ .checker(it -> { throw new RuntimeException(); }, new FixedBackOff(500, 3))
+ .checkDuration(200, MILLISECONDS)
+ .build(of("s1", "s2"), 100);
+ assertEquals(100, failover.currentWeight("s1"));
+ assertEquals(100, failover.currentWeight("s2"));
+ failover.down("s2");
+ assertEquals(0, failover.currentWeight("s2"));
+ sleepUninterruptibly(1, SECONDS);
+ assertEquals(0, failover.currentWeight("s2"));
+ }
+
@Test
void testPerf() {
Map map = IntStream.range(1, 10).boxed()