Merge pull request #17102 from ktoso/wip-promiseActorRef-wrong-deathwatch-ktoso
=act #17087 PromiseActorRef (ask pattern) must send right Terminated()
This commit is contained in:
commit
83854f87c9
2 changed files with 31 additions and 12 deletions
|
|
@ -3,16 +3,16 @@
|
|||
*/
|
||||
package akka.pattern
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.actor._
|
||||
import akka.testkit.{ TestProbe, AkkaSpec }
|
||||
import akka.util.Timeout
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
|
||||
class AskSpec extends AkkaSpec {
|
||||
class AskSpec extends AkkaSpec with ScalaFutures {
|
||||
|
||||
"The “ask” pattern" must {
|
||||
|
||||
|
|
@ -199,6 +199,26 @@ class AskSpec extends AkkaSpec {
|
|||
deadListener.expectMsgClass(200 milliseconds, classOf[DeadLetter])
|
||||
}
|
||||
|
||||
"allow watching the promiseActor and send Terminated() when completes" in {
|
||||
implicit val timeout = Timeout(300 millis)
|
||||
val p = TestProbe()
|
||||
|
||||
val act = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case msg ⇒ p.ref ! sender() -> msg
|
||||
}
|
||||
}))
|
||||
|
||||
val f = (act ? "ask").mapTo[String]
|
||||
val (promiseActorRef, "ask") = p.expectMsgType[(ActorRef, String)]
|
||||
|
||||
watch(promiseActorRef)
|
||||
promiseActorRef ! "complete"
|
||||
|
||||
val completed = f.futureValue
|
||||
completed should ===("complete")
|
||||
expectTerminated(promiseActorRef, 1.second)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,16 +3,16 @@
|
|||
*/
|
||||
package akka.pattern
|
||||
|
||||
import language.implicitConversions
|
||||
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import akka.actor._
|
||||
import akka.dispatch.sysmsg._
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.control.NonFatal
|
||||
import scala.concurrent.{ Future, Promise, ExecutionContext }
|
||||
import akka.util.{ Timeout, Unsafe }
|
||||
import scala.util.{ Success, Failure }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||
import scala.language.implicitConversions
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
/**
|
||||
* This is what is used to complete a Future that is returned from an ask/? call,
|
||||
|
|
@ -177,9 +177,8 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
|
|||
*/
|
||||
private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any], _mcn: String)
|
||||
extends MinimalActorRef {
|
||||
import AbstractPromiseActorRef.{ stateOffset, watchedByOffset }
|
||||
import PromiseActorRef._
|
||||
import AbstractPromiseActorRef.stateOffset
|
||||
import AbstractPromiseActorRef.watchedByOffset
|
||||
|
||||
// This is necessary for weaving the PromiseActorRef into the asked message, i.e. the replyTo pattern.
|
||||
@volatile var messageClassName = _mcn
|
||||
|
|
@ -308,7 +307,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
|||
watchers foreach { watcher ⇒
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
watcher.asInstanceOf[InternalActorRef]
|
||||
.sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
|
||||
.sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue