+str #16885 add splitAfter

Implementation is shared with splitWhen - see Split / SplitWhere
Allows for easy addition of splitWhen(x => Decision) if we'd like to
Resolves #16885
This commit is contained in:
Konrad Malawski 2015-04-14 13:44:24 +02:00
parent 454a393af1
commit 5e8ff792a0
12 changed files with 709 additions and 216 deletions

View file

@ -95,14 +95,15 @@ nested streams and turn them into a stream of elements instead (flattening).
**It is currently not possible to build custom nesting or flattening stages**
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
prefixAndTail the configured number of prefix elements are available. Emits this prefix, and the rest as a substream downstream backpressures or substream backpressures prefix elements has been consumed and substream has been consumed
groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_
splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_
flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
===================== ========================================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================================= ============================================================================================================================== =====================================================================================
prefixAndTail the configured number of prefix elements are available. Emits this prefix, and the rest as a substream downstream backpressures or substream backpressures prefix elements has been consumed and substream has been consumed
groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_
splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_
splitAfter an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_
flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete
===================== ========================================================================================================================================= ============================================================================================================================== =====================================================================================
Fan-in stages
^^^^^^^^^^^^^

View file

@ -4,28 +4,28 @@
package akka.http.impl.engine.parsing
import akka.http.ParserSettings
import akka.http.scaladsl.util.FastFuture
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.Future
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
import org.scalatest.matchers.Matcher
import akka.actor.ActorSystem
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.scaladsl.FlattenStrategy
import akka.stream.ActorFlowMaterializer
import akka.http.scaladsl.util.FastFuture._
import akka.http.scaladsl.model._
import akka.http.ParserSettings
import akka.http.impl.engine.parsing.ParserOutput._
import akka.http.impl.util._
import headers._
import MediaTypes._
import HttpMethods._
import HttpProtocols._
import StatusCodes._
import HttpEntity._
import ParserOutput._
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.HttpProtocols._
import akka.http.scaladsl.model.MediaTypes._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.util.FastFuture
import akka.http.scaladsl.util.FastFuture._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ FlattenStrategy, _ }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.matchers.Matcher
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""

View file

