diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala new file mode 100644 index 0000000000..69297b743a --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern + +import akka.testkit.AkkaSpec +import akka.actor.Props +import akka.actor.Actor +import akka.actor.ActorTimeoutException +import akka.dispatch.Await +import akka.util.Duration +import akka.util.duration._ + +object PatternSpec { + case class Work(duration: Duration) + class TargetActor extends Actor { + def receive = { + case Work(duration) ⇒ duration.sleep() + } + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class PatternSpec extends AkkaSpec { + + import PatternSpec._ + + "pattern.stop" must { + + "provide Future for stopping an actor" in { + val target = system.actorOf(Props[TargetActor]) + val result = gracefulStop(target, 5 seconds) + Await.result(result, 6 seconds) must be(true) + } + + "complete Future when actor already terminated" in { + val target = system.actorOf(Props[TargetActor]) + Await.ready(gracefulStop(target, 5 seconds), 6 seconds) + Await.ready(gracefulStop(target, 1 millis), 1 second) + } + + "complete Future with ActorTimeoutException when actor not terminated within timeout" in { + val target = system.actorOf(Props[TargetActor]) + target ! Work(200 millis) + val result = gracefulStop(target, 50 millis) + intercept[ActorTimeoutException] { + Await.result(result, 100 millis) + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala new file mode 100644 index 0000000000..6840716bad --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.dispatch.Future +import akka.util.Duration + +/** + * Patterns is the Java API for the Akka patterns that provide solutions + * to commonly occurring problems. + */ +object Patterns { + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with `Left` [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[Boolean] = { + akka.pattern.gracefulStop(target, timeout)(system) + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala new file mode 100644 index 0000000000..eb24d9ae8e --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ActorTimeoutException +import akka.actor.PoisonPill +import akka.actor.Props +import akka.actor.ReceiveTimeout +import akka.actor.Terminated +import akka.dispatch.Future +import akka.dispatch.Promise +import akka.util.Duration + +/** + * Akka patterns that provide solutions to commonly occurring problems. + */ +package object pattern { + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with `Left` [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { + if (target.isTerminated) { + Promise.successful(true)(system.dispatcher) + } else { + val result = Promise[Boolean]()(system.dispatcher) + system.actorOf(Props(new Actor { + // Terminated will be received when target has been stopped + context watch target + target ! PoisonPill + // ReceiveTimeout will be received if nothing else is received within the timeout + context setReceiveTimeout timeout + + def receive = { + case Terminated(a) ⇒ + result.complete(Right(true)) + system.stop(self) + case ReceiveTimeout ⇒ + result.complete(Left( + new ActorTimeoutException("Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)))) + system.stop(self) + } + })) + result + } + } + +}