From 884e8d08a8aa297363703a9448d25b2425029e81 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 30 Mar 2015 16:17:12 +0200 Subject: [PATCH] =act #17087 PromiseActorRef (ask pattern) must send right Terminated() --- .../src/test/scala/akka/pattern/AskSpec.scala | 26 ++++++++++++++++--- .../main/scala/akka/pattern/AskSupport.scala | 17 ++++++------ 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index 22fa3047bc..3e2b2ac2a8 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -3,16 +3,16 @@ */ package akka.pattern -import language.postfixOps - import akka.actor._ import akka.testkit.{ TestProbe, AkkaSpec } import akka.util.Timeout +import org.scalatest.concurrent.ScalaFutures + import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Failure -class AskSpec extends AkkaSpec { +class AskSpec extends AkkaSpec with ScalaFutures { "The “ask” pattern" must { @@ -199,6 +199,26 @@ class AskSpec extends AkkaSpec { deadListener.expectMsgClass(200 milliseconds, classOf[DeadLetter]) } + "allow watching the promiseActor and send Terminated() when completes" in { + implicit val timeout = Timeout(300 millis) + val p = TestProbe() + + val act = system.actorOf(Props(new Actor { + def receive = { + case msg ⇒ p.ref ! sender() -> msg + } + })) + + val f = (act ? "ask").mapTo[String] + val (promiseActorRef, "ask") = p.expectMsgType[(ActorRef, String)] + + watch(promiseActorRef) + promiseActorRef ! "complete" + + val completed = f.futureValue + completed should ===("complete") + expectTerminated(promiseActorRef, 1.second) + } } } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index a422f2d629..31dca7985d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -3,16 +3,16 @@ */ package akka.pattern -import language.implicitConversions - import java.util.concurrent.TimeoutException + import akka.actor._ import akka.dispatch.sysmsg._ -import scala.annotation.tailrec -import scala.util.control.NonFatal -import scala.concurrent.{ Future, Promise, ExecutionContext } import akka.util.{ Timeout, Unsafe } -import scala.util.{ Success, Failure } + +import scala.annotation.tailrec +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.language.implicitConversions +import scala.util.{ Failure, Success } /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -177,9 +177,8 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal { */ private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any], _mcn: String) extends MinimalActorRef { + import AbstractPromiseActorRef.{ stateOffset, watchedByOffset } import PromiseActorRef._ - import AbstractPromiseActorRef.stateOffset - import AbstractPromiseActorRef.watchedByOffset // This is necessary for weaving the PromiseActorRef into the asked message, i.e. the replyTo pattern. @volatile var messageClassName = _mcn @@ -308,7 +307,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide watchers foreach { watcher ⇒ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ watcher.asInstanceOf[InternalActorRef] - .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false)) + .sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false)) } } }