diff --git a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java index aa75a4f3a4..1932f8ef75 100644 --- a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java +++ b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java @@ -19,6 +19,7 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.TestProbe; import org.apache.pekko.util.Timeout; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; @@ -36,6 +37,7 @@ import static org.apache.pekko.pattern.Patterns.ask; import static org.apache.pekko.pattern.Patterns.pipe; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** Copyright (C) 2009-2018 Lightbend Inc. */ public class PatternsTest extends JUnitSuite { @@ -485,6 +487,51 @@ public class PatternsTest extends JUnitSuite { assertEquals(expected, actual); } + @Test + public void testCompletedStageWithTimeout() throws Exception { + final String expected = "Hello"; + final CompletionStage delayedStage = + Patterns.timeout( + Duration.ofMillis(200), + system.scheduler(), + ec, + () -> CompletableFuture.completedFuture(expected)); + final String actual = delayedStage.toCompletableFuture().get(3, SECONDS); + assertEquals(expected, actual); + } + + @Test + public void testFailedCompletedStageWithTimeout() throws Exception { + final CompletionStage delayedStage = + Patterns.timeout( + Duration.ofMillis(200), + system.scheduler(), + ec, + () -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new IllegalStateException("Illegal!")); + return f; + }); + try { + delayedStage.toCompletableFuture().get(3, SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + assertEquals("Illegal!", e.getCause().getMessage()); + } + } + + @Test + public void testCompletedWithTimeout() throws Exception { + final CompletionStage delayedStage = + Patterns.timeout(Duration.ofMillis(200), system.scheduler(), ec, CompletableFuture::new); + try { + delayedStage.toCompletableFuture().get(3, SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage()); + } + } + @Test public void testGracefulStop() throws Exception { ActorRef target = system.actorOf(Props.create(StopActor.class)); diff --git a/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala index 56853ba50f..e18148d849 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala @@ -13,10 +13,8 @@ package org.apache.pekko.pattern -import scala.concurrent.{ Await, Future, Promise } -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.{ Await, ExecutionContextExecutor, Future, Promise, TimeoutException } import scala.concurrent.duration._ - import org.apache.pekko import pekko.actor.{ Actor, Props } import pekko.testkit.{ PekkoSpec, TestLatch } @@ -76,4 +74,23 @@ class PatternSpec extends PekkoSpec { intercept[IllegalStateException] { Await.result(r, remainingOrDefault) }.getMessage should ===("Mexico") } } + + "pattern.timeout" must { + "be completed successfully eventually" in { + val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.successful(5)) + Await.result(f, remainingOrDefault) should ===(5) + } + + "be completed abnormally eventually" in { + val f = + pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.failed(new IllegalStateException("ABC"))) + intercept[IllegalStateException] { Await.result(f, remainingOrDefault) }.getMessage should ===("ABC") + } + + "be completed with a TimeoutException if not completed within the specified time" in { + val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.never) + intercept[TimeoutException] { Await.result(f, remainingOrDefault) } + } + } + } diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala index 2be1ad59b6..c37d8b4fd1 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala @@ -13,8 +13,7 @@ package org.apache.pekko.pattern -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage +import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutException } import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration @@ -87,4 +86,66 @@ trait FutureTimeoutSupport { } p } + + /** + * Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]] + * if the provided value is not completed within the specified duration. + * @since 1.2.0 + */ + def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])( + implicit ec: ExecutionContext): Future[T] = { + val future = + try value + catch { + case NonFatal(t) => Future.failed(t) + } + future.value match { + case Some(_) => future + case None => // not completed yet + val p = Promise[T]() + val timeout = using.scheduleOnce(duration) { + p.tryFailure(new TimeoutException(s"Timeout of $duration expired")) + if (future.isInstanceOf[CompletableFuture[T @unchecked]]) { + future.asInstanceOf[CompletableFuture[T]] + .toCompletableFuture + .cancel(true) + } + } + future.onComplete { result => + timeout.cancel() + p.tryComplete(result) + }(pekko.dispatch.ExecutionContexts.parasitic) + p.future + } + } + + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]] + * if the provided value is not completed within the specified duration. + * @since 1.2.0 + */ + def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])( + implicit ec: ExecutionContext): CompletionStage[T] = { + val stage: CompletionStage[T] = + try value + catch { + case NonFatal(t) => Futures.failedCompletionStage(t) + } + if (stage.toCompletableFuture.isDone) { + stage + } else { + val p = new CompletableFuture[T] + val timeout = using.scheduleOnce(duration) { + p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired")) + stage.toCompletableFuture.cancel(true) + } + stage.handle[Unit]((v: T, ex: Throwable) => { + timeout.cancel() + if (v != null) p.complete(v) + if (ex != null) p.completeExceptionally(ex) + }) + p + } + } + } diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala index 2b8759e026..c263d852db 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala @@ -465,6 +465,18 @@ object Patterns { value: Callable[CompletionStage[T]]): CompletionStage[T] = afterCompletionStage(duration.asScala, scheduler)(value.call())(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[java.util.concurrent.TimeoutException]] + * if the provided value is not completed within the specified duration. + * @since 1.2.0 + */ + def timeout[T]( + duration: java.time.Duration, + scheduler: Scheduler, + context: ExecutionContext, + value: Callable[CompletionStage[T]]): CompletionStage[T] = + timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context) + /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * after the specified duration.