=str simplify splitWhen and splitAfter's signature.

This commit is contained in:
何品 2015-10-27 02:28:54 +08:00 committed by hepin
parent aa339e41ec
commit cf19262e68
2 changed files with 17 additions and 17 deletions

View file

@ -5,16 +5,16 @@ package akka.stream.impl
import akka.event.LoggingAdapter
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream.impl.SplitDecision.SplitDecision
import akka.stream.impl.StreamLayout._
import akka.stream._
import akka.stream.Attributes._
import akka.stream.Supervision.Decider
import akka.stream._
import akka.stream.impl.SplitDecision.{ Continue, SplitAfter, SplitBefore, SplitDecision }
import akka.stream.impl.StreamLayout._
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.stage.{ GraphStageLogic, GraphStage, Stage }
import akka.stream.stage.Stage
import org.reactivestreams.Processor
import scala.collection.immutable
import scala.concurrent.Future
/**
* INTERNAL API
@ -226,6 +226,12 @@ private[stream] object Stages {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
}
object Split {
def when(f: Any Boolean) = Split(el if (f(el)) SplitBefore else Continue, name("splitWhen"))
def after(f: Any Boolean) = Split(el if (f(el)) SplitAfter else Continue, name("splitAfter"))
}
final case class ConcatAll(attributes: Attributes = concatAll) extends StageModule {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
}

View file

@ -6,10 +6,8 @@ package akka.stream.scaladsl
import akka.event.LoggingAdapter
import akka.stream.Attributes._
import akka.stream._
import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage }
import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.Timers
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered }
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers }
import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage }
@ -958,10 +956,8 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels and substreams cancel
*
*/
def splitWhen[U >: Out](p: Out Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = {
val f = p.asInstanceOf[Any Boolean]
withAttributes(name("splitWhen")).deprecatedAndThen(Split(el if (f(el)) SplitBefore else Continue))
}
def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U, Unit], Mat] =
deprecatedAndThen(Split.when(p.asInstanceOf[Any Boolean]))
/**
* This operation applies the given predicate to all incoming elements and
@ -995,10 +991,8 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.splitAfter]].
*/
def splitAfter[U >: Out](p: Out Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = {
val f = p.asInstanceOf[Any Boolean]
withAttributes(name("splitAfter")).deprecatedAndThen(Split(el if (f(el)) SplitAfter else Continue))
}
def splitAfter[U >: Out](p: Out Boolean): Repr[Source[U, Unit], Mat] =
deprecatedAndThen(Split.after(p.asInstanceOf[Any Boolean]))
/**
* Flattens a stream of [[Source]]s into a contiguous stream by fully consuming one stream after the other.