From 96b68f6437e45908bad1157babc5432147286cc4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 21 Mar 2016 08:41:11 +0100 Subject: [PATCH] rem #19780: Skip acks during connection handoff * The problem: ACK that was targeted to an old incarnation was sent to the new, restarted, system with same host:port, and therefore resulting issues noticed as "Error encountered while processing system message acknowledgement buffer: [-1 {}] ack: ACK[0, {}]" when restarting actor system * The reason: 1. The endpoint reader was about to send OutgoingAck to parent reader, targeted to the old system. 2. At the same time there is an incoming connection from new system that triggered TakeOver in the endpoint writer, i.e. replacing the handle to the connection of the new system. 3. The OutgoingAck is received by the writer, which happily sends it to the new handle, the new system. * The solution: Ignore OutgoingAck during the handoff (TakeOver) process. --- .../scala/akka/cluster/RestartNodeSpec.scala | 55 ++++++++++++++----- .../src/main/scala/akka/remote/Endpoint.scala | 6 ++ 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala index 661f5e15ce..c6f3efe285 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala @@ -3,21 +3,25 @@ */ package akka.cluster -import language.postfixOps import scala.collection.immutable -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter +import scala.concurrent.duration._ + +import akka.Done +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.Identify +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.Terminated +import akka.cluster.MemberStatus._ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import scala.concurrent.duration._ -import akka.actor.Address -import akka.actor.ActorSystem -import akka.actor.Props -import akka.actor.Actor -import akka.actor.RootActorPath -import akka.cluster.MemberStatus._ -import akka.actor.Deploy +import com.typesafe.config.ConfigFactory object RestartNodeMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -25,8 +29,27 @@ object RestartNodeMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 5s")). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down-unreachable-after = 5s + #akka.remote.use-passive-connections = off + """)). withFallback(MultiNodeClusterSpec.clusterConfig)) + + /** + * This was used together with sleep in EndpointReader before deliverAndAck + * to reproduce issue with misaligned ACKs when restarting system, + * issue #19780 + */ + class Watcher(a: Address, replyTo: ActorRef) extends Actor { + context.actorSelection(RootActorPath(a) / "user" / "address-receiver") ! Identify(None) + + def receive = { + case ActorIdentity(None, Some(ref)) ⇒ + context.watch(ref) + replyTo ! Done + case t: Terminated ⇒ + } + } } class RestartNodeMultiJvmNode1 extends RestartNodeSpec @@ -61,7 +84,7 @@ abstract class RestartNodeSpec } "Cluster nodes" must { - "be able to restart and join again" taggedAs LongRunningTest in within(60 seconds) { + "be able to restart and join again" taggedAs LongRunningTest in within(60.seconds) { // secondSystem is a separate ActorSystem, to be able to simulate restart // we must transfer its address to first runOn(first, third) { @@ -80,7 +103,7 @@ abstract class RestartNodeSpec secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress List(first, third) foreach { r ⇒ system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress - expectMsg(5 seconds, "ok") + expectMsg(5.seconds, "ok") } } enterBarrier("second-address-transfered") @@ -99,6 +122,10 @@ abstract class RestartNodeSpec // shutdown secondSystem runOn(second) { + // send system message just before shutdown, reproducer for issue #19780 + secondSystem.actorOf(Props(classOf[Watcher], address(first), testActor), "testwatcher") + expectMsg(Done) + shutdown(secondSystem, remaining) } enterBarrier("second-shutdown") diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index fbcf16a2c1..48f06d2513 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -809,6 +809,12 @@ private[remote] class EndpointWriter( case s: Send ⇒ enqueueInBuffer(s) + + case OutboundAck(_) ⇒ + // Ignore outgoing acks during take over, since we might have + // replaced the handle with a connection to a new, restarted, system + // and the ack might be targeted to the old incarnation. + // See issue #19780 } override def unhandled(message: Any): Unit = message match {