Serialization for system messages, RemoteWatcher and ByteString

This commit is contained in:
Endre Sándor Varga 2016-09-28 16:00:50 +02:00
parent 14e0188a1c
commit bf5b607739
13 changed files with 6225 additions and 54 deletions

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -51,6 +51,10 @@ message Payload {
optional bytes messageManifest = 4;
}
message WatcherHeartbeatResponse {
required uint64 uid = 1;
}
message Throwable {
required string className = 1;
optional string message = 2;
@ -58,6 +62,12 @@ message Throwable {
repeated StackTraceElement stackTrace = 4;
}
message ActorInitializationException {
optional ActorRef actor = 1;
required string message = 2;
required Payload cause = 3;
}
message StackTraceElement {
required string className = 1;
required string methodName = 2;

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2009-2015 Lightbend Inc. <http://www.lightbend.com>
*/
option java_package = "akka.remote";
option optimize_for = SPEED;
import "ContainerFormats.proto";
/******************************************
* System message formats
******************************************/
message SystemMessage {
enum Type {
CREATE = 0;
RECREATE = 1;
SUSPEND = 2;
RESUME = 3;
TERMINATE = 4;
SUPERVISE = 5;
WATCH = 6;
UNWATCH = 7;
FAILED = 8;
DEATHWATCH_NOTIFICATION = 9;
}
required Type type = 1;
optional WatchData watchData = 2;
optional Payload causeData = 3;
optional SuperviseData superviseData = 5;
optional FailedData failedData = 6;
optional DeathWatchNotificationData dwNotificationData = 7;
}
message WatchData {
required ActorRef watchee = 1;
required ActorRef watcher = 2;
}
message SuperviseData {
required ActorRef child = 1;
required bool async = 2;
}
message FailedData {
required ActorRef child = 1;
required uint64 uid = 2;
}
message DeathWatchNotificationData {
required ActorRef actor = 1;
required bool existenceConfirmed = 2;
required bool addressTerminated = 3;
}

View file

@ -18,6 +18,11 @@ akka {
artery = "akka.remote.serialization.ArteryMessageSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
primitive-long = "akka.remote.serialization.LongSerializer"
primitive-int = "akka.remote.serialization.IntSerializer"
primitive-string = "akka.remote.serialization.StringSerializer"
primitive-bytestring = "akka.remote.serialization.ByteStringSerializer"
akka-system-msg = "akka.remote.serialization.SystemMessageSerializer"
}
serialization-bindings {
@ -50,6 +55,23 @@ akka {
"scala.None$" = akka-misc
"akka.actor.Status$Success" = akka-misc
"akka.actor.Status$Failure" = akka-misc
"akka.actor.ActorRef" = akka-misc
"akka.actor.PoisonPill$" = akka-misc
"akka.actor.Kill$" = akka-misc
"akka.remote.RemoteWatcher$Heartbeat$" = akka-misc
"akka.remote.RemoteWatcher$HeartbeatRsp" = akka-misc
"akka.actor.ActorInitializationException" = akka-misc
"akka.dispatch.sysmsg.SystemMessage" = akka-system-msg
"java.lang.String" = primitive-string
"akka.util.ByteString$ByteString1C" = primitive-bytestring
"akka.util.ByteString$ByteString1" = primitive-bytestring
"akka.util.ByteString$ByteStrings" = primitive-bytestring
"java.lang.Long" = primitive-long
"scala.Long" = primitive-long
"java.lang.Integer" = primitive-int
"scala.Int" = primitive-int
# Java Serializer is by default used for exceptions.
# It's recommended that you implement custom serializer for exceptions that are
@ -64,9 +86,6 @@ akka {
"akka.actor.ActorKilledException" = akka-misc
"akka.actor.InvalidActorNameException" = akka-misc
"akka.actor.InvalidMessageException" = akka-misc
"akka.actor.ActorRef" = akka-misc
}
serialization-identifiers {
@ -75,6 +94,11 @@ akka {
"akka.remote.serialization.MessageContainerSerializer" = 6
"akka.remote.serialization.MiscMessageSerializer" = 16
"akka.remote.serialization.ArteryMessageSerializer" = 17
"akka.remote.serialization.LongSerializer" = 18
"akka.remote.serialization.IntSerializer" = 19
"akka.remote.serialization.StringSerializer" = 20
"akka.remote.serialization.ByteStringSerializer" = 21
"akka.remote.serialization.SystemMessageSerializer" = 22
}
deployment {

View file

@ -5,29 +5,32 @@ package akka.remote.serialization
import akka.actor._
import akka.protobuf.ByteString
import akka.remote.ContainerFormats
import akka.serialization.{ Serialization, BaseSerializer, SerializationExtension, SerializerWithStringManifest }
import akka.remote.{ ContainerFormats, RemoteWatcher }
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
// WARNING! This must lazy otherwise it will deadlock the ActorSystem creation
private lazy val serialization = SerializationExtension(system)
private val payloadSupport = new WrappedPayloadSupport(system)
private val throwableSupport = new ThrowableSupport(system)
private val ParameterlessSerialized = Array.empty[Byte]
private val ParameterlessSerializedMessage = Array.empty[Byte]
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case identify: Identify serializeIdentify(identify)
case identity: ActorIdentity serializeActorIdentity(identity)
case Some(value) serializeSome(value)
case None NoneSerialized
case None ParameterlessSerializedMessage
case r: ActorRef serializeActorRef(r)
case s: Status.Success serializeStatusSuccess(s)
case f: Status.Failure serializeStatusFailure(f)
case ex: ActorInitializationException serializeActorInitializationException(ex)
case t: Throwable throwableSupport.serializeThrowable(t)
case None ParameterlessSerialized
case PoisonPill ParameterlessSerialized
case Kill ParameterlessSerialized
case PoisonPill ParameterlessSerializedMessage
case Kill ParameterlessSerializedMessage
case RemoteWatcher.Heartbeat ParameterlessSerializedMessage
case hbrsp: RemoteWatcher.HeartbeatRsp serializeHeartbeatRsp(hbrsp)
case _ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]")
}
@ -60,6 +63,10 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private def serializeActorRef(ref: ActorRef): Array[Byte] =
actorRefBuilder(ref).build().toByteArray
private def serializeHeartbeatRsp(hbrsp: RemoteWatcher.HeartbeatRsp): Array[Byte] = {
ContainerFormats.WatcherHeartbeatResponse.newBuilder().setUid(hbrsp.addressUid).build().toByteArray
}
private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder =
ContainerFormats.ActorRef.newBuilder()
.setPath(Serialization.serializedActorPath(actorRef))
@ -70,6 +77,17 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private def serializeStatusFailure(failure: Status.Failure): Array[Byte] =
payloadSupport.payloadBuilder(failure.cause).build().toByteArray
private def serializeActorInitializationException(ex: ActorInitializationException): Array[Byte] = {
val builder = ContainerFormats.ActorInitializationException.newBuilder()
if (ex.getActor ne null)
builder.setActor(actorRefBuilder(ex.getActor))
builder
.setMessage(ex.getMessage)
.setCause(payloadSupport.payloadBuilder(ex.getCause))
.build().toByteArray
}
private val IdentifyManifest = "A"
private val ActorIdentityManifest = "B"
private val OptionManifest = "C"
@ -79,34 +97,40 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val ActorRefManifest = "G"
private val PoisonPillManifest = "P"
private val KillManifest = "K"
private val RemoteWatcherHBManifest = "RWHB"
private val RemoteWatcherHBRespManifest = "RWHR"
private val ActorInitializationExceptionManifest = "AIEX"
private val fromBinaryMap = Map[String, Array[Byte] AnyRef](
IdentifyManifest deserializeIdentify,
ActorIdentifyManifest deserializeActorIdentity,
ActorIdentityManifest deserializeActorIdentity,
OptionManifest deserializeOption,
StatusSuccessManifest deserializeStatusSuccess,
StatusFailureManifest deserializeStatusFailure,
ThrowableManifest throwableSupport.deserializeThrowable,
ActorRefManifest deserializeActorRefBytes)
ActorRefManifest deserializeActorRefBytes,
OptionManifest deserializeOption,
PoisonPillManifest ((_) PoisonPill),
KillManifest ((_) Kill)
KillManifest ((_) Kill),
RemoteWatcherHBManifest ((_) RemoteWatcher.Heartbeat),
RemoteWatcherHBRespManifest deserializeHeartbeatRsp,
ActorInitializationExceptionManifest deserializeActorInitializationException
)
override def manifest(o: AnyRef): String =
o match {
case _: Identify IdentifyManifest
case _: ActorIdentity ActorIdentifyManifest
case _: ActorIdentity ActorIdentityManifest
case _: Option[Any] OptionManifest
case _: ActorRef ActorRefManifest
case _: Status.Success StatusSuccessManifest
case _: Status.Failure StatusFailureManifest
case _: ActorInitializationException ActorInitializationExceptionManifest
case _: Throwable ThrowableManifest
case _: Identify IdentifyManifest
case _: ActorIdentity ActorIdentityManifest
case _: Option[Any] OptionManifest
case _: PoisonPill.type PoisonPillManifest
case _: Kill.type KillManifest
case PoisonPill PoisonPillManifest
case Kill KillManifest
case RemoteWatcher.Heartbeat RemoteWatcherHBManifest
case _: RemoteWatcher.HeartbeatRsp RemoteWatcherHBRespManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
@ -156,4 +180,25 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private def deserializeStatusFailure(bytes: Array[Byte]): Status.Failure =
Status.Failure(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)).asInstanceOf[Throwable])
private def deserializeHeartbeatRsp(bytes: Array[Byte]): RemoteWatcher.HeartbeatRsp = {
RemoteWatcher.HeartbeatRsp(ContainerFormats.WatcherHeartbeatResponse.parseFrom(bytes).getUid.toInt)
}
private def deserializeActorInitializationException(bytes: Array[Byte]): ActorInitializationException = {
val serializedEx = ContainerFormats.ActorInitializationException.parseFrom(bytes)
val ref = deserializeActorRef(serializedEx.getActor)
val refString = ref.path.toString
val message = serializedEx.getMessage
val reconstructedMessage =
if (message.startsWith(refString)) message.drop(refString.length + 2)
else message
ActorInitializationException(
if (serializedEx.hasActor) ref else null,
reconstructedMessage,
payloadSupport.deserializePayload(serializedEx.getCause).asInstanceOf[Throwable]
)
}
}

View file

@ -1,9 +1,11 @@
package akka.remote.serialization
import java.nio.ByteBuffer
import java.nio.{ BufferOverflowException, ByteBuffer }
import akka.actor.{ ExtendedActorSystem, Kill, PoisonPill }
import akka.remote.OversizedPayloadException
import akka.serialization.{ BaseSerializer, ByteBufferSerializer }
import akka.util.ByteString
class LongSerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer {
override def includeManifest: Boolean = false
@ -87,3 +89,30 @@ class StringSerializer(val system: ExtendedActorSystem) extends BaseSerializer w
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = new String(bytes, "UTF-8")
}
class ByteStringSerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer {
override def includeManifest: Boolean = false
override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = {
val bs = o.asInstanceOf[ByteString]
// ByteString.copyToBuffer does not throw BufferOverflowException
if (bs.copyToBuffer(buf) < bs.length)
throw new BufferOverflowException()
}
override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef =
ByteString.fromByteBuffer(buf)
override def toBinary(o: AnyRef): Array[Byte] = {
val bs = o.asInstanceOf[ByteString]
val result = Array.ofDim[Byte](bs.length)
bs.copyToArray(result, 0, bs.length)
result
}
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
ByteString(bytes)
}
}

