From 03b8c543fcde52e8fe81b8df3824fdd0242ded7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 16 Jul 2019 19:11:33 +0200 Subject: [PATCH] Serialization support for wrapped stream refs (#27353) --- .../scala/akka/actor/DynamicAccessSpec.scala | 5 + .../mima-filters/2.5.x.backwards.excludes | 2 + .../main/scala/akka/actor/DynamicAccess.scala | 8 +- .../akka/actor/ReflectiveDynamicAccess.scala | 15 +++ .../scala/akka/cluster/StreamRefSpec.scala | 3 +- .../src/main/paradox/stream/stream-refs.md | 15 ++- .../src/main/resources/reference.conf | 3 + .../jackson/AkkaJacksonModule.scala | 6 + .../jackson/JacksonObjectMapperProvider.scala | 22 ++-- .../jackson/StreamRefModule.scala | 115 ++++++++++++++++++ .../akka/stream/scaladsl/StreamRefsSpec.scala | 41 ++++++- .../main/scala/akka/stream/StreamRefs.scala | 56 +++++++++ .../streamref/StreamRefResolverImpl.scala | 33 +++++ build.sbt | 2 +- 14 files changed, 304 insertions(+), 22 deletions(-) create mode 100644 akka-serialization-jackson/src/main/scala/akka/serialization/jackson/StreamRefModule.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefResolverImpl.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/DynamicAccessSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DynamicAccessSpec.scala index ffc36a5577..b1b3e9139b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DynamicAccessSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DynamicAccessSpec.scala @@ -55,6 +55,11 @@ class DynamicAccessSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } + "know if a class exists on the classpath or not" in { + dynamicAccess.classIsOnClasspath("i.just.made.it.up.to.hurt.Myself") should ===(false) + dynamicAccess.classIsOnClasspath("akka.actor.Actor") should ===(true) + } + def instantiateWithDefaultOrStringCtor(fqcn: String): Try[TestSuperclass] = // recoverWith doesn't work with scala 2.13.0-M5 // https://github.com/scala/bug/issues/11242 diff --git a/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes index afe7a7c909..39e4889e20 100644 --- a/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes @@ -80,3 +80,5 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.CircuitBreaker ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.CircuitBreaker.onOpen") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.CircuitBreaker.onHalfOpen") +# streamref serialization #27304 +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.DynamicAccess.classIsOnClasspath") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala index 07eada5449..e742a3880e 100644 --- a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala +++ b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala @@ -4,6 +4,8 @@ package akka.actor +import akka.annotation.DoNotInherit + import scala.collection.immutable import scala.reflect.ClassTag import scala.util.Try @@ -15,8 +17,10 @@ import scala.util.Try * * This is an internal facility and users are not expected to encounter it * unless they are extending Akka in ways which go beyond simple Extensions. + * + * Not for user extension */ -abstract class DynamicAccess { +@DoNotInherit abstract class DynamicAccess { /** * Convenience method which given a `Class[_]` object and a constructor description @@ -34,6 +38,8 @@ abstract class DynamicAccess { */ def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] + def classIsOnClasspath(fqcn: String): Boolean + /** * Obtain an object conforming to the type T, which is expected to be * instantiated from a class designated by the fully-qualified class name diff --git a/akka-actor/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala b/akka-actor/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala index 91f6c03bf4..9d9530406e 100644 --- a/akka-actor/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala +++ b/akka-actor/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala @@ -6,7 +6,11 @@ package akka.actor import scala.collection.immutable import java.lang.reflect.InvocationTargetException + +import akka.annotation.DoNotInherit + import scala.reflect.ClassTag +import scala.util.Failure import scala.util.Try /** @@ -14,7 +18,10 @@ import scala.util.Try * unless overridden. It uses reflection to turn fully-qualified class names into `Class[_]` objects * and creates instances from there using `getDeclaredConstructor()` and invoking that. The class loader * to be used for all this is determined by the actor system’s class loader by default. + * + * Not for user extension or construction */ +@DoNotInherit class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess { override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] = @@ -41,6 +48,14 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces createInstanceFor(c, args) } + override def classIsOnClasspath(fqcn: String): Boolean = + getClassFor(fqcn) match { + case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) => + false + case _ => + true + } + override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = { val classTry = if (fqcn.endsWith("$")) getClassFor(fqcn) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala index 7e6c3e1958..a872e95d7a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala @@ -45,8 +45,7 @@ object StreamRefSpec extends MultiNodeConfig { testTransport(on = true) case class RequestLogs(streamId: Int) extends CborSerializable - // Using Java serialization until issue #27304 is fixed - case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) extends JavaSerializable + case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) extends CborSerializable object DataSource { def props(streamLifecycleProbe: ActorRef): Props = diff --git a/akka-docs/src/main/paradox/stream/stream-refs.md b/akka-docs/src/main/paradox/stream/stream-refs.md index db5f6236fb..8d3c380305 100644 --- a/akka-docs/src/main/paradox/stream/stream-refs.md +++ b/akka-docs/src/main/paradox/stream/stream-refs.md @@ -170,6 +170,19 @@ Stream refs utilise normal actor messaging for their trainsport, and therefore p Bulk stream refs can be used to create simple side-channels to transfer humongous amounts of data such as huge log files, messages or even media, with as much ease as if it was a trivial local stream. +## Serialization of SourceRef and SinkRef + +StreamRefs require serialization, since the whole point is to send them between nodes of a cluster. A built in serializer +is provided when `SourceRef` and `SinkRef` are sent directly as messages however the recommended use is to wrap them +into your own actor message classes. + +When @ref[Akka Jackson](../serialization-jackson.md) is used, serialization of wrapped `SourceRef` and `SinkRef` +will work out of the box. + +If you are using some other form of serialization you will need to use the @apidoc[akka.stream.StreamRefResolver] extension +from your serializer to get the `SourceRef` and `SinkRef`. The extension provides the methods `toSerializationFormat(sink or source)` +to transform from refs to string and `resolve{Sink,Source}Ref(String)` to resolve refs from strings. + ## Configuration ### Stream reference subscription timeouts @@ -190,7 +203,7 @@ Scala Java : @@snip [FlowStreamRefsDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java) { #attr-sub-timeout } -## General configuration +### General configuration Other settings can be set globally in your `application.conf`, by overriding any of the following values in the `akka.stream.materializer.stream-ref.*` keyspace: diff --git a/akka-serialization-jackson/src/main/resources/reference.conf b/akka-serialization-jackson/src/main/resources/reference.conf index 6148c17e8d..594c4a041b 100644 --- a/akka-serialization-jackson/src/main/resources/reference.conf +++ b/akka-serialization-jackson/src/main/resources/reference.conf @@ -12,6 +12,9 @@ akka.serialization.jackson { jackson-modules += "akka.serialization.jackson.AkkaJacksonModule" # AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule" + // FIXME how does that optinal loading work?? + # AkkaStreamsModule optionally included if akka-streams is in classpath + jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule" jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule" jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module" jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule" diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaJacksonModule.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaJacksonModule.scala index 792b6a6b7d..b53aed9d71 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaJacksonModule.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaJacksonModule.scala @@ -18,3 +18,9 @@ class AkkaTypedJacksonModule extends JacksonModule with TypedActorRefModule { } object AkkaTypedJacksonModule extends AkkaJacksonModule + +class AkkaStreamJacksonModule extends JacksonModule with StreamRefModule { + override def getModuleName = "AkkaStreamJacksonModule" +} + +object AkkaStreamJacksonModule extends AkkaJacksonModule diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonObjectMapperProvider.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonObjectMapperProvider.scala index fb956b97f8..158d95f3f6 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonObjectMapperProvider.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonObjectMapperProvider.scala @@ -132,18 +132,16 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid mapper } - private def isModuleEnabled(fqcn: String, dynamicAccess: DynamicAccess): Boolean = { - // akka-actor-typed dependency is "provided" and may not be included - if (fqcn == "akka.serialization.jackson.AkkaTypedJacksonModule") { - dynamicAccess.getClassFor("akka.actor.typed.ActorRef") match { - case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) => - false // akka-actor-typed not in classpath - case _ => - true - } - } else - true - } + private def isModuleEnabled(fqcn: String, dynamicAccess: DynamicAccess): Boolean = + fqcn match { + case "akka.serialization.jackson.AkkaTypedJacksonModule" => + // akka-actor-typed dependency is "provided" and may not be included + dynamicAccess.classIsOnClasspath("akka.actor.typed.ActorRef") + case "akka.serialization.jackson.AkkaStreamJacksonModule" => + // akka-stream dependency is "provided" and may not be included + dynamicAccess.classIsOnClasspath("akka.stream.Graph") + case _ => true + } private def features(config: Config, section: String): immutable.Seq[(String, Boolean)] = { import akka.util.ccompat.JavaConverters._ diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/StreamRefModule.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/StreamRefModule.scala new file mode 100644 index 0000000000..2d56b67699 --- /dev/null +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/StreamRefModule.scala @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.serialization.jackson + +// FIXME maybe move many things to `akka.serialization.jackson.internal` package? + +import akka.annotation.InternalApi +import akka.stream.SinkRef +import akka.stream.SourceRef +import akka.stream.StreamRefResolver +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonTokenId +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.SerializerProvider +import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer +import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer + +/** + * INTERNAL API: Adds support for serializing and deserializing [[akka.stream.SourceRef]] and [[akka.stream.SinkRef]]. + */ +@InternalApi private[akka] trait StreamRefModule extends JacksonModule { + addSerializer(classOf[SourceRef[_]], () => SourceRefSerializer.instance, () => SourceRefDeserializer.instance) + addSerializer(classOf[SinkRef[_]], () => SinkRefSerializer.instance, () => SinkRefDeserializer.instance) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object SourceRefSerializer { + val instance: SourceRefSerializer = new SourceRefSerializer +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class SourceRefSerializer + extends StdScalarSerializer[SourceRef[_]](classOf[SourceRef[_]]) + with ActorSystemAccess { + + override def serialize(value: SourceRef[_], jgen: JsonGenerator, provider: SerializerProvider): Unit = { + val resolver = StreamRefResolver(currentSystem()) + val serializedSourceRef = resolver.toSerializationFormat(value) + jgen.writeString(serializedSourceRef) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object SourceRefDeserializer { + val instance: SourceRefDeserializer = new SourceRefDeserializer +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class SourceRefDeserializer + extends StdScalarDeserializer[SourceRef[_]](classOf[SourceRef[_]]) + with ActorSystemAccess { + + def deserialize(jp: JsonParser, ctxt: DeserializationContext): SourceRef[_] = { + if (jp.currentTokenId() == JsonTokenId.ID_STRING) { + val serializedSourceRef = jp.getText() + StreamRefResolver(currentSystem()).resolveSourceRef(serializedSourceRef) + } else + ctxt.handleUnexpectedToken(handledType(), jp).asInstanceOf[SourceRef[_]] + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object SinkRefSerializer { + val instance: SinkRefSerializer = new SinkRefSerializer +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class SinkRefSerializer + extends StdScalarSerializer[SinkRef[_]](classOf[SinkRef[_]]) + with ActorSystemAccess { + + override def serialize(value: SinkRef[_], jgen: JsonGenerator, provider: SerializerProvider): Unit = { + val resolver = StreamRefResolver(currentSystem()) + val serializedSinkRef = resolver.toSerializationFormat(value) + jgen.writeString(serializedSinkRef) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object SinkRefDeserializer { + val instance: SinkRefDeserializer = new SinkRefDeserializer +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class SinkRefDeserializer + extends StdScalarDeserializer[SinkRef[_]](classOf[SinkRef[_]]) + with ActorSystemAccess { + + def deserialize(jp: JsonParser, ctxt: DeserializationContext): SinkRef[_] = { + if (jp.currentTokenId() == JsonTokenId.ID_STRING) { + val serializedSinkref = jp.getText() + StreamRefResolver(currentSystem()).resolveSinkRef(serializedSinkref) + } else + ctxt.handleUnexpectedToken(handledType(), jp).asInstanceOf[SinkRef[_]] + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala index a5cc9f213d..118885a37a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala @@ -11,6 +11,8 @@ import akka.pattern._ import akka.stream.testkit.TestPublisher import akka.stream.testkit.scaladsl._ import akka.stream._ +import akka.stream.impl.streamref.SinkRefImpl +import akka.stream.impl.streamref.SourceRefImpl import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit, TestProbe } import akka.util.ByteString import com.typesafe.config._ @@ -160,15 +162,20 @@ object StreamRefsSpec { } """).withFallback(ConfigFactory.load()) } + + object SnitchActor { + def props(probe: ActorRef) = Props(new SnitchActor(probe)) + } + class SnitchActor(probe: ActorRef) extends Actor { + def receive = { + case msg => probe ! msg + } + } } -class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSender { +class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) with ImplicitSender { import StreamRefsSpec._ - def this() { - this(StreamRefsSpec.config()) - } - val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config()) implicit val mat = ActorMaterializer() @@ -408,4 +415,28 @@ class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSende } + "The StreamRefResolver" must { + + "serialize and deserialize SourceRefs" in { + val probe = TestProbe() + val ref = system.actorOf(StreamRefsSpec.SnitchActor.props(probe.ref)) + val sourceRef = SourceRefImpl[String](ref) + val resolver = StreamRefResolver(system) + val result = resolver.resolveSourceRef(resolver.toSerializationFormat(sourceRef)) + result.asInstanceOf[SourceRefImpl[String]].initialPartnerRef ! "ping" + probe.expectMsg("ping") + } + + "serialize and deserialize SinkRefs" in { + val probe = TestProbe() + val ref = system.actorOf(StreamRefsSpec.SnitchActor.props(probe.ref)) + val sinkRef = SinkRefImpl[String](ref) + val resolver = StreamRefResolver(system) + val result = resolver.resolveSinkRef(resolver.toSerializationFormat(sinkRef)) + result.asInstanceOf[SinkRefImpl[String]].initialPartnerRef ! "ping" + probe.expectMsg("ping") + } + + } + } diff --git a/akka-stream/src/main/scala/akka/stream/StreamRefs.scala b/akka-stream/src/main/scala/akka/stream/StreamRefs.scala index 391cdf6118..2a59aefc76 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamRefs.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamRefs.scala @@ -6,6 +6,12 @@ package akka.stream import akka.NotUsed import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.annotation.DoNotInherit +import akka.stream.impl.streamref.StreamRefResolverImpl import akka.stream.scaladsl.{ Sink, Source } import scala.language.implicitConversions @@ -31,7 +37,10 @@ object SinkRef { * See also [[akka.stream.SourceRef]] which is the dual of a `SinkRef`. * * For additional configuration see `reference.conf` as well as [[akka.stream.StreamRefAttributes]]. + * + * Not for user extension. */ +@DoNotInherit trait SinkRef[In] { /** Scala API: Get [[Sink]] underlying to this source ref. */ @@ -62,7 +71,10 @@ object SourceRef { * See also [[akka.stream.SinkRef]] which is the dual of a `SourceRef`. * * For additional configuration see `reference.conf` as well as [[akka.stream.StreamRefAttributes]]. + * + * Not for user extension. */ +@DoNotInherit trait SourceRef[T] { /** Scala API: Get [[Source]] underlying to this source ref. */ @@ -103,3 +115,47 @@ final case class InvalidPartnerActorException(expectedRef: ActorRef, gotRef: Act s"This may happen due to 'double-materialization' on the other side of this stream ref. " + s"Do note that stream refs are one-shot references and have to be paired up in 1:1 pairs. " + s"Multi-cast such as broadcast etc can be implemented by sharing multiple new stream references. ") + +/** + * The stream ref resolver extension provides a way to serialize and deserialize streamrefs in user serializers. + */ +object StreamRefResolver extends ExtensionId[StreamRefResolver] { + + /** + * Java API + */ + override def get(system: ActorSystem): StreamRefResolver = super.get(system) + + override def createExtension(system: ExtendedActorSystem): StreamRefResolver = + new StreamRefResolverImpl(system) +} + +/** + * The stream ref resolver provides a way to serialize and deserialize streamrefs in user serializers. + * + * Not for user extension + */ +@DoNotInherit trait StreamRefResolver extends Extension { + + /** + * Generate full String representation of the `SourceRef`. + * This representation should be used as serialized representation. + */ + def toSerializationFormat[T](ref: SourceRef[T]): String + + /** + * Generate full String representation of the `SinkRef`. + * This representation should be used as serialized representation. + */ + def toSerializationFormat[T](ref: SinkRef[T]): String + + /** + * Deserialize an `SourceRef` in the [[#toSerializationFormat]]. + */ + def resolveSourceRef[T](serializedSourceRef: String): SourceRef[T] + + /** + * Deserialize an `SinkRef` in the [[#toSerializationFormat]]. + */ + def resolveSinkRef[T](serializedSinkRef: String): SinkRef[T] +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefResolverImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefResolverImpl.scala new file mode 100644 index 0000000000..52eab5d858 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefResolverImpl.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.stream.impl.streamref + +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.stream.SinkRef +import akka.stream.SourceRef +import akka.stream.StreamRefResolver + +/** + * INTERNAL API + */ +@InternalApi final class StreamRefResolverImpl(system: ExtendedActorSystem) extends StreamRefResolver { + + def toSerializationFormat[T](ref: SourceRef[T]): String = ref match { + case SourceRefImpl(actorRef) => + actorRef.path.toSerializationFormatWithAddress(system.provider.getDefaultAddress) + } + + def toSerializationFormat[T](ref: SinkRef[T]): String = ref match { + case SinkRefImpl(actorRef) => + actorRef.path.toSerializationFormatWithAddress(system.provider.getDefaultAddress) + } + + def resolveSourceRef[T](serializedSourceRef: String): SourceRef[T] = + SourceRefImpl(system.provider.resolveActorRef(serializedSourceRef)) + + def resolveSinkRef[T](serializedSinkRef: String): SinkRef[T] = + SinkRefImpl(system.provider.resolveActorRef(serializedSinkRef)) +} diff --git a/build.sbt b/build.sbt index f8cdb8c63c..f2fe8990cd 100644 --- a/build.sbt +++ b/build.sbt @@ -246,7 +246,7 @@ lazy val docs = akkaModule("akka-docs") .disablePlugins(ScalafixPlugin) lazy val jackson = akkaModule("akka-serialization-jackson") - .dependsOn(actor, actorTyped % "optional->compile", actorTests % "test->test", testkit % "test->test") + .dependsOn(actor, actorTyped % "optional->compile", stream % "optional->compile" ,actorTests % "test->test", testkit % "test->test") .settings(Dependencies.jackson) .settings(AutomaticModuleName.settings("akka.serialization.jackson")) .settings(OSGi.jackson)