@ -216,6 +216,44 @@ public class FlowTest extends StreamTest {
}
@Test
public void mustBeAbleToUseSplitAfter() {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input = Arrays.asList("A", "B", "C", ".", "D", ".", "E", "F");
final Flow<String, Source<String, BoxedUnit>, ?> flow = Flow.of(String.class).splitAfter(new Predicate<String>() {
public boolean test(String elem) {
return elem.equals(".");
}
});
Source.from(input).via(flow).runForeach(new Procedure<Source<String, BoxedUnit>>() {
@Override
public void apply(Source<String, BoxedUnit> subStream) throws Exception {
subStream.grouped(10).runForeach(new Procedure<List<String>>() {
@Override
public void apply(List<String> chunk) throws Exception {
probe.getRef().tell(chunk, ActorRef.noSender());
}
}, materializer);
}
}, materializer);
for (Object o : probe.receiveN(3)) {
@SuppressWarnings("unchecked")
List<String> chunk = (List<String>) o;
if (chunk.get(0).equals("A")) {
assertEquals(Arrays.asList("A", "B", "C", "."), chunk);
} else if (chunk.get(0).equals("D")) {
assertEquals(Arrays.asList("D", "."), chunk);
} else if (chunk.get(0).equals("E")) {
assertEquals(Arrays.asList("E", "F"), chunk);
} else {
assertEquals("[A, B, C, .] or [D, .] or [E, F]", chunk);
}
}
}
public <T> Creator<Stage<T, T>> op() {
return new akka.japi.function.Creator<Stage<T, T>>() {
@Override

View file

@ -0,0 +1,230 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.ActorOperationAttributes
import akka.stream.Supervision.resumingDecider
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.Utils._
import org.reactivestreams.Publisher
import scala.concurrent.duration._
class FlowSplitAfterSpec extends AkkaSpec {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = ActorFlowMaterializer(settings)
case class StreamPuppet(p: Publisher[Int]) {
val probe = TestSubscriber.manualProbe[Int]()
p.subscribe(probe)
val subscription = probe.expectSubscription()
def request(demand: Int): Unit = subscription.request(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def expectError(e: Throwable) = probe.expectError(e)
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6) {
val source = Source(1 to elementCount)
val groupStream = source.splitAfter(_ == splitAfter).runWith(Sink.publisher)
val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]()
groupStream.subscribe(masterSubscriber)
val masterSubscription = masterSubscriber.expectSubscription()
def expectSubFlow(): Source[Int, _] = {
masterSubscription.request(1)
expectSubPublisher()
}
def expectSubPublisher(): Source[Int, _] = {
val substream = masterSubscriber.expectNext()
substream
}
}
"splitAfter" must {
"work in the happy case" in assertAllStagesStopped {
new SubstreamsSupport(3, elementCount = 5) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
masterSubscriber.expectNoMsg(100.millis)
s1.request(2)
s1.expectNext(1)
s1.expectNext(2)
s1.request(1)
s1.expectNext(3)
s1.request(1)
s1.expectComplete()
val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
s2.request(2)
s2.expectNext(4)
s2.expectNext(5)
s2.expectComplete()
masterSubscriber.expectComplete()
}
}
"work when first element is split-by" in assertAllStagesStopped {
new SubstreamsSupport(splitAfter = 1, elementCount = 3) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
masterSubscriber.expectNoMsg(100.millis)
s1.request(3)
s1.expectNext(1)
s1.expectComplete()
val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
s2.request(3)
s2.expectNext(2)
s2.expectNext(3)
s2.expectComplete()
masterSubscriber.expectComplete()
}
}
"support cancelling substreams" in assertAllStagesStopped {
new SubstreamsSupport(splitAfter = 5, elementCount = 8) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
s1.cancel()
val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
s2.request(4)
s2.expectNext(6)
s2.expectNext(7)
s2.expectNext(8)
s2.expectComplete()
masterSubscription.request(1)
masterSubscriber.expectComplete()
}
}
"support cancelling the master stream" in assertAllStagesStopped {
new SubstreamsSupport(splitAfter = 5, elementCount = 8) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher))
masterSubscription.cancel()
s1.request(5)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectNext(4)
s1.expectNext(5)
s1.request(1)
s1.expectComplete()
}
}
"fail stream when splitAfter function throws" in assertAllStagesStopped {
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitAfter(elem if (elem == 3) throw exc else elem % 3 == 0)
.runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val substream = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(10)
substreamPuppet.expectNext(1)
upstreamSubscription.sendNext(2)
substreamPuppet.expectNext(2)
upstreamSubscription.sendNext(3)
subscriber.expectError(exc)
substreamPuppet.expectError(exc)
upstreamSubscription.expectCancellation()
}
"resume stream when splitAfter function throws" in assertAllStagesStopped {
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitAfter(elem if (elem == 3) throw exc else elem % 3 == 0)
.withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider))
.runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val substream1 = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
upstreamSubscription.sendNext(2)
substreamPuppet1.expectNext(2)
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped
upstreamSubscription.sendNext(5)
substreamPuppet1.expectNext(5)
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
upstreamSubscription.sendNext(7)
substreamPuppet2.expectNext(7)
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()
}
"pass along early cancellation" in assertAllStagesStopped {
val up = TestPublisher.manualProbe[Int]()
val down = TestSubscriber.manualProbe[Source[Int, Unit]]()
val flowSubscriber = Source.subscriber[Int].splitAfter(_ % 3 == 0).to(Sink(down)).run()
val downstream = down.expectSubscription()
downstream.cancel()
up.subscribe(flowSubscriber)
val upsub = up.expectSubscription()
upsub.expectCancellation()
}
}
}

View file

@ -1,16 +1,17 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision.resumingDecider
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import org.reactivestreams.Publisher
import akka.stream.ActorOperationAttributes
import akka.stream.Supervision.resumingDecider
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.reactivestreams.Publisher
import scala.concurrent.duration._
class FlowSplitWhenSpec extends AkkaSpec {
@ -80,6 +81,21 @@ class FlowSplitWhenSpec extends AkkaSpec {
}
}
"work when first element is split-by" in assertAllStagesStopped {
new SubstreamsSupport(1, elementCount = 3) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
masterSubscriber.expectNoMsg(100.millis)
s1.request(5)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectComplete()
masterSubscriber.expectComplete()
}
}
"support cancelling substreams" in assertAllStagesStopped {
new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
@ -98,111 +114,110 @@ class FlowSplitWhenSpec extends AkkaSpec {
masterSubscriber.expectComplete()
}
}
}
"support cancelling the master stream" in assertAllStagesStopped {
new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
masterSubscription.cancel()
s1.request(4)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectNext(4)
s1.request(1)
s1.expectComplete()
}
"support cancelling the master stream" in assertAllStagesStopped {
new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher))
masterSubscription.cancel()
s1.request(4)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectNext(4)
s1.request(1)
s1.expectComplete()
}
}
"fail stream when splitWhen function throws" in assertAllStagesStopped {
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitWhen(elem if (elem == 3) throw exc else elem % 3 == 0)
.runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
"fail stream when splitWhen function throws" in assertAllStagesStopped {
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitWhen(elem if (elem == 3) throw exc else elem % 3 == 0)
.runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
upstreamSubscription.sendNext(1)
val substream = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
val substream = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(10)
substreamPuppet.expectNext(1)
substreamPuppet.request(10)
substreamPuppet.expectNext(1)
upstreamSubscription.sendNext(2)
substreamPuppet.expectNext(2)
upstreamSubscription.sendNext(2)
substreamPuppet.expectNext(2)
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(3)
subscriber.expectError(exc)
substreamPuppet.expectError(exc)
upstreamSubscription.expectCancellation()
}
subscriber.expectError(exc)
substreamPuppet.expectError(exc)
upstreamSubscription.expectCancellation()
}
"resume stream when splitWhen function throws" in {
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitWhen(elem if (elem == 3) throw exc else elem % 3 == 0)
.withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider))
.runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
"resume stream when splitWhen function throws" in assertAllStagesStopped {
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitWhen(elem if (elem == 3) throw exc else elem % 3 == 0)
.withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider))
.runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
upstreamSubscription.sendNext(1)
val substream1 = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
val substream1 = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
upstreamSubscription.sendNext(2)
substreamPuppet1.expectNext(2)
upstreamSubscription.sendNext(2)
substreamPuppet1.expectNext(2)
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped
upstreamSubscription.sendNext(5)
substreamPuppet1.expectNext(5)
upstreamSubscription.sendNext(5)
substreamPuppet1.expectNext(5)
upstreamSubscription.sendNext(6)
substreamPuppet1.expectComplete()
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(6)
upstreamSubscription.sendNext(6)
substreamPuppet1.expectComplete()
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(6)
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()
}
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()
}
"pass along early cancellation" in assertAllStagesStopped {
val up = TestPublisher.manualProbe[Int]()
val down = TestSubscriber.manualProbe[Source[Int, Unit]]()
"pass along early cancellation" in assertAllStagesStopped {
val up = TestPublisher.manualProbe[Int]()
val down = TestSubscriber.manualProbe[Source[Int, Unit]]()
val flowSubscriber = Source.subscriber[Int].splitWhen(_ % 3 == 0).to(Sink(down)).run()
val downstream = down.expectSubscription()
downstream.cancel()
up.subscribe(flowSubscriber)
val upsub = up.expectSubscription()
upsub.expectCancellation()
}
val flowSubscriber = Source.subscriber[Int].splitWhen(_ % 3 == 0).to(Sink(down)).run()
val downstream = down.expectSubscription()
downstream.cancel()
up.subscribe(flowSubscriber)
val upsub = up.expectSubscription()
upsub.expectCancellation()
}
}

View file

@ -309,7 +309,7 @@ private[akka] object ActorProcessorFactory {
case Log(n, e, l, _) (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ())
case GroupBy(f, _) (GroupByProcessorImpl.props(settings, f), ())
case PrefixAndTail(n, _) (PrefixAndTailImpl.props(settings, n), ())
case SplitWhen(p, _) (SplitWhenProcessorImpl.props(settings, p), ())
case Split(d, _) (SplitWhereProcessorImpl.props(settings, d), ())
case ConcatAll(_) (ConcatAllImpl.props(materializer), ())
case StageFactory(mkStage, _) (ActorInterpreter.props(settings, List(mkStage()), materializer, att), ())
case TimerTransform(mkStage, _) (TimerTransformerProcessorsImpl.props(settings, mkStage()), ())

View file

@ -1,97 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.util.control.NonFatal
import akka.actor.Props
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision
import akka.stream.scaladsl.Source
/**
* INTERNAL API
*/
private[akka] object SplitWhenProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any Boolean): Props =
Props(new SplitWhenProcessorImpl(settings, splitPredicate))
private trait SplitDecision
private case object Split extends SplitDecision
private case object Continue extends SplitDecision
private case object Drop extends SplitDecision
}
/**
* INTERNAL API
*/
private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSettings, val splitPredicate: Any Boolean)
extends MultiStreamOutputProcessor(_settings) {
import MultiStreamOutputProcessor._
import SplitWhenProcessorImpl._
val decider = settings.supervisionDecider
var currentSubstream: SubstreamOutput = _
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
nextPhase(openSubstream(primaryInputs.dequeueInputElement()))
}
def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { ()
val substreamOutput = createSubstreamOutput()
val substreamFlow = Source(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement(substreamFlow)
currentSubstream = substreamOutput
nextPhase(serveSubstreamFirst(currentSubstream, elem))
}
// Serving the substream is split into two phases to minimize elements "held in hand"
def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
nextPhase(serveSubstreamRest(substream))
}
// Note that this phase is allocated only once per _slice_ and not per element
def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
decideSplit(elem) match {
case Continue substream.enqueueOutputElement(elem)
case Split
completeSubstreamOutput(currentSubstream.key)
currentSubstream = null
nextPhase(openSubstream(elem))
case Drop // drop elem and continue
}
}
// Ignore elements for a cancelled substream until a new substream needs to be opened
val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { ()
val elem = primaryInputs.dequeueInputElement()
decideSplit(elem) match {
case Continue | Drop // ignore elem
case Split nextPhase(openSubstream(elem))
}
}
private def decideSplit(elem: Any): SplitDecision =
try if (splitPredicate(elem)) Split else Continue catch {
case NonFatal(e) if decider(e) != Supervision.Stop
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from splitWhen function: {}", elem, e.getMessage)
Drop
}
initialPhase(1, waitFirst)
override def completeSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.completeSubstreamOutput(substream)
}
override def cancelSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.cancelSubstreamOutput(substream)
}
}

View file

@ -0,0 +1,153 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.Props
import akka.stream.impl.SplitDecision.SplitDecision
import akka.stream.scaladsl.Source
import akka.stream.{ ActorFlowMaterializerSettings, Supervision }
import scala.util.control.NonFatal
/** INTERNAL API */
private[akka] object SplitDecision {
sealed abstract class SplitDecision
/** Splits before the current element. The current element will be the first element in the new substream. */
case object SplitBefore extends SplitDecision
/** Splits after the current element. The current element will be the last element in the current substream. */
case object SplitAfter extends SplitDecision
/** Emit this element into the current substream. */
case object Continue extends SplitDecision
/**
* Drop this element without signalling it to any substream.
* TODO: Dropping is currently not exposed in an usable way - we would have to expose splitWhen(x => SplitDecision), to be decided if we want this
*/
private[impl] case object Drop extends SplitDecision
}
/**
* INTERNAL API
*/
private[akka] object SplitWhereProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any SplitDecision): Props =
Props(new SplitWhereProcessorImpl(settings, in splitPredicate(in)))
}
/**
* INTERNAL API
*/
private[akka] class SplitWhereProcessorImpl(_settings: ActorFlowMaterializerSettings, val splitPredicate: Any SplitDecision)
extends MultiStreamOutputProcessor(_settings) {
import MultiStreamOutputProcessor._
import SplitDecision._
/**
* `firstElement` is needed in case a SplitBefore is signalled, and the first element matches
* We do not want to emit an "empty stream" then followed by the "split", but we do want to start the stream
* from the first element (as if no split was applied): [0,1,2,0].splitWhen(_ == 0) => [0,1,2], [0]
*/
var firstElement = true
val decider = settings.supervisionDecider
var currentSubstream: SubstreamOutput = _
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
decideSplit(elem) match {
case Continue nextPhase(openSubstream(serveSubstreamFirst(_, elem)))
case SplitAfter nextPhase(openSubstream(completeSubstream(_, elem)))
case SplitBefore nextPhase(openSubstream(serveSubstreamFirst(_, elem)))
case Drop // stay in waitFirst
}
}
private def openSubstream(andThen: SubstreamOutput TransferPhase): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { ()
val substreamOutput = createSubstreamOutput()
val substreamFlow = Source(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement(substreamFlow)
currentSubstream = substreamOutput
nextPhase(andThen(currentSubstream))
}
// Serving the substream is split into two phases to minimize elements "held in hand"
private def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
firstElement = false
substream.enqueueOutputElement(elem)
nextPhase(serveSubstreamRest(substream))
}
// Signal given element to substream and complete it
private def completeSubstream(substream: SubstreamOutput, elem: Any): TransferPhase = TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
completeSubstreamOutput(currentSubstream.key)
nextPhase(waitFirst)
}
// Note that this phase is allocated only once per _slice_ and not per element
private def serveSubstreamRest(substream: SubstreamOutput): TransferPhase = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
decideSplit(elem) match {
case Continue
substream.enqueueOutputElement(elem)
case SplitAfter
substream.enqueueOutputElement(elem)
completeSubstreamOutput(currentSubstream.key)
currentSubstream = null
nextPhase(openSubstream(serveSubstreamRest))
case SplitBefore if firstElement
currentSubstream.enqueueOutputElement(elem)
completeSubstreamOutput(currentSubstream.key)
currentSubstream = null
nextPhase(openSubstream(serveSubstreamRest))
case SplitBefore
completeSubstreamOutput(currentSubstream.key)
currentSubstream = null
nextPhase(openSubstream(serveSubstreamFirst(_, elem)))
case Drop
// drop elem and continue
}
firstElement = false
}
// Ignore elements for a cancelled substream until a new substream needs to be opened
val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { ()
val elem = primaryInputs.dequeueInputElement()
decideSplit(elem) match {
case Continue | Drop // ignore elem
case SplitBefore nextPhase(openSubstream(serveSubstreamFirst(_, elem)))
case SplitAfter nextPhase(openSubstream(serveSubstreamRest))
}
}
private def decideSplit(elem: Any): SplitDecision =
try splitPredicate(elem) catch {
case NonFatal(e) if decider(e) != Supervision.Stop
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from splitWhen function: {}", elem, e.getMessage)
Drop
}
initialPhase(1, waitFirst)
override def completeSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.completeSubstreamOutput(substream)
}
override def cancelSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.cancelSubstreamOutput(substream)
}
}

View file

@ -4,6 +4,7 @@
package akka.stream.impl
import akka.event.{ LoggingAdapter, Logging }
import akka.stream.impl.SplitDecision.SplitDecision
import akka.stream.{ OverflowStrategy, TimerTransformer }
import akka.stream.OperationAttributes
import akka.stream.OperationAttributes._
@ -38,7 +39,7 @@ private[stream] object Stages {
val mapConcat = name("mapConcat")
val groupBy = name("groupBy")
val prefixAndTail = name("prefixAndTail")
val splitWhen = name("splitWhen")
val split = name("split")
val concatAll = name("concatAll")
val processor = name("processor")
val processorWithKey = name("processorWithKey")
@ -208,7 +209,7 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
final case class SplitWhen(p: Any Boolean, attributes: OperationAttributes = splitWhen) extends StageModule {
final case class Split(p: Any SplitDecision, attributes: OperationAttributes = split) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}

View file

@ -562,6 +562,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* true, false, false // elements go into third substream
* }}}
*
* In case the *first* element of the stream matches the predicate, the first
* substream emitted by splitWhen will start from that element. For example:
*
* {{{
* true, false, false // first substream starts from the split-by element
* true, false // subsequent substreams operate the same way
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
* with failure.
@ -583,6 +591,41 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
def splitWhen(p: function.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] =
new Flow(delegate.splitWhen(p.test).map(_.asJava))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true. This means that for the following series of predicate values,
* three substreams will be produced with lengths 2, 2, and 3:
*
* {{{
* false, true, // elements go into first substream
* false, true, // elements go into second substream
* false, false, true // elements go into third substream
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element passes through. When the provided predicate is true it emitts the element
* and opens a new substream for subsequent element
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
* See also [[Flow.splitWhen]].
*/
def splitAfter[U >: Out](p: function.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] =
new Flow(delegate.splitAfter(p.test).map(_.asJava))
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]].

View file

@ -509,10 +509,71 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
* true, false, // elements go into second substream
* true, false, false // elements go into third substream
* }}}
*
* In case the *first* element of the stream matches the predicate, the first
* substream emitted by splitWhen will start from that element. For example:
*
* {{{
* true, false, false // first substream starts from the split-by element
* true, false // subsequent substreams operate the same way
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element for which the provided predicate is true, opening and emitting a new substream for subsequent element
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
* See also [[Source.splitAfter]].
*/
def splitWhen(p: function.Predicate[Out]): javadsl.Source[javadsl.Source[Out, Unit], Mat] =
new Source(delegate.splitWhen(p.test).map(_.asJava))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true. This means that for the following series of predicate values,
* three substreams will be produced with lengths 2, 2, and 3:
*
* {{{
* false, true, // elements go into first substream
* false, true, // elements go into second substream
* false, false, true // elements go into third substream
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element passes through. When the provided predicate is true it emitts the element
* and opens a new substream for subsequent element
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
* See also [[Source.splitWhen]].
*/
def splitAfter[U >: Out](p: function.Predicate[Out]): javadsl.Source[Source[Out, Unit], Mat] =
new Source(delegate.splitAfter(p.test).map(_.asJava))
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]].

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.impl.SplitDecision._
import akka.event.LoggingAdapter
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
@ -784,6 +785,14 @@ trait FlowOps[+Out, +Mat] {
* true, false, false // elements go into third substream
* }}}
*
* In case the *first* element of the stream matches the predicate, the first
* substream emitted by splitWhen will start from that element. For example:
*
* {{{
* true, false, false // first substream starts from the split-by element
* true, false // subsequent substreams operate the same way
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
@ -803,8 +812,47 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels and substreams cancel
*
*/
def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U, Unit], Mat] =
andThen(SplitWhen(p.asInstanceOf[Any Boolean]))
def splitWhen[U >: Out](p: Out Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = {
val f = p.asInstanceOf[Any Boolean]
withAttributes(name("splitWhen")).andThen(Split(el if (f(el)) SplitBefore else Continue))
}
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true. This means that for the following series of predicate values,
* three substreams will be produced with lengths 2, 2, and 3:
*
* {{{
* false, true, // elements go into first substream
* false, true, // elements go into second substream
* false, false, true // elements go into third substream
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element passes through. When the provided predicate is true it emitts the element
* and opens a new substream for subsequent element
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
* See also [[FlowOps.splitAfter]].
*/
def splitAfter[U >: Out](p: Out Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = {
val f = p.asInstanceOf[Any Boolean]
withAttributes(name("splitAfter")).andThen(Split(el if (f(el)) SplitAfter else Continue))
}
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.