Merge pull request #452 from akka/wip-1755-protobuf-DaemonMsg-patriknw

Replace Java serialization of DaemonMsg by protobuf. See #1755
This commit is contained in:
patriknw 2012-05-21 01:40:43 -07:00
commit b9f7f4e4cc
11 changed files with 2852 additions and 293 deletions

View file

@ -127,7 +127,7 @@ case class Props(
* Java API.
*/
def this(actorClass: Class[_ <: Actor]) = this(
creator = () actorClass.newInstance,
creator = FromClassCreator(actorClass),
dispatcher = Dispatchers.DefaultDispatcherId,
routerConfig = Props.defaultRoutedProps)
@ -150,7 +150,7 @@ case class Props(
*
* Java API.
*/
def withCreator(c: Class[_ <: Actor]): Props = copy(creator = () c.newInstance)
def withCreator(c: Class[_ <: Actor]): Props = copy(creator = FromClassCreator(c))
/**
* Returns a new Props with the specified dispatcher set.
@ -166,4 +166,13 @@ case class Props(
* Returns a new Props with the specified deployment configuration.
*/
def withDeploy(d: Deploy): Props = copy(deploy = d)
}
/**
* Used when creating an Actor from a class. Special Function0 to be
* able to optimize serialization.
*/
private[akka] case class FromClassCreator(clazz: Class[_ <: Actor]) extends Function0[Actor] {
def apply(): Actor = clazz.newInstance
}

View file

@ -1,30 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
option java_package = "akka.actor.mailbox";
option optimize_for = SPEED;
/******************************************
Compile with:
cd ./akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol
protoc MailboxProtocol.proto --java_out ../java
*******************************************/
/**
* Defines the durable mailbox message.
*/
message DurableMailboxMessageProtocol {
required string ownerAddress = 1;
optional string senderAddress = 2;
optional UuidProtocol futureUuid = 3;
required bytes message = 4;
}
/**
* Defines a UUID.
*/
message UuidProtocol {
required uint64 high = 1;
required uint64 low = 2;
}

View file

@ -28,6 +28,9 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
def serialize(durableMessage: Envelope): Array[Byte] = {
// It's alright to use ref.path.toString here
// When the sender is a LocalActorRef it should be local when deserialized also.
// When the sender is a RemoteActorRef the path.toString already contains remote address information.
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])

File diff suppressed because it is too large Load diff

View file

