diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala new file mode 100644 index 0000000000..596fb3d67d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable +import scala.concurrent.duration._ + +import akka.actor.Actor +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.Props +import akka.actor.RootActorPath +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus._ +import akka.remote.AddressUidExtension +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object RestartNode2SpecMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down-unreachable-after = 2s + akka.cluster.retry-unsuccessful-join-after = 3s + akka.remote.retry-gate-closed-for = 45s + akka.remote.log-remote-lifecycle-events = INFO + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class RestartNode2SpecMultiJvmNode1 extends RestartNode2SpecSpec +class RestartNode2SpecMultiJvmNode2 extends RestartNode2SpecSpec + +abstract class RestartNode2SpecSpec + extends MultiNodeSpec(RestartNode2SpecMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender { + + import RestartNode2SpecMultiJvmSpec._ + + @volatile var seedNode1Address: Address = _ + + // use a separate ActorSystem, to be able to simulate restart + lazy val seed1System = ActorSystem(system.name, system.settings.config) + + def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2) + + // this is the node that will attempt to re-join, keep gate times low so it can retry quickly + lazy val restartedSeed1System = ActorSystem(system.name, + ConfigFactory.parseString( + s""" + akka.remote.netty.tcp.port= ${seedNodes.head.port.get} + #akka.remote.retry-gate-closed-for = 1s + """). + withFallback(system.settings.config)) + + override def afterAll(): Unit = { + runOn(seed1) { + shutdown( + if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System) + } + super.afterAll() + } + + "Cluster seed nodes" must { + "be able to restart first seed node and join other seed nodes" taggedAs LongRunningTest in within(60.seconds) { + // seed1System is a separate ActorSystem, to be able to simulate restart + // we must transfer its address to seed2 + runOn(seed2) { + system.actorOf(Props(new Actor { + def receive = { + case a: Address ⇒ + seedNode1Address = a + sender() ! "ok" + } + }).withDeploy(Deploy.local), name = "address-receiver") + enterBarrier("seed1-address-receiver-ready") + } + + runOn(seed1) { + enterBarrier("seed1-address-receiver-ready") + seedNode1Address = Cluster(seed1System).selfAddress + List(seed2) foreach { r ⇒ + system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! seedNode1Address + expectMsg(5.seconds, "ok") + } + } + enterBarrier("seed1-address-transfered") + + // now we can join seed1System, seed2 together + + runOn(seed1) { + Cluster(seed1System).joinSeedNodes(seedNodes) + awaitAssert(Cluster(seed1System).readView.members.size should be(2)) + awaitAssert(Cluster(seed1System).readView.members.map(_.status) should be(Set(Up))) + } + runOn(seed2) { + cluster.joinSeedNodes(seedNodes) + awaitMembersUp(2) + } + enterBarrier("started") + + // shutdown seed1System + runOn(seed1) { + shutdown(seed1System, remainingOrDefault) + } + enterBarrier("seed1-shutdown") + + // then start restartedSeed1System, which has the same address as seed1System + runOn(seed1) { + Cluster(restartedSeed1System).joinSeedNodes(seedNodes) + within(30.seconds) { + awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(2)) + awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up))) + } + } + runOn(seed2) { + awaitMembersUp(2) + } + enterBarrier("seed1-restarted") + + } + + } +} \ No newline at end of file diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala new file mode 100644 index 0000000000..52d9d97eb6 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.remote + +import akka.remote.transport.AssociationHandle + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociateExplicitly, ForceDisassociate, Direction } +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorIdentity +import akka.remote.testconductor.RoleName +import akka.actor.Identify +import scala.concurrent.Await + +object RemoteGatePiercingSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = INFO + akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 5 s + """))) + + nodeConfig(first)( + ConfigFactory.parseString("akka.remote.retry-gate-closed-for = 1 d # Keep it long")) + + nodeConfig(second)( + ConfigFactory.parseString("akka.remote.retry-gate-closed-for = 1 s # Keep it short")) + + testTransport(on = true) + + class Subject extends Actor { + def receive = { + case "shutdown" ⇒ context.system.terminate() + } + } + +} + +class RemoteGatePiercingSpecMultiJvmNode1 extends RemoteGatePiercingSpec +class RemoteGatePiercingSpecMultiJvmNode2 extends RemoteGatePiercingSpec + +abstract class RemoteGatePiercingSpec + extends MultiNodeSpec(RemoteGatePiercingSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteGatePiercingSpec._ + + override def initialParticipants = 2 + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) + expectMsgType[ActorIdentity].ref.get + } + + "RemoteNodeRestartGate" must { + + "allow restarted node to pass through gate" taggedAs LongRunningTest in { + runOn(first) { + system.actorOf(Props[Subject], "subject") + + identify(second, "subject") + + enterBarrier("actors-communicate") + + EventFilter.warning(pattern = "address is now gated", occurrences = 1).intercept { + Await.result(RARP(system).provider.transport.managementCommand( + ForceDisassociateExplicitly(node(second).address, AssociationHandle.Unknown)), 3.seconds) + } + + enterBarrier("gated") + + enterBarrier("gate-pierced") + + } + + runOn(second) { + system.actorOf(Props[Subject], "subject") + + enterBarrier("actors-communicate") + + enterBarrier("gated") + + // Pierce the gate + within(30.seconds) { + awaitAssert { + identify(first, "subject") + } + } + + enterBarrier("gate-pierced") + + } + + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala new file mode 100644 index 0000000000..a8c913cf5e --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.remote + +import akka.remote.transport.AssociationHandle + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociateExplicitly, ForceDisassociate, Direction } +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorIdentity +import akka.remote.testconductor.RoleName +import akka.actor.Identify +import scala.concurrent.Await + +object RemoteNodeRestartGateSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = INFO + akka.remote.retry-gate-closed-for = 1d # Keep it long + """))) + + testTransport(on = true) + + class Subject extends Actor { + def receive = { + case "shutdown" ⇒ context.system.terminate() + case msg ⇒ sender() ! msg + } + } + +} + +class RemoteNodeRestartGateSpecMultiJvmNode1 extends RemoteNodeRestartGateSpec +class RemoteNodeRestartGateSpecMultiJvmNode2 extends RemoteNodeRestartGateSpec + +abstract class RemoteNodeRestartGateSpec + extends MultiNodeSpec(RemoteNodeRestartGateSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteNodeRestartGateSpec._ + + override def initialParticipants = 2 + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) + expectMsgType[ActorIdentity].ref.get + } + + "RemoteNodeRestartGate" must { + + "allow restarted node to pass through gate" taggedAs LongRunningTest in { + runOn(first) { + val secondAddress = node(second).address + system.actorOf(Props[Subject], "subject") + + identify(second, "subject") + + EventFilter.warning(pattern = "address is now gated", occurrences = 1).intercept { + Await.result(RARP(system).provider.transport.managementCommand( + ForceDisassociateExplicitly(node(second).address, AssociationHandle.Unknown)), 3.seconds) + } + + enterBarrier("gated") + + testConductor.shutdown(second).await + + within(10.seconds) { + awaitAssert { + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject") + expectMsgType[ActorIdentity].ref.get + } + } + + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "shutdown" + } + + runOn(second) { + val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val firstAddress = node(first).address + + system.actorOf(Props[Subject], "subject") + + enterBarrier("gated") + + Await.ready(system.whenTerminated, 10.seconds) + + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.retry-gate-closed-for = 0.5 s + akka.remote.netty.tcp { + hostname = ${addr.host.get} + port = ${addr.port.get} + } + """).withFallback(system.settings.config)) + + val probe = TestProbe()(freshSystem) + + // Pierce the gate + within(30.seconds) { + awaitAssert { + freshSystem.actorSelection(RootActorPath(firstAddress) / "user" / "subject").tell(Identify("subject"), probe.ref) + probe.expectMsgType[ActorIdentity].ref.get + } + } + + // Now the other system will be able to pass, too + freshSystem.actorOf(Props[Subject], "subject") + + Await.ready(freshSystem.whenTerminated, 30.seconds) + } + + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index f4c7d6bebc..cea300ff9a 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -292,7 +292,7 @@ private[remote] object EndpointManager { class EndpointRegistry { private var addressToWritable = HashMap[Address, EndpointPolicy]() private var writableToAddress = HashMap[ActorRef, Address]() - private var addressToReadonly = HashMap[Address, ActorRef]() + private var addressToReadonly = HashMap[Address, (ActorRef, Int)]() private var readonlyToAddress = HashMap[ActorRef, Address]() def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef = @@ -312,8 +312,8 @@ private[remote] object EndpointManager { } } - def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = { - addressToReadonly += address -> endpoint + def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = { + addressToReadonly += address -> ((endpoint, uid)) readonlyToAddress += endpoint -> address endpoint } @@ -338,7 +338,7 @@ private[remote] object EndpointManager { case _ ⇒ false } - def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address) + def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address) def isWritable(endpoint: ActorRef): Boolean = writableToAddress contains endpoint @@ -528,25 +528,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender() - case Quarantine(address, uidOption) ⇒ + case Quarantine(address, uidToQuarantineOption) ⇒ // Stop writers - endpoints.writableEndpointWithPolicyFor(address) match { - case Some(Pass(endpoint, _, _)) ⇒ + (endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match { + case (Some(Pass(endpoint, _, _)), None) ⇒ context.stop(endpoint) - if (uidOption.isEmpty) { - log.warning("Association to [{}] with unknown UID is reported as quarantined, but " + - "address cannot be quarantined without knowing the UID, gating instead for {} ms.", - address, settings.RetryGateClosedFor.toMillis) - endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor) - } + log.warning("Association to [{}] with unknown UID is reported as quarantined, but " + + "address cannot be quarantined without knowing the UID, gating instead for {} ms.", + address, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor) + case (Some(Pass(endpoint, Some(currentUid), _)), Some(quarantineUid)) if currentUid == quarantineUid ⇒ + context.stop(endpoint) + case _ ⇒ + // Do nothing, because either: + // A: we don't know yet the UID of the writer, it will be checked against current quarantine state later + // B: we know the UID, but it does not match with the UID to be quarantined + } + + // Stop inbound read-only associations + (endpoints.readOnlyEndpointFor(address), uidToQuarantineOption) match { + case (Some((endpoint, _)), None) ⇒ context.stop(endpoint) + case (Some((endpoint, currentUid)), Some(quarantineUid)) if currentUid == quarantineUid ⇒ context.stop(endpoint) case _ ⇒ // nothing to stop } - // Stop inbound read-only associations - endpoints.readOnlyEndpointFor(address) match { - case Some(endpoint) ⇒ context.stop(endpoint) - case _ ⇒ // nothing to stop - } - uidOption foreach { uid ⇒ + uidToQuarantineOption foreach { uid ⇒ endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) } @@ -630,10 +635,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match { case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match { - case Some(endpoint) ⇒ + case Some((endpoint, _)) ⇒ pendingReadHandoffs.get(endpoint) foreach (_.disassociate()) pendingReadHandoffs += endpoint -> handle endpoint ! EndpointWriter.TakeOver(handle, self) + endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { + case Some(Pass(ep, _, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate + case _ ⇒ + } case None ⇒ if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate(AssociationHandle.Quarantined) @@ -651,6 +660,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends pendingReadHandoffs.get(ep) foreach (_.disassociate()) pendingReadHandoffs += ep -> handle ep ! EndpointWriter.StopReading(ep, self) + ep ! ReliableDeliverySupervisor.Ungate } else { context.stop(ep) endpoints.unregisterEndpoint(ep) @@ -677,7 +687,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends if (writing) endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint) else { - endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) + endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) if (!endpoints.hasWritableEndpointFor(handle.remoteAddress)) endpoints.removePolicy(handle.remoteAddress) } @@ -743,7 +753,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Some(handle), writing = false, refuseUid = None) - endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) + endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) } } diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index d9306f9051..5fc5e9df1e 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -34,10 +34,10 @@ class EndpointRegistrySpec extends AkkaSpec { val reg = new EndpointRegistry reg.readOnlyEndpointFor(address1) should ===(None) - reg.registerReadOnlyEndpoint(address1, actorA) should ===(actorA) + reg.registerReadOnlyEndpoint(address1, actorA, 0) should ===(actorA) - reg.readOnlyEndpointFor(address1) should ===(Some(actorA)) - reg.writableEndpointWithPolicyFor(address1) should ===(None) + reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 0))) + reg.writableEndpointWithPolicyFor(address1) should be(None) reg.isWritable(actorA) should ===(false) reg.isReadOnly(actorA) should ===(true) reg.isQuarantined(address1, 42) should ===(false) @@ -48,10 +48,10 @@ class EndpointRegistrySpec extends AkkaSpec { reg.readOnlyEndpointFor(address1) should ===(None) reg.writableEndpointWithPolicyFor(address1) should ===(None) - reg.registerReadOnlyEndpoint(address1, actorA) should ===(actorA) + reg.registerReadOnlyEndpoint(address1, actorA, 1) should ===(actorA) reg.registerWritableEndpoint(address1, None, None, actorB) should ===(actorB) - reg.readOnlyEndpointFor(address1) should ===(Some(actorA)) + reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 1))) reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None, None))) reg.isWritable(actorA) should ===(false) @@ -77,7 +77,7 @@ class EndpointRegistrySpec extends AkkaSpec { "remove read-only endpoints if marked as failed" in { val reg = new EndpointRegistry - reg.registerReadOnlyEndpoint(address1, actorA) + reg.registerReadOnlyEndpoint(address1, actorA, 2) reg.markAsFailed(actorA, Deadline.now) reg.readOnlyEndpointFor(address1) should ===(None) }