diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
index 44b2513fb7..eb43ea15bf 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
@@ -316,6 +316,26 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       enterBarrier("after-4")
     }
 
+    "support proxy only mode" in within(10.seconds) {
+      runOn(second) {
+        val proxy = system.actorOf(ShardRegion.proxyProps(
+          typeName = "counter",
+          role = None,
+          coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
+          retryInterval = 1.second,
+          bufferSize = 1000,
+          idExtractor = idExtractor,
+          shardResolver = shardResolver),
+          name = "regionProxy")
+
+        proxy ! Get(1)
+        expectMsg(2)
+        proxy ! Get(2)
+        expectMsg(4)
+      }
+      enterBarrier("after-5")
+    }
+
     "failover shards on crashed node" in within(30 seconds) {
       // mute logging of deadLetters during shutdown of systems
       if (!log.isDebugEnabled)
@@ -346,7 +366,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         }
       }
 
-      enterBarrier("after-5")
+      enterBarrier("after-6")
     }
 
     "use third and fourth node" in within(15 seconds) {
@@ -396,7 +416,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         lastSender.path should ===(region.path / "4" / "4")
       }
 
-      enterBarrier("after-6")
+      enterBarrier("after-7")
     }
 
     "recover coordinator state after coordinator crash" in within(60 seconds) {
@@ -427,7 +447,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       }
 
 
-      enterBarrier("after-7")
+      enterBarrier("after-8")
     }
 
     "rebalance to nodes with less shards" in within(60 seconds) {
@@ -459,31 +479,11 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         }
       }
 
-      enterBarrier("after-8")
+      enterBarrier("after-9")
     }
   }
 
-    "support proxy only mode" in within(10.seconds) {
-      runOn(sixth) {
-        val proxy = system.actorOf(ShardRegion.proxyProps(
-          typeName = "counter",
-          role = None,
-          coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
-          retryInterval = 1.second,
-          bufferSize = 1000,
-          idExtractor = idExtractor,
-          shardResolver = shardResolver),
-          name = "regionProxy")
-
-        proxy ! Get(1)
-        expectMsg(2)
-        proxy ! Get(2)
-        expectMsg(4)
-      }
-      enterBarrier("after-9")
-    }
-
 
     "easy to use with extensions" in within(50.seconds) {
       runOn(third, fourth, fifth, sixth) {
         //#counter-start
@@ -532,7 +532,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         }
       }
 
-      enterBarrier("after-9")
+      enterBarrier("after-10")
     }
 
     "easy API for starting" in within(50.seconds) {
@@ -549,7 +549,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         counterRegionViaStart should equal(counterRegionViaGet)
       }
 
-      enterBarrier("after-10")
+      enterBarrier("after-11")
     }
 
@@ -611,7 +611,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         expectMsg(3 seconds, ActorIdentity(3, None))
       }
 
-      enterBarrier("after-11")
+      enterBarrier("after-12")
     }
 
     "permanently stop entries which passivate" in within(15.seconds) {
@@ -682,7 +682,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         expectMsgType[ActorIdentity](3 seconds).ref should not be (None)
       }
 
-      enterBarrier("after-12")
+      enterBarrier("after-13")
     }
 
     "restart entries which stop without passivating" in within(50.seconds) {
@@ -708,7 +708,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         }, 5.seconds, 500.millis)
       }
 
-      enterBarrier("after-13")
+      enterBarrier("after-14")
     }
 
     "be migrated to new regions upon region failure" in within(15.seconds) {
@@ -748,7 +748,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         expectMsg(2)
       }
 
-      enterBarrier("after-14")
+      enterBarrier("after-15")
     }
 
     "ensure rebalance restarts shards" in within(50.seconds) {
@@ -784,7 +784,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         }
       }
 
-      enterBarrier("after-15")
+      enterBarrier("after-16")
     }
   }
 }
diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
index de7b5cf387..ae6ecda855 100644
--- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala
+++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
@@ -166,6 +166,9 @@ private[remote] object ReliableDeliverySupervisor {
   case object AttemptSysMsgRedelivery
   final case class GotUid(uid: Int, remoteAddres: Address)
 
+  case object IsIdle
+  case object Idle
+
   def props(
     handleOrActive: Option[AkkaProtocolHandle],
     localAddress: Address,
@@ -272,6 +275,7 @@ private[remote] class ReliableDeliverySupervisor(
       resendAll()
       writer ! FlushAndStop
       context.become(flushWait)
+    case IsIdle ⇒ // Do not reply, we will Terminate soon, or send a GotUid
     case s: Send ⇒
       handleSend(s)
     case ack: Ack ⇒
@@ -311,6 +315,7 @@ private[remote] class ReliableDeliverySupervisor(
   def gated: Receive = {
     case Terminated(_) ⇒
       context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
+    case IsIdle ⇒ sender() ! Idle
    case Ungate ⇒
      if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
        // If we talk to a system we have not talked to before (or has given up talking to in the past) stop
@@ -335,6 +340,7 @@ private[remote] class ReliableDeliverySupervisor(
   }
 
   def idle: Receive = {
+    case IsIdle ⇒ sender() ! Idle
     case s: Send ⇒
       writer = createWriter()
       // Resending will be triggered by the incoming GotUid message after the connection finished
@@ -352,6 +358,7 @@ private[remote] class ReliableDeliverySupervisor(
   }
 
   def flushWait: Receive = {
+    case IsIdle ⇒ // Do not reply, we will Terminate soon, which will do the inbound connection unstashing
     case Terminated(_) ⇒
       // Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
       // and don't really know if they were properly delivered or not.
diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
index 0baaefde02..f4c7d6bebc 100644
--- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
@@ -22,7 +22,6 @@ import scala.util.{ Failure, Success }
 import akka.remote.transport.AkkaPduCodec.Message
 import java.util.concurrent.ConcurrentHashMap
 import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
-import akka.event.AddressTerminatedTopic
 
 /**
  * INTERNAL API
 */
@@ -421,10 +420,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
   var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
   var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
 
-  def handleStashedInbound(endpoint: ActorRef) {
+  def handleStashedInbound(endpoint: ActorRef, writerIsIdle: Boolean) {
     val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
     stashedInbound -= endpoint
-    stashed foreach (handleInboundAssociation _)
+    stashed foreach (handleInboundAssociation(_, writerIsIdle))
   }
 
   def keepQuarantinedOr(remoteAddress: Address)(body: ⇒ Unit): Unit = endpoints.refuseUid(remoteAddress) match {
@@ -446,7 +445,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
           remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy)
         endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
       }
-      AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
       Stop
 
     case ShutDownAssociation(localAddress, remoteAddress, _) ⇒
@@ -456,7 +454,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
           remoteAddress, settings.RetryGateClosedFor.toMillis)
         endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
       }
-      AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
       Stop
 
     case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒
@@ -468,7 +465,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
           eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
         case _ ⇒ // disabled
       }
-      AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
       Stop
 
     case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
@@ -478,7 +474,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
           remoteAddress, settings.RetryGateClosedFor.toMillis)
         endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
       }
-      AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
       Stop
 
     case NonFatal(e) ⇒
@@ -589,18 +584,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
       }
 
     case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒
-      handleInboundAssociation(ia)
+      handleInboundAssociation(ia, writerIsIdle = false)
     case EndpointWriter.StoppedReading(endpoint) ⇒
       acceptPendingReader(takingOverFrom = endpoint)
     case Terminated(endpoint) ⇒
       acceptPendingReader(takingOverFrom = endpoint)
       endpoints.unregisterEndpoint(endpoint)
-      handleStashedInbound(endpoint)
+      handleStashedInbound(endpoint, writerIsIdle = false)
    case EndpointWriter.TookOver(endpoint, handle) ⇒
      removePendingReader(takingOverFrom = endpoint, withHandle = handle)
    case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒
      endpoints.registerWritableEndpointUid(remoteAddress, uid)
-      handleStashedInbound(sender)
+      handleStashedInbound(sender(), writerIsIdle = false)
+    case ReliableDeliverySupervisor.Idle ⇒
+      handleStashedInbound(sender(), writerIsIdle = true)
     case Prune ⇒
       endpoints.prune()
     case ShutdownAndFlush ⇒
@@ -631,7 +628,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
     case Terminated(_) ⇒ // why should we care now?
   }
 
-  def handleInboundAssociation(ia: InboundAssociation): Unit = ia match {
+  def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match {
    case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
      case Some(endpoint) ⇒
        pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
@@ -642,7 +639,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
          handle.disassociate(AssociationHandle.Quarantined)
        else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
          case Some(Pass(ep, None, _)) ⇒
-            stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
+            // Idle writer will never send a GotUid or a Terminated so we need to "provoke it"
+            // to get an unstash event
+            if (!writerIsIdle) {
+              ep ! ReliableDeliverySupervisor.IsIdle
+              stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
+            } else
+              createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
          case Some(Pass(ep, Some(uid), _)) ⇒
            if (handle.handshakeInfo.uid == uid) {
              pendingReadHandoffs.get(ep) foreach (_.disassociate())
diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
index bcbec25a4a..6ee645c221 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
@@ -468,8 +468,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
       readyChannel.getPipeline.get(classOf[ClientHandler]).statusFuture
     } yield handle) recover {
       case c: CancellationException ⇒ throw new NettyTransportException("Connection was cancelled") with NoStackTrace
-      case u @ (_: UnknownHostException | _: SecurityException | _: ConnectException) ⇒ throw new InvalidAssociationException(u.getMessage, u.getCause)
-      case NonFatal(t) ⇒ throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
+      case NonFatal(t) ⇒ throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
     }
   }
 }
diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
index b541711aec..229bf2a120 100644
--- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
@@ -4,8 +4,11 @@ package akka.remote
 
 import akka.actor._
+import akka.event.AddressTerminatedTopic
 import akka.pattern.ask
-import akka.remote.transport.AssociationRegistry
+import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload, HandleEvent }
+import akka.remote.transport._
+import akka.remote.transport.Transport.{ AssociationEvent, InvalidAssociationException }
 import akka.testkit._
 import akka.util.ByteString
 import com.typesafe.config._
@@ -541,12 +544,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
 
     "be able to serialize a local actor ref from another actor system" in {
       val config = ConfigFactory.parseString("""
-        # Additional internal serialization verification need so be off, otherwise it triggers two error messages
-        # instead of one: one for the internal check, and one for the actual remote send -- tripping off this test
-        akka.actor.serialize-messages = off
-        akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"]
-        akka.remote.test.local-address = "test://other-system@localhost:12347"
-        """).withFallback(remoteSystem.settings.config)
+        # Additional internal serialization verification need so be off, otherwise it triggers two error messages
+        # instead of one: one for the internal check, and one for the actual remote send -- tripping off this test
+        akka.actor.serialize-messages = off
+        akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"]
+        akka.remote.test.local-address = "test://other-system@localhost:12347"
+        """).withFallback(remoteSystem.settings.config)
       val otherSystem = ActorSystem("other-system", config)
       try {
         val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy")
@@ -569,12 +572,147 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
       }
     }
 
+    "should not publish AddressTerminated even on InvalidAssociationExceptions" in {
+      val localAddress = Address("akka.test", "system1", "localhost", 1)
+      val rawLocalAddress = localAddress.copy(protocol = "test")
+      val remoteAddress = Address("akka.test", "system2", "localhost", 2)
+
+      val config = ConfigFactory.parseString(s"""
+        akka.remote.enabled-transports = ["akka.remote.test"]
+        akka.remote.retry-gate-closed-for = 5s
+
+        akka.remote.test {
+          registry-key = tFdVxq
+          local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}"
+        }
+        """).withFallback(remoteSystem.settings.config)
+
+      val thisSystem = ActorSystem("this-system", config)
+
+      try {
+        class HackyRef extends MinimalActorRef {
+          @volatile var lastMsg: AnyRef = null
+
+          override def provider: ActorRefProvider = RARP(thisSystem).provider
+
+          override def path: ActorPath = thisSystem / "user" / "evilref"
+
+          override def isTerminated: Boolean = false
+
+          override def !(message: Any)(implicit sender: ActorRef): Unit = lastMsg = message.asInstanceOf[AnyRef]
+        }
+
+        val terminatedListener = new HackyRef
+
+        // Set up all connection attempts to fail
+        val registry = AssociationRegistry.get("tFdVxq")
+        awaitCond(registry.transportsReady(rawLocalAddress))
+        awaitCond {
+          registry.transportFor(rawLocalAddress) match {
+            case None ⇒ false
+            case Some((testTransport, _)) ⇒
+              testTransport.associateBehavior.pushError(new InvalidAssociationException("Test connection error"))
+              true
+          }
+        }
+
+        AddressTerminatedTopic(thisSystem).subscribe(terminatedListener)
+
+        val probe = new TestProbe(thisSystem)
+        val otherSelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere"))
+        otherSelection.tell("ping", probe.ref)
+        probe.expectNoMsg(1 seconds)
+
+        terminatedListener.lastMsg should be(null)
+
+      } finally shutdown(thisSystem)
+    }
+
+    "should stash inbound connections until UID is known for pending outbound" in {
+      val localAddress = Address("akka.test", "system1", "localhost", 1)
+      val rawLocalAddress = localAddress.copy(protocol = "test")
+      val remoteAddress = Address("akka.test", "system2", "localhost", 2)
+      val rawRemoteAddress = remoteAddress.copy(protocol = "test")
+
+      val config = ConfigFactory.parseString(s"""
+        akka.remote.enabled-transports = ["akka.remote.test"]
+        akka.remote.retry-gate-closed-for = 5s
+        akka.remote.log-lifecycle-events = on
+        #akka.loglevel = DEBUG
+
+        akka.remote.test {
+          registry-key = TRKAzR
+          local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}"
+        }
+        """).withFallback(remoteSystem.settings.config)
+      val thisSystem = ActorSystem("this-system", config)
+      muteSystem(thisSystem)
+
+      try {
+
+        // Set up a mock remote system using the test transport
+        val registry = AssociationRegistry.get("TRKAzR")
+        val remoteTransport = new TestTransport(rawRemoteAddress, registry)
+        val remoteTransportProbe = TestProbe()
+
+        registry.registerTransport(remoteTransport, associationEventListenerFuture = Future.successful(new Transport.AssociationEventListener {
+          override def notify(ev: Transport.AssociationEvent): Unit = remoteTransportProbe.ref ! ev
+        }))
+
+        val outboundHandle = new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false)
+
+        // Hijack associations through the test transport
+        awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress))
+        val testTransport = registry.transportFor(rawLocalAddress).get._1
+        testTransport.writeBehavior.pushConstant(true)
+
+        // Force an outbound associate on the real system (which we will hijack)
+        // we send no handshake packet, so this remains a pending connection
+        val dummySelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere"))
+        dummySelection.tell("ping", system.deadLetters)
+
+        val remoteHandle = remoteTransportProbe.expectMsgType[Transport.InboundAssociation]
+        remoteHandle.association.readHandlerPromise.success(new HandleEventListener {
+          override def notify(ev: HandleEvent): Unit = ()
+        })
+
+        // Now we initiate an emulated inbound connection to the real system
+        val inboundHandleProbe = TestProbe()
+        val inboundHandle = Await.result(remoteTransport.associate(rawLocalAddress), 3.seconds)
+        inboundHandle.readHandlerPromise.success(new AssociationHandle.HandleEventListener {
+          override def notify(ev: HandleEvent): Unit = inboundHandleProbe.ref ! ev
+        })
+
+        awaitAssert {
+          registry.getRemoteReadHandlerFor(inboundHandle.asInstanceOf[TestAssociationHandle]).get
+        }
+
+        val handshakePacket = AkkaPduProtobufCodec.constructAssociate(HandshakeInfo(rawRemoteAddress, uid = 0, cookie = None))
+        val brokenPacket = AkkaPduProtobufCodec.constructPayload(ByteString(0, 1, 2, 3, 4, 5, 6))
+
+        // Finish the inbound handshake so now it is handed up to Remoting
+        inboundHandle.write(handshakePacket)
+        // Now bork the connection with a malformed packet that can only signal an error if the Endpoint is already registered
+        // but not while it is stashed
+        inboundHandle.write(brokenPacket)
+
+        // No disassociation now, the connection is still stashed
+        inboundHandleProbe.expectNoMsg(1.second)
+
+        // Finish the handshake for the outbound connection. This will unstash the inbound pending connection.
+        remoteHandle.association.write(handshakePacket)
+
+        inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
+      } finally shutdown(thisSystem)
+
+    }
+
     "be able to connect to system even if it's not there at first" in {
       val config = ConfigFactory.parseString(s"""
