diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5781b16963..7b60609f51 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -36,98 +36,6 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { self.makeRemote(hostname, port) } -/** - * Base trait defining a serializable actor. - * - * @author Jonas Bonér - */ -trait SerializableActor extends Actor - -/** - * Base trait defining a stateless serializable actor. - * - * @author Jonas Bonér - */ -trait StatelessSerializableActor extends SerializableActor - -/** - * Mix in this trait to create a serializable actor, serializable through - * a custom serialization protocol. This actor is the serialized state. - * - * @author Jonas Bonér - */ -trait StatefulSerializerSerializableActor extends SerializableActor { - val serializer: Serializer - - def toBinary: Array[Byte] -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * a custom serialization protocol. This actor is wrapping serializable state. - * - * @author Jonas Bonér - */ -trait StatefulWrappedSerializableActor extends SerializableActor { - def toBinary: Array[Byte] - - def fromBinary(bytes: Array[Byte]) -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * Protobuf. - * - * @author Jonas Bonér - */ -trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor { - def toBinary: Array[Byte] = toProtobuf.toByteArray - - def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T]) - - val clazz: Class[T] - - def toProtobuf: T - - def fromProtobuf(message: T): Unit -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * Java serialization. - * - * @author Jonas Bonér - */ -trait JavaSerializableActor extends StatefulSerializerSerializableActor { - @transient val serializer = Serializer.Java - - def toBinary: Array[Byte] = serializer.toBinary(this) -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * a Java JSON parser (Jackson). - * - * @author Jonas Bonér - */ -trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { - val serializer = Serializer.JavaJSON - - def toBinary: Array[Byte] = serializer.toBinary(this) -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * a Scala JSON parser (SJSON). - * - * @author Jonas Bonér - */ -trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor { - val serializer = Serializer.ScalaJSON - - def toBinary: Array[Byte] = serializer.toBinary(this) -} - /** * Life-cycle messages for the Actors */ diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index de1f7b714b..c980f88876 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -27,130 +27,10 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} import java.lang.reflect.Field +import RemoteActorSerialization._ import com.google.protobuf.ByteString -/** - * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation - * or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance. - * - *
- * Binary -> ActorRef: - *- * val actorRef = ActorRef.fromBinaryToRemoteActorRef(bytes) - * actorRef ! message // send message to remote actor through its reference - *- * - * - * Protobuf Message -> RemoteActorRef: - *
- * val actorRef = ActorRef.fromBinaryToRemoteActorRef(protobufMessage) - * actorRef ! message // send message to remote actor through its reference - *- * - * - * Protobuf Message -> LocalActorRef: - *
- * val actorRef = ActorRef.fromBinaryToLocalActorRef(protobufMessage) - * actorRef ! message // send message to local actor through its reference - *- * - * @author Jonas Bonér - */ -object ActorRef { - - /** - * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. - */ - def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef = - fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - - /** - * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. - */ - def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = - fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) - - /** - * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. - */ - private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol) - RemoteActorRef( - protocol.getUuid, - protocol.getActorClassname, - protocol.getHomeAddress.getHostname, - protocol.getHomeAddress.getPort, - protocol.getTimeout, - loader) - } - - /** - * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. - */ - def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef = - fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - - /** - * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. - */ - def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = - fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) - - /** - * Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance. - */ - private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) - - val serializer = if (protocol.hasSerializerClassname) { - val serializerClass = - if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) - else Class.forName(protocol.getSerializerClassname) - Some(serializerClass.newInstance.asInstanceOf[Serializer]) - } else None - - val lifeCycle = - if (protocol.hasLifeCycle) { - val lifeCycleProtocol = protocol.getLifeCycle - val restartCallbacks = - if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart) - Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart)) - else None - Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks) - else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks) - else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) - } else None - - val supervisor = - if (protocol.hasSupervisor) - Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) - else None - - val hotswap = - if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get - .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) - .asInstanceOf[PartialFunction[Any, Unit]]) - else None - - new LocalActorRef( - protocol.getUuid, - protocol.getId, - protocol.getActorClassname, - protocol.getActorInstance.toByteArray, - protocol.getOriginalAddress.getHostname, - protocol.getOriginalAddress.getPort, - if (protocol.hasIsTransactor) protocol.getIsTransactor else false, - if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, - lifeCycle, - supervisor, - hotswap, - loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader? - serializer, - protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]) - } -} - /** * ActorRef is an immutable and serializable handle to an Actor. * @@ -357,22 +237,6 @@ trait ActorRef extends TransactionManagement { */ def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - /** - * Is the actor is serializable? - */ - def isSerializable: Boolean = actor.isInstanceOf[SerializableActor] - - /** - * Returns the 'Serializer' instance for the Actor as an Option. - * - * It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor - * trait (which has a Serializer defined) and 'None' if not. - */ - def serializer: Option[Serializer] = - if (actor.isInstanceOf[StatefulSerializerSerializableActor]) - Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer) - else None - /** * Only for internal use. UUID is effectively final. */ @@ -488,11 +352,6 @@ trait ActorRef extends TransactionManagement { } else false } - /** - * Serializes the ActorRef instance into a byte array (Array[Byte]). - */ - def toBinary: Array[Byte] - /** * Returns the class for the Actor instance that is managed by the ActorRef. */ @@ -653,32 +512,6 @@ trait ActorRef extends TransactionManagement { */ def shutdownLinkedActors: Unit - protected def createRemoteRequestProtocolBuilder( - message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = { - val protocol = RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setMessage(MessageSerializer.serialize(message)) - .setTarget(actorClassName) - .setTimeout(timeout) - .setUuid(uuid) - .setIsActor(true) - .setIsOneWay(isOneWay) - .setIsEscaped(false) - - val id = registerSupervisorAsRemoteActor - if (id.isDefined) protocol.setSupervisorUuid(id.get) - - senderOption.foreach { sender => - RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) - protocol.setSender(sender.toRemoteActorRefProtocol) - } - protocol - } - - protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol - - protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol - protected[akka] def invoke(messageHandle: MessageInvocation): Unit protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit @@ -748,26 +581,16 @@ sealed class LocalActorRef private[akka]( __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], __loader: ClassLoader, - __serializer: Option[Serializer], - __messages: List[RemoteRequestProtocol]) = { + __messages: List[RemoteRequestProtocol], + __format: Format[_ <: Actor]) = { this(() => { val actorClass = __loader.loadClass(__actorClassName) - val actorInstance = actorClass.newInstance - if (actorInstance.isInstanceOf[StatelessSerializableActor]) { - actorInstance.asInstanceOf[Actor] - } else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) { - __serializer.getOrElse(throw new IllegalStateException( - "No serializer defined for SerializableActor [" + actorClass.getName + "]")) - .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] - } else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) { - val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor] - instance.fromBinary(__actorBytes) - instance - } else throw new IllegalStateException( - "Can't deserialize Actor that is not an instance of one of:\n" + - "\n\t- StatelessSerializableActor" + - "\n\t- StatefulSerializerSerializableActor" + - "\n\t- StatefulWrappedSerializableActor") + if (__format.isInstanceOf[SerializerBasedActorFormat[_]]) + __format.asInstanceOf[SerializerBasedActorFormat[_]] + .serializer + .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] + else + actorClass.newInstance.asInstanceOf[Actor] }) loader = Some(__loader) isDeserialized = true @@ -804,98 +627,11 @@ sealed class LocalActorRef private[akka]( if (runActorInitialization && !isDeserialized) initializeActorInstance - /** - * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. - */ - protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = guard.withGuard { - val host = homeAddress.getHostName - val port = homeAddress.getPort - - if (!registeredInRemoteNodeDuringSerialization) { - Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) - RemoteServer.getOrCreateServer(homeAddress) - RemoteServer.registerActor(homeAddress, uuid, this) - registeredInRemoteNodeDuringSerialization = true - } - - RemoteActorRefProtocol.newBuilder - .setUuid(uuid) - .setActorClassname(actorClass.getName) - .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) - .setTimeout(timeout) - .build - } - - protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard { - if (!isSerializable) throw new IllegalStateException( - "Can't serialize an ActorRef using SerializedActorRefProtocol" + - "\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") - - stop // stop actor since it can not be used any more since we have serialized it and taken all messagess with us - val lifeCycleProtocol: Option[LifeCycleProtocol] = { - def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { - case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) - case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY) - } - val builder = LifeCycleProtocol.newBuilder - lifeCycle match { - case Some(LifeCycle(scope, None)) => - setScope(builder, scope) - Some(builder.build) - case Some(LifeCycle(scope, Some(callbacks))) => - setScope(builder, scope) - builder.setPreRestart(callbacks.preRestart) - builder.setPostRestart(callbacks.postRestart) - Some(builder.build) - case None => None - } - } - - val originalAddress = AddressProtocol.newBuilder - .setHostname(homeAddress.getHostName) - .setPort(homeAddress.getPort) - .build - - val builder = SerializedActorRefProtocol.newBuilder - .setUuid(uuid) - .setId(id) - .setActorClassname(actorClass.getName) - .setOriginalAddress(originalAddress) - .setIsTransactor(isTransactor) - .setTimeout(timeout) - if (actor.isInstanceOf[StatefulSerializerSerializableActor]) builder.setActorInstance( - ByteString.copyFrom(actor.asInstanceOf[StatefulSerializerSerializableActor].toBinary)) - else if (actor.isInstanceOf[StatefulWrappedSerializableActor]) builder.setActorInstance( - ByteString.copyFrom(actor.asInstanceOf[StatefulWrappedSerializableActor].toBinary)) - serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName)) - lifeCycleProtocol.foreach(builder.setLifeCycle(_)) - supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol)) - // FIXME: how to serialize the hotswap PartialFunction ?? - //hotswap.foreach(builder.setHotswapStack(_)) - var message = mailbox.poll - while (message != null) { - builder.addMessages(createRemoteRequestProtocolBuilder( - message.message, message.senderFuture.isEmpty, message.sender)) - message = mailbox.poll - } - builder.build - } - /** * Returns the mailbox. */ def mailbox: Deque[MessageInvocation] = _mailbox - /** - * Serializes the ActorRef instance into a byte array (Array[Byte]). - */ - def toBinary: Array[Byte] = { - val protocol = if (isSerializable) toSerializedActorRefProtocol - else toRemoteActorRefProtocol - Actor.log.debug("Serializing ActorRef to binary:\n" + protocol) - protocol.toByteArray - } - /** * Returns the class for the Actor instance that is managed by the ActorRef. */ @@ -1186,7 +922,7 @@ sealed class LocalActorRef private[akka]( if (remoteAddress.isDefined) { RemoteClient.clientFor(remoteAddress.get).send[Any]( - createRemoteRequestProtocolBuilder(message, true, senderOption).build, None) + createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None) } else { val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) if (dispatcher.usesActorMailbox) { @@ -1205,7 +941,7 @@ sealed class LocalActorRef private[akka]( if (remoteAddress.isDefined) { val future = RemoteClient.clientFor(remoteAddress.get).send( - createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture) + createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { @@ -1468,7 +1204,7 @@ private[akka] case class RemoteActorRef private[akka] ( lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader) def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - remoteClient.send[Any](createRemoteRequestProtocolBuilder(message, true, senderOption).build, None) + remoteClient.send[Any](createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None) } def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( @@ -1476,7 +1212,7 @@ private[akka] case class RemoteActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = remoteClient.send(createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture) + val future = remoteClient.send(createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } @@ -1500,7 +1236,6 @@ private[akka] case class RemoteActorRef private[akka] ( // ==== NOT SUPPORTED ==== def actorClass: Class[_ <: Actor] = unsupported - def toBinary: Array[Byte] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported def makeTransactionRequired: Unit = unsupported @@ -1521,8 +1256,6 @@ private[akka] case class RemoteActorRef private[akka] ( def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported - protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = unsupported - protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala new file mode 100644 index 0000000000..7737988d23 --- /dev/null +++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala @@ -0,0 +1,248 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB
+ * object BinaryFormatMyStatelessActor {
+ * implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
+ * }
+ *
+ */
+trait StatelessActorFormat[T <: Actor] extends Format[T] {
+ def fromBinary(bytes: Array[Byte], act: T) = act
+ def toBinary(ac: T) = Array.empty[Byte]
+}
+
+/**
+ * A default implementation of the type class for a Format that specifies a serializer
+ *
+ * Create a Format object with the client actor as the implementation of the type class and
+ * a serializer object
+ *
+ *
+ * object BinaryFormatMyJavaSerializableActor {
+ * implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
+ * val serializer = Serializer.Java
+ * }
+ * }
+ *
+ */
+trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
+ val serializer: Serializer
+ def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
+ def toBinary(ac: T) = serializer.toBinary(ac)
+}
+
+/**
+ * Module for local actor serialization
+ */
+object ActorSerialization {
+
+ def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
+ fromBinaryToLocalActorRef(bytes, format)
+
+ def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = {
+ toSerializedActorRefProtocol(a, format).toByteArray
+ }
+
+ private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
+ val lifeCycleProtocol: Option[LifeCycleProtocol] = {
+ def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
+ case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
+ case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
+ }
+ val builder = LifeCycleProtocol.newBuilder
+ a.lifeCycle match {
+ case Some(LifeCycle(scope, None)) =>
+ setScope(builder, scope)
+ Some(builder.build)
+ case Some(LifeCycle(scope, Some(callbacks))) =>
+ setScope(builder, scope)
+ builder.setPreRestart(callbacks.preRestart)
+ builder.setPostRestart(callbacks.postRestart)
+ Some(builder.build)
+ case None => None
+ }
+ }
+
+ val originalAddress = AddressProtocol.newBuilder
+ .setHostname(a.homeAddress.getHostName)
+ .setPort(a.homeAddress.getPort)
+ .build
+
+ val builder = SerializedActorRefProtocol.newBuilder
+ .setUuid(a.uuid)
+ .setId(a.id)
+ .setActorClassname(a.actorClass.getName)
+ .setOriginalAddress(originalAddress)
+ .setIsTransactor(a.isTransactor)
+ .setTimeout(a.timeout)
+
+ builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T])))
+ lifeCycleProtocol.foreach(builder.setLifeCycle(_))
+ a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
+ // FIXME: how to serialize the hotswap PartialFunction ??
+ //hotswap.foreach(builder.setHotswapStack(_))
+ builder.build
+ }
+
+ private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
+ fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
+
+ private def fromProtobufToLocalActorRef[T <: Actor](protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
+ Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
+
+ val serializer =
+ if (format.isInstanceOf[SerializerBasedActorFormat[_]])
+ Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
+ else None
+
+ val lifeCycle =
+ if (protocol.hasLifeCycle) {
+ val lifeCycleProtocol = protocol.getLifeCycle
+ val restartCallbacks =
+ if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart)
+ Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart))
+ else None
+ Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks)
+ else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks)
+ else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
+ } else None
+
+ val supervisor =
+ if (protocol.hasSupervisor)
+ Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
+ else None
+
+ val hotswap =
+ if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
+ .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
+ .asInstanceOf[PartialFunction[Any, Unit]])
+ else None
+
+ val ar = new LocalActorRef(
+ protocol.getUuid,
+ protocol.getId,
+ protocol.getActorClassname,
+ protocol.getActorInstance.toByteArray,
+ protocol.getOriginalAddress.getHostname,
+ protocol.getOriginalAddress.getPort,
+ if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
+ if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
+ lifeCycle,
+ supervisor,
+ hotswap,
+ loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
+ protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]], format)
+
+ if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
+ format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
+ ar
+ }
+}
+
+object RemoteActorSerialization {
+ /**
+ * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
+ */
+ def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
+ fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
+
+ /**
+ * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
+ */
+ def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
+ fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
+
+ /**
+ * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
+ */
+ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
+ Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
+ RemoteActorRef(
+ protocol.getUuid,
+ protocol.getActorClassname,
+ protocol.getHomeAddress.getHostname,
+ protocol.getHomeAddress.getPort,
+ protocol.getTimeout,
+ loader)
+ }
+
+ /**
+ * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
+ */
+ def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
+ import ar._
+ val host = homeAddress.getHostName
+ val port = homeAddress.getPort
+
+ if (!registeredInRemoteNodeDuringSerialization) {
+ Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
+ RemoteServer.getOrCreateServer(homeAddress)
+ RemoteServer.registerActor(homeAddress, uuid, ar)
+ registeredInRemoteNodeDuringSerialization = true
+ }
+
+ RemoteActorRefProtocol.newBuilder
+ .setUuid(uuid)
+ .setActorClassname(actorClass.getName)
+ .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
+ .setTimeout(timeout)
+ .build
+ }
+
+ def createRemoteRequestProtocolBuilder(ar: ActorRef,
+ message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = {
+ import ar._
+ val protocol = RemoteRequestProtocol.newBuilder
+ .setId(RemoteRequestProtocolIdFactory.nextId)
+ .setMessage(MessageSerializer.serialize(message))
+ .setTarget(actorClassName)
+ .setTimeout(timeout)
+ .setUuid(uuid)
+ .setIsActor(true)
+ .setIsOneWay(isOneWay)
+ .setIsEscaped(false)
+
+ val id = registerSupervisorAsRemoteActor
+ if (id.isDefined) protocol.setSupervisorUuid(id.get)
+
+ senderOption.foreach { sender =>
+ RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
+ protocol.setSender(toRemoteActorRefProtocol(sender))
+ }
+ protocol
+ }
+}
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 6127ac9f62..943481ebc3 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -365,7 +365,7 @@ class RemoteServerHandler(
actorRef.start
val message = MessageSerializer.deserialize(request.getMessage)
val sender =
- if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
+ if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java
index 8d9ab752ee..183d2025d0 100644
--- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java
+++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java
@@ -661,6 +661,331 @@ public final class ProtobufProtocol {
// @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.Counter)
}
+ public static final class DualCounter extends
+ com.google.protobuf.GeneratedMessage {
+ // Use DualCounter.newBuilder() to construct.
+ private DualCounter() {
+ initFields();
+ }
+ private DualCounter(boolean noInit) {}
+
+ private static final DualCounter defaultInstance;
+ public static DualCounter getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public DualCounter getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable;
+ }
+
+ // required uint32 count1 = 1;
+ public static final int COUNT1_FIELD_NUMBER = 1;
+ private boolean hasCount1;
+ private int count1_ = 0;
+ public boolean hasCount1() { return hasCount1; }
+ public int getCount1() { return count1_; }
+
+ // required uint32 count2 = 2;
+ public static final int COUNT2_FIELD_NUMBER = 2;
+ private boolean hasCount2;
+ private int count2_ = 0;
+ public boolean hasCount2() { return hasCount2; }
+ public int getCount2() { return count2_; }
+
+ private void initFields() {
+ }
+ public final boolean isInitialized() {
+ if (!hasCount1) return false;
+ if (!hasCount2) return false;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (hasCount1()) {
+ output.writeUInt32(1, getCount1());
+ }
+ if (hasCount2()) {
+ output.writeUInt32(2, getCount2());
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (hasCount1()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(1, getCount1());
+ }
+ if (hasCount2()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(2, getCount2());
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder