diff --git a/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala new file mode 100644 index 0000000000..0d9a42d104 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.serialization + +import akka.actor.ExtendedActorSystem +import akka.serialization.AsyncSerializeSpec.{ Message1, TestAsyncSerializer } +import akka.testkit.{ AkkaSpec, EventFilter } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Future + +object AsyncSerializeSpec { + + case class Message1(str: String) + case class Message2(str: String) + + val config = ConfigFactory.parseString( + """ + akka { + actor { + serializers { + async = "akka.serialization.AsyncSerializeSpec$TestAsyncSerializer" + } + + serialization-bindings = { + "akka.serialization.AsyncSerializeSpec$Message1" = async + "akka.serialization.AsyncSerializeSpec$Message2" = async + } + } + } + """) + + class TestAsyncSerializer(system: ExtendedActorSystem) extends AsyncSerializerWithStringManifest(system) { + + override def toBinaryAsync(o: AnyRef): Future[Array[Byte]] = { + o match { + case Message1(msg) ⇒ Future.successful(msg.getBytes) + case Message2(msg) ⇒ Future.successful(msg.getBytes) + } + } + + override def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] = { + manifest match { + case "1" ⇒ Future.successful(Message1(new String(bytes))) + case "2" ⇒ Future.successful(Message2(new String(bytes))) + } + } + + override def identifier: Int = 9000 + + override def manifest(o: AnyRef): String = o match { + case _: Message1 ⇒ "1" + case _: Message2 ⇒ "2" + } + } + +} + +class AsyncSerializeSpec extends AkkaSpec(AsyncSerializeSpec.config) { + val ser = SerializationExtension(system) + + "SerializationExtension" must { + "find async serializers" in { + val o1 = Message1("to async") + val serializer: Serializer = ser.findSerializerFor(o1) + serializer.getClass shouldEqual classOf[TestAsyncSerializer] + val asyncSerializer = serializer.asInstanceOf[SerializerWithStringManifest with AsyncSerializer] + val binary = asyncSerializer.toBinaryAsync(o1).futureValue + val back = asyncSerializer.fromBinaryAsync(binary, asyncSerializer.manifest(o1)).futureValue + o1 shouldEqual back + } + + "logs warning if sync methods called" in { + EventFilter.warning(start = "Async serializer called synchronously", occurrences = 1) intercept { + ser.serialize(Message1("to async")) + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala new file mode 100644 index 0000000000..9450c30a40 --- /dev/null +++ b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.serialization + +import akka.actor.ExtendedActorSystem + +import scala.concurrent.duration.Duration +import scala.concurrent.{ Await, Future } + +/** + * Serializer that supports async serialization. + * + * Only used for Akka persistence journals that explicitly support async serializers. + * + * Implementations should typically extend [[AsyncSerializerWithStringManifest]] that + * delegates synchronous calls to their async equivalents. + */ +trait AsyncSerializer { + def toBinaryAsync(o: AnyRef): Future[Array[Byte]] + + def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] +} + +/** + * Async serializer with string manifest that delegates synchronous calls to the asynchronous calls + * and blocks. + */ +abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) extends SerializerWithStringManifest with AsyncSerializer { + final override def toBinary(o: AnyRef): Array[Byte] = { + system.log.warning("Async serializer called synchronously. This will block. Async serializers should only be used for akka persistence plugins that support them. Class: {}", o.getClass) + Await.result(toBinaryAsync(o), Duration.Inf) + } + + final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + system.log.warning("Async serializer called synchronously. This will block. Async serializers should only be used for akka persistence plugins that support them. Manifest: [{}]", manifest) + Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf) + } +} + diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index dbaba879e9..da6d47ae34 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -4,10 +4,6 @@ package akka.serialization -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectOutputStream } import java.nio.ByteBuffer import java.util.concurrent.Callable