diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 9fbfef0668..84a4ef2f91 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -32,6 +32,10 @@ private[akka] object Ast { override def name = "prefixAndTail" } + case class SplitWhen(p: Any ⇒ Boolean) extends AstNode { + override def name = "splitWhen" + } + } /** @@ -189,6 +193,7 @@ private[akka] object ActorProcessorFactory { case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) + case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) }).withDispatcher(settings.dispatcher) diff --git a/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala new file mode 100644 index 0000000000..757338840b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl2 + +import akka.stream.MaterializerSettings +import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey +import akka.stream.impl.TransferPhase +import akka.stream.impl.MultiStreamOutputProcessor +import akka.stream.scaladsl2.FlowFrom + +/** + * INTERNAL API + */ +private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any ⇒ Boolean) + extends MultiStreamOutputProcessor(_settings) { + + var currentSubstream: SubstreamOutputs = _ + + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + nextPhase(openSubstream(primaryInputs.dequeueInputElement())) + } + + def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + val substreamOutput = newSubstream() + val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher + primaryOutputs.enqueueOutputElement(substreamFlow) + currentSubstream = substreamOutput + nextPhase(serveSubstreamFirst(currentSubstream, elem)) + } + + // Serving the substream is split into two phases to minimize elements "held in hand" + def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + nextPhase(serveSubstreamRest(substream)) + } + + // Note that this phase is allocated only once per _slice_ and not per element + def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + if (splitPredicate(elem)) { + currentSubstream.complete() + currentSubstream = null + nextPhase(openSubstream(elem)) + } else substream.enqueueOutputElement(elem) + } + + // Ignore elements for a cancelled substream until a new substream needs to be opened + val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + if (splitPredicate(elem)) nextPhase(openSubstream(elem)) + } + + nextPhase(waitFirst) + + override def invalidateSubstream(substream: SubstreamKey): Unit = { + if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) + super.invalidateSubstream(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 4eed36d9f3..e309e818ca 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -67,6 +67,22 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] { */ def groupBy[K, U >: Out](f: Out ⇒ K): Repr[In, (K, FlowWithSource[U, U])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. This means + * that for the following series of predicate values, three substreams will + * be produced with lengths 1, 2, and 3: + * + * {{{ + * false, // element goes into first substream + * true, false, // elements go into second substream + * true, false, false // elements go into third substream + * }}} + */ + def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[In, FlowWithSource[U, U]] = + andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) } /** diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala new file mode 100644 index 0000000000..009815601d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.AkkaSpec +import org.reactivestreams.Publisher +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowSplitWhenSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) + + case class StreamPuppet(p: Publisher[Int]) { + val probe = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(probe) + val subscription = probe.expectSubscription() + + def request(demand: Int): Unit = subscription.request(demand) + def expectNext(elem: Int): Unit = probe.expectNext(elem) + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + def expectComplete(): Unit = probe.expectComplete() + def cancel(): Unit = subscription.cancel() + } + + class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { + val source = FlowFrom((1 to elementCount).iterator) + val groupStream = source.splitWhen(_ == splitWhen).toPublisher() + val masterSubscriber = StreamTestKit.SubscriberProbe[FlowWithSource[Int, Int]]() + + groupStream.subscribe(masterSubscriber) + val masterSubscription = masterSubscriber.expectSubscription() + + def getSubFlow(): FlowWithSource[Int, Int] = { + masterSubscription.request(1) + expectSubPublisher() + } + + def expectSubPublisher(): FlowWithSource[Int, Int] = { + val substream = masterSubscriber.expectNext() + substream + } + + } + + "splitWhen" must { + + "work in the happy case" in new SubstreamsSupport(elementCount = 4) { + val s1 = StreamPuppet(getSubFlow().toPublisher()) + masterSubscriber.expectNoMsg(100.millis) + + s1.request(2) + s1.expectNext(1) + s1.expectNext(2) + s1.request(1) + s1.expectComplete() + + val s2 = StreamPuppet(getSubFlow().toPublisher()) + + s2.request(1) + s2.expectNext(3) + s2.expectNoMsg(100.millis) + + s2.request(1) + s2.expectNext(4) + s2.request(1) + s2.expectComplete() + + masterSubscriber.expectComplete() + } + + "support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubFlow().toPublisher()) + s1.cancel() + val s2 = StreamPuppet(getSubFlow().toPublisher()) + + s2.request(4) + s2.expectNext(5) + s2.expectNext(6) + s2.expectNext(7) + s2.expectNext(8) + s2.request(1) + s2.expectComplete() + + masterSubscription.request(1) + masterSubscriber.expectComplete() + } + + "support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubFlow().toPublisher()) + masterSubscription.cancel() + s1.request(4) + s1.expectNext(1) + s1.expectNext(2) + s1.expectNext(3) + s1.expectNext(4) + s1.request(1) + s1.expectComplete() + } + + } + +}