diff --git a/akka-actor-tests/src/test/scala/akka/io/TestUtils.scala b/akka-actor-tests/src/test/scala/akka/TestUtils.scala similarity index 98% rename from akka-actor-tests/src/test/scala/akka/io/TestUtils.scala rename to akka-actor-tests/src/test/scala/akka/TestUtils.scala index 27c9dd21a9..ed19d33367 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TestUtils.scala +++ b/akka-actor-tests/src/test/scala/akka/TestUtils.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2013 Typesafe Inc. */ -package akka.io +package akka import scala.collection.immutable import java.net.InetSocketAddress diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 001078a08f..0de13e7239 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -6,9 +6,6 @@ package akka.actor import language.postfixOps -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers - import akka.testkit._ import akka.util.Timeout import scala.concurrent.duration._ @@ -17,6 +14,7 @@ import java.lang.IllegalStateException import scala.concurrent.Promise import akka.pattern.ask import akka.serialization.JavaSerializer +import akka.TestUtils.verifyActorTermination object ActorRefSpec { @@ -316,17 +314,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val out = new ObjectOutputStream(baos) val sysImpl = system.asInstanceOf[ActorSystemImpl] - val addr = sysImpl.provider.rootPath.address - val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing")) + val ref = system.actorOf(Props[ReplyActor], "non-existing") + val serialized = SerializedActorRef(ref) out.writeObject(serialized) out.flush out.close + ref ! PoisonPill + + verifyActorTermination(ref) + JavaSerializer.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream) + in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, ref.path, system.eventStream) } } @@ -401,7 +403,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { Await.result(ffive, timeout.duration) must be("five") Await.result(fnull, timeout.duration) must be("null") - awaitCond(ref.isTerminated, 2000 millis) + verifyActorTermination(ref) } "restart when Kill:ed" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index f77a29ba4a..383165cf2a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -91,7 +91,7 @@ object FSMActorSpec { } // initialize the lock - initialize + initialize() private def doLock(): Unit = lockedLatch.open() diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 783efc8f1f..c15da0cd00 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -25,7 +25,7 @@ object FSMTransitionSpec { whenUnhandled { case Event("reply", _) ⇒ stay replying "reply" } - initialize + initialize() override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index f2a937112a..b4e8840b0e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -698,8 +698,7 @@ object SupervisorHierarchySpec { stop } - initialize - + initialize() } } diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala index f153c30649..aa4a82db0f 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala @@ -9,7 +9,8 @@ import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.{ ActorRef, Actor, ActorSystem } import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.event.Logging.{ LogEvent, LoggerInitialized, InitializeLogger } +import akka.serialization.SerializationExtension +import akka.event.Logging.{ Warning, LogEvent, LoggerInitialized, InitializeLogger } object LoggerSpec { @@ -37,6 +38,21 @@ object LoggerSpec { } """).withFallback(AkkaSpec.testConf) + val ticket3165Config = ConfigFactory.parseString(""" + akka { + stdout-loglevel = "WARNING" + loglevel = "DEBUG" + loggers = ["akka.event.LoggerSpec$TestLogger1"] + actor { + serialize-messages = on + serialization-bindings { + "akka.event.Logging$LogEvent" = bytes + "java.io.Serializable" = java + } + } + } + """).withFallback(AkkaSpec.testConf) + case class SetTarget(ref: ActorRef, qualifier: Int) class TestLogger1 extends TestLogger(1) @@ -127,4 +143,16 @@ class LoggerSpec extends WordSpec with MustMatchers { } } } + + "Ticket 3165 - serialize-messages and dual-entry serialization of LogEvent" must { + "not cause StackOverflowError" in { + implicit val s = ActorSystem("foo", ticket3165Config) + try { + SerializationExtension(s).serialize(Warning("foo", classOf[String])) + } finally { + s.shutdown() + s.awaitTermination(5.seconds.dilated) + } + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index ea4e1a05b5..ad5b6a208b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -6,6 +6,7 @@ package akka.io import akka.testkit.{ TestProbe, AkkaSpec } import Tcp._ +import akka.TestUtils import TestUtils._ class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4") diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 38152a2112..50add9d95b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -16,6 +16,7 @@ import scala.util.control.NonFatal import org.scalatest.matchers._ import akka.io.Tcp._ import akka.io.SelectionHandler._ +import akka.TestUtils import TestUtils._ import akka.actor.{ ActorRef, PoisonPill, Terminated } import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 9f35951ad9..cac2ab5a88 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -7,6 +7,7 @@ package akka.io import akka.testkit.AkkaSpec import akka.util.ByteString import Tcp._ +import akka.TestUtils import TestUtils._ import akka.testkit.EventFilter import java.io.IOException diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index 4ed3bd9950..358b1d614a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -10,6 +10,7 @@ import akka.actor.ActorRef import scala.collection.immutable import akka.io.Inet.SocketOption import Tcp._ +import akka.TestUtils import TestUtils._ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 4c0652664f..30103b7344 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -13,6 +13,7 @@ import Tcp._ import akka.testkit.EventFilter import akka.io.SelectionHandler._ import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming } +import akka.TestUtils class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala index e387255150..4651700ed1 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala @@ -4,6 +4,7 @@ package akka.io import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } +import akka.TestUtils import TestUtils._ import akka.util.ByteString import java.net.InetSocketAddress diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala index 1e2b960219..9bcf1d4400 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala @@ -5,6 +5,7 @@ package akka.io import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } import akka.io.UdpFF._ +import akka.TestUtils import TestUtils._ import akka.util.ByteString import java.net.InetSocketAddress diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index aeaa824656..adf945d6b8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -6,14 +6,12 @@ package akka.actor import akka.dispatch._ import akka.dispatch.sysmsg._ -import akka.util._ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.{ Serialization, JavaSerializer } import akka.event.EventStream import scala.annotation.tailrec import java.util.concurrent.ConcurrentHashMap import akka.event.LoggingAdapter -import scala.collection.JavaConverters /** * Immutable and serializable handle to an actor, which may or may not reside @@ -381,7 +379,7 @@ private[akka] class LocalActorRef private[akka] ( override def restart(cause: Throwable): Unit = actorCell.restart(cause) @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path) + protected def writeReplace(): AnyRef = SerializedActorRef(this) } /** @@ -392,6 +390,10 @@ private[akka] class LocalActorRef private[akka] ( private[akka] case class SerializedActorRef private (path: String) { import akka.serialization.JavaSerializer.currentSystem + def this(actorRef: ActorRef) = { + this(Serialization.serializedActorPath(actorRef)) + } + @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = currentSystem.value match { case null ⇒ @@ -407,11 +409,8 @@ private[akka] case class SerializedActorRef private (path: String) { * INTERNAL API */ private[akka] object SerializedActorRef { - def apply(path: ActorPath): SerializedActorRef = { - Serialization.currentTransportAddress.value match { - case null ⇒ new SerializedActorRef(path.toSerializationFormat) - case addr ⇒ new SerializedActorRef(path.toSerializationFormatWithAddress(addr)) - } + def apply(actorRef: ActorRef): SerializedActorRef = { + new SerializedActorRef(actorRef) } } @@ -437,7 +436,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { override def restart(cause: Throwable): Unit = () @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path) + protected def writeReplace(): AnyRef = SerializedActorRef(this) } /** diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index ad70be0a0f..99f75dbaea 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -451,7 +451,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { * Verify existence of initial state and setup timers. This should be the * last call within the constructor. */ - final def initialize: Unit = makeTransition(currentState) + final def initialize(): Unit = makeTransition(currentState) /** * Return current state name (i.e. object of type S) diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 123a576462..9ce7169be3 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -157,7 +157,7 @@ private[akka] class RepointableActorRef( def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path) + protected def writeReplace(): AnyRef = SerializedActorRef(this) } private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 19b3c3762a..60cbcedf5f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -390,10 +390,15 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = { val currentList = systemQueueGet - if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents) + if (currentList.head == NoMessage) new EarliestFirstSystemMessageList(null) + else if (systemQueuePut(currentList, newContents)) currentList.reverse + else systemDrain(newContents) } - def hasSystemMessages: Boolean = systemQueueGet.nonEmpty + def hasSystemMessages: Boolean = systemQueueGet.head match { + case null | NoMessage ⇒ false + case _ ⇒ true + } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 266958bf83..dc35609dee 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -570,7 +570,7 @@ object Logging { /** * Base type of LogEvents */ - sealed trait LogEvent { + sealed trait LogEvent extends NoSerializationVerificationNeeded { /** * The thread that created this log event */ diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index ef21355502..64f1031c3a 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -5,7 +5,7 @@ package akka.serialization import com.typesafe.config.Config -import akka.actor.{ Extension, ExtendedActorSystem, Address } +import akka.actor._ import akka.event.Logging import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer @@ -21,10 +21,11 @@ object Serialization { type ClassSerializer = (Class[_], Serializer) /** - * This holds a reference to the current transport address to be inserted - * into local actor refs during serialization. + * This holds a reference to the current transport serialization information used for + * serializing local actor refs. + * INTERNAL API */ - val currentTransportAddress = new DynamicVariable[Address](null) + private[akka] val currentTransportInformation = new DynamicVariable[Information](null) class Settings(val config: Config) { val Serializers: Map[String, String] = configToMap("akka.actor.serializers") @@ -35,6 +36,35 @@ object Serialization { config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k -> v.toString) } } } + + /** + * Serialization information needed for serializing local actor refs. + * INTERNAL API + */ + private[akka] case class Information(address: Address, system: ActorSystem) + + /** + * The serialized path of an actorRef, based on the current transport serialization information. + * If there is no external address available for the requested address then the systems default + * address will be used. + */ + def serializedActorPath(actorRef: ActorRef): String = { + val path = actorRef.path + val originalSystem: ExtendedActorSystem = actorRef match { + case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem] + case _ ⇒ null + } + Serialization.currentTransportInformation.value match { + case null ⇒ path.toSerializationFormat + case Information(address, system) ⇒ + if (originalSystem == null || originalSystem == system) + path.toSerializationFormatWithAddress(address) + else { + val provider = originalSystem.provider + path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress)) + } + } + } } /** diff --git a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala index 3e1d2c6908..01003aff3d 100644 --- a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala +++ b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala @@ -276,7 +276,7 @@ class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[ case Active -> Idle ⇒ stopTimer() } - initialize + initialize() private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true) private def stopTimer() = cancelTimer("morePermits") diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java index 3eac0502d2..705f6ed5e4 100644 --- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java @@ -7,7 +7,6 @@ import org.junit.Test; import static org.junit.Assert.*; //#imports import akka.actor.*; -import akka.remote.RemoteActorRefProvider; import akka.serialization.*; //#imports @@ -58,20 +57,10 @@ public class SerializationDocTestBase { //#actorref-serializer // Serialize // (beneath toBinary) - final Address transportAddress = - Serialization.currentTransportAddress().value(); - String identifier; + String identifier = Serialization.serializedActorPath(theActorRef); - // If there is no transportAddress, - // it means that either this Serializer isn't called - // within a piece of code that sets it, - // so either you need to supply your own, - // or simply use the local path. - if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat(); - else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress); // Then just serialize the identifier however you like - // Deserialize // (beneath fromBinary) final ActorRef deserializedActorRef = theActorSystem.actorFor(identifier); @@ -118,16 +107,20 @@ public class SerializationDocTestBase { } //#external-address - - public void demonstrateExternalAddress() { - // this is not meant to be run, only to be compiled + static + //#external-address + public class ExternalAddressExample { + //#external-address final ActorSystem system = ActorSystem.create(); - final Address remoteAddr = new Address("", ""); - // #external-address - final Address addr = ExternalAddress.ID.get(system).getAddressFor(remoteAddr); - // #external-address + //#external-address + public String serializeTo(ActorRef ref, Address remote) { + return ref.path().toSerializationFormatWithAddress( + ExternalAddress.ID.get(system).getAddressFor(remote)); + } } + //#external-address + static //#external-address-default public class DefaultAddressExt implements Extension { diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst index 7d72b6ef43..f58646ce4a 100644 --- a/akka-docs/rst/java/serialization.rst +++ b/akka-docs/rst/java/serialization.rst @@ -109,8 +109,11 @@ list which classes that should be serialized using it. Serializing ActorRefs --------------------- -All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer, -you might want to know how to serialize and deserialize them properly, here's the magic incantation: +All ActorRefs are serializable using JavaSerializer, but in case you are writing your +own serializer, you might want to know how to serialize and deserialize them properly. +In the general case, the local address to be used depends on the type of remote +address which shall be the recipient of the serialized information. Use +:meth:`Serialization.serializedActorPath(actorRef)` like this: .. includecode:: code/docs/serialization/SerializationDocTestBase.java :include: imports @@ -118,6 +121,22 @@ you might want to know how to serialize and deserialize them properly, here's th .. includecode:: code/docs/serialization/SerializationDocTestBase.java :include: actorref-serializer +This assumes that serialization happens in the context of sending a message +through the remote transport. There are other uses of serialization, though, +e.g. storing actor references outside of an actor application (database, +durable mailbox, etc.). In this case, it is important to keep in mind that the +address part of an actor’s path determines how that actor is communicated with. +Storing a local actor path might be the right choice if the retrieval happens +in the same logical context, but it is not enough when deserializing it on a +different network host: for that it would need to include the system’s remote +transport address. An actor system is not limited to having just one remote +transport per se, which makes this question a bit more interesting. To find out +the appropriate address to use when sending to ``remoteAddr`` you can use +:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this: + +.. includecode:: code/docs/serialization/SerializationDocTestBase.java + :include: external-address + .. note:: ``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the @@ -132,25 +151,6 @@ you might want to know how to serialize and deserialize them properly, here's th include the unique id. -This assumes that serialization happens in the context of sending a message -through the remote transport. There are other uses of serialization, though, -e.g. storing actor references outside of an actor application (database, -durable mailbox, etc.). In this case, it is important to keep in mind that the -address part of an actor’s path determines how that actor is communicated with. -Storing a local actor path might be the right choice if the retrieval happens -in the same logical context, but it is not enough when deserializing it on a -different network host: for that it would need to include the system’s remote -transport address. An actor system is not limited to having just one remote -transport per se, which makes this question a bit more interesting. - -In the general case, the local address to be used depends on the type of remote -address which shall be the recipient of the serialized information. Use -:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system -for the appropriate address to use when sending to ``remoteAddr``: - -.. includecode:: code/docs/serialization/SerializationDocTestBase.java - :include: external-address - This requires that you know at least which type of address will be supported by the system which will deserialize the resulting actor reference; if you have no concrete address handy you can create a dummy one for the right protocol using diff --git a/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala index 4ce3ac07a6..59c62e93b9 100644 --- a/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala @@ -81,7 +81,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { //#unhandled-elided //#fsm-body - initialize + initialize() } //#simple-fsm object DemoCode { diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala index 0dacdeff3b..da3809b211 100644 --- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala @@ -4,7 +4,6 @@ package docs.serialization { - import org.scalatest.matchers.MustMatchers import akka.testkit._ //#imports import akka.actor.{ ActorRef, ActorSystem } @@ -16,7 +15,6 @@ package docs.serialization { import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.Address - import akka.remote.RemoteActorRefProvider //#my-own-serializer class MyOwnSerializer extends Serializer { @@ -164,16 +162,8 @@ package docs.serialization { //#actorref-serializer // Serialize // (beneath toBinary) + val identifier: String = Serialization.serializedActorPath(theActorRef) - // If there is no transportAddress, - // it means that either this Serializer isn't called - // within a piece of code that sets it, - // so either you need to supply your own, - // or simply use the local path. - val identifier: String = Serialization.currentTransportAddress.value match { - case null ⇒ theActorRef.path.toSerializationFormat - case address ⇒ theActorRef.path.toSerializationFormatWithAddress(address) - } // Then just serialize the identifier however you like // Deserialize diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst index 1e59226b7d..285dd52e49 100644 --- a/akka-docs/rst/scala/serialization.rst +++ b/akka-docs/rst/scala/serialization.rst @@ -101,12 +101,31 @@ list which classes that should be serialized using it. Serializing ActorRefs --------------------- -All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer, -you might want to know how to serialize and deserialize them properly, here's the magic incantation: +All ActorRefs are serializable using JavaSerializer, but in case you are writing your +own serializer, you might want to know how to serialize and deserialize them properly. +In the general case, the local address to be used depends on the type of remote +address which shall be the recipient of the serialized information. Use +:meth:`Serialization.serializedActorPath(actorRef)` like this: .. includecode:: code/docs/serialization/SerializationDocSpec.scala :include: imports,actorref-serializer +This assumes that serialization happens in the context of sending a message +through the remote transport. There are other uses of serialization, though, +e.g. storing actor references outside of an actor application (database, +durable mailbox, etc.). In this case, it is important to keep in mind that the +address part of an actor’s path determines how that actor is communicated with. +Storing a local actor path might be the right choice if the retrieval happens +in the same logical context, but it is not enough when deserializing it on a +different network host: for that it would need to include the system’s remote +transport address. An actor system is not limited to having just one remote +transport per se, which makes this question a bit more interesting. To find out +the appropriate address to use when sending to ``remoteAddr`` you can use +:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this: + +.. includecode:: code/docs/serialization/SerializationDocSpec.scala + :include: external-address + .. note:: ``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the @@ -120,24 +139,6 @@ you might want to know how to serialize and deserialize them properly, here's th storage of the reference, you can use ``toStringWithAddress``, which doesn't include the unique id. -This assumes that serialization happens in the context of sending a message -through the remote transport. There are other uses of serialization, though, -e.g. storing actor references outside of an actor application (database, -durable mailbox, etc.). In this case, it is important to keep in mind that the -address part of an actor’s path determines how that actor is communicated with. -Storing a local actor path might be the right choice if the retrieval happens -in the same logical context, but it is not enough when deserializing it on a -different network host: for that it would need to include the system’s remote -transport address. An actor system is not limited to having just one remote -transport per se, which makes this question a bit more interesting. - -In the general case, the local address to be used depends on the type of remote -address which shall be the recipient of the serialized information. Use -:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system -for the appropriate address to use when sending to ``remoteAddr``: - -.. includecode:: code/docs/serialization/SerializationDocSpec.scala - :include: external-address This requires that you know at least which type of address will be supported by the system which will deserialize the resulting actor reference; if you have no diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 512881a4fd..392b7f5925 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -336,7 +336,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex stay } - initialize + initialize() } /** @@ -574,7 +574,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor throw BarrierTimeout(d) } - initialize + initialize() def handleBarrier(data: Data): State = { log.debug("handleBarrier({})", data) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index f787b82ddf..0316e2608c 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -257,8 +257,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) channel.close() } - initialize - + initialize() } /** diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 9b870e0f46..ccb1e148de 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -15,9 +15,9 @@ import akka.remote.transport.AssociationHandle._ import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle } import akka.serialization.Serialization import akka.util.ByteString +import scala.util.control.NonFatal import akka.remote.transport.Transport.InvalidAssociationException import java.io.NotSerializableException -import scala.util.control.{ NoStackTrace, NonFatal } /** * INTERNAL API @@ -332,7 +332,7 @@ private[remote] class EndpointWriter( private def serializeMessage(msg: Any): MessageProtocol = handle match { case Some(h) ⇒ - Serialization.currentTransportAddress.withValue(h.localAddress) { + Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) { (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) } case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" + diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4ac2b95229..f9521bebda 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -271,8 +271,17 @@ private[akka] class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ if (hasAddress(address)) actorFor(rootGuardian, elems) - else new RemoteActorRef(transport, transport.localAddressForRemote(address), - new RootActorPath(address) / elems, Nobody, props = None, deploy = None) + else { + val rootPath = RootActorPath(address) / elems + try { + new RemoteActorRef(transport, transport.localAddressForRemote(address), + rootPath, Nobody, props = None, deploy = None) + } catch { + case NonFatal(e) ⇒ + log.error(e, "Error while looking up address {}", rootPath.address) + new EmptyLocalActorRef(this, rootPath, eventStream) + } + } case _ ⇒ local.actorFor(ref, path) } @@ -378,5 +387,5 @@ private[akka] class RemoteActorRef private[akka] ( def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path) + private def writeReplace(): AnyRef = SerializedActorRef(this) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index c697068b07..6725383c49 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -22,7 +22,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx * * The remote transport is responsible for sending and receiving messages. * Each transport has an address, which it should provide in - * Serialization.currentTransportAddress (thread-local) while serializing + * Serialization.currentTransportInformation (thread-local) while serializing * actor references (which might also be part of messages). This address must * be available (i.e. fully initialized) by the time the first message is * received or when the start() method returns, whatever happens first. diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1fe6fc21a7..9be6deb426 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -74,7 +74,7 @@ private[remote] object Remoting { null) } case None ⇒ throw new RemoteTransportException( - s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null) + s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null) } } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala index 056439c23e..acbbc26cf2 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala @@ -6,10 +6,8 @@ package akka.remote.serialization import akka.serialization.{ Serializer, Serialization } import com.google.protobuf.Message -import akka.actor.DynamicAccess +import akka.actor.{ ActorSystem, ActorRef } import akka.remote.RemoteProtocol.ActorRefProtocol -import akka.actor.ActorSystem -import akka.actor.ActorRef object ProtobufSerializer { @@ -18,11 +16,7 @@ object ProtobufSerializer { * protobuf representation. */ def serializeActorRef(ref: ActorRef): ActorRefProtocol = { - val identifier: String = Serialization.currentTransportAddress.value match { - case null ⇒ ref.path.toSerializationFormat - case address ⇒ ref.path.toSerializationFormatWithAddress(address) - } - ActorRefProtocol.newBuilder.setPath(identifier).build + ActorRefProtocol.newBuilder.setPath(Serialization.serializedActorPath(ref)).build } /** diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 92f47f64fd..f49a2244f5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -37,14 +37,24 @@ object RemotingSpec { class Echo2 extends Actor { def receive = { - case "ping" ⇒ sender ! (("pong", sender)) + case "ping" ⇒ sender ! (("pong", sender)) + case a: ActorRef ⇒ a ! (("ping", sender)) + case ("ping", a: ActorRef) ⇒ sender ! (("pong", a)) + case ("pong", a: ActorRef) ⇒ a ! (("pong", sender.path.toSerializationFormat)) } } - val cfg: Config = ConfigFactory parseString (""" + class Proxy(val one: ActorRef, val another: ActorRef) extends Actor { + def receive = { + case s if sender.path == one.path ⇒ another ! s + case s if sender.path == another.path ⇒ one ! s + } + } + + val cfg: Config = ConfigFactory parseString (s""" common-ssl-settings { - key-store = "%s" - trust-store = "%s" + key-store = "${getClass.getClassLoader.getResource("keystore").getPath}" + trust-store = "${getClass.getClassLoader.getResource("truststore").getPath}" key-store-password = "changeme" trust-store-password = "changeme" protocol = "TLSv1" @@ -83,10 +93,10 @@ object RemotingSpec { } } - netty.tcp = ${common-netty-settings} - netty.udp = ${common-netty-settings} - netty.ssl = ${common-netty-settings} - netty.ssl.security = ${common-ssl-settings} + netty.tcp = $${common-netty-settings} + netty.udp = $${common-netty-settings} + netty.ssl = $${common-netty-settings} + netty.ssl.security = $${common-ssl-settings} test { transport-class = "akka.remote.transport.TestTransport" @@ -104,9 +114,7 @@ object RemotingSpec { /looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345" } } - """.format( - getClass.getClassLoader.getResource("keystore").getPath, - getClass.getClassLoader.getResource("truststore").getPath)) + """) } @@ -122,14 +130,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D maximum-payload-bytes = 48000 bytes } """).withFallback(system.settings.config).resolve() - val otherSystem = ActorSystem("remote-sys", conf) + val remoteSystem = ActorSystem("remote-sys", conf) for ( (name, proto) ← Seq( "/gonk" -> "tcp", "/zagzag" -> "udp", "/roghtaar" -> "ssl.tcp") - ) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto)))) + ) deploy(system, Deploy(name, scope = RemoteScope(addr(remoteSystem, proto)))) def addr(sys: ActorSystem, proto: String) = sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get @@ -138,12 +146,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d) } - val remote = otherSystem.actorOf(Props[Echo2], "echo") + val remote = remoteSystem.actorOf(Props[Echo2], "echo") val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo") private def verifySend(msg: Any)(afterSend: ⇒ Unit) { - val bigBounceOther = otherSystem.actorOf(Props(new Actor { + val bigBounceOther = remoteSystem.actorOf(Props(new Actor { def receive = { case x: Int ⇒ sender ! byteStringOfSize(x) case x ⇒ sender ! x @@ -166,16 +174,26 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent]) system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent]) system.stop(eventForwarder) - otherSystem.stop(bigBounceOther) + remoteSystem.stop(bigBounceOther) } } + override def atStartup() = { + system.eventStream.publish(TestEvent.Mute( + EventFilter.error(start = "AssociationError"), + EventFilter.warning(pattern = "received dead letter.*"))) + remoteSystem.eventStream.publish(TestEvent.Mute( + EventFilter[EndpointException](), + EventFilter.error(start = "AssociationError"), + EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)"))) + } + private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte)) val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt override def afterTermination() { - otherSystem.shutdown() + remoteSystem.shutdown() AssociationRegistry.clear() } @@ -203,16 +221,21 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "send dead letters on remote if actor does not exist" in { EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh" - }(otherSystem) + }(remoteSystem) } "not be exhausted by sending to broken connections" in { val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]"""). - withFallback(otherSystem.settings.config) - val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig)) - moreSystems foreach (_.actorOf(Props[Echo2], name = "echo")) + withFallback(remoteSystem.settings.config) + val moreSystems = Vector.fill(5)(ActorSystem(remoteSystem.name, tcpOnlyConfig)) + moreSystems foreach { sys ⇒ + sys.eventStream.publish(TestEvent.Mute( + EventFilter[EndpointDisassociatedException](), + EventFilter.warning(pattern = "received dead letter.*"))) + sys.actorOf(Props[Echo2], name = "echo") + } val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo")) - val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo") + val aliveEcho = system.actorFor(RootActorPath(addr(remoteSystem, "tcp")) / "user" / "echo") val n = 100 // first everything is up and running @@ -229,7 +252,6 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D moreSystems foreach { sys ⇒ sys.shutdown() sys.awaitTermination(5.seconds.dilated) - sys.isTerminated must be(true) } 1 to n foreach { x ⇒ @@ -259,7 +281,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "not send to remote re-created actor with same name" in { - val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1") + val echo = remoteSystem.actorOf(Props[Echo1], "otherEcho1") echo ! 71 expectMsg(71) echo ! PoisonPill @@ -267,7 +289,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D echo ! 72 expectNoMsg(1.second) - val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1") + val echo2 = remoteSystem.actorOf(Props[Echo1], "otherEcho1") echo2 ! 73 expectMsg(73) // msg to old ActorRef (different uid) should not get through @@ -275,7 +297,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D echo ! 74 expectNoMsg(1.second) - otherSystem.actorFor("/user/otherEcho1") ! 75 + remoteSystem.actorFor("/user/otherEcho1") ! 75 expectMsg(75) system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76 @@ -289,7 +311,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D case s: String ⇒ sender ! context.actorFor(s) } }), "looker") - // child is configured to be deployed on remote-sys (otherSystem) + // child is configured to be deployed on remote-sys (remoteSystem) l ! ((Props[Echo1], "child")) val child = expectMsgType[ActorRef] // grandchild is configured to be deployed on RemotingSpec (system) @@ -335,7 +357,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (TCP)" in { val r = system.actorOf(Props[Echo1], "gonk") r.path.toString must be === - s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" + s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" r ! 42 expectMsg(42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -351,7 +373,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (UDP)" in { val r = system.actorOf(Props[Echo1], "zagzag") r.path.toString must be === - s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" + s"akka.udp://remote-sys@localhost:${port(remoteSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" r ! 42 expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -367,7 +389,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (SSL)" in { val r = system.actorOf(Props[Echo1], "roghtaar") r.path.toString must be === - s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" + s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" r ! 42 expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -415,15 +437,30 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } } + "be able to serialize a local actor ref from another actor system" in { + val config = ConfigFactory.parseString(""" + akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"] + akka.remote.test.local-address = "test://other-system@localhost:12347" + """).withFallback(remoteSystem.settings.config) + val otherSystem = ActorSystem("other-system", config) + try { + val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy") + // check that we use the specified transport address instead of the default + val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "tcp")) + val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo") + val proxyTcp = system.actorOf(Props(new Proxy(remoteEchoHereTcp, testActor)), "proxy-tcp") + proxyTcp ! otherGuy + expectMsg(3.seconds, ("pong", otherGuyRemoteTcp)) + // now check that we fall back to default when we haven't got a corresponding transport + val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test")) + val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo") + val proxySsl = system.actorOf(Props(new Proxy(remoteEchoHereSsl, testActor)), "proxy-ssl") + proxySsl ! otherGuy + expectMsg(3.seconds, ("pong", otherGuyRemoteTest)) + } finally { + otherSystem.shutdown() + otherSystem.awaitTermination(5.seconds.dilated) + } + } } - - override def beforeTermination() { - system.eventStream.publish(TestEvent.Mute( - EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) - otherSystem.eventStream.publish(TestEvent.Mute( - EventFilter[EndpointException](), - EventFilter.error(start = "AssociationError"), - EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)"))) - } - } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index 8ebaa4a2e9..b773f316d4 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -76,7 +76,7 @@ abstract class GenericBuncher[A: ClassTag, B](val singleTimeout: FiniteDuration, stop } - initialize + initialize() } object Buncher { diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index ad8ba0e0c6..fc43289c3c 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -55,7 +55,7 @@ class Chopstick extends Actor with FSM[ChopstickState, TakenBy] { } // Initialze the chopstick - initialize + initialize() } /** @@ -155,7 +155,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } // Initialize the hakker - initialize + initialize() private def startThinking(duration: FiniteDuration): State = { goto(Thinking) using TakenChopsticks(None, None) forMax duration