type class based actor serialization implemented
This commit is contained in:
parent
ab11c9c136
commit
ac4cd8a58e
8 changed files with 788 additions and 486 deletions
|
|
@ -36,98 +36,6 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
|
||||||
self.makeRemote(hostname, port)
|
self.makeRemote(hostname, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Base trait defining a serializable actor.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait SerializableActor extends Actor
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Base trait defining a stateless serializable actor.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait StatelessSerializableActor extends SerializableActor
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mix in this trait to create a serializable actor, serializable through
|
|
||||||
* a custom serialization protocol. This actor <b>is</b> the serialized state.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait StatefulSerializerSerializableActor extends SerializableActor {
|
|
||||||
val serializer: Serializer
|
|
||||||
|
|
||||||
def toBinary: Array[Byte]
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mix in this trait to create a serializable actor, serializable through
|
|
||||||
* a custom serialization protocol. This actor <b>is wrapping</b> serializable state.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait StatefulWrappedSerializableActor extends SerializableActor {
|
|
||||||
def toBinary: Array[Byte]
|
|
||||||
|
|
||||||
def fromBinary(bytes: Array[Byte])
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mix in this trait to create a serializable actor, serializable through
|
|
||||||
* Protobuf.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait ProtobufSerializableActor[T <: Message] extends StatefulWrappedSerializableActor {
|
|
||||||
def toBinary: Array[Byte] = toProtobuf.toByteArray
|
|
||||||
|
|
||||||
def fromBinary(bytes: Array[Byte]) = fromProtobuf(Serializer.Protobuf.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
|
|
||||||
|
|
||||||
val clazz: Class[T]
|
|
||||||
|
|
||||||
def toProtobuf: T
|
|
||||||
|
|
||||||
def fromProtobuf(message: T): Unit
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mix in this trait to create a serializable actor, serializable through
|
|
||||||
* Java serialization.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait JavaSerializableActor extends StatefulSerializerSerializableActor {
|
|
||||||
@transient val serializer = Serializer.Java
|
|
||||||
|
|
||||||
def toBinary: Array[Byte] = serializer.toBinary(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mix in this trait to create a serializable actor, serializable through
|
|
||||||
* a Java JSON parser (Jackson).
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait JavaJSONSerializableActor extends StatefulSerializerSerializableActor {
|
|
||||||
val serializer = Serializer.JavaJSON
|
|
||||||
|
|
||||||
def toBinary: Array[Byte] = serializer.toBinary(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mix in this trait to create a serializable actor, serializable through
|
|
||||||
* a Scala JSON parser (SJSON).
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
trait ScalaJSONSerializableActor extends StatefulSerializerSerializableActor {
|
|
||||||
val serializer = Serializer.ScalaJSON
|
|
||||||
|
|
||||||
def toBinary: Array[Byte] = serializer.toBinary(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Life-cycle messages for the Actors
|
* Life-cycle messages for the Actors
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -27,130 +27,10 @@ import java.util.concurrent.atomic.AtomicReference
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.{Map => JMap}
|
import java.util.{Map => JMap}
|
||||||
import java.lang.reflect.Field
|
import java.lang.reflect.Field
|
||||||
|
import RemoteActorSerialization._
|
||||||
|
|
||||||
import com.google.protobuf.ByteString
|
import com.google.protobuf.ByteString
|
||||||
|
|
||||||
/**
|
|
||||||
* The ActorRef object can be used to deserialize ActorRef instances from of its binary representation
|
|
||||||
* or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance.
|
|
||||||
*
|
|
||||||
* <p/>
|
|
||||||
* Binary -> ActorRef:
|
|
||||||
* <pre>
|
|
||||||
* val actorRef = ActorRef.fromBinaryToRemoteActorRef(bytes)
|
|
||||||
* actorRef ! message // send message to remote actor through its reference
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* <p/>
|
|
||||||
* Protobuf Message -> RemoteActorRef:
|
|
||||||
* <pre>
|
|
||||||
* val actorRef = ActorRef.fromBinaryToRemoteActorRef(protobufMessage)
|
|
||||||
* actorRef ! message // send message to remote actor through its reference
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* <p/>
|
|
||||||
* Protobuf Message -> LocalActorRef:
|
|
||||||
* <pre>
|
|
||||||
* val actorRef = ActorRef.fromBinaryToLocalActorRef(protobufMessage)
|
|
||||||
* actorRef ! message // send message to local actor through its reference
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
object ActorRef {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
|
||||||
*/
|
|
||||||
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
|
|
||||||
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
|
||||||
*/
|
|
||||||
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
|
||||||
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
|
|
||||||
*/
|
|
||||||
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
|
||||||
Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
|
|
||||||
RemoteActorRef(
|
|
||||||
protocol.getUuid,
|
|
||||||
protocol.getActorClassname,
|
|
||||||
protocol.getHomeAddress.getHostname,
|
|
||||||
protocol.getHomeAddress.getPort,
|
|
||||||
protocol.getTimeout,
|
|
||||||
loader)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
|
|
||||||
*/
|
|
||||||
def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef =
|
|
||||||
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
|
|
||||||
*/
|
|
||||||
def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
|
||||||
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance.
|
|
||||||
*/
|
|
||||||
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
|
||||||
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
|
|
||||||
|
|
||||||
val serializer = if (protocol.hasSerializerClassname) {
|
|
||||||
val serializerClass =
|
|
||||||
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
|
|
||||||
else Class.forName(protocol.getSerializerClassname)
|
|
||||||
Some(serializerClass.newInstance.asInstanceOf[Serializer])
|
|
||||||
} else None
|
|
||||||
|
|
||||||
val lifeCycle =
|
|
||||||
if (protocol.hasLifeCycle) {
|
|
||||||
val lifeCycleProtocol = protocol.getLifeCycle
|
|
||||||
val restartCallbacks =
|
|
||||||
if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart)
|
|
||||||
Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart))
|
|
||||||
else None
|
|
||||||
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks)
|
|
||||||
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks)
|
|
||||||
else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
|
|
||||||
} else None
|
|
||||||
|
|
||||||
val supervisor =
|
|
||||||
if (protocol.hasSupervisor)
|
|
||||||
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
|
||||||
else None
|
|
||||||
|
|
||||||
val hotswap =
|
|
||||||
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
|
|
||||||
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
|
|
||||||
.asInstanceOf[PartialFunction[Any, Unit]])
|
|
||||||
else None
|
|
||||||
|
|
||||||
new LocalActorRef(
|
|
||||||
protocol.getUuid,
|
|
||||||
protocol.getId,
|
|
||||||
protocol.getActorClassname,
|
|
||||||
protocol.getActorInstance.toByteArray,
|
|
||||||
protocol.getOriginalAddress.getHostname,
|
|
||||||
protocol.getOriginalAddress.getPort,
|
|
||||||
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
|
|
||||||
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
|
|
||||||
lifeCycle,
|
|
||||||
supervisor,
|
|
||||||
hotswap,
|
|
||||||
loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
|
|
||||||
serializer,
|
|
||||||
protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ActorRef is an immutable and serializable handle to an Actor.
|
* ActorRef is an immutable and serializable handle to an Actor.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
@ -357,22 +237,6 @@ trait ActorRef extends TransactionManagement {
|
||||||
*/
|
*/
|
||||||
def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message)
|
def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message)
|
||||||
|
|
||||||
/**
|
|
||||||
* Is the actor is serializable?
|
|
||||||
*/
|
|
||||||
def isSerializable: Boolean = actor.isInstanceOf[SerializableActor]
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the 'Serializer' instance for the Actor as an Option.
|
|
||||||
* <p/>
|
|
||||||
* It returns 'Some(serializer)' if the Actor is extending the StatefulSerializerSerializableActor
|
|
||||||
* trait (which has a Serializer defined) and 'None' if not.
|
|
||||||
*/
|
|
||||||
def serializer: Option[Serializer] =
|
|
||||||
if (actor.isInstanceOf[StatefulSerializerSerializableActor])
|
|
||||||
Some(actor.asInstanceOf[StatefulSerializerSerializableActor].serializer)
|
|
||||||
else None
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only for internal use. UUID is effectively final.
|
* Only for internal use. UUID is effectively final.
|
||||||
*/
|
*/
|
||||||
|
|
@ -488,11 +352,6 @@ trait ActorRef extends TransactionManagement {
|
||||||
} else false
|
} else false
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Serializes the ActorRef instance into a byte array (Array[Byte]).
|
|
||||||
*/
|
|
||||||
def toBinary: Array[Byte]
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the class for the Actor instance that is managed by the ActorRef.
|
* Returns the class for the Actor instance that is managed by the ActorRef.
|
||||||
*/
|
*/
|
||||||
|
|
@ -653,32 +512,6 @@ trait ActorRef extends TransactionManagement {
|
||||||
*/
|
*/
|
||||||
def shutdownLinkedActors: Unit
|
def shutdownLinkedActors: Unit
|
||||||
|
|
||||||
protected def createRemoteRequestProtocolBuilder(
|
|
||||||
message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = {
|
|
||||||
val protocol = RemoteRequestProtocol.newBuilder
|
|
||||||
.setId(RemoteRequestProtocolIdFactory.nextId)
|
|
||||||
.setMessage(MessageSerializer.serialize(message))
|
|
||||||
.setTarget(actorClassName)
|
|
||||||
.setTimeout(timeout)
|
|
||||||
.setUuid(uuid)
|
|
||||||
.setIsActor(true)
|
|
||||||
.setIsOneWay(isOneWay)
|
|
||||||
.setIsEscaped(false)
|
|
||||||
|
|
||||||
val id = registerSupervisorAsRemoteActor
|
|
||||||
if (id.isDefined) protocol.setSupervisorUuid(id.get)
|
|
||||||
|
|
||||||
senderOption.foreach { sender =>
|
|
||||||
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
|
|
||||||
protocol.setSender(sender.toRemoteActorRefProtocol)
|
|
||||||
}
|
|
||||||
protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol
|
|
||||||
|
|
||||||
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol
|
|
||||||
|
|
||||||
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
|
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
|
||||||
|
|
||||||
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
|
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
|
||||||
|
|
@ -748,26 +581,16 @@ sealed class LocalActorRef private[akka](
|
||||||
__supervisor: Option[ActorRef],
|
__supervisor: Option[ActorRef],
|
||||||
__hotswap: Option[PartialFunction[Any, Unit]],
|
__hotswap: Option[PartialFunction[Any, Unit]],
|
||||||
__loader: ClassLoader,
|
__loader: ClassLoader,
|
||||||
__serializer: Option[Serializer],
|
__messages: List[RemoteRequestProtocol],
|
||||||
__messages: List[RemoteRequestProtocol]) = {
|
__format: Format[_ <: Actor]) = {
|
||||||
this(() => {
|
this(() => {
|
||||||
val actorClass = __loader.loadClass(__actorClassName)
|
val actorClass = __loader.loadClass(__actorClassName)
|
||||||
val actorInstance = actorClass.newInstance
|
if (__format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||||
if (actorInstance.isInstanceOf[StatelessSerializableActor]) {
|
__format.asInstanceOf[SerializerBasedActorFormat[_]]
|
||||||
actorInstance.asInstanceOf[Actor]
|
.serializer
|
||||||
} else if (actorInstance.isInstanceOf[StatefulSerializerSerializableActor]) {
|
.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
|
||||||
__serializer.getOrElse(throw new IllegalStateException(
|
else
|
||||||
"No serializer defined for SerializableActor [" + actorClass.getName + "]"))
|
actorClass.newInstance.asInstanceOf[Actor]
|
||||||
.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
|
|
||||||
} else if (actorInstance.isInstanceOf[StatefulWrappedSerializableActor]) {
|
|
||||||
val instance = actorInstance.asInstanceOf[StatefulWrappedSerializableActor]
|
|
||||||
instance.fromBinary(__actorBytes)
|
|
||||||
instance
|
|
||||||
} else throw new IllegalStateException(
|
|
||||||
"Can't deserialize Actor that is not an instance of one of:\n" +
|
|
||||||
"\n\t- StatelessSerializableActor" +
|
|
||||||
"\n\t- StatefulSerializerSerializableActor" +
|
|
||||||
"\n\t- StatefulWrappedSerializableActor")
|
|
||||||
})
|
})
|
||||||
loader = Some(__loader)
|
loader = Some(__loader)
|
||||||
isDeserialized = true
|
isDeserialized = true
|
||||||
|
|
@ -804,98 +627,11 @@ sealed class LocalActorRef private[akka](
|
||||||
|
|
||||||
if (runActorInitialization && !isDeserialized) initializeActorInstance
|
if (runActorInitialization && !isDeserialized) initializeActorInstance
|
||||||
|
|
||||||
/**
|
|
||||||
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
|
||||||
*/
|
|
||||||
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = guard.withGuard {
|
|
||||||
val host = homeAddress.getHostName
|
|
||||||
val port = homeAddress.getPort
|
|
||||||
|
|
||||||
if (!registeredInRemoteNodeDuringSerialization) {
|
|
||||||
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
|
|
||||||
RemoteServer.getOrCreateServer(homeAddress)
|
|
||||||
RemoteServer.registerActor(homeAddress, uuid, this)
|
|
||||||
registeredInRemoteNodeDuringSerialization = true
|
|
||||||
}
|
|
||||||
|
|
||||||
RemoteActorRefProtocol.newBuilder
|
|
||||||
.setUuid(uuid)
|
|
||||||
.setActorClassname(actorClass.getName)
|
|
||||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
|
||||||
.setTimeout(timeout)
|
|
||||||
.build
|
|
||||||
}
|
|
||||||
|
|
||||||
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
|
|
||||||
if (!isSerializable) throw new IllegalStateException(
|
|
||||||
"Can't serialize an ActorRef using SerializedActorRefProtocol" +
|
|
||||||
"\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
|
|
||||||
|
|
||||||
stop // stop actor since it can not be used any more since we have serialized it and taken all messagess with us
|
|
||||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
|
||||||
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
|
|
||||||
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
|
|
||||||
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
|
|
||||||
}
|
|
||||||
val builder = LifeCycleProtocol.newBuilder
|
|
||||||
lifeCycle match {
|
|
||||||
case Some(LifeCycle(scope, None)) =>
|
|
||||||
setScope(builder, scope)
|
|
||||||
Some(builder.build)
|
|
||||||
case Some(LifeCycle(scope, Some(callbacks))) =>
|
|
||||||
setScope(builder, scope)
|
|
||||||
builder.setPreRestart(callbacks.preRestart)
|
|
||||||
builder.setPostRestart(callbacks.postRestart)
|
|
||||||
Some(builder.build)
|
|
||||||
case None => None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val originalAddress = AddressProtocol.newBuilder
|
|
||||||
.setHostname(homeAddress.getHostName)
|
|
||||||
.setPort(homeAddress.getPort)
|
|
||||||
.build
|
|
||||||
|
|
||||||
val builder = SerializedActorRefProtocol.newBuilder
|
|
||||||
.setUuid(uuid)
|
|
||||||
.setId(id)
|
|
||||||
.setActorClassname(actorClass.getName)
|
|
||||||
.setOriginalAddress(originalAddress)
|
|
||||||
.setIsTransactor(isTransactor)
|
|
||||||
.setTimeout(timeout)
|
|
||||||
if (actor.isInstanceOf[StatefulSerializerSerializableActor]) builder.setActorInstance(
|
|
||||||
ByteString.copyFrom(actor.asInstanceOf[StatefulSerializerSerializableActor].toBinary))
|
|
||||||
else if (actor.isInstanceOf[StatefulWrappedSerializableActor]) builder.setActorInstance(
|
|
||||||
ByteString.copyFrom(actor.asInstanceOf[StatefulWrappedSerializableActor].toBinary))
|
|
||||||
serializer.foreach(s => builder.setSerializerClassname(s.getClass.getName))
|
|
||||||
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
|
|
||||||
supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol))
|
|
||||||
// FIXME: how to serialize the hotswap PartialFunction ??
|
|
||||||
//hotswap.foreach(builder.setHotswapStack(_))
|
|
||||||
var message = mailbox.poll
|
|
||||||
while (message != null) {
|
|
||||||
builder.addMessages(createRemoteRequestProtocolBuilder(
|
|
||||||
message.message, message.senderFuture.isEmpty, message.sender))
|
|
||||||
message = mailbox.poll
|
|
||||||
}
|
|
||||||
builder.build
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the mailbox.
|
* Returns the mailbox.
|
||||||
*/
|
*/
|
||||||
def mailbox: Deque[MessageInvocation] = _mailbox
|
def mailbox: Deque[MessageInvocation] = _mailbox
|
||||||
|
|
||||||
/**
|
|
||||||
* Serializes the ActorRef instance into a byte array (Array[Byte]).
|
|
||||||
*/
|
|
||||||
def toBinary: Array[Byte] = {
|
|
||||||
val protocol = if (isSerializable) toSerializedActorRefProtocol
|
|
||||||
else toRemoteActorRefProtocol
|
|
||||||
Actor.log.debug("Serializing ActorRef to binary:\n" + protocol)
|
|
||||||
protocol.toByteArray
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the class for the Actor instance that is managed by the ActorRef.
|
* Returns the class for the Actor instance that is managed by the ActorRef.
|
||||||
*/
|
*/
|
||||||
|
|
@ -1186,7 +922,7 @@ sealed class LocalActorRef private[akka](
|
||||||
|
|
||||||
if (remoteAddress.isDefined) {
|
if (remoteAddress.isDefined) {
|
||||||
RemoteClient.clientFor(remoteAddress.get).send[Any](
|
RemoteClient.clientFor(remoteAddress.get).send[Any](
|
||||||
createRemoteRequestProtocolBuilder(message, true, senderOption).build, None)
|
createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None)
|
||||||
} else {
|
} else {
|
||||||
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
|
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
|
||||||
if (dispatcher.usesActorMailbox) {
|
if (dispatcher.usesActorMailbox) {
|
||||||
|
|
@ -1205,7 +941,7 @@ sealed class LocalActorRef private[akka](
|
||||||
|
|
||||||
if (remoteAddress.isDefined) {
|
if (remoteAddress.isDefined) {
|
||||||
val future = RemoteClient.clientFor(remoteAddress.get).send(
|
val future = RemoteClient.clientFor(remoteAddress.get).send(
|
||||||
createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture)
|
createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture)
|
||||||
if (future.isDefined) future.get
|
if (future.isDefined) future.get
|
||||||
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -1468,7 +1204,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader)
|
lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader)
|
||||||
|
|
||||||
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
|
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
|
||||||
remoteClient.send[Any](createRemoteRequestProtocolBuilder(message, true, senderOption).build, None)
|
remoteClient.send[Any](createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
|
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
|
||||||
|
|
@ -1476,7 +1212,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
timeout: Long,
|
timeout: Long,
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
||||||
val future = remoteClient.send(createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture)
|
val future = remoteClient.send(createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture)
|
||||||
if (future.isDefined) future.get
|
if (future.isDefined) future.get
|
||||||
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
||||||
}
|
}
|
||||||
|
|
@ -1500,7 +1236,6 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
|
|
||||||
// ==== NOT SUPPORTED ====
|
// ==== NOT SUPPORTED ====
|
||||||
def actorClass: Class[_ <: Actor] = unsupported
|
def actorClass: Class[_ <: Actor] = unsupported
|
||||||
def toBinary: Array[Byte] = unsupported
|
|
||||||
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
|
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
|
||||||
def dispatcher: MessageDispatcher = unsupported
|
def dispatcher: MessageDispatcher = unsupported
|
||||||
def makeTransactionRequired: Unit = unsupported
|
def makeTransactionRequired: Unit = unsupported
|
||||||
|
|
@ -1521,8 +1256,6 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
def mailboxSize: Int = unsupported
|
def mailboxSize: Int = unsupported
|
||||||
def supervisor: Option[ActorRef] = unsupported
|
def supervisor: Option[ActorRef] = unsupported
|
||||||
def shutdownLinkedActors: Unit = unsupported
|
def shutdownLinkedActors: Unit = unsupported
|
||||||
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = unsupported
|
|
||||||
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = unsupported
|
|
||||||
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
|
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
|
||||||
protected[akka] def restart(reason: Throwable): Unit = unsupported
|
protected[akka] def restart(reason: Throwable): Unit = unsupported
|
||||||
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
|
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
|
||||||
|
|
|
||||||
248
akka-core/src/main/scala/actor/SerializationProtocol.scala
Normal file
248
akka-core/src/main/scala/actor/SerializationProtocol.scala
Normal file
|
|
@ -0,0 +1,248 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.actor
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||||
|
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||||
|
import se.scalablesolutions.akka.stm.global._
|
||||||
|
import se.scalablesolutions.akka.stm.TransactionManagement._
|
||||||
|
import se.scalablesolutions.akka.stm.TransactionManagement
|
||||||
|
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||||
|
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteRequestProtocolIdFactory, MessageSerializer}
|
||||||
|
import se.scalablesolutions.akka.serialization.Serializer
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Type class definition for Actor Serialization
|
||||||
|
*/
|
||||||
|
trait FromBinary[T <: Actor] {
|
||||||
|
def fromBinary(bytes: Array[Byte], act: T): T
|
||||||
|
}
|
||||||
|
|
||||||
|
trait ToBinary[T <: Actor] {
|
||||||
|
def toBinary(t: T): Array[Byte]
|
||||||
|
}
|
||||||
|
|
||||||
|
// client needs to implement Format[] for the respective actor
|
||||||
|
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A default implementation for a stateless actor
|
||||||
|
*
|
||||||
|
* Create a Format object with the client actor as the implementation of the type class
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* object BinaryFormatMyStatelessActor {
|
||||||
|
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
trait StatelessActorFormat[T <: Actor] extends Format[T] {
|
||||||
|
def fromBinary(bytes: Array[Byte], act: T) = act
|
||||||
|
def toBinary(ac: T) = Array.empty[Byte]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A default implementation of the type class for a Format that specifies a serializer
|
||||||
|
*
|
||||||
|
* Create a Format object with the client actor as the implementation of the type class and
|
||||||
|
* a serializer object
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* object BinaryFormatMyJavaSerializableActor {
|
||||||
|
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
||||||
|
* val serializer = Serializer.Java
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
|
||||||
|
val serializer: Serializer
|
||||||
|
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
|
||||||
|
def toBinary(ac: T) = serializer.toBinary(ac)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Module for local actor serialization
|
||||||
|
*/
|
||||||
|
object ActorSerialization {
|
||||||
|
|
||||||
|
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
|
||||||
|
fromBinaryToLocalActorRef(bytes, format)
|
||||||
|
|
||||||
|
def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = {
|
||||||
|
toSerializedActorRefProtocol(a, format).toByteArray
|
||||||
|
}
|
||||||
|
|
||||||
|
private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
|
||||||
|
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
||||||
|
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
|
||||||
|
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
|
||||||
|
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
|
||||||
|
}
|
||||||
|
val builder = LifeCycleProtocol.newBuilder
|
||||||
|
a.lifeCycle match {
|
||||||
|
case Some(LifeCycle(scope, None)) =>
|
||||||
|
setScope(builder, scope)
|
||||||
|
Some(builder.build)
|
||||||
|
case Some(LifeCycle(scope, Some(callbacks))) =>
|
||||||
|
setScope(builder, scope)
|
||||||
|
builder.setPreRestart(callbacks.preRestart)
|
||||||
|
builder.setPostRestart(callbacks.postRestart)
|
||||||
|
Some(builder.build)
|
||||||
|
case None => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val originalAddress = AddressProtocol.newBuilder
|
||||||
|
.setHostname(a.homeAddress.getHostName)
|
||||||
|
.setPort(a.homeAddress.getPort)
|
||||||
|
.build
|
||||||
|
|
||||||
|
val builder = SerializedActorRefProtocol.newBuilder
|
||||||
|
.setUuid(a.uuid)
|
||||||
|
.setId(a.id)
|
||||||
|
.setActorClassname(a.actorClass.getName)
|
||||||
|
.setOriginalAddress(originalAddress)
|
||||||
|
.setIsTransactor(a.isTransactor)
|
||||||
|
.setTimeout(a.timeout)
|
||||||
|
|
||||||
|
builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T])))
|
||||||
|
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
|
||||||
|
a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
|
||||||
|
// FIXME: how to serialize the hotswap PartialFunction ??
|
||||||
|
//hotswap.foreach(builder.setHotswapStack(_))
|
||||||
|
builder.build
|
||||||
|
}
|
||||||
|
|
||||||
|
private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
|
||||||
|
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
|
||||||
|
|
||||||
|
private def fromProtobufToLocalActorRef[T <: Actor](protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
|
||||||
|
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
|
||||||
|
|
||||||
|
val serializer =
|
||||||
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||||
|
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
|
||||||
|
else None
|
||||||
|
|
||||||
|
val lifeCycle =
|
||||||
|
if (protocol.hasLifeCycle) {
|
||||||
|
val lifeCycleProtocol = protocol.getLifeCycle
|
||||||
|
val restartCallbacks =
|
||||||
|
if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart)
|
||||||
|
Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart))
|
||||||
|
else None
|
||||||
|
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks)
|
||||||
|
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks)
|
||||||
|
else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
|
||||||
|
} else None
|
||||||
|
|
||||||
|
val supervisor =
|
||||||
|
if (protocol.hasSupervisor)
|
||||||
|
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||||
|
else None
|
||||||
|
|
||||||
|
val hotswap =
|
||||||
|
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
|
||||||
|
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
|
||||||
|
.asInstanceOf[PartialFunction[Any, Unit]])
|
||||||
|
else None
|
||||||
|
|
||||||
|
val ar = new LocalActorRef(
|
||||||
|
protocol.getUuid,
|
||||||
|
protocol.getId,
|
||||||
|
protocol.getActorClassname,
|
||||||
|
protocol.getActorInstance.toByteArray,
|
||||||
|
protocol.getOriginalAddress.getHostname,
|
||||||
|
protocol.getOriginalAddress.getPort,
|
||||||
|
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
|
||||||
|
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
|
||||||
|
lifeCycle,
|
||||||
|
supervisor,
|
||||||
|
hotswap,
|
||||||
|
loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
|
||||||
|
protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]], format)
|
||||||
|
|
||||||
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
||||||
|
format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
||||||
|
ar
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object RemoteActorSerialization {
|
||||||
|
/**
|
||||||
|
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||||
|
*/
|
||||||
|
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
|
||||||
|
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||||
|
*/
|
||||||
|
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
||||||
|
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
|
||||||
|
*/
|
||||||
|
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
||||||
|
Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
|
||||||
|
RemoteActorRef(
|
||||||
|
protocol.getUuid,
|
||||||
|
protocol.getActorClassname,
|
||||||
|
protocol.getHomeAddress.getHostname,
|
||||||
|
protocol.getHomeAddress.getPort,
|
||||||
|
protocol.getTimeout,
|
||||||
|
loader)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
||||||
|
*/
|
||||||
|
def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
|
||||||
|
import ar._
|
||||||
|
val host = homeAddress.getHostName
|
||||||
|
val port = homeAddress.getPort
|
||||||
|
|
||||||
|
if (!registeredInRemoteNodeDuringSerialization) {
|
||||||
|
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
|
||||||
|
RemoteServer.getOrCreateServer(homeAddress)
|
||||||
|
RemoteServer.registerActor(homeAddress, uuid, ar)
|
||||||
|
registeredInRemoteNodeDuringSerialization = true
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteActorRefProtocol.newBuilder
|
||||||
|
.setUuid(uuid)
|
||||||
|
.setActorClassname(actorClass.getName)
|
||||||
|
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||||
|
.setTimeout(timeout)
|
||||||
|
.build
|
||||||
|
}
|
||||||
|
|
||||||
|
def createRemoteRequestProtocolBuilder(ar: ActorRef,
|
||||||
|
message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = {
|
||||||
|
import ar._
|
||||||
|
val protocol = RemoteRequestProtocol.newBuilder
|
||||||
|
.setId(RemoteRequestProtocolIdFactory.nextId)
|
||||||
|
.setMessage(MessageSerializer.serialize(message))
|
||||||
|
.setTarget(actorClassName)
|
||||||
|
.setTimeout(timeout)
|
||||||
|
.setUuid(uuid)
|
||||||
|
.setIsActor(true)
|
||||||
|
.setIsOneWay(isOneWay)
|
||||||
|
.setIsEscaped(false)
|
||||||
|
|
||||||
|
val id = registerSupervisorAsRemoteActor
|
||||||
|
if (id.isDefined) protocol.setSupervisorUuid(id.get)
|
||||||
|
|
||||||
|
senderOption.foreach { sender =>
|
||||||
|
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
|
||||||
|
protocol.setSender(toRemoteActorRefProtocol(sender))
|
||||||
|
}
|
||||||
|
protocol
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -365,7 +365,7 @@ class RemoteServerHandler(
|
||||||
actorRef.start
|
actorRef.start
|
||||||
val message = MessageSerializer.deserialize(request.getMessage)
|
val message = MessageSerializer.deserialize(request.getMessage)
|
||||||
val sender =
|
val sender =
|
||||||
if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
|
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
|
||||||
else None
|
else None
|
||||||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||||
else {
|
else {
|
||||||
|
|
|
||||||
|
|
@ -661,6 +661,331 @@ public final class ProtobufProtocol {
|
||||||
// @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.Counter)
|
// @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.Counter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final class DualCounter extends
|
||||||
|
com.google.protobuf.GeneratedMessage {
|
||||||
|
// Use DualCounter.newBuilder() to construct.
|
||||||
|
private DualCounter() {
|
||||||
|
initFields();
|
||||||
|
}
|
||||||
|
private DualCounter(boolean noInit) {}
|
||||||
|
|
||||||
|
private static final DualCounter defaultInstance;
|
||||||
|
public static DualCounter getDefaultInstance() {
|
||||||
|
return defaultInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DualCounter getDefaultInstanceForType() {
|
||||||
|
return defaultInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final com.google.protobuf.Descriptors.Descriptor
|
||||||
|
getDescriptor() {
|
||||||
|
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
|
||||||
|
internalGetFieldAccessorTable() {
|
||||||
|
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable;
|
||||||
|
}
|
||||||
|
|
||||||
|
// required uint32 count1 = 1;
|
||||||
|
public static final int COUNT1_FIELD_NUMBER = 1;
|
||||||
|
private boolean hasCount1;
|
||||||
|
private int count1_ = 0;
|
||||||
|
public boolean hasCount1() { return hasCount1; }
|
||||||
|
public int getCount1() { return count1_; }
|
||||||
|
|
||||||
|
// required uint32 count2 = 2;
|
||||||
|
public static final int COUNT2_FIELD_NUMBER = 2;
|
||||||
|
private boolean hasCount2;
|
||||||
|
private int count2_ = 0;
|
||||||
|
public boolean hasCount2() { return hasCount2; }
|
||||||
|
public int getCount2() { return count2_; }
|
||||||
|
|
||||||
|
private void initFields() {
|
||||||
|
}
|
||||||
|
public final boolean isInitialized() {
|
||||||
|
if (!hasCount1) return false;
|
||||||
|
if (!hasCount2) return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeTo(com.google.protobuf.CodedOutputStream output)
|
||||||
|
throws java.io.IOException {
|
||||||
|
getSerializedSize();
|
||||||
|
if (hasCount1()) {
|
||||||
|
output.writeUInt32(1, getCount1());
|
||||||
|
}
|
||||||
|
if (hasCount2()) {
|
||||||
|
output.writeUInt32(2, getCount2());
|
||||||
|
}
|
||||||
|
getUnknownFields().writeTo(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int memoizedSerializedSize = -1;
|
||||||
|
public int getSerializedSize() {
|
||||||
|
int size = memoizedSerializedSize;
|
||||||
|
if (size != -1) return size;
|
||||||
|
|
||||||
|
size = 0;
|
||||||
|
if (hasCount1()) {
|
||||||
|
size += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeUInt32Size(1, getCount1());
|
||||||
|
}
|
||||||
|
if (hasCount2()) {
|
||||||
|
size += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeUInt32Size(2, getCount2());
|
||||||
|
}
|
||||||
|
size += getUnknownFields().getSerializedSize();
|
||||||
|
memoizedSerializedSize = size;
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
|
||||||
|
com.google.protobuf.ByteString data)
|
||||||
|
throws com.google.protobuf.InvalidProtocolBufferException {
|
||||||
|
return newBuilder().mergeFrom(data).buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
|
||||||
|
com.google.protobuf.ByteString data,
|
||||||
|
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
|
||||||
|
throws com.google.protobuf.InvalidProtocolBufferException {
|
||||||
|
return newBuilder().mergeFrom(data, extensionRegistry)
|
||||||
|
.buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(byte[] data)
|
||||||
|
throws com.google.protobuf.InvalidProtocolBufferException {
|
||||||
|
return newBuilder().mergeFrom(data).buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
|
||||||
|
byte[] data,
|
||||||
|
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
|
||||||
|
throws com.google.protobuf.InvalidProtocolBufferException {
|
||||||
|
return newBuilder().mergeFrom(data, extensionRegistry)
|
||||||
|
.buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(java.io.InputStream input)
|
||||||
|
throws java.io.IOException {
|
||||||
|
return newBuilder().mergeFrom(input).buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
|
||||||
|
java.io.InputStream input,
|
||||||
|
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
|
||||||
|
throws java.io.IOException {
|
||||||
|
return newBuilder().mergeFrom(input, extensionRegistry)
|
||||||
|
.buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseDelimitedFrom(java.io.InputStream input)
|
||||||
|
throws java.io.IOException {
|
||||||
|
Builder builder = newBuilder();
|
||||||
|
if (builder.mergeDelimitedFrom(input)) {
|
||||||
|
return builder.buildParsed();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseDelimitedFrom(
|
||||||
|
java.io.InputStream input,
|
||||||
|
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
|
||||||
|
throws java.io.IOException {
|
||||||
|
Builder builder = newBuilder();
|
||||||
|
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
|
||||||
|
return builder.buildParsed();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
|
||||||
|
com.google.protobuf.CodedInputStream input)
|
||||||
|
throws java.io.IOException {
|
||||||
|
return newBuilder().mergeFrom(input).buildParsed();
|
||||||
|
}
|
||||||
|
public static se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter parseFrom(
|
||||||
|
com.google.protobuf.CodedInputStream input,
|
||||||
|
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
|
||||||
|
throws java.io.IOException {
|
||||||
|
return newBuilder().mergeFrom(input, extensionRegistry)
|
||||||
|
.buildParsed();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Builder newBuilder() { return Builder.create(); }
|
||||||
|
public Builder newBuilderForType() { return newBuilder(); }
|
||||||
|
public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter prototype) {
|
||||||
|
return newBuilder().mergeFrom(prototype);
|
||||||
|
}
|
||||||
|
public Builder toBuilder() { return newBuilder(this); }
|
||||||
|
|
||||||
|
public static final class Builder extends
|
||||||
|
com.google.protobuf.GeneratedMessage.Builder<Builder> {
|
||||||
|
private se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter result;
|
||||||
|
|
||||||
|
// Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.newBuilder()
|
||||||
|
private Builder() {}
|
||||||
|
|
||||||
|
private static Builder create() {
|
||||||
|
Builder builder = new Builder();
|
||||||
|
builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter internalGetResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder clear() {
|
||||||
|
if (result == null) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Cannot call clear() after build().");
|
||||||
|
}
|
||||||
|
result = new se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder clone() {
|
||||||
|
return create().mergeFrom(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public com.google.protobuf.Descriptors.Descriptor
|
||||||
|
getDescriptorForType() {
|
||||||
|
return se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.getDescriptor();
|
||||||
|
}
|
||||||
|
|
||||||
|
public se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter getDefaultInstanceForType() {
|
||||||
|
return se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.getDefaultInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInitialized() {
|
||||||
|
return result.isInitialized();
|
||||||
|
}
|
||||||
|
public se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter build() {
|
||||||
|
if (result != null && !isInitialized()) {
|
||||||
|
throw newUninitializedMessageException(result);
|
||||||
|
}
|
||||||
|
return buildPartial();
|
||||||
|
}
|
||||||
|
|
||||||
|
private se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter buildParsed()
|
||||||
|
throws com.google.protobuf.InvalidProtocolBufferException {
|
||||||
|
if (!isInitialized()) {
|
||||||
|
throw newUninitializedMessageException(
|
||||||
|
result).asInvalidProtocolBufferException();
|
||||||
|
}
|
||||||
|
return buildPartial();
|
||||||
|
}
|
||||||
|
|
||||||
|
public se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter buildPartial() {
|
||||||
|
if (result == null) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"build() has already been called on this Builder.");
|
||||||
|
}
|
||||||
|
se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter returnMe = result;
|
||||||
|
result = null;
|
||||||
|
return returnMe;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder mergeFrom(com.google.protobuf.Message other) {
|
||||||
|
if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter) {
|
||||||
|
return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter)other);
|
||||||
|
} else {
|
||||||
|
super.mergeFrom(other);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter other) {
|
||||||
|
if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.getDefaultInstance()) return this;
|
||||||
|
if (other.hasCount1()) {
|
||||||
|
setCount1(other.getCount1());
|
||||||
|
}
|
||||||
|
if (other.hasCount2()) {
|
||||||
|
setCount2(other.getCount2());
|
||||||
|
}
|
||||||
|
this.mergeUnknownFields(other.getUnknownFields());
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder mergeFrom(
|
||||||
|
com.google.protobuf.CodedInputStream input,
|
||||||
|
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
|
||||||
|
throws java.io.IOException {
|
||||||
|
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
|
||||||
|
com.google.protobuf.UnknownFieldSet.newBuilder(
|
||||||
|
this.getUnknownFields());
|
||||||
|
while (true) {
|
||||||
|
int tag = input.readTag();
|
||||||
|
switch (tag) {
|
||||||
|
case 0:
|
||||||
|
this.setUnknownFields(unknownFields.build());
|
||||||
|
return this;
|
||||||
|
default: {
|
||||||
|
if (!parseUnknownField(input, unknownFields,
|
||||||
|
extensionRegistry, tag)) {
|
||||||
|
this.setUnknownFields(unknownFields.build());
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 8: {
|
||||||
|
setCount1(input.readUInt32());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 16: {
|
||||||
|
setCount2(input.readUInt32());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// required uint32 count1 = 1;
|
||||||
|
public boolean hasCount1() {
|
||||||
|
return result.hasCount1();
|
||||||
|
}
|
||||||
|
public int getCount1() {
|
||||||
|
return result.getCount1();
|
||||||
|
}
|
||||||
|
public Builder setCount1(int value) {
|
||||||
|
result.hasCount1 = true;
|
||||||
|
result.count1_ = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
public Builder clearCount1() {
|
||||||
|
result.hasCount1 = false;
|
||||||
|
result.count1_ = 0;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
// required uint32 count2 = 2;
|
||||||
|
public boolean hasCount2() {
|
||||||
|
return result.hasCount2();
|
||||||
|
}
|
||||||
|
public int getCount2() {
|
||||||
|
return result.getCount2();
|
||||||
|
}
|
||||||
|
public Builder setCount2(int value) {
|
||||||
|
result.hasCount2 = true;
|
||||||
|
result.count2_ = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
public Builder clearCount2() {
|
||||||
|
result.hasCount2 = false;
|
||||||
|
result.count2_ = 0;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
// @@protoc_insertion_point(builder_scope:se.scalablesolutions.akka.actor.DualCounter)
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
defaultInstance = new DualCounter(true);
|
||||||
|
se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit();
|
||||||
|
defaultInstance.initFields();
|
||||||
|
}
|
||||||
|
|
||||||
|
// @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.DualCounter)
|
||||||
|
}
|
||||||
|
|
||||||
private static com.google.protobuf.Descriptors.Descriptor
|
private static com.google.protobuf.Descriptors.Descriptor
|
||||||
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
|
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
|
||||||
private static
|
private static
|
||||||
|
|
@ -671,6 +996,11 @@ public final class ProtobufProtocol {
|
||||||
private static
|
private static
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable
|
||||||
internal_static_se_scalablesolutions_akka_actor_Counter_fieldAccessorTable;
|
internal_static_se_scalablesolutions_akka_actor_Counter_fieldAccessorTable;
|
||||||
|
private static com.google.protobuf.Descriptors.Descriptor
|
||||||
|
internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor;
|
||||||
|
private static
|
||||||
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable
|
||||||
|
internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable;
|
||||||
|
|
||||||
public static com.google.protobuf.Descriptors.FileDescriptor
|
public static com.google.protobuf.Descriptors.FileDescriptor
|
||||||
getDescriptor() {
|
getDescriptor() {
|
||||||
|
|
@ -683,7 +1013,8 @@ public final class ProtobufProtocol {
|
||||||
"\n\026ProtobufProtocol.proto\022\037se.scalablesol" +
|
"\n\026ProtobufProtocol.proto\022\037se.scalablesol" +
|
||||||
"utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" +
|
"utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" +
|
||||||
"\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010\"\030\n\007C" +
|
"\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010\"\030\n\007C" +
|
||||||
"ounter\022\r\n\005count\030\001 \002(\r"
|
"ounter\022\r\n\005count\030\001 \002(\r\"-\n\013DualCounter\022\016\n\006" +
|
||||||
|
"count1\030\001 \002(\r\022\016\n\006count2\030\002 \002(\r"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
|
@ -706,6 +1037,14 @@ public final class ProtobufProtocol {
|
||||||
new java.lang.String[] { "Count", },
|
new java.lang.String[] { "Count", },
|
||||||
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.class,
|
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.class,
|
||||||
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.Builder.class);
|
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.Builder.class);
|
||||||
|
internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor =
|
||||||
|
getDescriptor().getMessageTypes().get(2);
|
||||||
|
internal_static_se_scalablesolutions_akka_actor_DualCounter_fieldAccessorTable = new
|
||||||
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
|
internal_static_se_scalablesolutions_akka_actor_DualCounter_descriptor,
|
||||||
|
new java.lang.String[] { "Count1", "Count2", },
|
||||||
|
se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.class,
|
||||||
|
se.scalablesolutions.akka.actor.ProtobufProtocol.DualCounter.Builder.class);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -19,3 +19,8 @@ message ProtobufPOJO {
|
||||||
message Counter {
|
message Counter {
|
||||||
required uint32 count = 1;
|
required uint32 count = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message DualCounter {
|
||||||
|
required uint32 count1 = 1;
|
||||||
|
required uint32 count2 = 2;
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,112 +0,0 @@
|
||||||
package se.scalablesolutions.akka.actor
|
|
||||||
|
|
||||||
import Actor._
|
|
||||||
|
|
||||||
import org.scalatest.Spec
|
|
||||||
import org.scalatest.Assertions
|
|
||||||
import org.scalatest.matchers.ShouldMatchers
|
|
||||||
import org.scalatest.BeforeAndAfterAll
|
|
||||||
import org.scalatest.junit.JUnitRunner
|
|
||||||
import org.junit.runner.RunWith
|
|
||||||
|
|
||||||
import com.google.protobuf.Message
|
|
||||||
|
|
||||||
@RunWith(classOf[JUnitRunner])
|
|
||||||
class SerializableActorSpec extends
|
|
||||||
Spec with
|
|
||||||
ShouldMatchers with
|
|
||||||
BeforeAndAfterAll {
|
|
||||||
|
|
||||||
describe("SerializableActor") {
|
|
||||||
it("should be able to serialize and deserialize a JavaSerializableActor") {
|
|
||||||
val actor1 = actorOf[JavaSerializableTestActor].start
|
|
||||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
|
||||||
|
|
||||||
val bytes = actor1.toBinary
|
|
||||||
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
|
|
||||||
|
|
||||||
actor2.start
|
|
||||||
(actor2 !! "hello").getOrElse("_") should equal("world 2")
|
|
||||||
}
|
|
||||||
|
|
||||||
it("should be able to serialize and deserialize a ProtobufSerializableActor") {
|
|
||||||
val actor1 = actorOf[ProtobufSerializableTestActor].start
|
|
||||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
|
||||||
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
|
||||||
|
|
||||||
val bytes = actor1.toBinary
|
|
||||||
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
|
|
||||||
|
|
||||||
actor2.start
|
|
||||||
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
|
||||||
}
|
|
||||||
|
|
||||||
it("should be able to serialize and deserialize a StatelessSerializableActor") {
|
|
||||||
val actor1 = actorOf[StatelessSerializableTestActor].start
|
|
||||||
(actor1 !! "hello").getOrElse("_") should equal("world")
|
|
||||||
|
|
||||||
val bytes = actor1.toBinary
|
|
||||||
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
|
|
||||||
|
|
||||||
actor2.start
|
|
||||||
(actor2 !! "hello").getOrElse("_") should equal("world")
|
|
||||||
}
|
|
||||||
|
|
||||||
it("should be able to serialize and deserialize a StatelessSerializableTestActorWithMessagesInMailbox") {
|
|
||||||
val actor1 = actorOf[StatelessSerializableTestActorWithMessagesInMailbox].start
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
(actor1 ! "hello")
|
|
||||||
val actor2 = ActorRef.fromBinaryToLocalActorRef(actor1.toBinary)
|
|
||||||
Thread.sleep(1000)
|
|
||||||
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@serializable class JavaSerializableTestActor extends JavaSerializableActor {
|
|
||||||
private var count = 0
|
|
||||||
def receive = {
|
|
||||||
case "hello" =>
|
|
||||||
count = count + 1
|
|
||||||
self.reply("world " + count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class StatelessSerializableTestActor extends StatelessSerializableActor {
|
|
||||||
def receive = {
|
|
||||||
case "hello" =>
|
|
||||||
self.reply("world")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufProtocol.Counter] {
|
|
||||||
val clazz = classOf[ProtobufProtocol.Counter]
|
|
||||||
private var count = 0
|
|
||||||
|
|
||||||
def toProtobuf = ProtobufProtocol.Counter.newBuilder.setCount(count).build
|
|
||||||
def fromProtobuf(message: ProtobufProtocol.Counter) = count = message.getCount
|
|
||||||
|
|
||||||
def receive = {
|
|
||||||
case "hello" =>
|
|
||||||
count = count + 1
|
|
||||||
self.reply("world " + count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class StatelessSerializableTestActorWithMessagesInMailbox extends StatelessSerializableActor {
|
|
||||||
def receive = {
|
|
||||||
case "hello" =>
|
|
||||||
if (self ne null) println("# messages in mailbox " + self.mailbox.size)
|
|
||||||
Thread.sleep(500)
|
|
||||||
case "hello-reply" => self.reply("world")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
181
akka-core/src/test/scala/SerializableTypeClassActorSpec.scala
Normal file
181
akka-core/src/test/scala/SerializableTypeClassActorSpec.scala
Normal file
|
|
@ -0,0 +1,181 @@
|
||||||
|
package se.scalablesolutions.akka.actor
|
||||||
|
|
||||||
|
import Actor._
|
||||||
|
|
||||||
|
import org.scalatest.Spec
|
||||||
|
import org.scalatest.Assertions
|
||||||
|
import org.scalatest.matchers.ShouldMatchers
|
||||||
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
import org.scalatest.junit.JUnitRunner
|
||||||
|
import org.junit.runner.RunWith
|
||||||
|
|
||||||
|
import com.google.protobuf.Message
|
||||||
|
import ActorSerialization._
|
||||||
|
|
||||||
|
@RunWith(classOf[JUnitRunner])
|
||||||
|
class SerializableTypeClassActorSpec extends
|
||||||
|
Spec with
|
||||||
|
ShouldMatchers with
|
||||||
|
BeforeAndAfterAll {
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.serialization.Serializer
|
||||||
|
|
||||||
|
object BinaryFormatMyActor {
|
||||||
|
implicit object MyActorFormat extends Format[MyActor] {
|
||||||
|
def fromBinary(bytes: Array[Byte], act: MyActor) = {
|
||||||
|
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
|
||||||
|
act.count = p.getCount
|
||||||
|
act
|
||||||
|
}
|
||||||
|
def toBinary(ac: MyActor) =
|
||||||
|
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object BinaryFormatMyActorWithDualCounter {
|
||||||
|
implicit object MyActorWithDualCounterFormat extends Format[MyActorWithDualCounter] {
|
||||||
|
def fromBinary(bytes: Array[Byte], act: MyActorWithDualCounter) = {
|
||||||
|
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
|
||||||
|
act.count1 = p.getCount1
|
||||||
|
act.count2 = p.getCount2
|
||||||
|
act
|
||||||
|
}
|
||||||
|
def toBinary(ac: MyActorWithDualCounter) =
|
||||||
|
ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object BinaryFormatMyStatelessActor {
|
||||||
|
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
|
||||||
|
}
|
||||||
|
|
||||||
|
object BinaryFormatMyStatelessActorWithMessagesInMailbox {
|
||||||
|
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox]
|
||||||
|
}
|
||||||
|
|
||||||
|
object BinaryFormatMyJavaSerializableActor {
|
||||||
|
implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
||||||
|
val serializer = Serializer.Java
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("Serializable actor") {
|
||||||
|
it("should be able to serialize and de-serialize a stateful actor") {
|
||||||
|
import BinaryFormatMyActor._
|
||||||
|
|
||||||
|
val actor1 = actorOf[MyActor].start
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||||
|
|
||||||
|
val bytes = toBinary(actor1)
|
||||||
|
val actor2 = fromBinary(bytes)
|
||||||
|
actor2.start
|
||||||
|
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should be able to serialize and de-serialize a stateful actor with compound state") {
|
||||||
|
import BinaryFormatMyActorWithDualCounter._
|
||||||
|
|
||||||
|
val actor1 = actorOf[MyActorWithDualCounter].start
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world 1 1")
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world 2 2")
|
||||||
|
|
||||||
|
val bytes = toBinary(actor1)
|
||||||
|
val actor2 = fromBinary(bytes)
|
||||||
|
actor2.start
|
||||||
|
(actor2 !! "hello").getOrElse("_") should equal("world 3 3")
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should be able to serialize and de-serialize a stateless actor") {
|
||||||
|
import BinaryFormatMyStatelessActor._
|
||||||
|
|
||||||
|
val actor1 = actorOf[MyStatelessActor].start
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world")
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world")
|
||||||
|
|
||||||
|
val bytes = toBinary(actor1)
|
||||||
|
val actor2 = fromBinary(bytes)
|
||||||
|
actor2.start
|
||||||
|
(actor2 !! "hello").getOrElse("_") should equal("world")
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should be able to serialize and de-serialize a stateful actor with a given serializer") {
|
||||||
|
import BinaryFormatMyJavaSerializableActor._
|
||||||
|
|
||||||
|
val actor1 = actorOf[MyJavaSerializableActor].start
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||||
|
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||||
|
|
||||||
|
val bytes = toBinary(actor1)
|
||||||
|
val actor2 = fromBinary(bytes)
|
||||||
|
actor2.start
|
||||||
|
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") {
|
||||||
|
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
|
||||||
|
|
||||||
|
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
(actor1 ! "hello")
|
||||||
|
val actor2 = fromBinary(toBinary(actor1))
|
||||||
|
Thread.sleep(1000)
|
||||||
|
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MyActorWithDualCounter extends Actor {
|
||||||
|
var count1 = 0
|
||||||
|
var count2 = 0
|
||||||
|
def receive = {
|
||||||
|
case "hello" =>
|
||||||
|
count1 = count1 + 1
|
||||||
|
count2 = count2 + 1
|
||||||
|
self.reply("world " + count1 + " " + count2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MyActor extends Actor {
|
||||||
|
var count = 0
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case "hello" =>
|
||||||
|
count = count + 1
|
||||||
|
self.reply("world " + count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MyStatelessActor extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case "hello" =>
|
||||||
|
self.reply("world")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MyStatelessActorWithMessagesInMailbox extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case "hello" =>
|
||||||
|
println("# messages in mailbox " + self.mailbox.size)
|
||||||
|
Thread.sleep(500)
|
||||||
|
case "hello-reply" => self.reply("world")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@serializable class MyJavaSerializableActor extends Actor {
|
||||||
|
var count = 0
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case "hello" =>
|
||||||
|
count = count + 1
|
||||||
|
self.reply("world " + count)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue