diff --git a/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala index 0d9a42d104..10ad8ca85a 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/AsyncSerializeSpec.scala @@ -4,29 +4,36 @@ package akka.serialization +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.TimeUnit + import akka.actor.ExtendedActorSystem -import akka.serialization.AsyncSerializeSpec.{ Message1, TestAsyncSerializer } import akka.testkit.{ AkkaSpec, EventFilter } import com.typesafe.config.ConfigFactory - import scala.concurrent.Future object AsyncSerializeSpec { case class Message1(str: String) case class Message2(str: String) + case class Message3(str: String) + case class Message4(str: String) val config = ConfigFactory.parseString( - """ + s""" akka { actor { serializers { - async = "akka.serialization.AsyncSerializeSpec$TestAsyncSerializer" + async = "akka.serialization.AsyncSerializeSpec$$TestAsyncSerializer" + asyncCS = "akka.serialization.AsyncSerializeSpec$$TestAsyncSerializerCS" } serialization-bindings = { - "akka.serialization.AsyncSerializeSpec$Message1" = async - "akka.serialization.AsyncSerializeSpec$Message2" = async + "akka.serialization.AsyncSerializeSpec$$Message1" = async + "akka.serialization.AsyncSerializeSpec$$Message2" = async + "akka.serialization.AsyncSerializeSpec$$Message3" = asyncCS + "akka.serialization.AsyncSerializeSpec$$Message4" = asyncCS } } } @@ -56,9 +63,35 @@ object AsyncSerializeSpec { } } + class TestAsyncSerializerCS(system: ExtendedActorSystem) extends AsyncSerializerWithStringManifestCS(system) { + + override def toBinaryAsyncCS(o: AnyRef): CompletionStage[Array[Byte]] = { + o match { + case Message3(msg) ⇒ CompletableFuture.completedFuture(msg.getBytes) + case Message4(msg) ⇒ CompletableFuture.completedFuture(msg.getBytes) + } + } + + override def fromBinaryAsyncCS(bytes: Array[Byte], manifest: String): CompletionStage[AnyRef] = { + manifest match { + case "1" ⇒ CompletableFuture.completedFuture(Message3(new String(bytes))) + case "2" ⇒ CompletableFuture.completedFuture(Message4(new String(bytes))) + } + } + + override def identifier: Int = 9001 + + override def manifest(o: AnyRef): String = o match { + case _: Message3 ⇒ "1" + case _: Message4 ⇒ "2" + } + } + } class AsyncSerializeSpec extends AkkaSpec(AsyncSerializeSpec.config) { + import AsyncSerializeSpec._ + val ser = SerializationExtension(system) "SerializationExtension" must { @@ -77,5 +110,17 @@ class AsyncSerializeSpec extends AkkaSpec(AsyncSerializeSpec.config) { ser.serialize(Message1("to async")) } } + + "have Java API for async serializers that delegate to the CS methods" in { + val msg3 = Message3("to async") + + val serializer = ser.findSerializerFor(msg3).asInstanceOf[TestAsyncSerializerCS] + + EventFilter.warning(start = "Async serializer called synchronously", occurrences = 2) intercept { + val binary = ser.serialize(msg3).get + val back = ser.deserialize(binary, serializer.identifier, serializer.manifest(msg3)).get + back shouldEqual msg3 + } + } } } diff --git a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala index 9450c30a40..16c1c6f6df 100644 --- a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala @@ -4,8 +4,9 @@ package akka.serialization -import akka.actor.ExtendedActorSystem +import java.util.concurrent.CompletionStage +import akka.actor.ExtendedActorSystem import scala.concurrent.duration.Duration import scala.concurrent.{ Await, Future } @@ -14,17 +15,23 @@ import scala.concurrent.{ Await, Future } * * Only used for Akka persistence journals that explicitly support async serializers. * - * Implementations should typically extend [[AsyncSerializerWithStringManifest]] that - * delegates synchronous calls to their async equivalents. + * Implementations should typically extend [[AsyncSerializerWithStringManifest]] or + * [[AsyncSerializerWithStringManifestCS]] that delegates synchronous calls to their async equivalents. */ trait AsyncSerializer { + /** + * Serializes the given object into an Array of Byte + */ def toBinaryAsync(o: AnyRef): Future[Array[Byte]] + /** + * Produces an object from an array of bytes, with an optional type-hint. + */ def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] } /** - * Async serializer with string manifest that delegates synchronous calls to the asynchronous calls + * Scala API: Async serializer with string manifest that delegates synchronous calls to the asynchronous calls * and blocks. */ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) extends SerializerWithStringManifest with AsyncSerializer { @@ -39,3 +46,27 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) ex } } +/** + * Java API: Async serializer with string manifest that delegates synchronous calls to the asynchronous calls + * and blocks. + */ +abstract class AsyncSerializerWithStringManifestCS(system: ExtendedActorSystem) extends AsyncSerializerWithStringManifest(system) { + import scala.compat.java8.FutureConverters._ + + def toBinaryAsyncCS(o: AnyRef): CompletionStage[Array[Byte]] + + def fromBinaryAsyncCS(bytes: Array[Byte], manifest: String): CompletionStage[AnyRef] + + /** + * Delegates to [[AsyncSerializerWithStringManifestCS#toBinaryAsyncCS]] + */ + final def toBinaryAsync(o: AnyRef): Future[Array[Byte]] = + toBinaryAsyncCS(o).toScala + + /** + * Delegates to [[AsyncSerializerWithStringManifestCS#fromBinaryAsyncCS]] + */ + def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] = + fromBinaryAsyncCS(bytes, manifest).toScala +} + diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index da6d47ae34..33ce6318ef 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -101,12 +101,8 @@ object Serializers { * start-up, where two constructors are tried in order: * *