From ac4cd8a58eb949ba4cb800d7473ce6bdb56f96b9 Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Fri, 2 Jul 2010 00:16:11 +0530 Subject: [PATCH] type class based actor serialization implemented --- akka-core/src/main/scala/actor/Actor.scala | 92 ----- akka-core/src/main/scala/actor/ActorRef.scala | 293 +-------------- .../scala/actor/SerializationProtocol.scala | 248 +++++++++++++ .../src/main/scala/remote/RemoteServer.scala | 2 +- .../akka/actor/ProtobufProtocol.java | 341 +++++++++++++++++- .../src/test/protocol/ProtobufProtocol.proto | 5 + .../test/scala/SerializableActorSpec.scala | 112 ------ .../SerializableTypeClassActorSpec.scala | 181 ++++++++++ 8 files changed, 788 insertions(+), 486 deletions(-) create mode 100644 akka-core/src/main/scala/actor/SerializationProtocol.scala delete mode 100644 akka-core/src/test/scala/SerializableActorSpec.scala create mode 100644 akka-core/src/test/scala/SerializableTypeClassActorSpec.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5781b16963..7b60609f51 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -36,98 +36,6 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { self.makeRemote(hostname, port) } -/** - * Base trait defining a serializable actor. - * - * @author Jonas Bonér - */ -trait SerializableActor extends Actor - -/** - * Base trait defining a stateless serializable actor. - * - * @author Jonas Bonér - */ -trait StatelessSerializableActor extends SerializableActor - -/** - * Mix in this trait to create a serializable actor, serializable through - * a custom serialization protocol. This actor is the serialized state. - * - * @author Jonas Bonér - */ -trait StatefulSerializerSerializableActor extends SerializableActor { - val serializer: Serializer - - def toBinary: Array[Byte] -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * a custom serialization protocol. This actor is wrapping serializable state. - * - * @author Jonas Bonér - */ -trait StatefulWrappedSerializableActor extends SerializableActor { - def toBinary: Array[Byte] - - def fromBinary(bytes: Array[Byte]) -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * Protobuf. - * - * @author Jonas Bonér - */ -trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor { - def toBinary: Array[Byte] = toProtobuf.toByteArray - - def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T]) - - val clazz: Class[T] - - def toProtobuf: T - - def fromProtobuf(message: T): Unit -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * Java serialization. - * - * @author Jonas Bonér - */ -trait JavaSerializableActor extends StatefulSerializerSerializableActor { - @transient val serializer = Serializer.Java - - def toBinary: Array[Byte] = serializer.toBinary(this) -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * a Java JSON parser (Jackson). - * - * @author Jonas Bonér - */ -trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor { - val serializer = Serializer.JavaJSON - - def toBinary: Array[Byte] = serializer.toBinary(this) -} - -/** - * Mix in this trait to create a serializable actor, serializable through - * a Scala JSON parser (SJSON). - * - * @author Jonas Bonér - */ -trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor { - val serializer = Serializer.ScalaJSON - - def toBinary: Array[Byte] = serializer.toBinary(this) -} - /** * Life-cycle messages for the Actors */ diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index de1f7b714b..c980f88876 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -27,130 +27,10 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} import java.lang.reflect.Field +import RemoteActorSerialization._ import com.google.protobuf.ByteString -/** - * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation - * or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance. - * - *

- * Binary -> ActorRef: - *

- *   val actorRef = ActorRef.fromBinaryToRemoteActorRef(bytes)
- *   actorRef ! message // send message to remote actor through its reference
- * 
- * - *

- * Protobuf Message -> RemoteActorRef: - *

- *   val actorRef = ActorRef.fromBinaryToRemoteActorRef(protobufMessage)
- *   actorRef ! message // send message to remote actor through its reference
- * 
- * - *

- * Protobuf Message -> LocalActorRef: - *

- *   val actorRef = ActorRef.fromBinaryToLocalActorRef(protobufMessage)
- *   actorRef ! message // send message to local actor through its reference
- * 
- * - * @author Jonas Bonér - */ -object ActorRef { - - /** - * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. - */ - def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef = - fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - - /** - * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. - */ - def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = - fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) - - /** - * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. - */ - private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol) - RemoteActorRef( - protocol.getUuid, - protocol.getActorClassname, - protocol.getHomeAddress.getHostname, - protocol.getHomeAddress.getPort, - protocol.getTimeout, - loader) - } - - /** - * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. - */ - def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef = - fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - - /** - * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. - */ - def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = - fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) - - /** - * Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance. - */ - private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) - - val serializer = if (protocol.hasSerializerClassname) { - val serializerClass = - if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) - else Class.forName(protocol.getSerializerClassname) - Some(serializerClass.newInstance.asInstanceOf[Serializer]) - } else None - - val lifeCycle = - if (protocol.hasLifeCycle) { - val lifeCycleProtocol = protocol.getLifeCycle - val restartCallbacks = - if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart) - Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart)) - else None - Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks) - else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks) - else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) - } else None - - val supervisor = - if (protocol.hasSupervisor) - Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) - else None - - val hotswap = - if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get - .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) - .asInstanceOf[PartialFunction[Any, Unit]]) - else None - - new LocalActorRef( - protocol.getUuid, - protocol.getId, - protocol.getActorClassname, - protocol.getActorInstance.toByteArray, - protocol.getOriginalAddress.getHostname, - protocol.getOriginalAddress.getPort, - if (protocol.hasIsTransactor) protocol.getIsTransactor else false, - if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, - lifeCycle, - supervisor, - hotswap, - loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader? - serializer, - protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]) - } -} - /** * ActorRef is an immutable and serializable handle to an Actor. *

@@ -357,22 +237,6 @@ trait ActorRef extends TransactionManagement { */ def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - /** - * Is the actor is serializable? - */ - def isSerializable: Boolean = actor.isInstanceOf[SerializableActor] - - /** - * Returns the 'Serializer' instance for the Actor as an Option. - *

- * It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor - * trait (which has a Serializer defined) and 'None' if not. - */ - def serializer: Option[Serializer] = - if (actor.isInstanceOf[StatefulSerializerSerializableActor]) - Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer) - else None - /** * Only for internal use. UUID is effectively final. */ @@ -488,11 +352,6 @@ trait ActorRef extends TransactionManagement { } else false } - /** - * Serializes the ActorRef instance into a byte array (Array[Byte]). - */ - def toBinary: Array[Byte] - /** * Returns the class for the Actor instance that is managed by the ActorRef. */ @@ -653,32 +512,6 @@ trait ActorRef extends TransactionManagement { */ def shutdownLinkedActors: Unit - protected def createRemoteRequestProtocolBuilder( - message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = { - val protocol = RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setMessage(MessageSerializer.serialize(message)) - .setTarget(actorClassName) - .setTimeout(timeout) - .setUuid(uuid) - .setIsActor(true) - .setIsOneWay(isOneWay) - .setIsEscaped(false) - - val id = registerSupervisorAsRemoteActor - if (id.isDefined) protocol.setSupervisorUuid(id.get) - - senderOption.foreach { sender => - RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) - protocol.setSender(sender.toRemoteActorRefProtocol) - } - protocol - } - - protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol - - protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol - protected[akka] def invoke(messageHandle: MessageInvocation): Unit protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit @@ -748,26 +581,16 @@ sealed class LocalActorRef private[akka]( __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], __loader: ClassLoader, - __serializer: Option[Serializer], - __messages: List[RemoteRequestProtocol]) = { + __messages: List[RemoteRequestProtocol], + __format: Format[_ <: Actor]) = { this(() => { val actorClass = __loader.loadClass(__actorClassName) - val actorInstance = actorClass.newInstance - if (actorInstance.isInstanceOf[StatelessSerializableActor]) { - actorInstance.asInstanceOf[Actor] - } else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) { - __serializer.getOrElse(throw new IllegalStateException( - "No serializer defined for SerializableActor [" + actorClass.getName + "]")) - .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] - } else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) { - val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor] - instance.fromBinary(__actorBytes) - instance - } else throw new IllegalStateException( - "Can't deserialize Actor that is not an instance of one of:\n" + - "\n\t- StatelessSerializableActor" + - "\n\t- StatefulSerializerSerializableActor" + - "\n\t- StatefulWrappedSerializableActor") + if (__format.isInstanceOf[SerializerBasedActorFormat[_]]) + __format.asInstanceOf[SerializerBasedActorFormat[_]] + .serializer + .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] + else + actorClass.newInstance.asInstanceOf[Actor] }) loader = Some(__loader) isDeserialized = true @@ -804,98 +627,11 @@ sealed class LocalActorRef private[akka]( if (runActorInitialization && !isDeserialized) initializeActorInstance - /** - * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. - */ - protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = guard.withGuard { - val host = homeAddress.getHostName - val port = homeAddress.getPort - - if (!registeredInRemoteNodeDuringSerialization) { - Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) - RemoteServer.getOrCreateServer(homeAddress) - RemoteServer.registerActor(homeAddress, uuid, this) - registeredInRemoteNodeDuringSerialization = true - } - - RemoteActorRefProtocol.newBuilder - .setUuid(uuid) - .setActorClassname(actorClass.getName) - .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) - .setTimeout(timeout) - .build - } - - protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard { - if (!isSerializable) throw new IllegalStateException( - "Can't serialize an ActorRef using SerializedActorRefProtocol" + - "\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") - - stop // stop actor since it can not be used any more since we have serialized it and taken all messagess with us - val lifeCycleProtocol: Option[LifeCycleProtocol] = { - def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { - case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) - case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY) - } - val builder = LifeCycleProtocol.newBuilder - lifeCycle match { - case Some(LifeCycle(scope, None)) => - setScope(builder, scope) - Some(builder.build) - case Some(LifeCycle(scope, Some(callbacks))) => - setScope(builder, scope) - builder.setPreRestart(callbacks.preRestart) - builder.setPostRestart(callbacks.postRestart) - Some(builder.build) - case None => None - } - } - - val originalAddress = AddressProtocol.newBuilder - .setHostname(homeAddress.getHostName) - .setPort(homeAddress.getPort) - .build - - val builder = SerializedActorRefProtocol.newBuilder - .setUuid(uuid) - .setId(id) - .setActorClassname(actorClass.getName) - .setOriginalAddress(originalAddress) - .setIsTransactor(isTransactor) - .setTimeout(timeout) - if (actor.isInstanceOf[StatefulSerializerSerializableActor]) builder.setActorInstance( - ByteString.copyFrom(actor.asInstanceOf[StatefulSerializerSerializableActor].toBinary)) - else if (actor.isInstanceOf[StatefulWrappedSerializableActor]) builder.setActorInstance( - ByteString.copyFrom(actor.asInstanceOf[StatefulWrappedSerializableActor].toBinary)) - serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName)) - lifeCycleProtocol.foreach(builder.setLifeCycle(_)) - supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol)) - // FIXME: how to serialize the hotswap PartialFunction ?? - //hotswap.foreach(builder.setHotswapStack(_)) - var message = mailbox.poll - while (message != null) { - builder.addMessages(createRemoteRequestProtocolBuilder( - message.message, message.senderFuture.isEmpty, message.sender)) - message = mailbox.poll - } - builder.build - } - /** * Returns the mailbox. */ def mailbox: Deque[MessageInvocation] = _mailbox - /** - * Serializes the ActorRef instance into a byte array (Array[Byte]). - */ - def toBinary: Array[Byte] = { - val protocol = if (isSerializable) toSerializedActorRefProtocol - else toRemoteActorRefProtocol - Actor.log.debug("Serializing ActorRef to binary:\n" + protocol) - protocol.toByteArray - } - /** * Returns the class for the Actor instance that is managed by the ActorRef. */ @@ -1186,7 +922,7 @@ sealed class LocalActorRef private[akka]( if (remoteAddress.isDefined) { RemoteClient.clientFor(remoteAddress.get).send[Any]( - createRemoteRequestProtocolBuilder(message, true, senderOption).build, None) + createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None) } else { val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) if (dispatcher.usesActorMailbox) { @@ -1205,7 +941,7 @@ sealed class LocalActorRef private[akka]( if (remoteAddress.isDefined) { val future = RemoteClient.clientFor(remoteAddress.get).send( - createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture) + createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { @@ -1468,7 +1204,7 @@ private[akka] case class RemoteActorRef private[akka] ( lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader) def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - remoteClient.send[Any](createRemoteRequestProtocolBuilder(message, true, senderOption).build, None) + remoteClient.send[Any](createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None) } def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( @@ -1476,7 +1212,7 @@ private[akka] case class RemoteActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = remoteClient.send(createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture) + val future = remoteClient.send(createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } @@ -1500,7 +1236,6 @@ private[akka] case class RemoteActorRef private[akka] ( // ==== NOT SUPPORTED ==== def actorClass: Class[_ <: Actor] = unsupported - def toBinary: Array[Byte] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported def makeTransactionRequired: Unit = unsupported @@ -1521,8 +1256,6 @@ private[akka] case class RemoteActorRef private[akka] ( def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported - protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = unsupported - protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala new file mode 100644 index 0000000000..7737988d23 --- /dev/null +++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala @@ -0,0 +1,248 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.stm.TransactionManagement._ +import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteRequestProtocolIdFactory, MessageSerializer} +import se.scalablesolutions.akka.serialization.Serializer + +import com.google.protobuf.ByteString + +/** + * Type class definition for Actor Serialization + */ +trait FromBinary[T <: Actor] { + def fromBinary(bytes: Array[Byte], act: T): T +} + +trait ToBinary[T <: Actor] { + def toBinary(t: T): Array[Byte] +} + +// client needs to implement Format[] for the respective actor +trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] + +/** + * A default implementation for a stateless actor + * + * Create a Format object with the client actor as the implementation of the type class + * + *

+ * object BinaryFormatMyStatelessActor {
+ *   implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
+ * }
+ * 
+ */ +trait StatelessActorFormat[T <: Actor] extends Format[T] { + def fromBinary(bytes: Array[Byte], act: T) = act + def toBinary(ac: T) = Array.empty[Byte] +} + +/** + * A default implementation of the type class for a Format that specifies a serializer + * + * Create a Format object with the client actor as the implementation of the type class and + * a serializer object + * + *
+ * object BinaryFormatMyJavaSerializableActor {
+ *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
+ *     val serializer = Serializer.Java
+ *   }
+ * }
+ * 
+ */ +trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { + val serializer: Serializer + def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] + def toBinary(ac: T) = serializer.toBinary(ac) +} + +/** + * Module for local actor serialization + */ +object ActorSerialization { + + def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef = + fromBinaryToLocalActorRef(bytes, format) + + def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = { + toSerializedActorRefProtocol(a, format).toByteArray + } + + private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = { + val lifeCycleProtocol: Option[LifeCycleProtocol] = { + def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { + case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) + case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY) + } + val builder = LifeCycleProtocol.newBuilder + a.lifeCycle match { + case Some(LifeCycle(scope, None)) => + setScope(builder, scope) + Some(builder.build) + case Some(LifeCycle(scope, Some(callbacks))) => + setScope(builder, scope) + builder.setPreRestart(callbacks.preRestart) + builder.setPostRestart(callbacks.postRestart) + Some(builder.build) + case None => None + } + } + + val originalAddress = AddressProtocol.newBuilder + .setHostname(a.homeAddress.getHostName) + .setPort(a.homeAddress.getPort) + .build + + val builder = SerializedActorRefProtocol.newBuilder + .setUuid(a.uuid) + .setId(a.id) + .setActorClassname(a.actorClass.getName) + .setOriginalAddress(originalAddress) + .setIsTransactor(a.isTransactor) + .setTimeout(a.timeout) + + builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T]))) + lifeCycleProtocol.foreach(builder.setLifeCycle(_)) + a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) + // FIXME: how to serialize the hotswap PartialFunction ?? + //hotswap.foreach(builder.setHotswapStack(_)) + builder.build + } + + private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = + fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None) + + private def fromProtobufToLocalActorRef[T <: Actor](protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = { + Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) + + val serializer = + if (format.isInstanceOf[SerializerBasedActorFormat[_]]) + Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer) + else None + + val lifeCycle = + if (protocol.hasLifeCycle) { + val lifeCycleProtocol = protocol.getLifeCycle + val restartCallbacks = + if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart) + Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart)) + else None + Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks) + else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks) + else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) + } else None + + val supervisor = + if (protocol.hasSupervisor) + Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) + else None + + val hotswap = + if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get + .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) + .asInstanceOf[PartialFunction[Any, Unit]]) + else None + + val ar = new LocalActorRef( + protocol.getUuid, + protocol.getId, + protocol.getActorClassname, + protocol.getActorInstance.toByteArray, + protocol.getOriginalAddress.getHostname, + protocol.getOriginalAddress.getPort, + if (protocol.hasIsTransactor) protocol.getIsTransactor else false, + if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, + lifeCycle, + supervisor, + hotswap, + loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader? + protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]], format) + + if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false) + format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T]) + ar + } +} + +object RemoteActorSerialization { + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. + */ + def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef = + fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) + + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. + */ + def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = + fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) + + /** + * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. + */ + private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { + Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol) + RemoteActorRef( + protocol.getUuid, + protocol.getActorClassname, + protocol.getHomeAddress.getHostname, + protocol.getHomeAddress.getPort, + protocol.getTimeout, + loader) + } + + /** + * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. + */ + def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = { + import ar._ + val host = homeAddress.getHostName + val port = homeAddress.getPort + + if (!registeredInRemoteNodeDuringSerialization) { + Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) + RemoteServer.getOrCreateServer(homeAddress) + RemoteServer.registerActor(homeAddress, uuid, ar) + registeredInRemoteNodeDuringSerialization = true + } + + RemoteActorRefProtocol.newBuilder + .setUuid(uuid) + .setActorClassname(actorClass.getName) + .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) + .setTimeout(timeout) + .build + } + + def createRemoteRequestProtocolBuilder(ar: ActorRef, + message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = { + import ar._ + val protocol = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) + .setMessage(MessageSerializer.serialize(message)) + .setTarget(actorClassName) + .setTimeout(timeout) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(isOneWay) + .setIsEscaped(false) + + val id = registerSupervisorAsRemoteActor + if (id.isDefined) protocol.setSupervisorUuid(id.get) + + senderOption.foreach { sender => + RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) + protocol.setSender(toRemoteActorRefProtocol(sender)) + } + protocol + } +} diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 6127ac9f62..943481ebc3 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -365,7 +365,7 @@ class RemoteServerHandler( actorRef.start val message = MessageSerializer.deserialize(request.getMessage) val sender = - if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) + if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None if (request.getIsOneWay) actorRef.!(message)(sender) else { diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java index 8d9ab752ee..183d2025d0 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java @@ -661,6 +661,331 @@ public final class ProtobufProtocol { // @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.Counter) } + public static final class DualCounter extends + com.google.protobuf.GeneratedMessage { + // Use DualCounter.newBuilder() to construct. + private DualCounter() { + initFields(); + } + private DualCounter(boolean noInit) {} + + private static final DualCounter defaultInstance; + public static DualCounter getDefaultInstance() { + return defaultInstance; + } + + public DualCounter getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable; + } + + // required uint32 count1 = 1; + public static final int COUNT1_FIELD_NUMBER = 1; + private boolean hasCount1; + private int count1_ = 0; + public boolean hasCount1() { return hasCount1; } + public int getCount1() { return count1_; } + + // required uint32 count2 = 2; + public static final int COUNT2_FIELD_NUMBER = 2; + private boolean hasCount2; + private int count2_ = 0; + public boolean hasCount2() { return hasCount2; } + public int getCount2() { return count2_; } + + private void initFields() { + } + public final boolean isInitialized() { + if (!hasCount1) return false; + if (!hasCount2) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasCount1()) { + output.writeUInt32(1, getCount1()); + } + if (hasCount2()) { + output.writeUInt32(2, getCount2()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasCount1()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(1, getCount1()); + } + if (hasCount2()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(2, getCount2()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter result; + + // Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter(); + return builder; + } + + protected se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.getDescriptor(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter getDefaultInstanceForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter) { + return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter other) { + if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.getDefaultInstance()) return this; + if (other.hasCount1()) { + setCount1(other.getCount1()); + } + if (other.hasCount2()) { + setCount2(other.getCount2()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 8: { + setCount1(input.readUInt32()); + break; + } + case 16: { + setCount2(input.readUInt32()); + break; + } + } + } + } + + + // required uint32 count1 = 1; + public boolean hasCount1() { + return result.hasCount1(); + } + public int getCount1() { + return result.getCount1(); + } + public Builder setCount1(int value) { + result.hasCount1 = true; + result.count1_ = value; + return this; + } + public Builder clearCount1() { + result.hasCount1 = false; + result.count1_ = 0; + return this; + } + + // required uint32 count2 = 2; + public boolean hasCount2() { + return result.hasCount2(); + } + public int getCount2() { + return result.getCount2(); + } + public Builder setCount2(int value) { + result.hasCount2 = true; + result.count2_ = value; + return this; + } + public Builder clearCount2() { + result.hasCount2 = false; + result.count2_ = 0; + return this; + } + + // @@protoc_insertion_point(builder_scope:se.scalablesolutions.akka.actor.DualCounter) + } + + static { + defaultInstance = new DualCounter(true); + se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.DualCounter) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; private static @@ -671,6 +996,11 @@ public final class ProtobufProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_se_scalablesolutions_akka_actor_Counter_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -683,7 +1013,8 @@ public final class ProtobufProtocol { "\n\026ProtobufProtocol.proto\022\037se.scalablesol" + "utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" + "\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010\"\030\n\007C" + - "ounter\022\r\n\005count\030\001 \002(\r" + "ounter\022\r\n\005count\030\001 \002(\r\"-\n\013DualCounter\022\016\n\006" + + "count1\030\001 \002(\r\022\016\n\006count2\030\002 \002(\r" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -706,6 +1037,14 @@ public final class ProtobufProtocol { new java.lang.String[] { "Count", }, se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.class, se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.Builder.class); + internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor = + getDescriptor().getMessageTypes().get(2); + internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor, + new java.lang.String[] { "Count1", "Count2", }, + se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.class, + se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.Builder.class); return null; } }; diff --git a/akka-core/src/test/protocol/ProtobufProtocol.proto b/akka-core/src/test/protocol/ProtobufProtocol.proto index 17e5357750..35ffec95e3 100644 --- a/akka-core/src/test/protocol/ProtobufProtocol.proto +++ b/akka-core/src/test/protocol/ProtobufProtocol.proto @@ -19,3 +19,8 @@ message ProtobufPOJO { message Counter { required uint32 count = 1; } + +message DualCounter { + required uint32 count1 = 1; + required uint32 count2 = 2; +} diff --git a/akka-core/src/test/scala/SerializableActorSpec.scala b/akka-core/src/test/scala/SerializableActorSpec.scala deleted file mode 100644 index b4a864db37..0000000000 --- a/akka-core/src/test/scala/SerializableActorSpec.scala +++ /dev/null @@ -1,112 +0,0 @@ -package se.scalablesolutions.akka.actor - -import Actor._ - -import org.scalatest.Spec -import org.scalatest.Assertions -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import com.google.protobuf.Message - -@RunWith(classOf[JUnitRunner]) -class SerializableActorSpec extends - Spec with - ShouldMatchers with - BeforeAndAfterAll { - - describe("SerializableActor") { - it("should be able to serialize and deserialize a JavaSerializableActor") { - val actor1 = actorOf[JavaSerializableTestActor].start - (actor1 !! "hello").getOrElse("_") should equal("world 1") - - val bytes = actor1.toBinary - val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes) - - actor2.start - (actor2 !! "hello").getOrElse("_") should equal("world 2") - } - - it("should be able to serialize and deserialize a ProtobufSerializableActor") { - val actor1 = actorOf[ProtobufSerializableTestActor].start - (actor1 !! "hello").getOrElse("_") should equal("world 1") - (actor1 !! "hello").getOrElse("_") should equal("world 2") - - val bytes = actor1.toBinary - val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes) - - actor2.start - (actor2 !! "hello").getOrElse("_") should equal("world 3") - } - - it("should be able to serialize and deserialize a StatelessSerializableActor") { - val actor1 = actorOf[StatelessSerializableTestActor].start - (actor1 !! "hello").getOrElse("_") should equal("world") - - val bytes = actor1.toBinary - val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes) - - actor2.start - (actor2 !! "hello").getOrElse("_") should equal("world") - } - - it("should be able to serialize and deserialize a StatelessSerializableTestActorWithMessagesInMailbox") { - val actor1 = actorOf[StatelessSerializableTestActorWithMessagesInMailbox].start - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - val actor2 = ActorRef.fromBinaryToLocalActorRef(actor1.toBinary) - Thread.sleep(1000) - (actor2 !! "hello-reply").getOrElse("_") should equal("world") - } - } -} - -@serializable class JavaSerializableTestActor extends JavaSerializableActor { - private var count = 0 - def receive = { - case "hello" => - count = count + 1 - self.reply("world " + count) - } -} - -class StatelessSerializableTestActor extends StatelessSerializableActor { - def receive = { - case "hello" => - self.reply("world") - } -} - -class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufProtocol.Counter] { - val clazz = classOf[ProtobufProtocol.Counter] - private var count = 0 - - def toProtobuf = ProtobufProtocol.Counter.newBuilder.setCount(count).build - def fromProtobuf(message: ProtobufProtocol.Counter) = count = message.getCount - - def receive = { - case "hello" => - count = count + 1 - self.reply("world " + count) - } -} - -class StatelessSerializableTestActorWithMessagesInMailbox extends StatelessSerializableActor { - def receive = { - case "hello" => - if (self ne null) println("# messages in mailbox " + self.mailbox.size) - Thread.sleep(500) - case "hello-reply" => self.reply("world") - } -} - diff --git a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala new file mode 100644 index 0000000000..e44ae76322 --- /dev/null +++ b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala @@ -0,0 +1,181 @@ +package se.scalablesolutions.akka.actor + +import Actor._ + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import com.google.protobuf.Message +import ActorSerialization._ + +@RunWith(classOf[JUnitRunner]) +class SerializableTypeClassActorSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + import se.scalablesolutions.akka.serialization.Serializer + + object BinaryFormatMyActor { + implicit object MyActorFormat extends Format[MyActor] { + def fromBinary(bytes: Array[Byte], act: MyActor) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + act.count = p.getCount + act + } + def toBinary(ac: MyActor) = + ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray + } + } + + object BinaryFormatMyActorWithDualCounter { + implicit object MyActorWithDualCounterFormat extends Format[MyActorWithDualCounter] { + def fromBinary(bytes: Array[Byte], act: MyActorWithDualCounter) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] + act.count1 = p.getCount1 + act.count2 = p.getCount2 + act + } + def toBinary(ac: MyActorWithDualCounter) = + ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray + } + } + + object BinaryFormatMyStatelessActor { + implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor] + } + + object BinaryFormatMyStatelessActorWithMessagesInMailbox { + implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox] + } + + object BinaryFormatMyJavaSerializableActor { + implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] { + val serializer = Serializer.Java + } + } + + describe("Serializable actor") { + it("should be able to serialize and de-serialize a stateful actor") { + import BinaryFormatMyActor._ + + val actor1 = actorOf[MyActor].start + (actor1 !! "hello").getOrElse("_") should equal("world 1") + (actor1 !! "hello").getOrElse("_") should equal("world 2") + + val bytes = toBinary(actor1) + val actor2 = fromBinary(bytes) + actor2.start + (actor2 !! "hello").getOrElse("_") should equal("world 3") + } + + it("should be able to serialize and de-serialize a stateful actor with compound state") { + import BinaryFormatMyActorWithDualCounter._ + + val actor1 = actorOf[MyActorWithDualCounter].start + (actor1 !! "hello").getOrElse("_") should equal("world 1 1") + (actor1 !! "hello").getOrElse("_") should equal("world 2 2") + + val bytes = toBinary(actor1) + val actor2 = fromBinary(bytes) + actor2.start + (actor2 !! "hello").getOrElse("_") should equal("world 3 3") + } + + it("should be able to serialize and de-serialize a stateless actor") { + import BinaryFormatMyStatelessActor._ + + val actor1 = actorOf[MyStatelessActor].start + (actor1 !! "hello").getOrElse("_") should equal("world") + (actor1 !! "hello").getOrElse("_") should equal("world") + + val bytes = toBinary(actor1) + val actor2 = fromBinary(bytes) + actor2.start + (actor2 !! "hello").getOrElse("_") should equal("world") + } + + it("should be able to serialize and de-serialize a stateful actor with a given serializer") { + import BinaryFormatMyJavaSerializableActor._ + + val actor1 = actorOf[MyJavaSerializableActor].start + (actor1 !! "hello").getOrElse("_") should equal("world 1") + (actor1 !! "hello").getOrElse("_") should equal("world 2") + + val bytes = toBinary(actor1) + val actor2 = fromBinary(bytes) + actor2.start + (actor2 !! "hello").getOrElse("_") should equal("world 3") + } + + it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") { + import BinaryFormatMyStatelessActorWithMessagesInMailbox._ + + val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + } + } +} + +class MyActorWithDualCounter extends Actor { + var count1 = 0 + var count2 = 0 + def receive = { + case "hello" => + count1 = count1 + 1 + count2 = count2 + 1 + self.reply("world " + count1 + " " + count2) + } +} + +class MyActor extends Actor { + var count = 0 + + def receive = { + case "hello" => + count = count + 1 + self.reply("world " + count) + } +} + +class MyStatelessActor extends Actor { + def receive = { + case "hello" => + self.reply("world") + } +} + +class MyStatelessActorWithMessagesInMailbox extends Actor { + def receive = { + case "hello" => + println("# messages in mailbox " + self.mailbox.size) + Thread.sleep(500) + case "hello-reply" => self.reply("world") + } +} + +@serializable class MyJavaSerializableActor extends Actor { + var count = 0 + + def receive = { + case "hello" => + count = count + 1 + self.reply("world " + count) + } +}