Merge pull request #25951 from RayRoestenburg/wip-context-operators

SourceWithContext and FlowWithContext

Commit a920349090: 12 changed files with 1,061 additions and 1 deletion
@@ -0,0 +1,18 @@
# Source.startContextPropagation

Turns a `Source` into a `SourceWithContext` which can propagate a context per element along a stream.

@ref[Source operators](../index.md#source-operators)

@@@div { .group-scala }

## Signature

@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #startContextPropagation }

@@@

## Description

Turns a `Source` into a `SourceWithContext` which can propagate a context per element along a stream.

The function passed into `startContextPropagation` must turn elements into contexts, one context for every element.
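For example, a stream of messages that each carry an offset can propagate that offset as context while the element itself is transformed. A minimal sketch (the `Message` case class and values here are illustrative, not part of this patch):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{ Source, SourceWithContext }

case class Message(data: String, offset: Long)

val messages = Source(List(Message("a", 1L), Message("b", 2L)))

val withOffsets: SourceWithContext[Long, String, NotUsed] =
  messages
    .startContextPropagation(_.offset) // one context (the offset) per element
    .map(_.data.toUpperCase)           // transforms the data, the context travels along untouched

// stop propagating and get back a regular stream of (data, context) pairs
val pairs: Source[(String, Long), NotUsed] = withOffsets.endContextPropagation
```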
@@ -27,6 +27,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]

|Source|<a name="range"></a>@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|<a name="repeat"></a>@ref[repeat](Source/repeat.md)|Stream a single object repeatedly|
|Source|<a name="single"></a>@ref[single](Source/single.md)|Stream a single object|
|Source|<a name="startcontextpropagation"></a>@ref[startContextPropagation](Source/startContextPropagation.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|<a name="tick"></a>@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.|
|Source|<a name="unfold"></a>@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].|
|Source|<a name="unfoldasync"></a>@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].|
@@ -285,6 +286,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)

@@@ index

* [combine](Source/combine.md)
* [startContextPropagation](Source/startContextPropagation.md)
* [fromPublisher](Source/fromPublisher.md)
* [fromIterator](Source/fromIterator.md)
* [cycle](Source/cycle.md)
@@ -0,0 +1,103 @@
/*
 * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.scaladsl

import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.StreamSpec

case class Message(data: String, offset: Long)

class SourceWithContextSpec extends StreamSpec {

  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  "A SourceWithContext" must {

    "get created from Source.startContextPropagation" in {
      val msg = Message("a", 1L)
      Source(Vector(msg))
        .startContextPropagation(_.offset)
        .endContextPropagation
        .runWith(TestSink.probe[(Message, Long)])
        .request(1)
        .expectNext((msg, 1L))
        .expectComplete()
    }

    "be able to get turned back into a normal Source" in {
      val msg = Message("a", 1L)
      Source(Vector(msg))
        .startContextPropagation(_.offset)
        .map(_.data)
        .endContextPropagation.map { case (e, _) ⇒ e }
        .runWith(TestSink.probe[String])
        .request(1)
        .expectNext("a")
        .expectComplete()
    }

    "pass through contexts using map and filter" in {
      Source(
        Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
      )
        .startContextPropagation(_.offset)
        .map(_.data.toLowerCase)
        .filter(_ != "b")
        .filterNot(_ == "d")
        .endContextPropagation
        .runWith(TestSink.probe[(String, Long)])
        .request(2)
        .expectNext(("a", 1L))
        .expectNext(("c", 4L))
        .expectComplete()
    }

    "pass through contexts via a FlowWithContext" in {

      def flowWithContext[T] = FlowWithContext[Long, T]

      Source(Vector(Message("a", 1L)))
        .startContextPropagation(_.offset)
        .map(_.data)
        .via(flowWithContext.map(s ⇒ s + "b"))
        .endContextPropagation
        .runWith(TestSink.probe[(String, Long)])
        .request(1)
        .expectNext(("ab", 1L))
        .expectComplete()
    }

    "pass through contexts via mapConcat" in {
      Source(Vector(Message("a", 1L)))
        .startContextPropagation(_.offset)
        .map(_.data)
        .mapConcat { str ⇒
          List(1, 2, 3).map(i ⇒ s"$str-$i")
        }
        .endContextPropagation
        .runWith(TestSink.probe[(String, Long)])
        .request(3)
        .expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L))
        .expectComplete()
    }

    "pass through a sequence of contexts per element via grouped" in {
      Source(Vector(Message("a", 1L)))
        .startContextPropagation(_.offset)
        .map(_.data)
        .mapConcat { str ⇒
          List(1, 2, 3, 4).map(i ⇒ s"$str-$i")
        }
        .grouped(2)
        .endContextPropagation
        .runWith(TestSink.probe[(Seq[String], Seq[Long])])
        .request(2)
        .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
        .expectComplete()
    }
  }
}
@@ -0,0 +1,219 @@
/*
 * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.scaladsl

import scala.collection.immutable
import akka.NotUsed
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.StreamSpec

class WithContextUsageSpec extends StreamSpec {

  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  "Context propagation used for committing offsets" must {

    "be able to commit on offset change" in {
      val testRange = 0 to 10
      val input = genInput(testRange)
      val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init

      val f: (Record ⇒ Record) = record ⇒ record.copy(value = record.value + 1)
      val expectedRecords = toRecords(input).map(f)

      val src = createSourceWithContext(input)
        .map(f)
        .endContextPropagation

      src.map { case (e, _) ⇒ e }
        .runWith(TestSink.probe[Record])
        .request(input.size)
        .expectNextN(expectedRecords)
        .expectComplete()

      src.map { case (_, ctx) ⇒ ctx }
        .toMat(commitOffsets)(Keep.right)
        .run()
        .request(input.size)
        .expectNextN(expectedOffsets)
        .expectComplete()
    }

    "only commit filtered offsets on offset change" in {
      val testRange = 0 to 10
      val input = genInput(testRange)

      val f: (Record ⇒ Boolean) = record ⇒ record.key.endsWith("2")
      val expectedOffsets = input.filter(cm ⇒ f(cm.record)).map(cm ⇒ Offset(cm)).init
      val expectedRecords = toRecords(input).filter(f)

      val src = createSourceWithContext(input)
        .filter(f)
        .endContextPropagation

      src.map { case (e, _) ⇒ e }
        .runWith(TestSink.probe[Record])
        .request(input.size)
        .expectNextN(expectedRecords)
        .expectComplete()

      src.map { case (_, ctx) ⇒ ctx }
        .toMat(commitOffsets)(Keep.right)
        .run()
        .request(input.size)
        .expectNextN(expectedOffsets)
        .expectComplete()
    }

    "only commit after mapConcat on offset change" in {
      val testRange = 0 to 10
      val input = genInput(testRange)

      val f: (Record ⇒ List[Record]) = record ⇒ List(record, record, record)
      val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init
      val expectedRecords = toRecords(input).flatMap(f)

      val src = createSourceWithContext(input)
        .mapConcat(f)
        .endContextPropagation

      src.map { case (e, _) ⇒ e }
        .runWith(TestSink.probe[Record])
        .request(expectedRecords.size)
        .expectNextN(expectedRecords)
        .expectComplete()

      src.map { case (_, ctx) ⇒ ctx }
        .toMat(commitOffsets)(Keep.right)
        .run()
        .request(input.size)
        .expectNextN(expectedOffsets)
        .expectComplete()
    }

    "commit offsets after grouped on offset change" in {
      val groupSize = 2
      val testRange = 0 to 10
      val input = genInput(testRange)

      val expectedOffsets = testRange.grouped(2).map(ixs ⇒ Offset(ixs.last)).toVector.init
      val expectedMultiRecords = toRecords(input).grouped(groupSize).map(l ⇒ MultiRecord(l)).toVector

      val src = createSourceWithContext(input)
        .grouped(groupSize)
        .map(l ⇒ MultiRecord(l))
        .mapContext(_.last)
        .endContextPropagation

      src.map { case (e, _) ⇒ e }
        .runWith(TestSink.probe[MultiRecord])
        .request(expectedMultiRecords.size)
        .expectNextN(expectedMultiRecords)
        .expectComplete()

      src.map { case (_, ctx) ⇒ ctx }
        .toMat(commitOffsets)(Keep.right)
        .run()
        .request(input.size)
        .expectNextN(expectedOffsets)
        .expectComplete()
    }

    "commit offsets after mapConcat + grouped on offset change" in {
      val groupSize = 2
      val testRange = 0 to 10
      val input = genInput(testRange)

      val f: (Record ⇒ List[Record]) = record ⇒ List(record, record, record)

      // the mapConcat creates bigger lists than the groups, which is why all offsets are seen.
      // (The mapContext selects the last offset in a group)
      val expectedOffsets = testRange.map(ix ⇒ Offset(ix)).init
      val expectedMultiRecords = toRecords(input).flatMap(f).grouped(groupSize).map(l ⇒ MultiRecord(l)).toVector

      val src = createSourceWithContext(input)
        .mapConcat(f)
        .grouped(groupSize)
        .map(l ⇒ MultiRecord(l))
        .mapContext(_.last)
        .endContextPropagation

      src.map { case (e, _) ⇒ e }
        .runWith(TestSink.probe[MultiRecord])
        .request(expectedMultiRecords.size)
        .expectNextN(expectedMultiRecords)
        .expectComplete()

      src.map { case (_, ctx) ⇒ ctx }
        .toMat(commitOffsets)(Keep.right)
        .run()
        .request(input.size)
        .expectNextN(expectedOffsets)
        .expectComplete()
    }

    def genInput(range: Range) = range.map(ix ⇒ Consumer.CommittableMessage(Record(genKey(ix), genValue(ix)), Consumer.CommittableOffsetImpl(ix))).toVector
    def toRecords(committableMessages: Vector[Consumer.CommittableMessage[Record]]) = committableMessages.map(_.record)
    def genKey(ix: Int) = s"k$ix"
    def genValue(ix: Int) = s"v$ix"
  }

  def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Offset, Record, NotUsed] =
    Consumer
      .committableSource(committableMessages)
      .startContextPropagation(m ⇒ Offset(m.committableOffset.offset))
      .map(_.record)

  def commitOffsets = commit[Offset](Offset.Uninitialized)

  def commit[Ctx](uninitialized: Ctx): Sink[Ctx, Probe[Ctx]] = {
    val testSink = TestSink.probe[Ctx]
    Flow[Ctx].statefulMapConcat { () ⇒
      {
        var prevCtx: Ctx = uninitialized
        ctx ⇒ {
          val res =
            if (prevCtx != uninitialized && ctx != prevCtx) Vector(prevCtx)
            else Vector.empty[Ctx]

          prevCtx = ctx
          res
        }
      }
    }.toMat(testSink)(Keep.right)
  }
}

object Offset {
  val Uninitialized = Offset(-1)
  def apply(cm: Consumer.CommittableMessage[Record]): Offset = Offset(cm.committableOffset.offset)
}

case class Offset(value: Int)

case class Record(key: String, value: String)
case class Committed[R](record: R, offset: Int)
case class MultiRecord(records: immutable.Seq[Record])

object Consumer {
  def committableSource(committableMessages: Vector[CommittableMessage[Record]]): Source[CommittableMessage[Record], NotUsed] = {
    Source(committableMessages)
  }
  case class CommittableMessage[V](record: V, committableOffset: CommittableOffset)

  trait Committable {
    def commit(): Unit
  }

  trait CommittableOffset extends Committable {
    def offset: Int
  }

  case class CommittableOffsetImpl(offset: Int) extends CommittableOffset {
    def commit(): Unit = {}
  }
}
@@ -4,6 +4,7 @@
package akka.stream

import akka.annotation.InternalApi
import akka.stream.impl.TraversalBuilder

import scala.annotation.unchecked.uncheckedVariance

@@ -68,3 +69,16 @@ trait Graph[+S <: Shape, +M] {
   */
  def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr)
}

/**
 * INTERNAL API
 *
 * Allows creating additional API on top of an existing Graph by extending from this class and
 * accessing the delegate
 */
@InternalApi
private[stream] abstract class GraphDelegate[+S <: Shape, +Mat](delegate: Graph[S, Mat]) extends Graph[S, Mat] {
  final override def shape: S = delegate.shape
  final override private[stream] def traversalBuilder: TraversalBuilder = delegate.traversalBuilder
  final override def withAttributes(attr: Attributes): Graph[S, Mat] = delegate.withAttributes(attr)
}
@@ -0,0 +1,205 @@
/*
 * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.javadsl

import akka.annotation.ApiMayChange
import akka.japi.{ Pair, Util, function }
import akka.stream._

import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._

/**
 * API MAY CHANGE
 */
@ApiMayChange
object FlowWithContext {
  def create[Ctx, In](): FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = {
    new FlowWithContext(scaladsl.FlowWithContext[Ctx, In])
  }
  def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = {
    new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right)))
  }
}

/**
 * A flow that provides operations which automatically propagate the context of an element.
 * Only a subset of common operations from [[Flow]] is supported. As an escape hatch you can
 * use [[FlowWithContext.via]] to manually provide the context propagation for otherwise unsupported
 * operations.
 *
 * An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`.
 *
 * API MAY CHANGE
 */
@ApiMayChange
final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends GraphDelegate(delegate) {
  /**
   * Transform this flow by the regular flow. The given flow must support manual context propagation by
   * taking and producing tuples of (data, context).
   *
   * This can be used as an escape hatch for operations that are not (yet) provided with automatic
   * context propagation here.
   *
   * @see [[akka.stream.javadsl.Flow.via]]
   */
  def via[CtxOut2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[CtxIn, In, CtxOut2, Out2, Mat] = {
    val under = asFlow().via(viaFlow)
    FlowWithContext.fromPairs(under)
  }

  /**
   * Creates a regular flow of pairs (data, context).
   */
  def asFlow(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance =
    scaladsl.Flow[Pair[In, CtxIn]]
      .map(_.toScala)
      .viaMat(delegate.asFlow)(scaladsl.Keep.right)
      .map { case (o, c) ⇒ Pair(o, c) }
      .asJava

  // remaining operations in alphabetic order

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.collect]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.javadsl.Flow.collect]]
   */
  def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
    viaScala(_.collect(pf))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.filter]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.javadsl.Flow.filter]]
   */
  def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
    viaScala(_.filter(p.test))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.filterNot]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.javadsl.Flow.filterNot]]
   */
  def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
    viaScala(_.filterNot(p.test))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.grouped]].
   *
   * Each output group will be associated with a `Seq` of corresponding context elements.
   *
   * @see [[akka.stream.javadsl.Flow.grouped]]
   */
  def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
    viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.map]].
   *
   * @see [[akka.stream.javadsl.Flow.map]]
   */
  def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
    viaScala(_.map(f.apply))

  def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
    viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.mapConcat]].
   *
   * The context of the input element will be associated with each of the output elements calculated from
   * this input element.
   *
   * Example:
   *
   * ```
   * def dup(element: String) = Seq(element, element)
   *
   * Input:
   *
   * ("a", 1)
   * ("b", 2)
   *
   * inputElements.mapConcat(dup)
   *
   * Output:
   *
   * ("a", 1)
   * ("a", 1)
   * ("b", 2)
   * ("b", 2)
   * ```
   *
   * @see [[akka.stream.javadsl.Flow.mapConcat]]
   */
  def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
    viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))

  /**
   * Apply the given function to each context element (leaving the data elements unchanged).
   */
  def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = {
    viaScala(_.mapContext(extractContext.apply))
  }

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.sliding]].
   *
   * Each output group will be associated with a `Seq` of corresponding context elements.
   *
   * @see [[akka.stream.javadsl.Flow.sliding]]
   */
  def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
    viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]].
   *
   * The context of the input element will be associated with each of the output elements calculated from
   * this input element.
   *
   * Example:
   *
   * ```
   * def dup(element: String) = Seq(element, element)
   *
   * Input:
   *
   * ("a", 1)
   * ("b", 2)
   *
   * inputElements.statefulMapConcat(() => dup)
   *
   * Output:
   *
   * ("a", 1)
   * ("a", 1)
   * ("b", 2)
   * ("b", 2)
   * ```
   *
   * @see [[akka.stream.javadsl.Flow.statefulMapConcat]]
   */
  def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
    viaScala(_.statefulMapConcat { () ⇒
      val fun = f.create()
      elem ⇒ Util.immutableSeq(fun(elem))
    })

  def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate

  private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] ⇒ scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] =
    new FlowWithContext(f(delegate))
}
@@ -8,6 +8,7 @@ import java.util
import java.util.Optional

import akka.actor.{ ActorRef, Cancellable, Props }
import akka.annotation.ApiMayChange
import akka.event.LoggingAdapter
import akka.japi.{ Pair, Util, function }
import akka.stream._

@@ -3466,4 +3467,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   */
  def log(name: String): javadsl.Source[Out, Mat] =
    this.log(name, ConstantFun.javaIdentityFunction[Out], null)

  /**
   * API MAY CHANGE
   */
  @ApiMayChange
  def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Ctx, Out, Mat] =
    new scaladsl.SourceWithContext(this.asScala.map(x ⇒ (x, extractContext.apply(x)))).asJava
}
@@ -0,0 +1,186 @@
/*
 * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.javadsl

import akka.annotation.ApiMayChange
import akka.japi.{ Pair, Util, function }
import akka.stream._

import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._

/**
 * A source that provides operations which automatically propagate the context of an element.
 * Only a subset of common operations from [[Source]] is supported. As an escape hatch you can
 * use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported
 * operations.
 *
 * Can be created by calling [[Source.startContextPropagation()]]
 *
 * API MAY CHANGE
 */
@ApiMayChange
final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends GraphDelegate(delegate) {
  /**
   * Transform this flow by the regular flow. The given flow must support manual context propagation by
   * taking and producing tuples of (data, context).
   *
   * This can be used as an escape hatch for operations that are not (yet) provided with automatic
   * context propagation here.
   *
   * @see [[akka.stream.javadsl.Flow.via]]
   */
  def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] =
    viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) ⇒ Pair(o, c) }.via(viaFlow).map(_.toScala)))

  /**
   * Stops automatic context propagation from here and converts this to a regular
   * stream of a pair of (data, context).
   */
  def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] =
    delegate.endContextPropagation.map { case (o, c) ⇒ Pair(o, c) }.asJava

  // remaining operations in alphabetic order

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.collect]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.javadsl.Source.collect]]
   */
  def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] =
    viaScala(_.collect(pf))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.filter]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.javadsl.Source.filter]]
   */
  def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] =
    viaScala(_.filter(p.test))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.filterNot]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.javadsl.Source.filterNot]]
   */
  def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] =
    viaScala(_.filterNot(p.test))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.grouped]].
   *
   * Each output group will be associated with a `Seq` of corresponding context elements.
   *
   * @see [[akka.stream.javadsl.Source.grouped]]
   */
  def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
    viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.map]].
   *
   * @see [[akka.stream.javadsl.Source.map]]
   */
  def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] =
    viaScala(_.map(f.apply))

  def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Ctx, Out2, Mat] =
    viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.mapConcat]].
   *
   * The context of the input element will be associated with each of the output elements calculated from
   * this input element.
   *
   * Example:
   *
   * ```
   * def dup(element: String) = Seq(element, element)
   *
   * Input:
   *
   * ("a", 1)
   * ("b", 2)
   *
   * inputElements.mapConcat(dup)
   *
   * Output:
   *
   * ("a", 1)
   * ("a", 1)
   * ("b", 2)
   * ("b", 2)
   * ```
   *
   * @see [[akka.stream.javadsl.Source.mapConcat]]
   */
  def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Ctx, Out2, Mat] =
    viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))

  /**
   * Apply the given function to each context element (leaving the data elements unchanged).
   */
  def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] =
    viaScala(_.mapContext(extractContext.apply))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.sliding]].
   *
   * Each output group will be associated with a `Seq` of corresponding context elements.
   *
   * @see [[akka.stream.javadsl.Source.sliding]]
   */
  def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
    viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))

  /**
   * Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]].
   *
   * The context of the input element will be associated with each of the output elements calculated from
   * this input element.
   *
   * Example:
   *
   * ```
   * def dup(element: String) = Seq(element, element)
   *
   * Input:
   *
   * ("a", 1)
   * ("b", 2)
   *
   * inputElements.statefulMapConcat(() => dup)
   *
   * Output:
   *
   * ("a", 1)
   * ("a", 1)
   * ("b", 2)
   * ("b", 2)
   * ```
   *
   * @see [[akka.stream.javadsl.Source.statefulMapConcat]]
   */
  def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Ctx, Out2, Mat] =
    viaScala(_.statefulMapConcat { () ⇒
      val fun = f.create()
      elem ⇒ Util.immutableSeq(fun(elem))
    })

  def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate

  private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] ⇒ scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] =
    new SourceWithContext(f(delegate))
}
@@ -0,0 +1,55 @@
/*
 * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.scaladsl

import scala.annotation.unchecked.uncheckedVariance
import akka.annotation.ApiMayChange
import akka.stream._

/**
 * API MAY CHANGE
 */
@ApiMayChange
object FlowWithContext {
  /**
   * Creates an "empty" FlowWithContext that passes elements through with their context unchanged.
   */
  def apply[Ctx, In]: FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = {
    val under = Flow[(In, Ctx)]
    new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under)
  }
  /**
   * Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements.
   */
  def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]): FlowWithContext[CI, I, CO, O, M] = new FlowWithContext(flow)
}

/**
 * A flow that provides operations which automatically propagate the context of an element.
 * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can
 * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
 * operations.
 *
 * An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`.
 *
 * API MAY CHANGE
 */
@ApiMayChange
final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](
  delegate: Flow[(In, CtxIn), (Out, CtxOut), Mat]
) extends GraphDelegate(delegate) with FlowWithContextOps[CtxOut, Out, Mat] {
  override type ReprMat[+C, +O, +M] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, M @uncheckedVariance]

  override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] =
    FlowWithContext.from(delegate.via(viaFlow))

  override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[CtxIn, In, Ctx2, Out2, Mat3] =
    FlowWithContext.from(delegate.viaMat(flow)(combine))

  def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate

  def asJava[JCtxIn <: CtxIn, JIn <: In, JCtxOut >: CtxOut, JOut >: Out, JMat >: Mat]: javadsl.FlowWithContext[JCtxIn, JIn, JCtxOut, JOut, JMat] =
    new javadsl.FlowWithContext(this)
}
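As an illustration of the scaladoc above, a minimal sketch of reusing an "empty" `FlowWithContext` in a `SourceWithContext` pipeline (modelled on the `SourceWithContextSpec` test earlier in this changeset, and assuming its `Message` case class and imports):

// reusable context-propagating logic: context type Long, element type String
val suffix = FlowWithContext[Long, String].map(s ⇒ s + "b")

Source(Vector(Message("a", 1L)))
  .startContextPropagation(_.offset)
  .map(_.data)
  .via(suffix)             // the offset context passes through the flow unchanged
  .endContextPropagation   // a Source of (String, Long) pairs, emitting ("ab", 1L)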
@@ -0,0 +1,201 @@
/*
 * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.scaladsl

import scala.collection.immutable
import scala.concurrent.Future
import scala.language.higherKinds
import scala.annotation.unchecked.uncheckedVariance
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.stream._

/**
 * Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context
 * element with each data element.
 *
 * API MAY CHANGE
 */
@ApiMayChange
trait FlowWithContextOps[+Ctx, +Out, +Mat] {
  type ReprMat[+C, +O, +M] <: FlowWithContextOps[C, O, M] {
    type ReprMat[+CC, +OO, +MatMat] = FlowWithContextOps.this.ReprMat[CC, OO, MatMat]
  }
  type Repr[+C, +O] = ReprMat[C, O, Mat @uncheckedVariance]

  /**
   * Transform this flow by the regular flow. The given flow must support manual context propagation by
   * taking and producing tuples of (data, context).
   *
   * This can be used as an escape hatch for operations that are not (yet) provided with automatic
   * context propagation here.
   *
   * @see [[akka.stream.scaladsl.FlowOps.via]]
   */
  def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2]

  /**
   * Transform this flow by the regular flow. The given flow must support manual context propagation by
   * taking and producing tuples of (data, context).
   *
   * This can be used as an escape hatch for operations that are not (yet) provided with automatic
   * context propagation here.
   *
   * The `combine` function is used to compose the materialized values of this flow and that
   * flow into the materialized value of the resulting Flow.
   *
   * @see [[akka.stream.scaladsl.FlowOps.viaMat]]
   */
  def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[Ctx2, Out2, Mat3]

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.map]].
   *
   * @see [[akka.stream.scaladsl.FlowOps.map]]
   */
  def map[Out2](f: Out ⇒ Out2): Repr[Ctx, Out2] =
    via(flow.map { case (e, ctx) ⇒ (f(e), ctx) })

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]].
   *
   * @see [[akka.stream.scaladsl.FlowOps.mapAsync]]
   */
  def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Ctx, Out2] =
    via(flow.mapAsync(parallelism) { case (e, ctx) ⇒ f(e).map(o ⇒ (o, ctx))(ExecutionContexts.sameThreadExecutionContext) })

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.collect]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.scaladsl.FlowOps.collect]]
   */
  def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] =
    via(flow.collect {
      case (e, ctx) if f.isDefinedAt(e) ⇒ (f(e), ctx)
    })

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filter]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.scaladsl.FlowOps.filter]]
   */
  def filter(pred: Out ⇒ Boolean): Repr[Ctx, Out] =
    collect { case e if pred(e) ⇒ e }

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filterNot]].
   *
   * Note, that the context of elements that are filtered out is skipped as well.
   *
   * @see [[akka.stream.scaladsl.FlowOps.filterNot]]
   */
  def filterNot(pred: Out ⇒ Boolean): Repr[Ctx, Out] =
    collect { case e if !pred(e) ⇒ e }

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.grouped]].
   *
   * Each output group will be associated with a `Seq` of corresponding context elements.
   *
   * @see [[akka.stream.scaladsl.FlowOps.grouped]]
   */
  def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] =
    via(flow.grouped(n).map { elsWithContext ⇒
      val (els, ctxs) = elsWithContext.unzip
      (els, ctxs)
    })

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.sliding]].
   *
   * Each output group will be associated with a `Seq` of corresponding context elements.
   *
   * @see [[akka.stream.scaladsl.FlowOps.sliding]]
   */
  def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] =
    via(flow.sliding(n, step).map { elsWithContext ⇒
      val (els, ctxs) = elsWithContext.unzip
      (els, ctxs)
    })

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapConcat]].
   *
   * The context of the input element will be associated with each of the output elements calculated from
   * this input element.
   *
   * Example:
   *
   * ```
   * def dup(element: String) = Seq(element, element)
   *
   * Input:
   *
   * ("a", 1)
   * ("b", 2)
   *
   * inputElements.mapConcat(dup)
   *
   * Output:
   *
   * ("a", 1)
   * ("a", 1)
   * ("b", 2)
   * ("b", 2)
   * ```
   *
   * @see [[akka.stream.scaladsl.FlowOps.mapConcat]]
   */
  def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() ⇒ f)

  /**
   * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]].
   *
   * The context of the input element will be associated with each of the output elements calculated from
   * this input element.
   *
   * Example:
   *
   * ```
   * def dup(element: String) = Seq(element, element)
   *
   * Input:
   *
   * ("a", 1)
   * ("b", 2)
   *
   * inputElements.statefulMapConcat(() => dup)
   *
   * Output:
   *
   * ("a", 1)
   * ("a", 1)
   * ("b", 2)
   * ("b", 2)
   * ```
   *
   * @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]
   */
  def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = {
    val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = { () ⇒ elWithContext ⇒
      val (el, ctx) = elWithContext
      f()(el).map(o ⇒ (o, ctx))
    }
    via(flow.statefulMapConcat(fCtx))
  }

  /**
   * Apply the given function to each context element (leaving the data elements unchanged).
   */
  def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] =
    via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) })

  private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)]
}
@@ -7,7 +7,7 @@ package akka.stream.scaladsl
import java.util.concurrent.CompletionStage

import akka.actor.{ ActorRef, Cancellable, Props }
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages

@@ -215,6 +215,12 @@ final class Source[+Out, +Mat](
      combineRest(2, rest.iterator)
    })

  /**
   * API MAY CHANGE
   */
  @ApiMayChange
  def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e))))
}

object Source {
@@ -0,0 +1,43 @@
/*
 * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.stream.scaladsl

import scala.annotation.unchecked.uncheckedVariance

import akka.annotation.ApiMayChange
import akka.stream._

/**
 * A source that provides operations which automatically propagate the context of an element.
 * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can
 * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
 * operations.
 *
 * Can be created by calling [[Source.startContextPropagation()]]
 *
 * API MAY CHANGE
 */
@ApiMayChange
final class SourceWithContext[+Ctx, +Out, +Mat] private[stream] (
  delegate: Source[(Out, Ctx), Mat]
) extends GraphDelegate(delegate) with FlowWithContextOps[Ctx, Out, Mat] {
  override type ReprMat[+C, +O, +M] = SourceWithContext[C, O, M @uncheckedVariance]

  override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] =
    new SourceWithContext(delegate.via(viaFlow))

  override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): SourceWithContext[Ctx2, Out2, Mat3] =
    new SourceWithContext(delegate.viaMat(flow)(combine))

  /**
   * Stops automatic context propagation from here and converts this to a regular
   * stream of a pair of (data, context).
   */
  def endContextPropagation: Source[(Out, Ctx), Mat] = delegate

  def asJava[JCtx >: Ctx, JOut >: Out, JMat >: Mat]: javadsl.SourceWithContext[JCtx, JOut, JMat] =
    new javadsl.SourceWithContext(this)
}