rem #19780: Skip acks during connection handoff
* The problem: ACK that was targeted to an old incarnation
was sent to the new, restarted, system with same host:port, and
therefore resulting issues noticed as
"Error encountered while processing system message acknowledgement buffer: [-1 {}] ack: ACK[0, {}]"
when restarting actor system
* The reason:
1. The endpoint reader was about to send OutgoingAck to parent reader,
targeted to the old system.
2. At the same time there is an incoming connection from new system
that triggered TakeOver in the endpoint writer, i.e. replacing
the handle to the connection of the new system.
3. The OutgoingAck is received by the writer, which happily sends it
to the new handle, the new system.
* The solution: Ignore OutgoingAck during the handoff (TakeOver) process.
This commit is contained in:
parent
ed6acd63ec
commit
96b68f6437
2 changed files with 47 additions and 14 deletions
|
|
@ -3,21 +3,25 @@
|
|||
*/
|
||||
package akka.cluster
|
||||
|
||||
import language.postfixOps
|
||||
import scala.collection.immutable
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.Identify
|
||||
import akka.actor.Props
|
||||
import akka.actor.RootActorPath
|
||||
import akka.actor.Terminated
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.Address
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.actor.RootActorPath
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.actor.Deploy
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object RestartNodeMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -25,8 +29,27 @@ object RestartNodeMultiJvmSpec extends MultiNodeConfig {
|
|||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 5s")).
|
||||
withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.auto-down-unreachable-after = 5s
|
||||
#akka.remote.use-passive-connections = off
|
||||
""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
/**
|
||||
* This was used together with sleep in EndpointReader before deliverAndAck
|
||||
* to reproduce issue with misaligned ACKs when restarting system,
|
||||
* issue #19780
|
||||
*/
|
||||
class Watcher(a: Address, replyTo: ActorRef) extends Actor {
|
||||
context.actorSelection(RootActorPath(a) / "user" / "address-receiver") ! Identify(None)
|
||||
|
||||
def receive = {
|
||||
case ActorIdentity(None, Some(ref)) ⇒
|
||||
context.watch(ref)
|
||||
replyTo ! Done
|
||||
case t: Terminated ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RestartNodeMultiJvmNode1 extends RestartNodeSpec
|
||||
|
|
@ -61,7 +84,7 @@ abstract class RestartNodeSpec
|
|||
}
|
||||
|
||||
"Cluster nodes" must {
|
||||
"be able to restart and join again" taggedAs LongRunningTest in within(60 seconds) {
|
||||
"be able to restart and join again" taggedAs LongRunningTest in within(60.seconds) {
|
||||
// secondSystem is a separate ActorSystem, to be able to simulate restart
|
||||
// we must transfer its address to first
|
||||
runOn(first, third) {
|
||||
|
|
@ -80,7 +103,7 @@ abstract class RestartNodeSpec
|
|||
secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress
|
||||
List(first, third) foreach { r ⇒
|
||||
system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress
|
||||
expectMsg(5 seconds, "ok")
|
||||
expectMsg(5.seconds, "ok")
|
||||
}
|
||||
}
|
||||
enterBarrier("second-address-transfered")
|
||||
|
|
@ -99,6 +122,10 @@ abstract class RestartNodeSpec
|
|||
|
||||
// shutdown secondSystem
|
||||
runOn(second) {
|
||||
// send system message just before shutdown, reproducer for issue #19780
|
||||
secondSystem.actorOf(Props(classOf[Watcher], address(first), testActor), "testwatcher")
|
||||
expectMsg(Done)
|
||||
|
||||
shutdown(secondSystem, remaining)
|
||||
}
|
||||
enterBarrier("second-shutdown")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue