2012-09-12 11:18:42 +02:00
|
|
|
/**
|
2013-01-09 01:47:48 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2012-09-12 11:18:42 +02:00
|
|
|
*/
|
|
|
|
|
package akka.remote
|
|
|
|
|
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import akka.pattern.ask
|
2013-03-20 20:38:49 +13:00
|
|
|
import akka.remote.transport.AssociationRegistry
|
2012-09-12 11:18:42 +02:00
|
|
|
import akka.testkit._
|
2013-03-20 20:38:49 +13:00
|
|
|
import akka.util.ByteString
|
2012-09-12 11:18:42 +02:00
|
|
|
import com.typesafe.config._
|
2013-03-20 20:38:49 +13:00
|
|
|
import java.io.NotSerializableException
|
2012-09-12 11:18:42 +02:00
|
|
|
import scala.concurrent.Await
|
|
|
|
|
import scala.concurrent.Future
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
|
|
|
|
|
object RemotingSpec {
|
2013-01-25 15:03:52 +01:00
|
|
|
class Echo1 extends Actor {
|
2012-09-12 11:18:42 +02:00
|
|
|
var target: ActorRef = context.system.deadLetters
|
|
|
|
|
|
|
|
|
|
def receive = {
|
2013-01-25 15:03:52 +01:00
|
|
|
case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo1], n)
|
2012-09-12 11:18:42 +02:00
|
|
|
case ex: Exception ⇒ throw ex
|
|
|
|
|
case s: String ⇒ sender ! context.actorFor(s)
|
|
|
|
|
case x ⇒ target = sender; sender ! x
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def preStart() {}
|
|
|
|
|
override def preRestart(cause: Throwable, msg: Option[Any]) {
|
|
|
|
|
target ! "preRestart"
|
|
|
|
|
}
|
|
|
|
|
override def postRestart(cause: Throwable) {}
|
|
|
|
|
override def postStop() {
|
|
|
|
|
target ! "postStop"
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2013-01-25 15:03:52 +01:00
|
|
|
class Echo2 extends Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case "ping" ⇒ sender ! (("pong", sender))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-09-12 11:18:42 +02:00
|
|
|
val cfg: Config = ConfigFactory parseString ("""
|
|
|
|
|
common-ssl-settings {
|
|
|
|
|
key-store = "%s"
|
|
|
|
|
trust-store = "%s"
|
|
|
|
|
key-store-password = "changeme"
|
|
|
|
|
trust-store-password = "changeme"
|
|
|
|
|
protocol = "TLSv1"
|
|
|
|
|
random-number-generator = "AES128CounterSecureRNG"
|
|
|
|
|
enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA]
|
|
|
|
|
}
|
|
|
|
|
|
2013-03-21 10:07:48 +01:00
|
|
|
common-netty-settings {
|
|
|
|
|
port = 0
|
|
|
|
|
hostname = "localhost"
|
|
|
|
|
server-socket-worker-pool.pool-size-max = 2
|
|
|
|
|
client-socket-worker-pool.pool-size-max = 2
|
|
|
|
|
}
|
|
|
|
|
|
2012-09-12 11:18:42 +02:00
|
|
|
akka {
|
|
|
|
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
2013-03-21 10:07:48 +01:00
|
|
|
|
|
|
|
|
remote {
|
|
|
|
|
transport = "akka.remote.Remoting"
|
|
|
|
|
|
|
|
|
|
retry-latch-closed-for = 1 s
|
|
|
|
|
log-remote-lifecycle-events = on
|
|
|
|
|
|
|
|
|
|
enabled-transports = [
|
|
|
|
|
"akka.remote.test",
|
|
|
|
|
"akka.remote.netty.tcp",
|
|
|
|
|
"akka.remote.netty.udp",
|
|
|
|
|
"akka.remote.netty.ssl"
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
writer-dispatcher {
|
|
|
|
|
executor = "fork-join-executor"
|
|
|
|
|
fork-join-executor {
|
|
|
|
|
parallelism-min = 2
|
|
|
|
|
parallelism-max = 2
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
netty.tcp = ${common-netty-settings}
|
|
|
|
|
netty.udp = ${common-netty-settings}
|
|
|
|
|
netty.ssl = ${common-netty-settings}
|
|
|
|
|
netty.ssl.security = ${common-ssl-settings}
|
|
|
|
|
|
|
|
|
|
test {
|
2012-09-12 11:18:42 +02:00
|
|
|
transport-class = "akka.remote.transport.TestTransport"
|
2012-11-23 10:15:19 +01:00
|
|
|
applied-adapters = []
|
|
|
|
|
registry-key = aX33k0jWKg
|
|
|
|
|
local-address = "test://RemotingSpec@localhost:12345"
|
|
|
|
|
maximum-payload-bytes = 32000 bytes
|
|
|
|
|
scheme-identifier = test
|
2013-03-21 10:07:48 +01:00
|
|
|
}
|
2012-11-23 10:15:19 +01:00
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
|
|
|
|
|
actor.deployment {
|
2013-01-24 04:28:21 -08:00
|
|
|
/blub.remote = "akka.test://remote-sys@localhost:12346"
|
|
|
|
|
/looker/child.remote = "akka.test://remote-sys@localhost:12346"
|
|
|
|
|
/looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345"
|
2012-09-12 11:18:42 +02:00
|
|
|
}
|
|
|
|
|
}
|
2013-01-17 16:19:31 +01:00
|
|
|
""".format(
|
2012-09-12 11:18:42 +02:00
|
|
|
getClass.getClassLoader.getResource("keystore").getPath,
|
|
|
|
|
getClass.getClassLoader.getResource("truststore").getPath))
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
|
|
|
|
class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with DefaultTimeout {
|
|
|
|
|
|
|
|
|
|
import RemotingSpec._
|
|
|
|
|
|
|
|
|
|
val conf = ConfigFactory.parseString(
|
|
|
|
|
"""
|
2013-03-20 20:38:49 +13:00
|
|
|
akka.remote.test {
|
|
|
|
|
local-address = "test://remote-sys@localhost:12346"
|
|
|
|
|
maximum-payload-bytes = 48000 bytes
|
2012-11-23 10:15:19 +01:00
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
""").withFallback(system.settings.config).resolve()
|
2013-03-13 16:01:57 +01:00
|
|
|
val otherSystem = ActorSystem("remote-sys", conf)
|
2012-09-12 11:18:42 +02:00
|
|
|
|
2013-01-03 12:29:30 +01:00
|
|
|
for (
|
|
|
|
|
(name, proto) ← Seq(
|
|
|
|
|
"/gonk" -> "tcp",
|
|
|
|
|
"/zagzag" -> "udp",
|
2013-01-24 04:28:21 -08:00
|
|
|
"/roghtaar" -> "ssl.tcp")
|
2013-03-13 16:01:57 +01:00
|
|
|
) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto))))
|
2013-01-03 12:29:30 +01:00
|
|
|
|
|
|
|
|
def addr(sys: ActorSystem, proto: String) =
|
2013-01-24 04:28:21 -08:00
|
|
|
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
2013-01-03 12:29:30 +01:00
|
|
|
def port(sys: ActorSystem, proto: String) = addr(sys, proto).port.get
|
|
|
|
|
def deploy(sys: ActorSystem, d: Deploy) {
|
|
|
|
|
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
|
|
|
|
|
}
|
|
|
|
|
|
2013-03-13 16:01:57 +01:00
|
|
|
val remote = otherSystem.actorOf(Props[Echo2], "echo")
|
2012-09-12 11:18:42 +02:00
|
|
|
|
2013-01-24 04:28:21 -08:00
|
|
|
val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
|
2012-09-12 11:18:42 +02:00
|
|
|
|
2013-03-20 20:38:49 +13:00
|
|
|
private def verifySend(msg: Any)(afterSend: ⇒ Unit) {
|
|
|
|
|
val bigBounceOther = otherSystem.actorOf(Props(new Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case x: Int ⇒ sender ! byteStringOfSize(x)
|
|
|
|
|
case x ⇒ sender ! x
|
|
|
|
|
}
|
|
|
|
|
}), "bigBounce")
|
|
|
|
|
val bigBounceHere = system.actorFor("akka.test://remote-sys@localhost:12346/user/bigBounce")
|
|
|
|
|
|
|
|
|
|
val eventForwarder = system.actorOf(Props(new Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case x ⇒ testActor ! x
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
system.eventStream.subscribe(eventForwarder, classOf[AssociationErrorEvent])
|
|
|
|
|
system.eventStream.subscribe(eventForwarder, classOf[DisassociatedEvent])
|
|
|
|
|
try {
|
|
|
|
|
bigBounceHere ! msg
|
|
|
|
|
afterSend
|
|
|
|
|
expectNoMsg(500.millis.dilated)
|
|
|
|
|
} finally {
|
|
|
|
|
system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent])
|
|
|
|
|
system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent])
|
|
|
|
|
system.stop(eventForwarder)
|
|
|
|
|
otherSystem.stop(bigBounceOther)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte))
|
|
|
|
|
|
|
|
|
|
val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt
|
|
|
|
|
|
2013-01-03 17:17:12 +01:00
|
|
|
override def afterTermination() {
|
2013-03-13 16:01:57 +01:00
|
|
|
otherSystem.shutdown()
|
2012-09-12 11:18:42 +02:00
|
|
|
AssociationRegistry.clear()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"Remoting" must {
|
|
|
|
|
|
|
|
|
|
"support remote look-ups" in {
|
|
|
|
|
here ! "ping"
|
2012-11-23 10:15:19 +01:00
|
|
|
expectMsg(("pong", testActor))
|
2012-09-12 11:18:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"send error message for wrong address" in {
|
2013-03-07 18:08:07 +01:00
|
|
|
filterEvents(EventFilter.error(start = "Association", occurrences = 6),
|
|
|
|
|
EventFilter.warning(pattern = ".*dead letter.*echo.*", occurrences = 1)) {
|
|
|
|
|
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
|
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"support ask" in {
|
|
|
|
|
Await.result(here ? "ping", timeout.duration) match {
|
|
|
|
|
case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good
|
|
|
|
|
case m ⇒ fail(m + " was not (pong, AskActorRef)")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"send dead letters on remote if actor does not exist" in {
|
|
|
|
|
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
|
2013-01-24 04:28:21 -08:00
|
|
|
system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh"
|
2013-03-13 16:01:57 +01:00
|
|
|
}(otherSystem)
|
2012-09-12 11:18:42 +02:00
|
|
|
}
|
|
|
|
|
|
2013-01-25 15:03:52 +01:00
|
|
|
"not be exhausted by sending to broken connections" in {
|
2013-03-21 10:07:48 +01:00
|
|
|
val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]""").
|
2013-03-13 16:01:57 +01:00
|
|
|
withFallback(otherSystem.settings.config)
|
|
|
|
|
val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig))
|
2013-01-25 15:03:52 +01:00
|
|
|
moreSystems foreach (_.actorOf(Props[Echo2], name = "echo"))
|
|
|
|
|
val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
|
2013-03-13 16:01:57 +01:00
|
|
|
val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo")
|
2013-01-25 15:03:52 +01:00
|
|
|
val n = 100
|
|
|
|
|
|
|
|
|
|
// first everything is up and running
|
|
|
|
|
1 to n foreach { x ⇒
|
|
|
|
|
aliveEcho ! "ping"
|
|
|
|
|
moreRefs(x % moreSystems.size) ! "ping"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
within(5.seconds) {
|
|
|
|
|
receiveN(n * 2) foreach { reply ⇒ reply must be(("pong", testActor)) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// then we shutdown all but one system to simulate broken connections
|
|
|
|
|
moreSystems foreach { sys ⇒
|
|
|
|
|
sys.shutdown()
|
|
|
|
|
sys.awaitTermination(5.seconds.dilated)
|
|
|
|
|
sys.isTerminated must be(true)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
1 to n foreach { x ⇒
|
|
|
|
|
aliveEcho ! "ping"
|
|
|
|
|
moreRefs(x % moreSystems.size) ! "ping"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ping messages to aliveEcho should go through even though we use many different broken connections
|
|
|
|
|
within(5.seconds) {
|
|
|
|
|
receiveN(n) foreach { reply ⇒ reply must be(("pong", testActor)) }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-09-12 11:18:42 +02:00
|
|
|
"create and supervise children on remote node" in {
|
2013-01-25 15:03:52 +01:00
|
|
|
val r = system.actorOf(Props[Echo1], "blub")
|
2013-01-24 04:28:21 -08:00
|
|
|
r.path.toString must be === "akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/blub"
|
2012-09-12 11:18:42 +02:00
|
|
|
r ! 42
|
|
|
|
|
expectMsg(42)
|
|
|
|
|
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
|
|
|
|
r ! new Exception("crash")
|
2013-03-07 12:10:30 +01:00
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
expectMsg("preRestart")
|
|
|
|
|
r ! 42
|
|
|
|
|
expectMsg(42)
|
|
|
|
|
system.stop(r)
|
|
|
|
|
expectMsg("postStop")
|
|
|
|
|
}
|
|
|
|
|
|
2013-03-13 16:01:57 +01:00
|
|
|
"not send to remote re-created actor with same name" in {
|
|
|
|
|
val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1")
|
|
|
|
|
echo ! 71
|
|
|
|
|
expectMsg(71)
|
|
|
|
|
echo ! PoisonPill
|
|
|
|
|
expectMsg("postStop")
|
|
|
|
|
echo ! 72
|
|
|
|
|
expectNoMsg(1.second)
|
|
|
|
|
|
|
|
|
|
val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1")
|
|
|
|
|
echo2 ! 73
|
|
|
|
|
expectMsg(73)
|
|
|
|
|
// msg to old ActorRef (different uid) should not get through
|
|
|
|
|
echo2.path.uid must not be (echo.path.uid)
|
|
|
|
|
echo ! 74
|
|
|
|
|
expectNoMsg(1.second)
|
|
|
|
|
|
|
|
|
|
otherSystem.actorFor("/user/otherEcho1") ! 75
|
|
|
|
|
expectMsg(75)
|
|
|
|
|
|
|
|
|
|
system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76
|
|
|
|
|
expectMsg(76)
|
|
|
|
|
}
|
|
|
|
|
|
2012-09-12 11:18:42 +02:00
|
|
|
"look-up actors across node boundaries" in {
|
|
|
|
|
val l = system.actorOf(Props(new Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n)
|
|
|
|
|
case s: String ⇒ sender ! context.actorFor(s)
|
|
|
|
|
}
|
|
|
|
|
}), "looker")
|
2013-03-13 16:01:57 +01:00
|
|
|
// child is configured to be deployed on remote-sys (otherSystem)
|
2013-01-25 15:03:52 +01:00
|
|
|
l ! (Props[Echo1], "child")
|
2013-03-13 16:01:57 +01:00
|
|
|
val child = expectMsgType[ActorRef]
|
|
|
|
|
// grandchild is configured to be deployed on RemotingSpec (system)
|
|
|
|
|
child ! (Props[Echo1], "grandchild")
|
|
|
|
|
val grandchild = expectMsgType[ActorRef]
|
|
|
|
|
grandchild.asInstanceOf[ActorRefScope].isLocal must be(true)
|
|
|
|
|
grandchild ! 43
|
|
|
|
|
expectMsg(43)
|
2012-09-12 11:18:42 +02:00
|
|
|
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
|
|
|
|
|
myref.isInstanceOf[RemoteActorRef] must be(true)
|
2013-03-13 16:01:57 +01:00
|
|
|
myref ! 44
|
|
|
|
|
expectMsg(44)
|
|
|
|
|
lastSender must be(grandchild)
|
|
|
|
|
lastSender must be theSameInstanceAs grandchild
|
|
|
|
|
child.asInstanceOf[RemoteActorRef].getParent must be(l)
|
|
|
|
|
system.actorFor("/user/looker/child") must be theSameInstanceAs child
|
2012-09-12 11:18:42 +02:00
|
|
|
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
|
|
|
|
|
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
|
2013-03-13 16:01:57 +01:00
|
|
|
|
|
|
|
|
watch(child)
|
|
|
|
|
child ! PoisonPill
|
|
|
|
|
expectMsg("postStop")
|
|
|
|
|
expectMsgType[Terminated].actor must be === child
|
|
|
|
|
l ! (Props[Echo1], "child")
|
|
|
|
|
val child2 = expectMsgType[ActorRef]
|
|
|
|
|
child2 ! 45
|
|
|
|
|
expectMsg(45)
|
|
|
|
|
// msg to old ActorRef (different uid) should not get through
|
|
|
|
|
child2.path.uid must not be (child.path.uid)
|
|
|
|
|
child ! 46
|
|
|
|
|
expectNoMsg(1.second)
|
|
|
|
|
system.actorFor(system / "looker" / "child") ! 47
|
|
|
|
|
expectMsg(47)
|
|
|
|
|
|
2012-09-12 11:18:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"not fail ask across node boundaries" in {
|
|
|
|
|
import system.dispatcher
|
|
|
|
|
val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)]
|
|
|
|
|
Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to use multiple transports and use the appropriate one (TCP)" in {
|
2013-01-25 15:03:52 +01:00
|
|
|
val r = system.actorOf(Props[Echo1], "gonk")
|
2013-01-03 12:29:30 +01:00
|
|
|
r.path.toString must be ===
|
2013-03-13 16:01:57 +01:00
|
|
|
s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
|
2012-09-12 11:18:42 +02:00
|
|
|
r ! 42
|
|
|
|
|
expectMsg(42)
|
|
|
|
|
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
|
|
|
|
r ! new Exception("crash")
|
2013-03-07 12:10:30 +01:00
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
expectMsg("preRestart")
|
|
|
|
|
r ! 42
|
|
|
|
|
expectMsg(42)
|
|
|
|
|
system.stop(r)
|
|
|
|
|
expectMsg("postStop")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to use multiple transports and use the appropriate one (UDP)" in {
|
2013-01-25 15:03:52 +01:00
|
|
|
val r = system.actorOf(Props[Echo1], "zagzag")
|
2013-01-03 12:29:30 +01:00
|
|
|
r.path.toString must be ===
|
2013-03-13 16:01:57 +01:00
|
|
|
s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
|
2012-09-12 11:18:42 +02:00
|
|
|
r ! 42
|
2013-01-03 12:33:09 +01:00
|
|
|
expectMsg(10.seconds, 42)
|
2012-09-12 11:18:42 +02:00
|
|
|
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
|
|
|
|
r ! new Exception("crash")
|
2013-03-07 12:10:30 +01:00
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
expectMsg("preRestart")
|
|
|
|
|
r ! 42
|
|
|
|
|
expectMsg(42)
|
|
|
|
|
system.stop(r)
|
|
|
|
|
expectMsg("postStop")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to use multiple transports and use the appropriate one (SSL)" in {
|
2013-01-25 15:03:52 +01:00
|
|
|
val r = system.actorOf(Props[Echo1], "roghtaar")
|
2013-01-03 12:29:30 +01:00
|
|
|
r.path.toString must be ===
|
2013-03-13 16:01:57 +01:00
|
|
|
s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
|
2012-09-12 11:18:42 +02:00
|
|
|
r ! 42
|
2013-01-03 12:33:09 +01:00
|
|
|
expectMsg(10.seconds, 42)
|
2012-09-12 11:18:42 +02:00
|
|
|
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
|
|
|
|
r ! new Exception("crash")
|
2013-03-07 12:10:30 +01:00
|
|
|
}
|
2012-09-12 11:18:42 +02:00
|
|
|
expectMsg("preRestart")
|
|
|
|
|
r ! 42
|
|
|
|
|
expectMsg(42)
|
|
|
|
|
system.stop(r)
|
|
|
|
|
expectMsg("postStop")
|
|
|
|
|
}
|
|
|
|
|
|
2013-03-20 20:38:49 +13:00
|
|
|
"drop unserializable messages" in {
|
|
|
|
|
object Unserializable
|
|
|
|
|
verifySend(Unserializable) {
|
|
|
|
|
expectMsgPF(1.second) {
|
|
|
|
|
case AssociationErrorEvent(_: NotSerializableException, _, _, _) ⇒ ()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"allow messages up to payload size" in {
|
|
|
|
|
val maxProtocolOverhead = 500 // Make sure we're still under size after the message is serialized, etc
|
|
|
|
|
val big = byteStringOfSize(maxPayloadBytes - maxProtocolOverhead)
|
|
|
|
|
verifySend(big) {
|
|
|
|
|
expectMsg(1.second, big)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"drop sent messages over payload size" in {
|
|
|
|
|
val oversized = byteStringOfSize(maxPayloadBytes + 1)
|
|
|
|
|
verifySend(oversized) {
|
|
|
|
|
expectMsgPF(1.second) {
|
|
|
|
|
case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload sent") ⇒ ()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"drop received messages over payload size" in {
|
|
|
|
|
// Receiver should reply with a message of size maxPayload + 1, which will be dropped and an error logged
|
|
|
|
|
verifySend(maxPayloadBytes + 1) {
|
|
|
|
|
expectMsgPF(1.second) {
|
|
|
|
|
case AssociationErrorEvent(e: OversizedPayloadException, _, _, _) if e.getMessage.startsWith("Discarding oversized payload received") ⇒ ()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2013-01-03 16:38:18 +01:00
|
|
|
}
|
2013-01-03 12:29:30 +01:00
|
|
|
|
2013-01-03 16:38:18 +01:00
|
|
|
override def beforeTermination() {
|
|
|
|
|
system.eventStream.publish(TestEvent.Mute(
|
|
|
|
|
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
|
2013-03-13 16:01:57 +01:00
|
|
|
otherSystem.eventStream.publish(TestEvent.Mute(
|
2013-01-03 16:38:18 +01:00
|
|
|
EventFilter[EndpointException](),
|
|
|
|
|
EventFilter.error(start = "AssociationError"),
|
|
|
|
|
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
|
2012-09-12 11:18:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|