!str #15742 Hook-up splitWhen to new DSL

* Produces flows instead of publishers
This commit is contained in:
Patrik Nordwall 2014-09-08 16:04:35 +02:00
parent c636057f49
commit d3c34269e7
4 changed files with 192 additions and 0 deletions

View file

@ -32,6 +32,10 @@ private[akka] object Ast {
override def name = "prefixAndTail"
}
case class SplitWhen(p: Any Boolean) extends AstNode {
override def name = "splitWhen"
}
}
/**
@ -189,6 +193,7 @@ private[akka] object ActorProcessorFactory {
case t: Transform Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case tt: PrefixAndTail Props(new PrefixAndTailImpl(settings, tt.n))
case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p))
}).withDispatcher(settings.dispatcher)

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl2
import akka.stream.MaterializerSettings
import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey
import akka.stream.impl.TransferPhase
import akka.stream.impl.MultiStreamOutputProcessor
import akka.stream.scaladsl2.FlowFrom
/**
* INTERNAL API
*/
private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any Boolean)
extends MultiStreamOutputProcessor(_settings) {
var currentSubstream: SubstreamOutputs = _
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
nextPhase(openSubstream(primaryInputs.dequeueInputElement()))
}
def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { ()
val substreamOutput = newSubstream()
val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement(substreamFlow)
currentSubstream = substreamOutput
nextPhase(serveSubstreamFirst(currentSubstream, elem))
}
// Serving the substream is split into two phases to minimize elements "held in hand"
def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
nextPhase(serveSubstreamRest(substream))
}
// Note that this phase is allocated only once per _slice_ and not per element
def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
if (splitPredicate(elem)) {
currentSubstream.complete()
currentSubstream = null
nextPhase(openSubstream(elem))
} else substream.enqueueOutputElement(elem)
}
// Ignore elements for a cancelled substream until a new substream needs to be opened
val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { ()
val elem = primaryInputs.dequeueInputElement()
if (splitPredicate(elem)) nextPhase(openSubstream(elem))
}
nextPhase(waitFirst)
override def invalidateSubstream(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.invalidateSubstream(substream)
}
}

View file

@ -67,6 +67,22 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] {
*/
def groupBy[K, U >: Out](f: Out K): Repr[In, (K, FlowWithSource[U, U])] =
andThen(GroupBy(f.asInstanceOf[Any Any]))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it. This means
* that for the following series of predicate values, three substreams will
* be produced with lengths 1, 2, and 3:
*
* {{{
* false, // element goes into first substream
* true, false, // elements go into second substream
* true, false, false // elements go into third substream
* }}}
*/
def splitWhen[U >: Out](p: Out Boolean): Repr[In, FlowWithSource[U, U]] =
andThen(SplitWhen(p.asInstanceOf[Any Boolean]))
}
/**

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.AkkaSpec
import org.reactivestreams.Publisher
import akka.stream.MaterializerSettings
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowSplitWhenSpec extends AkkaSpec {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
.withFanOutBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = FlowMaterializer(settings)
case class StreamPuppet(p: Publisher[Int]) {
val probe = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(probe)
val subscription = probe.expectSubscription()
def request(demand: Int): Unit = subscription.request(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
val source = FlowFrom((1 to elementCount).iterator)
val groupStream = source.splitWhen(_ == splitWhen).toPublisher()
val masterSubscriber = StreamTestKit.SubscriberProbe[FlowWithSource[Int, Int]]()
groupStream.subscribe(masterSubscriber)
val masterSubscription = masterSubscriber.expectSubscription()
def getSubFlow(): FlowWithSource[Int, Int] = {
masterSubscription.request(1)
expectSubPublisher()
}
def expectSubPublisher(): FlowWithSource[Int, Int] = {
val substream = masterSubscriber.expectNext()
substream
}
}
"splitWhen" must {
"work in the happy case" in new SubstreamsSupport(elementCount = 4) {
val s1 = StreamPuppet(getSubFlow().toPublisher())
masterSubscriber.expectNoMsg(100.millis)
s1.request(2)
s1.expectNext(1)
s1.expectNext(2)
s1.request(1)
s1.expectComplete()
val s2 = StreamPuppet(getSubFlow().toPublisher())
s2.request(1)
s2.expectNext(3)
s2.expectNoMsg(100.millis)
s2.request(1)
s2.expectNext(4)
s2.request(1)
s2.expectComplete()
masterSubscriber.expectComplete()
}
"support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().toPublisher())
s1.cancel()
val s2 = StreamPuppet(getSubFlow().toPublisher())
s2.request(4)
s2.expectNext(5)
s2.expectNext(6)
s2.expectNext(7)
s2.expectNext(8)
s2.request(1)
s2.expectComplete()
masterSubscription.request(1)
masterSubscriber.expectComplete()
}
"support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().toPublisher())
masterSubscription.cancel()
s1.request(4)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectNext(4)
s1.request(1)
s1.expectComplete()
}
}
}