View file

@ -0,0 +1,161 @@
package akka.remote.serialization
import akka.actor.{ ActorInitializationException, ActorRef, ExtendedActorSystem, InternalActorRef }
import akka.dispatch.sysmsg._
import akka.remote.{ ContainerFormats, SystemMessageFormats }
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension }
class SystemMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import SystemMessageFormats.SystemMessage.Type._
// WARNING! This must lazy otherwise it will deadlock the ActorSystem creation
private lazy val serialization = SerializationExtension(system)
private val payloadSupport = new WrappedPayloadSupport(system)
override def includeManifest: Boolean = false
override def toBinary(o: AnyRef): Array[Byte] = {
val builder = SystemMessageFormats.SystemMessage.newBuilder()
o.asInstanceOf[SystemMessage] match {
case Create(failure)
builder.setType(CREATE)
failure match {
case Some(throwable) builder.setCauseData(serializeThrowable(throwable))
case None // Nothing to set
}
case Recreate(cause)
builder.setType(RECREATE)
builder.setCauseData(serializeThrowable(cause))
case Suspend()
builder.setType(SUSPEND)
case Resume(cause)
builder.setType(RESUME)
builder.setCauseData(serializeThrowable(cause))
case Terminate()
builder.setType(TERMINATE)
case Supervise(child, async)
builder.setType(SUPERVISE)
val superviseData = SystemMessageFormats.SuperviseData.newBuilder()
.setChild(serializeActorRef(child))
.setAsync(async)
builder.setSuperviseData(superviseData)
case Watch(watchee, watcher)
builder.setType(WATCH)
val watchData = SystemMessageFormats.WatchData.newBuilder()
.setWatchee(serializeActorRef(watchee))
.setWatcher(serializeActorRef(watcher))
builder.setWatchData(watchData)
case Unwatch(watchee, watcher)
builder.setType(UNWATCH)
val watchData = SystemMessageFormats.WatchData.newBuilder()
.setWatchee(serializeActorRef(watchee))
.setWatcher(serializeActorRef(watcher))
builder.setWatchData(watchData)
case Failed(child, cause, uid)
builder.setType(FAILED)
val failedData = SystemMessageFormats.FailedData.newBuilder()
.setChild(serializeActorRef(child))
.setUid(uid)
builder.setCauseData(serializeThrowable(cause))
builder.setFailedData(failedData)
case DeathWatchNotification(actor, existenceConfirmed, addressTerminated)
builder.setType(DEATHWATCH_NOTIFICATION)
val deathWatchNotificationData = SystemMessageFormats.DeathWatchNotificationData.newBuilder()
.setActor(serializeActorRef(actor))
.setExistenceConfirmed(existenceConfirmed)
.setAddressTerminated(addressTerminated)
builder.setDwNotificationData(deathWatchNotificationData)
case NoMessage
throw new IllegalArgumentException("NoMessage should never be serialized or deserialized")
}
builder.build().toByteArray
}
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
deserializeSystemMessage(SystemMessageFormats.SystemMessage.parseFrom(bytes))
}
private def deserializeSystemMessage(sysmsg: SystemMessageFormats.SystemMessage): SystemMessage =
sysmsg.getType match {
case CREATE
val cause =
if (sysmsg.hasCauseData)
Some(getCauseThrowable(sysmsg).asInstanceOf[ActorInitializationException])
else
None
Create(cause)
case RECREATE
Recreate(getCauseThrowable(sysmsg))
case SUSPEND
// WARNING!! Must always create a new instance!
Suspend()
case RESUME
Resume(getCauseThrowable(sysmsg))
case TERMINATE
// WARNING!! Must always create a new instance!
Terminate()
case SUPERVISE
Supervise(deserializeActorRef(sysmsg.getSuperviseData.getChild), sysmsg.getSuperviseData.getAsync)
case WATCH
Watch(
deserializeActorRef(sysmsg.getWatchData.getWatchee).asInstanceOf[InternalActorRef],
deserializeActorRef(sysmsg.getWatchData.getWatcher).asInstanceOf[InternalActorRef]
)
case UNWATCH
Unwatch(
deserializeActorRef(sysmsg.getWatchData.getWatchee).asInstanceOf[InternalActorRef],
deserializeActorRef(sysmsg.getWatchData.getWatcher).asInstanceOf[InternalActorRef]
)
case FAILED
Failed(
deserializeActorRef(sysmsg.getFailedData.getChild),
getCauseThrowable(sysmsg),
sysmsg.getFailedData.getUid.toInt)
case DEATHWATCH_NOTIFICATION
DeathWatchNotification(
deserializeActorRef(sysmsg.getDwNotificationData.getActor),
sysmsg.getDwNotificationData.getExistenceConfirmed,
sysmsg.getDwNotificationData.getAddressTerminated
)
}
private def serializeThrowable(throwable: Throwable): ContainerFormats.Payload.Builder = {
payloadSupport.payloadBuilder(throwable)
}
private def getCauseThrowable(msg: SystemMessageFormats.SystemMessage): Throwable = {
payloadSupport.deserializePayload(msg.getCauseData).asInstanceOf[Throwable]
}
private def serializeActorRef(actorRef: ActorRef): ContainerFormats.ActorRef.Builder = {
ContainerFormats.ActorRef.newBuilder()
.setPath(Serialization.serializedActorPath(actorRef))
}
private def deserializeActorRef(serializedRef: ContainerFormats.ActorRef): ActorRef = {
serialization.system.provider.resolveActorRef(serializedRef.getPath)
}
}

View file

@ -16,6 +16,10 @@ private[akka] class ThrowableSupport(system: ExtendedActorSystem) {
private val payloadSupport = new WrappedPayloadSupport(system)
def serializeThrowable(t: Throwable): Array[Byte] = {
toProtobufThrowable(t).build().toByteArray
}
def toProtobufThrowable(t: Throwable): ContainerFormats.Throwable.Builder = {
val b = ContainerFormats.Throwable.newBuilder()
.setClassName(t.getClass.getName)
if (t.getMessage != null)
@ -31,7 +35,7 @@ private[akka] class ThrowableSupport(system: ExtendedActorSystem) {
}
}
b.build().toByteArray
b
}
def stackTraceElementBuilder(elem: StackTraceElement): ContainerFormats.StackTraceElement.Builder = {
@ -43,7 +47,10 @@ private[akka] class ThrowableSupport(system: ExtendedActorSystem) {
}
def deserializeThrowable(bytes: Array[Byte]): Throwable = {
val protoT = ContainerFormats.Throwable.parseFrom(bytes)
fromProtobufThrowable(ContainerFormats.Throwable.parseFrom(bytes))
}
def fromProtobufThrowable(protoT: ContainerFormats.Throwable): Throwable = {
val t: Throwable =
if (protoT.hasCause) {
val cause = payloadSupport.deserializePayload(protoT.getCause).asInstanceOf[Throwable]
@ -63,9 +70,9 @@ private[akka] class ThrowableSupport(system: ExtendedActorSystem) {
import scala.collection.JavaConverters._
val stackTrace =
(protoT.getStackTraceList.asScala.map { elem
protoT.getStackTraceList.asScala.map { elem
new StackTraceElement(elem.getClassName, elem.getMethodName, elem.getFileName, elem.getLineNumber)
}).toArray
}.toArray
t.setStackTrace(stackTrace)
t
}

View file

@ -5,10 +5,11 @@
package akka.remote.serialization
import akka.actor._
import akka.remote.MessageSerializer
import akka.remote.{ MessageSerializer, RemoteWatcher }
import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import scala.util.control.NoStackTrace
object MiscMessageSerializerSpec {
@ -77,7 +78,10 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC
"Some" Some("value"),
"None" None,
"Kill" Kill,
"PoisonPill" PoisonPill).foreach {
"PoisonPill" PoisonPill,
"RemoteWatcher.Heartbeat" RemoteWatcher.Heartbeat,
"RemoteWatcher.HertbeatRsp" RemoteWatcher.HeartbeatRsp(65537)
).foreach {
case (scenario, item)
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(system)
@ -107,6 +111,45 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
serializer.fromBinary(serializer.toBinary(msg), serializer.manifest(msg)) should ===(msg)
}
// Separate tests due to missing equality on ActorInitializationException
"resolve serializer for ActorInitializationException" in {
val serializer = SerializationExtension(system)
serializer.serializerFor(classOf[ActorInitializationException]).getClass should ===(classOf[MiscMessageSerializer])
}
"serialize and deserialze ActorInitializationException" in {
val aiex = ActorInitializationException(ref, "test", new TestException("err"))
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val deserialized = serializer.fromBinary(serializer.toBinary(aiex), serializer.manifest(aiex))
.asInstanceOf[ActorInitializationException]
deserialized.getCause should ===(aiex.getCause)
deserialized.getMessage should ===(aiex.getMessage)
deserialized.getActor should ===(aiex.getActor)
}
"serialize and deserialze ActorInitializationException if ref is null" in {
val aiex = ActorInitializationException(null, "test", new TestException("err"))
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val deserialized = serializer.fromBinary(serializer.toBinary(aiex), serializer.manifest(aiex))
.asInstanceOf[ActorInitializationException]
deserialized.getCause should ===(aiex.getCause)
deserialized.getMessage should ===(aiex.getMessage)
deserialized.getActor should ===(aiex.getActor)
}
"serialize and deserialze ActorInitializationException if cause is null" in {
val aiex = ActorInitializationException(ref, "test", null)
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val deserialized = serializer.fromBinary(serializer.toBinary(aiex), serializer.manifest(aiex))
.asInstanceOf[ActorInitializationException]
deserialized.getCause should ===(aiex.getCause)
deserialized.getMessage should ===(aiex.getMessage)
deserialized.getActor should ===(aiex.getActor)
}
}
}

View file

@ -8,6 +8,7 @@ import java.nio.ByteBuffer
import akka.actor.{ ActorIdentity, ExtendedActorSystem, Identify }
import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.util.Random
@ -126,4 +127,40 @@ class PrimitivesSerializationSpec extends AkkaSpec(PrimitivesSerializationSpec.t
}
}
"ByteStringSerializer" must {
Seq(
"empty string" ByteString.empty,
"simple content" ByteString("hello"),
"concatenated content" (ByteString("hello") ++ ByteString("world")),
"sliced content" ByteString("helloabc").take(5)
).foreach {
case (scenario, item)
s"resolve serializer for [$scenario]" in {
val serializer = SerializationExtension(system)
serializer.serializerFor(item.getClass).getClass should ===(classOf[ByteStringSerializer])
}
s"serialize and de-serialize [$scenario]" in {
verifySerialization(item)
}
s"serialize and de-serialize value [$scenario] using ByteBuffers" in {
verifySerializationByteBuffer(item)
}
}
def verifySerialization(msg: AnyRef): Unit = {
val serializer = new ByteStringSerializer(system.asInstanceOf[ExtendedActorSystem])
serializer.fromBinary(serializer.toBinary(msg), None) should ===(msg)
}
def verifySerializationByteBuffer(msg: AnyRef): Unit = {
val serializer = new ByteStringSerializer(system.asInstanceOf[ExtendedActorSystem])
buffer.clear()
serializer.toBinary(msg, buffer)
buffer.flip()
serializer.fromBinary(buffer, "") should ===(msg)
}
}
}

