Porting RemotingSpec to artery #20714
This commit is contained in:
parent
415a1cc7f9
commit
c8eadd7d08
17 changed files with 710 additions and 148 deletions
|
|
@ -168,11 +168,11 @@ object ActorSystem {
|
||||||
final val ConfigVersion: String = getString("akka.version")
|
final val ConfigVersion: String = getString("akka.version")
|
||||||
final val ProviderClass: String =
|
final val ProviderClass: String =
|
||||||
getString("akka.actor.provider") match {
|
getString("akka.actor.provider") match {
|
||||||
case "local" => classOf[LocalActorRefProvider].getName
|
case "local" ⇒ classOf[LocalActorRefProvider].getName
|
||||||
// these two cannot be referenced by class as they may not be on the classpath
|
// these two cannot be referenced by class as they may not be on the classpath
|
||||||
case "remote" => "akka.remote.RemoteActorRefProvider"
|
case "remote" ⇒ "akka.remote.RemoteActorRefProvider"
|
||||||
case "cluster" => "akka.cluster.ClusterActorRefProvider"
|
case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider"
|
||||||
case fqcn => fqcn
|
case fqcn ⇒ fqcn
|
||||||
}
|
}
|
||||||
|
|
||||||
final val SupervisorStrategyClass: String = getString("akka.actor.guardian-supervisor-strategy")
|
final val SupervisorStrategyClass: String = getString("akka.actor.guardian-supervisor-strategy")
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ import scala.util.control.NonFatal
|
||||||
import akka.actor.{ ActorRef, InternalActorRef }
|
import akka.actor.{ ActorRef, InternalActorRef }
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.remote.{ MessageSerializer, UniqueAddress }
|
import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress }
|
||||||
import akka.remote.EndpointManager.Send
|
import akka.remote.EndpointManager.Send
|
||||||
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
|
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
|
||||||
import akka.serialization.{ Serialization, SerializationExtension }
|
import akka.serialization.{ Serialization, SerializationExtension }
|
||||||
|
|
@ -95,6 +95,10 @@ private[remote] class Encoder(
|
||||||
case _: SystemMessageEnvelope ⇒
|
case _: SystemMessageEnvelope ⇒
|
||||||
log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName)
|
log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName)
|
||||||
throw e
|
throw e
|
||||||
|
case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
|
||||||
|
val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${send.recipient}: max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${send.message.getClass.getName}].")
|
||||||
|
log.error(reason, "Transient association error (association remains live)")
|
||||||
|
pull(in)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName)
|
log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName)
|
||||||
pull(in)
|
pull(in)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import akka.actor.{ ActorSystem, ExtendedActorSystem, RootActorPath }
|
||||||
|
import akka.remote.RARP
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
|
|
||||||
|
object ArteryMultiNodeSpec {
|
||||||
|
def defaultConfig =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka {
|
||||||
|
actor.provider = remote
|
||||||
|
actor.warn-about-java-serializer-usage = off
|
||||||
|
remote.artery {
|
||||||
|
enabled = on
|
||||||
|
hostname = localhost
|
||||||
|
port = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for remoting tests what needs to test interaction between a "local" actor system
|
||||||
|
* which is always created (the usual AkkaSpec system), and multiple additional actor systems over artery
|
||||||
|
*/
|
||||||
|
abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArteryMultiNodeSpec.defaultConfig)) {
|
||||||
|
|
||||||
|
def this() = this(ConfigFactory.empty())
|
||||||
|
def this(extraConfig: String) = this(ConfigFactory.parseString(extraConfig))
|
||||||
|
|
||||||
|
/** just an alias to make tests more readable */
|
||||||
|
def localSystem = system
|
||||||
|
def localPort = port(localSystem)
|
||||||
|
def port(system: ActorSystem): Int = RARP(system).provider.getDefaultAddress.port.get
|
||||||
|
def address(sys: ActorSystem) = RARP(sys).provider.getDefaultAddress
|
||||||
|
def rootActorPath(sys: ActorSystem) = RootActorPath(address(sys))
|
||||||
|
def nextGeneratedSystemName = s"${localSystem.name}-remote-${remoteSystems.size}"
|
||||||
|
|
||||||
|
private var remoteSystems: Vector[ActorSystem] = Vector.empty
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return A new actor system configured with artery enabled. The system will
|
||||||
|
* automatically be terminated after test is completed to avoid leaks.
|
||||||
|
*/
|
||||||
|
def newRemoteSystem(extraConfig: Option[String] = None, name: Option[String] = None): ActorSystem = {
|
||||||
|
val config =
|
||||||
|
extraConfig.fold(
|
||||||
|
localSystem.settings.config
|
||||||
|
)(
|
||||||
|
str ⇒ ConfigFactory.parseString(str).withFallback(localSystem.settings.config)
|
||||||
|
)
|
||||||
|
|
||||||
|
val remoteSystem = ActorSystem(name.getOrElse(nextGeneratedSystemName), config)
|
||||||
|
remoteSystems = remoteSystems :+ remoteSystem
|
||||||
|
|
||||||
|
remoteSystem
|
||||||
|
}
|
||||||
|
|
||||||
|
override def afterTermination(): Unit = {
|
||||||
|
remoteSystems.foreach(sys ⇒ shutdown(sys))
|
||||||
|
remoteSystems = Vector.empty
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -3,13 +3,10 @@
|
||||||
*/
|
*/
|
||||||
package akka.remote.artery
|
package akka.remote.artery
|
||||||
|
|
||||||
import akka.actor.{ Actor, ActorRef, ActorSelection, ActorSystem, ExtendedActorSystem, Props, RootActorPath }
|
import akka.actor.{ Actor, ActorRef, ActorSelection, Props, RootActorPath }
|
||||||
import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef }
|
import akka.remote.{ LargeDestination, RARP, RegularDestination, RemoteActorRef }
|
||||||
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
|
import akka.testkit.TestProbe
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import com.typesafe.config.ConfigFactory
|
|
||||||
import org.scalatest.concurrent.ScalaFutures
|
|
||||||
import org.scalatest.{ ShouldMatchers, WordSpec }
|
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
@ -24,130 +21,99 @@ object LargeMessagesStreamSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class LargeMessagesStreamSpec extends WordSpec with ShouldMatchers with ScalaFutures {
|
class LargeMessagesStreamSpec extends ArteryMultiNodeSpec(
|
||||||
|
"""
|
||||||
|
akka {
|
||||||
|
loglevel = ERROR
|
||||||
|
remote.artery.large-message-destinations = [ "/user/large" ]
|
||||||
|
}
|
||||||
|
""".stripMargin) {
|
||||||
|
|
||||||
import LargeMessagesStreamSpec._
|
import LargeMessagesStreamSpec._
|
||||||
|
|
||||||
val config = ConfigFactory.parseString(
|
|
||||||
s"""
|
|
||||||
akka {
|
|
||||||
loglevel = ERROR
|
|
||||||
actor {
|
|
||||||
provider = remote
|
|
||||||
}
|
|
||||||
remote.artery {
|
|
||||||
enabled = on
|
|
||||||
hostname = localhost
|
|
||||||
port = 0
|
|
||||||
large-message-destinations = [
|
|
||||||
"/user/large"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
""")
|
|
||||||
|
|
||||||
"The large message support" should {
|
"The large message support" should {
|
||||||
|
|
||||||
"not affect regular communication" in {
|
"not affect regular communication" in {
|
||||||
val systemA = ActorSystem("systemA", config)
|
val systemA = localSystem
|
||||||
val systemB = ActorSystem("systemB", config)
|
val systemB = newRemoteSystem()
|
||||||
|
|
||||||
try {
|
val senderProbeA = TestProbe()(systemA)
|
||||||
val senderProbeA = TestProbe()(systemA)
|
val senderProbeB = TestProbe()(systemB)
|
||||||
val senderProbeB = TestProbe()(systemB)
|
|
||||||
|
|
||||||
// start actor and make sure it is up and running
|
// start actor and make sure it is up and running
|
||||||
val regular = systemB.actorOf(Props(new EchoSize), "regular")
|
val regular = systemB.actorOf(Props(new EchoSize), "regular")
|
||||||
regular.tell(Ping(), senderProbeB.ref)
|
regular.tell(Ping(), senderProbeB.ref)
|
||||||
senderProbeB.expectMsg(Pong(0))
|
senderProbeB.expectMsg(Pong(0))
|
||||||
|
|
||||||
// communicate with it from the other system
|
// communicate with it from the other system
|
||||||
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val regularRemote = awaitResolve(systemA.actorSelection(rootActorPath(systemB) / "user" / "regular"))
|
||||||
val rootB = RootActorPath(addressB)
|
regularRemote.tell(Ping(), senderProbeA.ref)
|
||||||
val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
|
senderProbeA.expectMsg(Pong(0))
|
||||||
regularRemote.tell(Ping(), senderProbeA.ref)
|
|
||||||
senderProbeA.expectMsg(Pong(0))
|
|
||||||
|
|
||||||
// flag should be cached now
|
// flag should be cached now
|
||||||
regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
|
regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
|
||||||
|
|
||||||
} finally {
|
|
||||||
TestKit.shutdownActorSystem(systemA)
|
|
||||||
TestKit.shutdownActorSystem(systemB)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"pass small regular messages over the large-message stream" in {
|
"pass small regular messages over the large-message stream" in {
|
||||||
val systemA = ActorSystem("systemA", config)
|
val systemA = localSystem
|
||||||
val systemB = ActorSystem("systemB", config)
|
val systemB = newRemoteSystem()
|
||||||
|
|
||||||
try {
|
val senderProbeA = TestProbe()(systemA)
|
||||||
val senderProbeA = TestProbe()(systemA)
|
val senderProbeB = TestProbe()(systemB)
|
||||||
val senderProbeB = TestProbe()(systemB)
|
|
||||||
|
|
||||||
// start actor and make sure it is up and running
|
// start actor and make sure it is up and running
|
||||||
val large = systemB.actorOf(Props(new EchoSize), "large")
|
val large = systemB.actorOf(Props(new EchoSize), "large")
|
||||||
large.tell(Ping(), senderProbeB.ref)
|
large.tell(Ping(), senderProbeB.ref)
|
||||||
senderProbeB.expectMsg(Pong(0))
|
senderProbeB.expectMsg(Pong(0))
|
||||||
|
|
||||||
// communicate with it from the other system
|
// communicate with it from the other system
|
||||||
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val addressB = RARP(systemB).provider.getDefaultAddress
|
||||||
val rootB = RootActorPath(addressB)
|
val rootB = RootActorPath(addressB)
|
||||||
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
|
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
|
||||||
largeRemote.tell(Ping(), senderProbeA.ref)
|
largeRemote.tell(Ping(), senderProbeA.ref)
|
||||||
senderProbeA.expectMsg(Pong(0))
|
senderProbeA.expectMsg(Pong(0))
|
||||||
|
|
||||||
// flag should be cached now
|
// flag should be cached now
|
||||||
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination)
|
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination)
|
||||||
|
|
||||||
} finally {
|
|
||||||
TestKit.shutdownActorSystem(systemA)
|
|
||||||
TestKit.shutdownActorSystem(systemB)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"allow for normal communication while simultaneously sending large messages" in {
|
"allow for normal communication while simultaneously sending large messages" in {
|
||||||
val systemA = ActorSystem("systemA", config)
|
val systemA = localSystem
|
||||||
val systemB = ActorSystem("systemB", config)
|
val systemB = newRemoteSystem()
|
||||||
|
|
||||||
try {
|
val senderProbeB = TestProbe()(systemB)
|
||||||
|
|
||||||
val senderProbeB = TestProbe()(systemB)
|
// setup two actors, one with the large flag and one regular
|
||||||
|
val large = systemB.actorOf(Props(new EchoSize), "large")
|
||||||
|
large.tell(Ping(), senderProbeB.ref)
|
||||||
|
senderProbeB.expectMsg(Pong(0))
|
||||||
|
|
||||||
// setup two actors, one with the large flag and one regular
|
val regular = systemB.actorOf(Props(new EchoSize), "regular")
|
||||||
val large = systemB.actorOf(Props(new EchoSize), "large")
|
regular.tell(Ping(), senderProbeB.ref)
|
||||||
large.tell(Ping(), senderProbeB.ref)
|
senderProbeB.expectMsg(Pong(0))
|
||||||
senderProbeB.expectMsg(Pong(0))
|
|
||||||
|
|
||||||
val regular = systemB.actorOf(Props(new EchoSize), "regular")
|
// both up and running, resolve remote refs
|
||||||
regular.tell(Ping(), senderProbeB.ref)
|
val addressB = RARP(systemB).provider.getDefaultAddress
|
||||||
senderProbeB.expectMsg(Pong(0))
|
val rootB = RootActorPath(addressB)
|
||||||
|
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
|
||||||
|
val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
|
||||||
|
|
||||||
// both up and running, resolve remote refs
|
// send a large message, as well as regular one
|
||||||
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val remoteProbe = TestProbe()(systemA)
|
||||||
val rootB = RootActorPath(addressB)
|
|
||||||
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
|
|
||||||
val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
|
|
||||||
|
|
||||||
// send a large message, as well as regular one
|
val largeBytes = 2000000
|
||||||
val remoteProbe = TestProbe()(systemA)
|
largeRemote.tell(Ping(ByteString.fromArray(Array.ofDim[Byte](largeBytes))), remoteProbe.ref)
|
||||||
|
regularRemote.tell(Ping(), remoteProbe.ref)
|
||||||
|
|
||||||
val largeBytes = 2000000
|
// should be no problems sending regular small messages while large messages are being sent
|
||||||
largeRemote.tell(Ping(ByteString.fromArray(Array.ofDim[Byte](largeBytes))), remoteProbe.ref)
|
remoteProbe.expectMsg(Pong(0))
|
||||||
regularRemote.tell(Ping(), remoteProbe.ref)
|
remoteProbe.expectMsg(10.seconds, Pong(largeBytes))
|
||||||
|
|
||||||
// should be no problems sending regular small messages while large messages are being sent
|
// cached flags should be set now
|
||||||
remoteProbe.expectMsg(Pong(0))
|
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination)
|
||||||
remoteProbe.expectMsg(10.seconds, Pong(largeBytes))
|
regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
|
||||||
|
|
||||||
// cached flags should be set now
|
|
||||||
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination)
|
|
||||||
regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
TestKit.shutdownActorSystem(systemA)
|
|
||||||
TestKit.shutdownActorSystem(systemB)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,130 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import akka.actor.{ Actor, ActorRef, ActorRefScope, PoisonPill, Props }
|
||||||
|
import akka.pattern.ask
|
||||||
|
import akka.remote.RemoteActorRef
|
||||||
|
import akka.remote.RemotingSpec.ActorForReq
|
||||||
|
import akka.testkit.{ EventFilter, _ }
|
||||||
|
import akka.util.Timeout
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
object RemoteActorForSpec {
|
||||||
|
final case class ActorForReq(s: String)
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteActorForSpec extends ArteryMultiNodeSpec("akka.loglevel=INFO") with ImplicitSender with DefaultTimeout {
|
||||||
|
|
||||||
|
val remoteSystem = newRemoteSystem()
|
||||||
|
val remotePort = port(remoteSystem)
|
||||||
|
|
||||||
|
"Remote lookups" should {
|
||||||
|
|
||||||
|
"support remote look-ups" in {
|
||||||
|
remoteSystem.actorOf(TestActors.echoActorProps, "remote-look-ups")
|
||||||
|
val remoteRef = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/remote-look-ups")
|
||||||
|
remoteRef ! "ping"
|
||||||
|
expectMsg("ping")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME does not log anything currently
|
||||||
|
"send warning message for wrong address" ignore {
|
||||||
|
filterEvents(EventFilter.warning(pattern = "Address is now gated for ", occurrences = 1)) {
|
||||||
|
localSystem.actorFor("artery://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"support ask" in {
|
||||||
|
remoteSystem.actorOf(TestActors.echoActorProps, "support-ask")
|
||||||
|
val remoteRef = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/support-ask")
|
||||||
|
|
||||||
|
implicit val timeout: Timeout = 10.seconds
|
||||||
|
(remoteRef ? "ping").futureValue should ===("ping")
|
||||||
|
}
|
||||||
|
|
||||||
|
"send dead letters on remote if actor does not exist" in {
|
||||||
|
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
|
||||||
|
localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/dead-letters-on-remote") ! "buh"
|
||||||
|
}(remoteSystem)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME can't communicate with new ref looked up after starting a new instance (!?!)
|
||||||
|
"not send to remote re-created actor with same name" ignore {
|
||||||
|
|
||||||
|
def lookItUp() = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/re-created")
|
||||||
|
|
||||||
|
val echo1 = remoteSystem.actorOf(TestActors.echoActorProps, "re-created")
|
||||||
|
val remoteRef1 = lookItUp()
|
||||||
|
remoteRef1 ! 2
|
||||||
|
expectMsg(2)
|
||||||
|
|
||||||
|
// now stop and start a new actor with the same name
|
||||||
|
watch(echo1)
|
||||||
|
remoteSystem.stop(echo1)
|
||||||
|
expectTerminated(echo1)
|
||||||
|
|
||||||
|
val echo2 = remoteSystem.actorOf(TestActors.echoActorProps, "re-created")
|
||||||
|
val remoteRef2 = lookItUp()
|
||||||
|
remoteRef2 ! 2
|
||||||
|
expectMsg(2)
|
||||||
|
|
||||||
|
// the old ref should not interact with the
|
||||||
|
// new actor instance at the same path
|
||||||
|
remoteRef1 ! 3
|
||||||
|
expectNoMsg(1.second)
|
||||||
|
|
||||||
|
// and additionally, but it would have failed already
|
||||||
|
// if this wasn't true
|
||||||
|
remoteRef1.path.uid should !==(remoteRef2.path.uid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME needs remote deployment section
|
||||||
|
"look-up actors across node boundaries" ignore {
|
||||||
|
val l = localSystem.actorOf(Props(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case (p: Props, n: String) ⇒ sender() ! context.actorOf(p, n)
|
||||||
|
case ActorForReq(s) ⇒ sender() ! context.actorFor(s)
|
||||||
|
}
|
||||||
|
}), "looker1")
|
||||||
|
// child is configured to be deployed on remote-sys (remoteSystem)
|
||||||
|
l ! ((TestActors.echoActorProps, "child"))
|
||||||
|
val child = expectMsgType[ActorRef]
|
||||||
|
// grandchild is configured to be deployed on RemotingSpec (system)
|
||||||
|
child ! ((TestActors.echoActorProps, "grandchild"))
|
||||||
|
val grandchild = expectMsgType[ActorRef]
|
||||||
|
grandchild.asInstanceOf[ActorRefScope].isLocal should ===(true)
|
||||||
|
grandchild ! 43
|
||||||
|
expectMsg(43)
|
||||||
|
val myref = localSystem.actorFor(system / "looker1" / "child" / "grandchild")
|
||||||
|
myref.isInstanceOf[RemoteActorRef] should ===(true)
|
||||||
|
myref ! 44
|
||||||
|
expectMsg(44)
|
||||||
|
lastSender should ===(grandchild)
|
||||||
|
lastSender should be theSameInstanceAs grandchild
|
||||||
|
child.asInstanceOf[RemoteActorRef].getParent should ===(l)
|
||||||
|
localSystem.actorFor("/user/looker1/child") should be theSameInstanceAs child
|
||||||
|
(l ? ActorForReq("child/..")).mapTo[AnyRef].futureValue should be theSameInstanceAs l
|
||||||
|
(localSystem.actorFor(system / "looker1" / "child") ? ActorForReq("..")).mapTo[AnyRef].futureValue should be theSameInstanceAs l
|
||||||
|
|
||||||
|
watch(child)
|
||||||
|
child ! PoisonPill
|
||||||
|
expectMsg("postStop")
|
||||||
|
expectTerminated(child)
|
||||||
|
l ! ((TestActors.echoActorProps, "child"))
|
||||||
|
val child2 = expectMsgType[ActorRef]
|
||||||
|
child2 ! 45
|
||||||
|
expectMsg(45)
|
||||||
|
// msg to old ActorRef (different uid) should not get through
|
||||||
|
child2.path.uid should not be (child.path.uid)
|
||||||
|
child ! 46
|
||||||
|
expectNoMsg(1.second)
|
||||||
|
system.actorFor(system / "looker1" / "child") ! 47
|
||||||
|
expectMsg(47)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -3,41 +3,19 @@
|
||||||
*/
|
*/
|
||||||
package akka.remote.artery
|
package akka.remote.artery
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import akka.actor.{ EmptyLocalActorRef, InternalActorRef }
|
||||||
import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath }
|
|
||||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
|
||||||
import akka.testkit.TestActors
|
|
||||||
import com.typesafe.config.ConfigFactory
|
|
||||||
import akka.testkit.EventFilter
|
|
||||||
import akka.actor.InternalActorRef
|
|
||||||
import akka.remote.RemoteActorRef
|
import akka.remote.RemoteActorRef
|
||||||
import akka.actor.EmptyLocalActorRef
|
import akka.testkit.{ EventFilter, TestActors }
|
||||||
|
|
||||||
object RemoteActorRefProviderSpec {
|
class RemoteActorRefProviderSpec extends ArteryMultiNodeSpec {
|
||||||
|
|
||||||
val config = ConfigFactory.parseString(s"""
|
val addressA = address(localSystem)
|
||||||
akka {
|
|
||||||
actor.provider = remote
|
|
||||||
remote.artery.enabled = on
|
|
||||||
remote.artery.hostname = localhost
|
|
||||||
remote.artery.port = 0
|
|
||||||
}
|
|
||||||
""")
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
class RemoteActorRefProviderSpec extends AkkaSpec(RemoteActorRefProviderSpec.config) with ImplicitSender {
|
|
||||||
import RemoteActorRefProviderSpec._
|
|
||||||
|
|
||||||
val addressA = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
|
||||||
system.actorOf(TestActors.echoActorProps, "echo")
|
system.actorOf(TestActors.echoActorProps, "echo")
|
||||||
|
|
||||||
val systemB = ActorSystem("systemB", system.settings.config)
|
val systemB = newRemoteSystem()
|
||||||
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val addressB = address(systemB)
|
||||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||||
|
|
||||||
override def afterTermination(): Unit = shutdown(systemB)
|
|
||||||
|
|
||||||
"RemoteActorRefProvider" must {
|
"RemoteActorRefProvider" must {
|
||||||
|
|
||||||
"resolve local actor selection" in {
|
"resolve local actor selection" in {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,154 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorRefScope, ActorSelection, ActorSystem, ExtendedActorSystem, Identify, PoisonPill, Props, Terminated }
|
||||||
|
import akka.remote.RARP
|
||||||
|
import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestActors }
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
object RemoteActorSelectionSpec {
|
||||||
|
final case class ActorSelReq(s: String)
|
||||||
|
|
||||||
|
class SelectionActor extends Actor {
|
||||||
|
def receive = {
|
||||||
|
// if we get props and a name, create a child, send ref back
|
||||||
|
case (p: Props, n: String) ⇒ sender() ! context.actorOf(p, n)
|
||||||
|
// or select actor from here
|
||||||
|
case ActorSelReq(s) ⇒ sender() ! context.actorSelection(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
def selectionActorProps = Props(new SelectionActor)
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteActorSelectionSpec extends ArteryMultiNodeSpec with ImplicitSender {
|
||||||
|
|
||||||
|
import RemoteActorSelectionSpec._
|
||||||
|
|
||||||
|
val systemB = system
|
||||||
|
|
||||||
|
val systemA = {
|
||||||
|
val remotePort = port(systemB)
|
||||||
|
val remoteSysName = systemB.name
|
||||||
|
|
||||||
|
val localSysName = "local-" + remoteSysName
|
||||||
|
val localPort = SocketUtil.temporaryServerAddress(udp = true).getPort
|
||||||
|
|
||||||
|
// nesting the hierarchy across the two systems
|
||||||
|
newRemoteSystem(Some(s"""
|
||||||
|
akka {
|
||||||
|
remote.artery.port = $localPort
|
||||||
|
actor.deployment {
|
||||||
|
/looker2/child.remote = "artery://$remoteSysName@localhost:$remotePort"
|
||||||
|
/looker2/child/grandchild.remote = "artery://$localSysName@localhost:$localPort"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Remote actor selection" should {
|
||||||
|
|
||||||
|
// TODO would like to split up in smaller cases but find it hard
|
||||||
|
// TODO fails with "received Supervise from unregistered child" when looker2/child is created - akka/akka#20715
|
||||||
|
"select actors across node boundaries" ignore {
|
||||||
|
|
||||||
|
val localLooker2 = systemA.actorOf(selectionActorProps, "looker2")
|
||||||
|
|
||||||
|
// child is configured to be deployed on remoteSystem
|
||||||
|
localLooker2 ! ((selectionActorProps, "child"))
|
||||||
|
val remoteChild = expectMsgType[ActorRef]
|
||||||
|
|
||||||
|
// grandchild is configured to be deployed on local system but from remote system
|
||||||
|
remoteChild ! ((selectionActorProps, "grandchild"))
|
||||||
|
val localGrandchild = expectMsgType[ActorRef]
|
||||||
|
localGrandchild.asInstanceOf[ActorRefScope].isLocal should ===(true)
|
||||||
|
localGrandchild ! 53
|
||||||
|
expectMsg(53)
|
||||||
|
|
||||||
|
val localGrandchildSelection = systemA.actorSelection(system / "looker2" / "child" / "grandchild")
|
||||||
|
localGrandchildSelection ! 54
|
||||||
|
expectMsg(54)
|
||||||
|
lastSender should ===(localGrandchild)
|
||||||
|
lastSender should be theSameInstanceAs localGrandchild
|
||||||
|
localGrandchildSelection ! Identify(localGrandchildSelection)
|
||||||
|
val grandchild2 = expectMsgType[ActorIdentity].ref
|
||||||
|
grandchild2 should ===(Some(localGrandchild))
|
||||||
|
|
||||||
|
systemA.actorSelection("/user/looker2/child") ! Identify(None)
|
||||||
|
expectMsgType[ActorIdentity].ref should ===(Some(remoteChild))
|
||||||
|
|
||||||
|
localLooker2 ! ActorSelReq("child/..")
|
||||||
|
expectMsgType[ActorSelection] ! Identify(None)
|
||||||
|
expectMsgType[ActorIdentity].ref.get should be theSameInstanceAs localLooker2
|
||||||
|
|
||||||
|
system.actorSelection(system / "looker2" / "child") ! ActorSelReq("..")
|
||||||
|
expectMsgType[ActorSelection] ! Identify(None)
|
||||||
|
expectMsgType[ActorIdentity].ref.get should be theSameInstanceAs localLooker2
|
||||||
|
|
||||||
|
localGrandchild ! ((TestActors.echoActorProps, "grandgrandchild"))
|
||||||
|
val grandgrandchild = expectMsgType[ActorRef]
|
||||||
|
|
||||||
|
system.actorSelection("/user/looker2/child") ! Identify("idReq1")
|
||||||
|
expectMsg(ActorIdentity("idReq1", Some(remoteChild)))
|
||||||
|
system.actorSelection(remoteChild.path) ! Identify("idReq2")
|
||||||
|
expectMsg(ActorIdentity("idReq2", Some(remoteChild)))
|
||||||
|
system.actorSelection("/user/looker2/*") ! Identify("idReq3")
|
||||||
|
expectMsg(ActorIdentity("idReq3", Some(remoteChild)))
|
||||||
|
|
||||||
|
system.actorSelection("/user/looker2/child/grandchild") ! Identify("idReq4")
|
||||||
|
expectMsg(ActorIdentity("idReq4", Some(localGrandchild)))
|
||||||
|
system.actorSelection(remoteChild.path / "grandchild") ! Identify("idReq5")
|
||||||
|
expectMsg(ActorIdentity("idReq5", Some(localGrandchild)))
|
||||||
|
system.actorSelection("/user/looker2/*/grandchild") ! Identify("idReq6")
|
||||||
|
expectMsg(ActorIdentity("idReq6", Some(localGrandchild)))
|
||||||
|
system.actorSelection("/user/looker2/child/*") ! Identify("idReq7")
|
||||||
|
expectMsg(ActorIdentity("idReq7", Some(localGrandchild)))
|
||||||
|
system.actorSelection(remoteChild.path / "*") ! Identify("idReq8")
|
||||||
|
expectMsg(ActorIdentity("idReq8", Some(localGrandchild)))
|
||||||
|
|
||||||
|
system.actorSelection("/user/looker2/child/grandchild/grandgrandchild") ! Identify("idReq9")
|
||||||
|
expectMsg(ActorIdentity("idReq9", Some(grandgrandchild)))
|
||||||
|
system.actorSelection(remoteChild.path / "grandchild" / "grandgrandchild") ! Identify("idReq10")
|
||||||
|
expectMsg(ActorIdentity("idReq10", Some(grandgrandchild)))
|
||||||
|
system.actorSelection("/user/looker2/child/*/grandgrandchild") ! Identify("idReq11")
|
||||||
|
expectMsg(ActorIdentity("idReq11", Some(grandgrandchild)))
|
||||||
|
system.actorSelection("/user/looker2/child/*/*") ! Identify("idReq12")
|
||||||
|
expectMsg(ActorIdentity("idReq12", Some(grandgrandchild)))
|
||||||
|
system.actorSelection(remoteChild.path / "*" / "grandgrandchild") ! Identify("idReq13")
|
||||||
|
expectMsg(ActorIdentity("idReq13", Some(grandgrandchild)))
|
||||||
|
|
||||||
|
val sel1 = system.actorSelection("/user/looker2/child/grandchild/grandgrandchild")
|
||||||
|
system.actorSelection(sel1.toSerializationFormat) ! Identify("idReq18")
|
||||||
|
expectMsg(ActorIdentity("idReq18", Some(grandgrandchild)))
|
||||||
|
|
||||||
|
remoteChild ! Identify("idReq14")
|
||||||
|
expectMsg(ActorIdentity("idReq14", Some(remoteChild)))
|
||||||
|
watch(remoteChild)
|
||||||
|
remoteChild ! PoisonPill
|
||||||
|
expectMsg("postStop")
|
||||||
|
expectMsgType[Terminated].actor should ===(remoteChild)
|
||||||
|
localLooker2 ! ((TestActors.echoActorProps, "child"))
|
||||||
|
val child2 = expectMsgType[ActorRef]
|
||||||
|
child2 ! Identify("idReq15")
|
||||||
|
expectMsg(ActorIdentity("idReq15", Some(child2)))
|
||||||
|
system.actorSelection(remoteChild.path) ! Identify("idReq16")
|
||||||
|
expectMsg(ActorIdentity("idReq16", Some(child2)))
|
||||||
|
remoteChild ! Identify("idReq17")
|
||||||
|
expectMsg(ActorIdentity("idReq17", None))
|
||||||
|
|
||||||
|
child2 ! 55
|
||||||
|
expectMsg(55)
|
||||||
|
// msg to old ActorRef (different uid) should not get through
|
||||||
|
child2.path.uid should not be (remoteChild.path.uid)
|
||||||
|
remoteChild ! 56
|
||||||
|
expectNoMsg(1.second)
|
||||||
|
system.actorSelection(system / "looker2" / "child") ! 57
|
||||||
|
expectMsg(57)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import akka.actor.{ ActorSystem, ExtendedActorSystem }
|
||||||
|
import akka.remote.RARP
|
||||||
|
import akka.testkit.SocketUtil._
|
||||||
|
import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender, TestActors, TestEvent, TestProbe }
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
class RemoteConnectionSpec extends ArteryMultiNodeSpec("akka.remote.retry-gate-closed-for = 5s") with ImplicitSender {
|
||||||
|
|
||||||
|
def muteSystem(system: ActorSystem) {
|
||||||
|
system.eventStream.publish(TestEvent.Mute(
|
||||||
|
EventFilter.error(start = "AssociationError"),
|
||||||
|
EventFilter.warning(start = "AssociationError"),
|
||||||
|
EventFilter.warning(pattern = "received dead letter.*")))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Remoting between systems" should {
|
||||||
|
|
||||||
|
"be able to connect to system even if it's not there at first" in {
|
||||||
|
muteSystem(localSystem)
|
||||||
|
val localProbe = new TestProbe(localSystem)
|
||||||
|
|
||||||
|
val remotePort = temporaryServerAddress(udp = true).getPort
|
||||||
|
|
||||||
|
// try to talk to it before it is up
|
||||||
|
val selection = localSystem.actorSelection(s"artery://$nextGeneratedSystemName@localhost:$remotePort/user/echo")
|
||||||
|
selection.tell("ping", localProbe.ref)
|
||||||
|
localProbe.expectNoMsg(1.seconds)
|
||||||
|
|
||||||
|
// then start the remote system and try again
|
||||||
|
val remoteSystem = newRemoteSystem(extraConfig = Some(s"akka.remote.artery.port=$remotePort"))
|
||||||
|
|
||||||
|
muteSystem(remoteSystem)
|
||||||
|
localProbe.expectNoMsg(2.seconds)
|
||||||
|
remoteSystem.actorOf(TestActors.echoActorProps, "echo")
|
||||||
|
|
||||||
|
within(5.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
selection.tell("ping", localProbe.ref)
|
||||||
|
localProbe.expectMsg(500.millis, "ping")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"allow other system to connect even if it's not there at first" in {
|
||||||
|
val localSystem = newRemoteSystem()
|
||||||
|
|
||||||
|
val localPort = port(localSystem)
|
||||||
|
muteSystem(localSystem)
|
||||||
|
|
||||||
|
val localProbe = new TestProbe(localSystem)
|
||||||
|
localSystem.actorOf(TestActors.echoActorProps, "echo")
|
||||||
|
|
||||||
|
val remotePort = temporaryServerAddress(udp = true).getPort
|
||||||
|
|
||||||
|
// try to talk to remote before it is up
|
||||||
|
val selection = localSystem.actorSelection(s"artery://$nextGeneratedSystemName@localhost:$remotePort/user/echo")
|
||||||
|
selection.tell("ping", localProbe.ref)
|
||||||
|
localProbe.expectNoMsg(1.seconds)
|
||||||
|
|
||||||
|
// then when it is up, talk from other system
|
||||||
|
val remoteSystem = newRemoteSystem(extraConfig = Some(s"akka.remote.artery.port=$remotePort"))
|
||||||
|
|
||||||
|
muteSystem(remoteSystem)
|
||||||
|
localProbe.expectNoMsg(2.seconds)
|
||||||
|
val otherProbe = new TestProbe(remoteSystem)
|
||||||
|
val otherSender = otherProbe.ref
|
||||||
|
val thisSelection = remoteSystem.actorSelection(s"artery://${localSystem.name}@localhost:$localPort/user/echo")
|
||||||
|
within(5.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
thisSelection.tell("ping", otherSender)
|
||||||
|
otherProbe.expectMsg(500.millis, "ping")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -10,7 +10,7 @@ import akka.actor._
|
||||||
import akka.remote.routing._
|
import akka.remote.routing._
|
||||||
import com.typesafe.config._
|
import com.typesafe.config._
|
||||||
import akka.testkit.TestActors.echoActorProps
|
import akka.testkit.TestActors.echoActorProps
|
||||||
import akka.remote.RemoteScope
|
import akka.remote.{ RARP, RemoteScope }
|
||||||
|
|
||||||
object RemoteDeploymentSpec {
|
object RemoteDeploymentSpec {
|
||||||
class Echo1 extends Actor {
|
class Echo1 extends Actor {
|
||||||
|
|
@ -42,7 +42,7 @@ class RemoteDeploymentSpec extends AkkaSpec("""
|
||||||
|
|
||||||
import RemoteDeploymentSpec._
|
import RemoteDeploymentSpec._
|
||||||
|
|
||||||
val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
|
val port = RARP(system).provider.getDefaultAddress.port.get
|
||||||
val conf = ConfigFactory.parseString(
|
val conf = ConfigFactory.parseString(
|
||||||
s"""
|
s"""
|
||||||
akka.actor.deployment {
|
akka.actor.deployment {
|
||||||
|
|
@ -51,7 +51,7 @@ class RemoteDeploymentSpec extends AkkaSpec("""
|
||||||
""").withFallback(system.settings.config)
|
""").withFallback(system.settings.config)
|
||||||
|
|
||||||
val masterSystem = ActorSystem("Master" + system.name, conf)
|
val masterSystem = ActorSystem("Master" + system.name, conf)
|
||||||
val masterPort = masterSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
|
val masterPort = RARP(masterSystem).provider.getDefaultAddress.port.get
|
||||||
|
|
||||||
override def afterTermination(): Unit = {
|
override def afterTermination(): Unit = {
|
||||||
shutdown(masterSystem)
|
shutdown(masterSystem)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,58 @@
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import akka.remote.EndpointDisassociatedException
|
||||||
|
import akka.testkit.{ EventFilter, ImplicitSender, TestActors, TestEvent }
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender {
|
||||||
|
|
||||||
|
"Remoting" should {
|
||||||
|
|
||||||
|
"not be exhausted by sending to broken connections" in {
|
||||||
|
val remoteSystems = Vector.fill(5)(newRemoteSystem())
|
||||||
|
|
||||||
|
remoteSystems foreach { sys ⇒
|
||||||
|
sys.eventStream.publish(TestEvent.Mute(
|
||||||
|
EventFilter[EndpointDisassociatedException](),
|
||||||
|
EventFilter.warning(pattern = "received dead letter.*")))
|
||||||
|
sys.actorOf(TestActors.echoActorProps, name = "echo")
|
||||||
|
}
|
||||||
|
val remoteSelections = remoteSystems map { sys ⇒
|
||||||
|
system.actorSelection(rootActorPath(sys) / "user" / "echo")
|
||||||
|
}
|
||||||
|
|
||||||
|
val echo = system.actorOf(TestActors.echoActorProps, name = "echo")
|
||||||
|
|
||||||
|
val localSelection = system.actorSelection(rootActorPath(system) / "user" / "echo")
|
||||||
|
val n = 100
|
||||||
|
|
||||||
|
// first everything is up and running
|
||||||
|
1 to n foreach { x ⇒
|
||||||
|
localSelection ! "ping"
|
||||||
|
remoteSelections(x % remoteSystems.size) ! "ping"
|
||||||
|
}
|
||||||
|
|
||||||
|
within(5.seconds) {
|
||||||
|
receiveN(n * 2) foreach { reply ⇒ reply should ===("ping") }
|
||||||
|
}
|
||||||
|
|
||||||
|
// then we shutdown remote systems to simulate broken connections
|
||||||
|
remoteSystems foreach { sys ⇒
|
||||||
|
shutdown(sys)
|
||||||
|
}
|
||||||
|
|
||||||
|
1 to n foreach { x ⇒
|
||||||
|
localSelection ! "ping"
|
||||||
|
remoteSelections(x % remoteSystems.size) ! "ping"
|
||||||
|
}
|
||||||
|
|
||||||
|
// ping messages to localEcho should go through even though we use many different broken connections
|
||||||
|
within(5.seconds) {
|
||||||
|
receiveN(n) foreach { reply ⇒ reply should ===("ping") }
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,117 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import java.io.NotSerializableException
|
||||||
|
import java.util.concurrent.ThreadLocalRandom
|
||||||
|
|
||||||
|
import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, PoisonPill, Props }
|
||||||
|
import akka.remote.{ AssociationErrorEvent, DisassociatedEvent, OversizedPayloadException, RARP }
|
||||||
|
import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender, TestActors }
|
||||||
|
import akka.util.ByteString
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
object RemoteMessageSerializationSpec {
|
||||||
|
class ProxyActor(val one: ActorRef, val another: ActorRef) extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case s if sender().path == one.path ⇒ another ! s
|
||||||
|
case s if sender().path == another.path ⇒ one ! s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val maxPayloadBytes = ArteryTransport.MaximumFrameSize
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec("""
|
||||||
|
akka.actor.serialize-messages = off
|
||||||
|
akka.actor.serialize-creators = off
|
||||||
|
""") with ImplicitSender {
|
||||||
|
|
||||||
|
import RemoteMessageSerializationSpec._
|
||||||
|
|
||||||
|
val remoteSystem = newRemoteSystem()
|
||||||
|
val remotePort = port(remoteSystem)
|
||||||
|
|
||||||
|
"Remote message serialization" should {
|
||||||
|
|
||||||
|
"drop unserializable messages" in {
|
||||||
|
object Unserializable
|
||||||
|
EventFilter[NotSerializableException](pattern = ".*No configured serialization.*", occurrences = 1).intercept {
|
||||||
|
verifySend(Unserializable) {
|
||||||
|
expectNoMsg(1.second) // No AssocitionErrorEvent should be published
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"allow messages up to payload size" in {
|
||||||
|
val maxProtocolOverhead = 500 // Make sure we're still under size after the message is serialized, etc
|
||||||
|
val big = byteStringOfSize(maxPayloadBytes - maxProtocolOverhead)
|
||||||
|
verifySend(big) {
|
||||||
|
expectMsg(3.seconds, big)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"drop sent messages over payload size" in {
|
||||||
|
val oversized = byteStringOfSize(maxPayloadBytes + 1)
|
||||||
|
EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload sent.*", occurrences = 1).intercept {
|
||||||
|
verifySend(oversized) {
|
||||||
|
expectNoMsg(1.second) // No AssocitionErrorEvent should be published
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO max payload size is not configurable yet, so we cannot send a too big message, it fails no sending side
|
||||||
|
"drop received messages over payload size" ignore {
|
||||||
|
// Receiver should reply with a message of size maxPayload + 1, which will be dropped and an error logged
|
||||||
|
EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload received.*", occurrences = 1).intercept {
|
||||||
|
verifySend(maxPayloadBytes + 1) {
|
||||||
|
expectNoMsg(1.second) // No AssocitionErrorEvent should be published
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"be able to serialize a local actor ref from another actor system" in {
|
||||||
|
remoteSystem.actorOf(TestActors.echoActorProps, "echo")
|
||||||
|
val local = localSystem.actorOf(TestActors.echoActorProps, "echo")
|
||||||
|
|
||||||
|
val remoteEcho = system.actorSelection(rootActorPath(remoteSystem) / "user" / "echo").resolveOne(3.seconds).futureValue
|
||||||
|
remoteEcho ! local
|
||||||
|
expectMsg(3.seconds, local)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private def verifySend(msg: Any)(afterSend: ⇒ Unit) {
|
||||||
|
val bigBounceId = s"bigBounce-${ThreadLocalRandom.current.nextInt()}"
|
||||||
|
val bigBounceOther = remoteSystem.actorOf(Props(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case x: Int ⇒ sender() ! byteStringOfSize(x)
|
||||||
|
case x ⇒ sender() ! x
|
||||||
|
}
|
||||||
|
}), bigBounceId)
|
||||||
|
val bigBounceHere = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/$bigBounceId")
|
||||||
|
|
||||||
|
val eventForwarder = localSystem.actorOf(Props(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case x ⇒ testActor ! x
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
localSystem.eventStream.subscribe(eventForwarder, classOf[AssociationErrorEvent])
|
||||||
|
localSystem.eventStream.subscribe(eventForwarder, classOf[DisassociatedEvent])
|
||||||
|
try {
|
||||||
|
bigBounceHere ! msg
|
||||||
|
afterSend
|
||||||
|
expectNoMsg(500.millis)
|
||||||
|
} finally {
|
||||||
|
localSystem.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent])
|
||||||
|
localSystem.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent])
|
||||||
|
eventForwarder ! PoisonPill
|
||||||
|
bigBounceOther ! PoisonPill
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -10,7 +10,7 @@ import akka.actor._
|
||||||
import akka.remote.routing._
|
import akka.remote.routing._
|
||||||
import com.typesafe.config._
|
import com.typesafe.config._
|
||||||
import akka.testkit.TestActors.echoActorProps
|
import akka.testkit.TestActors.echoActorProps
|
||||||
import akka.remote.RemoteScope
|
import akka.remote.{ RARP, RemoteScope }
|
||||||
|
|
||||||
object RemoteRouterSpec {
|
object RemoteRouterSpec {
|
||||||
class Parent extends Actor {
|
class Parent extends Actor {
|
||||||
|
|
@ -43,7 +43,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
||||||
|
|
||||||
import RemoteRouterSpec._
|
import RemoteRouterSpec._
|
||||||
|
|
||||||
val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
|
val port = RARP(system).provider.getDefaultAddress.port.get
|
||||||
val sysName = system.name
|
val sysName = system.name
|
||||||
val conf = ConfigFactory.parseString(
|
val conf = ConfigFactory.parseString(
|
||||||
s"""
|
s"""
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSyst
|
||||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.Actor.Receive
|
import akka.actor.Actor.Receive
|
||||||
|
import akka.remote.RARP
|
||||||
|
|
||||||
object RemoteSendConsistencySpec {
|
object RemoteSendConsistencySpec {
|
||||||
|
|
||||||
|
|
@ -25,7 +26,7 @@ object RemoteSendConsistencySpec {
|
||||||
class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.config) with ImplicitSender {
|
class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.config) with ImplicitSender {
|
||||||
|
|
||||||
val systemB = ActorSystem("systemB", system.settings.config)
|
val systemB = ActorSystem("systemB", system.settings.config)
|
||||||
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val addressB = RARP(systemB).provider.getDefaultAddress
|
||||||
println(addressB)
|
println(addressB)
|
||||||
val rootB = RootActorPath(addressB)
|
val rootB = RootActorPath(addressB)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ class RemoteWatcherSpec extends AkkaSpec(
|
||||||
override def expectedTestDuration = 2.minutes
|
override def expectedTestDuration = 2.minutes
|
||||||
|
|
||||||
val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
|
val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
|
||||||
val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
|
||||||
def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid
|
def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid
|
||||||
|
|
||||||
Seq(system, remoteSystem).foreach(muteDeadLetters(
|
Seq(system, remoteSystem).foreach(muteDeadLetters(
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.remote.artery
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath }
|
import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath }
|
||||||
|
import akka.remote.RARP
|
||||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||||
import akka.testkit.TestActors
|
import akka.testkit.TestActors
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
@ -40,7 +41,7 @@ class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) wit
|
||||||
""").withFallback(system.settings.config)
|
""").withFallback(system.settings.config)
|
||||||
val systemB = ActorSystem("systemB", configB)
|
val systemB = ActorSystem("systemB", configB)
|
||||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||||
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val addressB = RARP(systemB).provider.getDefaultAddress
|
||||||
val rootB = RootActorPath(addressB)
|
val rootB = RootActorPath(addressB)
|
||||||
|
|
||||||
override def afterTermination(): Unit = shutdown(systemB)
|
override def afterTermination(): Unit = shutdown(systemB)
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
package akka.remote.artery
|
package akka.remote.artery
|
||||||
|
|
||||||
import java.util.concurrent.ThreadLocalRandom
|
import java.util.concurrent.ThreadLocalRandom
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
|
@ -14,10 +15,8 @@ import akka.actor.Identify
|
||||||
import akka.actor.InternalActorRef
|
import akka.actor.InternalActorRef
|
||||||
import akka.actor.PoisonPill
|
import akka.actor.PoisonPill
|
||||||
import akka.actor.RootActorPath
|
import akka.actor.RootActorPath
|
||||||
import akka.remote.AddressUidExtension
|
import akka.remote.{ AddressUidExtension, RARP, RemoteActorRef, UniqueAddress }
|
||||||
import akka.remote.EndpointManager.Send
|
import akka.remote.EndpointManager.Send
|
||||||
import akka.remote.RemoteActorRef
|
|
||||||
import akka.remote.UniqueAddress
|
|
||||||
import akka.remote.artery.SystemMessageDelivery._
|
import akka.remote.artery.SystemMessageDelivery._
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
import akka.stream.ActorMaterializerSettings
|
import akka.stream.ActorMaterializerSettings
|
||||||
|
|
@ -52,11 +51,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
|
||||||
import SystemMessageDeliverySpec._
|
import SystemMessageDeliverySpec._
|
||||||
|
|
||||||
val addressA = UniqueAddress(
|
val addressA = UniqueAddress(
|
||||||
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
|
RARP(system).provider.getDefaultAddress,
|
||||||
AddressUidExtension(system).addressUid)
|
AddressUidExtension(system).addressUid)
|
||||||
val systemB = ActorSystem("systemB", system.settings.config)
|
val systemB = ActorSystem("systemB", system.settings.config)
|
||||||
val addressB = UniqueAddress(
|
val addressB = UniqueAddress(
|
||||||
systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
|
RARP(systemB).provider.getDefaultAddress,
|
||||||
AddressUidExtension(systemB).addressUid)
|
AddressUidExtension(systemB).addressUid)
|
||||||
val rootB = RootActorPath(addressB.address)
|
val rootB = RootActorPath(addressB.address)
|
||||||
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
|
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import akka.testkit.TestProbe
|
||||||
import akka.actor.ActorSelection
|
import akka.actor.ActorSelection
|
||||||
import akka.testkit.TestEvent
|
import akka.testkit.TestEvent
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
|
import akka.remote.RARP
|
||||||
import akka.testkit.EventFilter
|
import akka.testkit.EventFilter
|
||||||
|
|
||||||
object UntrustedSpec {
|
object UntrustedSpec {
|
||||||
|
|
@ -77,7 +78,7 @@ class UntrustedSpec extends AkkaSpec("""
|
||||||
akka.remote.artery.hostname = localhost
|
akka.remote.artery.hostname = localhost
|
||||||
akka.remote.artery.port = 0
|
akka.remote.artery.port = 0
|
||||||
"""))
|
"""))
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val addr = RARP(system).provider.getDefaultAddress
|
||||||
|
|
||||||
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue