Adds ActorRef.ignore (#28630)

This commit is contained in:
Renato Cavalcanti 2020-03-12 12:40:56 +01:00 committed by GitHub
parent 8721b05a66
commit 6e171815b6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 409 additions and 22 deletions

View file

@ -73,7 +73,9 @@ import org.slf4j.LoggerFactory
def isTerminated: Boolean = whenTerminated.isCompleted
val deadLettersInbox = new DebugRef[Any](path.parent / "deadLetters", true)
override def deadLetters[U]: akka.actor.typed.ActorRef[U] = deadLettersInbox
override def deadLetters[U]: ActorRef[U] = deadLettersInbox
override def ignoreRef[U]: ActorRef[U] = deadLettersInbox
val controlledExecutor = new ControlledExecutor
implicit override def executionContext: scala.concurrent.ExecutionContextExecutor = controlledExecutor

View file

@ -99,6 +99,11 @@ public class InteractionPatternsTest extends JUnitSuite {
// #request-response-send
cookieFabric.tell(new CookieFabric.Request("give me cookies", context.getSelf()));
// #request-response-send
// #ignore-reply
cookieFabric.tell(
new CookieFabric.Request("don't send cookies back", context.getSystem().ignoreRef()));
// #ignore-reply
}
}

View file

@ -0,0 +1,147 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import java.util.concurrent.TimeoutException
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.concurrent.PatienceConfiguration.{ Timeout => PatienceTimeout }
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
class ActorRefIgnoreSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike {
case class Request(replyTo: ActorRef[Int])
// this Actor behavior receives simple request and answers back total number of
// messages it received so far
val askMeActorBehavior: Behavior[Request] = {
def internalBehavior(counter: Int): Behavior[Request] =
Behaviors.receiveMessage[Request] {
case Request(replyTo) =>
val newCounter = counter + 1
replyTo ! newCounter
internalBehavior(newCounter)
}
internalBehavior(0)
}
/**
* This actor sends a ask to 'askMeRef' at bootstrap and forward the answer to the probe.
* We will use it through out this test.
*/
def behavior(askMeRef: ActorRef[Request], probe: TestProbe[Int]) = Behaviors.setup[Int] { context =>
implicit val timeout: Timeout = 1.second
// send a message to interactWithRef
context.ask(askMeRef, Request) {
case Success(res) => res
case Failure(ex) => throw ex
}
Behaviors.receiveMessage { num =>
// receive response from interactWithRef and sent to prob
probe.ref ! num
Behaviors.same
}
}
"IgnoreActorRef instance" should {
"ignore all incoming messages" in {
val askMeRef = testKit.spawn(askMeActorBehavior)
val probe = testKit.createTestProbe[Int]("response-probe")
askMeRef ! Request(probe.ref)
probe.expectMessage(1)
// this is more a compile-time proof
// since the reply is ignored, we can't check that a message was sent to it
askMeRef ! Request(testKit.system.ignoreRef)
probe.expectNoMessage()
// but we do check that the counter has increased when we used the ActorRef.ignore
askMeRef ! Request(probe.ref)
probe.expectMessage(3)
}
// this is kind of obvious, the Future won't complete because the ignoreRef is used
"make a Future timeout when used in a 'ask'" in {
implicit val timeout: Timeout = 500.millis
val askMeRef = testKit.spawn(askMeActorBehavior)
val failedAsk =
askMeRef
.ask { _: ActorRef[Request] =>
Request(testKit.system.ignoreRef) // <- pass the ignoreRef instead, so Future never completes
}
.failed
.futureValue(PatienceTimeout(1.second))
failedAsk shouldBe a[TimeoutException]
}
// similar to above, but using actor-to-actor interaction
"ignore messages when used in actor-to-actor interaction ('ask')" in {
val probe = testKit.createTestProbe[Int]("probe-response")
// this prove that the machinery works, probe will receive a response
val askMeRef = testKit.spawn(askMeActorBehavior)
testKit.spawn(behavior(askMeRef, probe))
probe.expectMessage(1)
// new interaction using ignoreRef, probe won't receive anything
val ignoreRef = testKit.system.ignoreRef[Request]
testKit.spawn(behavior(ignoreRef, probe))
probe.expectNoMessage()
}
"be watchable from another actor without throwing an exception" in {
val probe = testKit.createTestProbe[String]("probe-response")
val forwardMessageRef =
Behaviors.setup[String] { ctx =>
ctx.watch(testKit.system.ignoreRef[String])
Behaviors.receiveMessage { str =>
probe.ref ! str
Behaviors.same
}
}
// this proves that the actor started and is operational and 'watch' didn't impact it
val ref = testKit.spawn(forwardMessageRef)
ref ! "abc"
probe.expectMessage("abc")
}
"be a singleton" in {
withClue("using the same type") {
testKit.system.ignoreRef[String] shouldBe theSameInstanceAs(testKit.system.ignoreRef[String])
}
withClue("using different types") {
testKit.system.ignoreRef[String] shouldBe theSameInstanceAs(testKit.system.ignoreRef[Int])
}
}
"be adaptable back and forth to classic" in {
testKit.system.ignoreRef[String].toClassic.toTyped[String]
}
}
}

