diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 2640e4d954..234568e8b6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -5,16 +5,16 @@ package akka.stream.impl import akka.event.LoggingAdapter import akka.stream.ActorAttributes.SupervisionStrategy -import akka.stream.Supervision.Decider -import akka.stream.impl.SplitDecision.SplitDecision -import akka.stream.impl.StreamLayout._ -import akka.stream._ import akka.stream.Attributes._ +import akka.stream.Supervision.Decider +import akka.stream._ +import akka.stream.impl.SplitDecision.{ Continue, SplitAfter, SplitBefore, SplitDecision } +import akka.stream.impl.StreamLayout._ import akka.stream.stage.AbstractStage.PushPullGraphStage -import akka.stream.stage.{ GraphStageLogic, GraphStage, Stage } +import akka.stream.stage.Stage import org.reactivestreams.Processor + import scala.collection.immutable -import scala.concurrent.Future /** * INTERNAL API @@ -226,6 +226,12 @@ private[stream] object Stages { override def withAttributes(attributes: Attributes) = copy(attributes = attributes) } + object Split { + def when(f: Any ⇒ Boolean) = Split(el ⇒ if (f(el)) SplitBefore else Continue, name("splitWhen")) + + def after(f: Any ⇒ Boolean) = Split(el ⇒ if (f(el)) SplitAfter else Continue, name("splitAfter")) + } + final case class ConcatAll(attributes: Attributes = concatAll) extends StageModule { override def withAttributes(attributes: Attributes) = copy(attributes = attributes) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d9eaff2d14..3626101073 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -6,10 +6,8 @@ package akka.stream.scaladsl import akka.event.LoggingAdapter import akka.stream.Attributes._ import akka.stream._ -import akka.stream.impl.SplitDecision._ -import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage } +import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream.impl.Timers import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } @@ -958,10 +956,8 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels and substreams cancel * */ - def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = { - val f = p.asInstanceOf[Any ⇒ Boolean] - withAttributes(name("splitWhen")).deprecatedAndThen(Split(el ⇒ if (f(el)) SplitBefore else Continue)) - } + def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U, Unit], Mat] = + deprecatedAndThen(Split.when(p.asInstanceOf[Any ⇒ Boolean])) /** * This operation applies the given predicate to all incoming elements and @@ -995,10 +991,8 @@ trait FlowOps[+Out, +Mat] { * * See also [[FlowOps.splitAfter]]. */ - def splitAfter[U >: Out](p: Out ⇒ Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = { - val f = p.asInstanceOf[Any ⇒ Boolean] - withAttributes(name("splitAfter")).deprecatedAndThen(Split(el ⇒ if (f(el)) SplitAfter else Continue)) - } + def splitAfter[U >: Out](p: Out ⇒ Boolean): Repr[Source[U, Unit], Mat] = + deprecatedAndThen(Split.after(p.asInstanceOf[Any ⇒ Boolean])) /** * Flattens a stream of [[Source]]s into a contiguous stream by fully consuming one stream after the other.