Replace Java serialization of DaemonMsg by protobuf. See #1755
* Serializers for DaemonMsgCreate and DaemonMsgWatch * Protobuf for DaemonMsgCreateProtocol, PropsProtocol, DeployProtocol, DaemonMsgWatchProtocol * Removed unused MailboxProtocol.proto * Fixed wrong serializeActorRef in DurableMessageSerialization
This commit is contained in:
parent
2e248e4b49
commit
6dd017d6c1
10 changed files with 2724 additions and 296 deletions
|
|
@ -1,30 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
option java_package = "akka.actor.mailbox";
|
||||
option optimize_for = SPEED;
|
||||
|
||||
/******************************************
|
||||
Compile with:
|
||||
cd ./akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol
|
||||
protoc MailboxProtocol.proto --java_out ../java
|
||||
*******************************************/
|
||||
|
||||
/**
|
||||
* Defines the durable mailbox message.
|
||||
*/
|
||||
message DurableMailboxMessageProtocol {
|
||||
required string ownerAddress = 1;
|
||||
optional string senderAddress = 2;
|
||||
optional UuidProtocol futureUuid = 3;
|
||||
required bytes message = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a UUID.
|
||||
*/
|
||||
message UuidProtocol {
|
||||
required uint64 high = 1;
|
||||
required uint64 low = 2;
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ import akka.remote.MessageSerializer
|
|||
import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol }
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.ActorSystem
|
||||
import akka.serialization.Serialization
|
||||
|
||||
private[akka] object DurableExecutableMailboxConfig {
|
||||
val Name = "[\\.\\/\\$\\s]".r
|
||||
|
|
@ -26,9 +27,10 @@ abstract class DurableMessageQueue(val owner: ActorContext) extends MessageQueue
|
|||
|
||||
trait DurableMessageSerialization { this: DurableMessageQueue ⇒
|
||||
|
||||
def serialize(durableMessage: Envelope): Array[Byte] = {
|
||||
import akka.serialization.ProtobufSerializer.serializeActorRef
|
||||
import akka.serialization.ProtobufSerializer.deserializeActorRef
|
||||
|
||||
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build
|
||||
def serialize(durableMessage: Envelope): Array[Byte] = {
|
||||
|
||||
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])
|
||||
val builder = RemoteMessageProtocol.newBuilder
|
||||
|
|
@ -41,11 +43,9 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
|
|||
|
||||
def deserialize(bytes: Array[Byte]): Envelope = {
|
||||
|
||||
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = system.actorFor(refProtocol.getPath)
|
||||
|
||||
val durableMessage = RemoteMessageProtocol.parseFrom(bytes)
|
||||
val message = MessageSerializer.deserialize(system, durableMessage.getMessage)
|
||||
val sender = deserializeActorRef(durableMessage.getSender)
|
||||
val sender = deserializeActorRef(system, durableMessage.getSender)
|
||||
|
||||
new Envelope(message, sender)(system)
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -79,10 +79,39 @@ message AddressProtocol {
|
|||
}
|
||||
|
||||
/**
|
||||
* Defines the durable mailbox message.
|
||||
* Defines akka.remote.DaemonMsgCreate
|
||||
*/
|
||||
message DurableMailboxMessageProtocol {
|
||||
required ActorRefProtocol recipient= 1;
|
||||
optional ActorRefProtocol sender = 2;
|
||||
required bytes message = 3;
|
||||
message DaemonMsgCreateProtocol {
|
||||
required PropsProtocol props = 1;
|
||||
required DeployProtocol deploy = 2;
|
||||
required string path = 3;
|
||||
required ActorRefProtocol supervisor = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialization of akka.actor.Props
|
||||
*/
|
||||
message PropsProtocol {
|
||||
required bytes creator = 1;
|
||||
required string dispatcher = 2;
|
||||
required DeployProtocol deploy = 3;
|
||||
optional bytes routerConfig = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialization of akka.actor.Deploy
|
||||
*/
|
||||
message DeployProtocol {
|
||||
required string path = 1;
|
||||
optional bytes config = 2;
|
||||
optional bytes routerConfig = 3;
|
||||
optional bytes scope = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialization of akka.remote.DaemonMsgWatch
|
||||
*/
|
||||
message DaemonMsgWatchProtocol {
|
||||
required ActorRefProtocol watcher = 1;
|
||||
required ActorRefProtocol watched = 2;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ akka {
|
|||
|
||||
serializers {
|
||||
proto = "akka.serialization.ProtobufSerializer"
|
||||
daemon-create = "akka.serialization.DaemonMsgCreateSerializer"
|
||||
daemon-watch = "akka.serialization.DaemonMsgWatchSerializer"
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -21,6 +23,8 @@ akka {
|
|||
# Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage
|
||||
# does, need to use the more specific one here in order to avoid ambiguity
|
||||
"com.google.protobuf.GeneratedMessage" = proto
|
||||
"akka.remote.DaemonMsgCreate" = daemon-create
|
||||
"akka.remote.DaemonMsgWatch" = daemon-watch
|
||||
}
|
||||
|
||||
deployment {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
||||
import java.io.Serializable
|
||||
|
||||
import com.google.protobuf.ByteString
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.NoScopeGiven
|
||||
import akka.actor.Props
|
||||
import akka.actor.Scope
|
||||
import akka.remote.DaemonMsgCreate
|
||||
import akka.remote.RemoteProtocol.ActorRefProtocol
|
||||
import akka.remote.RemoteProtocol.DaemonMsgCreateProtocol
|
||||
import akka.remote.RemoteProtocol.DeployProtocol
|
||||
import akka.remote.RemoteProtocol.PropsProtocol
|
||||
import akka.routing.NoRouter
|
||||
import akka.routing.RouterConfig
|
||||
|
||||
/**
|
||||
* Serializes akka's internal DaemonMsgCreate using protobuf
|
||||
* for the core structure of DaemonMsgCreate, Props and Deploy.
|
||||
* Serialization of contained RouterConfig, Config, Scope, and creator (scala.Function0)
|
||||
* is done with configured serializer for those classes, by default java.io.Serializable.
|
||||
*/
|
||||
class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||
import ProtobufSerializer.serializeActorRef
|
||||
import ProtobufSerializer.deserializeActorRef
|
||||
|
||||
def includeManifest: Boolean = true
|
||||
def identifier = 3
|
||||
lazy val serialization = SerializationExtension(system)
|
||||
|
||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case DaemonMsgCreate(props, deploy, path, supervisor) ⇒
|
||||
|
||||
def deployProto(d: Deploy): DeployProtocol = {
|
||||
val builder = DeployProtocol.newBuilder.setPath(d.path)
|
||||
if (d.config != ConfigFactory.empty)
|
||||
builder.setConfig(serialize(d.config))
|
||||
if (d.routerConfig != NoRouter)
|
||||
builder.setRouterConfig(serialize(d.routerConfig))
|
||||
if (d.scope != NoScopeGiven)
|
||||
builder.setScope(serialize(d.scope))
|
||||
builder.build
|
||||
}
|
||||
|
||||
def propsProto = {
|
||||
val builder = PropsProtocol.newBuilder.
|
||||
setCreator(serialize(props.creator)).
|
||||
setDispatcher(props.dispatcher).
|
||||
setDeploy(deployProto(props.deploy))
|
||||
if (props.routerConfig != NoRouter)
|
||||
builder.setRouterConfig(serialize(props.routerConfig))
|
||||
builder.build
|
||||
}
|
||||
|
||||
DaemonMsgCreateProtocol.newBuilder.
|
||||
setProps(propsProto).
|
||||
setDeploy(deployProto(deploy)).
|
||||
setPath(path).
|
||||
setSupervisor(serializeActorRef(supervisor)).
|
||||
build.toByteArray
|
||||
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(
|
||||
"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [%s]".format(obj))
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
|
||||
val proto = DaemonMsgCreateProtocol.parseFrom(bytes)
|
||||
|
||||
def deploy(protoDeploy: DeployProtocol) = {
|
||||
val config =
|
||||
if (protoDeploy.hasConfig) deserialize(protoDeploy.getConfig, classOf[Config])
|
||||
else ConfigFactory.empty
|
||||
val routerConfig =
|
||||
if (protoDeploy.hasRouterConfig) deserialize(protoDeploy.getRouterConfig, classOf[RouterConfig])
|
||||
else NoRouter
|
||||
val scope =
|
||||
if (protoDeploy.hasScope) deserialize(protoDeploy.getScope, classOf[Scope])
|
||||
else NoScopeGiven
|
||||
Deploy(protoDeploy.getPath, config, routerConfig, scope)
|
||||
}
|
||||
|
||||
def props = {
|
||||
val routerConfig =
|
||||
if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig])
|
||||
else NoRouter
|
||||
Props(
|
||||
creator = deserialize(proto.getProps.getCreator, classOf[() ⇒ Actor]),
|
||||
dispatcher = proto.getProps.getDispatcher,
|
||||
routerConfig = routerConfig,
|
||||
deploy = deploy(proto.getProps.getDeploy))
|
||||
}
|
||||
|
||||
DaemonMsgCreate(
|
||||
props = props,
|
||||
deploy = deploy(proto.getDeploy),
|
||||
path = proto.getPath,
|
||||
supervisor = deserializeActorRef(system, proto.getSupervisor))
|
||||
}
|
||||
|
||||
protected def serialize(any: AnyRef): ByteString =
|
||||
serialization.serialize(any) match {
|
||||
case Right(bytes) ⇒ ByteString.copyFrom(bytes)
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
|
||||
protected def deserialize[T: ClassManifest](data: ByteString, clazz: Class[T]): T = {
|
||||
val bytes = data.toByteArray
|
||||
serialization.deserialize(bytes, clazz) match {
|
||||
case Right(x) if classManifest[T].erasure.isInstance(x) ⇒ x.asInstanceOf[T]
|
||||
case Right(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".
|
||||
format(clazz.getName, other))
|
||||
case Left(e) ⇒
|
||||
// Fallback to the java serializer, because some interfaces don't implement java.io.Serializable,
|
||||
// but the impl instance does. This could be optimized by adding java serializers in reference.conf:
|
||||
// scala.Function0 (the creator)
|
||||
// com.typesafe.config.Config
|
||||
// akka.routing.RouterConfig
|
||||
// akka.actor.Scope
|
||||
serialization.deserialize(bytes, classOf[java.io.Serializable]) match {
|
||||
case Right(x) if classManifest[T].erasure.isInstance(x) ⇒ x.asInstanceOf[T]
|
||||
case _ ⇒ throw e // the first exception
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.remote.DaemonMsgWatch
|
||||
import akka.remote.RemoteProtocol.ActorRefProtocol
|
||||
import akka.remote.RemoteProtocol.DaemonMsgWatchProtocol
|
||||
import akka.actor.ExtendedActorSystem
|
||||
|
||||
/**
|
||||
* Serializes akka's internal DaemonMsgWatch using protobuf.
|
||||
*/
|
||||
class DaemonMsgWatchSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||
import ProtobufSerializer.serializeActorRef
|
||||
import ProtobufSerializer.deserializeActorRef
|
||||
|
||||
def includeManifest: Boolean = true
|
||||
def identifier = 4
|
||||
|
||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case DaemonMsgWatch(watcher, watched) ⇒
|
||||
DaemonMsgWatchProtocol.newBuilder.
|
||||
setWatcher(serializeActorRef(watcher)).
|
||||
setWatched(serializeActorRef(watched)).
|
||||
build.toByteArray
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(
|
||||
"Can't serialize a non-DaemonMsgWatch message using DaemonMsgWatchSerializer [%s]".format(obj))
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
|
||||
val proto = DaemonMsgWatchProtocol.parseFrom(bytes)
|
||||
DaemonMsgWatch(
|
||||
watcher = deserializeActorRef(system, proto.getWatcher),
|
||||
watched = deserializeActorRef(system, proto.getWatched))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -6,6 +6,22 @@ package akka.serialization
|
|||
|
||||
import com.google.protobuf.Message
|
||||
import akka.actor.DynamicAccess
|
||||
import akka.remote.RemoteProtocol.ActorRefProtocol
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ProtobufSerializer {
|
||||
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
|
||||
val identifier: String = Serialization.currentTransportAddress.value match {
|
||||
case null ⇒ ref.path.toString
|
||||
case address ⇒ ref.path.toStringWithAddress(address)
|
||||
}
|
||||
ActorRefProtocol.newBuilder.setPath(identifier).build
|
||||
}
|
||||
|
||||
def deserializeActorRef(system: ActorSystem, refProtocol: ActorRefProtocol): ActorRef =
|
||||
system.actorFor(refProtocol.getPath)
|
||||
}
|
||||
|
||||
/**
|
||||
* This Serializer serializes `com.google.protobuf.Message`s
|
||||
|
|
|
|||
|
|
@ -0,0 +1,104 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Address
|
||||
import akka.actor.Props
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy
|
||||
import akka.remote.DaemonMsgCreate
|
||||
import akka.remote.RemoteScope
|
||||
import akka.routing.RoundRobinRouter
|
||||
import akka.routing.FromConfig
|
||||
import akka.util.duration._
|
||||
|
||||
object DaemonMsgCreateSerializerSpec {
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class DaemonMsgCreateSerializerSpec extends AkkaSpec {
|
||||
|
||||
import DaemonMsgCreateSerializerSpec._
|
||||
val ser = SerializationExtension(system)
|
||||
val supervisor = system.actorOf(Props[MyActor], "supervisor")
|
||||
|
||||
"Serialization" must {
|
||||
|
||||
"resolve DaemonMsgCreateSerializer" in {
|
||||
ser.serializerFor(classOf[DaemonMsgCreate]).getClass must be(classOf[DaemonMsgCreateSerializer])
|
||||
}
|
||||
|
||||
"serialize and de-serialize simple DaemonMsgCreate" in {
|
||||
|
||||
val msg = DaemonMsgCreate(
|
||||
props = Props[MyActor],
|
||||
deploy = Deploy(),
|
||||
path = "foo",
|
||||
supervisor = supervisor)
|
||||
|
||||
val bytes = ser.serialize(msg) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(m: DaemonMsgCreate) ⇒ assertDaemonMsgCreate(msg, m)
|
||||
}
|
||||
}
|
||||
|
||||
"serialize and de-serialize DaemonMsgCreate with Deploy and RouterConfig" in {
|
||||
// Duration.Inf doesn't equal Duration.Inf, so we use another for test
|
||||
val supervisorStrategy = OneForOneStrategy(3, 10 seconds) {
|
||||
case _ ⇒ SupervisorStrategy.Escalate
|
||||
}
|
||||
val deploy1 = Deploy(
|
||||
path = "path1",
|
||||
config = ConfigFactory.parseString("a=1"),
|
||||
routerConfig = RoundRobinRouter(nrOfInstances = 5, supervisorStrategy = supervisorStrategy),
|
||||
scope = RemoteScope(Address("akka", "Test", "host1", 1921)))
|
||||
val deploy2 = Deploy(
|
||||
path = "path2",
|
||||
config = ConfigFactory.parseString("a=2"),
|
||||
routerConfig = FromConfig,
|
||||
scope = RemoteScope(Address("akka", "Test", "host2", 1922)))
|
||||
val msg = DaemonMsgCreate(
|
||||
props = Props[MyActor].withDispatcher("my-disp").withDeploy(deploy1),
|
||||
deploy = deploy2,
|
||||
path = "foo",
|
||||
supervisor = supervisor)
|
||||
|
||||
val bytes = ser.serialize(msg) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(m: DaemonMsgCreate) ⇒ assertDaemonMsgCreate(msg, m)
|
||||
}
|
||||
}
|
||||
|
||||
def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = {
|
||||
// can't compare props.creator
|
||||
assert(got.props.dispatcher === expected.props.dispatcher)
|
||||
assert(got.props.dispatcher === expected.props.dispatcher)
|
||||
assert(got.props.routerConfig === expected.props.routerConfig)
|
||||
assert(got.props.deploy === expected.props.deploy)
|
||||
assert(got.deploy === expected.deploy)
|
||||
assert(got.path === expected.path)
|
||||
assert(got.supervisor === expected.supervisor)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.remote.DaemonMsgWatch
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Props
|
||||
|
||||
object DaemonMsgWatchSerializerSpec {
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class DaemonMsgWatchSerializerSpec extends AkkaSpec {
|
||||
|
||||
import DaemonMsgWatchSerializerSpec._
|
||||
|
||||
val ser = SerializationExtension(system)
|
||||
|
||||
"Serialization" must {
|
||||
|
||||
"resolve DaemonMsgWatchSerializer" in {
|
||||
ser.serializerFor(classOf[DaemonMsgWatch]).getClass must be(classOf[DaemonMsgWatchSerializer])
|
||||
}
|
||||
|
||||
"serialize and de-serialize DaemonMsgWatch" in {
|
||||
val watcher = system.actorOf(Props[MyActor], "watcher")
|
||||
val watched = system.actorOf(Props[MyActor], "watched")
|
||||
val msg = DaemonMsgWatch(watcher, watched)
|
||||
val bytes = ser.serialize(msg) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgWatch]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(m) ⇒ assert(m === msg)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue