Merge pull request #16884 from drewhk/wip-16505-forwardport-drewhk

=rem #16505: Do not publish AddressTerminated (missing fwd port)
This commit is contained in:
drewhk 2015-03-25 14:44:15 +01:00
commit d9db42b757
5 changed files with 211 additions and 64 deletions

View file

@ -316,6 +316,26 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
enterBarrier("after-4") enterBarrier("after-4")
} }
"support proxy only mode" in within(10.seconds) {
runOn(second) {
val proxy = system.actorOf(ShardRegion.proxyProps(
typeName = "counter",
role = None,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
bufferSize = 1000,
idExtractor = idExtractor,
shardResolver = shardResolver),
name = "regionProxy")
proxy ! Get(1)
expectMsg(2)
proxy ! Get(2)
expectMsg(4)
}
enterBarrier("after-5")
}
"failover shards on crashed node" in within(30 seconds) { "failover shards on crashed node" in within(30 seconds) {
// mute logging of deadLetters during shutdown of systems // mute logging of deadLetters during shutdown of systems
if (!log.isDebugEnabled) if (!log.isDebugEnabled)
@ -346,7 +366,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
} }
} }
enterBarrier("after-5") enterBarrier("after-6")
} }
"use third and fourth node" in within(15 seconds) { "use third and fourth node" in within(15 seconds) {
@ -396,7 +416,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
lastSender.path should ===(region.path / "4" / "4") lastSender.path should ===(region.path / "4" / "4")
} }
enterBarrier("after-6") enterBarrier("after-7")
} }
"recover coordinator state after coordinator crash" in within(60 seconds) { "recover coordinator state after coordinator crash" in within(60 seconds) {
@ -427,7 +447,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
} }
enterBarrier("after-7") enterBarrier("after-8")
} }
"rebalance to nodes with less shards" in within(60 seconds) { "rebalance to nodes with less shards" in within(60 seconds) {
@ -459,29 +479,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
} }
} }
enterBarrier("after-8")
}
}
"support proxy only mode" in within(10.seconds) {
runOn(sixth) {
val proxy = system.actorOf(ShardRegion.proxyProps(
typeName = "counter",
role = None,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
bufferSize = 1000,
idExtractor = idExtractor,
shardResolver = shardResolver),
name = "regionProxy")
proxy ! Get(1)
expectMsg(2)
proxy ! Get(2)
expectMsg(4)
}
enterBarrier("after-9") enterBarrier("after-9")
}
} }
"easy to use with extensions" in within(50.seconds) { "easy to use with extensions" in within(50.seconds) {
@ -532,7 +532,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
} }
} }
enterBarrier("after-9") enterBarrier("after-10")
} }
"easy API for starting" in within(50.seconds) { "easy API for starting" in within(50.seconds) {
@ -549,7 +549,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
counterRegionViaStart should equal(counterRegionViaGet) counterRegionViaStart should equal(counterRegionViaGet)
} }
enterBarrier("after-10") enterBarrier("after-11")
} }
@ -611,7 +611,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
expectMsg(3 seconds, ActorIdentity(3, None)) expectMsg(3 seconds, ActorIdentity(3, None))
} }
enterBarrier("after-11") enterBarrier("after-12")
} }
"permanently stop entries which passivate" in within(15.seconds) { "permanently stop entries which passivate" in within(15.seconds) {
@ -682,7 +682,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
expectMsgType[ActorIdentity](3 seconds).ref should not be (None) expectMsgType[ActorIdentity](3 seconds).ref should not be (None)
} }
enterBarrier("after-12") enterBarrier("after-13")
} }
"restart entries which stop without passivating" in within(50.seconds) { "restart entries which stop without passivating" in within(50.seconds) {
@ -708,7 +708,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}, 5.seconds, 500.millis) }, 5.seconds, 500.millis)
} }
enterBarrier("after-13") enterBarrier("after-14")
} }
"be migrated to new regions upon region failure" in within(15.seconds) { "be migrated to new regions upon region failure" in within(15.seconds) {
@ -748,7 +748,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
expectMsg(2) expectMsg(2)
} }
enterBarrier("after-14") enterBarrier("after-15")
} }
"ensure rebalance restarts shards" in within(50.seconds) { "ensure rebalance restarts shards" in within(50.seconds) {
@ -784,7 +784,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
} }
} }
enterBarrier("after-15") enterBarrier("after-16")
} }
} }
} }

