diff --git a/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md b/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md new file mode 100644 index 0000000000..1a326093c0 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md @@ -0,0 +1,18 @@ +# Source.startContextPropagation + +Turns a Source into a SourceWithContext which can propagate a context per element along a stream. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #startContextPropagation } + +@@@ + +## Description + +Turns a Source into a SourceWithContext which can propagate a context per element along a stream. +The function passed into startContextPropagation must turn elements into contexts, one context for every element. diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 2a0c672e47..51e4c1a133 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -27,6 +27,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly| |Source|@ref[single](Source/single.md)|Stream a single object| +|Source|@ref[startContextPropagation](Source/startContextPropagation.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.| |Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].| |Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| @@ -285,6 +286,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) @@@ index * [combine](Source/combine.md) +* [startContextPropagation](Source/startContextPropagation.md) * [fromPublisher](Source/fromPublisher.md) * [fromIterator](Source/fromIterator.md) * [cycle](Source/cycle.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala new file mode 100644 index 0000000000..1de487e653 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.StreamSpec + +case class Message(data: String, offset: Long) + +class SourceWithContextSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + implicit val materializer = ActorMaterializer(settings) + + "A SourceWithContext" must { + + "get created from Source.startContextPropagation" in { + val msg = Message("a", 1L) + Source(Vector(msg)) + .startContextPropagation(_.offset) + .endContextPropagation + .runWith(TestSink.probe[(Message, Long)]) + .request(1) + .expectNext((msg, 1L)) + .expectComplete() + } + + "be able to get turned back into a normal Source" in { + val msg = Message("a", 1L) + Source(Vector(msg)) + .startContextPropagation(_.offset) + .map(_.data) + .endContextPropagation.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[String]) + .request(1) + .expectNext("a") + .expectComplete() + } + + "pass through contexts using map and filter" in { + Source( + Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) + ) + .startContextPropagation(_.offset) + .map(_.data.toLowerCase) + .filter(_ != "b") + .filterNot(_ == "d") + .endContextPropagation + .runWith(TestSink.probe[(String, Long)]) + .request(2) + .expectNext(("a", 1L)) + .expectNext(("c", 4L)) + .expectComplete() + } + + "pass through contexts via a FlowWithContext" in { + + def flowWithContext[T] = FlowWithContext[Long, T] + + Source(Vector(Message("a", 1L))) + .startContextPropagation(_.offset) + .map(_.data) + .via(flowWithContext.map(s ⇒ s + "b")) + .endContextPropagation + .runWith(TestSink.probe[(String, Long)]) + .request(1) + .expectNext(("ab", 1L)) + .expectComplete() + } + + "pass through contexts via mapConcat" in { + Source(Vector(Message("a", 1L))) + .startContextPropagation(_.offset) + .map(_.data) + .mapConcat { str ⇒ + List(1, 2, 3).map(i ⇒ s"$str-$i") + } + .endContextPropagation + .runWith(TestSink.probe[(String, Long)]) + .request(3) + .expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L)) + .expectComplete() + } + + "pass through a sequence of contexts per element via grouped" in { + Source(Vector(Message("a", 1L))) + .startContextPropagation(_.offset) + .map(_.data) + .mapConcat { str ⇒ + List(1, 2, 3, 4).map(i ⇒ s"$str-$i") + } + .grouped(2) + .endContextPropagation + .runWith(TestSink.probe[(Seq[String], Seq[Long])]) + .request(2) + .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L))) + .expectComplete() + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala new file mode 100644 index 0000000000..155f110558 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.collection.immutable +import akka.NotUsed +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.StreamSpec + +class WithContextUsageSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + implicit val materializer = ActorMaterializer(settings) + + "Context propagation used for committing offsets" must { + + "be able to commit on offset change" in { + val testRange = 0 to 10 + val input = genInput(testRange) + val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init + + val f: (Record ⇒ Record) = record ⇒ record.copy(value = record.value + 1) + val expectedRecords = toRecords(input).map(f) + + val src = createSourceWithContext(input) + .map(f) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[Record]) + .request(input.size) + .expectNextN(expectedRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "only commit filtered offsets on offset change" in { + val testRange = 0 to 10 + val input = genInput(testRange) + + val f: (Record ⇒ Boolean) = record ⇒ record.key.endsWith("2") + val expectedOffsets = input.filter(cm ⇒ f(cm.record)).map(cm ⇒ Offset(cm)).init + val expectedRecords = toRecords(input).filter(f) + + val src = createSourceWithContext(input) + .filter(f) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[Record]) + .request(input.size) + .expectNextN(expectedRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "only commit after mapConcat on offset change" in { + val testRange = 0 to 10 + val input = genInput(testRange) + + val f: (Record ⇒ List[Record]) = record ⇒ List(record, record, record) + val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init + val expectedRecords = toRecords(input).flatMap(f) + + val src = createSourceWithContext(input) + .mapConcat(f) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[Record]) + .request(expectedRecords.size) + .expectNextN(expectedRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "commit offsets after grouped on offset change" in { + val groupSize = 2 + val testRange = 0 to 10 + val input = genInput(testRange) + + val expectedOffsets = testRange.grouped(2).map(ixs ⇒ Offset(ixs.last)).toVector.init + val expectedMultiRecords = toRecords(input).grouped(groupSize).map(l ⇒ MultiRecord(l)).toVector + + val src = createSourceWithContext(input) + .grouped(groupSize) + .map(l ⇒ MultiRecord(l)) + .mapContext(_.last) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[MultiRecord]) + .request(expectedMultiRecords.size) + .expectNextN(expectedMultiRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + "commit offsets after mapConcat + grouped on offset change" in { + val groupSize = 2 + val testRange = 0 to 10 + val input = genInput(testRange) + + val f: (Record ⇒ List[Record]) = record ⇒ List(record, record, record) + + // the mapConcat creates bigger lists than the groups, which is why all offsets are seen. + // (The mapContext selects the last offset in a group) + val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init + val expectedMultiRecords = toRecords(input).flatMap(f).grouped(groupSize).map(l ⇒ MultiRecord(l)).toVector + + val src = createSourceWithContext(input) + .mapConcat(f) + .grouped(groupSize) + .map(l ⇒ MultiRecord(l)) + .mapContext(_.last) + .endContextPropagation + + src.map { case (e, _) ⇒ e } + .runWith(TestSink.probe[MultiRecord]) + .request(expectedMultiRecords.size) + .expectNextN(expectedMultiRecords) + .expectComplete() + + src.map { case (_, ctx) ⇒ ctx } + .toMat(commitOffsets)(Keep.right) + .run() + .request(input.size) + .expectNextN(expectedOffsets) + .expectComplete() + } + + def genInput(range: Range) = range.map(ix ⇒ Consumer.CommittableMessage(Record(genKey(ix), genValue(ix)), Consumer.CommittableOffsetImpl(ix))).toVector + def toRecords(committableMessages: Vector[Consumer.CommittableMessage[Record]]) = committableMessages.map(_.record) + def genKey(ix: Int) = s"k$ix" + def genValue(ix: Int) = s"v$ix" + } + + def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Offset, Record, NotUsed] = + Consumer + .committableSource(committableMessages) + .startContextPropagation(m ⇒ Offset(m.committableOffset.offset)) + .map(_.record) + + def commitOffsets = commit[Offset](Offset.Uninitialized) + def commit[Ctx](uninitialized: Ctx): Sink[Ctx, Probe[Ctx]] = { + val testSink = TestSink.probe[Ctx] + Flow[Ctx].statefulMapConcat { () ⇒ + { + var prevCtx: Ctx = uninitialized + ctx ⇒ { + val res = + if (prevCtx != uninitialized && ctx != prevCtx) Vector(prevCtx) + else Vector.empty[Ctx] + + prevCtx = ctx + res + } + } + }.toMat(testSink)(Keep.right) + } +} + +object Offset { + val Uninitialized = Offset(-1) + def apply(cm: Consumer.CommittableMessage[Record]): Offset = Offset(cm.committableOffset.offset) +} + +case class Offset(value: Int) + +case class Record(key: String, value: String) +case class Committed[R](record: R, offset: Int) +case class MultiRecord(records: immutable.Seq[Record]) + +object Consumer { + def committableSource(committableMessages: Vector[CommittableMessage[Record]]): Source[CommittableMessage[Record], NotUsed] = { + Source(committableMessages) + } + case class CommittableMessage[V](record: V, committableOffset: CommittableOffset) + + trait Committable { + def commit(): Unit + } + + trait CommittableOffset extends Committable { + def offset: Int + } + + case class CommittableOffsetImpl(offset: Int) extends CommittableOffset { + def commit(): Unit = {} + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala new file mode 100644 index 0000000000..c34eb989fb --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.annotation.ApiMayChange +import akka.japi.{ Pair, Util, function } +import akka.stream._ +import akka.stream.impl.LinearTraversalBuilder + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.JavaConverters._ +import scala.collection.immutable +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +/** + * API MAY CHANGE + */ +@ApiMayChange +object FlowWithContext { + def create[Ctx, In](): FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { + new FlowWithContext(scaladsl.FlowWithContext[Ctx, In]) + } + def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]) = { + new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) + } +} + +/** + * API MAY CHANGE + */ +@ApiMayChange +final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends Graph[FlowShape[(In, CtxIn), (Out, CtxOut)], Mat] { + override val traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder + override val shape: FlowShape[(In, CtxIn), (Out, CtxOut)] = delegate.shape + override def withAttributes(attr: Attributes): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = new FlowWithContext(delegate.withAttributes(attr)) + + def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = { + new FlowWithContext(delegate.mapContext(extractContext.apply)) + } + + def via[CtxOut2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[CtxIn, In, CtxOut2, Out2, Mat] = { + val under = endContextPropagation().via(viaFlow) + FlowWithContext.fromPairs(under) + } + + def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance]], Mat2]): Sink[Pair[In, CtxIn], Mat] @uncheckedVariance = + endContextPropagation().toMat(sink, Keep.left) + + def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): Sink[Pair[In, CtxIn], Mat3] @uncheckedVariance = + endContextPropagation().toMat(sink, combine) + + def endContextPropagation(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance = + scaladsl.Flow[Pair[In, CtxIn]] + .map(_.toScala) + .viaMat(delegate.endContextPropagation)(scaladsl.Keep.right) + .map { case (o, c) ⇒ Pair(o, c) } + .asJava + + def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + new FlowWithContext(delegate.map(f.apply)) + + def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + new FlowWithContext(delegate.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) + + def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + new FlowWithContext(delegate.collect(pf)) + + def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + new FlowWithContext(delegate.filter(p.test)) + + def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + new FlowWithContext(delegate.filterNot(p.test)) + + def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = { + val f = new function.Function[immutable.Seq[CtxOut], java.util.List[CtxOut]] { + def apply(ctxs: immutable.Seq[CtxOut]) = ctxs.asJava + } + new FlowWithContext(delegate.grouped(n).map(_.asJava)).mapContext(f) + } + + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + new FlowWithContext(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + + def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + new FlowWithContext(delegate.statefulMapConcat { () ⇒ + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }) + + def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = { + val f = new function.Function[immutable.Seq[CtxOut], java.util.List[CtxOut]] { + def apply(ctxs: immutable.Seq[CtxOut]) = ctxs.asJava + } + new FlowWithContext(delegate.sliding(n, step).map(_.asJava)).mapContext(f) + } + + def asScala = delegate +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 435e7753f2..3ea9763479 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,6 +8,7 @@ import java.util import java.util.Optional import akka.actor.{ ActorRef, Cancellable, Props } +import akka.annotation.ApiMayChange import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream._ @@ -3466,4 +3467,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def log(name: String): javadsl.Source[Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + + /** + * API MAY CHANGE + */ + @ApiMayChange + def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Ctx, Out, Mat] = + new javadsl.SourceWithContext(scaladsl.SourceWithContext(this.asScala).mapContext(extractContext.apply)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala new file mode 100644 index 0000000000..3bbdd37868 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.annotation.ApiMayChange +import akka.japi.{ Pair, Util, function } +import akka.stream._ +import akka.stream.impl.LinearTraversalBuilder + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.JavaConverters._ +import scala.collection.immutable +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +/** + * API MAY CHANGE + */ +@ApiMayChange +object SourceWithContext { + def from[Out, Mat](underlying: Source[Out, Mat]): SourceWithContext[Out, Out, Mat] = { + new SourceWithContext(scaladsl.SourceWithContext(underlying.asScala)) + } + + def fromPairs[Out, Ctx, Mat](under: Source[Pair[Out, Ctx], Mat]): SourceWithContext[Ctx, Out, Mat] = { + new SourceWithContext(scaladsl.SourceWithContext.from(under.asScala.map(_.toScala))) + } +} + +/** + * API MAY CHANGE + */ +@ApiMayChange +final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends Graph[SourceShape[(Out, Ctx)], Mat] { + override val traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder + override val shape: SourceShape[(Out, Ctx)] = delegate.shape + override def withAttributes(attr: Attributes): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(delegate.withAttributes(attr)) + + def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] = { + new SourceWithContext(delegate.mapContext(extractContext.apply)) + } + + def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] = { + val under = endContextPropagation().via(viaFlow) + SourceWithContext.fromPairs(under) + } + + def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2]): RunnableGraph[Mat] = + endContextPropagation().toMat(sink, Keep.left) + + def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): RunnableGraph[Mat3] = + endContextPropagation().toMat(sink, combine) + + def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] = + delegate.endContextPropagation.map { case (o, c) ⇒ Pair(o, c) }.asJava + + def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + new SourceWithContext(delegate.map(f.apply)) + + def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Ctx, Out2, Mat] = + new SourceWithContext(delegate.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) + + def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + new SourceWithContext(delegate.collect(pf)) + + def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + new SourceWithContext(delegate.filter(p.test)) + + def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + new SourceWithContext(delegate.filterNot(p.test)) + + def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = { + val f = new function.Function[immutable.Seq[Ctx], java.util.List[Ctx]] { + def apply(ctxs: immutable.Seq[Ctx]) = ctxs.asJava + } + + new SourceWithContext(delegate.grouped(n).map(_.asJava)).mapContext(f) + } + + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Ctx, Out2, Mat] = + new SourceWithContext(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + + def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Ctx, Out2, Mat] = + new SourceWithContext(delegate.statefulMapConcat { () ⇒ + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }) + + def asScala = delegate +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala new file mode 100644 index 0000000000..7be4254690 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.collection.immutable +import scala.concurrent.Future +import scala.language.higherKinds +import scala.annotation.unchecked.uncheckedVariance + +import akka.NotUsed +import akka.annotation.ApiMayChange +import akka.dispatch.ExecutionContexts +import akka.stream._ +import akka.stream.impl.LinearTraversalBuilder + +/** + * API MAY CHANGE + */ +@ApiMayChange +trait FlowWithContextOps[+Ctx, +Out, +Mat] { + type Repr[+C, +O] <: FlowWithContextOps[C, O, Mat] { + type Repr[+CC, +OO] = FlowWithContextOps.this.Repr[CC, OO] + type Prov[+CC, +OO] = FlowWithContextOps.this.Prov[CC, OO] + } + + type Prov[+C, +O] <: FlowOpsMat[(O, C), Mat] + + def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] + + def map[Out2](f: Out ⇒ Out2): Repr[Ctx, Out2] = + via(flow.map { case (e, ctx) ⇒ (f(e), ctx) }) + + def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Ctx, Out2] = + via(flow.mapAsync(parallelism) { case (e, ctx) ⇒ f(e).map(o ⇒ (o, ctx))(ExecutionContexts.sameThreadExecutionContext) }) + + def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] = + via(flow.collect { + case (e, ctx) if f.isDefinedAt(e) ⇒ (f(e), ctx) + }) + + def filter(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + collect { case e if pred(e) ⇒ e } + + def filterNot(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + collect { case e if !pred(e) ⇒ e } + + def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + via(flow.grouped(n).map { elsWithContext ⇒ + val (els, ctxs) = elsWithContext.unzip + (els, ctxs) + }) + + def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + via(flow.sliding(n, step).map { elsWithContext ⇒ + val (els, ctxs) = elsWithContext.unzip + (els, ctxs) + }) + + def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() ⇒ f) + + def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = { + val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = { () ⇒ elWithContext ⇒ + val (el, ctx) = elWithContext + f()(el).map(o ⇒ (o, ctx)) + } + via(flow.statefulMapConcat(fCtx)) + } + + def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] = + via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) }) + + def endContextPropagation: Prov[Ctx, Out] + + private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] +} + +/** + * API MAY CHANGE + */ +@ApiMayChange +object FlowWithContext { + def apply[Ctx, In]: FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { + val under = Flow[(In, Ctx)] + new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under, under.traversalBuilder, under.shape) + } + def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]) = new FlowWithContext(flow, flow.traversalBuilder, flow.shape) +} + +/** + * API MAY CHANGE + */ +@ApiMayChange +final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat]( + underlying: Flow[(In, CtxIn), (Out, CtxOut), Mat], + override val traversalBuilder: LinearTraversalBuilder, + override val shape: FlowShape[(In, CtxIn), (Out, CtxOut)] +) extends FlowWithContextOps[CtxOut, Out, Mat] with Graph[FlowShape[(In, CtxIn), (Out, CtxOut)], Mat] { + + override def withAttributes(attr: Attributes): Repr[CtxOut, Out] = new FlowWithContext(underlying, traversalBuilder.setAttributes(attr), shape) + + override type Repr[+C, +O] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, Mat @uncheckedVariance] + override type Prov[+C, +O] = Flow[(In @uncheckedVariance, CtxIn @uncheckedVariance), (O, C), Mat @uncheckedVariance] + + override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = from(underlying.via(viaFlow)) + + def to[Mat2](sink: Graph[SinkShape[(Out, CtxOut)], Mat2]): Sink[(In, CtxIn), Mat] = underlying.toMat(sink)(Keep.left) + + def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, CtxOut)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[(In, CtxIn), Mat3] = underlying.toMat(sink)(combine) + + override def endContextPropagation: Prov[CtxOut, Out] = underlying + + private[this] def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]) = FlowWithContext.from(flow) + + def asJava[JCtxIn <: CtxIn, JIn <: In, JCtxOut >: CtxOut, JOut >: Out, JMat >: Mat]: javadsl.FlowWithContext[JCtxIn, JIn, JCtxOut, JOut, JMat] = + new javadsl.FlowWithContext(this) +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 98e3b44ad6..cda704a48d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -7,7 +7,7 @@ package akka.stream.scaladsl import java.util.concurrent.CompletionStage import akka.actor.{ ActorRef, Cancellable, Props } -import akka.annotation.InternalApi +import akka.annotation.{ ApiMayChange, InternalApi } import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages @@ -215,6 +215,11 @@ final class Source[+Out, +Mat]( combineRest(2, rest.iterator) }) + /** + * API MAY CHANGE + */ + @ApiMayChange + def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Ctx, Out, Mat] = SourceWithContext(this).mapContext(f) } object Source { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala new file mode 100644 index 0000000000..8489bf7409 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.annotation.unchecked.uncheckedVariance + +import akka.annotation.ApiMayChange +import akka.stream._ +import akka.stream.impl.LinearTraversalBuilder + +/** + * API MAY CHANGE + */ +@ApiMayChange +object SourceWithContext { + def apply[Out, Mat](underlying: Source[Out, Mat]): SourceWithContext[Out, Out, Mat] = { + val under = underlying.map(e ⇒ (e, e)) + new SourceWithContext[Out, Out, Mat](under, under.traversalBuilder, under.shape) + } + def from[Out, Ctx, Mat](under: Source[(Out, Ctx), Mat]): SourceWithContext[Ctx, Out, Mat] = { + new SourceWithContext[Ctx, Out, Mat](under, under.traversalBuilder, under.shape) + } +} + +/** + * API MAY CHANGE + */ +@ApiMayChange +final class SourceWithContext[+Ctx, +Out, +Mat]( + underlying: Source[(Out, Ctx), Mat], + override val traversalBuilder: LinearTraversalBuilder, + override val shape: SourceShape[(Out, Ctx)] +) extends FlowWithContextOps[Ctx, Out, Mat] with Graph[SourceShape[(Out, Ctx)], Mat] { + + override def withAttributes(attr: Attributes): Repr[Ctx, Out] = new SourceWithContext(underlying, traversalBuilder.setAttributes(attr), shape) + + override type Repr[+C, +O] = SourceWithContext[C, O, Mat @uncheckedVariance] + override type Prov[+C, +O] = Source[(O, C), Mat @uncheckedVariance] + + override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = { + val under = underlying.via(viaFlow) + new SourceWithContext[Ctx2, Out2, Mat](under, under.traversalBuilder, under.shape) + } + + def to[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2]): RunnableGraph[Mat] = underlying.toMat(sink)(Keep.left) + + def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = + underlying.toMat(sink)(combine) + + override def endContextPropagation: Prov[Ctx, Out] = underlying + + def asJava[JCtx >: Ctx, JOut >: Out, JMat >: Mat]: javadsl.SourceWithContext[JCtx, JOut, JMat] = + new javadsl.SourceWithContext(this) +} +