View file

@ -0,0 +1,79 @@
package akka.remote.serialization
import akka.actor.{ ActorInitializationException, ActorRef, ExtendedActorSystem, InternalActorRef }
import akka.dispatch.sysmsg._
import akka.serialization.SerializationExtension
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
object SystemMessageSerializationSpec {
val serializationTestOverrides =
"""
akka.actor.enable-additional-serialization-bindings=on
# or they can be enabled with
# akka.remote.artery.enabled=on
"""
val testConfig = ConfigFactory.parseString(serializationTestOverrides).withFallback(AkkaSpec.testConf)
class TestException(msg: String) extends RuntimeException {
override def equals(other: Any): Boolean = other match {
case e: TestException e.getMessage == getMessage
case _ false
}
}
}
class SystemMessageSerializationSpec extends AkkaSpec(PrimitivesSerializationSpec.testConfig) {
import SystemMessageSerializationSpec._
val testRef = TestProbe().ref.asInstanceOf[InternalActorRef]
val testRef2 = TestProbe().ref.asInstanceOf[InternalActorRef]
"ByteStringSerializer" must {
Seq(
"Create(None)" Create(None),
"Recreate(ex)" Recreate(new TestException("test2")),
"Suspend()" Suspend(),
"Resume(ex)" Resume(new TestException("test3")),
"Terminate()" Terminate(),
"Supervise(ref, async)" Supervise(testRef, async = true),
"Watch(ref, ref)" Watch(testRef, testRef2),
"Unwatch(ref, ref)" Unwatch(testRef, testRef2),
"Failed(ref, ex, uid)" Failed(testRef, new TestException("test4"), 42),
"DeathWatchNotification(ref, confimed, addressTerminated)"
DeathWatchNotification(testRef, existenceConfirmed = true, addressTerminated = true)
).foreach {
case (scenario, item)
s"resolve serializer for [$scenario]" in {
val serializer = SerializationExtension(system)
serializer.serializerFor(item.getClass).getClass should ===(classOf[SystemMessageSerializer])
}
s"serialize and de-serialize [$scenario]" in {
verifySerialization(item)
}
}
def verifySerialization(msg: AnyRef): Unit = {
val serializer = new SystemMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
serializer.fromBinary(serializer.toBinary(msg), None) should ===(msg)
}
// ActorInitializationException has no proper equality
"serialize and de-serialize Create(Some(ex))" in {
val aiex = ActorInitializationException(testRef, "test", new TestException("test5"))
val createMsg = Create(Some(aiex))
val serializer = new SystemMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val deserialized = serializer.fromBinary(serializer.toBinary(createMsg), None).asInstanceOf[Create]
deserialized.failure.get.getCause should ===(aiex.getCause)
deserialized.failure.get.getMessage should ===(aiex.getMessage)
deserialized.failure.get.getActor should ===(aiex.getActor)
}
}
}