@ -79,10 +79,40 @@ message AddressProtocol {
}
/**
* Defines the durable mailbox message.
* Defines akka.remote.DaemonMsgCreate
*/
message DurableMailboxMessageProtocol {
required ActorRefProtocol recipient= 1;
optional ActorRefProtocol sender = 2;
required bytes message = 3;
message DaemonMsgCreateProtocol {
required PropsProtocol props = 1;
required DeployProtocol deploy = 2;
required string path = 3;
required ActorRefProtocol supervisor = 4;
}
/**
* Serialization of akka.actor.Props
*/
message PropsProtocol {
required string dispatcher = 1;
required DeployProtocol deploy = 2;
optional string fromClassCreator = 3;
optional bytes creator = 4;
optional bytes routerConfig = 5;
}
/**
* Serialization of akka.actor.Deploy
*/
message DeployProtocol {
required string path = 1;
optional bytes config = 2;
optional bytes routerConfig = 3;
optional bytes scope = 4;
}
/**
* Serialization of akka.remote.DaemonMsgWatch
*/
message DaemonMsgWatchProtocol {
required ActorRefProtocol watcher = 1;
required ActorRefProtocol watched = 2;
}

View file

@ -14,6 +14,8 @@ akka {
serializers {
proto = "akka.serialization.ProtobufSerializer"
daemon-create = "akka.serialization.DaemonMsgCreateSerializer"
daemon-watch = "akka.serialization.DaemonMsgWatchSerializer"
}
@ -21,6 +23,8 @@ akka {
# Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage
# does, need to use the more specific one here in order to avoid ambiguity
"com.google.protobuf.GeneratedMessage" = proto
"akka.remote.DaemonMsgCreate" = daemon-create
"akka.remote.DaemonMsgWatch" = daemon-watch
}
deployment {

View file

@ -0,0 +1,152 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import java.io.Serializable
import com.google.protobuf.ByteString
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.NoScopeGiven
import akka.actor.Props
import akka.actor.Scope
import akka.remote.DaemonMsgCreate
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.DaemonMsgCreateProtocol
import akka.remote.RemoteProtocol.DeployProtocol
import akka.remote.RemoteProtocol.PropsProtocol
import akka.routing.NoRouter
import akka.routing.RouterConfig
import akka.actor.FromClassCreator
/**
* Serializes akka's internal DaemonMsgCreate using protobuf
* for the core structure of DaemonMsgCreate, Props and Deploy.
* Serialization of contained RouterConfig, Config, and Scope
* is done with configured serializer for those classes, by
* default java.io.Serializable.
*/
class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends Serializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
def includeManifest: Boolean = false
def identifier = 3
lazy val serialization = SerializationExtension(system)
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgCreate(props, deploy, path, supervisor)
def deployProto(d: Deploy): DeployProtocol = {
val builder = DeployProtocol.newBuilder.setPath(d.path)
if (d.config != ConfigFactory.empty)
builder.setConfig(serialize(d.config))
if (d.routerConfig != NoRouter)
builder.setRouterConfig(serialize(d.routerConfig))
if (d.scope != NoScopeGiven)
builder.setScope(serialize(d.scope))
builder.build
}
def propsProto = {
val builder = PropsProtocol.newBuilder.
setDispatcher(props.dispatcher).
setDeploy(deployProto(props.deploy))
props.creator match {
case FromClassCreator(clazz) builder.setFromClassCreator(clazz.getName)
case creator builder.setCreator(serialize(creator))
}
if (props.routerConfig != NoRouter)
builder.setRouterConfig(serialize(props.routerConfig))
builder.build
}
DaemonMsgCreateProtocol.newBuilder.
setProps(propsProto).
setDeploy(deployProto(deploy)).
setPath(path).
setSupervisor(serializeActorRef(supervisor)).
build.toByteArray
case _
throw new IllegalArgumentException(
"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [%s]".format(obj))
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgCreateProtocol.parseFrom(bytes)
def deploy(protoDeploy: DeployProtocol) = {
val config =
if (protoDeploy.hasConfig) deserialize(protoDeploy.getConfig, classOf[Config])
else ConfigFactory.empty
val routerConfig =
if (protoDeploy.hasRouterConfig) deserialize(protoDeploy.getRouterConfig, classOf[RouterConfig])
else NoRouter
val scope =
if (protoDeploy.hasScope) deserialize(protoDeploy.getScope, classOf[Scope])
else NoScopeGiven
Deploy(protoDeploy.getPath, config, routerConfig, scope)
}
def props = {
val creator =
if (proto.getProps.hasFromClassCreator) {
system.dynamicAccess.getClassFor(proto.getProps.getFromClassCreator) match {
case Right(clazz) FromClassCreator(clazz)
case Left(e) throw e
}
} else {
deserialize(proto.getProps.getCreator, classOf[() Actor])
}
val routerConfig =
if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig])
else NoRouter
Props(
creator = creator,
dispatcher = proto.getProps.getDispatcher,
routerConfig = routerConfig,
deploy = deploy(proto.getProps.getDeploy))
}
DaemonMsgCreate(
props = props,
deploy = deploy(proto.getDeploy),
path = proto.getPath,
supervisor = deserializeActorRef(system, proto.getSupervisor))
}
protected def serialize(any: AnyRef): ByteString =
serialization.serialize(any) match {
case Right(bytes) ByteString.copyFrom(bytes)
case Left(e) throw e
}
protected def deserialize[T: ClassManifest](data: ByteString, clazz: Class[T]): T = {
val bytes = data.toByteArray
serialization.deserialize(bytes, clazz) match {
case Right(x) if classManifest[T].erasure.isInstance(x) x.asInstanceOf[T]
case Right(other) throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".
format(clazz.getName, other))
case Left(e)
// Fallback to the java serializer, because some interfaces don't implement java.io.Serializable,
// but the impl instance does. This could be optimized by adding java serializers in reference.conf:
// com.typesafe.config.Config
// akka.routing.RouterConfig
// akka.actor.Scope
serialization.deserialize(bytes, classOf[java.io.Serializable]) match {
case Right(x) if classManifest[T].erasure.isInstance(x) x.asInstanceOf[T]
case _ throw e // the first exception
}
}
}
}

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.actor.ActorRef
import akka.remote.DaemonMsgWatch
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.DaemonMsgWatchProtocol
import akka.actor.ExtendedActorSystem
/**
* Serializes akka's internal DaemonMsgWatch using protobuf.
*/
class DaemonMsgWatchSerializer(val system: ExtendedActorSystem) extends Serializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
def includeManifest: Boolean = false
def identifier = 4
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgWatch(watcher, watched)
DaemonMsgWatchProtocol.newBuilder.
setWatcher(serializeActorRef(watcher)).
setWatched(serializeActorRef(watched)).
build.toByteArray
case _
throw new IllegalArgumentException(
"Can't serialize a non-DaemonMsgWatch message using DaemonMsgWatchSerializer [%s]".format(obj))
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgWatchProtocol.parseFrom(bytes)
DaemonMsgWatch(
watcher = deserializeActorRef(system, proto.getWatcher),
watched = deserializeActorRef(system, proto.getWatched))
}
}

View file

@ -6,6 +6,32 @@ package akka.serialization
import com.google.protobuf.Message
import akka.actor.DynamicAccess
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.actor.ActorSystem
import akka.actor.ActorRef
object ProtobufSerializer {
/**
* Helper to serialize an [[akka.actor.ActorRef]] to Akka's
* protobuf representation.
*/
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
val identifier: String = Serialization.currentTransportAddress.value match {
case null ref.path.toString
case address ref.path.toStringWithAddress(address)
}
ActorRefProtocol.newBuilder.setPath(identifier).build
}
/**
* Helper to materialize (lookup) an [[akka.actor.ActorRef]]
* from Akka's protobuf representation in the supplied
* [[akka.actor.ActorSystem].
*/
def deserializeActorRef(system: ActorSystem, refProtocol: ActorRefProtocol): ActorRef =
system.actorFor(refProtocol.getPath)
}
/**
* This Serializer serializes `com.google.protobuf.Message`s

View file

@ -0,0 +1,113 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import com.typesafe.config.ConfigFactory
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.actor.Address
import akka.actor.Props
import akka.actor.Deploy
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import akka.remote.DaemonMsgCreate
import akka.remote.RemoteScope
import akka.routing.RoundRobinRouter
import akka.routing.FromConfig
import akka.util.duration._
import akka.actor.FromClassCreator
object DaemonMsgCreateSerializerSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonMsgCreateSerializerSpec extends AkkaSpec {
import DaemonMsgCreateSerializerSpec._
val ser = SerializationExtension(system)
val supervisor = system.actorOf(Props[MyActor], "supervisor")
"Serialization" must {
"resolve DaemonMsgCreateSerializer" in {
ser.serializerFor(classOf[DaemonMsgCreate]).getClass must be(classOf[DaemonMsgCreateSerializer])
}
"serialize and de-serialize DaemonMsgCreate with FromClassCreator" in {
verifySerialization {
DaemonMsgCreate(
props = Props[MyActor],
deploy = Deploy(),
path = "foo",
supervisor = supervisor)
}
}
"serialize and de-serialize DaemonMsgCreate with function creator" in {
verifySerialization {
DaemonMsgCreate(
props = Props().withCreator(new MyActor),
deploy = Deploy(),
path = "foo",
supervisor = supervisor)
}
}
"serialize and de-serialize DaemonMsgCreate with Deploy and RouterConfig" in {
verifySerialization {
// Duration.Inf doesn't equal Duration.Inf, so we use another for test
val supervisorStrategy = OneForOneStrategy(3, 10 seconds) {
case _ SupervisorStrategy.Escalate
}
val deploy1 = Deploy(
path = "path1",
config = ConfigFactory.parseString("a=1"),
routerConfig = RoundRobinRouter(nrOfInstances = 5, supervisorStrategy = supervisorStrategy),
scope = RemoteScope(Address("akka", "Test", "host1", 1921)))
val deploy2 = Deploy(
path = "path2",
config = ConfigFactory.parseString("a=2"),
routerConfig = FromConfig,
scope = RemoteScope(Address("akka", "Test", "host2", 1922)))
DaemonMsgCreate(
props = Props[MyActor].withDispatcher("my-disp").withDeploy(deploy1),
deploy = deploy2,
path = "foo",
supervisor = supervisor)
}
}
def verifySerialization(msg: DaemonMsgCreate): Unit = {
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match {
case Left(exception) fail(exception)
case Right(m: DaemonMsgCreate) assertDaemonMsgCreate(msg, m)
}
}
def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = {
// can't compare props.creator when function
if (expected.props.creator.isInstanceOf[FromClassCreator])
assert(got.props.creator === expected.props.creator)
assert(got.props.dispatcher === expected.props.dispatcher)
assert(got.props.dispatcher === expected.props.dispatcher)
assert(got.props.routerConfig === expected.props.routerConfig)
assert(got.props.deploy === expected.props.deploy)
assert(got.deploy === expected.deploy)
assert(got.path === expected.path)
assert(got.supervisor === expected.supervisor)
}
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.testkit.AkkaSpec
import akka.remote.DaemonMsgWatch
import akka.actor.Actor
import akka.actor.Props
object DaemonMsgWatchSerializerSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonMsgWatchSerializerSpec extends AkkaSpec {
import DaemonMsgWatchSerializerSpec._
val ser = SerializationExtension(system)
"Serialization" must {
"resolve DaemonMsgWatchSerializer" in {
ser.serializerFor(classOf[DaemonMsgWatch]).getClass must be(classOf[DaemonMsgWatchSerializer])
}
"serialize and de-serialize DaemonMsgWatch" in {
val watcher = system.actorOf(Props[MyActor], "watcher")
val watched = system.actorOf(Props[MyActor], "watched")
val msg = DaemonMsgWatch(watcher, watched)
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgWatch]) match {
case Left(exception) fail(exception)
case Right(m) assert(m === msg)
}
}
}
}