View file

@ -166,6 +166,9 @@ private[remote] object ReliableDeliverySupervisor {
case object AttemptSysMsgRedelivery case object AttemptSysMsgRedelivery
final case class GotUid(uid: Int, remoteAddres: Address) final case class GotUid(uid: Int, remoteAddres: Address)
case object IsIdle
case object Idle
def props( def props(
handleOrActive: Option[AkkaProtocolHandle], handleOrActive: Option[AkkaProtocolHandle],
localAddress: Address, localAddress: Address,
@ -272,6 +275,7 @@ private[remote] class ReliableDeliverySupervisor(
resendAll() resendAll()
writer ! FlushAndStop writer ! FlushAndStop
context.become(flushWait) context.become(flushWait)
case IsIdle // Do not reply, we will Terminate soon, or send a GotUid
case s: Send case s: Send
handleSend(s) handleSend(s)
case ack: Ack case ack: Ack
@ -311,6 +315,7 @@ private[remote] class ReliableDeliverySupervisor(
def gated: Receive = { def gated: Receive = {
case Terminated(_) case Terminated(_)
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
case IsIdle sender() ! Idle
case Ungate case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
// If we talk to a system we have not talked to before (or has given up talking to in the past) stop // If we talk to a system we have not talked to before (or has given up talking to in the past) stop
@ -335,6 +340,7 @@ private[remote] class ReliableDeliverySupervisor(
} }
def idle: Receive = { def idle: Receive = {
case IsIdle sender() ! Idle
case s: Send case s: Send
writer = createWriter() writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished // Resending will be triggered by the incoming GotUid message after the connection finished
@ -352,6 +358,7 @@ private[remote] class ReliableDeliverySupervisor(
} }
def flushWait: Receive = { def flushWait: Receive = {
case IsIdle // Do not reply, we will Terminate soon, which will do the inbound connection unstashing
case Terminated(_) case Terminated(_)
// Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down // Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
// and don't really know if they were properly delivered or not. // and don't really know if they were properly delivered or not.

View file

@ -22,7 +22,6 @@ import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.AddressTerminatedTopic
/** /**
* INTERNAL API * INTERNAL API
@ -421,10 +420,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]() var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]() var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
def handleStashedInbound(endpoint: ActorRef) { def handleStashedInbound(endpoint: ActorRef, writerIsIdle: Boolean) {
val stashed = stashedInbound.getOrElse(endpoint, Vector.empty) val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
stashedInbound -= endpoint stashedInbound -= endpoint
stashed foreach (handleInboundAssociation _) stashed foreach (handleInboundAssociation(_, writerIsIdle))
} }
def keepQuarantinedOr(remoteAddress: Address)(body: Unit): Unit = endpoints.refuseUid(remoteAddress) match { def keepQuarantinedOr(remoteAddress: Address)(body: Unit): Unit = endpoints.refuseUid(remoteAddress) match {
@ -446,7 +445,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy) remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage, causedBy)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
} }
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop Stop
case ShutDownAssociation(localAddress, remoteAddress, _) case ShutDownAssociation(localAddress, remoteAddress, _)
@ -456,7 +454,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis) remoteAddress, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
} }
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop Stop
case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason)
@ -468,7 +465,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
case _ // disabled case _ // disabled
} }
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop Stop
case HopelessAssociation(localAddress, remoteAddress, None, _) case HopelessAssociation(localAddress, remoteAddress, None, _)
@ -478,7 +474,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
remoteAddress, settings.RetryGateClosedFor.toMillis) remoteAddress, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
} }
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop Stop
case NonFatal(e) case NonFatal(e)
@ -589,18 +584,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
case ia @ InboundAssociation(handle: AkkaProtocolHandle) case ia @ InboundAssociation(handle: AkkaProtocolHandle)
handleInboundAssociation(ia) handleInboundAssociation(ia, writerIsIdle = false)
case EndpointWriter.StoppedReading(endpoint) case EndpointWriter.StoppedReading(endpoint)
acceptPendingReader(takingOverFrom = endpoint) acceptPendingReader(takingOverFrom = endpoint)
case Terminated(endpoint) case Terminated(endpoint)
acceptPendingReader(takingOverFrom = endpoint) acceptPendingReader(takingOverFrom = endpoint)
endpoints.unregisterEndpoint(endpoint) endpoints.unregisterEndpoint(endpoint)
handleStashedInbound(endpoint) handleStashedInbound(endpoint, writerIsIdle = false)
case EndpointWriter.TookOver(endpoint, handle) case EndpointWriter.TookOver(endpoint, handle)
removePendingReader(takingOverFrom = endpoint, withHandle = handle) removePendingReader(takingOverFrom = endpoint, withHandle = handle)
case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) case ReliableDeliverySupervisor.GotUid(uid, remoteAddress)
endpoints.registerWritableEndpointUid(remoteAddress, uid) endpoints.registerWritableEndpointUid(remoteAddress, uid)
handleStashedInbound(sender) handleStashedInbound(sender(), writerIsIdle = false)
case ReliableDeliverySupervisor.Idle
handleStashedInbound(sender(), writerIsIdle = true)
case Prune case Prune
endpoints.prune() endpoints.prune()
case ShutdownAndFlush case ShutdownAndFlush
@ -631,7 +628,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(_) // why should we care now? case Terminated(_) // why should we care now?
} }
def handleInboundAssociation(ia: InboundAssociation): Unit = ia match { def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match {
case ia @ InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match { case ia @ InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint) case Some(endpoint)
pendingReadHandoffs.get(endpoint) foreach (_.disassociate()) pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
@ -642,7 +639,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
handle.disassociate(AssociationHandle.Quarantined) handle.disassociate(AssociationHandle.Quarantined)
else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
case Some(Pass(ep, None, _)) case Some(Pass(ep, None, _))
// Idle writer will never send a GotUid or a Terminated so we need to "provoke it"
// to get an unstash event
if (!writerIsIdle) {
ep ! ReliableDeliverySupervisor.IsIdle
stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia) stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
} else
createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
case Some(Pass(ep, Some(uid), _)) case Some(Pass(ep, Some(uid), _))
if (handle.handshakeInfo.uid == uid) { if (handle.handshakeInfo.uid == uid) {
pendingReadHandoffs.get(ep) foreach (_.disassociate()) pendingReadHandoffs.get(ep) foreach (_.disassociate())

View file

@ -468,7 +468,6 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
readyChannel.getPipeline.get(classOf[ClientHandler]).statusFuture readyChannel.getPipeline.get(classOf[ClientHandler]).statusFuture
} yield handle) recover { } yield handle) recover {
case c: CancellationException throw new NettyTransportException("Connection was cancelled") with NoStackTrace case c: CancellationException throw new NettyTransportException("Connection was cancelled") with NoStackTrace
case u @ (_: UnknownHostException | _: SecurityException | _: ConnectException) throw new InvalidAssociationException(u.getMessage, u.getCause)
case NonFatal(t) throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace case NonFatal(t) throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
} }
} }

View file

@ -4,8 +4,11 @@
package akka.remote package akka.remote
import akka.actor._ import akka.actor._
import akka.event.AddressTerminatedTopic
import akka.pattern.ask import akka.pattern.ask
import akka.remote.transport.AssociationRegistry import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload, HandleEvent }
import akka.remote.transport._
import akka.remote.transport.Transport.{ AssociationEvent, InvalidAssociationException }
import akka.testkit._ import akka.testkit._
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.config._ import com.typesafe.config._
@ -569,6 +572,141 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
} }
} }
"should not publish AddressTerminated even on InvalidAssociationExecptions" in {
val localAddress = Address("akka.test", "system1", "localhost", 1)
val rawLocalAddress = localAddress.copy(protocol = "test")
val remoteAddress = Address("akka.test", "system2", "localhost", 2)
val config = ConfigFactory.parseString(s"""
akka.remote.enabled-transports = ["akka.remote.test"]
akka.remote.retry-gate-closed-for = 5s
akka.remote.test {
registry-key = tFdVxq
local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}"
}
""").withFallback(remoteSystem.settings.config)
val thisSystem = ActorSystem("this-system", config)
try {
class HackyRef extends MinimalActorRef {
@volatile var lastMsg: AnyRef = null
override def provider: ActorRefProvider = RARP(thisSystem).provider
override def path: ActorPath = thisSystem / "user" / "evilref"
override def isTerminated: Boolean = false
override def !(message: Any)(implicit sender: ActorRef): Unit = lastMsg = message.asInstanceOf[AnyRef]
}
val terminatedListener = new HackyRef
// Set up all connection attempts to fail
val registry = AssociationRegistry.get("tFdVxq")
awaitCond(registry.transportsReady(rawLocalAddress))
awaitCond {
registry.transportFor(rawLocalAddress) match {
case None false
case Some((testTransport, _))
testTransport.associateBehavior.pushError(new InvalidAssociationException("Test connection error"))
true
}
}
AddressTerminatedTopic(thisSystem).subscribe(terminatedListener)
val probe = new TestProbe(thisSystem)
val otherSelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere"))
otherSelection.tell("ping", probe.ref)
probe.expectNoMsg(1 seconds)
terminatedListener.lastMsg should be(null)
} finally shutdown(thisSystem)
}
"should stash inbound connections until UID is known for pending outbound" in {
val localAddress = Address("akka.test", "system1", "localhost", 1)
val rawLocalAddress = localAddress.copy(protocol = "test")
val remoteAddress = Address("akka.test", "system2", "localhost", 2)
val rawRemoteAddress = remoteAddress.copy(protocol = "test")
val config = ConfigFactory.parseString(s"""
akka.remote.enabled-transports = ["akka.remote.test"]
akka.remote.retry-gate-closed-for = 5s
akka.remote.log-lifecylce-events = on
#akka.loglevel = DEBUG
akka.remote.test {
registry-key = TRKAzR
local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}"
}
""").withFallback(remoteSystem.settings.config)
val thisSystem = ActorSystem("this-system", config)
muteSystem(thisSystem)
try {
// Set up a mock remote system using the test transport
val registry = AssociationRegistry.get("TRKAzR")
val remoteTransport = new TestTransport(rawRemoteAddress, registry)
val remoteTransportProbe = TestProbe()
registry.registerTransport(remoteTransport, associationEventListenerFuture = Future.successful(new Transport.AssociationEventListener {
override def notify(ev: Transport.AssociationEvent): Unit = remoteTransportProbe.ref ! ev
}))
val outboundHandle = new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false)
// Hijack associations through the test transport
awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress))
val testTransport = registry.transportFor(rawLocalAddress).get._1
testTransport.writeBehavior.pushConstant(true)
// Force an outbound associate on the real system (which we will hijack)
// we send no handshake packet, so this remains a pending connection
val dummySelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere"))
dummySelection.tell("ping", system.deadLetters)
val remoteHandle = remoteTransportProbe.expectMsgType[Transport.InboundAssociation]
remoteHandle.association.readHandlerPromise.success(new HandleEventListener {
override def notify(ev: HandleEvent): Unit = ()
})
// Now we initiate an emulated inbound connection to the real system
val inboundHandleProbe = TestProbe()
val inboundHandle = Await.result(remoteTransport.associate(rawLocalAddress), 3.seconds)
inboundHandle.readHandlerPromise.success(new AssociationHandle.HandleEventListener {
override def notify(ev: HandleEvent): Unit = inboundHandleProbe.ref ! ev
})
awaitAssert {
registry.getRemoteReadHandlerFor(inboundHandle.asInstanceOf[TestAssociationHandle]).get
}
val handshakePacket = AkkaPduProtobufCodec.constructAssociate(HandshakeInfo(rawRemoteAddress, uid = 0, cookie = None))
val brokenPacket = AkkaPduProtobufCodec.constructPayload(ByteString(0, 1, 2, 3, 4, 5, 6))
// Finish the inbound handshake so now it is handed up to Remoting
inboundHandle.write(handshakePacket)
// Now bork the connection with a malformed packet that can only signal an error if the Endpoint is already registered
// but not while it is stashed
inboundHandle.write(brokenPacket)
// No disassociation now, the connection is still stashed
inboundHandleProbe.expectNoMsg(1.second)
// Finish the handshake for the outbound connection. This will unstash the inbound pending connection.
remoteHandle.association.write(handshakePacket)
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
} finally shutdown(thisSystem)
}
"be able to connect to system even if it's not there at first" in { "be able to connect to system even if it's not there at first" in {
val config = ConfigFactory.parseString(s""" val config = ConfigFactory.parseString(s"""
akka.remote.enabled-transports = ["akka.remote.netty.tcp"] akka.remote.enabled-transports = ["akka.remote.netty.tcp"]