add serialization api based on ByteBuffer, #20324

* new trait ByteBufferSerializer with fromBinary and toBinary
  methods that takes ByteBuffer, this can be mixed in to
  existing serializer without breaking compatibility
* implement the ByteBufferSerializer in the ByteArraySerializer
* minor adjustment of the class manifest cache
This commit is contained in:
Patrik Nordwall 2016-05-26 11:58:13 +02:00
parent 2ff76299c9
commit d759f738ea
5 changed files with 159 additions and 37 deletions

View file

@ -5,7 +5,6 @@
package akka.serialization
import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor._
import akka.dispatch.sysmsg._
@ -17,6 +16,8 @@ import scala.beans.BeanInfo
import com.typesafe.config._
import akka.pattern.ask
import org.apache.commons.codec.binary.Hex.encodeHex
import java.nio.ByteOrder
import java.nio.ByteBuffer
object SerializationTests {
@ -247,7 +248,25 @@ class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) {
intercept[IllegalArgumentException] {
byteSerializer.toBinary("pigdog")
}.getMessage should ===("ByteArraySerializer only serializes byte arrays, not [pigdog]")
}.getMessage should ===(s"${classOf[ByteArraySerializer].getName} only serializes byte arrays, not [java.lang.String]")
}
"support ByteBuffer serialization for byte arrays" in {
val byteSerializer = ser.serializerFor(classOf[Array[Byte]]).asInstanceOf[ByteBufferSerializer]
val byteBuffer = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN)
val str = "abcdef"
val payload = str.getBytes("UTF-8")
byteSerializer.toBinary(payload, byteBuffer)
byteBuffer.position() should ===(payload.length)
byteBuffer.flip()
val deserialized = byteSerializer.fromBinary(byteBuffer, "").asInstanceOf[Array[Byte]]
byteBuffer.remaining() should ===(0)
new String(deserialized, "UTF-8") should ===(str)
intercept[IllegalArgumentException] {
byteSerializer.toBinary("pigdog", byteBuffer)
}.getMessage should ===(s"${classOf[ByteArraySerializer].getName} only serializes byte arrays, not [java.lang.String]")
}
}
}

View file

@ -14,6 +14,9 @@ import scala.util.{ Try, DynamicVariable, Failure }
import scala.collection.immutable
import scala.util.control.NonFatal
import scala.util.Success
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
object Serialization {
@ -83,7 +86,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
val settings = new Settings(system.settings.config)
val log = Logging(system, getClass.getName)
private val manifestCache = new ConcurrentHashMap[String, Option[Class[_]]]
private val manifestCache = new AtomicReference[Map[String, Option[Class[_]]]](Map.empty[String, Option[Class[_]]])
/**
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
@ -118,29 +121,61 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
deserializeByteArray(bytes, serializer, manifest)
}
private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = {
@tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = {
manifestCache.compareAndSet(cache, cache.updated(key, value)) ||
updateCache(manifestCache.get, key, value) // recursive, try again
}
serializer match {
case s2: SerializerWithStringManifest s2.fromBinary(bytes, manifest)
case s1
if (manifest == "")
s1.fromBinary(bytes, None)
else {
val cachedClassManifest = manifestCache.get(manifest)
if (cachedClassManifest ne null)
s1.fromBinary(bytes, cachedClassManifest)
else {
val cache = manifestCache.get
cache.get(manifest) match {
case Some(cachedClassManifest) => s1.fromBinary(bytes, cachedClassManifest)
case None =>
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
case Success(classManifest)
manifestCache.put(manifest, Some(classManifest))
s1.fromBinary(bytes, Some(classManifest))
val classManifestOption: Option[Class[_]] = Some(classManifest)
updateCache(cache, manifest, classManifestOption)
s1.fromBinary(bytes, classManifestOption)
case Failure(e)
throw new NotSerializableException(
s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].")
s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
}
}
}
}
}
/**
* Deserializes the given ByteBuffer of bytes using the specified serializer id,
* using the optional type hint to the Serializer.
* Returns either the resulting object or throws an exception if deserialization fails.
*/
def deserializeByteBuffer(buf: ByteBuffer, serializerId: Int, manifest: String): AnyRef = {
val serializer = try serializerByIdentity(serializerId) catch {
case _: NoSuchElementException throw new NotSerializableException(
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
serializer match {
case ser: ByteBufferSerializer =>
ser.fromBinary(buf, manifest)
case _
val bytes = Array.ofDim[Byte](buf.remaining())
buf.get(bytes)
deserializeByteArray(bytes, serializer, manifest)
}
}
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* Returns either the resulting object or an Exception if one was thrown.

View file

@ -5,6 +5,7 @@ package akka.serialization
*/
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream }
import java.nio.ByteBuffer
import java.util.concurrent.Callable
import akka.util.ClassLoaderObjectInputStream
import akka.actor.ExtendedActorSystem
@ -132,6 +133,56 @@ abstract class SerializerWithStringManifest extends Serializer {
}
/**
* Serializer between an object and a `ByteBuffer` representing that object.
*
* Implementations should typically extend [[SerializerWithStringManifest]] and
* in addition to the `ByteBuffer` based `toBinary` and `fromBinary` methods also
* implement the array based `toBinary` and `fromBinary` methods. The array based
* methods will be used when `ByteBuffer` is not used, e.g. in Akka Persistence.
*
* Note that the array based methods can for example be implemented by delegation
* like this:
* {{{
* // you need to know the maximum size in bytes of the serialized messages
* val pool = new akka.io.DirectByteBufferPool(defaultBufferSize = 1024 * 1024, maxPoolEntries = 10)
*
*
* // Implement this method for compatibility with `SerializerWithStringManifest`.
* override def toBinary(o: AnyRef): Array[Byte] = {
* val buf = pool.acquire()
* try {
* toBinary(o, buf)
* buf.flip()
* val bytes = Array.ofDim[Byte](buf.remaining)
* buf.get(bytes)
* bytes
* } finally {
* pool.release(buf)
* }
* }
*
* // Implement this method for compatibility with `SerializerWithStringManifest`.
* override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
* fromBinary(ByteBuffer.wrap(bytes), manifest)
*
* }}}
*/
trait ByteBufferSerializer {
/**
* Serializes the given object into the `ByteBuffer`.
*/
def toBinary(o: AnyRef, buf: ByteBuffer): Unit
/**
* Produces an object from a `ByteBuffer`, with an optional type-hint;
* the class should be loaded using ActorSystem.dynamicAccess.
*/
def fromBinary(buf: ByteBuffer, manifest: String): AnyRef
}
/**
* Base serializer trait with serialization identifiers configuration contract,
* when globally unique serialization identifier is configured in the `reference.conf`.
@ -260,7 +311,7 @@ class NullSerializer extends Serializer {
* This is a special Serializer that Serializes and deserializes byte arrays only,
* (just returns the byte array unchanged/uncopied)
*/
class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer {
class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer {
@deprecated("Use constructor with ExtendedActorSystem", "2.4")
def this() = this(null)
@ -274,7 +325,22 @@ class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerialize
def toBinary(o: AnyRef) = o match {
case null null
case o: Array[Byte] o
case other throw new IllegalArgumentException("ByteArraySerializer only serializes byte arrays, not [" + other + "]")
case other throw new IllegalArgumentException(
s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]")
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes
override def toBinary(o: AnyRef, buf: ByteBuffer): Unit =
o match {
case null
case bytes: Array[Byte] buf.put(bytes)
case other throw new IllegalArgumentException(
s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]")
}
override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = {
val bytes = Array.ofDim[Byte](buf.remaining())
buf.get(bytes)
bytes
}
}

View file

@ -9,6 +9,7 @@ import akka.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder }
import akka.serialization.{ Serialization, SerializationExtension, SerializerWithStringManifest }
import akka.serialization.ByteBufferSerializer
/**
* INTERNAL API
@ -53,27 +54,28 @@ private[akka] object MessageSerializer {
// FIXME: This should be a FQCN instead
headerBuilder.serializer = serializer.identifier.toString
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(message)
headerBuilder.classManifest = manifest
case _
headerBuilder.classManifest = message.getClass.getName
def manifest: String = serializer match {
case ser: SerializerWithStringManifest ser.manifest(message)
case _ if (serializer.includeManifest) message.getClass.getName else ""
}
serializer match {
case ser: ByteBufferSerializer
headerBuilder.classManifest = manifest
envelope.writeHeader(headerBuilder)
ser.toBinary(message, envelope.byteBuffer)
case _
headerBuilder.classManifest = manifest
envelope.writeHeader(headerBuilder)
// FIXME: This should directly write to the buffer instead
envelope.byteBuffer.put(serializer.toBinary(message))
}
}
def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = {
// FIXME: Use the buffer directly
val size = envelope.byteBuffer.limit - envelope.byteBuffer.position
val bytes = Array.ofDim[Byte](size)
envelope.byteBuffer.get(bytes)
serialization.deserialize(
bytes,
serialization.deserializeByteBuffer(
envelope.byteBuffer,
Integer.parseInt(headerBuilder.serializer), // FIXME: Use FQCN
headerBuilder.classManifest).get
headerBuilder.classManifest)
}
}