From ba4a3b7f65aac8fff6f43c4406734fb2df22dec8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Apr 2012 18:34:16 +0200 Subject: [PATCH] #1973 - Adding support for a mechanism to create Futures that will have a result after a specified duration. --- .../test/scala/akka/pattern/PatternSpec.scala | 19 ++++++++++++++- .../akka/pattern/FutureTimeoutSupport.scala | 23 +++++++++++++++++++ .../main/scala/akka/pattern/Patterns.scala | 22 ++++++++++++++++-- .../src/main/scala/akka/pattern/package.scala | 2 +- 4 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index ce1b9db201..2776beabce 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -8,9 +8,10 @@ import akka.testkit.AkkaSpec import akka.actor.Props import akka.actor.Actor import akka.actor.ActorTimeoutException -import akka.dispatch.Await import akka.util.Duration import akka.util.duration._ +import akka.dispatch.{ Future, Promise, Await } +import java.lang.IllegalStateException object PatternSpec { case class Work(duration: Duration) @@ -49,4 +50,20 @@ class PatternSpec extends AkkaSpec { } } } + + "pattern.after" must { + "be completed successfully eventually" in { + val f = after(1 second, using = system.scheduler)(Promise.successful(5)) + + val r = Future.firstCompletedOf(Seq(Promise[Int](), f)) + Await.result(r, remaining) must be(5) + } + + "be completed abnormally eventually" in { + val f = after(1 second, using = system.scheduler)(Promise.failed(new IllegalStateException("Mexico"))) + + val r = Future.firstCompletedOf(Seq(Promise[Int](), f)) + intercept[IllegalStateException] { Await.result(r, remaining) }.getMessage must be("Mexico") + } + } } diff --git a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala new file mode 100644 index 0000000000..25bc199ff5 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala @@ -0,0 +1,23 @@ +package akka.pattern + +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +import akka.util.Duration +import akka.actor._ +import akka.dispatch.{ ExecutionContext, Promise, Future } + +trait FutureTimeoutSupport { + /** + * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value + * after the specified duration. + */ + def after[T](duration: Duration, using: Scheduler)(value: ⇒ Future[T])(implicit ec: ExecutionContext): Future[T] = + if (duration.isFinite() && duration.length < 1) value else { + val p = Promise[T]() + val c = using.scheduleOnce(duration) { p completeWith value } + p onComplete { _ ⇒ c.cancel() } + p + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 7167775b29..b58e9a8fc1 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -3,10 +3,14 @@ */ package akka.pattern +import akka.actor.Scheduler +import akka.dispatch.ExecutionContext +import java.util.concurrent.Callable + object Patterns { import akka.actor.{ ActorRef, ActorSystem } import akka.dispatch.Future - import akka.pattern.{ ask ⇒ scalaAsk, pipe ⇒ scalaPipe } + import akka.pattern.{ ask ⇒ scalaAsk, pipe ⇒ scalaPipe, gracefulStop ⇒ scalaGracefulStop, after ⇒ scalaAfter } import akka.util.{ Timeout, Duration } /** @@ -99,5 +103,19 @@ object Patterns { * is completed with failure [[akka.actor.ActorTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = - akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] + scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided Callable + * after the specified duration. + */ + def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] = + scalaAfter(duration, scheduler)(value.call())(context) + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value + * after the specified duration. + */ + def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = + scalaAfter(duration, scheduler)(value)(context) } diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index ec4786a4c0..f467ca72f2 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -40,6 +40,6 @@ import akka.util.{ Timeout, Duration } * ask(actor, message); * }}} */ -package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport { +package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport with FutureTimeoutSupport { } \ No newline at end of file