diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index d5b5b50b92..166d2323cf 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -1252,6 +1252,24 @@ If materialized values needs to be collected ``prependMat`` is available. **completes** when all upstreams complete +orElse +^^^^^^ +If the primary source completes without emitting any elements, the elements from the secondary source +are emitted. If the primary source emits any elements the secondary source is cancelled. + +Note that both sources are materialized directly and the secondary source is backpressured until it becomes +the source of elements or is cancelled. + +Signal errors downstream, regardless which of the two sources emitted the error. + +**emits** when an element is available from first stream or first stream closed without emitting any elements and an element +is available from the second stream + +**backpressures** when downstream backpressures + +**completes** the primary stream completes after emitting at least one element, when the primary stream completes +without emitting and the secondary stream already has completed or when the secondary stream completes + interleave ^^^^^^^^^^ Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index d5a263d64f..7181cad197 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -1254,6 +1254,24 @@ If materialized values needs to be collected ``prependMat`` is available. **completes** when all upstreams complete +orElse +^^^^^^ +If the primary source completes without emitting any elements, the elements from the secondary source +are emitted. If the primary source emits any elements the secondary source is cancelled. + +Note that both sources are materialized directly and the secondary source is backpressured until it becomes +the source of elements or is cancelled. + +Signal errors downstream, regardless which of the two sources emitted the error. + +**emits** when an element is available from first stream or first stream closed without emitting any elements and an element +is available from the second stream + +**backpressures** when downstream backpressures + +**completes** the primary stream completes after emitting at least one element, when the primary stream completes +without emitting and the secondary stream already has completed or when the secondary stream completes + interleave ^^^^^^^^^^ Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index a4ee8188a0..235e295937 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass → graphHelpers, jSourceClass → graphHelpers, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala new file mode 100644 index 0000000000..2b728f1204 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.testkit.AkkaSpec + +import scala.collection.immutable.Seq + +class FlowOrElseSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + + implicit val materializer = ActorMaterializer(settings) + + "An OrElse flow" should { + + "pass elements from the first input" in { + val source1 = Source(Seq(1, 2, 3)) + val source2 = Source(Seq(4, 5, 6)) + + val sink = Sink.seq[Int] + + source1.orElse(source2).runWith(sink).futureValue shouldEqual Seq(1, 2, 3) + } + + "pass elements from the second input if the first completes with no elements emitted" in { + val source1 = Source.empty[Int] + val source2 = Source(Seq(4, 5, 6)) + val sink = Sink.seq[Int] + + source1.orElse(source2).runWith(sink).futureValue shouldEqual Seq(4, 5, 6) + } + + "pass elements from input one through and cancel input 2" in new OrElseProbedFlow { + outProbe.request(1) + inProbe1.expectRequest() + inProbe1.sendNext('a') + outProbe.expectNext('a') + inProbe1.sendComplete() + inProbe2.expectCancellation() + outProbe.expectComplete() + } + + "pass elements from input two when input 1 has completed without elements" in new OrElseProbedFlow { + outProbe.request(1) + inProbe1.sendComplete() + inProbe2.expectRequest() + inProbe2.sendNext('a') + outProbe.expectNext('a') + inProbe2.sendComplete() + outProbe.expectComplete() + } + + "pass elements from input two when input 1 has completed without elements (lazyEmpty)" in { + val inProbe1 = TestPublisher.lazyEmpty[Char] + val source1 = Source.fromPublisher(inProbe1) + val inProbe2 = TestPublisher.probe[Char]() + val source2 = Source.fromPublisher(inProbe2) + val outProbe = TestSubscriber.probe[Char]() + val sink = Sink.fromSubscriber(outProbe) + + source1.orElse(source2).runWith(sink) + outProbe.request(1) + inProbe2.expectRequest() + inProbe2.sendNext('a') + outProbe.expectNext('a') + inProbe2.sendComplete() + + outProbe.expectComplete() + } + + "pass all available requested elements from input two when input 1 has completed without elements" in new OrElseProbedFlow { + outProbe.request(5) + + inProbe1.sendComplete() + + inProbe2.expectRequest() + inProbe2.sendNext('a') + outProbe.expectNext('a') + + inProbe2.sendNext('b') + outProbe.expectNext('b') + + inProbe2.sendNext('c') + outProbe.expectNext('c') + + inProbe2.sendComplete() + outProbe.expectComplete() + } + + "complete when both inputs completes without emitting elements" in new OrElseProbedFlow { + outProbe.ensureSubscription() + inProbe1.sendComplete() + inProbe2.sendComplete() + outProbe.expectComplete() + } + + "complete when both inputs completes without emitting elements, regardless of order" in new OrElseProbedFlow { + outProbe.ensureSubscription() + inProbe2.sendComplete() + outProbe.expectNoMsg() // make sure it did not complete here + inProbe1.sendComplete() + outProbe.expectComplete() + } + + "continue passing primary through when secondary completes" in new OrElseProbedFlow { + outProbe.ensureSubscription() + outProbe.request(1) + inProbe2.sendComplete() + + inProbe1.expectRequest() + inProbe1.sendNext('a') + outProbe.expectNext('a') + + inProbe1.sendComplete() + outProbe.expectComplete() + } + + "fail when input 1 fails" in new OrElseProbedFlow { + outProbe.ensureSubscription() + inProbe1.sendError(TE("in1 failed")) + inProbe2.expectCancellation() + outProbe.expectError() + } + + "fail when input 2 fails" in new OrElseProbedFlow { + outProbe.ensureSubscription() + inProbe2.sendError(TE("in2 failed")) + inProbe1.expectCancellation() + outProbe.expectError() + } + + trait OrElseProbedFlow { + val inProbe1 = TestPublisher.probe[Char]() + val source1 = Source.fromPublisher(inProbe1) + val inProbe2 = TestPublisher.probe[Char]() + val source2 = Source.fromPublisher(inProbe2) + + val outProbe = TestSubscriber.probe[Char]() + val sink = Sink.fromSubscriber(outProbe) + + source1.orElse(source2).runWith(sink) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index e34ebc5871..c64a01b012 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -83,6 +83,7 @@ object Stages { val zipWithN = name("zipWithN") val unzip = name("unzip") val concat = name("concat") + val orElse = name("orElse") val repeat = name("repeat") val unfold = name("unfold") val unfoldAsync = name("unfoldAsync") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 392d7badb4..9232aa090e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1359,6 +1359,46 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.prependMat(that)(combinerToScala(matF))) + /** + * Provides a secondary source that will be consumed if this source completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes or it gets + * cancelled. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element + * is available from the second stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes + * by from this stream. + */ + def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.orElse(secondary)) + + /** + * Provides a secondary source that will be consumed if this source completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#orElse]] + */ + def orElseMat[T >: Out, M2, M3]( + secondary: Graph[SourceShape[T], M2], + matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, T, M3] = + new Flow(delegate.orElseMat(secondary)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index e5428cbfdb..abb3f0ead8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -639,6 +639,44 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.prependMat(that)(combinerToScala(matF))) + /** + * Provides a secondary source that will be consumed if this source completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes or it gets + * cancelled. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element + * is available from the second stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes + * by from this stream. + */ + def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + new Source(delegate.orElse(secondary)) + + /** + * Provides a secondary source that will be consumed if this source completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#orElse]] + */ + def orElseMat[T >: Out, M, M2](secondary: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.orElseMat(secondary)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index cb315e5932..2feb262ee8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -982,6 +982,31 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] = new SubFlow(delegate.prepend(that)) + /** + * Provides a secondary source that will be consumed if this source completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes or it gets + * cancelled. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element + * is available from the second stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes + * by from this stream. + */ + def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubFlow[In, T, Mat] = + new SubFlow(delegate.orElse(secondary)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index abe7defdad..4cd8bc01e3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -980,6 +980,31 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] = new SubSource(delegate.prepend(that)) + /** + * Provides a secondary source that will be consumed if this source completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes or it gets + * cancelled. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element + * is available from the second stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes + * by from this stream. + */ + def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubSource[T, Mat] = + new SubSource(delegate.orElse(secondary)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1a8b18ac8c..cc6c1d3a10 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1793,6 +1793,40 @@ trait FlowOps[+Out, +Mat] { FlowShape(merge.in(1), merge.out) } + /** + * Provides a secondary source that will be consumed if this stream completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes or it gets + * cancelled. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element + * is available from the second stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes + * by from this stream. + */ + def orElse[U >: Out, Mat2](secondary: Graph[SourceShape[U], Mat2]): Repr[U] = + via(orElseGraph(secondary)) + + protected def orElseGraph[U >: Out, Mat2](secondary: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = + GraphDSL.create(secondary) { implicit b ⇒ secondary ⇒ + val orElse = b.add(OrElse[U]()) + + secondary ~> orElse.in(1) + + FlowShape(orElse.in(0), orElse.out) + } + /** * Concatenates this [[Flow]] with the given [[Source]] so the first element * emitted by that source is emitted after the last element of this @@ -2031,6 +2065,31 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(prependGraph(that))(matF) + /** + * Provides a secondary source that will be consumed if this stream completes without any + * elements passing by. As soon as the first element comes through this stream, the alternative + * will be cancelled. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes or it gets + * cancelled. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element + * is available from the second stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes + * by from this stream. + */ + def orElseMat[U >: Out, Mat2, Mat3](secondary: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = + viaMat(orElseGraph(secondary))(matF) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 8c924f90fe..fb9869d6f4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -11,7 +11,8 @@ import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout._ import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException -import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec import scala.collection.immutable @@ -911,6 +912,84 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[ override def toString: String = s"Concat($inputPorts)" } +object OrElse { + private val singleton = new OrElse[Nothing] + def apply[T]() = singleton.asInstanceOf[OrElse[T]] +} + +/** + * Takes two streams and passes the first through, the secondary stream is only passed + * through if the primary stream completes without passing any elements through. When + * the first element is passed through from the primary the secondary is cancelled. + * Both incoming streams are materialized when the stage is materialized. + * + * On errors the stage is failed regardless of source of the error. + * + * '''Emits when''' element is available from primary stream or the primary stream closed without emitting any elements and an element + * is available from the secondary stream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes + * without emitting and the secondary stream already has completed or when the secondary stream completes + * + * '''Cancels when''' downstream cancels + */ +private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] { + val primary = Inlet[T]("OrElse.primary") + val secondary = Inlet[T]("OrElse.secondary") + val out = Outlet[T]("OrElse.out") + + override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, primary, secondary) + + override protected def initialAttributes: Attributes = DefaultAttributes.orElse + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler with InHandler { + + private[this] var currentIn = primary + private[this] var primaryPushed = false + + override def onPull(): Unit = { + pull(currentIn) + } + + // for the primary inHandler + override def onPush(): Unit = { + if (!primaryPushed) { + primaryPushed = true + cancel(secondary) + } + val elem = grab(primary) + push(out, elem) + } + + // for the primary inHandler + override def onUpstreamFinish(): Unit = { + if (!primaryPushed && !isClosed(secondary)) { + currentIn = secondary + if (isAvailable(out)) pull(secondary) + } else { + completeStage() + } + } + + setHandler(secondary, new InHandler { + override def onPush(): Unit = { + push(out, grab(secondary)) + } + + override def onUpstreamFinish(): Unit = { + if (isClosed(primary)) completeStage() + } + }) + + setHandlers(primary, out, this) + } + + override def toString: String = s"OrElse" + +} + object GraphDSL extends GraphApply { class Builder[+M] private[stream] () { diff --git a/project/MiMa.scala b/project/MiMa.scala index e4e5a4c502..beee48c78a 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -910,17 +910,17 @@ object MiMa extends AutoPlugin { // Interpreter internals change ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.stage.GraphStageLogic.portToConn"), - + // #20994 adding new decode method, since we're on JDK7+ now ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.util.ByteString.decodeString"), // #20508 HTTP: Document how to be able to support custom request methods ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMethod.getRequestEntityAcceptance"), - + // #20976 provide different options to deal with the illegal response header value ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.getIllegalResponseHeaderValueProcessingMode"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.illegalResponseHeaderValueProcessingMode"), - + ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf"), // #20628 migrate Masker to GraphStage @@ -929,7 +929,7 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.ws.Masking#Masker.initial"), ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.ws.Masking$Masker$Running"), ProblemFilters.exclude[MissingTypesProblem]("akka.http.impl.engine.ws.Masking$Unmasking"), - + // # ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpEntity.discardBytes"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.discardBytes"), @@ -942,12 +942,16 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.Deployer.lookup"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.find"), - + // #20942 ClusterSingleton ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.singleton.ClusterSingletonManager.addRemoved"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.selfAddressOption") ), "2.4.9" -> Seq( + // #21025 new orElse flow op + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat") ) ) }