port of some remote multi-node tests

This commit is contained in:
Patrik Nordwall 2016-06-02 07:21:47 +02:00
parent 7e1697b6cc
commit aab46199fd
8 changed files with 166 additions and 293 deletions

View file

@ -13,17 +13,39 @@ import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._ import akka.testkit._
import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.actor.PoisonPill import akka.actor.PoisonPill
import com.typesafe.config.ConfigFactory
object AttemptSysMsgRedeliveryMultiJvmSpec extends MultiNodeConfig { class AttemptSysMsgRedeliveryMultiJvmSpec(artery: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
val second = role("second") val second = role("second")
val third = role("third") val third = role("third")
commonConfig(debugConfig(on = false)) commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
akka.remote.artery.enabled = $artery
""")))
testTransport(on = true) testTransport(on = true)
}
class AttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec(
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false))
class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false))
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false))
// FIXME this test is failing for Artery, a DeathWatchNotification is not delivered as expected?
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec(
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
object AttemptSysMsgRedeliverySpec {
class Echo extends Actor { class Echo extends Actor {
def receive = { def receive = {
case m sender ! m case m sender ! m
@ -31,13 +53,11 @@ object AttemptSysMsgRedeliveryMultiJvmSpec extends MultiNodeConfig {
} }
} }
class AttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec abstract class AttemptSysMsgRedeliverySpec(multiNodeConfig: AttemptSysMsgRedeliveryMultiJvmSpec)
class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec extends MultiNodeSpec(multiNodeConfig)
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec
class AttemptSysMsgRedeliverySpec extends MultiNodeSpec(AttemptSysMsgRedeliveryMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout { with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import AttemptSysMsgRedeliveryMultiJvmSpec._ import multiNodeConfig._
import AttemptSysMsgRedeliverySpec._
def initialParticipants = roles.size def initialParticipants = roles.size

View file

@ -11,28 +11,38 @@ import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.testkit._ import akka.testkit._
import akka.actor.Identify import akka.actor.Identify
import akka.actor.ActorIdentity import akka.actor.ActorIdentity
import com.typesafe.config.ConfigFactory
object LookupRemoteActorMultiJvmSpec extends MultiNodeConfig { class LookupRemoteActorMultiJvmSpec(artery: Boolean) extends MultiNodeConfig {
class SomeActor extends Actor { commonConfig(debugConfig(on = false).withFallback(
def receive = { ConfigFactory.parseString(s"""
case "identify" sender() ! self akka.remote.artery.enabled = $artery
} """)))
}
commonConfig(debugConfig(on = false))
val master = role("master") val master = role("master")
val slave = role("slave") val slave = role("slave")
} }
class LookupRemoteActorMultiJvmNode1 extends LookupRemoteActorSpec class LookupRemoteActorMultiJvmNode1 extends LookupRemoteActorSpec(new LookupRemoteActorMultiJvmSpec(artery = false))
class LookupRemoteActorMultiJvmNode2 extends LookupRemoteActorSpec class LookupRemoteActorMultiJvmNode2 extends LookupRemoteActorSpec(new LookupRemoteActorMultiJvmSpec(artery = false))
class LookupRemoteActorSpec extends MultiNodeSpec(LookupRemoteActorMultiJvmSpec) class ArteryLookupRemoteActorMultiJvmNode1 extends LookupRemoteActorSpec(new LookupRemoteActorMultiJvmSpec(artery = true))
class ArteryLookupRemoteActorMultiJvmNode2 extends LookupRemoteActorSpec(new LookupRemoteActorMultiJvmSpec(artery = true))
object LookupRemoteActorSpec {
class SomeActor extends Actor {
def receive = {
case "identify" sender() ! self
}
}
}
abstract class LookupRemoteActorSpec(multiNodeConfig: LookupRemoteActorMultiJvmSpec) extends MultiNodeSpec(multiNodeConfig)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout { with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import LookupRemoteActorMultiJvmSpec._ import multiNodeConfig._
import LookupRemoteActorSpec._
def initialParticipants = 2 def initialParticipants = 2

View file

@ -14,22 +14,13 @@ import akka.testkit._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ import scala.concurrent.duration._
object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { class NewRemoteActorMultiJvmSpec(artery: Boolean) extends MultiNodeConfig {
class SomeActor extends Actor {
def receive = {
case "identify" sender() ! self
}
}
class SomeActorWithParam(ignored: String) extends Actor {
def receive = {
case "identify" sender() ! self
}
}
commonConfig(debugConfig(on = false).withFallback( commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))) ConfigFactory.parseString(s"""
akka.remote.log-remote-lifecycle-events = off
akka.remote.artery.enabled = $artery
""")))
val master = role("master") val master = role("master")
val slave = role("slave") val slave = role("slave")
@ -43,12 +34,31 @@ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig {
deployOnAll("""/service-hello2.remote = "@slave@" """) deployOnAll("""/service-hello2.remote = "@slave@" """)
} }
class NewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec class NewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec(new NewRemoteActorMultiJvmSpec(artery = false))
class NewRemoteActorMultiJvmNode2 extends NewRemoteActorSpec class NewRemoteActorMultiJvmNode2 extends NewRemoteActorSpec(new NewRemoteActorMultiJvmSpec(artery = false))
class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) class ArteryNewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec(new NewRemoteActorMultiJvmSpec(artery = true))
class ArteryNewRemoteActorMultiJvmNode2 extends NewRemoteActorSpec(new NewRemoteActorMultiJvmSpec(artery = true))
object NewRemoteActorSpec {
class SomeActor extends Actor {
def receive = {
case "identify" sender() ! self
}
}
class SomeActorWithParam(ignored: String) extends Actor {
def receive = {
case "identify" sender() ! self
}
}
}
abstract class NewRemoteActorSpec(multiNodeConfig: NewRemoteActorMultiJvmSpec)
extends MultiNodeSpec(multiNodeConfig)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout { with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import NewRemoteActorMultiJvmSpec._ import multiNodeConfig._
import NewRemoteActorSpec._
def initialParticipants = roles.size def initialParticipants = roles.size

View file

@ -7,32 +7,41 @@ import akka.testkit._
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.remote.testconductor.RoleName import akka.remote.testconductor.RoleName
object PiercingShouldKeepQuarantineSpec extends MultiNodeConfig { class PiercingShouldKeepQuarantineConfig(artery: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
val second = role("second") val second = role("second")
commonConfig(debugConfig(on = false).withFallback( commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(""" ConfigFactory.parseString(s"""
#akka.loglevel = INFO
#akka.remote.log-remote-lifecycle-events = INFO
akka.remote.retry-gate-closed-for = 0.5s akka.remote.retry-gate-closed-for = 0.5s
akka.remote.artery.enabled = $artery
"""))) """)))
}
class PiercingShouldKeepQuarantineSpecMultiJvmNode1 extends PiercingShouldKeepQuarantineSpec(
new PiercingShouldKeepQuarantineConfig(artery = false))
class PiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShouldKeepQuarantineSpec(
new PiercingShouldKeepQuarantineConfig(artery = false))
class ArteryPiercingShouldKeepQuarantineSpecMultiJvmNode1 extends PiercingShouldKeepQuarantineSpec(
new PiercingShouldKeepQuarantineConfig(artery = true))
class ArteryPiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShouldKeepQuarantineSpec(
new PiercingShouldKeepQuarantineConfig(artery = true))
object PiercingShouldKeepQuarantineSpec {
class Subject extends Actor { class Subject extends Actor {
def receive = { def receive = {
case "getuid" sender() ! AddressUidExtension(context.system).addressUid case "getuid" sender() ! AddressUidExtension(context.system).addressUid
} }
} }
} }
class PiercingShouldKeepQuarantineSpecMultiJvmNode1 extends PiercingShouldKeepQuarantineSpec abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldKeepQuarantineConfig)
class PiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShouldKeepQuarantineSpec extends MultiNodeSpec(multiNodeConfig)
abstract class PiercingShouldKeepQuarantineSpec extends MultiNodeSpec(PiercingShouldKeepQuarantineSpec)
with STMultiNodeSpec with STMultiNodeSpec
with ImplicitSender { with ImplicitSender {
import multiNodeConfig._
import PiercingShouldKeepQuarantineSpec._ import PiercingShouldKeepQuarantineSpec._
override def initialParticipants = roles.size override def initialParticipants = roles.size

View file

@ -17,46 +17,62 @@ import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.testkit.TestEvent._ import akka.testkit.TestEvent._
object RemoteDeploymentDeathWatchMultiJvmSpec extends MultiNodeConfig { class RemoteDeploymentDeathWatchMultiJvmSpec(artery: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
val second = role("second") val second = role("second")
val third = role("third") val third = role("third")
commonConfig(debugConfig(on = false).withFallback( commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(""" ConfigFactory.parseString(s"""
akka.loglevel = INFO akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off
akka.remote.artery.enabled = $artery
"""))) """)))
deployOn(second, """/hello.remote = "@third@" """) deployOn(second, """/hello.remote = "@third@" """)
}
// Several different variations of the test
class RemoteDeploymentDeathWatchFastMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchFastSpec(artery = false)
class RemoteDeploymentDeathWatchFastMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchFastSpec(artery = false)
class RemoteDeploymentDeathWatchFastMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchFastSpec(artery = false)
class ArteryRemoteDeploymentDeathWatchFastMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchFastSpec(artery = true)
class ArteryRemoteDeploymentDeathWatchFastMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchFastSpec(artery = true)
class ArteryRemoteDeploymentDeathWatchFastMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchFastSpec(artery = true)
abstract class RemoteDeploymentNodeDeathWatchFastSpec(artery: Boolean) extends RemoteDeploymentDeathWatchSpec(
new RemoteDeploymentDeathWatchMultiJvmSpec(artery)) {
override def scenario = "fast"
}
class RemoteDeploymentDeathWatchSlowMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchSlowSpec(artery = false)
class RemoteDeploymentDeathWatchSlowMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchSlowSpec(artery = false)
class RemoteDeploymentDeathWatchSlowMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchSlowSpec(artery = false)
class ArteryRemoteDeploymentDeathWatchSlowMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchSlowSpec(artery = true)
class ArteryRemoteDeploymentDeathWatchSlowMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchSlowSpec(artery = true)
class ArteryRemoteDeploymentDeathWatchSlowMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchSlowSpec(artery = true)
abstract class RemoteDeploymentNodeDeathWatchSlowSpec(artery: Boolean) extends RemoteDeploymentDeathWatchSpec(
new RemoteDeploymentDeathWatchMultiJvmSpec(artery)) {
override def scenario = "slow"
override def sleep(): Unit = Thread.sleep(3000)
}
object RemoteDeploymentDeathWatchSpec {
class Hello extends Actor { class Hello extends Actor {
def receive = Actor.emptyBehavior def receive = Actor.emptyBehavior
} }
} }
// Several different variations of the test abstract class RemoteDeploymentDeathWatchSpec(multiNodeConfig: RemoteDeploymentDeathWatchMultiJvmSpec)
extends MultiNodeSpec(multiNodeConfig)
class RemoteDeploymentDeathWatchFastMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchFastSpec
class RemoteDeploymentDeathWatchFastMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchFastSpec
class RemoteDeploymentDeathWatchFastMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchFastSpec
abstract class RemoteDeploymentNodeDeathWatchFastSpec extends RemoteDeploymentDeathWatchSpec {
override def scenario = "fast"
}
class RemoteDeploymentDeathWatchSlowMultiJvmNode1 extends RemoteDeploymentNodeDeathWatchSlowSpec
class RemoteDeploymentDeathWatchSlowMultiJvmNode2 extends RemoteDeploymentNodeDeathWatchSlowSpec
class RemoteDeploymentDeathWatchSlowMultiJvmNode3 extends RemoteDeploymentNodeDeathWatchSlowSpec
abstract class RemoteDeploymentNodeDeathWatchSlowSpec extends RemoteDeploymentDeathWatchSpec {
override def scenario = "slow"
override def sleep(): Unit = Thread.sleep(3000)
}
abstract class RemoteDeploymentDeathWatchSpec
extends MultiNodeSpec(RemoteDeploymentDeathWatchMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender { with STMultiNodeSpec with ImplicitSender {
import multiNodeConfig._
import RemoteDeploymentDeathWatchMultiJvmSpec._ import RemoteDeploymentDeathWatchSpec._
def scenario: String def scenario: String
// Possible to override to let them heartbeat for a while. // Possible to override to let them heartbeat for a while.

View file

@ -18,40 +18,53 @@ import akka.remote.testconductor.RoleName
import akka.actor.Identify import akka.actor.Identify
import scala.concurrent.Await import scala.concurrent.Await
object RemoteQuarantinePiercingSpec extends MultiNodeConfig { class RemoteQuarantinePiercingConfig(artery: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
val second = role("second") val second = role("second")
commonConfig(debugConfig(on = false).withFallback( commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(""" ConfigFactory.parseString(s"""
akka.loglevel = INFO akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = INFO akka.remote.log-remote-lifecycle-events = INFO
akka.remote.artery.enabled = $artery
"""))) """)))
}
class RemoteQuarantinePiercingMultiJvmNode1 extends RemoteQuarantinePiercingSpec(
new RemoteQuarantinePiercingConfig(artery = false))
class RemoteQuarantinePiercingMultiJvmNode2 extends RemoteQuarantinePiercingSpec(
new RemoteQuarantinePiercingConfig(artery = false))
class ArteryRemoteQuarantinePiercingMultiJvmNode1 extends RemoteQuarantinePiercingSpec(
new RemoteQuarantinePiercingConfig(artery = true))
class ArteryRemoteQuarantinePiercingMultiJvmNode2 extends RemoteQuarantinePiercingSpec(
new RemoteQuarantinePiercingConfig(artery = true))
object RemoteQuarantinePiercingSpec {
class Subject extends Actor { class Subject extends Actor {
def receive = { def receive = {
case "shutdown" context.system.terminate() case "shutdown" context.system.terminate()
case "identify" sender() ! (AddressUidExtension(context.system).addressUid -> self) case "identify" sender() ! (AddressUidExtension(context.system).addressUid -> self)
} }
} }
} }
class RemoteQuarantinePiercingMultiJvmNode1 extends RemoteQuarantinePiercingSpec abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePiercingConfig)
class RemoteQuarantinePiercingMultiJvmNode2 extends RemoteQuarantinePiercingSpec extends MultiNodeSpec(multiNodeConfig)
abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuarantinePiercingSpec)
with STMultiNodeSpec with STMultiNodeSpec
with ImplicitSender { with ImplicitSender {
import multiNodeConfig._
import RemoteQuarantinePiercingSpec._ import RemoteQuarantinePiercingSpec._
override def initialParticipants = roles.size override def initialParticipants = roles.size
def identify(role: RoleName, actorName: String): (Int, ActorRef) = { def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = {
within(timeout) {
system.actorSelection(node(role) / "user" / actorName) ! "identify" system.actorSelection(node(role) / "user" / actorName) ! "identify"
expectMsgType[(Int, ActorRef)] expectMsgType[(Int, ActorRef)]
} }
}
"RemoteNodeShutdownAndComesBack" must { "RemoteNodeShutdownAndComesBack" must {
@ -61,7 +74,7 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti
enterBarrier("actors-started") enterBarrier("actors-started")
// Acquire ActorRef from first system // Acquire ActorRef from first system
val (uidFirst, subjectFirst) = identify(second, "subject") val (uidFirst, subjectFirst) = identifyWithUid(second, "subject", 5.seconds)
enterBarrier("actor-identified") enterBarrier("actor-identified")
// Manually Quarantine the other system // Manually Quarantine the other system
@ -101,10 +114,8 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti
Await.ready(system.whenTerminated, 30.seconds) Await.ready(system.whenTerminated, 30.seconds)
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.netty.tcp { akka.remote.netty.tcp.port = ${addr.port.get}
hostname = ${addr.host.get} akka.remote.artery.port = ${addr.port.get}
port = ${addr.port.get}
}
""").withFallback(system.settings.config)) """).withFallback(system.settings.config))
freshSystem.actorOf(Props[Subject], "subject") freshSystem.actorOf(Props[Subject], "subject")

View file

@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.testkit._
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.remote.testconductor.RoleName
import akka.remote.AddressUidExtension
import akka.remote.RARP
object PiercingShouldKeepQuarantineSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
#akka.loglevel = INFO
#akka.remote.log-remote-lifecycle-events = INFO
akka.remote.retry-gate-closed-for = 0.5s
akka.remote.artery.enabled = on
""")))
class Subject extends Actor {
def receive = {
case "getuid" sender() ! AddressUidExtension(context.system).addressUid
}
}
}
class PiercingShouldKeepQuarantineSpecMultiJvmNode1 extends PiercingShouldKeepQuarantineSpec
class PiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShouldKeepQuarantineSpec
abstract class PiercingShouldKeepQuarantineSpec extends MultiNodeSpec(PiercingShouldKeepQuarantineSpec)
with STMultiNodeSpec
with ImplicitSender {
import PiercingShouldKeepQuarantineSpec._
override def initialParticipants = roles.size
"While probing through the quarantine remoting" must {
"not lose existing quarantine marker" taggedAs LongRunningTest in {
runOn(first) {
enterBarrier("actors-started")
// Communicate with second system
system.actorSelection(node(second) / "user" / "subject") ! "getuid"
val uid = expectMsgType[Int](10.seconds)
enterBarrier("actor-identified")
// Manually Quarantine the other system
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
// Quarantining is not immediate
Thread.sleep(1000)
// Quarantine is up -- Should not be able to communicate with remote system any more
for (_ 1 to 4) {
system.actorSelection(node(second) / "user" / "subject") ! "getuid"
expectNoMsg(2.seconds)
}
enterBarrier("quarantine-intact")
}
runOn(second) {
system.actorOf(Props[Subject], "subject")
enterBarrier("actors-started")
enterBarrier("actor-identified")
enterBarrier("quarantine-intact")
}
}
}
}

View file

@ -1,119 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociate, Direction }
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.ActorIdentity
import akka.remote.testconductor.RoleName
import akka.actor.Identify
import scala.concurrent.Await
import akka.remote.AddressUidExtension
import akka.remote.RARP
object RemoteQuarantinePiercingSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = INFO
akka.remote.artery.enabled = on
""")))
class Subject extends Actor {
def receive = {
case "shutdown" context.system.terminate()
case "identify" sender() ! (AddressUidExtension(context.system).addressUid -> self)
}
}
}
class RemoteQuarantinePiercingSpecMultiJvmNode1 extends RemoteQuarantinePiercingSpec
class RemoteQuarantinePiercingSpecMultiJvmNode2 extends RemoteQuarantinePiercingSpec
abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuarantinePiercingSpec)
with STMultiNodeSpec
with ImplicitSender {
import RemoteQuarantinePiercingSpec._
override def initialParticipants = roles.size
def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = {
within(timeout) {
system.actorSelection(node(role) / "user" / actorName) ! "identify"
expectMsgType[(Int, ActorRef)]
}
}
"RemoteNodeShutdownAndComesBack" must {
"allow piercing through the quarantine when remote UID is new" taggedAs LongRunningTest in {
runOn(first) {
val secondAddress = node(second).address
enterBarrier("actors-started")
// Acquire ActorRef from first system
val (uidFirst, subjectFirst) = identifyWithUid(second, "subject", 5.seconds)
enterBarrier("actor-identified")
// Manually Quarantine the other system
RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst))
// Quarantine is up -- Cannot communicate with remote system any more
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify"
expectNoMsg(2.seconds)
// Shut down the other system -- which results in restart (see runOn(second))
Await.result(testConductor.shutdown(second), 30.seconds)
// Now wait until second system becomes alive again
within(30.seconds) {
// retry because the Subject actor might not be started yet
awaitAssert {
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify"
val (uidSecond, subjectSecond) = expectMsgType[(Int, ActorRef)](1.second)
uidSecond should not be (uidFirst)
subjectSecond should not be (subjectFirst)
}
}
// If we got here the Quarantine was successfully pierced since it is configured to last 1 day
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "shutdown"
}
runOn(second) {
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
system.actorOf(Props[Subject], "subject")
enterBarrier("actors-started")
enterBarrier("actor-identified")
Await.ready(system.whenTerminated, 30.seconds)
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.artery.port = ${addr.port.get}
""").withFallback(system.settings.config))
freshSystem.actorOf(Props[Subject], "subject")
Await.ready(freshSystem.whenTerminated, 30.seconds)
}
}
}
}