Add wiretap/wiretapContext to FlowWithContext/SourceWithContext
This commit is contained in:
parent
eccfb848d9
commit
f548ea55ad
7 changed files with 113 additions and 0 deletions
|
|
@ -43,6 +43,7 @@ The Stream API has been updated to add some extra functions.
|
|||
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
|
||||
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))
|
||||
* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443))
|
||||
* add wireTap/wireTapContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1446](https://github.com/apache/pekko/pull/1446))
|
||||
|
||||
The Stream Testkit Java DSL has some extra functions.
|
||||
|
||||
|
|
|
|||
|
|
@ -126,6 +126,50 @@ class FlowWithContextSpec extends StreamSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"pass through all data when using wireTap" in {
|
||||
val listBuffer = new ListBuffer[String]()
|
||||
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)))
|
||||
.asSourceWithContext(_.offset)
|
||||
.via(
|
||||
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) =>
|
||||
(data.data.toLowerCase, offset)
|
||||
}).wireTap(Sink.foreach(string => listBuffer.+=(string)))
|
||||
)
|
||||
.toMat(TestSink.probe[(String, Long)])(Keep.right)
|
||||
.run()
|
||||
.request(4)
|
||||
.expectNext(("a", 1L))
|
||||
.expectNext(("b", 2L))
|
||||
.expectNext(("d", 3L))
|
||||
.expectNext(("c", 4L))
|
||||
.expectComplete()
|
||||
.within(10.seconds) {
|
||||
listBuffer should contain atLeastOneElementOf List("a", "b", "d", "c")
|
||||
}
|
||||
}
|
||||
|
||||
"pass through all data when using wireTapContext" in {
|
||||
val listBuffer = new ListBuffer[Long]()
|
||||
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)))
|
||||
.asSourceWithContext(_.offset)
|
||||
.via(
|
||||
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) =>
|
||||
(data.data.toLowerCase, offset)
|
||||
}).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
|
||||
)
|
||||
.toMat(TestSink.probe[(String, Long)])(Keep.right)
|
||||
.run()
|
||||
.request(4)
|
||||
.expectNext(("a", 1L))
|
||||
.expectNext(("b", 2L))
|
||||
.expectNext(("d", 3L))
|
||||
.expectNext(("c", 4L))
|
||||
.expectComplete()
|
||||
.within(10.seconds) {
|
||||
listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L)
|
||||
}
|
||||
}
|
||||
|
||||
"keep the same order for data and context when using unsafeDataVia" in {
|
||||
val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))
|
||||
|
||||
|
|
|
|||
|
|
@ -115,6 +115,44 @@ class SourceWithContextSpec extends StreamSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"pass through all data when using wireTap" in {
|
||||
val listBuffer = new ListBuffer[Message]()
|
||||
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
|
||||
Source(messages)
|
||||
.asSourceWithContext(_.offset)
|
||||
.wireTap(Sink.foreach(message => listBuffer.+=(message)))
|
||||
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
|
||||
.run()
|
||||
.request(4)
|
||||
.expectNext((Message("A", 1L), 1L))
|
||||
.expectNext((Message("B", 2L), 2L))
|
||||
.expectNext((Message("D", 3L), 3L))
|
||||
.expectNext((Message("C", 4L), 4L))
|
||||
.expectComplete()
|
||||
.within(10.seconds) {
|
||||
listBuffer.toVector should contain atLeastOneElementOf messages
|
||||
}
|
||||
}
|
||||
|
||||
"pass through all data when using wireTapContext" in {
|
||||
val listBuffer = new ListBuffer[Long]()
|
||||
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
|
||||
Source(messages)
|
||||
.asSourceWithContext(_.offset)
|
||||
.wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
|
||||
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
|
||||
.run()
|
||||
.request(4)
|
||||
.expectNext((Message("A", 1L), 1L))
|
||||
.expectNext((Message("B", 2L), 2L))
|
||||
.expectNext((Message("D", 3L), 3L))
|
||||
.expectNext((Message("C", 4L), 4L))
|
||||
.expectComplete()
|
||||
.within(10.seconds) {
|
||||
listBuffer.toVector should contain atLeastOneElementOf (messages.map(_.offset))
|
||||
}
|
||||
}
|
||||
|
||||
"pass through contexts via a FlowWithContext" in {
|
||||
|
||||
def flowWithContext[T] = FlowWithContext[T, Long]
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext")
|
||||
|
|
@ -144,6 +144,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
|
|||
override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] =
|
||||
FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2)))
|
||||
|
||||
override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] =
|
||||
FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1)))
|
||||
|
||||
override def wireTapContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] =
|
||||
FlowWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2)))
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]].
|
||||
*
|
||||
|
|
|
|||
|
|
@ -104,6 +104,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
|
|||
*/
|
||||
def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]
|
||||
|
||||
/**
|
||||
* Data variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
|
||||
*
|
||||
* @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx]
|
||||
|
||||
/**
|
||||
* Context variant of [[pekko.stream.scaladsl.FlowOps.wireTap]]
|
||||
*
|
||||
* @see [[pekko.stream.scaladsl.FlowOps.wireTap]]
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]].
|
||||
*
|
||||
|
|
|
|||
|
|
@ -161,6 +161,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
|
|||
override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
|
||||
SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2)))
|
||||
|
||||
override def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] =
|
||||
SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1)))
|
||||
|
||||
override def wireTapContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
|
||||
SourceWithContext.fromTuples(delegate.wireTap(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2)))
|
||||
|
||||
/**
|
||||
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it.
|
||||
* The returned value is the materialized value of the `Sink`.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue