Fix intercept of Watch in use of safe remote features #27333 (#27334)

And added one more case for cluster watcher, watching remote no cluster watchee, and the reverse.
This commit is contained in:
Helena Edelson 2019-07-12 12:18:37 -07:00 committed by GitHub
parent d27be3fa48
commit a110be8f69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 298 additions and 19 deletions

View file

@ -160,7 +160,7 @@ private[cluster] class ClusterRemoteWatcher(
if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
override protected def shouldWatch(watchee: InternalActorRef): Boolean =
clusterNodes(watchee.path.address)
clusterNodes(watchee.path.address) || super.shouldWatch(watchee)
/**
* When a cluster node is added this class takes over the

View file

@ -0,0 +1,201 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.actor.Terminated
import akka.remote.RARP
import akka.remote.RemoteWatcher.Heartbeat
import akka.remote.RemoteWatcher.Stats
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
class ClusterWatcherNoClusterWatcheeConfig(val useUnsafe: Boolean, artery: Boolean) extends MultiNodeConfig {
val clustered = role("clustered")
val remoting = role("remoting")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(s"""
akka.remote.use-unsafe-remote-features-without-cluster = $useUnsafe
akka.remote.log-remote-lifecycle-events = off
akka.remote.artery.enabled = $artery
akka.remote.artery.advanced.flight-recorder.enabled = off
akka.log-dead-letters = off
akka.loggers =["akka.testkit.TestEventListener"]
akka.actor.allow-java-serialization = on
""")))
nodeConfig(remoting)(ConfigFactory.parseString(s"""
akka.actor.provider = remote"""))
nodeConfig(clustered)(ConfigFactory.parseString("""
akka.actor.provider = cluster
akka.cluster.jmx.enabled = off"""))
}
class ClusterWatcherNoClusterWatcheeUnsafeArterySpecMultiJvmNode1
extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = true)
class ClusterWatcherNoClusterWatcheeUnsafeArterySpecMultiJvmNode2
extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = true)
class ClusterWatcherNoClusterWatcheeSafeArterySpecMultiJvmNode1
extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = false)
class ClusterWatcherNoClusterWatcheeSafeArterySpecMultiJvmNode2
extends ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe = false)
class ClusterWatcherNoClusterWatcheeUnsafeClassicSpecMultiJvmNode1
extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = true)
class ClusterWatcherNoClusterWatcheeUnsafeClassicSpecMultiJvmNode2
extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = true)
class ClusterWatcherNoClusterWatcheeSafeClassicSpecMultiJvmNode1
extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = false)
class ClusterWatcherNoClusterWatcheeSafeClassicSpecMultiJvmNode2
extends ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe = false)
abstract class ClusterWatcherNoClusterWatcheeArterySpec(useUnsafe: Boolean)
extends ClusterWatcherNoClusterWatcheeSpec(new ClusterWatcherNoClusterWatcheeConfig(useUnsafe, artery = true))
abstract class ClusterWatcherNoClusterWatcheeClassicSpec(useUnsafe: Boolean)
extends ClusterWatcherNoClusterWatcheeSpec(new ClusterWatcherNoClusterWatcheeConfig(useUnsafe, artery = true))
private object ClusterWatcherNoClusterWatcheeSpec {
final case class WatchIt(watchee: ActorRef)
case object Ack
final case class WrappedTerminated(t: Terminated)
class Listener(testActor: ActorRef) extends Actor {
def receive: Receive = {
case WatchIt(watchee) =>
context.watch(watchee)
sender() ! Ack
case t: Terminated =>
testActor.forward(WrappedTerminated(t))
}
}
}
abstract class ClusterWatcherNoClusterWatcheeSpec(multiNodeConfig: ClusterWatcherNoClusterWatcheeConfig)
extends MultiNodeSpec(multiNodeConfig)
with MultiNodeClusterSpec
with ImplicitSender
with ScalaFutures {
import ClusterWatcherNoClusterWatcheeSpec._
import multiNodeConfig._
override def initialParticipants: Int = roles.size
muteDeadLetters(Heartbeat.getClass)()
protected val probe = TestProbe()
protected def identify(role: RoleName, actorName: String, within: FiniteDuration = 10.seconds): ActorRef =
identifyWithPath(role, "user", actorName, within)
protected def identifyWithPath(
role: RoleName,
path: String,
actorName: String,
within: FiniteDuration = 10.seconds): ActorRef = {
system.actorSelection(node(role) / path / actorName) ! Identify(actorName)
val id = expectMsgType[ActorIdentity](within)
assert(id.ref.isDefined, s"Unable to Identify actor [$actorName] on node [$role].")
id.ref.get
}
private val provider = RARP(system).provider
s"Remoting with UseUnsafeRemoteFeaturesWithoutCluster enabled=$useUnsafe, " +
"watcher system using `cluster`, but watchee system using `remote`" must {
val send = if (system.settings.HasCluster || (!system.settings.HasCluster && useUnsafe)) "send" else "not send"
s"$send `Watch`/`Unwatch`/`Terminate` when watching from cluster to non-cluster remoting watchee" in {
runOn(remoting) {
system.actorOf(Props(classOf[Listener], probe.ref), "watchee")
enterBarrier("watchee-created")
enterBarrier("watcher-created")
}
runOn(clustered) {
enterBarrier("watchee-created")
val watcher = system.actorOf(Props(classOf[Listener], probe.ref), "watcher")
enterBarrier("watcher-created")
val watchee = identify(remoting, "watchee")
probe.send(watcher, WatchIt(watchee))
probe.expectMsg(1.second, Ack)
provider.remoteWatcher.get ! Stats
awaitAssert(expectMsgType[Stats].watchingRefs == Set((watchee, watcher)), 2.seconds)
}
enterBarrier("cluster-watching-remote")
runOn(remoting) {
system.stop(identify(remoting, "watchee"))
enterBarrier("watchee-stopped")
}
runOn(clustered) {
enterBarrier("watchee-stopped")
if (useUnsafe)
probe.expectMsgType[WrappedTerminated](2.seconds)
else
probe.expectNoMessage(2.seconds)
}
}
s"$send `Watch`/`Unwatch`/`Terminate` when watching from non-cluster remoting to cluster watchee" in {
runOn(clustered) {
system.actorOf(Props(classOf[Listener], probe.ref), "watchee2")
enterBarrier("watchee2-created")
enterBarrier("watcher2-created")
}
runOn(remoting) {
enterBarrier("watchee2-created")
val watchee = identify(clustered, "watchee2")
val watcher = system.actorOf(Props(classOf[Listener], probe.ref), "watcher2")
enterBarrier("watcher2-created")
probe.send(watcher, WatchIt(watchee))
probe.expectMsg(1.second, Ack)
if (useUnsafe) {
provider.remoteWatcher.get ! Stats
awaitAssert(expectMsgType[Stats].watchingRefs == Set((watchee, watcher)), 2.seconds)
}
}
runOn(clustered) {
system.stop(identify(clustered, "watchee2"))
enterBarrier("watchee2-stopped")
}
runOn(remoting) {
enterBarrier("watchee2-stopped")
if (useUnsafe)
probe.expectMsgType[WrappedTerminated](2.seconds)
else
probe.expectNoMessage(2.seconds)
}
enterBarrier("done")
}
}
}

View file

@ -24,6 +24,7 @@ class AttemptSysMsgRedeliveryMultiJvmSpec(artery: Boolean) extends MultiNodeConf
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(s"""
akka.remote.artery.enabled = $artery
akka.remote.use-unsafe-remote-features-without-cluster = on
""")).withFallback(RemotingMultiNodeSpec.commonConfig))
testTransport(on = true)

View file

@ -16,6 +16,7 @@ import akka.actor.Nobody
import akka.actor.PoisonPill
import akka.actor.Props
import akka.remote.RemoteNodeDeathWatchSpec.UnwatchIt
import akka.remote.RemoteNodeDeathWatchSpec.WatchIt
import akka.remote.RemoteWatcher.Stats
import akka.remote.routing.RemoteRouterConfig
import akka.remote.testconductor.RoleName
@ -120,6 +121,28 @@ abstract class RemotingFeaturesSafeSpec
expectMsgType[ActorIdentity].ref.get.path.address.hasGlobalScope shouldBe false
}
}
"not receive Terminated on stop with watch attempt" in {
runOn(second) {
system.actorOf(Props(classOf[ProbeActor], probe.ref), "terminating")
}
runOn(first) {
val watcher = system.actorOf(Props(classOf[ProbeActor], probe.ref), "watch-terminating")
val terminating = identify(second, "terminating")
watcher ! WatchIt(terminating)
}
enterBarrier("watch-t-attempted")
runOn(second) {
val terminating = identify(second, "terminating")
system.stop(terminating)
}
enterBarrier("t-stopped")
runOn(first) {
probe.expectNoMessage(2.seconds)
}
}
}
}

View file

@ -26,6 +26,7 @@ object SurviveNetworkPartitionSpec extends MultiNodeConfig {
akka.loglevel = INFO
akka.remote.artery.enabled = on
akka.remote.artery.advanced.give-up-system-message-after = 4s
akka.remote.use-unsafe-remote-features-without-cluster = on
"""))
.withFallback(RemotingMultiNodeSpec.commonConfig))

View file

@ -688,21 +688,30 @@ private[akka] class RemoteActorRef private[akka] (
def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef): Boolean = {
// If watchee != this then watcher should == this. This is a reverse watch, and it is not intercepted
// If watchee == this, only the watches from remoteWatcher are sent on the wire, on behalf of other watchers
val intercept = provider.remoteWatcher.exists(remoteWatcher => watcher != remoteWatcher) && watchee == this
if (intercept) provider.warnIfUnsafeDeathwatchWithoutCluster(watchee, watcher, "remote Watch/Unwatch")
intercept
provider.remoteWatcher.exists(remoteWatcher => watcher != remoteWatcher) && watchee == this
}
def sendSystemMessage(message: SystemMessage): Unit =
try {
//send to remote, unless watch message is intercepted by the remoteWatcher
message match {
case Watch(watchee, watcher) if isWatchIntercepted(watchee, watcher) =>
provider.remoteWatcher.foreach(_ ! RemoteWatcher.WatchRemote(watchee, watcher))
case Watch(watchee, watcher) =>
if (isWatchIntercepted(watchee, watcher))
provider.remoteWatcher.foreach(_ ! RemoteWatcher.WatchRemote(watchee, watcher))
else if (provider.remoteWatcher.isDefined)
remote.send(message, OptionVal.None, this)
else
provider.warnIfUnsafeDeathwatchWithoutCluster(watchee, watcher, "remote Watch")
//Unwatch has a different signature, need to pattern match arguments against InternalActorRef
case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) =>
provider.remoteWatcher.foreach(_ ! RemoteWatcher.UnwatchRemote(watchee, watcher))
case _ => remote.send(message, OptionVal.None, this)
case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) =>
if (isWatchIntercepted(watchee, watcher))
provider.remoteWatcher.foreach(_ ! RemoteWatcher.UnwatchRemote(watchee, watcher))
else if (provider.remoteWatcher.isDefined)
remote.send(message, OptionVal.None, this)
case _ =>
remote.send(message, OptionVal.None, this)
}
} catch handleException(message, Actor.noSender)

View file

@ -208,7 +208,7 @@ private[akka] class RemoteWatcher(
@InternalApi protected def shouldWatch(@unused watchee: InternalActorRef): Boolean = {
// In this it is unnecessary if only created by RARP, but cluster needs it.
// Cleaner than overriding Cluster watcher addWatch/removeWatch just for one boolean test
remoteProvider.hasClusterOrUseUnsafe
remoteProvider.remoteSettings.UseUnsafeRemoteFeaturesWithoutCluster
}
def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {

View file

@ -14,7 +14,9 @@ import akka.actor.ActorIdentity
import akka.actor.ActorSystem
import akka.actor.Identify
import akka.actor.RootActorPath
import akka.remote.{ AddressUidExtension, RARP, UniqueAddress }
import akka.remote.AddressUidExtension
import akka.remote.RARP
import akka.remote.UniqueAddress
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
@ -28,25 +30,29 @@ import akka.testkit.ImplicitSender
import akka.testkit.TestActors
import akka.testkit.TestEvent
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import akka.util.OptionVal
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
object SystemMessageDeliverySpec {
case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage
val config = ConfigFactory.parseString(s"""
val safe = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.remote.artery.advanced.stop-idle-outbound-after = 1000 ms
akka.remote.artery.advanced.inject-handshake-interval = 500 ms
akka.remote.watch-failure-detector.heartbeat-interval = 2 s
akka.remote.artery.log-received-messages = on
akka.remote.artery.log-sent-messages = on
""".stripMargin).withFallback(ArterySpecSupport.defaultConfig)
""").withFallback(ArterySpecSupport.defaultConfig)
val config =
ConfigFactory.parseString("akka.remote.use-unsafe-remote-features-without-cluster = on").withFallback(safe)
}
class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliverySpec.config) with ImplicitSender {
abstract class AbstractSystemMessageDeliverySpec(c: Config) extends ArteryMultiNodeSpec(c) with ImplicitSender {
import SystemMessageDeliverySpec._
val addressA = UniqueAddress(address(system), AddressUidExtension(system).longAddressUid)
@ -61,7 +67,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
system.eventStream.publish(TestEvent.Mute(EventFilter.warning(pattern = ".*negative acknowledgement.*")))
systemB.eventStream.publish(TestEvent.Mute(EventFilter.warning(pattern = ".*negative acknowledgement.*")))
private def send(
protected def send(
sendCount: Int,
resendInterval: FiniteDuration,
outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = {
@ -71,7 +77,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
.via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000))
}
private def inbound(inboundContext: InboundContext): Flow[OutboundEnvelope, InboundEnvelope, NotUsed] = {
protected def inbound(inboundContext: InboundContext): Flow[OutboundEnvelope, InboundEnvelope, NotUsed] = {
val recipient = OptionVal.None // not used
Flow[OutboundEnvelope]
.map(outboundEnvelope =>
@ -83,7 +89,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
.via(new SystemMessageAcker(inboundContext))
}
private def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope, OutboundEnvelope, NotUsed] = {
protected def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope, OutboundEnvelope, NotUsed] = {
Flow[OutboundEnvelope].statefulMapConcat(() => {
var dropping = dropSeqNumbers
@ -102,10 +108,14 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
})
}
private def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem =>
protected def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem =>
if (ThreadLocalRandom.current().nextDouble() < dropRate) Nil
else List(elem)
}
}
class SystemMessageDeliverySpec extends AbstractSystemMessageDeliverySpec(SystemMessageDeliverySpec.config) {
import SystemMessageDeliverySpec._
"System messages" must {
@ -309,3 +319,35 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver
}
}
}
class SystemMessageDeliverySafeSpec extends AbstractSystemMessageDeliverySpec(SystemMessageDeliverySpec.safe) {
"System messages without cluster" must {
"not be delivered when concurrent idle stopping" in {
val systemBRef = systemB.actorOf(TestActors.echoActorProps, "echo")
val remoteRef = {
system.actorSelection(rootB / "user" / "echo") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
val idleTimeout =
RARP(system).provider.transport.asInstanceOf[ArteryTransport].settings.Advanced.StopIdleOutboundAfter
val rnd = ThreadLocalRandom.current()
(1 to 5).foreach { _ =>
(1 to 1).foreach { _ =>
watch(remoteRef)
unwatch(remoteRef)
}
Thread.sleep((idleTimeout - 10.millis).toMillis + rnd.nextInt(20))
}
watch(remoteRef)
remoteRef ! "ping"
expectMsg("ping")
systemB.stop(systemBRef)
expectNoMessage(2.seconds)
}
}
}

View file

@ -29,6 +29,7 @@ object ActorsLeakSpec {
akka.remote.classic.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.classic.transport-failure-detector.acceptable-heartbeat-pause = 3 s
akka.remote.classic.quarantine-after-silence = 3 s
akka.remote.use-unsafe-remote-features-without-cluster = on
akka.test.filter-leeway = 12 s
# test is using Java serialization and not priority to rewrite
akka.actor.allow-java-serialization = on

View file

@ -155,6 +155,7 @@ object StreamRefsSpec {
remote {
artery.canonical.port = 0
classic.netty.tcp.port = 0
use-unsafe-remote-features-without-cluster = on
}
}
""").withFallback(ConfigFactory.load())