From 14e0188a1ccdc57370ea67786d098a61268cdad1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 26 Sep 2016 15:04:53 +0200 Subject: [PATCH] 21202: Added more serializers for built-in stuff --- .../scala/akka/serialization/Serializer.scala | 4 +- .../serialization/MiscMessageSerializer.scala | 18 ++- .../serialization/PrimitiveSerializers.scala | 89 ++++++++++++ .../MiscMessageSerializerSpec.scala | 4 +- .../PrimitivesSerializationSpec.scala | 129 ++++++++++++++++++ 5 files changed, 239 insertions(+), 5 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/serialization/PrimitiveSerializers.scala create mode 100644 akka-remote/src/test/scala/akka/remote/serialization/PrimitivesSerializationSpec.scala diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index d550a769ca..3c7c742da6 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -303,7 +303,7 @@ class NullSerializer extends Serializer { val nullAsBytes = Array[Byte]() def includeManifest: Boolean = false def identifier = 0 - def toBinary(o: AnyRef) = nullAsBytes + def toBinary(o: AnyRef): Array[Byte] = nullAsBytes def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null } @@ -322,7 +322,7 @@ class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerialize else identifierFromConfig def includeManifest: Boolean = false - def toBinary(o: AnyRef) = o match { + def toBinary(o: AnyRef): Array[Byte] = o match { case null ⇒ null case o: Array[Byte] ⇒ o case other ⇒ throw new IllegalArgumentException( diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala index 2883018822..0403515c70 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala @@ -14,7 +14,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW private val payloadSupport = new WrappedPayloadSupport(system) private val throwableSupport = new ThrowableSupport(system) - private val NoneSerialized = Array.empty[Byte] + private val ParameterlessSerialized = Array.empty[Byte] def toBinary(obj: AnyRef): Array[Byte] = obj match { case identify: Identify ⇒ serializeIdentify(identify) @@ -25,6 +25,9 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW case s: Status.Success ⇒ serializeStatusSuccess(s) case f: Status.Failure ⇒ serializeStatusFailure(f) case t: Throwable ⇒ throwableSupport.serializeThrowable(t) + case None ⇒ ParameterlessSerialized + case PoisonPill ⇒ ParameterlessSerialized + case Kill ⇒ ParameterlessSerialized case _ ⇒ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]") } @@ -68,12 +71,14 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW payloadSupport.payloadBuilder(failure.cause).build().toByteArray private val IdentifyManifest = "A" - private val ActorIdentifyManifest = "B" + private val ActorIdentityManifest = "B" private val OptionManifest = "C" private val StatusSuccessManifest = "D" private val StatusFailureManifest = "E" private val ThrowableManifest = "F" private val ActorRefManifest = "G" + private val PoisonPillManifest = "P" + private val KillManifest = "K" private val fromBinaryMap = Map[String, Array[Byte] ⇒ AnyRef]( IdentifyManifest → deserializeIdentify, @@ -83,6 +88,10 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW StatusFailureManifest → deserializeStatusFailure, ThrowableManifest → throwableSupport.deserializeThrowable, ActorRefManifest → deserializeActorRefBytes) + OptionManifest → deserializeOption, + PoisonPillManifest → ((_) ⇒ PoisonPill), + KillManifest → ((_) ⇒ Kill) + ) override def manifest(o: AnyRef): String = o match { @@ -93,6 +102,11 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW case _: Status.Success ⇒ StatusSuccessManifest case _: Status.Failure ⇒ StatusFailureManifest case _: Throwable ⇒ ThrowableManifest + case _: Identify ⇒ IdentifyManifest + case _: ActorIdentity ⇒ ActorIdentityManifest + case _: Option[Any] ⇒ OptionManifest + case _: PoisonPill.type ⇒ PoisonPillManifest + case _: Kill.type ⇒ KillManifest case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/PrimitiveSerializers.scala b/akka-remote/src/main/scala/akka/remote/serialization/PrimitiveSerializers.scala new file mode 100644 index 0000000000..9952652ae4 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/serialization/PrimitiveSerializers.scala @@ -0,0 +1,89 @@ +package akka.remote.serialization + +import java.nio.ByteBuffer + +import akka.actor.{ ExtendedActorSystem, Kill, PoisonPill } +import akka.serialization.{ BaseSerializer, ByteBufferSerializer } + +class LongSerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer { + override def includeManifest: Boolean = false + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = { + buf.putLong(Long.unbox(o)) + } + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = { + Long.box(buf.getLong) + } + + override def toBinary(o: AnyRef): Array[Byte] = { + val result = Array.ofDim[Byte](8) + var long = Long.unbox(o) + var i = 0 + while (long != 0) { + result(i) = (long & 0xFF).toByte + i += 1 + long >>>= 8 + } + result + } + + override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { + var result = 0L + var i = 7 + while (i >= 0) { + result <<= 8 + result |= (bytes(i).toLong & 0xFF) + i -= 1 + } + Long.box(result) + } +} + +class IntSerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer { + override def includeManifest: Boolean = false + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = buf.putInt(Int.unbox(o)) + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = Int.box(buf.getInt) + + override def toBinary(o: AnyRef): Array[Byte] = { + val result = Array.ofDim[Byte](4) + var int = Int.unbox(o) + var i = 0 + while (int != 0) { + result(i) = (int & 0xFF).toByte + i += 1 + int >>>= 8 + } + result + } + + override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { + var result = 0 + var i = 3 + while (i >= 0) { + result <<= 8 + result |= (bytes(i).toInt & 0xFF) + i -= 1 + } + Int.box(result) + } +} + +class StringSerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer { + override def includeManifest: Boolean = false + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = buf.put(toBinary(o)) + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = { + val bytes = Array.ofDim[Byte](buf.remaining()) + buf.get(bytes) + new String(bytes, "UTF-8") + } + + override def toBinary(o: AnyRef): Array[Byte] = o.asInstanceOf[String].getBytes("UTF-8") + + override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = new String(bytes, "UTF-8") + +} diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala index 869b87bf10..32054ddbf6 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala @@ -75,7 +75,9 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC "Status.Failure JavaSer" → Status.Failure(new OtherException("exc")), // exc with JavaSerializer "ActorRef" → ref, "Some" → Some("value"), - "None" → None).foreach { + "None" → None, + "Kill" → Kill, + "PoisonPill" → PoisonPill).foreach { case (scenario, item) ⇒ s"resolve serializer for $scenario" in { val serializer = SerializationExtension(system) diff --git a/akka-remote/src/test/scala/akka/remote/serialization/PrimitivesSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/PrimitivesSerializationSpec.scala new file mode 100644 index 0000000000..075e839df2 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/serialization/PrimitivesSerializationSpec.scala @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.serialization + +import java.nio.ByteBuffer + +import akka.actor.{ ActorIdentity, ExtendedActorSystem, Identify } +import akka.serialization.SerializationExtension +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory + +import scala.util.Random + +object PrimitivesSerializationSpec { + val serializationTestOverrides = + """ + akka.actor.enable-additional-serialization-bindings=on + # or they can be enabled with + # akka.remote.artery.enabled=on + """ + + val testConfig = ConfigFactory.parseString(serializationTestOverrides).withFallback(AkkaSpec.testConf) +} + +class PrimitivesSerializationSpec extends AkkaSpec(PrimitivesSerializationSpec.testConfig) { + + val buffer = ByteBuffer.allocate(1024) + + "LongSerializer" must { + Seq(0L, 1L, -1L, Long.MinValue, Long.MinValue + 1L, Long.MaxValue, Long.MaxValue - 1L).map(_.asInstanceOf[AnyRef]).foreach { + item ⇒ + s"resolve serializer for value $item" in { + val serializer = SerializationExtension(system) + serializer.serializerFor(item.getClass).getClass should ===(classOf[LongSerializer]) + } + + s"serialize and de-serialize value $item" in { + verifySerialization(item) + } + + s"serialize and de-serialize value $item using ByteBuffers" in { + verifySerializationByteBuffer(item) + } + } + + def verifySerialization(msg: AnyRef): Unit = { + val serializer = new LongSerializer(system.asInstanceOf[ExtendedActorSystem]) + serializer.fromBinary(serializer.toBinary(msg), None) should ===(msg) + } + + def verifySerializationByteBuffer(msg: AnyRef): Unit = { + val serializer = new LongSerializer(system.asInstanceOf[ExtendedActorSystem]) + buffer.clear() + serializer.toBinary(msg, buffer) + buffer.flip() + serializer.fromBinary(buffer, "") should ===(msg) + } + } + + "IntSerializer" must { + Seq(0, 1, -1, Int.MinValue, Int.MinValue + 1, Int.MaxValue, Int.MaxValue - 1).map(_.asInstanceOf[AnyRef]).foreach { + item ⇒ + s"resolve serializer for value $item" in { + val serializer = SerializationExtension(system) + serializer.serializerFor(item.getClass).getClass should ===(classOf[IntSerializer]) + } + + s"serialize and de-serialize value $item" in { + verifySerialization(item) + } + + s"serialize and de-serialize value $item using ByteBuffers" in { + verifySerializationByteBuffer(item) + } + } + + def verifySerialization(msg: AnyRef): Unit = { + val serializer = new IntSerializer(system.asInstanceOf[ExtendedActorSystem]) + serializer.fromBinary(serializer.toBinary(msg), None) should ===(msg) + } + + def verifySerializationByteBuffer(msg: AnyRef): Unit = { + val serializer = new IntSerializer(system.asInstanceOf[ExtendedActorSystem]) + buffer.clear() + serializer.toBinary(msg, buffer) + buffer.flip() + serializer.fromBinary(buffer, "") should ===(msg) + } + } + + "StringSerializer" must { + val random = Random.nextString(256) + Seq( + "empty string" → "", + "hello" → "hello", + "árvíztűrőütvefúrógép" → "árvíztűrőütvefúrógép", + "random" → random + ).foreach { + case (scenario, item) ⇒ + s"resolve serializer for [$scenario]" in { + val serializer = SerializationExtension(system) + serializer.serializerFor(item.getClass).getClass should ===(classOf[StringSerializer]) + } + + s"serialize and de-serialize [$scenario]" in { + verifySerialization(item) + } + + s"serialize and de-serialize value [$scenario] using ByteBuffers" in { + verifySerializationByteBuffer(item) + } + } + + def verifySerialization(msg: AnyRef): Unit = { + val serializer = new StringSerializer(system.asInstanceOf[ExtendedActorSystem]) + serializer.fromBinary(serializer.toBinary(msg), None) should ===(msg) + } + + def verifySerializationByteBuffer(msg: AnyRef): Unit = { + val serializer = new StringSerializer(system.asInstanceOf[ExtendedActorSystem]) + buffer.clear() + serializer.toBinary(msg, buffer) + buffer.flip() + serializer.fromBinary(buffer, "") should ===(msg) + } + } + +}