From 6f9438a2b035aa44819ae1a0e378bba01627a5be Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 12 Jun 2015 23:22:36 -0400 Subject: [PATCH] +str #17226 add dropWhile and takeWhile --- akka-docs-dev/rst/stages-overview.rst | 2 + .../java/akka/stream/javadsl/FlowTest.java | 49 ++++++++++++++++++ .../java/akka/stream/javadsl/SourceTest.java | 48 ++++++++++++++++++ .../stream/scaladsl/FlowDropWhileSpec.scala | 48 ++++++++++++++++++ .../stream/scaladsl/FlowTakeWhileSpec.scala | 50 +++++++++++++++++++ .../impl/ActorFlowMaterializerImpl.scala | 2 + .../main/scala/akka/stream/impl/Stages.scala | 12 +++++ .../scala/akka/stream/impl/fusing/Ops.scala | 31 ++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 33 ++++++++++++ .../scala/akka/stream/javadsl/Source.scala | 18 +++++++ .../scala/akka/stream/scaladsl/Flow.scala | 33 ++++++++++++ 11 files changed, 326 insertions(+) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 1dcabbffae..7a319fc2f9 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -37,6 +37,8 @@ grouped the specified number of elements has been accumulated or scan the function scanning the element returns a new element downstream backpressures upstream completes drop the specified number of elements has been dropped already the specified number of elements has been dropped and downstream backpressures upstream completes take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes +takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes +dropWhile the predicate returned false and for all following stream elements predicate returned false and downstream backpressures upstream completes ===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== Asynchronous processing stages diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index e8da3ae3cc..154c958c7f 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -82,6 +82,55 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("de"); } + @Test + public void mustBeAbleToUseDropWhile() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList(0, 1, 2, 3)); + final Flow flow = Flow.of(Integer.class).dropWhile + (new Predicate() { + public boolean test(Integer elem) { + return elem < 2; + } + }); + + final Future future = source.via(flow).runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(2); + probe.expectMsgEquals(3); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + + @Test + public void mustBeAbleToUseTakeWhile() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList(0, 1, 2, 3)); + final Flow flow = Flow.of(Integer.class).takeWhile + (new Predicate() { + public boolean test(Integer elem) { + return elem < 2; + } + }); + + final Future future = source.via(flow).runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(0); + probe.expectMsgEquals(1); + + FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); + + probe.expectNoMsg(duration); + Await.ready(future, duration); + } + + @Test public void mustBeAbleToUseTransform() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index c98ccb51cf..639bc07d44 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -20,6 +20,7 @@ import org.junit.ClassRule; import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; import scala.util.Try; @@ -473,4 +474,51 @@ public class SourceTest extends StreamTest { ref.tell(2, ActorRef.noSender()); probe.expectMsgEquals(2); } + + @Test + public void mustBeAbleToUseDropWhile() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList(0, 1, 2, 3)).dropWhile + (new Predicate() { + public boolean test(Integer elem) { + return elem < 2; + } + }); + + final Future future = source.runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(2); + probe.expectMsgEquals(3); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + + @Test + public void mustBeAbleToUseTakeWhile() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList(0, 1, 2, 3)).takeWhile + (new Predicate() { + public boolean test(Integer elem) { + return elem < 2; + } + }); + + final Future future = source.runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(0); + probe.expectMsgEquals(1); + + FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); + + probe.expectNoMsg(duration); + Await.ready(future, duration); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala new file mode 100644 index 0000000000..2f6552f661 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWhileSpec.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorOperationAttributes._ +import akka.stream.Supervision._ +import akka.stream.testkit.Utils._ + +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +import scala.util.control.NoStackTrace + +class FlowDropWhileSpec extends AkkaSpec { + + val settings = ActorFlowMaterializerSettings(system) + + implicit val materializer = ActorFlowMaterializer(settings) + + "A DropWhile" must { + + "drop while predicate is true" in assertAllStagesStopped { + Source(1 to 4).dropWhile(_ < 3).runWith(TestSink.probe[Int]) + .request(2) + .expectNext(3, 4) + .expectComplete() + } + + "complete the future for an empty stream" in assertAllStagesStopped { + Source.empty[Int].dropWhile(_ < 2).runWith(TestSink.probe[Int]) + .request(1) + .expectComplete() + } + + "continue if error" in assertAllStagesStopped { + val testException = new Exception("test") with NoStackTrace + Source(1 to 4).dropWhile(a ⇒ if (a < 3) true else throw testException).withAttributes(supervisionStrategy(resumingDecider)) + .runWith(TestSink.probe[Int]) + .request(1) + .expectComplete() + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala new file mode 100644 index 0000000000..9df07fc6b8 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorOperationAttributes._ +import akka.stream.Supervision._ +import akka.stream.testkit.Utils._ + +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +import scala.util.control.NoStackTrace + +class FlowTakeWhileSpec extends AkkaSpec { + + val settings = ActorFlowMaterializerSettings(system) + + implicit val materializer = ActorFlowMaterializer(settings) + + "A TakeWhile" must { + + "take while predicate is true" in assertAllStagesStopped { + Source(1 to 4).takeWhile(_ < 3).runWith(TestSink.probe[Int]) + .request(3) + .expectNext(1, 2) + .expectComplete() + } + + "complete the future for an empty stream" in assertAllStagesStopped { + Source.empty[Int].takeWhile(_ < 2).runWith(TestSink.probe[Int]) + .request(1) + .expectComplete() + } + + "continue if error" in assertAllStagesStopped { + val testException = new Exception("test") with NoStackTrace + + val p = Source(1 to 4).takeWhile(a ⇒ if (a == 3) throw testException else true).withAttributes(supervisionStrategy(resumingDecider)) + .runWith(TestSink.probe[Int]) + .request(4) + .expectNext(1, 2, 4) + .expectComplete() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index daf87ae023..1043ecac8a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -295,6 +295,8 @@ private[akka] object ActorProcessorFactory { case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer, att), ()) case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer, att), ()) case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer, att), ()) + case TakeWhile(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.TakeWhile(p, settings.supervisionDecider)), materializer, att), ()) + case DropWhile(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.DropWhile(p, settings.supervisionDecider)), materializer, att), ()) case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer, att), ()) case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer, att), ()) case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer, att), ()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 31d247ab87..a503c0bf17 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -32,6 +32,8 @@ private[stream] object Stages { val grouped = name("grouped") val take = name("take") val drop = name("drop") + val takeWhile = name("takeWhile") + val dropWhile = name("dropWhile") val scan = name("scan") val buffer = name("buffer") val conflate = name("conflate") @@ -175,6 +177,16 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } + final case class TakeWhile(p: Any ⇒ Boolean, attributes: OperationAttributes = takeWhile) extends StageModule { + def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) + override protected def newInstance: StageModule = this.copy() + } + + final case class DropWhile(p: Any ⇒ Boolean, attributes: OperationAttributes = dropWhile) extends StageModule { + def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) + override protected def newInstance: StageModule = this.copy() + } + final case class Scan(zero: Any, f: (Any, Any) ⇒ Any, attributes: OperationAttributes = scan) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index be5afe813c..1386ae4d74 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -36,6 +36,37 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision. override def decide(t: Throwable): Supervision.Directive = decider(t) } +/** + * INTERNAL API + */ +private[akka] final case class TakeWhile[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] { + + override def onPush(elem: T, ctx: Context[T]): SyncDirective = + if (p(elem)) + ctx.push(elem) + else + ctx.finish() + + override def decide(t: Throwable): Supervision.Directive = decider(t) +} + +/** + * INTERNAL API + */ +private[akka] final case class DropWhile[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] { + var taking = false + + override def onPush(elem: T, ctx: Context[T]): SyncDirective = + if (taking || !p(elem)) { + taking = true + ctx.push(elem) + } else { + ctx.pull + } + + override def decide(t: Throwable): Supervision.Directive = decider(t) +} + private[akka] final object Collect { // Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once, // and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index d331ebfcb3..8ef7531f37 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -381,6 +381,39 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.dropWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after predicate + * returns false for the first time. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if predicate is false for + * the first stream element. + * + * '''Emits when''' the predicate is true + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' predicate returned false or upstream completes + * + * '''Cancels when''' predicate returned false or downstream cancels + */ + def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test)) + + /** + * Discard elements at the beginning of the stream while predicate is true. + * All elements will be taken after predicate returns false first time. + * + * '''Emits when''' predicate returned false and for all following stream elements + * + * '''Backpressures when''' predicate returned false and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def dropWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.dropWhile(p.test)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1d4336e85e..9c600c47fa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -406,6 +406,24 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def dropWithin(d: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.dropWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after predicate returned false for the first time. + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * @param p predicate is evaluated for each new element until first time returns false + */ + def takeWhile(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.takeWhile(p.test)) + + /** + * Discard elements at the beginning of the stream while predicate is true. + * No elements will be dropped after predicate first time returned false. + * + * @param p predicate is evaluated for each new element until first time returns false + */ + def dropWhile(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.dropWhile(p.test)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 6a240e63a4..55194db57f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -457,6 +457,39 @@ trait FlowOps[+Out, +Mat] { */ def filter(p: Out ⇒ Boolean): Repr[Out, Mat] = andThen(Filter(p.asInstanceOf[Any ⇒ Boolean])) + /** + * Terminate processing (and cancel the upstream publisher) after predicate + * returns false for the first time. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if predicate is false for + * the first stream element. + * + * '''Emits when''' the predicate is true + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' predicate returned false or upstream completes + * + * '''Cancels when''' predicate returned false or downstream cancels + */ + def takeWhile(p: Out ⇒ Boolean): Repr[Out, Mat] = andThen(TakeWhile(p.asInstanceOf[Any ⇒ Boolean])) + + /** + * Discard elements at the beginning of the stream while predicate is true. + * All elements will be taken after predicate returns false first time. + * + * '''Emits when''' predicate returned false and for all following stream elements + * + * '''Backpressures when''' predicate returned false and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def dropWhile(p: Out ⇒ Boolean): Repr[Out, Mat] = andThen(DropWhile(p.asInstanceOf[Any ⇒ Boolean])) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step.