Merge pull request #20093 from akka/wip-19780-ack-takeover-patriknw

rem #19780: Skip acks during connection handoff
This commit is contained in:
Patrik Nordwall 2016-03-22 14:01:29 +01:00
commit 3e7cd4d98c
2 changed files with 47 additions and 14 deletions

View file

@ -3,21 +3,25 @@
*/
package akka.cluster
import language.postfixOps
import scala.collection.immutable
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import scala.concurrent.duration._
import akka.Done
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.Deploy
import akka.actor.Identify
import akka.actor.Props
import akka.actor.RootActorPath
import akka.actor.Terminated
import akka.cluster.MemberStatus._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Address
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.actor.RootActorPath
import akka.cluster.MemberStatus._
import akka.actor.Deploy
import com.typesafe.config.ConfigFactory
object RestartNodeMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -25,8 +29,27 @@ object RestartNodeMultiJvmSpec extends MultiNodeConfig {
val third = role("third")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 5s")).
withFallback(ConfigFactory.parseString("""
akka.cluster.auto-down-unreachable-after = 5s
#akka.remote.use-passive-connections = off
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
/**
* This was used together with sleep in EndpointReader before deliverAndAck
* to reproduce issue with misaligned ACKs when restarting system,
* issue #19780
*/
class Watcher(a: Address, replyTo: ActorRef) extends Actor {
context.actorSelection(RootActorPath(a) / "user" / "address-receiver") ! Identify(None)
def receive = {
case ActorIdentity(None, Some(ref))
context.watch(ref)
replyTo ! Done
case t: Terminated
}
}
}
class RestartNodeMultiJvmNode1 extends RestartNodeSpec
@ -61,7 +84,7 @@ abstract class RestartNodeSpec
}
"Cluster nodes" must {
"be able to restart and join again" taggedAs LongRunningTest in within(60 seconds) {
"be able to restart and join again" taggedAs LongRunningTest in within(60.seconds) {
// secondSystem is a separate ActorSystem, to be able to simulate restart
// we must transfer its address to first
runOn(first, third) {
@ -80,7 +103,7 @@ abstract class RestartNodeSpec
secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress
List(first, third) foreach { r
system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress
expectMsg(5 seconds, "ok")
expectMsg(5.seconds, "ok")
}
}
enterBarrier("second-address-transfered")
@ -99,6 +122,10 @@ abstract class RestartNodeSpec
// shutdown secondSystem
runOn(second) {
// send system message just before shutdown, reproducer for issue #19780
secondSystem.actorOf(Props(classOf[Watcher], address(first), testActor), "testwatcher")
expectMsg(Done)
shutdown(secondSystem, remaining)
}
enterBarrier("second-shutdown")