diff --git a/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md b/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md new file mode 100644 index 0000000000..1a326093c0 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md @@ -0,0 +1,18 @@ +# Source.startContextPropagation + +Turns a Source into a SourceWithContext which can propagate a context per element along a stream. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #startContextPropagation } + +@@@ + +## Description + +Turns a Source into a SourceWithContext which can propagate a context per element along a stream. +The function passed into startContextPropagation must turn elements into contexts, one context for every element. diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 2a0c672e47..51e4c1a133 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -27,6 +27,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly| |Source|@ref[single](Source/single.md)|Stream a single object| +|Source|@ref[startContextPropagation](Source/startContextPropagation.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.| |Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].| |Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| @@ -285,6 +286,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) @@@ index * [combine](Source/combine.md) +* [startContextPropagation](Source/startContextPropagation.md) * [fromPublisher](Source/fromPublisher.md) * [fromIterator](Source/fromIterator.md) * [cycle](Source/cycle.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala new file mode 100644 index 0000000000..1de487e653 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.StreamSpec + +case class Message(data: String, offset: Long) + +class SourceWithContextSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + implicit val materializer = ActorMaterializer(settings) + + "A SourceWithContext" must { + + "get created from Source.startContextPropagation" in { + val msg = Message("a", 1L) + Source(Vector(msg)) + .startContextPropagation(_.offset) + .endContextPropagation + .runWith(TestSink.probe[(Message, Long)]) + .request(1) + .expectNext((msg, 1L)) + .expectComplete() + } + + "be able to get turned back into a normal Source" in { + val msg = Message("a", 1L) + Source(Vector(msg)) + .startContextPropagation(_.offset) + .map(_.data) + .endContextPropagation.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[String]) + .request(1) + .expectNext("a") + .expectComplete() + } + + "pass through contexts using map and filter" in { + Source( + Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) + ) + .startContextPropagation(_.offset) + .map(_.data.toLowerCase) + .filter(_ != "b") + .filterNot(_ == "d") + .endContextPropagation + .runWith(TestSink.probe[(String, Long)]) + .request(2) + .expectNext(("a", 1L)) + .expectNext(("c", 4L)) + .expectComplete() + } + + "pass through contexts via a FlowWithContext" in { + + def flowWithContext[T] = FlowWithContext[Long, T] + + Source(Vector(Message("a", 1L))) + .startContextPropagation(_.offset) + .map(_.data) + .via(flowWithContext.map(s ⇒ s + "b")) + .endContextPropagation + .runWith(TestSink.probe[(String, Long)]) + .request(1) + .expectNext(("ab", 1L)) + .expectComplete() + } + + "pass through contexts via mapConcat" in { + Source(Vector(Message("a", 1L))) + .startContextPropagation(_.offset) + .map(_.data) + .mapConcat { str ⇒ + List(1, 2, 3).map(i ⇒ s"$str-$i") + } + .endContextPropagation + .runWith(TestSink.probe[(String, Long)]) + .request(3) + .expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L)) + .expectComplete() + } + + "pass through a sequence of contexts per element via grouped" in { + Source(Vector(Message("a", 1L))) + .startContextPropagation(_.offset) + .map(_.data) + .mapConcat { str ⇒ + List(1, 2, 3, 4).map(i ⇒ s"$str-$i") + } + .grouped(2) + .endContextPropagation + .runWith(TestSink.probe[(Seq[String], Seq[Long])]) + .request(2) + .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L))) + .expectComplete() + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala new file mode 100644 index 0000000000..155f110558 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.collection.immutable +import akka.NotUsed +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.StreamSpec + +class WithContextUsageSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + implicit val materializer = ActorMaterializer(settings) + + "Context propagation used for committing offsets" must { + + "be able to commit on offset change" in { + val testRange = 0 to 10 + val input = genInput(testRange) + val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init + + val f: (Record ⇒ Record) = record ⇒ record.copy(value = record.value + 1) + val expectedRecords = toRecords(input).map(f) + + val src = createSourceWithContext(input) + .map(f) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[Record]) + .request(input.size) + .expectNextN(expectedRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "only commit filtered offsets on offset change" in { + val testRange = 0 to 10 + val input = genInput(testRange) + + val f: (Record ⇒ Boolean) = record ⇒ record.key.endsWith("2") + val expectedOffsets = input.filter(cm ⇒ f(cm.record)).map(cm ⇒ Offset(cm)).init + val expectedRecords = toRecords(input).filter(f) + + val src = createSourceWithContext(input) + .filter(f) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[Record]) + .request(input.size) + .expectNextN(expectedRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "only commit after mapConcat on offset change" in { + val testRange = 0 to 10 + val input = genInput(testRange) + + val f: (Record ⇒ List[Record]) = record ⇒ List(record, record, record) + val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init + val expectedRecords = toRecords(input).flatMap(f) + + val src = createSourceWithContext(input) + .mapConcat(f) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[Record]) + .request(expectedRecords.size) + .expectNextN(expectedRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "commit offsets after grouped on offset change" in { + val groupSize = 2 + val testRange = 0 to 10 + val input = genInput(testRange) + + val expectedOffsets = testRange.grouped(2).map(ixs ⇒ Offset(ixs.last)).toVector.init + val expectedMultiRecords = toRecords(input).grouped(groupSize).map(l ⇒ MultiRecord(l)).toVector + + val src = createSourceWithContext(input) + .grouped(groupSize) + .map(l ⇒ MultiRecord(l)) + .mapContext(_.last) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[MultiRecord]) + .request(expectedMultiRecords.size) + .expectNextN(expectedMultiRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "commit offsets after mapConcat + grouped on offset change" in { + val groupSize = 2 + val testRange = 0 to 10 + val input = genInput(testRange) + + val f: (Record ⇒ List[Record]) = record ⇒ List(record, record, record) + + // the mapConcat creates bigger lists than the groups, which is why all offsets are seen. + // (The mapContext selects the last offset in a group) + val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init + val expectedMultiRecords = toRecords(input).flatMap(f).grouped(groupSize).map(l ⇒ MultiRecord(l)).toVector + + val src = createSourceWithContext(input) + .mapConcat(f) + .grouped(groupSize) + .map(l ⇒ MultiRecord(l)) + .mapContext(_.last) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[MultiRecord]) + .request(expectedMultiRecords.size) + .expectNextN(expectedMultiRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + def genInput(range: Range) = range.map(ix ⇒ Consumer.CommittableMessage(Record(genKey(ix), genValue(ix)), Consumer.CommittableOffsetImpl(ix))).toVector + def toRecords(committableMessages: Vector[Consumer.CommittableMessage[Record]]) = committableMessages.map(_.record) + def genKey(ix: Int) = s"k$ix" + def genValue(ix: Int) = s"v$ix" + } + + def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Offset, Record, NotUsed] = + Consumer + .committableSource(committableMessages) + .startContextPropagation(m ⇒ Offset(m.committableOffset.offset)) + .map(_.record) + + def commitOffsets = commit[Offset](Offset.Uninitialized) + def commit[Ctx](uninitialized: Ctx): Sink[Ctx, Probe[Ctx]] = { + val testSink = TestSink.probe[Ctx] + Flow[Ctx].statefulMapConcat { () ⇒ + { + var prevCtx: Ctx = uninitialized + ctx ⇒ { + val res = + if (prevCtx != uninitialized && ctx != prevCtx) Vector(prevCtx) + else Vector.empty[Ctx] + + prevCtx = ctx + res + } + } + }.toMat(testSink)(Keep.right) + } +} + +object Offset { + val Uninitialized = Offset(-1) + def apply(cm: Consumer.CommittableMessage[Record]): Offset = Offset(cm.committableOffset.offset) +} + +case class Offset(value: Int) + +case class Record(key: String, value: String) +case class Committed[R](record: R, offset: Int) +case class MultiRecord(records: immutable.Seq[Record]) + +object Consumer { + def committableSource(committableMessages: Vector[CommittableMessage[Record]]): Source[CommittableMessage[Record], NotUsed] = { + Source(committableMessages) + } + case class CommittableMessage[V](record: V, committableOffset: CommittableOffset) + + trait Committable { + def commit(): Unit + } + + trait CommittableOffset extends Committable { + def offset: Int + } + + case class CommittableOffsetImpl(offset: Int) extends CommittableOffset { + def commit(): Unit = {} + } +} diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 4899c080f2..61874af190 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -4,6 +4,7 @@ package akka.stream +import akka.annotation.InternalApi import akka.stream.impl.TraversalBuilder import scala.annotation.unchecked.uncheckedVariance @@ -68,3 +69,16 @@ trait Graph[+S <: Shape, +M] { */ def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr) } + +/** + * INTERNAL API + * + * Allows creating additional API on top of an existing Graph by extending from this class and + * accessing the delegate + */ +@InternalApi +private[stream] abstract class GraphDelegate[+S <: Shape, +Mat](delegate: Graph[S, Mat]) extends Graph[S, Mat] { + final override def shape: S = delegate.shape + final override private[stream] def traversalBuilder: TraversalBuilder = delegate.traversalBuilder + final override def withAttributes(attr: Attributes): Graph[S, Mat] = delegate.withAttributes(attr) +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala new file mode 100644 index 0000000000..d3e13ea619 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -0,0 +1,205 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.annotation.ApiMayChange +import akka.japi.{ Pair, Util, function } +import akka.stream._ + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.JavaConverters._ +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +/** + * API MAY CHANGE + */ +@ApiMayChange +object FlowWithContext { + def create[Ctx, In](): FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { + new FlowWithContext(scaladsl.FlowWithContext[Ctx, In]) + } + def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = { + new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) + } +} + +/** + * A flow that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[Flow]] is supported. As an escape hatch you can + * use [[FlowWithContext.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`. + * + * API MAY CHANGE + */ +@ApiMayChange +final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends GraphDelegate(delegate) { + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * @see [[akka.stream.javadsl.Flow.via]] + */ + def via[CtxOut2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[CtxIn, In, CtxOut2, Out2, Mat] = { + val under = asFlow().via(viaFlow) + FlowWithContext.fromPairs(under) + } + + /** + * Creates a regular flow of pairs (data, context). + */ + def asFlow(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance = + scaladsl.Flow[Pair[In, CtxIn]] + .map(_.toScala) + .viaMat(delegate.asFlow)(scaladsl.Keep.right) + .map { case (o, c) ⇒ Pair(o, c) } + .asJava + + // remaining operations in alphabetic order + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.collect]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Flow.collect]] + */ + def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.collect(pf)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.filter]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Flow.filter]] + */ + def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + viaScala(_.filter(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.filterNot]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Flow.filterNot]] + */ + def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + viaScala(_.filterNot(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.grouped]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Flow.grouped]] + */ + def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.map]]. + * + * @see [[akka.stream.javadsl.Flow.map]] + */ + def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.map(f.apply)) + + def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.mapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.mapConcat(dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Flow.mapConcat]] + */ + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + + /** + * Apply the given function to each context element (leaving the data elements unchanged). + */ + def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = { + viaScala(_.mapContext(extractContext.apply)) + } + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.sliding]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Flow.sliding]] + */ + def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.statefulMapConcat(() => dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Flow.statefulMapConcat]] + */ + def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.statefulMapConcat { () ⇒ + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }) + + def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate + + private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] ⇒ scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] = + new FlowWithContext(f(delegate)) +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b1b3b53285..ac23dc77d9 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,6 +8,7 @@ import java.util import java.util.Optional import akka.actor.{ ActorRef, Cancellable, Props } +import akka.annotation.ApiMayChange import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream._ @@ -3466,4 +3467,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def log(name: String): javadsl.Source[Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + + /** + * API MAY CHANGE + */ + @ApiMayChange + def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Ctx, Out, Mat] = + new scaladsl.SourceWithContext(this.asScala.map(x ⇒ (x, extractContext.apply(x)))).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala new file mode 100644 index 0000000000..b9387c945c --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -0,0 +1,186 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.annotation.ApiMayChange +import akka.japi.{ Pair, Util, function } +import akka.stream._ + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.JavaConverters._ +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +/** + * A source that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[Source]] is supported. As an escape hatch you can + * use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * Can be created by calling [[Source.startContextPropagation()]] + * + * API MAY CHANGE + */ +@ApiMayChange +final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends GraphDelegate(delegate) { + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * @see [[akka.stream.javadsl.Flow.via]] + */ + def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] = + viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) ⇒ Pair(o, c) }.via(viaFlow).map(_.toScala))) + + /** + * Stops automatic context propagation from here and converts this to a regular + * stream of a pair of (data, context). + */ + def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] = + delegate.endContextPropagation.map { case (o, c) ⇒ Pair(o, c) }.asJava + + // remaining operations in alphabetic order + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.collect]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Source.collect]] + */ + def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + viaScala(_.collect(pf)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.filter]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Source.filter]] + */ + def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + viaScala(_.filter(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.filterNot]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Source.filterNot]] + */ + def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + viaScala(_.filterNot(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.grouped]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Source.grouped]] + */ + def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.map]]. + * + * @see [[akka.stream.javadsl.Source.map]] + */ + def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + viaScala(_.map(f.apply)) + + def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Ctx, Out2, Mat] = + viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.mapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.mapConcat(dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Source.mapConcat]] + */ + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Ctx, Out2, Mat] = + viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + + /** + * Apply the given function to each context element (leaving the data elements unchanged). + */ + def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] = + viaScala(_.mapContext(extractContext.apply)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.sliding]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Source.sliding]] + */ + def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.statefulMapConcat(() => dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Source.statefulMapConcat]] + */ + def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Ctx, Out2, Mat] = + viaScala(_.statefulMapConcat { () ⇒ + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }) + + def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate + + private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] ⇒ scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] = + new SourceWithContext(f(delegate)) +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala new file mode 100644 index 0000000000..52a0108015 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.annotation.unchecked.uncheckedVariance +import akka.annotation.ApiMayChange +import akka.stream._ + +/** + * API MAY CHANGE + */ +@ApiMayChange +object FlowWithContext { + /** + * Creates an "empty" FlowWithContext that passes elements through with their context unchanged. + */ + def apply[Ctx, In]: FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { + val under = Flow[(In, Ctx)] + new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under) + } + /** + * Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements. + */ + def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]): FlowWithContext[CI, I, CO, O, M] = new FlowWithContext(flow) +} + +/** + * A flow that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can + * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`. + * + * API MAY CHANGE + */ +@ApiMayChange +final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat]( + delegate: Flow[(In, CtxIn), (Out, CtxOut), Mat] +) extends GraphDelegate(delegate) with FlowWithContextOps[CtxOut, Out, Mat] { + override type ReprMat[+C, +O, +M] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, M @uncheckedVariance] + + override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = + FlowWithContext.from(delegate.via(viaFlow)) + + override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[CtxIn, In, Ctx2, Out2, Mat3] = + FlowWithContext.from(delegate.viaMat(flow)(combine)) + + def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate + + def asJava[JCtxIn <: CtxIn, JIn <: In, JCtxOut >: CtxOut, JOut >: Out, JMat >: Mat]: javadsl.FlowWithContext[JCtxIn, JIn, JCtxOut, JOut, JMat] = + new javadsl.FlowWithContext(this) +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala new file mode 100644 index 0000000000..c36e07cb2e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -0,0 +1,201 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.collection.immutable +import scala.concurrent.Future +import scala.language.higherKinds +import scala.annotation.unchecked.uncheckedVariance +import akka.NotUsed +import akka.annotation.ApiMayChange +import akka.dispatch.ExecutionContexts +import akka.stream._ + +/** + * Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context + * element with each data element. + * + * API MAY CHANGE + */ +@ApiMayChange +trait FlowWithContextOps[+Ctx, +Out, +Mat] { + type ReprMat[+C, +O, +M] <: FlowWithContextOps[C, O, M] { + type ReprMat[+CC, +OO, +MatMat] = FlowWithContextOps.this.ReprMat[CC, OO, MatMat] + } + type Repr[+C, +O] = ReprMat[C, O, Mat @uncheckedVariance] + + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * @see [[akka.stream.scaladsl.FlowOps.via]] + */ + def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] + + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. + * + * @see [[akka.stream.scaladsl.FlowOps.viaMat]] + */ + def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[Ctx2, Out2, Mat3] + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.map]]. + * + * @see [[akka.stream.scaladsl.FlowOps.map]] + */ + def map[Out2](f: Out ⇒ Out2): Repr[Ctx, Out2] = + via(flow.map { case (e, ctx) ⇒ (f(e), ctx) }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]]. + * + * @see [[akka.stream.scaladsl.FlowOps.mapAsync]] + */ + def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Ctx, Out2] = + via(flow.mapAsync(parallelism) { case (e, ctx) ⇒ f(e).map(o ⇒ (o, ctx))(ExecutionContexts.sameThreadExecutionContext) }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.collect]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.scaladsl.FlowOps.collect]] + */ + def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] = + via(flow.collect { + case (e, ctx) if f.isDefinedAt(e) ⇒ (f(e), ctx) + }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filter]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.scaladsl.FlowOps.filter]] + */ + def filter(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + collect { case e if pred(e) ⇒ e } + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filterNot]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.scaladsl.FlowOps.filterNot]] + */ + def filterNot(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + collect { case e if !pred(e) ⇒ e } + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.grouped]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.scaladsl.FlowOps.grouped]] + */ + def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + via(flow.grouped(n).map { elsWithContext ⇒ + val (els, ctxs) = elsWithContext.unzip + (els, ctxs) + }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.sliding]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.scaladsl.FlowOps.sliding]] + */ + def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + via(flow.sliding(n, step).map { elsWithContext ⇒ + val (els, ctxs) = elsWithContext.unzip + (els, ctxs) + }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.mapConcat(dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.scaladsl.FlowOps.mapConcat]] + */ + def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() ⇒ f) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.statefulMapConcat(() => dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]] + */ + def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = { + val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = { () ⇒ elWithContext ⇒ + val (el, ctx) = elWithContext + f()(el).map(o ⇒ (o, ctx)) + } + via(flow.statefulMapConcat(fCtx)) + } + + /** + * Apply the given function to each context element (leaving the data elements unchanged). + */ + def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] = + via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) }) + + private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 76e749dcc5..0861849ef1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -7,7 +7,7 @@ package akka.stream.scaladsl import java.util.concurrent.CompletionStage import akka.actor.{ ActorRef, Cancellable, Props } -import akka.annotation.InternalApi +import akka.annotation.{ ApiMayChange, InternalApi } import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages @@ -215,6 +215,12 @@ final class Source[+Out, +Mat]( combineRest(2, rest.iterator) }) + + /** + * API MAY CHANGE + */ + @ApiMayChange + def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e)))) } object Source { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala new file mode 100644 index 0000000000..a73b168734 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.annotation.unchecked.uncheckedVariance + +import akka.annotation.ApiMayChange +import akka.stream._ + +/** + * A source that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can + * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * Can be created by calling [[Source.startContextPropagation()]] + * + * API MAY CHANGE + */ +@ApiMayChange +final class SourceWithContext[+Ctx, +Out, +Mat] private[stream] ( + delegate: Source[(Out, Ctx), Mat] +) extends GraphDelegate(delegate) with FlowWithContextOps[Ctx, Out, Mat] { + override type ReprMat[+C, +O, +M] = SourceWithContext[C, O, M @uncheckedVariance] + + override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = + new SourceWithContext(delegate.via(viaFlow)) + + override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): SourceWithContext[Ctx2, Out2, Mat3] = + new SourceWithContext(delegate.viaMat(flow)(combine)) + + /** + * Stops automatic context propagation from here and converts this to a regular + * stream of a pair of (data, context). + */ + def endContextPropagation: Source[(Out, Ctx), Mat] = delegate + + def asJava[JCtx >: Ctx, JOut >: Out, JMat >: Mat]: javadsl.SourceWithContext[JCtx, JOut, JMat] = + new javadsl.SourceWithContext(this) +} +