Serialization support for wrapped stream refs (#27353)

This commit is contained in:
Johan Andrén 2019-07-16 19:11:33 +02:00 committed by Helena Edelson
parent cdd7976c26
commit 03b8c543fc
14 changed files with 304 additions and 22 deletions

View file

@ -55,6 +55,11 @@ class DynamicAccessSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}
}
"know if a class exists on the classpath or not" in {
dynamicAccess.classIsOnClasspath("i.just.made.it.up.to.hurt.Myself") should ===(false)
dynamicAccess.classIsOnClasspath("akka.actor.Actor") should ===(true)
}
def instantiateWithDefaultOrStringCtor(fqcn: String): Try[TestSuperclass] =
// recoverWith doesn't work with scala 2.13.0-M5
// https://github.com/scala/bug/issues/11242

View file

@ -80,3 +80,5 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.CircuitBreaker
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.CircuitBreaker.onOpen")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.CircuitBreaker.onHalfOpen")
# streamref serialization #27304
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.DynamicAccess.classIsOnClasspath")

View file

@ -4,6 +4,8 @@
package akka.actor
import akka.annotation.DoNotInherit
import scala.collection.immutable
import scala.reflect.ClassTag
import scala.util.Try
@ -15,8 +17,10 @@ import scala.util.Try
*
* This is an internal facility and users are not expected to encounter it
* unless they are extending Akka in ways which go beyond simple Extensions.
*
* Not for user extension
*/
abstract class DynamicAccess {
@DoNotInherit abstract class DynamicAccess {
/**
* Convenience method which given a `Class[_]` object and a constructor description
@ -34,6 +38,8 @@ abstract class DynamicAccess {
*/
def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]]
def classIsOnClasspath(fqcn: String): Boolean
/**
* Obtain an object conforming to the type T, which is expected to be
* instantiated from a class designated by the fully-qualified class name

View file

@ -6,7 +6,11 @@ package akka.actor
import scala.collection.immutable
import java.lang.reflect.InvocationTargetException
import akka.annotation.DoNotInherit
import scala.reflect.ClassTag
import scala.util.Failure
import scala.util.Try
/**
@ -14,7 +18,10 @@ import scala.util.Try
* unless overridden. It uses reflection to turn fully-qualified class names into `Class[_]` objects
* and creates instances from there using `getDeclaredConstructor()` and invoking that. The class loader
* to be used for all this is determined by the actor systems class loader by default.
*
* Not for user extension or construction
*/
@DoNotInherit
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
@ -41,6 +48,14 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces
createInstanceFor(c, args)
}
override def classIsOnClasspath(fqcn: String): Boolean =
getClassFor(fqcn) match {
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
false
case _ =>
true
}
override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {
val classTry =
if (fqcn.endsWith("$")) getClassFor(fqcn)

View file

@ -45,8 +45,7 @@ object StreamRefSpec extends MultiNodeConfig {
testTransport(on = true)
case class RequestLogs(streamId: Int) extends CborSerializable
// Using Java serialization until issue #27304 is fixed
case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) extends JavaSerializable
case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) extends CborSerializable
object DataSource {
def props(streamLifecycleProbe: ActorRef): Props =

View file

@ -170,6 +170,19 @@ Stream refs utilise normal actor messaging for their trainsport, and therefore p
Bulk stream refs can be used to create simple side-channels to transfer humongous amounts
of data such as huge log files, messages or even media, with as much ease as if it was a trivial local stream.
## Serialization of SourceRef and SinkRef
StreamRefs require serialization, since the whole point is to send them between nodes of a cluster. A built in serializer
is provided when `SourceRef` and `SinkRef` are sent directly as messages however the recommended use is to wrap them
into your own actor message classes.
When @ref[Akka Jackson](../serialization-jackson.md) is used, serialization of wrapped `SourceRef` and `SinkRef`
will work out of the box.
If you are using some other form of serialization you will need to use the @apidoc[akka.stream.StreamRefResolver] extension
from your serializer to get the `SourceRef` and `SinkRef`. The extension provides the methods `toSerializationFormat(sink or source)`
to transform from refs to string and `resolve{Sink,Source}Ref(String)` to resolve refs from strings.
## Configuration
### Stream reference subscription timeouts
@ -190,7 +203,7 @@ Scala
Java
: @@snip [FlowStreamRefsDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java) { #attr-sub-timeout }
## General configuration
### General configuration
Other settings can be set globally in your `application.conf`, by overriding any of the following values
in the `akka.stream.materializer.stream-ref.*` keyspace:

View file

@ -12,6 +12,9 @@ akka.serialization.jackson {
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
# AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
// FIXME how does that optinal loading work??
# AkkaStreamsModule optionally included if akka-streams is in classpath
jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule"
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"

View file

@ -18,3 +18,9 @@ class AkkaTypedJacksonModule extends JacksonModule with TypedActorRefModule {
}
object AkkaTypedJacksonModule extends AkkaJacksonModule
class AkkaStreamJacksonModule extends JacksonModule with StreamRefModule {
override def getModuleName = "AkkaStreamJacksonModule"
}
object AkkaStreamJacksonModule extends AkkaJacksonModule

View file

@ -132,18 +132,16 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid
mapper
}
private def isModuleEnabled(fqcn: String, dynamicAccess: DynamicAccess): Boolean = {
// akka-actor-typed dependency is "provided" and may not be included
if (fqcn == "akka.serialization.jackson.AkkaTypedJacksonModule") {
dynamicAccess.getClassFor("akka.actor.typed.ActorRef") match {
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
false // akka-actor-typed not in classpath
case _ =>
true
}
} else
true
}
private def isModuleEnabled(fqcn: String, dynamicAccess: DynamicAccess): Boolean =
fqcn match {
case "akka.serialization.jackson.AkkaTypedJacksonModule" =>
// akka-actor-typed dependency is "provided" and may not be included
dynamicAccess.classIsOnClasspath("akka.actor.typed.ActorRef")
case "akka.serialization.jackson.AkkaStreamJacksonModule" =>
// akka-stream dependency is "provided" and may not be included
dynamicAccess.classIsOnClasspath("akka.stream.Graph")
case _ => true
}
private def features(config: Config, section: String): immutable.Seq[(String, Boolean)] = {
import akka.util.ccompat.JavaConverters._

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.serialization.jackson
// FIXME maybe move many things to `akka.serialization.jackson.internal` package?
import akka.annotation.InternalApi
import akka.stream.SinkRef
import akka.stream.SourceRef
import akka.stream.StreamRefResolver
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonTokenId
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
/**
* INTERNAL API: Adds support for serializing and deserializing [[akka.stream.SourceRef]] and [[akka.stream.SinkRef]].
*/
@InternalApi private[akka] trait StreamRefModule extends JacksonModule {
addSerializer(classOf[SourceRef[_]], () => SourceRefSerializer.instance, () => SourceRefDeserializer.instance)
addSerializer(classOf[SinkRef[_]], () => SinkRefSerializer.instance, () => SinkRefDeserializer.instance)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object SourceRefSerializer {
val instance: SourceRefSerializer = new SourceRefSerializer
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class SourceRefSerializer
extends StdScalarSerializer[SourceRef[_]](classOf[SourceRef[_]])
with ActorSystemAccess {
override def serialize(value: SourceRef[_], jgen: JsonGenerator, provider: SerializerProvider): Unit = {
val resolver = StreamRefResolver(currentSystem())
val serializedSourceRef = resolver.toSerializationFormat(value)
jgen.writeString(serializedSourceRef)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object SourceRefDeserializer {
val instance: SourceRefDeserializer = new SourceRefDeserializer
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class SourceRefDeserializer
extends StdScalarDeserializer[SourceRef[_]](classOf[SourceRef[_]])
with ActorSystemAccess {
def deserialize(jp: JsonParser, ctxt: DeserializationContext): SourceRef[_] = {
if (jp.currentTokenId() == JsonTokenId.ID_STRING) {
val serializedSourceRef = jp.getText()
StreamRefResolver(currentSystem()).resolveSourceRef(serializedSourceRef)
} else
ctxt.handleUnexpectedToken(handledType(), jp).asInstanceOf[SourceRef[_]]
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object SinkRefSerializer {
val instance: SinkRefSerializer = new SinkRefSerializer
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class SinkRefSerializer
extends StdScalarSerializer[SinkRef[_]](classOf[SinkRef[_]])
with ActorSystemAccess {
override def serialize(value: SinkRef[_], jgen: JsonGenerator, provider: SerializerProvider): Unit = {
val resolver = StreamRefResolver(currentSystem())
val serializedSinkRef = resolver.toSerializationFormat(value)
jgen.writeString(serializedSinkRef)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object SinkRefDeserializer {
val instance: SinkRefDeserializer = new SinkRefDeserializer
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class SinkRefDeserializer
extends StdScalarDeserializer[SinkRef[_]](classOf[SinkRef[_]])
with ActorSystemAccess {
def deserialize(jp: JsonParser, ctxt: DeserializationContext): SinkRef[_] = {
if (jp.currentTokenId() == JsonTokenId.ID_STRING) {
val serializedSinkref = jp.getText()
StreamRefResolver(currentSystem()).resolveSinkRef(serializedSinkref)
} else
ctxt.handleUnexpectedToken(handledType(), jp).asInstanceOf[SinkRef[_]]
}
}

View file

@ -11,6 +11,8 @@ import akka.pattern._
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.scaladsl._
import akka.stream._
import akka.stream.impl.streamref.SinkRefImpl
import akka.stream.impl.streamref.SourceRefImpl
import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit, TestProbe }
import akka.util.ByteString
import com.typesafe.config._
@ -160,15 +162,20 @@ object StreamRefsSpec {
}
""").withFallback(ConfigFactory.load())
}
object SnitchActor {
def props(probe: ActorRef) = Props(new SnitchActor(probe))
}
class SnitchActor(probe: ActorRef) extends Actor {
def receive = {
case msg => probe ! msg
}
}
}
class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSender {
import StreamRefsSpec._
def this() {
this(StreamRefsSpec.config())
}
val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config())
implicit val mat = ActorMaterializer()
@ -408,4 +415,28 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende
}
"The StreamRefResolver" must {
"serialize and deserialize SourceRefs" in {
val probe = TestProbe()
val ref = system.actorOf(StreamRefsSpec.SnitchActor.props(probe.ref))
val sourceRef = SourceRefImpl[String](ref)
val resolver = StreamRefResolver(system)
val result = resolver.resolveSourceRef(resolver.toSerializationFormat(sourceRef))
result.asInstanceOf[SourceRefImpl[String]].initialPartnerRef ! "ping"
probe.expectMsg("ping")
}
"serialize and deserialize SinkRefs" in {
val probe = TestProbe()
val ref = system.actorOf(StreamRefsSpec.SnitchActor.props(probe.ref))
val sinkRef = SinkRefImpl[String](ref)
val resolver = StreamRefResolver(system)
val result = resolver.resolveSinkRef(resolver.toSerializationFormat(sinkRef))
result.asInstanceOf[SinkRefImpl[String]].initialPartnerRef ! "ping"
probe.expectMsg("ping")
}
}
}

View file

@ -6,6 +6,12 @@ package akka.stream
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.impl.streamref.StreamRefResolverImpl
import akka.stream.scaladsl.{ Sink, Source }
import scala.language.implicitConversions
@ -31,7 +37,10 @@ object SinkRef {
* See also [[akka.stream.SourceRef]] which is the dual of a `SinkRef`.
*
* For additional configuration see `reference.conf` as well as [[akka.stream.StreamRefAttributes]].
*
* Not for user extension.
*/
@DoNotInherit
trait SinkRef[In] {
/** Scala API: Get [[Sink]] underlying to this source ref. */
@ -62,7 +71,10 @@ object SourceRef {
* See also [[akka.stream.SinkRef]] which is the dual of a `SourceRef`.
*
* For additional configuration see `reference.conf` as well as [[akka.stream.StreamRefAttributes]].
*
* Not for user extension.
*/
@DoNotInherit
trait SourceRef[T] {
/** Scala API: Get [[Source]] underlying to this source ref. */
@ -103,3 +115,47 @@ final case class InvalidPartnerActorException(expectedRef: ActorRef, gotRef: Act
s"This may happen due to 'double-materialization' on the other side of this stream ref. " +
s"Do note that stream refs are one-shot references and have to be paired up in 1:1 pairs. " +
s"Multi-cast such as broadcast etc can be implemented by sharing multiple new stream references. ")
/**
* The stream ref resolver extension provides a way to serialize and deserialize streamrefs in user serializers.
*/
object StreamRefResolver extends ExtensionId[StreamRefResolver] {
/**
* Java API
*/
override def get(system: ActorSystem): StreamRefResolver = super.get(system)
override def createExtension(system: ExtendedActorSystem): StreamRefResolver =
new StreamRefResolverImpl(system)
}
/**
* The stream ref resolver provides a way to serialize and deserialize streamrefs in user serializers.
*
* Not for user extension
*/
@DoNotInherit trait StreamRefResolver extends Extension {
/**
* Generate full String representation of the `SourceRef`.
* This representation should be used as serialized representation.
*/
def toSerializationFormat[T](ref: SourceRef[T]): String
/**
* Generate full String representation of the `SinkRef`.
* This representation should be used as serialized representation.
*/
def toSerializationFormat[T](ref: SinkRef[T]): String
/**
* Deserialize an `SourceRef` in the [[#toSerializationFormat]].
*/
def resolveSourceRef[T](serializedSourceRef: String): SourceRef[T]
/**
* Deserialize an `SinkRef` in the [[#toSerializationFormat]].
*/
def resolveSinkRef[T](serializedSinkRef: String): SinkRef[T]
}

View file

@ -0,0 +1,33 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.streamref
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.stream.SinkRef
import akka.stream.SourceRef
import akka.stream.StreamRefResolver
/**
* INTERNAL API
*/
@InternalApi final class StreamRefResolverImpl(system: ExtendedActorSystem) extends StreamRefResolver {
def toSerializationFormat[T](ref: SourceRef[T]): String = ref match {
case SourceRefImpl(actorRef) =>
actorRef.path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
}
def toSerializationFormat[T](ref: SinkRef[T]): String = ref match {
case SinkRefImpl(actorRef) =>
actorRef.path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
}
def resolveSourceRef[T](serializedSourceRef: String): SourceRef[T] =
SourceRefImpl(system.provider.resolveActorRef(serializedSourceRef))
def resolveSinkRef[T](serializedSinkRef: String): SinkRef[T] =
SinkRefImpl(system.provider.resolveActorRef(serializedSinkRef))
}

View file

@ -246,7 +246,7 @@ lazy val docs = akkaModule("akka-docs")
.disablePlugins(ScalafixPlugin)
lazy val jackson = akkaModule("akka-serialization-jackson")
.dependsOn(actor, actorTyped % "optional->compile", actorTests % "test->test", testkit % "test->test")
.dependsOn(actor, actorTyped % "optional->compile", stream % "optional->compile" ,actorTests % "test->test", testkit % "test->test")
.settings(Dependencies.jackson)
.settings(AutomaticModuleName.settings("akka.serialization.jackson"))
.settings(OSGi.jackson)