View file

@ -82,11 +82,19 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpec
val context = new {
def self = probe.ref
}
// #request-response-send
cookieFabric ! CookieFabric.Request("give me cookies", context.self)
// #request-response-send
probe.receiveMessage()
Behaviors.setup[Nothing] { context =>
// #ignore-reply
cookieFabric ! CookieFabric.Request("don't send cookies back", context.system.ignoreRef)
// #ignore-reply
Behaviors.empty
}
}
"contain a sample for adapted response" in {

View file

@ -0,0 +1,2 @@
# Typed ActorRef that silently ignores all messages #25306 - PR #28630
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.ActorSystem.ignoreRef")

View file

@ -7,7 +7,6 @@ package akka.actor.typed
import akka.annotation.DoNotInherit
import akka.{ actor => classic }
import scala.annotation.unchecked.uncheckedVariance
import akka.actor.typed.internal.InternalRecipientRef
/**
@ -66,7 +65,6 @@ object ActorRef {
*/
def !(msg: T): Unit = ref.tell(msg)
}
}
/**

View file

@ -4,10 +4,8 @@
package akka.actor.typed
import akka.actor.ActorRefWithCell
import akka.actor.ExtendedActorSystem
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.actor.{ ActorRefWithCell, ExtendedActorSystem }
import akka.annotation.{ DoNotInherit, InternalApi }
object ActorRefResolver extends ExtensionId[ActorRefResolver] {
def get(system: ActorSystem[_]): ActorRefResolver = apply(system)
@ -75,8 +73,13 @@ abstract class ActorRefResolver extends Extension {
}
}
override def resolveActorRef[T](serializedActorRef: String): ActorRef[T] =
classicSystem.provider.resolveActorRef(serializedActorRef)
override def resolveActorRef[T](serializedActorRef: String): ActorRef[T] = {
val ref = classicSystem.provider.resolveActorRef(serializedActorRef)
if (ref eq classicSystem.provider.ignoreRef)
classicSystem.toTyped.ignoreRef
else
ref
}
}
object ActorRefResolverSetup {

View file

@ -6,12 +6,12 @@ package akka.actor.typed
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.actor.{ Address, BootstrapSetup, ClassicActorSystemProvider }
import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.internal.{ EventStreamExtension, InternalRecipientRef }
import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, GuardianStartupBehavior, PropsAdapter }
import akka.actor.typed.internal.{ EventStreamExtension, InternalRecipientRef }
import akka.actor.typed.receptionist.Receptionist
import akka.actor.{ Address, BootstrapSetup, ClassicActorSystemProvider }
import akka.annotation.DoNotInherit
import akka.util.Helpers.Requiring
import akka.{ Done, actor => classic }
@ -141,6 +141,11 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA
*/
def deadLetters[U]: ActorRef[U]
/**
* An ActorRef that ignores any incoming messages.
*/
def ignoreRef[U]: ActorRef[U]
/**
* Create a string representation of the actor hierarchy within this system
* for debugging purposes.

View file

@ -76,6 +76,10 @@ import org.slf4j.{ Logger, LoggerFactory }
// Members declared in akka.actor.typed.ActorSystem
override def deadLetters[U]: ActorRef[U] = ActorRefAdapter(system.deadLetters)
private val cachedIgnoreRef: ActorRef[Nothing] = ActorRefAdapter(provider.ignoreRef)
override def ignoreRef[U]: ActorRef[U] = cachedIgnoreRef.unsafeUpcast[U]
override def dispatchers: Dispatchers = new Dispatchers {
override def lookup(selector: DispatcherSelector): ExecutionContextExecutor =
selector match {
@ -116,6 +120,7 @@ import org.slf4j.{ Logger, LoggerFactory }
}
override def address: Address = system.provider.getDefaultAddress
}
private[akka] object ActorSystemAdapter {

View file

@ -0,0 +1,2 @@
# Typed ActorRef that silently ignores all messages #25306 - PR #28630
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.ignoreRef")

View file

@ -368,16 +368,26 @@ final class ChildActorPath private[akka] (val parent: ActorPath, val name: Strin
}
override def toStringWithAddress(addr: Address): String = {
val diff = addressStringLengthDiff(addr)
val length = toStringLength + diff
buildToString(new JStringBuilder(length), length, diff, _.toStringWithAddress(addr)).toString
if (IgnoreActorRef.isIgnoreRefPath(this)) {
// we never change address for IgnoreActorRef
this.toString
} else {
val diff = addressStringLengthDiff(addr)
val length = toStringLength + diff
buildToString(new JStringBuilder(length), length, diff, _.toStringWithAddress(addr)).toString
}
}
override def toSerializationFormatWithAddress(addr: Address): String = {
val diff = addressStringLengthDiff(addr)
val length = toStringLength + diff
val sb = buildToString(new JStringBuilder(length + 12), length, diff, _.toStringWithAddress(addr))
appendUidFragment(sb).toString
if (IgnoreActorRef.isIgnoreRefPath(this)) {
// we never change address for IgnoreActorRef
this.toString
} else {
val diff = addressStringLengthDiff(addr)
val length = toStringLength + diff
val sb = buildToString(new JStringBuilder(length + 12), length, diff, _.toStringWithAddress(addr))
appendUidFragment(sb).toString
}
}
private def addressStringLengthDiff(address: Address): Int = {

View file

@ -7,10 +7,10 @@ package akka.actor
import java.util.concurrent.ConcurrentHashMap
import akka.annotation.InternalApi
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.annotation.DoNotInherit
import akka.dispatch._
import akka.dispatch.sysmsg._
@ -476,6 +476,53 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
/**
* An ActorRef that ignores any incoming messages.
*
* INTERNAL API
*/
@InternalApi private[akka] final class IgnoreActorRef(override val provider: ActorRefProvider) extends MinimalActorRef {
override val path: ActorPath = IgnoreActorRef.path
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = SerializedIgnore
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object IgnoreActorRef {
private val fakeSystemName = "local"
val path: ActorPath =
RootActorPath(Address("akka", IgnoreActorRef.fakeSystemName)) / "ignore"
private val pathString = path.toString
/**
* Check if the passed `otherPath` is the same as IgnoreActorRef.path
*/
def isIgnoreRefPath(otherPath: String): Boolean =
pathString == otherPath
/**
* Check if the passed `otherPath` is the same as IgnoreActorRef.path
*/
def isIgnoreRefPath(otherPath: ActorPath): Boolean =
path == otherPath
}
/**
* INTERNAL API
*/
@InternalApi @SerialVersionUID(1L) private[akka] object SerializedIgnore extends Serializable {
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = IgnoreActorRef
}
/**
* Subscribe to this class to be notified about all [[DeadLetter]] (also the suppressed ones)
* and [[Dropped]].

View file

@ -10,12 +10,12 @@ import akka.routing._
import akka.event._
import akka.util.Helpers
import akka.util.Collections.EmptyImmutableSeq
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
import scala.annotation.implicitNotFound
import akka.ConfigurationException
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
@ -58,6 +58,9 @@ import akka.util.OptionVal
*/
def deadLetters: ActorRef
/** INTERNAL API */
@InternalApi private[akka] def ignoreRef: ActorRef
/**
* The root path for all actors within this actor system, not including any remote address information.
*/
@ -391,6 +394,8 @@ private[akka] class LocalActorRefProvider private[akka] (
.getOrElse((p: ActorPath) => new DeadLetterActorRef(this, p, eventStream))
.apply(rootPath / "deadLetters")
override val ignoreRef: ActorRef = new IgnoreActorRef(this)
private[this] final val terminationPromise: Promise[Terminated] = Promise[Terminated]()
def terminationFuture: Future[Terminated] = terminationPromise.future

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorRef, ActorRefResolver, ActorSystem }
import akka.actor.{ ExtendedActorSystem, IgnoreActorRef }
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import akka.{ actor => classic }
class ActorRefIgnoreSerializationSpec extends AnyWordSpec with ScalaFutures with Matchers with BeforeAndAfterAll {
private var system1: ActorSystem[String] = _
private var system2: ActorSystem[String] = _
val config = ConfigFactory.parseString(s"""
akka {
loglevel = debug
actor.provider = cluster
remote.classic.netty.tcp.port = 0
remote.artery {
canonical {
hostname = 127.0.0.1
port = 0
}
}
}
""")
override protected def beforeAll(): Unit = {
system1 = ActorSystem(Behaviors.empty[String], "sys1", config)
system2 = ActorSystem(Behaviors.empty[String], "sys2", config)
}
override protected def afterAll(): Unit = {
system1.terminate()
system2.terminate()
}
"ActorSystem.ignoreRef (in typed)" should {
"return a serializable ActorRef that can be sent between two ActorSystems using remote" in {
val ignoreRef = system1.ignoreRef[String]
val remoteRefStr = ActorRefResolver(system1).toSerializationFormat(ignoreRef)
withClue("check ActorRef path stays untouched, ie: /local/ignore") {
remoteRefStr shouldBe IgnoreActorRef.path.toString
}
withClue("check ActorRef path stays untouched when deserialized by another actor system") {
val deserRef: ActorRef[String] = ActorRefResolver(system2).resolveActorRef[String](remoteRefStr)
deserRef.path shouldBe IgnoreActorRef.path
(deserRef should be).theSameInstanceAs(system2.ignoreRef[String])
}
}
"return same instance when deserializing it twice (IgnoreActorRef is cached)" in {
val resolver = ActorRefResolver(system1)
withClue("using the same type") {
val deserRef1 = resolver.resolveActorRef[String](resolver.toSerializationFormat(system1.ignoreRef[String]))
val deserRef2 = resolver.resolveActorRef[String](resolver.toSerializationFormat(system1.ignoreRef[String]))
(deserRef1 should be).theSameInstanceAs(deserRef2)
}
withClue("using different types") {
val deserRef1 = resolver.resolveActorRef[String](resolver.toSerializationFormat(system1.ignoreRef[String]))
val deserRef2 = resolver.resolveActorRef[Int](resolver.toSerializationFormat(system1.ignoreRef[Int]))
(deserRef1 should be).theSameInstanceAs(deserRef2)
}
}
}
"IgnoreActorRef (in classic)" should {
"return a serializable ActorRef that can be sent between two ActorSystems using remote (akka classic)" in {
val ignoreRef = system1.ignoreRef[String].toClassic
val remoteRefStr = ignoreRef.path.toSerializationFormatWithAddress(system1.address)
withClue("check ActorRef path stays untouched, ie: /local/ignore") {
remoteRefStr shouldBe IgnoreActorRef.path.toString
}
withClue("check ActorRef path stays untouched when deserialized by another actor system") {
val providerSys2 = system2.classicSystem.asInstanceOf[ExtendedActorSystem].provider
val deserRef: classic.ActorRef = providerSys2.resolveActorRef(remoteRefStr)
deserRef.path shouldBe IgnoreActorRef.path
(deserRef should be).theSameInstanceAs(system2.ignoreRef[String].toClassic)
}
}
"return same instance when deserializing it twice (IgnoreActorRef is cached)" in {
val ignoreRef = system1.ignoreRef[String].toClassic
val remoteRefStr = ignoreRef.path.toSerializationFormat
val providerSys1 = system1.classicSystem.asInstanceOf[ExtendedActorSystem].provider
val deserRef1 = providerSys1.resolveActorRef(remoteRefStr)
val deserRef2 = providerSys1.resolveActorRef(remoteRefStr)
(deserRef1 should be).theSameInstanceAs(deserRef2)
}
}
}

View file

@ -76,7 +76,7 @@ Java
: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #request-response-protocol }
The sender would use its own @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`], which it can access through @scala[`ActorContext.self`]@java[`ActorContext.getSelf()`], for the `respondTo`.
The sender would use its own @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`], which it can access through @scala[`ActorContext.self`]@java[`ActorContext.getSelf()`], for the `replyTo`.
Scala
: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #request-response-send }
@ -225,6 +225,31 @@ Java
* There can only be a single response to one `ask` (see @ref:[per session child Actor](#per-session-child-actor))
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
## Ignoring replies
In some situations an actor has a response for a particular request message but you are not interested in the response. In this case you can pass @scala[`system.ignoreRef`]@java[`system.ignoreRef()`] turning the request-response into a fire-and-forget.
@scala[`system.ignoreRef`]@java[`system.ignoreRef()`], as the name indicates, returns an `ActorRef` that ignores any message sent to it.
With the same protocol as the @ref[request response](#request-response) above, if the sender would prefer to ignore the reply it could pass @scala[`system.ignoreRef`]@java[`system.ignoreRef()`] for the `replyTo`, which it can access through @scala[`ActorContext.system.ignoreRef`]@java[`ActorContext.getSystem().ignoreRef()`].
Scala
: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #ignore-reply }
Java
: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #ignore-reply }
**Useful when:**
* Sending a message for which the protocol defines a reply, but you are not interested in getting the reply
**Problems:**
The returned `ActorRef` ignores all messages sent to it, therefore it should be used carefully.
* Passing it around inadvertently as if it was a normal `ActorRef` may result in broken actor-to-actor interactions.
* Using it when performing an `ask` from outside the Actor System will cause the @scala[`Future`]@java[`CompletionStage`] returned by the `ask` to timeout since it will never complete.
* Finally, it's legal to `watch` it, but since it's of a special kind, it never terminates and therefore you will never receive a `Terminated` signal from it.
## Send Future result to self
When using an API that returns a @scala[`Future`]@java[`CompletionStage`] from an actor it's common that you would

View file

@ -187,6 +187,7 @@ private[akka] class RemoteActorRefProvider(
override def rootPath: ActorPath = local.rootPath
override def deadLetters: InternalActorRef = local.deadLetters
override def ignoreRef: ActorRef = local.ignoreRef
// these are only available after init()
override def rootGuardian: InternalActorRef = local.rootGuardian
@ -524,6 +525,9 @@ private[akka] class RemoteActorRefProvider(
* public `resolveActorRef(path: String)`.
*/
private[akka] def internalResolveActorRef(path: String): ActorRef = path match {
case p if IgnoreActorRef.isIgnoreRefPath(p) => this.ignoreRef
case ActorPathExtractor(address, elems) =>
if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
else {
@ -542,6 +546,7 @@ private[akka] class RemoteActorRefProvider(
new EmptyLocalActorRef(this, rootPath, eventStream)
}
}
case _ =>
log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
deadLetters

View file

@ -455,7 +455,10 @@ lazy val actorTestkitTyped = akkaModule("akka-actor-testkit-typed")
.settings(Dependencies.actorTestkitTyped)
lazy val actorTypedTests = akkaModule("akka-actor-typed-tests")
.dependsOn(actorTyped, actorTestkitTyped % "compile->compile;test->test")
.dependsOn(
actorTyped,
actorTestkitTyped % "compile->compile;test->test"
)
.settings(AkkaBuild.mayChangeSettings)
.disablePlugins(MimaPlugin)
.enablePlugins(NoPublish)