From d759f738ea01fe8cf8ca99b582d64a54922b6301 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 26 May 2016 11:58:13 +0200 Subject: [PATCH] add serialization api based on ByteBuffer, #20324 * new trait ByteBufferSerializer with fromBinary and toBinary methods that takes ByteBuffer, this can be mixed in to existing serializer without breaking compatibility * implement the ByteBufferSerializer in the ByteArraySerializer * minor adjustment of the class manifest cache --- .../akka/serialization/SerializeSpec.scala | 23 +++++- akka-actor/src/main/resources/reference.conf | 2 +- .../akka/serialization/Serialization.scala | 67 +++++++++++++----- .../scala/akka/serialization/Serializer.scala | 70 ++++++++++++++++++- .../scala/akka/remote/MessageSerializer.scala | 34 ++++----- 5 files changed, 159 insertions(+), 37 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index fa76783b5c..95f643d602 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -5,7 +5,6 @@ package akka.serialization import language.postfixOps - import akka.testkit.{ AkkaSpec, EventFilter } import akka.actor._ import akka.dispatch.sysmsg._ @@ -17,6 +16,8 @@ import scala.beans.BeanInfo import com.typesafe.config._ import akka.pattern.ask import org.apache.commons.codec.binary.Hex.encodeHex +import java.nio.ByteOrder +import java.nio.ByteBuffer object SerializationTests { @@ -247,7 +248,25 @@ class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) { intercept[IllegalArgumentException] { byteSerializer.toBinary("pigdog") - }.getMessage should ===("ByteArraySerializer only serializes byte arrays, not [pigdog]") + }.getMessage should ===(s"${classOf[ByteArraySerializer].getName} only serializes byte arrays, not [java.lang.String]") + } + + "support ByteBuffer serialization for byte arrays" in { + val byteSerializer = ser.serializerFor(classOf[Array[Byte]]).asInstanceOf[ByteBufferSerializer] + + val byteBuffer = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN) + val str = "abcdef" + val payload = str.getBytes("UTF-8") + byteSerializer.toBinary(payload, byteBuffer) + byteBuffer.position() should ===(payload.length) + byteBuffer.flip() + val deserialized = byteSerializer.fromBinary(byteBuffer, "").asInstanceOf[Array[Byte]] + byteBuffer.remaining() should ===(0) + new String(deserialized, "UTF-8") should ===(str) + + intercept[IllegalArgumentException] { + byteSerializer.toBinary("pigdog", byteBuffer) + }.getMessage should ===(s"${classOf[ByteArraySerializer].getName} only serializes byte arrays, not [java.lang.String]") } } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index b3aaf7eafd..a58c8e0c21 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -598,7 +598,7 @@ akka { # Identifier values from 0 to 16 are reserved for Akka internal usage. serialization-identifiers { "akka.serialization.JavaSerializer" = 1 - "akka.serialization.ByteArraySerializer" = 4 + "akka.serialization.ByteArraySerializer" = 4 } # Configuration items which are used by the akka.actor.ActorDSL._ methods diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index dbc1f21d1c..a2633d05a2 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -14,6 +14,9 @@ import scala.util.{ Try, DynamicVariable, Failure } import scala.collection.immutable import scala.util.control.NonFatal import scala.util.Success +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec object Serialization { @@ -83,7 +86,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { val settings = new Settings(system.settings.config) val log = Logging(system, getClass.getName) - private val manifestCache = new ConcurrentHashMap[String, Option[Class[_]]] + private val manifestCache = new AtomicReference[Map[String, Option[Class[_]]]](Map.empty[String, Option[Class[_]]]) /** * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration @@ -118,28 +121,60 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + "akka.actor.serializers is not in synch between the two systems.") } - serializer match { - case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) - case s1 ⇒ - if (manifest == "") - s1.fromBinary(bytes, None) - else { - val cachedClassManifest = manifestCache.get(manifest) - if (cachedClassManifest ne null) - s1.fromBinary(bytes, cachedClassManifest) - else { + deserializeByteArray(bytes, serializer, manifest) + } + + private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = { + + @tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = { + manifestCache.compareAndSet(cache, cache.updated(key, value)) || + updateCache(manifestCache.get, key, value) // recursive, try again + } + + serializer match { + case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) + case s1 ⇒ + if (manifest == "") + s1.fromBinary(bytes, None) + else { + val cache = manifestCache.get + cache.get(manifest) match { + case Some(cachedClassManifest) => s1.fromBinary(bytes, cachedClassManifest) + case None => system.dynamicAccess.getClassFor[AnyRef](manifest) match { case Success(classManifest) ⇒ - manifestCache.put(manifest, Some(classManifest)) - s1.fromBinary(bytes, Some(classManifest)) + val classManifestOption: Option[Class[_]] = Some(classManifest) + updateCache(cache, manifest, classManifestOption) + s1.fromBinary(bytes, classManifestOption) case Failure(e) ⇒ throw new NotSerializableException( - s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].") + s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].") } - } } - } + } } + } + + /** + * Deserializes the given ByteBuffer of bytes using the specified serializer id, + * using the optional type hint to the Serializer. + * Returns either the resulting object or throws an exception if deserialization fails. + */ + def deserializeByteBuffer(buf: ByteBuffer, serializerId: Int, manifest: String): AnyRef = { + val serializer = try serializerByIdentity(serializerId) catch { + case _: NoSuchElementException ⇒ throw new NotSerializableException( + s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + + "akka.actor.serializers is not in synch between the two systems.") + } + serializer match { + case ser: ByteBufferSerializer => + ser.fromBinary(buf, manifest) + case _ ⇒ + val bytes = Array.ofDim[Byte](buf.remaining()) + buf.get(bytes) + deserializeByteArray(bytes, serializer, manifest) + } + } /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 51bcc545ad..d550a769ca 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -5,6 +5,7 @@ package akka.serialization */ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream } +import java.nio.ByteBuffer import java.util.concurrent.Callable import akka.util.ClassLoaderObjectInputStream import akka.actor.ExtendedActorSystem @@ -132,6 +133,56 @@ abstract class SerializerWithStringManifest extends Serializer { } +/** + * Serializer between an object and a `ByteBuffer` representing that object. + * + * Implementations should typically extend [[SerializerWithStringManifest]] and + * in addition to the `ByteBuffer` based `toBinary` and `fromBinary` methods also + * implement the array based `toBinary` and `fromBinary` methods. The array based + * methods will be used when `ByteBuffer` is not used, e.g. in Akka Persistence. + * + * Note that the array based methods can for example be implemented by delegation + * like this: + * {{{ + * // you need to know the maximum size in bytes of the serialized messages + * val pool = new akka.io.DirectByteBufferPool(defaultBufferSize = 1024 * 1024, maxPoolEntries = 10) + * + * + * // Implement this method for compatibility with `SerializerWithStringManifest`. + * override def toBinary(o: AnyRef): Array[Byte] = { + * val buf = pool.acquire() + * try { + * toBinary(o, buf) + * buf.flip() + * val bytes = Array.ofDim[Byte](buf.remaining) + * buf.get(bytes) + * bytes + * } finally { + * pool.release(buf) + * } + * } + * + * // Implement this method for compatibility with `SerializerWithStringManifest`. + * override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + * fromBinary(ByteBuffer.wrap(bytes), manifest) + * + * }}} + */ +trait ByteBufferSerializer { + + /** + * Serializes the given object into the `ByteBuffer`. + */ + def toBinary(o: AnyRef, buf: ByteBuffer): Unit + + /** + * Produces an object from a `ByteBuffer`, with an optional type-hint; + * the class should be loaded using ActorSystem.dynamicAccess. + */ + def fromBinary(buf: ByteBuffer, manifest: String): AnyRef + +} + /** * Base serializer trait with serialization identifiers configuration contract, * when globally unique serialization identifier is configured in the `reference.conf`. @@ -260,7 +311,7 @@ class NullSerializer extends Serializer { * This is a special Serializer that Serializes and deserializes byte arrays only, * (just returns the byte array unchanged/uncopied) */ -class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer { +class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer { @deprecated("Use constructor with ExtendedActorSystem", "2.4") def this() = this(null) @@ -274,7 +325,22 @@ class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerialize def toBinary(o: AnyRef) = o match { case null ⇒ null case o: Array[Byte] ⇒ o - case other ⇒ throw new IllegalArgumentException("ByteArraySerializer only serializes byte arrays, not [" + other + "]") + case other ⇒ throw new IllegalArgumentException( + s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]") } def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = + o match { + case null ⇒ + case bytes: Array[Byte] ⇒ buf.put(bytes) + case other ⇒ throw new IllegalArgumentException( + s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]") + } + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = { + val bytes = Array.ofDim[Byte](buf.remaining()) + buf.get(bytes) + bytes + } } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 7119b96d2c..38699a04c9 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -9,6 +9,7 @@ import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder } import akka.serialization.{ Serialization, SerializationExtension, SerializerWithStringManifest } +import akka.serialization.ByteBufferSerializer /** * INTERNAL API @@ -53,27 +54,28 @@ private[akka] object MessageSerializer { // FIXME: This should be a FQCN instead headerBuilder.serializer = serializer.identifier.toString - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(message) - headerBuilder.classManifest = manifest - case _ ⇒ - headerBuilder.classManifest = message.getClass.getName + + def manifest: String = serializer match { + case ser: SerializerWithStringManifest ⇒ ser.manifest(message) + case _ ⇒ if (serializer.includeManifest) message.getClass.getName else "" } - envelope.writeHeader(headerBuilder) - // FIXME: This should directly write to the buffer instead - envelope.byteBuffer.put(serializer.toBinary(message)) + serializer match { + case ser: ByteBufferSerializer ⇒ + headerBuilder.classManifest = manifest + envelope.writeHeader(headerBuilder) + ser.toBinary(message, envelope.byteBuffer) + case _ ⇒ + headerBuilder.classManifest = manifest + envelope.writeHeader(headerBuilder) + envelope.byteBuffer.put(serializer.toBinary(message)) + } } def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = { - // FIXME: Use the buffer directly - val size = envelope.byteBuffer.limit - envelope.byteBuffer.position - val bytes = Array.ofDim[Byte](size) - envelope.byteBuffer.get(bytes) - serialization.deserialize( - bytes, + serialization.deserializeByteBuffer( + envelope.byteBuffer, Integer.parseInt(headerBuilder.serializer), // FIXME: Use FQCN - headerBuilder.classManifest).get + headerBuilder.classManifest) } }