Merged with master

This commit is contained in:
Viktor Klang 2013-04-03 16:21:22 +02:00
commit 6976317bc7
35 changed files with 259 additions and 166 deletions

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
package akka
import scala.collection.immutable
import java.net.InetSocketAddress

View file

@ -6,9 +6,6 @@ package akka.actor
import language.postfixOps
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.util.Timeout
import scala.concurrent.duration._
@ -17,6 +14,7 @@ import java.lang.IllegalStateException
import scala.concurrent.Promise
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.TestUtils.verifyActorTermination
object ActorRefSpec {
@ -316,17 +314,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val out = new ObjectOutputStream(baos)
val sysImpl = system.asInstanceOf[ActorSystemImpl]
val addr = sysImpl.provider.rootPath.address
val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing"))
val ref = system.actorOf(Props[ReplyActor], "non-existing")
val serialized = SerializedActorRef(ref)
out.writeObject(serialized)
out.flush
out.close
ref ! PoisonPill
verifyActorTermination(ref)
JavaSerializer.currentSystem.withValue(sysImpl) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream)
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, ref.path, system.eventStream)
}
}
@ -401,7 +403,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
Await.result(ffive, timeout.duration) must be("five")
Await.result(fnull, timeout.duration) must be("null")
awaitCond(ref.isTerminated, 2000 millis)
verifyActorTermination(ref)
}
"restart when Kill:ed" in {

View file

@ -91,7 +91,7 @@ object FSMActorSpec {
}
// initialize the lock
initialize
initialize()
private def doLock(): Unit = lockedLatch.open()

View file

@ -25,7 +25,7 @@ object FSMTransitionSpec {
whenUnhandled {
case Event("reply", _) stay replying "reply"
}
initialize
initialize()
override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" }
}

View file

@ -698,8 +698,7 @@ object SupervisorHierarchySpec {
stop
}
initialize
initialize()
}
}

View file

@ -9,7 +9,8 @@ import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.{ ActorRef, Actor, ActorSystem }
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.event.Logging.{ LogEvent, LoggerInitialized, InitializeLogger }
import akka.serialization.SerializationExtension
import akka.event.Logging.{ Warning, LogEvent, LoggerInitialized, InitializeLogger }
object LoggerSpec {
@ -37,6 +38,21 @@ object LoggerSpec {
}
""").withFallback(AkkaSpec.testConf)
val ticket3165Config = ConfigFactory.parseString("""
akka {
stdout-loglevel = "WARNING"
loglevel = "DEBUG"
loggers = ["akka.event.LoggerSpec$TestLogger1"]
actor {
serialize-messages = on
serialization-bindings {
"akka.event.Logging$LogEvent" = bytes
"java.io.Serializable" = java
}
}
}
""").withFallback(AkkaSpec.testConf)
case class SetTarget(ref: ActorRef, qualifier: Int)
class TestLogger1 extends TestLogger(1)
@ -127,4 +143,16 @@ class LoggerSpec extends WordSpec with MustMatchers {
}
}
}
"Ticket 3165 - serialize-messages and dual-entry serialization of LogEvent" must {
"not cause StackOverflowError" in {
implicit val s = ActorSystem("foo", ticket3165Config)
try {
SerializationExtension(s).serialize(Warning("foo", classOf[String]))
} finally {
s.shutdown()
s.awaitTermination(5.seconds.dilated)
}
}
}
}

View file

@ -6,6 +6,7 @@ package akka.io
import akka.testkit.{ TestProbe, AkkaSpec }
import Tcp._
import akka.TestUtils
import TestUtils._
class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4")

View file

@ -16,6 +16,7 @@ import scala.util.control.NonFatal
import org.scalatest.matchers._
import akka.io.Tcp._
import akka.io.SelectionHandler._
import akka.TestUtils
import TestUtils._
import akka.actor.{ ActorRef, PoisonPill, Terminated }
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }

View file

@ -7,6 +7,7 @@ package akka.io
import akka.testkit.AkkaSpec
import akka.util.ByteString
import Tcp._
import akka.TestUtils
import TestUtils._
import akka.testkit.EventFilter
import java.io.IOException

View file

@ -10,6 +10,7 @@ import akka.actor.ActorRef
import scala.collection.immutable
import akka.io.Inet.SocketOption
import Tcp._
import akka.TestUtils
import TestUtils._
trait TcpIntegrationSpecSupport { _: AkkaSpec

View file

@ -13,6 +13,7 @@ import Tcp._
import akka.testkit.EventFilter
import akka.io.SelectionHandler._
import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming }
import akka.TestUtils
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {

View file

@ -4,6 +4,7 @@
package akka.io
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
import akka.TestUtils
import TestUtils._
import akka.util.ByteString
import java.net.InetSocketAddress

View file

@ -5,6 +5,7 @@ package akka.io
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
import akka.io.UdpFF._
import akka.TestUtils
import TestUtils._
import akka.util.ByteString
import java.net.InetSocketAddress

View file

@ -6,14 +6,12 @@ package akka.actor
import akka.dispatch._
import akka.dispatch.sysmsg._
import akka.util._
import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.serialization.{ Serialization, JavaSerializer }
import akka.event.EventStream
import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import akka.event.LoggingAdapter
import scala.collection.JavaConverters
/**
* Immutable and serializable handle to an actor, which may or may not reside
@ -381,7 +379,7 @@ private[akka] class LocalActorRef private[akka] (
override def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path)
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
/**
@ -392,6 +390,10 @@ private[akka] class LocalActorRef private[akka] (
private[akka] case class SerializedActorRef private (path: String) {
import akka.serialization.JavaSerializer.currentSystem
def this(actorRef: ActorRef) = {
this(Serialization.serializedActorPath(actorRef))
}
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = currentSystem.value match {
case null
@ -407,11 +409,8 @@ private[akka] case class SerializedActorRef private (path: String) {
* INTERNAL API
*/
private[akka] object SerializedActorRef {
def apply(path: ActorPath): SerializedActorRef = {
Serialization.currentTransportAddress.value match {
case null new SerializedActorRef(path.toSerializationFormat)
case addr new SerializedActorRef(path.toSerializationFormatWithAddress(addr))
}
def apply(actorRef: ActorRef): SerializedActorRef = {
new SerializedActorRef(actorRef)
}
}
@ -437,7 +436,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
override def restart(cause: Throwable): Unit = ()
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path)
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
/**

View file

@ -451,7 +451,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
* Verify existence of initial state and setup timers. This should be the
* last call within the constructor.
*/
final def initialize: Unit = makeTransition(currentState)
final def initialize(): Unit = makeTransition(currentState)
/**
* Return current state name (i.e. object of type S)

View file

@ -157,7 +157,7 @@ private[akka] class RepointableActorRef(
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path)
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,

View file

@ -390,10 +390,15 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
val currentList = systemQueueGet
if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents)
if (currentList.head == NoMessage) new EarliestFirstSystemMessageList(null)
else if (systemQueuePut(currentList, newContents)) currentList.reverse
else systemDrain(newContents)
}
def hasSystemMessages: Boolean = systemQueueGet.nonEmpty
def hasSystemMessages: Boolean = systemQueueGet.head match {
case null | NoMessage false
case _ true
}
}

View file

@ -570,7 +570,7 @@ object Logging {
/**
* Base type of LogEvents
*/
sealed trait LogEvent {
sealed trait LogEvent extends NoSerializationVerificationNeeded {
/**
* The thread that created this log event
*/

View file

@ -5,7 +5,7 @@
package akka.serialization
import com.typesafe.config.Config
import akka.actor.{ Extension, ExtendedActorSystem, Address }
import akka.actor._
import akka.event.Logging
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
@ -21,10 +21,11 @@ object Serialization {
type ClassSerializer = (Class[_], Serializer)
/**
* This holds a reference to the current transport address to be inserted
* into local actor refs during serialization.
* This holds a reference to the current transport serialization information used for
* serializing local actor refs.
* INTERNAL API
*/
val currentTransportAddress = new DynamicVariable[Address](null)
private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
class Settings(val config: Config) {
val Serializers: Map[String, String] = configToMap("akka.actor.serializers")
@ -35,6 +36,35 @@ object Serialization {
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) (k -> v.toString) }
}
}
/**
* Serialization information needed for serializing local actor refs.
* INTERNAL API
*/
private[akka] case class Information(address: Address, system: ActorSystem)
/**
* The serialized path of an actorRef, based on the current transport serialization information.
* If there is no external address available for the requested address then the systems default
* address will be used.
*/
def serializedActorPath(actorRef: ActorRef): String = {
val path = actorRef.path
val originalSystem: ExtendedActorSystem = actorRef match {
case a: ActorRefWithCell a.underlying.system.asInstanceOf[ExtendedActorSystem]
case _ null
}
Serialization.currentTransportInformation.value match {
case null path.toSerializationFormat
case Information(address, system)
if (originalSystem == null || originalSystem == system)
path.toSerializationFormatWithAddress(address)
else {
val provider = originalSystem.provider
path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
}
}
}
}
/**

View file

@ -276,7 +276,7 @@ class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[
case Active -> Idle stopTimer()
}
initialize
initialize()
private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true)
private def stopTimer() = cancelTimer("morePermits")

View file

@ -7,7 +7,6 @@ import org.junit.Test;
import static org.junit.Assert.*;
//#imports
import akka.actor.*;
import akka.remote.RemoteActorRefProvider;
import akka.serialization.*;
//#imports
@ -58,20 +57,10 @@ public class SerializationDocTestBase {
//#actorref-serializer
// Serialize
// (beneath toBinary)
final Address transportAddress =
Serialization.currentTransportAddress().value();
String identifier;
String identifier = Serialization.serializedActorPath(theActorRef);
// If there is no transportAddress,
// it means that either this Serializer isn't called
// within a piece of code that sets it,
// so either you need to supply your own,
// or simply use the local path.
if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat();
else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress);
// Then just serialize the identifier however you like
// Deserialize
// (beneath fromBinary)
final ActorRef deserializedActorRef = theActorSystem.actorFor(identifier);
@ -118,16 +107,20 @@ public class SerializationDocTestBase {
}
//#external-address
public void demonstrateExternalAddress() {
// this is not meant to be run, only to be compiled
static
//#external-address
public class ExternalAddressExample {
//#external-address
final ActorSystem system = ActorSystem.create();
final Address remoteAddr = new Address("", "");
// #external-address
final Address addr = ExternalAddress.ID.get(system).getAddressFor(remoteAddr);
// #external-address
//#external-address
public String serializeTo(ActorRef ref, Address remote) {
return ref.path().toSerializationFormatWithAddress(
ExternalAddress.ID.get(system).getAddressFor(remote));
}
}
//#external-address
static
//#external-address-default
public class DefaultAddressExt implements Extension {

View file

@ -109,8 +109,11 @@ list which classes that should be serialized using it.
Serializing ActorRefs
---------------------
All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer,
you might want to know how to serialize and deserialize them properly, here's the magic incantation:
All ActorRefs are serializable using JavaSerializer, but in case you are writing your
own serializer, you might want to know how to serialize and deserialize them properly.
In the general case, the local address to be used depends on the type of remote
address which shall be the recipient of the serialized information. Use
:meth:`Serialization.serializedActorPath(actorRef)` like this:
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
:include: imports
@ -118,6 +121,22 @@ you might want to know how to serialize and deserialize them properly, here's th
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
:include: actorref-serializer
This assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization, though,
e.g. storing actor references outside of an actor application (database,
durable mailbox, etc.). In this case, it is important to keep in mind that the
address part of an actors path determines how that actor is communicated with.
Storing a local actor path might be the right choice if the retrieval happens
in the same logical context, but it is not enough when deserializing it on a
different network host: for that it would need to include the systems remote
transport address. An actor system is not limited to having just one remote
transport per se, which makes this question a bit more interesting. To find out
the appropriate address to use when sending to ``remoteAddr`` you can use
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this:
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
:include: external-address
.. note::
``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the
@ -132,25 +151,6 @@ you might want to know how to serialize and deserialize them properly, here's th
include the unique id.
This assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization, though,
e.g. storing actor references outside of an actor application (database,
durable mailbox, etc.). In this case, it is important to keep in mind that the
address part of an actors path determines how that actor is communicated with.
Storing a local actor path might be the right choice if the retrieval happens
in the same logical context, but it is not enough when deserializing it on a
different network host: for that it would need to include the systems remote
transport address. An actor system is not limited to having just one remote
transport per se, which makes this question a bit more interesting.
In the general case, the local address to be used depends on the type of remote
address which shall be the recipient of the serialized information. Use
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system
for the appropriate address to use when sending to ``remoteAddr``:
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
:include: external-address
This requires that you know at least which type of address will be supported by
the system which will deserialize the resulting actor reference; if you have no
concrete address handy you can create a dummy one for the right protocol using

View file

@ -81,7 +81,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
//#unhandled-elided
//#fsm-body
initialize
initialize()
}
//#simple-fsm
object DemoCode {

View file

@ -4,7 +4,6 @@
package docs.serialization {
import org.scalatest.matchers.MustMatchers
import akka.testkit._
//#imports
import akka.actor.{ ActorRef, ActorSystem }
@ -16,7 +15,6 @@ package docs.serialization {
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.Address
import akka.remote.RemoteActorRefProvider
//#my-own-serializer
class MyOwnSerializer extends Serializer {
@ -164,16 +162,8 @@ package docs.serialization {
//#actorref-serializer
// Serialize
// (beneath toBinary)
val identifier: String = Serialization.serializedActorPath(theActorRef)
// If there is no transportAddress,
// it means that either this Serializer isn't called
// within a piece of code that sets it,
// so either you need to supply your own,
// or simply use the local path.
val identifier: String = Serialization.currentTransportAddress.value match {
case null theActorRef.path.toSerializationFormat
case address theActorRef.path.toSerializationFormatWithAddress(address)
}
// Then just serialize the identifier however you like
// Deserialize

View file

@ -101,12 +101,31 @@ list which classes that should be serialized using it.
Serializing ActorRefs
---------------------
All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer,
you might want to know how to serialize and deserialize them properly, here's the magic incantation:
All ActorRefs are serializable using JavaSerializer, but in case you are writing your
own serializer, you might want to know how to serialize and deserialize them properly.
In the general case, the local address to be used depends on the type of remote
address which shall be the recipient of the serialized information. Use
:meth:`Serialization.serializedActorPath(actorRef)` like this:
.. includecode:: code/docs/serialization/SerializationDocSpec.scala
:include: imports,actorref-serializer
This assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization, though,
e.g. storing actor references outside of an actor application (database,
durable mailbox, etc.). In this case, it is important to keep in mind that the
address part of an actors path determines how that actor is communicated with.
Storing a local actor path might be the right choice if the retrieval happens
in the same logical context, but it is not enough when deserializing it on a
different network host: for that it would need to include the systems remote
transport address. An actor system is not limited to having just one remote
transport per se, which makes this question a bit more interesting. To find out
the appropriate address to use when sending to ``remoteAddr`` you can use
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this:
.. includecode:: code/docs/serialization/SerializationDocSpec.scala
:include: external-address
.. note::
``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the
@ -120,24 +139,6 @@ you might want to know how to serialize and deserialize them properly, here's th
storage of the reference, you can use ``toStringWithAddress``, which doesn't
include the unique id.
This assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization, though,
e.g. storing actor references outside of an actor application (database,
durable mailbox, etc.). In this case, it is important to keep in mind that the
address part of an actors path determines how that actor is communicated with.
Storing a local actor path might be the right choice if the retrieval happens
in the same logical context, but it is not enough when deserializing it on a
different network host: for that it would need to include the systems remote
transport address. An actor system is not limited to having just one remote
transport per se, which makes this question a bit more interesting.
In the general case, the local address to be used depends on the type of remote
address which shall be the recipient of the serialized information. Use
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system
for the appropriate address to use when sending to ``remoteAddr``:
.. includecode:: code/docs/serialization/SerializationDocSpec.scala
:include: external-address
This requires that you know at least which type of address will be supported by
the system which will deserialize the resulting actor reference; if you have no

View file

@ -336,7 +336,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
stay
}
initialize
initialize()
}
/**
@ -574,7 +574,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
throw BarrierTimeout(d)
}
initialize
initialize()
def handleBarrier(data: Data): State = {
log.debug("handleBarrier({})", data)

View file

@ -257,8 +257,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
channel.close()
}
initialize
initialize()
}
/**

View file

@ -15,9 +15,9 @@ import akka.remote.transport.AssociationHandle._
import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle }
import akka.serialization.Serialization
import akka.util.ByteString
import scala.util.control.NonFatal
import akka.remote.transport.Transport.InvalidAssociationException
import java.io.NotSerializableException
import scala.util.control.{ NoStackTrace, NonFatal }
/**
* INTERNAL API
@ -332,7 +332,7 @@ private[remote] class EndpointWriter(
private def serializeMessage(msg: Any): MessageProtocol = handle match {
case Some(h)
Serialization.currentTransportAddress.withValue(h.localAddress) {
Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) {
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
}
case None throw new EndpointException("Internal error: No handle was present during serialization of" +

View file

@ -271,8 +271,17 @@ private[akka] class RemoteActorRefProvider(
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case ActorPathExtractor(address, elems)
if (hasAddress(address)) actorFor(rootGuardian, elems)
else new RemoteActorRef(transport, transport.localAddressForRemote(address),
new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
else {
val rootPath = RootActorPath(address) / elems
try {
new RemoteActorRef(transport, transport.localAddressForRemote(address),
rootPath, Nobody, props = None, deploy = None)
} catch {
case NonFatal(e)
log.error(e, "Error while looking up address {}", rootPath.address)
new EmptyLocalActorRef(this, rootPath, eventStream)
}
}
case _ local.actorFor(ref, path)
}
@ -378,5 +387,5 @@ private[akka] class RemoteActorRef private[akka] (
def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause))
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path)
private def writeReplace(): AnyRef = SerializedActorRef(this)
}

View file

@ -22,7 +22,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx
*
* The remote transport is responsible for sending and receiving messages.
* Each transport has an address, which it should provide in
* Serialization.currentTransportAddress (thread-local) while serializing
* Serialization.currentTransportInformation (thread-local) while serializing
* actor references (which might also be part of messages). This address must
* be available (i.e. fully initialized) by the time the first message is
* received or when the start() method returns, whatever happens first.

View file

@ -74,7 +74,7 @@ private[remote] object Remoting {
null)
}
case None throw new RemoteTransportException(
s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null)
s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null)
}
}

View file

@ -6,10 +6,8 @@ package akka.remote.serialization
import akka.serialization.{ Serializer, Serialization }
import com.google.protobuf.Message
import akka.actor.DynamicAccess
import akka.actor.{ ActorSystem, ActorRef }
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.actor.ActorSystem
import akka.actor.ActorRef
object ProtobufSerializer {
@ -18,11 +16,7 @@ object ProtobufSerializer {
* protobuf representation.
*/
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
val identifier: String = Serialization.currentTransportAddress.value match {
case null ref.path.toSerializationFormat
case address ref.path.toSerializationFormatWithAddress(address)
}
ActorRefProtocol.newBuilder.setPath(identifier).build
ActorRefProtocol.newBuilder.setPath(Serialization.serializedActorPath(ref)).build
}
/**

View file

@ -37,14 +37,24 @@ object RemotingSpec {
class Echo2 extends Actor {
def receive = {
case "ping" sender ! (("pong", sender))
case "ping" sender ! (("pong", sender))
case a: ActorRef a ! (("ping", sender))
case ("ping", a: ActorRef) sender ! (("pong", a))
case ("pong", a: ActorRef) a ! (("pong", sender.path.toSerializationFormat))
}
}
val cfg: Config = ConfigFactory parseString ("""
class Proxy(val one: ActorRef, val another: ActorRef) extends Actor {
def receive = {
case s if sender.path == one.path another ! s
case s if sender.path == another.path one ! s
}
}
val cfg: Config = ConfigFactory parseString (s"""
common-ssl-settings {
key-store = "%s"
trust-store = "%s"
key-store = "${getClass.getClassLoader.getResource("keystore").getPath}"
trust-store = "${getClass.getClassLoader.getResource("truststore").getPath}"
key-store-password = "changeme"
trust-store-password = "changeme"
protocol = "TLSv1"
@ -83,10 +93,10 @@ object RemotingSpec {
}
}
netty.tcp = ${common-netty-settings}
netty.udp = ${common-netty-settings}
netty.ssl = ${common-netty-settings}
netty.ssl.security = ${common-ssl-settings}
netty.tcp = $${common-netty-settings}
netty.udp = $${common-netty-settings}
netty.ssl = $${common-netty-settings}
netty.ssl.security = $${common-ssl-settings}
test {
transport-class = "akka.remote.transport.TestTransport"
@ -104,9 +114,7 @@ object RemotingSpec {
/looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345"
}
}
""".format(
getClass.getClassLoader.getResource("keystore").getPath,
getClass.getClassLoader.getResource("truststore").getPath))
""")
}
@ -122,14 +130,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
maximum-payload-bytes = 48000 bytes
}
""").withFallback(system.settings.config).resolve()
val otherSystem = ActorSystem("remote-sys", conf)
val remoteSystem = ActorSystem("remote-sys", conf)
for (
(name, proto) Seq(
"/gonk" -> "tcp",
"/zagzag" -> "udp",
"/roghtaar" -> "ssl.tcp")
) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto))))
) deploy(system, Deploy(name, scope = RemoteScope(addr(remoteSystem, proto))))
def addr(sys: ActorSystem, proto: String) =
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
@ -138,12 +146,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
}
val remote = otherSystem.actorOf(Props[Echo2], "echo")
val remote = remoteSystem.actorOf(Props[Echo2], "echo")
val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
private def verifySend(msg: Any)(afterSend: Unit) {
val bigBounceOther = otherSystem.actorOf(Props(new Actor {
val bigBounceOther = remoteSystem.actorOf(Props(new Actor {
def receive = {
case x: Int sender ! byteStringOfSize(x)
case x sender ! x
@ -166,16 +174,26 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent])
system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent])
system.stop(eventForwarder)
otherSystem.stop(bigBounceOther)
remoteSystem.stop(bigBounceOther)
}
}
override def atStartup() = {
system.eventStream.publish(TestEvent.Mute(
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*")))
remoteSystem.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
}
private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte))
val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt
override def afterTermination() {
otherSystem.shutdown()
remoteSystem.shutdown()
AssociationRegistry.clear()
}
@ -203,16 +221,21 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"send dead letters on remote if actor does not exist" in {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(otherSystem)
}(remoteSystem)
}
"not be exhausted by sending to broken connections" in {
val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]""").
withFallback(otherSystem.settings.config)
val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig))
moreSystems foreach (_.actorOf(Props[Echo2], name = "echo"))
withFallback(remoteSystem.settings.config)
val moreSystems = Vector.fill(5)(ActorSystem(remoteSystem.name, tcpOnlyConfig))
moreSystems foreach { sys
sys.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointDisassociatedException](),
EventFilter.warning(pattern = "received dead letter.*")))
sys.actorOf(Props[Echo2], name = "echo")
}
val moreRefs = moreSystems map (sys system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo")
val aliveEcho = system.actorFor(RootActorPath(addr(remoteSystem, "tcp")) / "user" / "echo")
val n = 100
// first everything is up and running
@ -229,7 +252,6 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
moreSystems foreach { sys
sys.shutdown()
sys.awaitTermination(5.seconds.dilated)
sys.isTerminated must be(true)
}
1 to n foreach { x
@ -259,7 +281,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
}
"not send to remote re-created actor with same name" in {
val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1")
val echo = remoteSystem.actorOf(Props[Echo1], "otherEcho1")
echo ! 71
expectMsg(71)
echo ! PoisonPill
@ -267,7 +289,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
echo ! 72
expectNoMsg(1.second)
val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1")
val echo2 = remoteSystem.actorOf(Props[Echo1], "otherEcho1")
echo2 ! 73
expectMsg(73)
// msg to old ActorRef (different uid) should not get through
@ -275,7 +297,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
echo ! 74
expectNoMsg(1.second)
otherSystem.actorFor("/user/otherEcho1") ! 75
remoteSystem.actorFor("/user/otherEcho1") ! 75
expectMsg(75)
system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76
@ -289,7 +311,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
case s: String sender ! context.actorFor(s)
}
}), "looker")
// child is configured to be deployed on remote-sys (otherSystem)
// child is configured to be deployed on remote-sys (remoteSystem)
l ! ((Props[Echo1], "child"))
val child = expectMsgType[ActorRef]
// grandchild is configured to be deployed on RemotingSpec (system)
@ -335,7 +357,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (TCP)" in {
val r = system.actorOf(Props[Echo1], "gonk")
r.path.toString must be ===
s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -351,7 +373,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (UDP)" in {
val r = system.actorOf(Props[Echo1], "zagzag")
r.path.toString must be ===
s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
s"akka.udp://remote-sys@localhost:${port(remoteSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
r ! 42
expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -367,7 +389,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (SSL)" in {
val r = system.actorOf(Props[Echo1], "roghtaar")
r.path.toString must be ===
s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
r ! 42
expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -415,15 +437,30 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
}
}
"be able to serialize a local actor ref from another actor system" in {
val config = ConfigFactory.parseString("""
akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"]
akka.remote.test.local-address = "test://other-system@localhost:12347"
""").withFallback(remoteSystem.settings.config)
val otherSystem = ActorSystem("other-system", config)
try {
val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy")
// check that we use the specified transport address instead of the default
val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "tcp"))
val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo")
val proxyTcp = system.actorOf(Props(new Proxy(remoteEchoHereTcp, testActor)), "proxy-tcp")
proxyTcp ! otherGuy
expectMsg(3.seconds, ("pong", otherGuyRemoteTcp))
// now check that we fall back to default when we haven't got a corresponding transport
val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test"))
val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo")
val proxySsl = system.actorOf(Props(new Proxy(remoteEchoHereSsl, testActor)), "proxy-ssl")
proxySsl ! otherGuy
expectMsg(3.seconds, ("pong", otherGuyRemoteTest))
} finally {
otherSystem.shutdown()
otherSystem.awaitTermination(5.seconds.dilated)
}
}
}
override def beforeTermination() {
system.eventStream.publish(TestEvent.Mute(
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
otherSystem.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
}
}

View file

@ -76,7 +76,7 @@ abstract class GenericBuncher[A: ClassTag, B](val singleTimeout: FiniteDuration,
stop
}
initialize
initialize()
}
object Buncher {

View file

@ -55,7 +55,7 @@ class Chopstick extends Actor with FSM[ChopstickState, TakenBy] {
}
// Initialze the chopstick
initialize
initialize()
}
/**
@ -155,7 +155,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
}
// Initialize the hakker
initialize
initialize()
private def startThinking(duration: FiniteDuration): State = {
goto(Thinking) using TakenChopsticks(None, None) forMax duration