diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala index 661f5e15ce..c6f3efe285 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala @@ -3,21 +3,25 @@ */ package akka.cluster -import language.postfixOps import scala.collection.immutable -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter +import scala.concurrent.duration._ + +import akka.Done +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.Identify +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.Terminated +import akka.cluster.MemberStatus._ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import scala.concurrent.duration._ -import akka.actor.Address -import akka.actor.ActorSystem -import akka.actor.Props -import akka.actor.Actor -import akka.actor.RootActorPath -import akka.cluster.MemberStatus._ -import akka.actor.Deploy +import com.typesafe.config.ConfigFactory object RestartNodeMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -25,8 +29,27 @@ object RestartNodeMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 5s")). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down-unreachable-after = 5s + #akka.remote.use-passive-connections = off + """)). withFallback(MultiNodeClusterSpec.clusterConfig)) + + /** + * This was used together with sleep in EndpointReader before deliverAndAck + * to reproduce issue with misaligned ACKs when restarting system, + * issue #19780 + */ + class Watcher(a: Address, replyTo: ActorRef) extends Actor { + context.actorSelection(RootActorPath(a) / "user" / "address-receiver") ! Identify(None) + + def receive = { + case ActorIdentity(None, Some(ref)) ⇒ + context.watch(ref) + replyTo ! Done + case t: Terminated ⇒ + } + } } class RestartNodeMultiJvmNode1 extends RestartNodeSpec @@ -61,7 +84,7 @@ abstract class RestartNodeSpec } "Cluster nodes" must { - "be able to restart and join again" taggedAs LongRunningTest in within(60 seconds) { + "be able to restart and join again" taggedAs LongRunningTest in within(60.seconds) { // secondSystem is a separate ActorSystem, to be able to simulate restart // we must transfer its address to first runOn(first, third) { @@ -80,7 +103,7 @@ abstract class RestartNodeSpec secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress List(first, third) foreach { r ⇒ system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress - expectMsg(5 seconds, "ok") + expectMsg(5.seconds, "ok") } } enterBarrier("second-address-transfered") @@ -99,6 +122,10 @@ abstract class RestartNodeSpec // shutdown secondSystem runOn(second) { + // send system message just before shutdown, reproducer for issue #19780 + secondSystem.actorOf(Props(classOf[Watcher], address(first), testActor), "testwatcher") + expectMsg(Done) + shutdown(secondSystem, remaining) } enterBarrier("second-shutdown") diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index fbcf16a2c1..48f06d2513 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -809,6 +809,12 @@ private[remote] class EndpointWriter( case s: Send ⇒ enqueueInBuffer(s) + + case OutboundAck(_) ⇒ + // Ignore outgoing acks during take over, since we might have + // replaced the handle with a connection to a new, restarted, system + // and the ack might be targeted to the old incarnation. + // See issue #19780 } override def unhandled(message: Any): Unit = message match {