Merge pull request #19910 from vans239/master
Replaced PushStage based TakeWhile with GraphStage #19834
This commit is contained in:
commit
8449647e40
4 changed files with 33 additions and 42 deletions
|
|
@ -173,10 +173,6 @@ private[stream] object Stages {
|
|||
override def create(attr: Attributes): Stage[T, immutable.Seq[T]] = fusing.Sliding(n, step)
|
||||
}
|
||||
|
||||
final case class TakeWhile[T](p: T ⇒ Boolean, attributes: Attributes = takeWhile) extends SymbolicStage[T, T] {
|
||||
override def create(attr: Attributes): Stage[T, T] = fusing.TakeWhile(p, supervision(attr))
|
||||
}
|
||||
|
||||
final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, attributes: Attributes = scan) extends SymbolicStage[In, Out] {
|
||||
override def create(attr: Attributes): Stage[In, Out] = fusing.Scan(zero, f, supervision(attr))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,15 +45,37 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class TakeWhile[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
|
||||
private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
|
||||
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
|
||||
|
||||
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
|
||||
if (p(elem))
|
||||
ctx.push(elem)
|
||||
else
|
||||
ctx.finish()
|
||||
override def toString: String = "TakeWhile"
|
||||
|
||||
override def decide(t: Throwable): Supervision.Directive = decider(t)
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with OutHandler with InHandler {
|
||||
override def toString = "TakeWhileLogic"
|
||||
|
||||
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
val elem = grab(in)
|
||||
if (p(elem)) {
|
||||
push(out, elem)
|
||||
} else {
|
||||
completeStage()
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒ decider(ex) match {
|
||||
case Supervision.Stop ⇒ failStage(ex)
|
||||
case _ ⇒ pull(in)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -616,7 +616,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: Out ⇒ Boolean): Repr[Out] = andThen(TakeWhile(p))
|
||||
def takeWhile(p: Out ⇒ Boolean): Repr[Out] = via(TakeWhile(p))
|
||||
|
||||
/**
|
||||
* Discard elements at the beginning of the stream while predicate is true.
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
*/
|
||||
package akka
|
||||
|
||||
import akka.MiMa.FilterAnyProblemStartingWith
|
||||
import sbt._
|
||||
import sbt.Keys._
|
||||
import com.typesafe.tools.mima.plugin.MimaPlugin
|
||||
|
|
@ -75,6 +74,7 @@ object MiMa extends AutoPlugin {
|
|||
val bcIssuesBetween23and24 = Seq(
|
||||
FilterAnyProblem("akka.remote.testconductor.Terminate"),
|
||||
FilterAnyProblem("akka.remote.testconductor.TerminateMsg"),
|
||||
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testconductor.Conductor.shutdown"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testkit.MultiNodeSpec.akka$remote$testkit$MultiNodeSpec$$deployer"),
|
||||
FilterAnyProblem("akka.remote.EndpointManager$Pass"),
|
||||
|
|
@ -713,33 +713,6 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"),
|
||||
ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"),
|
||||
|
||||
// #20028 Simplify TickSource cancellation
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$TickSourceCancellable"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$"),
|
||||
|
||||
// #19834 replacing PushStages usages with GraphStages
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$LimitWeighted"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Collect$"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$LimitWeighted$"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Collect"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile$"),
|
||||
FilterAnyProblemStartingWith("akka.stream.impl.fusing.Collect"),
|
||||
FilterAnyProblemStartingWith("akka.stream.impl.fusing.DropWhile"),
|
||||
FilterAnyProblemStartingWith("akka.stream.impl.fusing.LimitWeighted"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$"),
|
||||
|
||||
// #19892 Removed internal Breaker classes from akka.stream.impl.fusing
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.breaker"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.bidiBreaker"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail$"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel$"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail"),
|
||||
|
||||
// #19390 Add flow monitor
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor")
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue