Update test and sample usages of ActorRef.? to .ask
This commit is contained in:
parent
bf10f8620a
commit
9fc3251a03
15 changed files with 36 additions and 31 deletions
|
|
@ -159,7 +159,7 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
|
||||||
* guardian
|
* guardian
|
||||||
*/
|
*/
|
||||||
def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] =
|
def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] =
|
||||||
Await.result(internalSystem ? (ActorTestKitGuardian.SpawnActorAnonymous(behavior, _, props)), timeout.duration)
|
Await.result(internalSystem.ask(ActorTestKitGuardian.SpawnActorAnonymous(behavior, _, props)), timeout.duration)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spawn the given behavior. This is created as a child of the test kit
|
* Spawn the given behavior. This is created as a child of the test kit
|
||||||
|
|
@ -173,7 +173,7 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
|
||||||
* guardian
|
* guardian
|
||||||
*/
|
*/
|
||||||
def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
|
def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
|
||||||
Await.result(internalSystem ? (ActorTestKitGuardian.SpawnActor(name, behavior, _, props)), timeout.duration)
|
Await.result(internalSystem.ask(ActorTestKitGuardian.SpawnActor(name, behavior, _, props)), timeout.duration)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the actor under test and wait until it terminates.
|
* Stop the actor under test and wait until it terminates.
|
||||||
|
|
@ -181,7 +181,7 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
|
||||||
* Other actors will not be stopped by this method.
|
* Other actors will not be stopped by this method.
|
||||||
*/
|
*/
|
||||||
def stop[T](ref: ActorRef[T], max: FiniteDuration = timeout.duration): Unit = try {
|
def stop[T](ref: ActorRef[T], max: FiniteDuration = timeout.duration): Unit = try {
|
||||||
Await.result(internalSystem ? { x: ActorRef[ActorTestKitGuardian.Ack.type] ⇒ ActorTestKitGuardian.StopActor(ref, x) }, max)
|
Await.result(internalSystem.ask { x: ActorRef[ActorTestKitGuardian.Ack.type] ⇒ ActorTestKitGuardian.StopActor(ref, x) }, max)
|
||||||
} catch {
|
} catch {
|
||||||
case _: TimeoutException ⇒
|
case _: TimeoutException ⇒
|
||||||
assert(false, s"timeout ($max) during stop() waiting for actor [${ref.path}] to stop")
|
assert(false, s"timeout ($max) during stop() waiting for actor [${ref.path}] to stop")
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ object AsyncTestingExampleSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = {
|
private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = {
|
||||||
publisher ? (ref ⇒ Message(i, ref))
|
publisher.ask(ref ⇒ Message(i, ref))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import org.scalatest.WordSpecLike
|
||||||
|
|
||||||
object AskSpec {
|
object AskSpec {
|
||||||
sealed trait Msg
|
sealed trait Msg
|
||||||
final case class Foo(s: String)(val replyTo: ActorRef[String]) extends Msg
|
final case class Foo(s: String, replyTo: ActorRef[String]) extends Msg
|
||||||
final case class Stop(replyTo: ActorRef[Unit]) extends Msg
|
final case class Stop(replyTo: ActorRef[Unit]) extends Msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -51,12 +51,12 @@ class AskSpec extends ScalaTestWithActorTestKit("""
|
||||||
"Ask pattern" must {
|
"Ask pattern" must {
|
||||||
"fail the future if the actor is already terminated" in {
|
"fail the future if the actor is already terminated" in {
|
||||||
val ref = spawn(behavior)
|
val ref = spawn(behavior)
|
||||||
(ref ? Stop).futureValue
|
(ref.ask(Stop)).futureValue
|
||||||
val probe = createTestProbe()
|
val probe = createTestProbe()
|
||||||
probe.expectTerminated(ref, probe.remainingOrDefault)
|
probe.expectTerminated(ref, probe.remainingOrDefault)
|
||||||
val answer =
|
val answer =
|
||||||
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
|
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
|
||||||
ref ? Foo("bar")
|
ref.ask(Foo("bar", _))
|
||||||
}
|
}
|
||||||
val result = answer.failed.futureValue
|
val result = answer.failed.futureValue
|
||||||
result shouldBe a[TimeoutException]
|
result shouldBe a[TimeoutException]
|
||||||
|
|
@ -65,7 +65,13 @@ class AskSpec extends ScalaTestWithActorTestKit("""
|
||||||
|
|
||||||
"succeed when the actor is alive" in {
|
"succeed when the actor is alive" in {
|
||||||
val ref = spawn(behavior)
|
val ref = spawn(behavior)
|
||||||
val response = ref ? Foo("bar")
|
val response = ref.ask(Foo("bar", _))
|
||||||
|
response.futureValue should ===("foo")
|
||||||
|
}
|
||||||
|
|
||||||
|
"provide a symbolic alias that works the same" in {
|
||||||
|
val ref = spawn(behavior)
|
||||||
|
val response = ref ? (Foo("bar", _))
|
||||||
response.futureValue should ===("foo")
|
response.futureValue should ===("foo")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -73,7 +79,7 @@ class AskSpec extends ScalaTestWithActorTestKit("""
|
||||||
val actor = spawn(Behaviors.empty[Foo])
|
val actor = spawn(Behaviors.empty[Foo])
|
||||||
implicit val timeout: Timeout = 10.millis
|
implicit val timeout: Timeout = 10.millis
|
||||||
EventFilter.warning(pattern = ".*unhandled message.*", occurrences = 1).intercept {
|
EventFilter.warning(pattern = ".*unhandled message.*", occurrences = 1).intercept {
|
||||||
val answer = actor ? Foo("bar")
|
val answer = actor.ask(Foo("bar", _))
|
||||||
val result = answer.failed.futureValue
|
val result = answer.failed.futureValue
|
||||||
result shouldBe a[TimeoutException]
|
result shouldBe a[TimeoutException]
|
||||||
result.getMessage should startWith("Ask timed out on")
|
result.getMessage should startWith("Ask timed out on")
|
||||||
|
|
@ -92,7 +98,7 @@ class AskSpec extends ScalaTestWithActorTestKit("""
|
||||||
|
|
||||||
val answer =
|
val answer =
|
||||||
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
|
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
|
||||||
noSuchActor ? Foo("bar")
|
noSuchActor.ask(Foo("bar", _))
|
||||||
}
|
}
|
||||||
val result = answer.failed.futureValue
|
val result = answer.failed.futureValue
|
||||||
result shouldBe a[TimeoutException]
|
result shouldBe a[TimeoutException]
|
||||||
|
|
@ -120,7 +126,7 @@ class AskSpec extends ScalaTestWithActorTestKit("""
|
||||||
implicit val timeout: Timeout = 3.seconds
|
implicit val timeout: Timeout = 3.seconds
|
||||||
implicit val scheduler = untypedSystem.toTyped.scheduler
|
implicit val scheduler = untypedSystem.toTyped.scheduler
|
||||||
val typedLegacy: ActorRef[AnyRef] = legacyActor
|
val typedLegacy: ActorRef[AnyRef] = legacyActor
|
||||||
(typedLegacy ? Ping).failed.futureValue should ===(ex)
|
(typedLegacy.ask(Ping)).failed.futureValue should ===(ex)
|
||||||
} finally {
|
} finally {
|
||||||
akka.testkit.TestKit.shutdownActorSystem(untypedSystem)
|
akka.testkit.TestKit.shutdownActorSystem(untypedSystem)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ class SpawnProtocolSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||||
val parent = spawn(SpawnProtocol.behavior, "parent2")
|
val parent = spawn(SpawnProtocol.behavior, "parent2")
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
implicit val timeout = Timeout(5.seconds)
|
implicit val timeout = Timeout(5.seconds)
|
||||||
val parentReply = parent ? SpawnProtocol.Spawn(target, "child", Props.empty)
|
val parentReply = parent.ask(SpawnProtocol.Spawn(target, "child", Props.empty))
|
||||||
val child = parentReply.futureValue
|
val child = parentReply.futureValue
|
||||||
val childReply = TestProbe[Pong.type]()
|
val childReply = TestProbe[Pong.type]()
|
||||||
child ! Ping(childReply.ref)
|
child ! Ping(childReply.ref)
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,7 @@ class LocalReceptionistSpec extends ScalaTestWithActorTestKit with WordSpecLike
|
||||||
"work with ask" in {
|
"work with ask" in {
|
||||||
val receptionist = spawn(LocalReceptionist.behavior)
|
val receptionist = spawn(LocalReceptionist.behavior)
|
||||||
val serviceA = spawn(behaviorA)
|
val serviceA = spawn(behaviorA)
|
||||||
val f: Future[Registered] = receptionist ? (Register(ServiceKeyA, serviceA, _))
|
val f: Future[Registered] = receptionist.ask(Register(ServiceKeyA, serviceA, _))
|
||||||
f.futureValue should be(Registered(ServiceKeyA, serviceA))
|
f.futureValue should be(Registered(ServiceKeyA, serviceA))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ object ReceptionistApiSpec {
|
||||||
// needs the explicit type on the future and the extra parenthesises
|
// needs the explicit type on the future and the extra parenthesises
|
||||||
// to work
|
// to work
|
||||||
val registered: Future[Receptionist.Registered] =
|
val registered: Future[Receptionist.Registered] =
|
||||||
system.receptionist ? (Receptionist.Register(key, service, _))
|
system.receptionist.ask(Receptionist.Register(key, service, _))
|
||||||
registered.foreach {
|
registered.foreach {
|
||||||
case key.Registered(ref) ⇒
|
case key.Registered(ref) ⇒
|
||||||
// ref is the right type here
|
// ref is the right type here
|
||||||
|
|
@ -41,7 +41,7 @@ object ReceptionistApiSpec {
|
||||||
|
|
||||||
// one-off ask outside of actor, should be uncommon but not rare
|
// one-off ask outside of actor, should be uncommon but not rare
|
||||||
val found: Future[Receptionist.Listing] =
|
val found: Future[Receptionist.Listing] =
|
||||||
system.receptionist ? (Receptionist.Find(key, _))
|
system.receptionist.ask(Receptionist.Find(key, _))
|
||||||
found.foreach {
|
found.foreach {
|
||||||
case key.Listing(instances) ⇒
|
case key.Listing(instances) ⇒
|
||||||
instances.foreach(_ ! "woho")
|
instances.foreach(_ ! "woho")
|
||||||
|
|
|
||||||
|
|
@ -63,15 +63,15 @@ class DispatchersDocSpec extends ScalaTestWithActorTestKit(DispatchersDocSpec.co
|
||||||
val probe = TestProbe[Dispatcher]()
|
val probe = TestProbe[Dispatcher]()
|
||||||
val actor: ActorRef[SpawnProtocol] = spawn(SpawnProtocol.behavior)
|
val actor: ActorRef[SpawnProtocol] = spawn(SpawnProtocol.behavior)
|
||||||
|
|
||||||
val withDefault = (actor ? Spawn(giveMeYourDispatcher, "default", Props.empty)).futureValue
|
val withDefault = actor.ask(Spawn(giveMeYourDispatcher, "default", Props.empty)).futureValue
|
||||||
withDefault ! WhichDispatcher(probe.ref)
|
withDefault ! WhichDispatcher(probe.ref)
|
||||||
probe.receiveMessage().id shouldEqual "akka.actor.default-dispatcher"
|
probe.receiveMessage().id shouldEqual "akka.actor.default-dispatcher"
|
||||||
|
|
||||||
val withBlocking = (actor ? Spawn(giveMeYourDispatcher, "default", DispatcherSelector.blocking())).futureValue
|
val withBlocking = actor.ask(Spawn(giveMeYourDispatcher, "default", DispatcherSelector.blocking())).futureValue
|
||||||
withBlocking ! WhichDispatcher(probe.ref)
|
withBlocking ! WhichDispatcher(probe.ref)
|
||||||
probe.receiveMessage().id shouldEqual "akka.actor.default-blocking-io-dispatcher"
|
probe.receiveMessage().id shouldEqual "akka.actor.default-blocking-io-dispatcher"
|
||||||
|
|
||||||
val withCustom = (actor ? Spawn(giveMeYourDispatcher, "default", DispatcherSelector.fromConfig("your-dispatcher"))).futureValue
|
val withCustom = actor.ask(Spawn(giveMeYourDispatcher, "default", DispatcherSelector.fromConfig("your-dispatcher"))).futureValue
|
||||||
withCustom ! WhichDispatcher(probe.ref)
|
withCustom ! WhichDispatcher(probe.ref)
|
||||||
probe.receiveMessage().id shouldEqual "your-dispatcher"
|
probe.receiveMessage().id shouldEqual "your-dispatcher"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -368,7 +368,7 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
|
||||||
implicit val timeout: Timeout = 3.seconds
|
implicit val timeout: Timeout = 3.seconds
|
||||||
implicit val scheduler = system.scheduler
|
implicit val scheduler = system.scheduler
|
||||||
|
|
||||||
val result: Future[Cookies] = cookieActorRef ? (ref ⇒ GiveMeCookies(ref))
|
val result: Future[Cookies] = cookieActorRef.ask(ref ⇒ GiveMeCookies(ref))
|
||||||
|
|
||||||
// the response callback will be executed on this execution context
|
// the response callback will be executed on this execution context
|
||||||
implicit val ec = system.executionContext
|
implicit val ec = system.executionContext
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ class SpawnProtocolDocSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||||
implicit val scheduler: Scheduler = system.scheduler
|
implicit val scheduler: Scheduler = system.scheduler
|
||||||
|
|
||||||
val greeter: Future[ActorRef[HelloWorld.Greet]] =
|
val greeter: Future[ActorRef[HelloWorld.Greet]] =
|
||||||
system ? SpawnProtocol.Spawn(behavior = HelloWorld.greeter, name = "greeter", props = Props.empty)
|
system.ask(SpawnProtocol.Spawn(behavior = HelloWorld.greeter, name = "greeter", props = Props.empty))
|
||||||
|
|
||||||
val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) ⇒
|
val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) ⇒
|
||||||
context.log.info("Greeting for {} from {}", message.whom, message.from)
|
context.log.info("Greeting for {} from {}", message.whom, message.from)
|
||||||
|
|
@ -69,7 +69,7 @@ class SpawnProtocolDocSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||||
}
|
}
|
||||||
|
|
||||||
val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
|
val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
|
||||||
system ? SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty)
|
system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty))
|
||||||
|
|
||||||
for (greeterRef ← greeter; replyToRef ← greetedReplyTo) {
|
for (greeterRef ← greeter; replyToRef ← greetedReplyTo) {
|
||||||
greeterRef ! HelloWorld.Greet("Akka", replyToRef)
|
greeterRef ! HelloWorld.Greet("Akka", replyToRef)
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,7 @@ import akka.util.JavaDurationConverters._
|
||||||
// Scala API impl
|
// Scala API impl
|
||||||
override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
|
override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
pipeToSelf((target ? createRequest)(responseTimeout, system.scheduler))(mapResponse)
|
pipeToSelf((target.ask(createRequest))(responseTimeout, system.scheduler))(mapResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Java API impl
|
// Java API impl
|
||||||
|
|
|
||||||
|
|
@ -31,5 +31,5 @@ import scala.compat.java8.FutureConverters._
|
||||||
*/
|
*/
|
||||||
object AskPattern {
|
object AskPattern {
|
||||||
def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
|
def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
|
||||||
(actor.?(message.apply)(timeout.asScala, scheduler)).toJava
|
(actor.ask(message.apply)(timeout.asScala, scheduler)).toJava
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ import akka.actor.typed.internal.InternalRecipientRef
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ask-pattern implements the initiator side of a request–reply protocol.
|
* The ask-pattern implements the initiator side of a request–reply protocol.
|
||||||
* The `?` operator is pronounced as "ask".
|
|
||||||
*
|
*
|
||||||
* See [[AskPattern.Askable.ask]] for details
|
* See [[AskPattern.Askable.ask]] for details
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ class TypedActorBenchmark {
|
||||||
@Benchmark
|
@Benchmark
|
||||||
@OperationsPerInvocation(totalMessages)
|
@OperationsPerInvocation(totalMessages)
|
||||||
def echo(): Unit = {
|
def echo(): Unit = {
|
||||||
Await.result(system ? Start, timeout)
|
Await.result(system.ask(Start), timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -106,14 +106,14 @@ object ReplicatorSpec {
|
||||||
implicit val scheduler: Scheduler = ???
|
implicit val scheduler: Scheduler = ???
|
||||||
implicit val cluster: Cluster = ???
|
implicit val cluster: Cluster = ???
|
||||||
|
|
||||||
val reply1: Future[GetResponse[GCounter]] = replicator ? Replicator.Get(Key, Replicator.ReadLocal)
|
val reply1: Future[GetResponse[GCounter]] = replicator.ask(Replicator.Get(Key, Replicator.ReadLocal))
|
||||||
|
|
||||||
val reply2: Future[UpdateResponse[GCounter]] =
|
val reply2: Future[UpdateResponse[GCounter]] =
|
||||||
replicator ? Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)
|
replicator.ask(Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1))
|
||||||
|
|
||||||
val reply3: Future[DeleteResponse[GCounter]] = replicator ? Replicator.Delete(Key, Replicator.WriteLocal)
|
val reply3: Future[DeleteResponse[GCounter]] = replicator.ask(Replicator.Delete(Key, Replicator.WriteLocal))
|
||||||
|
|
||||||
val reply4: Future[ReplicaCount] = replicator ? Replicator.GetReplicaCount()
|
val reply4: Future[ReplicaCount] = replicator.ask(Replicator.GetReplicaCount())
|
||||||
|
|
||||||
// suppress unused compiler warnings
|
// suppress unused compiler warnings
|
||||||
println("" + reply1 + reply2 + reply3 + reply4)
|
println("" + reply1 + reply2 + reply3 + reply4)
|
||||||
|
|
|
||||||
|
|
@ -57,7 +57,7 @@ object PersistentActorCompileOnlyTest {
|
||||||
|
|
||||||
case class EventsInFlight(nextCorrelationId: Int, dataByCorrelationId: Map[Int, String])
|
case class EventsInFlight(nextCorrelationId: Int, dataByCorrelationId: Map[Int, String])
|
||||||
|
|
||||||
case class Request(correlationId: Int, data: String)(sender: ActorRef[Response])
|
case class Request(correlationId: Int, data: String, sender: ActorRef[Response])
|
||||||
case class Response(correlationId: Int)
|
case class Response(correlationId: Int)
|
||||||
val sideEffectProcessor: ActorRef[Request] = ???
|
val sideEffectProcessor: ActorRef[Request] = ???
|
||||||
|
|
||||||
|
|
@ -67,7 +67,7 @@ object PersistentActorCompileOnlyTest {
|
||||||
implicit val scheduler: akka.actor.Scheduler = ???
|
implicit val scheduler: akka.actor.Scheduler = ???
|
||||||
implicit val ec: ExecutionContext = ???
|
implicit val ec: ExecutionContext = ???
|
||||||
|
|
||||||
(sideEffectProcessor ? Request(correlationId, data))
|
sideEffectProcessor.ask(Request(correlationId, data, _))
|
||||||
.map(response ⇒ AcknowledgeSideEffect(response.correlationId))
|
.map(response ⇒ AcknowledgeSideEffect(response.correlationId))
|
||||||
.foreach(sender ! _)
|
.foreach(sender ! _)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue