From 400402f76cccbb82bb6d0006e4a22a51f6470d20 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Thu, 14 Jul 2016 14:03:04 +0200 Subject: [PATCH] +act #20936 add CompletionStage API to CircuitBreaker (#20937) --- .../java/akka/pattern/CircuitBreakerTest.java | 42 +++++++++++++++++++ .../akka/pattern/CircuitBreakerSpec.scala | 2 +- .../scala/akka/pattern/CircuitBreaker.scala | 24 +++++++++-- .../circuitbreaker/DangerousJavaActor.java | 23 +++------- 4 files changed, 70 insertions(+), 21 deletions(-) create mode 100644 akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java diff --git a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java new file mode 100644 index 0000000000..c3e5da1606 --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.pattern; + +import akka.actor.*; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.compat.java8.FutureConverters; +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class CircuitBreakerTest extends JUnitSuite { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("JavaAPI", AkkaSpec.testConf()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + @Test + public void useCircuitBreakerWithCompletableFuture() throws Exception { + final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS); + final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS); + final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis); + + final CompletableFuture f = new CompletableFuture<>(); + f.complete("hello"); + final CompletionStage res = breaker.callWithCircuitBreakerCS(() -> f); + assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index a0df91c16c..2c1e7adb1e 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -219,7 +219,7 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { val breaker = CircuitBreakerSpec.shortCallTimeoutCb() val fut = breaker().withCircuitBreaker(Future { - Thread.sleep(150.millis.dilated.toMillis); + Thread.sleep(150.millis.dilated.toMillis) throwException }) checkLatch(breaker.openLatch) diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 5f34ac2e5a..6460ee039c 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -3,18 +3,24 @@ */ package akka.pattern -import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean } +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } + import akka.AkkaException import akka.actor.Scheduler import akka.util.Unsafe + import scala.util.control.NoStackTrace -import java.util.concurrent.{ Callable, CopyOnWriteArrayList } -import scala.concurrent.{ ExecutionContext, Future, Promise, Await } +import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList } + +import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ import scala.concurrent.TimeoutException import scala.util.control.NonFatal import scala.util.Success import akka.dispatch.ExecutionContexts.sameThreadExecutionContext +import akka.japi.function.Creator + +import scala.compat.java8.FutureConverters /** * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread @@ -123,6 +129,18 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite */ def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) + /** + * Java API (8) for [[#withCircuitBreaker]] + * + * @param body Call needing protected + * @return [[java.util.concurrent.CompletionStage]] containing the call result or a + * `scala.concurrent.TimeoutException` if the call timed out + */ + def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T] = + FutureConverters.toJava[T](callWithCircuitBreaker(new Callable[Future[T]] { + override def call(): Future[T] = FutureConverters.toScala(body.call()) + })) + /** * Wraps invocations of synchronous calls that need to be protected * diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java index f7eba2708f..92cfaba185 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -51,27 +51,16 @@ public class DangerousJavaActor extends UntypedActor { if (message instanceof String) { String m = (String) message; if ("is my middle name".equals(m)) { - pipe(breaker.callWithCircuitBreaker( - new Callable>() { - public Future call() throws Exception { - return future( - new Callable() { - public String call() { - return dangerousCall(); - } - }, getContext().dispatcher()); - } - }), getContext().dispatcher()).to(getSender()); + pipe( + breaker.callWithCircuitBreaker(() -> + future(() -> dangerousCall(), getContext().dispatcher()) + ), getContext().dispatcher() + ).to(getSender()); } if ("block for me".equals(m)) { getSender().tell(breaker .callWithSyncCircuitBreaker( - new Callable() { - @Override - public String call() throws Exception { - return dangerousCall(); - } - }), getSelf()); + () -> dangerousCall()), getSelf()); } } }