Removed typeclass serialization in favor of reflection-based.
Removed possibility to create multiple ClusterNode, now just one in Cluster.node. Changed timestamp format for default EventHandler listener to display NANOS. Cleaned up ClusterModule in ReflectiveAccess. Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
parent
76d9c3c33a
commit
b63709d82c
19 changed files with 279 additions and 317 deletions
|
|
@ -23,25 +23,27 @@ import akka.remote.{ RemoteClientSettings, MessageSerializer }
|
|||
* Module for local actor serialization.
|
||||
*/
|
||||
object ActorSerialization {
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress)(implicit format: Format[T]): ActorRef =
|
||||
implicit val defaultSerializer = Format.Default
|
||||
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress)(implicit format: Serializer): ActorRef =
|
||||
fromBinaryToLocalActorRef(bytes, Some(homeAddress), format)
|
||||
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Serializer): ActorRef =
|
||||
fromBinaryToLocalActorRef(bytes, None, format)
|
||||
|
||||
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Format[T]): Array[Byte] =
|
||||
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Serializer): Array[Byte] =
|
||||
toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
|
||||
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Serializer): ActorRef =
|
||||
fromBinary(bytes)(format)
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
|
||||
def toBinaryJ[T <: Actor](a: ActorRef, format: Serializer, srlMailBox: Boolean = true): Array[Byte] =
|
||||
toBinary(a, srlMailBox)(format)
|
||||
|
||||
private[akka] def toSerializedActorRefProtocol[T <: Actor](
|
||||
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
|
||||
actorRef: ActorRef, format: Serializer, serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
|
||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
||||
actorRef.lifeCycle match {
|
||||
case Permanent ⇒ Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build)
|
||||
|
|
@ -57,6 +59,7 @@ object ActorSerialization {
|
|||
.setTimeout(actorRef.timeout)
|
||||
|
||||
if (serializeMailBox == true) {
|
||||
if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
|
||||
val messages =
|
||||
actorRef.mailbox match {
|
||||
case q: java.util.Queue[MessageInvocation] ⇒
|
||||
|
|
@ -94,18 +97,13 @@ object ActorSerialization {
|
|||
private def fromBinaryToLocalActorRef[T <: Actor](
|
||||
bytes: Array[Byte],
|
||||
homeAddress: Option[InetSocketAddress],
|
||||
format: Format[T]): ActorRef = {
|
||||
format: Serializer): ActorRef = {
|
||||
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
|
||||
fromProtobufToLocalActorRef(builder.build, format, None)
|
||||
}
|
||||
|
||||
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
|
||||
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
|
||||
|
||||
val serializer =
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
|
||||
else None
|
||||
protocol: SerializedActorRefProtocol, format: Serializer, loader: Option[ClassLoader]): ActorRef = {
|
||||
|
||||
val lifeCycle =
|
||||
if (protocol.hasLifeCycle) {
|
||||
|
|
@ -121,19 +119,23 @@ object ActorSerialization {
|
|||
else None
|
||||
|
||||
val hotswap =
|
||||
if (serializer.isDefined && protocol.hasHotswapStack) serializer.get
|
||||
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]]))
|
||||
.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
|
||||
else Stack[PartialFunction[Any, Unit]]()
|
||||
try {
|
||||
format
|
||||
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]]))
|
||||
.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
|
||||
} catch {
|
||||
case e: Exception ⇒ Stack[PartialFunction[Any, Unit]]()
|
||||
}
|
||||
|
||||
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
||||
|
||||
val factory = () ⇒ {
|
||||
val actorClass = classLoader.loadClass(protocol.getActorClassname)
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||
format.asInstanceOf[SerializerBasedActorFormat[_]].serializer.fromBinary(
|
||||
protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor]
|
||||
else actorClass.newInstance.asInstanceOf[Actor]
|
||||
try {
|
||||
format.fromBinary(protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor]
|
||||
} catch {
|
||||
case e: Exception ⇒ actorClass.newInstance.asInstanceOf[Actor]
|
||||
}
|
||||
}
|
||||
|
||||
val ar = new LocalActorRef(
|
||||
|
|
@ -149,8 +151,9 @@ object ActorSerialization {
|
|||
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
|
||||
messages.foreach(message ⇒ ar ! MessageSerializer.deserialize(message.getMessage))
|
||||
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
||||
format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
||||
//if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
||||
// format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
||||
//ar
|
||||
ar
|
||||
}
|
||||
}
|
||||
|
|
@ -276,23 +279,23 @@ object RemoteActorSerialization {
|
|||
*/
|
||||
object TypedActorSerialization {
|
||||
|
||||
def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Format[T]): U =
|
||||
def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Serializer): U =
|
||||
fromBinaryToLocalTypedActorRef(bytes, format)
|
||||
|
||||
def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Format[T]): Array[Byte] = {
|
||||
def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Serializer): Array[Byte] = {
|
||||
toSerializedTypedActorRefProtocol(proxy, format).toByteArray
|
||||
}
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
|
||||
def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Serializer): U =
|
||||
fromBinary(bytes)(format)
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def toBinaryJ[T <: Actor](a: AnyRef, format: Format[T]): Array[Byte] =
|
||||
def toBinaryJ[T <: Actor](a: AnyRef, format: Serializer): Array[Byte] =
|
||||
toBinary(a)(format)
|
||||
|
||||
private def toSerializedTypedActorRefProtocol[T <: Actor](
|
||||
proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = {
|
||||
proxy: AnyRef, format: Serializer): SerializedTypedActorRefProtocol = {
|
||||
|
||||
val init = AspectInitRegistry.initFor(proxy)
|
||||
if (init eq null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.")
|
||||
|
|
@ -303,11 +306,11 @@ object TypedActorSerialization {
|
|||
.build
|
||||
}
|
||||
|
||||
private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
|
||||
private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Serializer): U =
|
||||
fromProtobufToLocalTypedActorRef(SerializedTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
|
||||
|
||||
private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef](
|
||||
protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = {
|
||||
protocol: SerializedTypedActorRefProtocol, format: Serializer, loader: Option[ClassLoader]): U = {
|
||||
val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader)
|
||||
val intfClass = toClass(loader, protocol.getInterfaceName)
|
||||
TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue