Correctly serialize local actor refs from other actor systems. See #3137

This commit is contained in:
Björn Antonsson 2013-03-25 08:42:48 +01:00
parent 86fdfcd22f
commit 33080a4155
12 changed files with 151 additions and 89 deletions

View file

@ -318,17 +318,20 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val out = new ObjectOutputStream(baos) val out = new ObjectOutputStream(baos)
val sysImpl = system.asInstanceOf[ActorSystemImpl] val sysImpl = system.asInstanceOf[ActorSystemImpl]
val addr = sysImpl.provider.rootPath.address val ref = system.actorOf(Props[ReplyActor], "non-existing")
val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing")) val serialized = SerializedActorRef(ref)
out.writeObject(serialized) out.writeObject(serialized)
out.flush out.flush
out.close out.close
ref ! PoisonPill
awaitCond(ref.isTerminated, 2 seconds)
JavaSerializer.currentSystem.withValue(sysImpl) { JavaSerializer.currentSystem.withValue(sysImpl) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream) in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, ref.path, system.eventStream)
} }
} }
@ -403,7 +406,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
Await.result(ffive, timeout.duration) must be("five") Await.result(ffive, timeout.duration) must be("five")
Await.result(fnull, timeout.duration) must be("null") Await.result(fnull, timeout.duration) must be("null")
awaitCond(ref.isTerminated, 2000 millis) awaitCond(ref.isTerminated, 2 seconds)
} }
"restart when Kill:ed" in { "restart when Kill:ed" in {

View file

@ -6,14 +6,12 @@ package akka.actor
import akka.dispatch._ import akka.dispatch._
import akka.dispatch.sysmsg._ import akka.dispatch.sysmsg._
import akka.util._
import java.lang.{ UnsupportedOperationException, IllegalStateException } import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.serialization.{ Serialization, JavaSerializer } import akka.serialization.{ Serialization, JavaSerializer }
import akka.event.EventStream import akka.event.EventStream
import scala.annotation.tailrec import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import scala.collection.JavaConverters
/** /**
* Immutable and serializable handle to an actor, which may or may not reside * Immutable and serializable handle to an actor, which may or may not reside
@ -381,7 +379,7 @@ private[akka] class LocalActorRef private[akka] (
override def restart(cause: Throwable): Unit = actorCell.restart(cause) override def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path) protected def writeReplace(): AnyRef = SerializedActorRef(this)
} }
/** /**
@ -407,11 +405,8 @@ private[akka] case class SerializedActorRef private (path: String) {
* INTERNAL API * INTERNAL API
*/ */
private[akka] object SerializedActorRef { private[akka] object SerializedActorRef {
def apply(path: ActorPath): SerializedActorRef = { def apply(actorRef: ActorRef): SerializedActorRef = {
Serialization.currentTransportAddress.value match { new SerializedActorRef(Serialization.serializedActorPath(actorRef))
case null new SerializedActorRef(path.toSerializationFormat)
case addr new SerializedActorRef(path.toSerializationFormatWithAddress(addr))
}
} }
} }
@ -437,7 +432,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
override def restart(cause: Throwable): Unit = () override def restart(cause: Throwable): Unit = ()
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path) protected def writeReplace(): AnyRef = SerializedActorRef(this)
} }
/** /**

View file

@ -157,7 +157,7 @@ private[akka] class RepointableActorRef(
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path) protected def writeReplace(): AnyRef = SerializedActorRef(this)
} }
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,

View file

@ -5,7 +5,7 @@
package akka.serialization package akka.serialization
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor.{ Extension, ExtendedActorSystem, Address } import akka.actor._
import akka.event.Logging import akka.event.Logging
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
@ -21,10 +21,10 @@ object Serialization {
type ClassSerializer = (Class[_], Serializer) type ClassSerializer = (Class[_], Serializer)
/** /**
* This holds a reference to the current transport address to be inserted * This holds a reference to the current transport serialization information used for
* into local actor refs during serialization. * serializing local actor refs.
*/ */
val currentTransportAddress = new DynamicVariable[Address](null) val currentTransportInformation = new DynamicVariable[SerializationInformation](null)
class Settings(val config: Config) { class Settings(val config: Config) {
val Serializers: Map[String, String] = configToMap("akka.actor.serializers") val Serializers: Map[String, String] = configToMap("akka.actor.serializers")
@ -35,8 +35,34 @@ object Serialization {
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) (k -> v.toString) } config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) (k -> v.toString) }
} }
} }
/**
* The serialized path of an actorRef, based on the current transport serialization information.
*/
def serializedActorPath(actorRef: ActorRef): String = {
val path = actorRef.path
val originalSystem: ActorSystemImpl = actorRef match {
case a: ActorRefWithCell a.underlying.systemImpl
case _ null
}
Serialization.currentTransportInformation.value match {
case null path.toSerializationFormat
case SerializationInformation(address, system)
if (originalSystem == null || originalSystem == system)
path.toSerializationFormatWithAddress(address)
else {
val provider = originalSystem.provider
path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
}
}
}
} }
/**
* Serialization information needed for serializing local actor refs.
*/
case class SerializationInformation(val address: Address, val system: ActorSystem)
/** /**
* Serialization module. Contains methods for serialization and deserialization as well as * Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the configuration. * locating a Serializer for a particular class as defined in the mapping in the configuration.

View file

@ -7,7 +7,6 @@ import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
//#imports //#imports
import akka.actor.*; import akka.actor.*;
import akka.remote.RemoteActorRefProvider;
import akka.serialization.*; import akka.serialization.*;
//#imports //#imports
@ -58,20 +57,19 @@ public class SerializationDocTestBase {
//#actorref-serializer //#actorref-serializer
// Serialize // Serialize
// (beneath toBinary) // (beneath toBinary)
final Address transportAddress = final SerializationInformation info = Serialization.currentTransportInformation().value();
Serialization.currentTransportAddress().value();
String identifier; String identifier;
// If there is no transportAddress, // If there is no SerializationInformation,
// it means that either this Serializer isn't called // it means that this Serializer isn't called
// within a piece of code that sets it, // within a piece of code that sets it,
// so either you need to supply your own, // so either you need to supply your own,
// or simply use the local path. // or simply use the local path.
if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat(); if (info == null) identifier = theActorRef.path().toSerializationFormat();
else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress); else identifier = Serialization.serializedActorPath(theActorRef);
// Then just serialize the identifier however you like // Then just serialize the identifier however you like
// Deserialize // Deserialize
// (beneath fromBinary) // (beneath fromBinary)
final ActorRef deserializedActorRef = theActorSystem.actorFor(identifier); final ActorRef deserializedActorRef = theActorSystem.actorFor(identifier);

View file

@ -165,14 +165,14 @@ package docs.serialization {
// Serialize // Serialize
// (beneath toBinary) // (beneath toBinary)
// If there is no transportAddress, // If there is no SerializationInformation,
// it means that either this Serializer isn't called // it means that this Serializer isn't called
// within a piece of code that sets it, // within a piece of code that sets it,
// so either you need to supply your own, // so either you need to supply your own address,
// or simply use the local path. // or simply use the local path.
val identifier: String = Serialization.currentTransportAddress.value match { val identifier: String = Serialization.currentTransportInformation.value match {
case null theActorRef.path.toSerializationFormat case null theActorRef.path.toSerializationFormat
case address theActorRef.path.toSerializationFormatWithAddress(address) case _: SerializationInformation Serialization.serializedActorPath(theActorRef)
} }
// Then just serialize the identifier however you like // Then just serialize the identifier however you like

View file

@ -13,7 +13,7 @@ import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.transport.AkkaPduCodec._ import akka.remote.transport.AkkaPduCodec._
import akka.remote.transport.AssociationHandle._ import akka.remote.transport.AssociationHandle._
import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle } import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle }
import akka.serialization.Serialization import akka.serialization.{ SerializationInformation, Serialization }
import akka.util.ByteString import akka.util.ByteString
import akka.remote.transport.Transport.InvalidAssociationException import akka.remote.transport.Transport.InvalidAssociationException
import java.io.NotSerializableException import java.io.NotSerializableException
@ -332,7 +332,7 @@ private[remote] class EndpointWriter(
private def serializeMessage(msg: Any): MessageProtocol = handle match { private def serializeMessage(msg: Any): MessageProtocol = handle match {
case Some(h) case Some(h)
Serialization.currentTransportAddress.withValue(h.localAddress) { Serialization.currentTransportInformation.withValue(SerializationInformation(h.localAddress, context.system)) {
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
} }
case None throw new EndpointException("Internal error: No handle was present during serialization of" + case None throw new EndpointException("Internal error: No handle was present during serialization of" +

View file

@ -271,8 +271,15 @@ private[akka] class RemoteActorRefProvider(
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case ActorPathExtractor(address, elems) case ActorPathExtractor(address, elems)
if (hasAddress(address)) actorFor(rootGuardian, elems) if (hasAddress(address)) actorFor(rootGuardian, elems)
else new RemoteActorRef(transport, transport.localAddressForRemote(address), else try {
new RemoteActorRef(transport, transport.localAddressForRemote(address),
new RootActorPath(address) / elems, Nobody, props = None, deploy = None) new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
} catch {
case NonFatal(e)
val oldPath = RootActorPath(address) / elems
log.error(e, "Error while looking up address {}", oldPath.address)
new EmptyLocalActorRef(this, oldPath, eventStream)
}
case _ local.actorFor(ref, path) case _ local.actorFor(ref, path)
} }
@ -378,5 +385,5 @@ private[akka] class RemoteActorRef private[akka] (
def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause))
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path) private def writeReplace(): AnyRef = SerializedActorRef(this)
} }

View file

@ -22,7 +22,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx
* *
* The remote transport is responsible for sending and receiving messages. * The remote transport is responsible for sending and receiving messages.
* Each transport has an address, which it should provide in * Each transport has an address, which it should provide in
* Serialization.currentTransportAddress (thread-local) while serializing * Serialization.currentTransportInformation (thread-local) while serializing
* actor references (which might also be part of messages). This address must * actor references (which might also be part of messages). This address must
* be available (i.e. fully initialized) by the time the first message is * be available (i.e. fully initialized) by the time the first message is
* received or when the start() method returns, whatever happens first. * received or when the start() method returns, whatever happens first.

View file

@ -74,7 +74,7 @@ private[remote] object Remoting {
null) null)
} }
case None throw new RemoteTransportException( case None throw new RemoteTransportException(
s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null) s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null)
} }
} }

View file

@ -6,10 +6,8 @@ package akka.remote.serialization
import akka.serialization.{ Serializer, Serialization } import akka.serialization.{ Serializer, Serialization }
import com.google.protobuf.Message import com.google.protobuf.Message
import akka.actor.DynamicAccess import akka.actor.{ ActorSystem, ActorRef }
import akka.remote.RemoteProtocol.ActorRefProtocol import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.actor.ActorSystem
import akka.actor.ActorRef
object ProtobufSerializer { object ProtobufSerializer {
@ -18,11 +16,7 @@ object ProtobufSerializer {
* protobuf representation. * protobuf representation.
*/ */
def serializeActorRef(ref: ActorRef): ActorRefProtocol = { def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
val identifier: String = Serialization.currentTransportAddress.value match { ActorRefProtocol.newBuilder.setPath(Serialization.serializedActorPath(ref)).build
case null ref.path.toSerializationFormat
case address ref.path.toSerializationFormatWithAddress(address)
}
ActorRefProtocol.newBuilder.setPath(identifier).build
} }
/** /**

View file

@ -38,13 +38,23 @@ object RemotingSpec {
class Echo2 extends Actor { class Echo2 extends Actor {
def receive = { def receive = {
case "ping" sender ! (("pong", sender)) case "ping" sender ! (("pong", sender))
case a: ActorRef a ! (("ping", sender))
case ("ping", a: ActorRef) sender ! (("pong", a))
case ("pong", a: ActorRef) a ! (("pong", sender.path.toSerializationFormat))
} }
} }
val cfg: Config = ConfigFactory parseString (""" class Proxy(val one: ActorRef, val another: ActorRef) extends Actor {
def receive = {
case s if sender.path == one.path another ! s
case s if sender.path == another.path one ! s
}
}
val cfg: Config = ConfigFactory parseString (s"""
common-ssl-settings { common-ssl-settings {
key-store = "%s" key-store = "${getClass.getClassLoader.getResource("keystore").getPath}"
trust-store = "%s" trust-store = "${getClass.getClassLoader.getResource("truststore").getPath}"
key-store-password = "changeme" key-store-password = "changeme"
trust-store-password = "changeme" trust-store-password = "changeme"
protocol = "TLSv1" protocol = "TLSv1"
@ -83,10 +93,10 @@ object RemotingSpec {
} }
} }
netty.tcp = ${common-netty-settings} netty.tcp = $${common-netty-settings}
netty.udp = ${common-netty-settings} netty.udp = $${common-netty-settings}
netty.ssl = ${common-netty-settings} netty.ssl = $${common-netty-settings}
netty.ssl.security = ${common-ssl-settings} netty.ssl.security = $${common-ssl-settings}
test { test {
transport-class = "akka.remote.transport.TestTransport" transport-class = "akka.remote.transport.TestTransport"
@ -104,9 +114,7 @@ object RemotingSpec {
/looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345" /looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345"
} }
} }
""".format( """)
getClass.getClassLoader.getResource("keystore").getPath,
getClass.getClassLoader.getResource("truststore").getPath))
} }
@ -122,14 +130,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
maximum-payload-bytes = 48000 bytes maximum-payload-bytes = 48000 bytes
} }
""").withFallback(system.settings.config).resolve() """).withFallback(system.settings.config).resolve()
val otherSystem = ActorSystem("remote-sys", conf) val remoteSystem = ActorSystem("remote-sys", conf)
for ( for (
(name, proto) Seq( (name, proto) Seq(
"/gonk" -> "tcp", "/gonk" -> "tcp",
"/zagzag" -> "udp", "/zagzag" -> "udp",
"/roghtaar" -> "ssl.tcp") "/roghtaar" -> "ssl.tcp")
) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto)))) ) deploy(system, Deploy(name, scope = RemoteScope(addr(remoteSystem, proto))))
def addr(sys: ActorSystem, proto: String) = def addr(sys: ActorSystem, proto: String) =
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
@ -138,12 +146,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d) sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
} }
val remote = otherSystem.actorOf(Props[Echo2], "echo") val remote = remoteSystem.actorOf(Props[Echo2], "echo")
val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo") val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
private def verifySend(msg: Any)(afterSend: Unit) { private def verifySend(msg: Any)(afterSend: Unit) {
val bigBounceOther = otherSystem.actorOf(Props(new Actor { val bigBounceOther = remoteSystem.actorOf(Props(new Actor {
def receive = { def receive = {
case x: Int sender ! byteStringOfSize(x) case x: Int sender ! byteStringOfSize(x)
case x sender ! x case x sender ! x
@ -166,16 +174,26 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent]) system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent])
system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent]) system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent])
system.stop(eventForwarder) system.stop(eventForwarder)
otherSystem.stop(bigBounceOther) remoteSystem.stop(bigBounceOther)
} }
} }
override def atStartup() = {
system.eventStream.publish(TestEvent.Mute(
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*")))
remoteSystem.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
}
private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte)) private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte))
val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt
override def afterTermination() { override def afterTermination() {
otherSystem.shutdown() remoteSystem.shutdown()
AssociationRegistry.clear() AssociationRegistry.clear()
} }
@ -203,16 +221,21 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"send dead letters on remote if actor does not exist" in { "send dead letters on remote if actor does not exist" in {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh" system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(otherSystem) }(remoteSystem)
} }
"not be exhausted by sending to broken connections" in { "not be exhausted by sending to broken connections" in {
val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]"""). val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]""").
withFallback(otherSystem.settings.config) withFallback(remoteSystem.settings.config)
val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig)) val moreSystems = Vector.fill(5)(ActorSystem(remoteSystem.name, tcpOnlyConfig))
moreSystems foreach (_.actorOf(Props[Echo2], name = "echo")) moreSystems foreach { sys
sys.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointDisassociatedException](),
EventFilter.warning(pattern = "received dead letter.*")))
sys.actorOf(Props[Echo2], name = "echo")
}
val moreRefs = moreSystems map (sys system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo")) val moreRefs = moreSystems map (sys system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo") val aliveEcho = system.actorFor(RootActorPath(addr(remoteSystem, "tcp")) / "user" / "echo")
val n = 100 val n = 100
// first everything is up and running // first everything is up and running
@ -259,7 +282,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
} }
"not send to remote re-created actor with same name" in { "not send to remote re-created actor with same name" in {
val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1") val echo = remoteSystem.actorOf(Props[Echo1], "otherEcho1")
echo ! 71 echo ! 71
expectMsg(71) expectMsg(71)
echo ! PoisonPill echo ! PoisonPill
@ -267,7 +290,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
echo ! 72 echo ! 72
expectNoMsg(1.second) expectNoMsg(1.second)
val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1") val echo2 = remoteSystem.actorOf(Props[Echo1], "otherEcho1")
echo2 ! 73 echo2 ! 73
expectMsg(73) expectMsg(73)
// msg to old ActorRef (different uid) should not get through // msg to old ActorRef (different uid) should not get through
@ -275,7 +298,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
echo ! 74 echo ! 74
expectNoMsg(1.second) expectNoMsg(1.second)
otherSystem.actorFor("/user/otherEcho1") ! 75 remoteSystem.actorFor("/user/otherEcho1") ! 75
expectMsg(75) expectMsg(75)
system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76 system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76
@ -289,11 +312,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
case s: String sender ! context.actorFor(s) case s: String sender ! context.actorFor(s)
} }
}), "looker") }), "looker")
// child is configured to be deployed on remote-sys (otherSystem) // child is configured to be deployed on remote-sys (remoteSystem)
l ! (Props[Echo1], "child") l ! ((Props[Echo1], "child"))
val child = expectMsgType[ActorRef] val child = expectMsgType[ActorRef]
// grandchild is configured to be deployed on RemotingSpec (system) // grandchild is configured to be deployed on RemotingSpec (system)
child ! (Props[Echo1], "grandchild") child ! ((Props[Echo1], "grandchild"))
val grandchild = expectMsgType[ActorRef] val grandchild = expectMsgType[ActorRef]
grandchild.asInstanceOf[ActorRefScope].isLocal must be(true) grandchild.asInstanceOf[ActorRefScope].isLocal must be(true)
grandchild ! 43 grandchild ! 43
@ -313,7 +336,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
child ! PoisonPill child ! PoisonPill
expectMsg("postStop") expectMsg("postStop")
expectMsgType[Terminated].actor must be === child expectMsgType[Terminated].actor must be === child
l ! (Props[Echo1], "child") l ! ((Props[Echo1], "child"))
val child2 = expectMsgType[ActorRef] val child2 = expectMsgType[ActorRef]
child2 ! 45 child2 ! 45
expectMsg(45) expectMsg(45)
@ -335,7 +358,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (TCP)" in { "be able to use multiple transports and use the appropriate one (TCP)" in {
val r = system.actorOf(Props[Echo1], "gonk") val r = system.actorOf(Props[Echo1], "gonk")
r.path.toString must be === r.path.toString must be ===
s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
r ! 42 r ! 42
expectMsg(42) expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept { EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -351,7 +374,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (UDP)" in { "be able to use multiple transports and use the appropriate one (UDP)" in {
val r = system.actorOf(Props[Echo1], "zagzag") val r = system.actorOf(Props[Echo1], "zagzag")
r.path.toString must be === r.path.toString must be ===
s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" s"akka.udp://remote-sys@localhost:${port(remoteSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
r ! 42 r ! 42
expectMsg(10.seconds, 42) expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept { EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -367,7 +390,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (SSL)" in { "be able to use multiple transports and use the appropriate one (SSL)" in {
val r = system.actorOf(Props[Echo1], "roghtaar") val r = system.actorOf(Props[Echo1], "roghtaar")
r.path.toString must be === r.path.toString must be ===
s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
r ! 42 r ! 42
expectMsg(10.seconds, 42) expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept { EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -415,15 +438,31 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
} }
} }
"be able to serialize a local actor ref from another actor system" in {
val config = ConfigFactory.parseString("""
akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"]
akka.remote.test.local-address = "test://other-system@localhost:12347"
""").withFallback(remoteSystem.settings.config)
val otherSystem = ActorSystem("other-system", config)
try {
val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy")
// check that we use the specified transport address instead of the default
val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "tcp"))
val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo")
val proxyTcp = system.actorOf(Props(new Proxy(remoteEchoHereTcp, self)), "proxy-tcp")
proxyTcp ! otherGuy
expectMsg(3.seconds, ("pong", otherGuyRemoteTcp))
// now check that we fall back to default when we haven't got a corresponding transport
val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test"))
val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo")
val proxySsl = system.actorOf(Props(new Proxy(remoteEchoHereSsl, self)), "proxy-ssl")
proxySsl ! otherGuy
expectMsg(3.seconds, ("pong", otherGuyRemoteTest))
} finally {
otherSystem.shutdown()
otherSystem.awaitTermination(5.seconds.dilated)
otherSystem.isTerminated must be(true)
}
} }
override def beforeTermination() {
system.eventStream.publish(TestEvent.Mute(
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
otherSystem.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
} }
} }