From 5e8ff792a041da21d73245c8aec5285c2c4bb416 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 14 Apr 2015 13:44:24 +0200 Subject: [PATCH] +str #16885 add splitAfter Implementation is shared with splitWhen - see Split / SplitWhere Allows for easy addition of splitWhen(x => Decision) if we'd like to Resolves #16885 --- akka-docs-dev/rst/stages-overview.rst | 17 +- .../engine/parsing/RequestParserSpec.scala | 40 +-- .../java/akka/stream/javadsl/FlowTest.java | 38 +++ .../stream/scaladsl/FlowSplitAfterSpec.scala | 230 ++++++++++++++++++ .../stream/scaladsl/FlowSplitWhenSpec.scala | 187 +++++++------- .../impl/ActorFlowMaterializerImpl.scala | 2 +- .../stream/impl/SplitWhenProcessorImpl.scala | 97 -------- .../stream/impl/SplitWhereProcessorImpl.scala | 153 ++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 5 +- .../main/scala/akka/stream/javadsl/Flow.scala | 43 ++++ .../scala/akka/stream/javadsl/Source.scala | 61 +++++ .../scala/akka/stream/scaladsl/Flow.scala | 52 +++- 12 files changed, 709 insertions(+), 216 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index ce79a57f03..1dcabbffae 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -95,14 +95,15 @@ nested streams and turn them into a stream of elements instead (flattening). **It is currently not possible to build custom nesting or flattening stages** -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -prefixAndTail the configured number of prefix elements are available. Emits this prefix, and the rest as a substream downstream backpressures or substream backpressures prefix elements has been consumed and substream has been consumed -groupBy an element for which the grouping function returns a group that has not yet been created. 
Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_ -splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_ -flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== +===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== +Stage Emits when Backpressures when Completes when +===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== +prefixAndTail the configured number of prefix elements are available. Emits this prefix, and the rest as a substream downstream backpressures or substream backpressures prefix elements has been consumed and substream has been consumed +groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_ +splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_ +splitAfter an element passes through. 
When the provided predicate is true it emits the element and opens a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_ +flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete +===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== Fan-in stages ^^^^^^^^^^^^^ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index bc613d14f0..70bdd30d5f 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -4,28 +4,28 @@ package akka.http.impl.engine.parsing -import akka.http.ParserSettings -import akka.http.scaladsl.util.FastFuture -import com.typesafe.config.{ ConfigFactory, Config } -import scala.concurrent.Future -import scala.concurrent.duration._ -import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } -import org.scalatest.matchers.Matcher import akka.actor.ActorSystem -import akka.util.ByteString -import akka.stream.scaladsl._ -import akka.stream.scaladsl.FlattenStrategy -import akka.stream.ActorFlowMaterializer -import akka.http.scaladsl.util.FastFuture._ -import akka.http.scaladsl.model._ +import akka.http.ParserSettings +import akka.http.impl.engine.parsing.ParserOutput._ import akka.http.impl.util._ -import headers._ -import MediaTypes._ -import HttpMethods._ -import HttpProtocols._ -import StatusCodes._ -import HttpEntity._ -import ParserOutput._ +import akka.http.scaladsl.model.HttpEntity._ +import akka.http.scaladsl.model.HttpMethods._ +import akka.http.scaladsl.model.HttpProtocols._ +import akka.http.scaladsl.model.MediaTypes._ +import akka.http.scaladsl.model.StatusCodes._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.util.FastFuture +import akka.http.scaladsl.util.FastFuture._ +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl.{ FlattenStrategy, _ } +import akka.util.ByteString +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.matchers.Matcher +import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } + +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 66ae9a0c41..e8da3ae3cc 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -216,6 +216,44 @@ public class FlowTest extends StreamTest { } + @Test + public void mustBeAbleToUseSplitAfter() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable<String> input = Arrays.asList("A", "B", "C", ".", "D",
".", "E", "F"); + final Flow, ?> flow = Flow.of(String.class).splitAfter(new Predicate() { + public boolean test(String elem) { + return elem.equals("."); + } + }); + + Source.from(input).via(flow).runForeach(new Procedure>() { + @Override + public void apply(Source subStream) throws Exception { + subStream.grouped(10).runForeach(new Procedure>() { + @Override + public void apply(List chunk) throws Exception { + probe.getRef().tell(chunk, ActorRef.noSender()); + } + }, materializer); + } + }, materializer); + + for (Object o : probe.receiveN(3)) { + @SuppressWarnings("unchecked") + List chunk = (List) o; + if (chunk.get(0).equals("A")) { + assertEquals(Arrays.asList("A", "B", "C", "."), chunk); + } else if (chunk.get(0).equals("D")) { + assertEquals(Arrays.asList("D", "."), chunk); + } else if (chunk.get(0).equals("E")) { + assertEquals(Arrays.asList("E", "F"), chunk); + } else { + assertEquals("[A, B, C, .] or [D, .] or [E, F]", chunk); + } + } + + } + public Creator> op() { return new akka.japi.function.Creator>() { @Override diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala new file mode 100644 index 0000000000..6655936b85 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala @@ -0,0 +1,230 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.ActorOperationAttributes +import akka.stream.Supervision.resumingDecider +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.Utils._ +import org.reactivestreams.Publisher + +import scala.concurrent.duration._ + +class FlowSplitAfterSpec extends AkkaSpec { + + val settings = ActorFlowMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = ActorFlowMaterializer(settings) + + case class StreamPuppet(p: Publisher[Int]) { + val probe = TestSubscriber.manualProbe[Int]() + p.subscribe(probe) + val subscription = probe.expectSubscription() + + def request(demand: Int): Unit = subscription.request(demand) + def expectNext(elem: Int): Unit = probe.expectNext(elem) + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + def expectComplete(): Unit = probe.expectComplete() + def expectError(e: Throwable) = probe.expectError(e) + def cancel(): Unit = subscription.cancel() + } + + class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6) { + val source = Source(1 to elementCount) + val groupStream = source.splitAfter(_ == splitAfter).runWith(Sink.publisher) + val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() + + groupStream.subscribe(masterSubscriber) + val masterSubscription = masterSubscriber.expectSubscription() + + def expectSubFlow(): Source[Int, _] = { + masterSubscription.request(1) + expectSubPublisher() + } + + def expectSubPublisher(): Source[Int, _] = { + val substream = masterSubscriber.expectNext() + substream + } + + } + + "splitAfter" must { + + "work in the happy case" in assertAllStagesStopped { + new SubstreamsSupport(3, elementCount = 5) { + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + masterSubscriber.expectNoMsg(100.millis) + + s1.request(2) + s1.expectNext(1) + s1.expectNext(2) + s1.request(1) + s1.expectNext(3) + 
s1.request(1) + s1.expectComplete() + + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + + s2.request(2) + s2.expectNext(4) + s2.expectNext(5) + s2.expectComplete() + + masterSubscriber.expectComplete() + } + } + + "work when first element is split-by" in assertAllStagesStopped { + new SubstreamsSupport(splitAfter = 1, elementCount = 3) { + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + masterSubscriber.expectNoMsg(100.millis) + + s1.request(3) + s1.expectNext(1) + s1.expectComplete() + + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + + s2.request(3) + s2.expectNext(2) + s2.expectNext(3) + s2.expectComplete() + + masterSubscriber.expectComplete() + } + } + + "support cancelling substreams" in assertAllStagesStopped { + new SubstreamsSupport(splitAfter = 5, elementCount = 8) { + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + s1.cancel() + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + + s2.request(4) + s2.expectNext(6) + s2.expectNext(7) + s2.expectNext(8) + s2.expectComplete() + + masterSubscription.request(1) + masterSubscriber.expectComplete() + } + } + + "support cancelling the master stream" in assertAllStagesStopped { + new SubstreamsSupport(splitAfter = 5, elementCount = 8) { + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + masterSubscription.cancel() + s1.request(5) + s1.expectNext(1) + s1.expectNext(2) + s1.expectNext(3) + s1.expectNext(4) + s1.expectNext(5) + s1.request(1) + s1.expectComplete() + } + } + + "fail stream when splitAfter function throws" in assertAllStagesStopped { + val publisherProbeProbe = TestPublisher.manualProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe) + .splitAfter(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) + .runWith(Sink.publisher) + val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val substream = subscriber.expectNext() + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + + substreamPuppet.request(10) + substreamPuppet.expectNext(1) + + upstreamSubscription.sendNext(2) + substreamPuppet.expectNext(2) + + upstreamSubscription.sendNext(3) + + subscriber.expectError(exc) + substreamPuppet.expectError(exc) + upstreamSubscription.expectCancellation() + } + + "resume stream when splitAfter function throws" in assertAllStagesStopped { + val publisherProbeProbe = TestPublisher.manualProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe) + .splitAfter(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) + .withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider)) + .runWith(Sink.publisher) + val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val substream1 = subscriber.expectNext() + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + + substreamPuppet1.request(10) + substreamPuppet1.expectNext(1) + + upstreamSubscription.sendNext(2) + substreamPuppet1.expectNext(2) + + upstreamSubscription.sendNext(3) + 
upstreamSubscription.sendNext(4) + substreamPuppet1.expectNext(4) // note that 3 was dropped + + upstreamSubscription.sendNext(5) + substreamPuppet1.expectNext(5) + + upstreamSubscription.sendNext(6) + substreamPuppet1.expectNext(6) + substreamPuppet1.expectComplete() + val substream2 = subscriber.expectNext() + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + substreamPuppet2.request(10) + upstreamSubscription.sendNext(7) + substreamPuppet2.expectNext(7) + + upstreamSubscription.sendComplete() + subscriber.expectComplete() + substreamPuppet2.expectComplete() + } + + "pass along early cancellation" in assertAllStagesStopped { + val up = TestPublisher.manualProbe[Int]() + val down = TestSubscriber.manualProbe[Source[Int, Unit]]() + + val flowSubscriber = Source.subscriber[Int].splitAfter(_ % 3 == 0).to(Sink(down)).run() + + val downstream = down.expectSubscription() + downstream.cancel() + up.subscribe(flowSubscriber) + val upsub = up.expectSubscription() + upsub.expectCancellation() + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index ac15cc846f..0a00fdf4b8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -1,16 +1,17 @@ /** - * Copyright (C) 2009-2014 Typesafe Inc. + * Copyright (C) 2009-2015 Typesafe Inc. */ package akka.stream.scaladsl -import scala.concurrent.duration._ import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings -import akka.stream.Supervision.resumingDecider -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ -import org.reactivestreams.Publisher import akka.stream.ActorOperationAttributes +import akka.stream.Supervision.resumingDecider +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import org.reactivestreams.Publisher + +import scala.concurrent.duration._ class FlowSplitWhenSpec extends AkkaSpec { @@ -80,6 +81,21 @@ class FlowSplitWhenSpec extends AkkaSpec { } } + "work when first element is split-by" in assertAllStagesStopped { + new SubstreamsSupport(1, elementCount = 3) { + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + masterSubscriber.expectNoMsg(100.millis) + + s1.request(5) + s1.expectNext(1) + s1.expectNext(2) + s1.expectNext(3) + s1.expectComplete() + + masterSubscriber.expectComplete() + } + } + "support cancelling substreams" in assertAllStagesStopped { new SubstreamsSupport(splitWhen = 5, elementCount = 8) { val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) @@ -98,111 +114,110 @@ class FlowSplitWhenSpec extends AkkaSpec { masterSubscriber.expectComplete() } } + } - "support cancelling the master stream" in assertAllStagesStopped { - new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) - masterSubscription.cancel() - s1.request(4) - s1.expectNext(1) - s1.expectNext(2) - s1.expectNext(3) - s1.expectNext(4) - s1.request(1) - s1.expectComplete() - } + "support cancelling the master stream" in assertAllStagesStopped { + new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + masterSubscription.cancel() + s1.request(4) + s1.expectNext(1) + s1.expectNext(2) + s1.expectNext(3) + s1.expectNext(4) + s1.request(1) + s1.expectComplete() } + } - "fail stream when
splitWhen function throws" in assertAllStagesStopped { - val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val exc = TE("test") - val publisher = Source(publisherProbeProbe) - .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .runWith(Sink.publisher) - val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() - publisher.subscribe(subscriber) + "fail stream when splitWhen function throws" in assertAllStagesStopped { + val publisherProbeProbe = TestPublisher.manualProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe) + .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) + .runWith(Sink.publisher) + val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() + publisher.subscribe(subscriber) - val upstreamSubscription = publisherProbeProbe.expectSubscription() + val upstreamSubscription = publisherProbeProbe.expectSubscription() - val downstreamSubscription = subscriber.expectSubscription() - downstreamSubscription.request(100) + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) - upstreamSubscription.sendNext(1) + upstreamSubscription.sendNext(1) - val substream = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + val substream = subscriber.expectNext() + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) - substreamPuppet.request(10) - substreamPuppet.expectNext(1) + substreamPuppet.request(10) + substreamPuppet.expectNext(1) - upstreamSubscription.sendNext(2) - substreamPuppet.expectNext(2) + upstreamSubscription.sendNext(2) + substreamPuppet.expectNext(2) - upstreamSubscription.sendNext(3) + upstreamSubscription.sendNext(3) - subscriber.expectError(exc) - substreamPuppet.expectError(exc) - upstreamSubscription.expectCancellation() - } + subscriber.expectError(exc) + substreamPuppet.expectError(exc) + upstreamSubscription.expectCancellation() + } - "resume stream when splitWhen function throws" in { - val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val exc = TE("test") - val publisher = Source(publisherProbeProbe) - .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher) - val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() - publisher.subscribe(subscriber) + "resume stream when splitWhen function throws" in assertAllStagesStopped { + val publisherProbeProbe = TestPublisher.manualProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe) + .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) + .withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider)) + .runWith(Sink.publisher) + val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() + publisher.subscribe(subscriber) - val upstreamSubscription = publisherProbeProbe.expectSubscription() + val upstreamSubscription = publisherProbeProbe.expectSubscription() - val downstreamSubscription = subscriber.expectSubscription() - downstreamSubscription.request(100) + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) - upstreamSubscription.sendNext(1) + upstreamSubscription.sendNext(1) - val substream1 = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + val substream1 = subscriber.expectNext() + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) 
- substreamPuppet1.request(10) - substreamPuppet1.expectNext(1) + substreamPuppet1.request(10) + substreamPuppet1.expectNext(1) - upstreamSubscription.sendNext(2) - substreamPuppet1.expectNext(2) + upstreamSubscription.sendNext(2) + substreamPuppet1.expectNext(2) - upstreamSubscription.sendNext(3) - upstreamSubscription.sendNext(4) - substreamPuppet1.expectNext(4) // note that 3 was dropped + upstreamSubscription.sendNext(3) + upstreamSubscription.sendNext(4) + substreamPuppet1.expectNext(4) // note that 3 was dropped - upstreamSubscription.sendNext(5) - substreamPuppet1.expectNext(5) + upstreamSubscription.sendNext(5) + substreamPuppet1.expectNext(5) - upstreamSubscription.sendNext(6) - substreamPuppet1.expectComplete() - val substream2 = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) - substreamPuppet2.request(10) - substreamPuppet2.expectNext(6) + upstreamSubscription.sendNext(6) + substreamPuppet1.expectComplete() + val substream2 = subscriber.expectNext() + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + substreamPuppet2.request(10) + substreamPuppet2.expectNext(6) - upstreamSubscription.sendComplete() - subscriber.expectComplete() - substreamPuppet2.expectComplete() - } + upstreamSubscription.sendComplete() + subscriber.expectComplete() + substreamPuppet2.expectComplete() + } - "pass along early cancellation" in assertAllStagesStopped { - val up = TestPublisher.manualProbe[Int]() - val down = TestSubscriber.manualProbe[Source[Int, Unit]]() + "pass along early cancellation" in assertAllStagesStopped { + val up = TestPublisher.manualProbe[Int]() + val down = TestSubscriber.manualProbe[Source[Int, Unit]]() - val flowSubscriber = Source.subscriber[Int].splitWhen(_ % 3 == 0).to(Sink(down)).run() - - val downstream = down.expectSubscription() - downstream.cancel() - up.subscribe(flowSubscriber) - val upsub = up.expectSubscription() - upsub.expectCancellation() - } + val flowSubscriber = Source.subscriber[Int].splitWhen(_ % 3 == 0).to(Sink(down)).run() + val downstream = down.expectSubscription() + downstream.cancel() + up.subscribe(flowSubscriber) + val upsub = up.expectSubscription() + upsub.expectCancellation() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index ee1671dedc..9dce508c39 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -309,7 +309,7 @@ private[akka] object ActorProcessorFactory { case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ()) case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) - case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ()) + case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer, att), ()) case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala deleted file mode 100644 index c159c28a9a..0000000000 --- 
a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.util.control.NonFatal -import akka.actor.Props -import akka.stream.ActorFlowMaterializerSettings -import akka.stream.Supervision -import akka.stream.scaladsl.Source - -/** - * INTERNAL API - */ -private[akka] object SplitWhenProcessorImpl { - def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any ⇒ Boolean): Props = - Props(new SplitWhenProcessorImpl(settings, splitPredicate)) - - private trait SplitDecision - private case object Split extends SplitDecision - private case object Continue extends SplitDecision - private case object Drop extends SplitDecision -} - -/** - * INTERNAL API - */ -private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSettings, val splitPredicate: Any ⇒ Boolean) - extends MultiStreamOutputProcessor(_settings) { - - import MultiStreamOutputProcessor._ - import SplitWhenProcessorImpl._ - - val decider = settings.supervisionDecider - var currentSubstream: SubstreamOutput = _ - - val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - nextPhase(openSubstream(primaryInputs.dequeueInputElement())) - } - - def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - val substreamOutput = createSubstreamOutput() - val substreamFlow = Source(substreamOutput) // substreamOutput is a Publisher - primaryOutputs.enqueueOutputElement(substreamFlow) - currentSubstream = substreamOutput - nextPhase(serveSubstreamFirst(currentSubstream, elem)) - } - - // Serving the substream is split into two phases to minimize elements "held in hand" - def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ - substream.enqueueOutputElement(elem) - nextPhase(serveSubstreamRest(substream)) - } - - // Note that this phase is allocated only once per _slice_ and not per element - def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - decideSplit(elem) match { - case Continue ⇒ substream.enqueueOutputElement(elem) - case Split ⇒ - completeSubstreamOutput(currentSubstream.key) - currentSubstream = null - nextPhase(openSubstream(elem)) - case Drop ⇒ // drop elem and continue - } - } - - // Ignore elements for a cancelled substream until a new substream needs to be opened - val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - decideSplit(elem) match { - case Continue | Drop ⇒ // ignore elem - case Split ⇒ nextPhase(openSubstream(elem)) - } - } - - private def decideSplit(elem: Any): SplitDecision = - try if (splitPredicate(elem)) Split else Continue catch { - case NonFatal(e) if decider(e) != Supervision.Stop ⇒ - if (settings.debugLogging) - log.debug("Dropped element [{}] due to exception from splitWhen function: {}", elem, e.getMessage) - Drop - } - - initialPhase(1, waitFirst) - - override def completeSubstreamOutput(substream: SubstreamKey): Unit = { - if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) - super.completeSubstreamOutput(substream) - } - - override def cancelSubstreamOutput(substream: SubstreamKey): Unit = { - if ((currentSubstream ne null) && substream == currentSubstream.key) 
nextPhase(ignoreUntilNewSubstream) - super.cancelSubstreamOutput(substream) - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala new file mode 100644 index 0000000000..899d7e8b7c --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.Props +import akka.stream.impl.SplitDecision.SplitDecision +import akka.stream.scaladsl.Source +import akka.stream.{ ActorFlowMaterializerSettings, Supervision } + +import scala.util.control.NonFatal + +/** INTERNAL API */ +private[akka] object SplitDecision { + sealed abstract class SplitDecision + + /** Splits before the current element. The current element will be the first element in the new substream. */ + case object SplitBefore extends SplitDecision + + /** Splits after the current element. The current element will be the last element in the current substream. */ + case object SplitAfter extends SplitDecision + + /** Emit this element into the current substream. */ + case object Continue extends SplitDecision + + /** + * Drop this element without signalling it to any substream. + * TODO: Dropping is currently not exposed in an usable way - we would have to expose splitWhen(x => SplitDecision), to be decided if we want this + */ + private[impl] case object Drop extends SplitDecision +} + +/** + * INTERNAL API + */ +private[akka] object SplitWhereProcessorImpl { + def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any ⇒ SplitDecision): Props = + Props(new SplitWhereProcessorImpl(settings, in ⇒ splitPredicate(in))) +} + +/** + * INTERNAL API + */ +private[akka] class SplitWhereProcessorImpl(_settings: ActorFlowMaterializerSettings, val splitPredicate: Any ⇒ SplitDecision) + extends MultiStreamOutputProcessor(_settings) { + + import MultiStreamOutputProcessor._ + import SplitDecision._ + + /** + * `firstElement` is needed in case a SplitBefore is signalled, and the first element matches + * We do not want to emit an "empty stream" then followed by the "split", but we do want to start the stream + * from the first element (as if no split was applied): [0,1,2,0].splitWhen(_ == 0) => [0,1,2], [0] + */ + var firstElement = true + + val decider = settings.supervisionDecider + var currentSubstream: SubstreamOutput = _ + + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + decideSplit(elem) match { + case Continue ⇒ nextPhase(openSubstream(serveSubstreamFirst(_, elem))) + case SplitAfter ⇒ nextPhase(openSubstream(completeSubstream(_, elem))) + case SplitBefore ⇒ nextPhase(openSubstream(serveSubstreamFirst(_, elem))) + case Drop ⇒ // stay in waitFirst + } + } + + private def openSubstream(andThen: SubstreamOutput ⇒ TransferPhase): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + val substreamOutput = createSubstreamOutput() + val substreamFlow = Source(substreamOutput) // substreamOutput is a Publisher + primaryOutputs.enqueueOutputElement(substreamFlow) + currentSubstream = substreamOutput + + nextPhase(andThen(currentSubstream)) + } + + // Serving the substream is split into two phases to minimize elements "held in hand" + private def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ + firstElement = false + 
substream.enqueueOutputElement(elem) + nextPhase(serveSubstreamRest(substream)) + } + + // Signal given element to substream and complete it + private def completeSubstream(substream: SubstreamOutput, elem: Any): TransferPhase = TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + completeSubstreamOutput(currentSubstream.key) + nextPhase(waitFirst) + } + + // Note that this phase is allocated only once per _slice_ and not per element + private def serveSubstreamRest(substream: SubstreamOutput): TransferPhase = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + decideSplit(elem) match { + case Continue ⇒ + substream.enqueueOutputElement(elem) + + case SplitAfter ⇒ + substream.enqueueOutputElement(elem) + completeSubstreamOutput(currentSubstream.key) + currentSubstream = null + nextPhase(openSubstream(serveSubstreamRest)) + + case SplitBefore if firstElement ⇒ + currentSubstream.enqueueOutputElement(elem) + completeSubstreamOutput(currentSubstream.key) + currentSubstream = null + nextPhase(openSubstream(serveSubstreamRest)) + + case SplitBefore ⇒ + completeSubstreamOutput(currentSubstream.key) + currentSubstream = null + nextPhase(openSubstream(serveSubstreamFirst(_, elem))) + + case Drop ⇒ + // drop elem and continue + } + firstElement = false + } + + // Ignore elements for a cancelled substream until a new substream needs to be opened + val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + decideSplit(elem) match { + case Continue | Drop ⇒ // ignore elem + case SplitBefore ⇒ nextPhase(openSubstream(serveSubstreamFirst(_, elem))) + case SplitAfter ⇒ nextPhase(openSubstream(serveSubstreamRest)) + } + } + + private def decideSplit(elem: Any): SplitDecision = + try splitPredicate(elem) catch { + case NonFatal(e) if decider(e) != Supervision.Stop ⇒ + if (settings.debugLogging) + log.debug("Dropped element [{}] due to exception from splitWhen function: {}", elem, e.getMessage) + Drop + } + + initialPhase(1, waitFirst) + + override def completeSubstreamOutput(substream: SubstreamKey): Unit = { + if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) + super.completeSubstreamOutput(substream) + } + + override def cancelSubstreamOutput(substream: SubstreamKey): Unit = { + if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) + super.cancelSubstreamOutput(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 9cb7a39742..31d247ab87 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -4,6 +4,7 @@ package akka.stream.impl import akka.event.{ LoggingAdapter, Logging } +import akka.stream.impl.SplitDecision.SplitDecision import akka.stream.{ OverflowStrategy, TimerTransformer } import akka.stream.OperationAttributes import akka.stream.OperationAttributes._ @@ -38,7 +39,7 @@ private[stream] object Stages { val mapConcat = name("mapConcat") val groupBy = name("groupBy") val prefixAndTail = name("prefixAndTail") - val splitWhen = name("splitWhen") + val split = name("split") val concatAll = name("concatAll") val processor = name("processor") val processorWithKey = name("processorWithKey") @@ -208,7 +209,7 @@ private[stream] object Stages { override 
protected def newInstance: StageModule = this.copy() } - final case class SplitWhen(p: Any ⇒ Boolean, attributes: OperationAttributes = splitWhen) extends StageModule { + final case class Split(p: Any ⇒ SplitDecision, attributes: OperationAttributes = split) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 8af5eb1db7..aa4990fa86 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -562,6 +562,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * true, false, false // elements go into third substream * }}} * + * In case the *first* element of the stream matches the predicate, the first + * substream emitted by splitWhen will start from that element. For example: + * + * {{{ + * true, false, false // first substream starts from the split-by element + * true, false // subsequent substreams operate the same way + * }}} + * * If the split predicate `p` throws an exception and the supervision decision * is [[akka.stream.Supervision#stop]] the stream and substreams will be completed * with failure. @@ -583,6 +591,41 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def splitWhen(p: function.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] = new Flow(delegate.splitWhen(p.test).map(_.asJava)) + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams. It *ends* the current substream when the + * predicate is true. This means that for the following series of predicate values, + * three substreams will be produced with lengths 2, 2, and 3: + * + * {{{ + * false, true, // elements go into first substream + * false, true, // elements go into second substream + * false, false, true // elements go into third substream + * }}} + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed + * with failure. + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] + * the element is dropped and the stream and substreams continue. + * + * '''Emits when''' an element passes through. When the provided predicate is true it emits the element + * and opens a new substream for subsequent elements + * + * '''Backpressures when''' there is an element pending for the next substream, but the previous + * is not fully consumed yet, or the substream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels and substreams cancel + * + * See also [[Flow.splitWhen]]. + */ + def splitAfter[U >: Out](p: function.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] = + new Flow(delegate.splitAfter(p.test).map(_.asJava)) + /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[Source]].
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index aee303aff8..c545db1963 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -509,10 +509,71 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * true, false, // elements go into second substream * true, false, false // elements go into third substream * }}} + * + * In case the *first* element of the stream matches the predicate, the first + * substream emitted by splitWhen will start from that element. For example: + * + * {{{ + * true, false, false // first substream starts from the split-by element + * true, false // subsequent substreams operate the same way + * }}} + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed + * with failure. + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] + * the element is dropped and the stream and substreams continue. + * + * '''Emits when''' an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements + * + * '''Backpressures when''' there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels and substreams cancel + * + * See also [[Source.splitAfter]]. */ def splitWhen(p: function.Predicate[Out]): javadsl.Source[javadsl.Source[Out, Unit], Mat] = new Source(delegate.splitWhen(p.test).map(_.asJava)) + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams. It *ends* the current substream when the + * predicate is true. This means that for the following series of predicate values, + * three substreams will be produced with lengths 2, 2, and 3: + * + * {{{ + * false, true, // elements go into first substream + * false, true, // elements go into second substream + * false, false, true // elements go into third substream + * }}} + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed + * with failure. + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] + * the element is dropped and the stream and substreams continue. + * + * '''Emits when''' an element passes through. When the provided predicate is true it emits the element + * and opens a new substream for subsequent elements + * + * '''Backpressures when''' there is an element pending for the next substream, but the previous + * is not fully consumed yet, or the substream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels and substreams cancel + * + * See also [[Source.splitWhen]]. + */ + def splitAfter[U >: Out](p: function.Predicate[Out]): javadsl.Source[Source[Out, Unit], Mat] = + new Source(delegate.splitAfter(p.test).map(_.asJava)) + /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1944be3772..c4b7296843 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import akka.actor.ActorSystem +import akka.stream.impl.SplitDecision._ import akka.event.LoggingAdapter import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } @@ -784,6 +785,14 @@ trait FlowOps[+Out, +Mat] { * true, false, false // elements go into third substream * }}} * + * In case the *first* element of the stream matches the predicate, the first + * substream emitted by splitWhen will start from that element. For example: + * + * {{{ + * true, false, false // first substream starts from the split-by element + * true, false // subsequent substreams operate the same way + * }}} + * * If the split predicate `p` throws an exception and the supervision decision * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed * with failure. @@ -803,8 +812,47 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels and substreams cancel * */ - def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U, Unit], Mat] = - andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) + def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = { + val f = p.asInstanceOf[Any ⇒ Boolean] + withAttributes(name("splitWhen")).andThen(Split(el ⇒ if (f(el)) SplitBefore else Continue)) + } + + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams. It *ends* the current substream when the + * predicate is true. This means that for the following series of predicate values, + * three substreams will be produced with lengths 2, 2, and 3: + * + * {{{ + * false, true, // elements go into first substream + * false, true, // elements go into second substream + * false, false, true // elements go into third substream + * }}} + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed + * with failure. + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] + * the element is dropped and the stream and substreams continue. + * + * '''Emits when''' an element passes through. When the provided predicate is true it emits the element + * and opens a new substream for subsequent elements + * + * '''Backpressures when''' there is an element pending for the next substream, but the previous + * is not fully consumed yet, or the substream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels and substreams cancel + * + * See also [[FlowOps.splitAfter]]. + */ + def splitAfter[U >: Out](p: Out ⇒ Boolean): Repr[Out, Mat]#Repr[Source[U, Unit], Mat] = { + val f = p.asInstanceOf[Any ⇒ Boolean] + withAttributes(name("splitAfter")).andThen(Split(el ⇒ if (f(el)) SplitAfter else Continue)) + } /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
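
Usage sketch (not part of the patch): to make the documented difference between splitWhen and splitAfter concrete, here is a minimal, self-contained Scala example. The object name, element values and the grouped(10) chunk size are illustrative only; the sketch assumes nothing beyond the ActorFlowMaterializer / Source / splitWhen / splitAfter API already exercised by the tests in this diff.

  import akka.actor.ActorSystem
  import akka.stream.ActorFlowMaterializer
  import akka.stream.scaladsl.Source

  object SplitAfterVsSplitWhenDemo extends App {
    implicit val system = ActorSystem("split-demo")
    implicit val materializer = ActorFlowMaterializer()

    // splitAfter closes the current substream *after* the matching element,
    // so the "delimiter" stays at the end of the chunk it terminates.
    // Expected chunks: Vector(1, 2, 3), Vector(4, 5, 6), Vector(7)
    Source(1 to 7)
      .splitAfter(_ % 3 == 0)
      .runForeach(sub ⇒ sub.grouped(10).runForeach(chunk ⇒ println(s"splitAfter: $chunk")))

    // splitWhen opens a new substream *at* the matching element,
    // so the "delimiter" becomes the first element of the next chunk.
    // Expected chunks: Vector(1, 2), Vector(3, 4, 5), Vector(6, 7)
    Source(1 to 7)
      .splitWhen(_ % 3 == 0)
      .runForeach(sub ⇒ sub.grouped(10).runForeach(chunk ⇒ println(s"splitWhen: $chunk")))

    // The streams run asynchronously; a real test would collect the resulting
    // futures and shut the ActorSystem down once they complete.
  }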