From 94d7237d1744db81efad321d538acb1f05b2c2ba Mon Sep 17 00:00:00 2001 From: Nafer Sanabria Date: Wed, 21 Sep 2016 01:41:56 -0500 Subject: [PATCH] +str add zipWithIndex to FlowOps #21290 --- akka-docs/rst/java/stream/stages-overview.rst | 10 +++++ .../rst/scala/stream/stages-overview.rst | 10 +++++ .../akka/stream/DslConsistencySpec.scala | 2 +- .../scaladsl/FlowZipWithIndexSpec.scala | 38 +++++++++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 6 +-- .../main/scala/akka/stream/javadsl/Flow.scala | 21 +++++++++- .../scala/akka/stream/javadsl/Source.scala | 15 ++++++++ .../scala/akka/stream/javadsl/SubFlow.scala | 15 ++++++++ .../scala/akka/stream/javadsl/SubSource.scala | 15 ++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 23 +++++++++++ project/MiMa.scala | 4 ++ 11 files changed, 151 insertions(+), 8 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 228ac0b580..41b73442bb 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -1230,6 +1230,16 @@ returned value downstream. **completes** when any upstream completes +zipWithIndex +^^^^^^^ +Zips elements of current flow with its indices. + +**emits** upstream emits an element and is paired with their index + +**backpressures** when downstream backpressures + +**completes** when upstream completes + concat ^^^^^^ After completion of the original upstream the elements of the given source will be emitted. diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 7181cad197..7236b8635e 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -1222,6 +1222,16 @@ returned value downstream. **completes** when any upstream completes +zipWithIndex +^^^^^^^ +Zips elements of current flow with its indices. + +**emits** upstream emits an element and is paired with their index + +**backpressures** when downstream backpressures + +**completes** when upstream completes + concat ^^^^^^ After completion of the original upstream the elements of the given source will be emitted. diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 235e295937..fd6873a981 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -66,7 +66,7 @@ class DslConsistencySpec extends WordSpec with Matchers { ("Flow" → List[Class[_]](sFlowClass, jFlowClass)) :: ("SubFlow" → List[Class[_]](sSubFlowClass, jSubFlowClass)) :: ("Sink" → List[Class[_]](sSinkClass, jSinkClass)) :: - ("RunanbleFlow" → List[Class[_]](sRunnableGraphClass, jRunnableGraphClass)) :: + ("RunnableFlow" → List[Class[_]](sRunnableGraphClass, jRunnableGraphClass)) :: Nil foreach { case (element, classes) ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala new file mode 100644 index 0000000000..e1e12df5d6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.{ StreamSpec, TestSubscriber } + +class FlowZipWithIndexSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val materializer = ActorMaterializer(settings) + + "A ZipWithIndex for Flow " must { + + "work in the happy case" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[(Int, Long)]() + Source(7 to 10).zipWithIndex.runWith(Sink.fromSubscriber(probe)) + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext((7, 0L)) + probe.expectNext((8, 1L)) + + subscription.request(1) + probe.expectNext((9, 2L)) + subscription.request(1) + probe.expectNext((10, 3L)) + + probe.expectComplete() + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index c64a01b012..5e3ee846aa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -3,17 +3,12 @@ */ package akka.stream.impl -import akka.event.LoggingAdapter import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.Attributes._ import akka.stream.Supervision.Decider import akka.stream._ -import akka.stream.impl.StreamLayout._ import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.stage.Stage -import org.reactivestreams.Processor - -import scala.collection.immutable /** * INTERNAL API @@ -81,6 +76,7 @@ object Stages { val zip = name("zip") val zipN = name("zipN") val zipWithN = name("zipWithN") + val zipWithIndex = name("zipWithIndex") val unzip = name("unzip") val concat = name("concat") val orElse = name("orElse") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 9232aa090e..6f70412664 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -3,18 +3,20 @@ */ package akka.stream.javadsl -import akka.{ NotUsed, Done } +import akka.{ Done, NotUsed } import akka.event.LoggingAdapter -import akka.japi.{ function, Pair } +import akka.japi.{ Pair, function } import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream._ import akka.stream.stage.Stage import org.reactivestreams.Processor + import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import akka.japi.Util import java.util.Comparator import java.util.concurrent.CompletionStage + import scala.compat.java8.FutureConverters._ object Flow { @@ -1634,6 +1636,21 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** + * Combine the elements of current flow into a stream of tuples consisting + * of all elements paired with their index. Indices start at 0. + * + * '''Emits when''' upstream emits an element and is paired with their index + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipWithIndex: Flow[In, Pair[Out @uncheckedVariance, Long], Mat] = + new Flow(delegate.zipWithIndex.map { case (elem, index) ⇒ Pair(elem, index) }) + /** * If the first element has not passed through this stage before the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index abb3f0ead8..7020455889 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -873,6 +873,21 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] = new Source(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** + * Combine the elements of current [[Source]] into a stream of tuples consisting + * of all elements paired with their index. Indices start at 0. + * + * '''Emits when''' upstream emits an element and is paired with their index + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipWithIndex: javadsl.Source[Pair[Out @uncheckedVariance, Long], Mat] = + new Source(delegate.zipWithIndex.map { case (elem, index) ⇒ Pair(elem, index) }) + /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked * for each received element. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 2feb262ee8..1bea108a97 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1112,6 +1112,21 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo combine: function.Function2[Out, Out2, Out3]): SubFlow[In, Out3, Mat] = new SubFlow(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine))) + /** + * Combine the elements of current [[Flow]] into a stream of tuples consisting + * of all elements paired with their index. Indices start at 0. + * + * '''Emits when''' upstream emits an element and is paired with their index + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipWithIndex: SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, Long], Mat] = + new SubFlow(delegate.zipWithIndex.map { case (elem, index) ⇒ akka.japi.Pair(elem, index) }) + /** * If the first element has not passed through this stage before the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 4cd8bc01e3..8113b0154a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1111,6 +1111,21 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source combine: function.Function2[Out, Out2, Out3]): SubSource[Out3, Mat] = new SubSource(delegate.zipWith[Out2, Out3](that)(combinerToScala(combine))) + /** + * Combine the elements of current [[Source]] into a stream of tuples consisting + * of all elements paired with their index. Indices start at 0. + * + * '''Emits when''' upstream emits an element and is paired with their index + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipWithIndex: javadsl.SubSource[akka.japi.Pair[Out @uncheckedVariance, Long], Mat] = + new SubSource(delegate.zipWithIndex.map { case (elem, index) ⇒ akka.japi.Pair(elem, index) }) + /** * If the first element has not passed through this stage before the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index cc6c1d3a10..f5f2a9677f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1655,6 +1655,29 @@ trait FlowOps[+Out, +Mat] { FlowShape(zip.in0, zip.out) } + /** + * Combine the elements of current flow into a stream of tuples consisting + * of all elements paired with their index. Indices start at 0. + * + * '''Emits when''' upstream emits an element and is paired with their index + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipWithIndex: Repr[(Out, Long)] = { + statefulMapConcat[(Out, Long)] { () ⇒ + var index: Long = 0L + elem ⇒ { + val zipped = (elem, index) + index += 1 + immutable.Iterable[(Out, Long)](zipped) + } + } + } + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` diff --git a/project/MiMa.scala b/project/MiMa.scala index 4b66bcd0df..fb1fed0814 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -966,6 +966,10 @@ object MiMa extends AutoPlugin { // #21131 new implementation for Akka Typed ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.isWatching") + ), + "2.4.10" -> Seq( + // #21290 new zipWithIndex flow op + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipWithIndex") ) ) }