#1973 - Adding support for a mechanism to create Futures that will have a result after a specified duration.

This commit is contained in:
Viktor Klang 2012-04-25 18:34:16 +02:00
parent 1f30be1f87
commit ba4a3b7f65
4 changed files with 62 additions and 4 deletions

View file

@ -8,9 +8,10 @@ import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorTimeoutException
import akka.dispatch.Await
import akka.util.Duration
import akka.util.duration._
import akka.dispatch.{ Future, Promise, Await }
import java.lang.IllegalStateException
object PatternSpec {
case class Work(duration: Duration)
@ -49,4 +50,20 @@ class PatternSpec extends AkkaSpec {
}
}
}
"pattern.after" must {
"be completed successfully eventually" in {
val f = after(1 second, using = system.scheduler)(Promise.successful(5))
val r = Future.firstCompletedOf(Seq(Promise[Int](), f))
Await.result(r, remaining) must be(5)
}
"be completed abnormally eventually" in {
val f = after(1 second, using = system.scheduler)(Promise.failed(new IllegalStateException("Mexico")))
val r = Future.firstCompletedOf(Seq(Promise[Int](), f))
intercept[IllegalStateException] { Await.result(r, remaining) }.getMessage must be("Mexico")
}
}
}

View file

@ -0,0 +1,23 @@
package akka.pattern
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.util.Duration
import akka.actor._
import akka.dispatch.{ ExecutionContext, Promise, Future }
trait FutureTimeoutSupport {
/**
* Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: Duration, using: Scheduler)(value: Future[T])(implicit ec: ExecutionContext): Future[T] =
if (duration.isFinite() && duration.length < 1) value else {
val p = Promise[T]()
val c = using.scheduleOnce(duration) { p completeWith value }
p onComplete { _ c.cancel() }
p
}
}

View file

@ -3,10 +3,14 @@
*/
package akka.pattern
import akka.actor.Scheduler
import akka.dispatch.ExecutionContext
import java.util.concurrent.Callable
object Patterns {
import akka.actor.{ ActorRef, ActorSystem }
import akka.dispatch.Future
import akka.pattern.{ ask scalaAsk, pipe scalaPipe }
import akka.pattern.{ ask scalaAsk, pipe scalaPipe, gracefulStop scalaGracefulStop, after scalaAfter }
import akka.util.{ Timeout, Duration }
/**
@ -99,5 +103,19 @@ object Patterns {
* is completed with failure [[akka.actor.ActorTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] =
akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
/**
* Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
*/
def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] =
scalaAfter(duration, scheduler)(value.call())(context)
/**
* Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =
scalaAfter(duration, scheduler)(value)(context)
}

View file

@ -40,6 +40,6 @@ import akka.util.{ Timeout, Duration }
* ask(actor, message);
* }}}
*/
package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport {
package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport with FutureTimeoutSupport {
}