OrElse stage for fallback when no elements emitted #21024

This commit is contained in:
Johan Andrén 2016-08-30 13:35:59 +02:00 committed by GitHub
parent fbfc8b4c58
commit 012919fdb3
12 changed files with 465 additions and 7 deletions

View file

@ -1252,6 +1252,24 @@ If materialized values needs to be collected ``prependMat`` is available.
**completes** when all upstreams complete
orElse
^^^^^^
If the primary source completes without emitting any elements, the elements from the secondary source
are emitted. If the primary source emits any elements the secondary source is cancelled.
Note that both sources are materialized directly and the secondary source is backpressured until it becomes
the source of elements or is cancelled.
Signal errors downstream, regardless which of the two sources emitted the error.
**emits** when an element is available from first stream or first stream closed without emitting any elements and an element
is available from the second stream
**backpressures** when downstream backpressures
**completes** the primary stream completes after emitting at least one element, when the primary stream completes
without emitting and the secondary stream already has completed or when the secondary stream completes
interleave
^^^^^^^^^^
Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one

View file

@ -1254,6 +1254,24 @@ If materialized values needs to be collected ``prependMat`` is available.
**completes** when all upstreams complete
orElse
^^^^^^
If the primary source completes without emitting any elements, the elements from the secondary source
are emitted. If the primary source emits any elements the secondary source is cancelled.
Note that both sources are materialized directly and the secondary source is backpressured until it becomes
the source of elements or is cancelled.
Signal errors downstream, regardless which of the two sources emitted the error.
**emits** when an element is available from first stream or first stream closed without emitting any elements and an element
is available from the second stream
**backpressures** when downstream backpressures
**completes** the primary stream completes after emitting at least one element, when the primary stream completes
without emitting and the secondary stream already has completed or when the secondary stream completes
interleave
^^^^^^^^^^
Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one

View file

@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph")
val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass graphHelpers,
jSourceClass graphHelpers,

View file

@ -0,0 +1,151 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.testkit.AkkaSpec
import scala.collection.immutable.Seq
class FlowOrElseSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
"An OrElse flow" should {
"pass elements from the first input" in {
val source1 = Source(Seq(1, 2, 3))
val source2 = Source(Seq(4, 5, 6))
val sink = Sink.seq[Int]
source1.orElse(source2).runWith(sink).futureValue shouldEqual Seq(1, 2, 3)
}
"pass elements from the second input if the first completes with no elements emitted" in {
val source1 = Source.empty[Int]
val source2 = Source(Seq(4, 5, 6))
val sink = Sink.seq[Int]
source1.orElse(source2).runWith(sink).futureValue shouldEqual Seq(4, 5, 6)
}
"pass elements from input one through and cancel input 2" in new OrElseProbedFlow {
outProbe.request(1)
inProbe1.expectRequest()
inProbe1.sendNext('a')
outProbe.expectNext('a')
inProbe1.sendComplete()
inProbe2.expectCancellation()
outProbe.expectComplete()
}
"pass elements from input two when input 1 has completed without elements" in new OrElseProbedFlow {
outProbe.request(1)
inProbe1.sendComplete()
inProbe2.expectRequest()
inProbe2.sendNext('a')
outProbe.expectNext('a')
inProbe2.sendComplete()
outProbe.expectComplete()
}
"pass elements from input two when input 1 has completed without elements (lazyEmpty)" in {
val inProbe1 = TestPublisher.lazyEmpty[Char]
val source1 = Source.fromPublisher(inProbe1)
val inProbe2 = TestPublisher.probe[Char]()
val source2 = Source.fromPublisher(inProbe2)
val outProbe = TestSubscriber.probe[Char]()
val sink = Sink.fromSubscriber(outProbe)
source1.orElse(source2).runWith(sink)
outProbe.request(1)
inProbe2.expectRequest()
inProbe2.sendNext('a')
outProbe.expectNext('a')
inProbe2.sendComplete()
outProbe.expectComplete()
}
"pass all available requested elements from input two when input 1 has completed without elements" in new OrElseProbedFlow {
outProbe.request(5)
inProbe1.sendComplete()
inProbe2.expectRequest()
inProbe2.sendNext('a')
outProbe.expectNext('a')
inProbe2.sendNext('b')
outProbe.expectNext('b')
inProbe2.sendNext('c')
outProbe.expectNext('c')
inProbe2.sendComplete()
outProbe.expectComplete()
}
"complete when both inputs completes without emitting elements" in new OrElseProbedFlow {
outProbe.ensureSubscription()
inProbe1.sendComplete()
inProbe2.sendComplete()
outProbe.expectComplete()
}
"complete when both inputs completes without emitting elements, regardless of order" in new OrElseProbedFlow {
outProbe.ensureSubscription()
inProbe2.sendComplete()
outProbe.expectNoMsg() // make sure it did not complete here
inProbe1.sendComplete()
outProbe.expectComplete()
}
"continue passing primary through when secondary completes" in new OrElseProbedFlow {
outProbe.ensureSubscription()
outProbe.request(1)
inProbe2.sendComplete()
inProbe1.expectRequest()
inProbe1.sendNext('a')
outProbe.expectNext('a')
inProbe1.sendComplete()
outProbe.expectComplete()
}
"fail when input 1 fails" in new OrElseProbedFlow {
outProbe.ensureSubscription()
inProbe1.sendError(TE("in1 failed"))
inProbe2.expectCancellation()
outProbe.expectError()
}
"fail when input 2 fails" in new OrElseProbedFlow {
outProbe.ensureSubscription()
inProbe2.sendError(TE("in2 failed"))
inProbe1.expectCancellation()
outProbe.expectError()
}
trait OrElseProbedFlow {
val inProbe1 = TestPublisher.probe[Char]()
val source1 = Source.fromPublisher(inProbe1)
val inProbe2 = TestPublisher.probe[Char]()
val source2 = Source.fromPublisher(inProbe2)
val outProbe = TestSubscriber.probe[Char]()
val sink = Sink.fromSubscriber(outProbe)
source1.orElse(source2).runWith(sink)
}
}
}

View file

@ -83,6 +83,7 @@ object Stages {
val zipWithN = name("zipWithN")
val unzip = name("unzip")
val concat = name("concat")
val orElse = name("orElse")
val repeat = name("repeat")
val unfold = name("unfold")
val unfoldAsync = name("unfoldAsync")

View file

@ -1359,6 +1359,46 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF)))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.orElse(secondary))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#orElse]]
*/
def orElseMat[T >: Out, M2, M3](
secondary: Graph[SourceShape[T], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, T, M3] =
new Flow(delegate.orElseMat(secondary)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].

View file

@ -639,6 +639,44 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.prependMat(that)(combinerToScala(matF)))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] =
new Source(delegate.orElse(secondary))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#orElse]]
*/
def orElseMat[T >: Out, M, M2](secondary: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.orElseMat(secondary)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].

View file

@ -982,6 +982,31 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] =
new SubFlow(delegate.prepend(that))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubFlow[In, T, Mat] =
new SubFlow(delegate.orElse(secondary))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].

View file

@ -980,6 +980,31 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] =
new SubSource(delegate.prepend(that))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubSource[T, Mat] =
new SubSource(delegate.orElse(secondary))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].

View file

@ -1793,6 +1793,40 @@ trait FlowOps[+Out, +Mat] {
FlowShape(merge.in(1), merge.out)
}
/**
* Provides a secondary source that will be consumed if this stream completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[U >: Out, Mat2](secondary: Graph[SourceShape[U], Mat2]): Repr[U] =
via(orElseGraph(secondary))
protected def orElseGraph[U >: Out, Mat2](secondary: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(secondary) { implicit b secondary
val orElse = b.add(OrElse[U]())
secondary ~> orElse.in(1)
FlowShape(orElse.in(0), orElse.out)
}
/**
* Concatenates this [[Flow]] with the given [[Source]] so the first element
* emitted by that source is emitted after the last element of this
@ -2031,6 +2065,31 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(prependGraph(that))(matF)
/**
* Provides a secondary source that will be consumed if this stream completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElseMat[U >: Out, Mat2, Mat3](secondary: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(orElseGraph(secondary))(matF)
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].

View file

@ -11,7 +11,8 @@ import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout._
import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import scala.annotation.unchecked.uncheckedVariance
import scala.annotation.tailrec
import scala.collection.immutable
@ -911,6 +912,84 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
override def toString: String = s"Concat($inputPorts)"
}
object OrElse {
private val singleton = new OrElse[Nothing]
def apply[T]() = singleton.asInstanceOf[OrElse[T]]
}
/**
* Takes two streams and passes the first through, the secondary stream is only passed
* through if the primary stream completes without passing any elements through. When
* the first element is passed through from the primary the secondary is cancelled.
* Both incoming streams are materialized when the stage is materialized.
*
* On errors the stage is failed regardless of source of the error.
*
* '''Emits when''' element is available from primary stream or the primary stream closed without emitting any elements and an element
* is available from the secondary stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels
*/
private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] {
val primary = Inlet[T]("OrElse.primary")
val secondary = Inlet[T]("OrElse.secondary")
val out = Outlet[T]("OrElse.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, primary, secondary)
override protected def initialAttributes: Attributes = DefaultAttributes.orElse
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler with InHandler {
private[this] var currentIn = primary
private[this] var primaryPushed = false
override def onPull(): Unit = {
pull(currentIn)
}
// for the primary inHandler
override def onPush(): Unit = {
if (!primaryPushed) {
primaryPushed = true
cancel(secondary)
}
val elem = grab(primary)
push(out, elem)
}
// for the primary inHandler
override def onUpstreamFinish(): Unit = {
if (!primaryPushed && !isClosed(secondary)) {
currentIn = secondary
if (isAvailable(out)) pull(secondary)
} else {
completeStage()
}
}
setHandler(secondary, new InHandler {
override def onPush(): Unit = {
push(out, grab(secondary))
}
override def onUpstreamFinish(): Unit = {
if (isClosed(primary)) completeStage()
}
})
setHandlers(primary, out, this)
}
override def toString: String = s"OrElse"
}
object GraphDSL extends GraphApply {
class Builder[+M] private[stream] () {

View file

@ -910,17 +910,17 @@ object MiMa extends AutoPlugin {
// Interpreter internals change
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.stage.GraphStageLogic.portToConn"),
// #20994 adding new decode method, since we're on JDK7+ now
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.util.ByteString.decodeString"),
// #20508 HTTP: Document how to be able to support custom request methods
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMethod.getRequestEntityAcceptance"),
// #20976 provide different options to deal with the illegal response header value
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.getIllegalResponseHeaderValueProcessingMode"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.illegalResponseHeaderValueProcessingMode"),
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf"),
// #20628 migrate Masker to GraphStage
@ -929,7 +929,7 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.ws.Masking#Masker.initial"),
ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.ws.Masking$Masker$Running"),
ProblemFilters.exclude[MissingTypesProblem]("akka.http.impl.engine.ws.Masking$Unmasking"),
// #
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpEntity.discardBytes"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.discardBytes"),
@ -942,12 +942,16 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.Deployer.lookup"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.apply"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.find"),
// #20942 ClusterSingleton
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.singleton.ClusterSingletonManager.addRemoved"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.selfAddressOption")
),
"2.4.9" -> Seq(
// #21025 new orElse flow op
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat")
)
)
}