diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 4317e29696..2252d46faf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -173,10 +173,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[T, immutable.Seq[T]] = fusing.Sliding(n, step) } - final case class TakeWhile[T](p: T ⇒ Boolean, attributes: Attributes = takeWhile) extends SymbolicStage[T, T] { - override def create(attr: Attributes): Stage[T, T] = fusing.TakeWhile(p, supervision(attr)) - } - final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, attributes: Attributes = scan) extends SymbolicStage[In, Out] { override def create(attr: Attributes): Stage[In, Out] = fusing.Scan(zero, f, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 409a2e7a09..71a8a5cd00 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -43,17 +43,39 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision. } /** - * INTERNAL API - */ -private[akka] final case class TakeWhile[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] { + * INTERNAL API + */ +private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { + override def initialAttributes: Attributes = DefaultAttributes.takeWhile - override def onPush(elem: T, ctx: Context[T]): SyncDirective = - if (p(elem)) - ctx.push(elem) - else - ctx.finish() + override def toString: String = "TakeWhile" - override def decide(t: Throwable): Supervision.Directive = decider(t) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler with InHandler { + override def toString = "TakeWhileLogic" + + def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + override def onPush(): Unit = { + try { + val elem = grab(in) + if (p(elem)) { + push(out, elem) + } else { + completeStage() + } + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case _ ⇒ pull(in) + } + } + } + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 95696dd994..39027cb6b3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -616,7 +616,7 @@ trait FlowOps[+Out, +Mat] { * * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] */ - def takeWhile(p: Out ⇒ Boolean): Repr[Out] = andThen(TakeWhile(p)) + def takeWhile(p: Out ⇒ Boolean): Repr[Out] = via(TakeWhile(p)) /** * Discard elements at the beginning of the stream while predicate is true. diff --git a/project/MiMa.scala b/project/MiMa.scala index fcfbb05253..dcc2959cd7 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -3,7 +3,6 @@ */ package akka -import akka.MiMa.FilterAnyProblemStartingWith import sbt._ import sbt.Keys._ import com.typesafe.tools.mima.plugin.MimaPlugin @@ -75,6 +74,7 @@ object MiMa extends AutoPlugin { val bcIssuesBetween23and24 = Seq( FilterAnyProblem("akka.remote.testconductor.Terminate"), FilterAnyProblem("akka.remote.testconductor.TerminateMsg"), + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testconductor.Conductor.shutdown"), ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testkit.MultiNodeSpec.akka$remote$testkit$MultiNodeSpec$$deployer"), FilterAnyProblem("akka.remote.EndpointManager$Pass"), @@ -713,33 +713,6 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), - // #20028 Simplify TickSource cancellation - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$TickSourceCancellable"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$"), - - // #19834 replacing PushStages usages with GraphStages - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$LimitWeighted"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Collect$"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$LimitWeighted$"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Collect"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile$"), - FilterAnyProblemStartingWith("akka.stream.impl.fusing.Collect"), - FilterAnyProblemStartingWith("akka.stream.impl.fusing.DropWhile"), - FilterAnyProblemStartingWith("akka.stream.impl.fusing.LimitWeighted"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$"), - - // #19892 Removed internal Breaker classes from akka.stream.impl.fusing - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.breaker"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.bidiBreaker"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail$"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel$"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail"), - // #19390 Add flow monitor ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor") )