-        akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
-        akka.remote.netty.tcp.port = 0
-        akka.remote.retry-gate-closed-for = 5s
-        """).withFallback(remoteSystem.settings.config)
+        akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+        akka.remote.netty.tcp.port = 0
+        akka.remote.retry-gate-closed-for = 5s
+        """).withFallback(remoteSystem.settings.config)
       val thisSystem = ActorSystem("this-system", config)
       try {
         muteSystem(thisSystem)
@@ -582,8 +720,8 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
         val probeSender = probe.ref
         val otherAddress = temporaryServerAddress()
         val otherConfig = ConfigFactory.parseString(s"""
-          akka.remote.netty.tcp.port = ${otherAddress.getPort}
-          """).withFallback(config)
+          akka.remote.netty.tcp.port = ${otherAddress.getPort}
+          """).withFallback(config)
         val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo")
         otherSelection.tell("ping", probeSender)
         probe.expectNoMsg(1.seconds)
@@ -608,10 +746,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
 
     "allow other system to connect even if it's not there at first" in {
       val config = ConfigFactory.parseString(s"""
-        akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
-        akka.remote.netty.tcp.port = 0
-        akka.remote.retry-gate-closed-for = 5s
-        """).withFallback(remoteSystem.settings.config)
+        akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+        akka.remote.netty.tcp.port = 0
+        akka.remote.retry-gate-closed-for = 5s
+        """).withFallback(remoteSystem.settings.config)
       val thisSystem = ActorSystem("this-system", config)
       try {
         muteSystem(thisSystem)
@@ -620,8 +758,8 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
         thisSystem.actorOf(Props[Echo2], "echo")
         val otherAddress = temporaryServerAddress()
         val otherConfig = ConfigFactory.parseString(s"""
-          akka.remote.netty.tcp.port = ${otherAddress.getPort}
-          """).withFallback(config)
+          akka.remote.netty.tcp.port = ${otherAddress.getPort}
+          """).withFallback(config)
         val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo")
         otherSelection.tell("ping", thisSender)
         thisProbe.expectNoMsg(1.seconds)