diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
index 4f61d5a7a0..41263e2018 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
@@ -160,7 +160,7 @@ private[cluster] class ClusterRemoteWatcher(
     if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
 
   override protected def shouldWatch(watchee: InternalActorRef): Boolean =
-    clusterNodes(watchee.path.address)
+    clusterNodes(watchee.path.address) || super.shouldWatch(watchee)
 
   /**
    * When a cluster node is added this class takes over the
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterWatcherNoClusterWatcheeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterWatcherNoClusterWatcheeSpec.scala
new file mode 100644
index 0000000000..ede5e322a3
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterWatcherNoClusterWatcheeSpec.scala
@@ -0,0 +1,201 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.cluster
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorIdentity
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.Props
+import akka.actor.Terminated
+import akka.remote.RARP
+import akka.remote.RemoteWatcher.Heartbeat
+import akka.remote.RemoteWatcher.Stats
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit.ImplicitSender
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.scalatest.concurrent.ScalaFutures
+
+class ClusterWatcherNoClusterWatcheeConfig(val useUnsafe: Boolean, artery: Boolean) extends MultiNodeConfig {
+
+  val clustered = role("clustered")
+  val remoting = role("remoting")
+
+  commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(s"""
+      akka.remote.use-unsafe-remote-features-without-cluster = $useUnsafe
+      akka.remote.log-remote-lifecycle-events = off
+      akka.remote.artery.enabled = $artery
+      akka.remote.artery.advanced.flight-recorder.enabled = off
+      akka.log-dead-letters = off
+      akka.loggers = ["akka.testkit.TestEventListener"]
+      akka.actor.allow-java-serialization = on
+      """)))
+
+  nodeConfig(remoting)(ConfigFactory.parseString(s"""
+      akka.actor.provider = remote"""))
+
+  nodeConfig(clustered)(ConfigFactory.parseString("""
+      akka.actor.provider = cluster
+      akka.cluster.jmx.enabled = off"""))
+
+}
+
+class ClusterWatcherNoClusterWatcheeUnsafeArterySpecMultiJvmNode1
+    extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = true)
+class ClusterWatcherNoClusterWatcheeUnsafeArterySpecMultiJvmNode2
+    extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = true)
+
+class ClusterWatcherNoClusterWatcheeSafeArterySpecMultiJvmNode1
+    extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = false)
+class ClusterWatcherNoClusterWatcheeSafeArterySpecMultiJvmNode2
+    extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = false)
+
+class ClusterWatcherNoClusterWatcheeUnsafeClassicSpecMultiJvmNode1
+    extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = true)
+class ClusterWatcherNoClusterWatcheeUnsafeClassicSpecMultiJvmNode2
+    extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = true)
+
+class ClusterWatcherNoClusterWatcheeSafeClassicSpecMultiJvmNode1
+    extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = false)
+class ClusterWatcherNoClusterWatcheeSafeClassicSpecMultiJvmNode2
+    extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = false)
+
+abstract class ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe: Boolean)
+    extends ClusterWatcherNoClusterWatcheeSpec(new ClusterWatcherNoClusterWatcheeConfig(useUnsafe, artery = true))
+
+abstract class ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe: Boolean)
+    extends ClusterWatcherNoClusterWatcheeSpec(new ClusterWatcherNoClusterWatcheeConfig(useUnsafe, artery = false))
+
+private object ClusterWatcherNoClusterWatcheeSpec {
+  final case class WatchIt(watchee: ActorRef)
+  case object Ack
+  final case class WrappedTerminated(t: Terminated)
+
+  class Listener(testActor: ActorRef) extends Actor {
+    def receive: Receive = {
+      case WatchIt(watchee) =>
+        context.watch(watchee)
+        sender() ! Ack
+      case t: Terminated =>
+        testActor.forward(WrappedTerminated(t))
+    }
+  }
+}
+
+abstract class ClusterWatcherNoClusterWatcheeSpec(multiNodeConfig: ClusterWatcherNoClusterWatcheeConfig)
+    extends MultiNodeSpec(multiNodeConfig)
+    with MultiNodeClusterSpec
+    with ImplicitSender
+    with ScalaFutures {
+
+  import ClusterWatcherNoClusterWatcheeSpec._
+  import multiNodeConfig._
+
+  override def initialParticipants: Int = roles.size
+
+  muteDeadLetters(Heartbeat.getClass)()
+
+  protected val probe = TestProbe()
+
+  protected def identify(role: RoleName, actorName: String, within: FiniteDuration = 10.seconds): ActorRef =
+    identifyWithPath(role, "user", actorName, within)
+
+  protected def identifyWithPath(
+      role: RoleName,
+      path: String,
+      actorName: String,
+      within: FiniteDuration = 10.seconds): ActorRef = {
+    system.actorSelection(node(role) / path / actorName) ! Identify(actorName)
+    val id = expectMsgType[ActorIdentity](within)
+    assert(id.ref.isDefined, s"Unable to Identify actor [$actorName] on node [$role].")
+    id.ref.get
+  }
+
+  private val provider = RARP(system).provider
+
+  s"Remoting with UseUnsafeRemoteFeaturesWithoutCluster enabled=$useUnsafe, " +
+  "watcher system using `cluster`, but watchee system using `remote`" must {
+
+    val send = if (system.settings.HasCluster || (!system.settings.HasCluster && useUnsafe)) "send" else "not send"
+
+    s"$send `Watch`/`Unwatch`/`Terminate` when watching from cluster to non-cluster remoting watchee" in {
+      runOn(remoting) {
+        system.actorOf(Props(classOf[Listener], probe.ref), "watchee")
+        enterBarrier("watchee-created")
+        enterBarrier("watcher-created")
+      }
+
+      runOn(clustered) {
+        enterBarrier("watchee-created")
+        val watcher = system.actorOf(Props(classOf[Listener], probe.ref), "watcher")
+        enterBarrier("watcher-created")
+
+        val watchee = identify(remoting, "watchee")
+        probe.send(watcher, WatchIt(watchee))
+        probe.expectMsg(1.second, Ack)
+        provider.remoteWatcher.get ! Stats
+        awaitAssert(expectMsgType[Stats].watchingRefs == Set((watchee, watcher)), 2.seconds)
+      }
+      enterBarrier("cluster-watching-remote")
+
+      runOn(remoting) {
+        system.stop(identify(remoting, "watchee"))
+        enterBarrier("watchee-stopped")
+      }
+
+      runOn(clustered) {
+        enterBarrier("watchee-stopped")
+        if (useUnsafe)
+          probe.expectMsgType[WrappedTerminated](2.seconds)
+        else
+          probe.expectNoMessage(2.seconds)
+      }
+    }
+
+    s"$send `Watch`/`Unwatch`/`Terminate` when watching from non-cluster remoting to cluster watchee" in {
+      runOn(clustered) {
+        system.actorOf(Props(classOf[Listener], probe.ref), "watchee2")
+        enterBarrier("watchee2-created")
+        enterBarrier("watcher2-created")
+      }
+
+      runOn(remoting) {
+        enterBarrier("watchee2-created")
+        val watchee = identify(clustered, "watchee2")
+
+        val watcher = system.actorOf(Props(classOf[Listener], probe.ref), "watcher2")
+        enterBarrier("watcher2-created")
+
+        probe.send(watcher, WatchIt(watchee))
+        probe.expectMsg(1.second, Ack)
+
+        if (useUnsafe) {
+          provider.remoteWatcher.get ! Stats
+          awaitAssert(expectMsgType[Stats].watchingRefs == Set((watchee, watcher)), 2.seconds)
+        }
+      }
+
+      runOn(clustered) {
+        system.stop(identify(clustered, "watchee2"))
+        enterBarrier("watchee2-stopped")
+      }
+
+      runOn(remoting) {
+        enterBarrier("watchee2-stopped")
+        if (useUnsafe)
+          probe.expectMsgType[WrappedTerminated](2.seconds)
+        else
+          probe.expectNoMessage(2.seconds)
+      }
+
+      enterBarrier("done")
+    }
+  }
+}
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala
index e8c67ca709..a6741e3b69 100644
--- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala
@@ -24,6 +24,7 @@ class AttemptSysMsgRedeliveryMultiJvmSpec(artery: Boolean) extends MultiNodeConf
 
   commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(s"""
       akka.remote.artery.enabled = $artery
+      akka.remote.use-unsafe-remote-features-without-cluster = on
       """)).withFallback(RemotingMultiNodeSpec.commonConfig))
 
   testTransport(on = true)
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteFeaturesSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteFeaturesSpec.scala
index d0e172aa34..b5dba635c3 100644
--- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteFeaturesSpec.scala
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteFeaturesSpec.scala
@@ -16,6 +16,7 @@ import akka.actor.Nobody
 import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.remote.RemoteNodeDeathWatchSpec.UnwatchIt
+import akka.remote.RemoteNodeDeathWatchSpec.WatchIt
 import akka.remote.RemoteWatcher.Stats
 import akka.remote.routing.RemoteRouterConfig
 import akka.remote.testconductor.RoleName
@@ -120,6 +121,28 @@ abstract class RemotingFeaturesSafeSpec
         expectMsgType[ActorIdentity].ref.get.path.address.hasGlobalScope shouldBe false
       }
     }
+
+    "not receive Terminated on stop with watch attempt" in {
+      runOn(second) {
+        system.actorOf(Props(classOf[ProbeActor], probe.ref), "terminating")
+      }
+      runOn(first) {
+        val watcher = system.actorOf(Props(classOf[ProbeActor], probe.ref), "watch-terminating")
+        val terminating = identify(second, "terminating")
+        watcher ! WatchIt(terminating)
+      }
+      enterBarrier("watch-t-attempted")
+
+      runOn(second) {
+        val terminating = identify(second, "terminating")
+        system.stop(terminating)
+      }
+      enterBarrier("t-stopped")
+
+      runOn(first) {
+        probe.expectNoMessage(2.seconds)
+      }
+    }
   }
 }
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala
index 4f95c921a9..23060d777c 100644
--- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala
@@ -26,6 +26,7 @@ object SurviveNetworkPartitionSpec extends MultiNodeConfig {
       akka.loglevel = INFO
       akka.remote.artery.enabled = on
      akka.remote.artery.advanced.give-up-system-message-after = 4s
+      akka.remote.use-unsafe-remote-features-without-cluster = on
       """))
       .withFallback(RemotingMultiNodeSpec.commonConfig))
 
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
index eb27d607c9..c2424b9cea 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
@@ -688,21 +688,30 @@ private[akka] class RemoteActorRef private[akka] (
   def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef): Boolean = {
     // If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted
     // If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers
-    val intercept = provider.remoteWatcher.exists(remoteWatcher => watcher != remoteWatcher) && watchee == this
-    if (intercept) provider.warnIfUnsafeDeathwatchWithoutCluster(watchee, watcher, "remote Watch/Unwatch")
-    intercept
+    provider.remoteWatcher.exists(remoteWatcher => watcher != remoteWatcher) && watchee == this
   }
 
   def sendSystemMessage(message: SystemMessage): Unit =
     try {
       //send to remote, unless watch message is intercepted by the remoteWatcher
       message match {
-        case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher) =>
-          provider.remoteWatcher.foreach(_ ! RemoteWatcher.WatchRemote(watchee, watcher))
+        case Watch(watchee, watcher) =>
+          if (isWatchIntercepted(watchee, watcher))
+            provider.remoteWatcher.foreach(_ ! RemoteWatcher.WatchRemote(watchee, watcher))
+          else if (provider.remoteWatcher.isDefined)
+            remote.send(message, OptionVal.None, this)
+          else
+            provider.warnIfUnsafeDeathwatchWithoutCluster(watchee, watcher, "remote Watch")
+
         //Unwatch has a different signature, need to pattern match arguments against InternalActorRef
-        case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) =>
-          provider.remoteWatcher.foreach(_ ! RemoteWatcher.UnwatchRemote(watchee, watcher))
-        case _ => remote.send(message, OptionVal.None, this)
+        case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) =>
+          if (isWatchIntercepted(watchee, watcher))
+            provider.remoteWatcher.foreach(_ ! RemoteWatcher.UnwatchRemote(watchee, watcher))
+          else if (provider.remoteWatcher.isDefined)
+            remote.send(message, OptionVal.None, this)
+
+        case _ =>
+          remote.send(message, OptionVal.None, this)
       }
     } catch handleException(message, Actor.noSender)
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
index 894d01aab6..c52c719510 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
@@ -208,7 +208,7 @@ private[akka] class RemoteWatcher(
   @InternalApi protected def shouldWatch(@unused watchee: InternalActorRef): Boolean = {
     // In this it is unnecessary if only created by RARP, but cluster needs it.
     // Cleaner than overriding Cluster watcher addWatch/removeWatch just for one boolean test
-    remoteProvider.hasClusterOrUseUnsafe
+    remoteProvider.remoteSettings.UseUnsafeRemoteFeaturesWithoutCluster
   }
 
   def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {
diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
index 2a83976a63..d40a97333a 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
@@ -14,7 +14,9 @@ import akka.actor.ActorIdentity
 import akka.actor.ActorSystem
 import akka.actor.Identify
 import akka.actor.RootActorPath
-import akka.remote.{ AddressUidExtension, RARP, UniqueAddress }
+import akka.remote.AddressUidExtension
+import akka.remote.RARP
+import akka.remote.UniqueAddress
 import akka.remote.artery.SystemMessageDelivery._
 import akka.stream.ActorMaterializer
 import akka.stream.ActorMaterializerSettings
@@ -28,25 +30,29 @@ import akka.testkit.ImplicitSender
 import akka.testkit.TestActors
 import akka.testkit.TestEvent
 import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
 import akka.util.OptionVal
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
 
 object SystemMessageDeliverySpec {
 
   case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage
 
-  val config = ConfigFactory.parseString(s"""
+  val safe = ConfigFactory.parseString(s"""
       akka.loglevel = INFO
       akka.remote.artery.advanced.stop-idle-outbound-after = 1000 ms
      akka.remote.artery.advanced.inject-handshake-interval = 500 ms
       akka.remote.watch-failure-detector.heartbeat-interval = 2 s
       akka.remote.artery.log-received-messages = on
       akka.remote.artery.log-sent-messages = on
-    """.stripMargin).withFallback(ArterySpecSupport.defaultConfig)
+    """).withFallback(ArterySpecSupport.defaultConfig)
+
+  val config =
+    ConfigFactory.parseString("akka.remote.use-unsafe-remote-features-without-cluster = on").withFallback(safe)
 }
 
-class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliverySpec.config) with ImplicitSender {
+abstract class AbstractSystemMessageDeliverySpec(c: Config) extends ArteryMultiNodeSpec(c) with ImplicitSender {
+
   import SystemMessageDeliverySpec._
 
   val addressA = UniqueAddress(address(system), AddressUidExtension(system).longAddressUid)
@@ -61,7 +67,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
 
   system.eventStream.publish(TestEvent.Mute(EventFilter.warning(pattern = ".*negative acknowledgement.*")))
   systemB.eventStream.publish(TestEvent.Mute(EventFilter.warning(pattern = ".*negative acknowledgement.*")))
 
-  private def send(
+  protected def send(
       sendCount: Int,
       resendInterval: FiniteDuration,
       outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = {
@@ -71,7 +77,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
       .via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000))
   }
 
-  private def inbound(inboundContext: InboundContext): Flow[OutboundEnvelope, InboundEnvelope, NotUsed] = {
+  protected def inbound(inboundContext: InboundContext): Flow[OutboundEnvelope, InboundEnvelope, NotUsed] = {
     val recipient = OptionVal.None // not used
     Flow[OutboundEnvelope]
       .map(outboundEnvelope =>
@@ -83,7 +89,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
       .via(new SystemMessageAcker(inboundContext))
   }
 
-  private def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope, OutboundEnvelope, NotUsed] = {
+  protected def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope, OutboundEnvelope, NotUsed] = {
     Flow[OutboundEnvelope].statefulMapConcat(() => {
       var dropping = dropSeqNumbers
 
@@ -102,10 +108,14 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
     })
   }
 
-  private def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem =>
+  protected def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem =>
     if (ThreadLocalRandom.current().nextDouble() < dropRate) Nil
     else List(elem)
   }
+}
+
+class SystemMessageDeliverySpec extends AbstractSystemMessageDeliverySpec(SystemMessageDeliverySpec.config) {
+  import SystemMessageDeliverySpec._
 
   "System messages" must {
 
@@ -309,3 +319,35 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
     }
   }
 }
+
+class SystemMessageDeliverySafeSpec extends AbstractSystemMessageDeliverySpec(SystemMessageDeliverySpec.safe) {
+  "System messages without cluster" must {
+
+    "not be delivered when concurrent idle stopping" in {
+      val systemBRef = systemB.actorOf(TestActors.echoActorProps, "echo")
+
+      val remoteRef = {
+        system.actorSelection(rootB / "user" / "echo") ! Identify(None)
+        expectMsgType[ActorIdentity].ref.get
+      }
+
+      val idleTimeout =
+        RARP(system).provider.transport.asInstanceOf[ArteryTransport].settings.Advanced.StopIdleOutboundAfter
+      val rnd = ThreadLocalRandom.current()
+
+      (1 to 5).foreach { _ =>
+        (1 to 1).foreach { _ =>
+          watch(remoteRef)
+          unwatch(remoteRef)
+        }
+        Thread.sleep((idleTimeout - 10.millis).toMillis + rnd.nextInt(20))
+      }
+
+      watch(remoteRef)
"ping" + expectMsg("ping") + systemB.stop(systemBRef) + expectNoMessage(2.seconds) + } + } +} diff --git a/akka-remote/src/test/scala/akka/remote/classic/ActorsLeakSpec.scala b/akka-remote/src/test/scala/akka/remote/classic/ActorsLeakSpec.scala index 670d3ab15e..250ba5f667 100644 --- a/akka-remote/src/test/scala/akka/remote/classic/ActorsLeakSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/classic/ActorsLeakSpec.scala @@ -29,6 +29,7 @@ object ActorsLeakSpec { akka.remote.classic.transport-failure-detector.heartbeat-interval = 1 s akka.remote.classic.transport-failure-detector.acceptable-heartbeat-pause = 3 s akka.remote.classic.quarantine-after-silence = 3 s + akka.remote.use-unsafe-remote-features-without-cluster = on akka.test.filter-leeway = 12 s # test is using Java serialization and not priority to rewrite akka.actor.allow-java-serialization = on diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index 1cf5e3c897..a5cc9f213d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -155,6 +155,7 @@ object StreamRefsSpec { remote { artery.canonical.port = 0 classic.netty.tcp.port = 0 + use-unsafe-remote-features-without-cluster = on } } """).withFallback(ConfigFactory.load())