parent
f2e39c7534
commit
ccf5d46a58
12 changed files with 321 additions and 7 deletions
|
|
@ -1089,6 +1089,23 @@ Each upstream element will either be diverted to the given sink, or the downstre
|
|||
|
||||
---------------------------------------------------------------
|
||||
|
||||
### wireTap
|
||||
|
||||
Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass
|
||||
through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow.
|
||||
If the wire-tap `Sink` backpressures, elements that would've been sent to it will be dropped instead.
|
||||
|
||||
**emits** element is available and demand exists from the downstream; the element will
|
||||
also be sent to the wire-tap `Sink` if there is demand.
|
||||
|
||||
**backpressures** downstream backpressures
|
||||
|
||||
**completes** upstream completes
|
||||
|
||||
**cancels** downstream cancels
|
||||
|
||||
---------------------------------------------------------------
|
||||
|
||||
<br/>
|
||||
|
||||
## Flow stages composed of Sinks and Sources
|
||||
|
|
|
|||
2
akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala
Normal file → Executable file
2
akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala
Normal file → Executable file
|
|
@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
|
|||
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
|
||||
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
|
||||
|
||||
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph", "divertToGraph")
|
||||
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph")
|
||||
val allowMissing: Map[Class[_], Set[String]] = Map(
|
||||
jFlowClass → graphHelpers,
|
||||
jSourceClass → graphHelpers,
|
||||
|
|
|
|||
2
akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala
Normal file → Executable file
2
akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala
Normal file → Executable file
|
|
@ -115,7 +115,7 @@ class GraphUnzipSpec extends StreamSpec {
|
|||
c1.expectComplete()
|
||||
}
|
||||
|
||||
"not loose elements when pull is followed by cancel before other sink has requested" in {
|
||||
"not lose elements when pull is followed by cancel before other sink has requested" in {
|
||||
val c1 = TestSubscriber.manualProbe[Int]()
|
||||
val c2 = TestSubscriber.manualProbe[String]()
|
||||
|
||||
|
|
|
|||
59
akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala
Executable file
59
akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala
Executable file
|
|
@ -0,0 +1,59 @@
|
|||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
|
||||
class GraphWireTapSpec extends StreamSpec {
|
||||
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
"A wire tap" must {
|
||||
|
||||
"wireTap must broadcast to the tap" in assertAllStagesStopped {
|
||||
val tp, mp = TestSink.probe[Int](system)
|
||||
val (tps, mps) = Source(1 to 2).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
|
||||
tps.request(2)
|
||||
mps.requestNext(1)
|
||||
mps.requestNext(2)
|
||||
tps.expectNext(1, 2)
|
||||
mps.expectComplete()
|
||||
tps.expectComplete()
|
||||
}
|
||||
|
||||
"wireTap must drop elements while the tap has no demand, buffering up to one element" in assertAllStagesStopped {
|
||||
val tp, mp = TestSink.probe[Int](system)
|
||||
val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
|
||||
mps.request(3)
|
||||
mps.expectNext(1, 2, 3)
|
||||
tps.request(4)
|
||||
mps.requestNext(4)
|
||||
mps.requestNext(5)
|
||||
mps.requestNext(6)
|
||||
tps.expectNext(3, 4, 5, 6)
|
||||
mps.expectComplete()
|
||||
tps.expectComplete()
|
||||
}
|
||||
|
||||
"wireTap must cancel if main sink cancels" in assertAllStagesStopped {
|
||||
val tp, mp = TestSink.probe[Int](system)
|
||||
val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
|
||||
tps.request(6)
|
||||
mps.cancel()
|
||||
tps.expectComplete()
|
||||
}
|
||||
|
||||
"wireTap must continue if tap sink cancels" in assertAllStagesStopped {
|
||||
val tp, mp = TestSink.probe[Int](system)
|
||||
val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
|
||||
tps.cancel()
|
||||
mps.request(6)
|
||||
mps.expectNext(1, 2, 3, 4, 5, 6)
|
||||
mps.expectComplete()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -40,3 +40,16 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConve
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.divertToMat")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertTo")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertToGraph")
|
||||
|
||||
# wireTap
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTapGraph")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.wireTapMat")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.wireTap")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTap")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTap")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTap")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.wireTapMat")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTapMat")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTapMat")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTapMat")
|
||||
|
|
|
|||
1
akka-stream/src/main/scala/akka/stream/impl/Stages.scala
Normal file → Executable file
1
akka-stream/src/main/scala/akka/stream/impl/Stages.scala
Normal file → Executable file
|
|
@ -74,6 +74,7 @@ import akka.stream._
|
|||
val flattenMerge = name("flattenMerge")
|
||||
val recoverWith = name("recoverWith")
|
||||
val broadcast = name("broadcast")
|
||||
val wireTap = name("wireTap")
|
||||
val balance = name("balance")
|
||||
val zip = name("zip")
|
||||
val zipN = name("zipN")
|
||||
|
|
|
|||
33
akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
Normal file → Executable file
33
akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
Normal file → Executable file
|
|
@ -1722,6 +1722,39 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
|
||||
new Flow(delegate.divertToMat(that, when.test)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists from the downstream; the element will
|
||||
* also be sent to the wire-tap Sink if there is demand.
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
|
||||
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
|
||||
new Flow(delegate.wireTap(that))
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||
*
|
||||
* @see [[#wireTap]]
|
||||
*/
|
||||
def wireTapMat[M2, M3](
|
||||
that: Graph[SinkShape[Out], M2],
|
||||
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
|
||||
new Flow(delegate.wireTapMat(that)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
|
|
|
|||
32
akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
Normal file → Executable file
32
akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
Normal file → Executable file
|
|
@ -778,6 +778,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
|
||||
new Source(delegate.divertToMat(that, when.test)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists from the downstream; the element will
|
||||
* also be sent to the wire-tap Sink if there is demand.
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
|
||||
new Source(delegate.wireTap(that))
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||
*
|
||||
* @see [[#wireTap]]
|
||||
*/
|
||||
def wireTapMat[M2, M3](
|
||||
that: Graph[SinkShape[Out], M2],
|
||||
matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
|
||||
new Source(delegate.wireTapMat(that)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
|
|
|
|||
17
akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
Normal file → Executable file
17
akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
Normal file → Executable file
|
|
@ -1165,6 +1165,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubFlow[In, Out, Mat] =
|
||||
new SubFlow(delegate.divertTo(that, when.test))
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists from the downstream; the element will
|
||||
* also be sent to the wire-tap Sink if there is demand.
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def wireTap(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] =
|
||||
new SubFlow(delegate.wireTap(that))
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||
* picking randomly when several elements ready.
|
||||
|
|
|
|||
17
akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
Normal file → Executable file
17
akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
Normal file → Executable file
|
|
@ -1157,6 +1157,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.divertTo(that, when.test))
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists from the downstream; the element will
|
||||
* also be sent to the wire-tap Sink if there is demand.
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def wireTap(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.wireTap(that))
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||
* picking randomly when several elements ready.
|
||||
|
|
|
|||
43
akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
Normal file → Executable file
43
akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
Normal file → Executable file
|
|
@ -2277,7 +2277,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
|
||||
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
|
||||
* through will also be sent to the [[Sink]].
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
|
||||
|
|
@ -2286,7 +2286,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
* '''Cancels when''' downstream and Sink cancel
|
||||
*/
|
||||
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that))
|
||||
|
||||
|
|
@ -2320,6 +2320,30 @@ trait FlowOps[+Out, +Mat] {
|
|||
FlowShape(partition.in, partition.out(0))
|
||||
}
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists from the downstream; the element will
|
||||
* also be sent to the wire-tap Sink if there is demand.
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out] = via(wireTapGraph(that))
|
||||
|
||||
protected def wireTapGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
|
||||
GraphDSL.create(that) { implicit b ⇒ r ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val bcast = b.add(WireTap[Out]())
|
||||
bcast.out1 ~> r
|
||||
FlowShape(bcast.in, bcast.out0)
|
||||
}
|
||||
|
||||
def withAttributes(attr: Attributes): Repr[Out]
|
||||
|
||||
def addAttributes(attr: Attributes): Repr[Out]
|
||||
|
|
@ -2549,7 +2573,7 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
|
|||
viaMat(orElseGraph(secondary))(matF)
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
|
||||
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
|
||||
* through will also be sent to the [[Sink]].
|
||||
*
|
||||
* @see [[#alsoTo]]
|
||||
|
|
@ -2572,6 +2596,19 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
|
|||
def divertToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], when: Out ⇒ Boolean)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
|
||||
viaMat(divertToGraph(that, when))(matF)
|
||||
|
||||
/**
|
||||
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
|
||||
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
|
||||
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
|
||||
*
|
||||
* @see [[#wireTap]]
|
||||
*
|
||||
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
|
||||
* where appropriate instead of manually writing functions that pass through one of the values.
|
||||
*/
|
||||
def wireTapMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
|
||||
viaMat(wireTapGraph(that))(matF)
|
||||
|
||||
/**
|
||||
* Materializes to `Future[Done]` that completes on getting termination message.
|
||||
* The Future completes with success when received complete message from upstream or cancel
|
||||
|
|
|
|||
92
akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
Normal file → Executable file
92
akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
Normal file → Executable file
|
|
@ -6,16 +6,17 @@ package akka.stream.scaladsl
|
|||
import java.util.SplittableRandom
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.annotation.InternalApi
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl._
|
||||
import akka.stream.impl.fusing.GraphStages
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||
import akka.util.ConstantFun
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.annotation.tailrec
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.{ immutable, mutable }
|
||||
import scala.concurrent.Promise
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
|
|
@ -627,6 +628,88 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
|
|||
|
||||
}
|
||||
|
||||
object WireTap {
|
||||
private val singleton = new WireTap[Nothing]
|
||||
|
||||
/**
|
||||
* @see [[WireTap]]
|
||||
*/
|
||||
def apply[T](): WireTap[T] = singleton.asInstanceOf[WireTap[T]]
|
||||
}
|
||||
|
||||
/**
|
||||
* Fan-out the stream to two output streams - a 'main' and a 'tap' one. Each incoming element is emitted
|
||||
* to the 'main' output; elements are also emitted to the 'tap' output if there is demand;
|
||||
* otherwise they are dropped.
|
||||
*
|
||||
* '''Emits when''' element is available and demand exists from the 'main' output; the element will
|
||||
* also be sent to the 'tap' output if there is demand.
|
||||
*
|
||||
* '''Backpressures when''' the 'main' output backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' the 'main' output cancels
|
||||
*
|
||||
*/
|
||||
@InternalApi
|
||||
private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]] {
|
||||
val in: Inlet[T] = Inlet[T]("WireTap.in")
|
||||
val outMain: Outlet[T] = Outlet[T]("WireTap.outMain")
|
||||
val outTap: Outlet[T] = Outlet[T]("WireTap.outTap")
|
||||
override def initialAttributes = DefaultAttributes.wireTap
|
||||
override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain, outTap)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||
private var pendingTap: Option[T] = None
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush() = {
|
||||
val elem = grab(in)
|
||||
push(outMain, elem)
|
||||
if (isAvailable(outTap)) {
|
||||
push(outTap, elem)
|
||||
} else {
|
||||
pendingTap = Some(elem)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(outMain, new OutHandler {
|
||||
override def onPull() = {
|
||||
pull(in)
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(): Unit = {
|
||||
completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
// The 'tap' output can neither backpressure, nor cancel, the stage.
|
||||
setHandler(outTap, new OutHandler {
|
||||
override def onPull() = {
|
||||
pendingTap match {
|
||||
case Some(elem) ⇒
|
||||
push(outTap, elem)
|
||||
pendingTap = None
|
||||
case None ⇒ // no pending element to emit
|
||||
}
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(): Unit = {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush() = {
|
||||
push(outMain, grab(in))
|
||||
}
|
||||
})
|
||||
// Allow any outstanding element to be garbage-collected
|
||||
pendingTap = None
|
||||
}
|
||||
})
|
||||
}
|
||||
override def toString = "WireTap"
|
||||
}
|
||||
|
||||
object Partition {
|
||||
// FIXME make `PartitionOutOfBoundsException` a `final` class when possible
|
||||
case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace
|
||||
|
|
@ -1090,6 +1173,10 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
|
|||
|
||||
object OrElse {
|
||||
private val singleton = new OrElse[Nothing]
|
||||
|
||||
/**
|
||||
* @see [[OrElse]]
|
||||
*/
|
||||
def apply[T]() = singleton.asInstanceOf[OrElse[T]]
|
||||
}
|
||||
|
||||
|
|
@ -1111,6 +1198,7 @@ object OrElse {
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@InternalApi
|
||||
private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] {
|
||||
val primary = Inlet[T]("OrElse.primary")
|
||||
val secondary = Inlet[T]("OrElse.secondary")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue