Java API for AsyncSerializer, #25078

This commit is contained in:
Patrik Nordwall 2018-05-14 18:20:59 +02:00 committed by Johan Andrén
parent db057df5d9
commit da5cc33b92
3 changed files with 89 additions and 18 deletions

View file

@ -4,29 +4,36 @@
package akka.serialization
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit
import akka.actor.ExtendedActorSystem
import akka.serialization.AsyncSerializeSpec.{ Message1, TestAsyncSerializer }
import akka.testkit.{ AkkaSpec, EventFilter }
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
object AsyncSerializeSpec {
case class Message1(str: String)
case class Message2(str: String)
case class Message3(str: String)
case class Message4(str: String)
val config = ConfigFactory.parseString(
"""
s"""
akka {
actor {
serializers {
async = "akka.serialization.AsyncSerializeSpec$TestAsyncSerializer"
async = "akka.serialization.AsyncSerializeSpec$$TestAsyncSerializer"
asyncCS = "akka.serialization.AsyncSerializeSpec$$TestAsyncSerializerCS"
}
serialization-bindings = {
"akka.serialization.AsyncSerializeSpec$Message1" = async
"akka.serialization.AsyncSerializeSpec$Message2" = async
"akka.serialization.AsyncSerializeSpec$$Message1" = async
"akka.serialization.AsyncSerializeSpec$$Message2" = async
"akka.serialization.AsyncSerializeSpec$$Message3" = asyncCS
"akka.serialization.AsyncSerializeSpec$$Message4" = asyncCS
}
}
}
@ -56,9 +63,35 @@ object AsyncSerializeSpec {
}
}
class TestAsyncSerializerCS(system: ExtendedActorSystem) extends AsyncSerializerWithStringManifestCS(system) {
override def toBinaryAsyncCS(o: AnyRef): CompletionStage[Array[Byte]] = {
o match {
case Message3(msg) CompletableFuture.completedFuture(msg.getBytes)
case Message4(msg) CompletableFuture.completedFuture(msg.getBytes)
}
}
override def fromBinaryAsyncCS(bytes: Array[Byte], manifest: String): CompletionStage[AnyRef] = {
manifest match {
case "1" CompletableFuture.completedFuture(Message3(new String(bytes)))
case "2" CompletableFuture.completedFuture(Message4(new String(bytes)))
}
}
override def identifier: Int = 9001
override def manifest(o: AnyRef): String = o match {
case _: Message3 "1"
case _: Message4 "2"
}
}
}
class AsyncSerializeSpec extends AkkaSpec(AsyncSerializeSpec.config) {
import AsyncSerializeSpec._
val ser = SerializationExtension(system)
"SerializationExtension" must {
@ -77,5 +110,17 @@ class AsyncSerializeSpec extends AkkaSpec(AsyncSerializeSpec.config) {
ser.serialize(Message1("to async"))
}
}
"have Java API for async serializers that delegate to the CS methods" in {
val msg3 = Message3("to async")
val serializer = ser.findSerializerFor(msg3).asInstanceOf[TestAsyncSerializerCS]
EventFilter.warning(start = "Async serializer called synchronously", occurrences = 2) intercept {
val binary = ser.serialize(msg3).get
val back = ser.deserialize(binary, serializer.identifier, serializer.manifest(msg3)).get
back shouldEqual msg3
}
}
}
}

View file

@ -4,8 +4,9 @@
package akka.serialization
import akka.actor.ExtendedActorSystem
import java.util.concurrent.CompletionStage
import akka.actor.ExtendedActorSystem
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
@ -14,17 +15,23 @@ import scala.concurrent.{ Await, Future }
*
* Only used for Akka persistence journals that explicitly support async serializers.
*
* Implementations should typically extend [[AsyncSerializerWithStringManifest]] that
* delegates synchronous calls to their async equivalents.
* Implementations should typically extend [[AsyncSerializerWithStringManifest]] or
* [[AsyncSerializerWithStringManifestCS]] that delegates synchronous calls to their async equivalents.
*/
trait AsyncSerializer {
/**
* Serializes the given object into an Array of Byte
*/
def toBinaryAsync(o: AnyRef): Future[Array[Byte]]
/**
* Produces an object from an array of bytes, with an optional type-hint.
*/
def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef]
}
/**
* Async serializer with string manifest that delegates synchronous calls to the asynchronous calls
* Scala API: Async serializer with string manifest that delegates synchronous calls to the asynchronous calls
* and blocks.
*/
abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) extends SerializerWithStringManifest with AsyncSerializer {
@ -39,3 +46,27 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) ex
}
}
/**
* Java API: Async serializer with string manifest that delegates synchronous calls to the asynchronous calls
* and blocks.
*/
abstract class AsyncSerializerWithStringManifestCS(system: ExtendedActorSystem) extends AsyncSerializerWithStringManifest(system) {
import scala.compat.java8.FutureConverters._
def toBinaryAsyncCS(o: AnyRef): CompletionStage[Array[Byte]]
def fromBinaryAsyncCS(bytes: Array[Byte], manifest: String): CompletionStage[AnyRef]
/**
* Delegates to [[AsyncSerializerWithStringManifestCS#toBinaryAsyncCS]]
*/
final def toBinaryAsync(o: AnyRef): Future[Array[Byte]] =
toBinaryAsyncCS(o).toScala
/**
* Delegates to [[AsyncSerializerWithStringManifestCS#fromBinaryAsyncCS]]
*/
def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] =
fromBinaryAsyncCS(bytes, manifest).toScala
}

View file

@ -101,12 +101,8 @@ object Serializers {
* start-up, where two constructors are tried in order:
*
* <ul>
* <li>taking exactly one argument of type [[akka.actor.ExtendedActorSystem]];
* this should be the preferred one because all reflective loading of classes
* during deserialization should use ExtendedActorSystem.dynamicAccess (see
* [[akka.actor.DynamicAccess]]), and</li>
* <li>without arguments, which is only an option if the serializer does not
* load classes using reflection.</li>
* <li>taking exactly one argument of type [[akka.actor.ExtendedActorSystem]], and</li>
* <li>without arguments</li>
* </ul>
*
* <b>Be sure to always use the </b>[[akka.actor.DynamicAccess]]<b> for loading classes!</b> This is necessary to
@ -135,8 +131,7 @@ abstract class SerializerWithStringManifest extends Serializer {
def toBinary(o: AnyRef): Array[Byte]
/**
* Produces an object from an array of bytes, with an optional type-hint;
* the class should be loaded using ActorSystem.dynamicAccess.
* Produces an object from an array of bytes, with an optional type-hint.
*
* It's recommended to throw `java.io.NotSerializableException` in `fromBinary`
* if the manifest is unknown. This makes it possible to introduce new message