diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 3c7c742da6..cf6e56bba6 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -168,6 +168,7 @@ abstract class SerializerWithStringManifest extends Serializer { * * }}} */ +//#ByteBufferSerializer trait ByteBufferSerializer { /** @@ -182,6 +183,7 @@ trait ByteBufferSerializer { def fromBinary(buf: ByteBuffer, manifest: String): AnyRef } +//#ByteBufferSerializer /** * Base serializer trait with serialization identifiers configuration contract, diff --git a/akka-docs/rst/java/code/docs/actor/ByteBufferSerializerDocTest.java b/akka-docs/rst/java/code/docs/actor/ByteBufferSerializerDocTest.java new file mode 100644 index 0000000000..0601af279a --- /dev/null +++ b/akka-docs/rst/java/code/docs/actor/ByteBufferSerializerDocTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package docs.actor; + +//#bytebufserializer-with-manifest +import akka.serialization.ByteBufferSerializer; +import akka.serialization.SerializerWithStringManifest; + +//#bytebufserializer-with-manifest +import java.nio.ByteBuffer; + +public class ByteBufferSerializerDocTest { + + + static //#bytebufserializer-with-manifest + class ExampleByteBufSerializer extends SerializerWithStringManifest + implements ByteBufferSerializer { + + @Override + public int identifier() { + return 1337; + } + + @Override + public String manifest(Object o) { + return "serialized-" + o.getClass().getSimpleName(); + } + + @Override + public byte[] toBinary(Object o) { + final ByteBuffer buf = ByteBuffer.allocate(256); // in production code, aquire this from a BufferPool + + toBinary(o, buf); + buf.flip(); + final byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + return bytes; + } + + @Override + public Object fromBinary(byte[] bytes, String manifest) { + return fromBinary(ByteBuffer.wrap(bytes), manifest); + } + + @Override + public void toBinary(Object o, ByteBuffer buf) { + // Implement actual serialization here + } + + @Override + public Object fromBinary(ByteBuffer buf, String manifest) { + // Implement actual deserialization here + return null; + } + } + //#bytebufserializer-with-manifest + +} diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 41b73442bb..2b7e37517b 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -1231,7 +1231,7 @@ returned value downstream. **completes** when any upstream completes zipWithIndex -^^^^^^^ +^^^^^^^^^^^^ Zips elements of current flow with its indices. **emits** upstream emits an element and is paired with their index diff --git a/akka-docs/rst/scala/code/docs/actor/ByteBufferSerializerDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ByteBufferSerializerDocSpec.scala new file mode 100644 index 0000000000..056f4c9076 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/actor/ByteBufferSerializerDocSpec.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package docs.actor + +//#bytebufserializer-with-manifest +import java.nio.ByteBuffer +import akka.serialization.ByteBufferSerializer +import akka.serialization.SerializerWithStringManifest + +//#bytebufserializer-with-manifest + +class ByteBufferSerializerDocSpec { + + //#bytebufserializer-with-manifest + class ExampleByteBufSerializer extends SerializerWithStringManifest with ByteBufferSerializer { + override def identifier: Int = 1337 + override def manifest(o: AnyRef): String = "naive-toStringImpl" + + // Implement this method for compatibility with `SerializerWithStringManifest`. + override def toBinary(o: AnyRef): Array[Byte] = { + val buf = ByteBuffer.allocate(256) // in production code, aquire this from a BufferPool + + toBinary(o, buf) + buf.flip() + val bytes = Array.ofDim[Byte](buf.remaining) + buf.get(bytes) + bytes + } + + // Implement this method for compatibility with `SerializerWithStringManifest`. + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + fromBinary(ByteBuffer.wrap(bytes), manifest) + + // Actual implementation in the ByteBuffer versions of to/fromBinary: + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = ??? // implement actual logic here + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = ??? // implement actual logic here + } + //#bytebufserializer-with-manifest + +} diff --git a/akka-docs/rst/scala/remoting-artery.rst b/akka-docs/rst/scala/remoting-artery.rst index 17b259c270..247f4f4b6f 100644 --- a/akka-docs/rst/scala/remoting-artery.rst +++ b/akka-docs/rst/scala/remoting-artery.rst @@ -414,7 +414,31 @@ For more information please see :ref:`serialization-scala`. ByteBuffer based serialization ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -TODO +Artery introduces a new serialization mechanism which allows the ``ByteBufferSerializer`` to directly write into a +shared :class:`java.nio.ByteBuffer` instead of being forced to allocate and return an ``Array[Byte]`` for each serialized +message. For high-throughput messaging this API change can yield significant performance benefits, so we recommend +changing your serializers to use this new mechanism. + +This new API also plays well with new versions of Google Protocol Buffers and other serialization libraries, which gained +the ability to serialize directly into and from ByteBuffers. + +As the new feature only changes how bytes are read and written, and the rest of the serializatio infrastructure +remained the same, we recommend reading the :ref:`serialization-scala` documentation first. + +Implementing an :class:`akka.serialization.ByteBufferSerializer` works the same way as any other serializer, + +.. includecode:: ../../../akka-actor/src/main/scala/akka/serialization/Serializer.scala#ByteBufferSerializer + +Implementing a serializer for Artery is therefore as simple as implementing this interface, and binding the serializer +as usual (which is explained in :ref:`serialization-scala`). + +Implementations should typically extend ``SerializerWithStringManifest`` and in addition to the ``ByteBuffer`` based +``toBinary`` and ``fromBinary`` methods also implement the array based ``toBinary`` and ``fromBinary`` methods. +The array based methods will be used when ``ByteBuffer`` is not used, e.g. in Akka Persistence. + +Note that the array based methods can be implemented by delegation like this: + +.. includecode:: code/docs/actor/ByteBufferSerializerDocSpec.scala#bytebufserializer-with-manifest Disabling the Java Serializer ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -503,7 +527,7 @@ helpful to separate actors that have different QoS requirements: large messages Akka remoting provides a dedicated channel for large messages if configured. Since actor message ordering must not be violated the channel is actually dedicated for *actors* instead of messages, to ensure all of the messages arrive in send order. It is possible to assign actors on given paths to use this dedicated channel by using -path patterns: +path patterns:: akka.remote.artery.large-message-destinations = [ "/user/largeMessageActor", diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 7236b8635e..37e66b2125 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -1223,7 +1223,7 @@ returned value downstream. **completes** when any upstream completes zipWithIndex -^^^^^^^ +^^^^^^^^^^^^ Zips elements of current flow with its indices. **emits** upstream emits an element and is paired with their index