diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 5b1c8f421a..3a79a113b6 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -34,6 +34,7 @@ mapConcat the mapping function returns an element or there are stil filter the given predicate returns true for the element the given predicate returns true for the element and downstream backpressures upstream completes collect the provided partial function is defined for the element the partial function is defined for the element and downstream backpressures upstream completes grouped the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes +sliding the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes scan the function scanning the element returns a new element downstream backpressures upstream completes fold upstream completes downstream backpressures upstream completes drop the specified number of elements has been dropped already the specified number of elements has been dropped and downstream backpressures upstream completes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala new file mode 100644 index 0000000000..77324635e7 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.actor.ActorSystem +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit._ +import com.typesafe.config.ConfigFactory +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowSlidingSpec extends AkkaSpec { + import system.dispatcher + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val mat = ActorMaterializer(settings) + + "Sliding" must { + "work with n = 3, step = 1" in assertAllStagesStopped { + Source(1 to 6).sliding(n = 3, step = 1).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(2, 3, 4)) + expectMsg(Vector(3, 4, 5)) + expectMsg(Vector(4, 5, 6)) + expectMsg("done") + } + + "work with n = 3, step = 1, 7 elements" in assertAllStagesStopped { + Source(1 to 7).sliding(n = 3, step = 1).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(2, 3, 4)) + expectMsg(Vector(3, 4, 5)) + expectMsg(Vector(4, 5, 6)) + expectMsg(Vector(5, 6, 7)) + expectMsg("done") + } + + "work with n = 3, step = 2" in assertAllStagesStopped { + Source(1 to 6).sliding(n = 3, step = 2).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(3, 4, 5)) + expectMsg(Vector(5, 6)) + expectMsg("done") + } + + "work with n = 3, step = 2, complete group" in assertAllStagesStopped { + Source(1 to 7).sliding(n = 3, step = 2).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(3, 4, 5)) + expectMsg(Vector(5, 6, 7)) + expectMsg("done") + } + + "work with n = 3, step = 3" in assertAllStagesStopped { + Source(1 to 6).sliding(n = 3, step = 3).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(4, 5, 6)) + expectMsg("done") + } + + "work with n = 2, step = 3" in assertAllStagesStopped { + Source(1 to 6).sliding(n = 2, step = 3).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2)) + expectMsg(Vector(4, 5)) + expectMsg("done") + } + + "work with n = 2, step = 1" in assertAllStagesStopped { + Source(1 to 6).sliding(n = 2, step = 1).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2)) + expectMsg(Vector(2, 3)) + expectMsg(Vector(3, 4)) + expectMsg(Vector(4, 5)) + expectMsg(Vector(5, 6)) + expectMsg("done") + } + + "work with n = 3, step = 4" in assertAllStagesStopped { + Source(1 to 12).sliding(n = 3, step = 4).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(5, 6, 7)) + expectMsg(Vector(9, 10, 11)) + expectMsg("done") + } + + "work with n = 3, step = 6" in assertAllStagesStopped { + Source(1 to 12).sliding(n = 3, step = 6).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(7, 8, 9)) + expectMsg("done") + } + + "work with n = 3, step = 10, incomplete group" in assertAllStagesStopped { + Source(1 to 12).sliding(n = 3, step = 10).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(Vector(1, 2, 3)) + expectMsg(Vector(11, 12)) + expectMsg("done") + } + + "work with empty sources" in assertAllStagesStopped { + Source.empty.sliding(1).runForeach(testActor ! _) onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg("done") + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index f8ef0472a5..afe694fa2f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -318,6 +318,7 @@ private[akka] object ActorProcessorFactory { case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ()) case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ()) case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ()) + case Sliding(n, step, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Sliding(n, step)), materializer, att), ()) case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ()) case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 849d23e429..0fa58bfc28 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -30,6 +30,7 @@ private[stream] object Stages { val mapAsync = name("mapAsync") val mapAsyncUnordered = name("mapAsyncUnordered") val grouped = name("grouped") + val sliding = name("sliding") val take = name("take") val drop = name("drop") val takeWhile = name("takeWhile") @@ -163,6 +164,14 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } + final case class Sliding(n: Int, step: Int, attributes: Attributes = sliding) extends StageModule { + require(n > 0, "n must be greater than 0") + require(step > 0, "step must be greater than 0") + + def withAttributes(attributes: Attributes) = copy(attributes = attributes) + override protected def newInstance: StageModule = this.copy() + } + final case class Take(n: Long, attributes: Attributes = take) extends StageModule { def withAttributes(attributes: Attributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index b5c485c4de..dc288fd9eb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -265,6 +265,62 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut else ctx.absorbTermination() } +/** + * INTERNAL API + */ +private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] { + private val buf = { + val b = Vector.newBuilder[T] + b.sizeHint(n) + b + } + var bufferedElements = 0 + + override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = { + buf += elem + bufferedElements += 1 + if (bufferedElements < n) { + ctx.pull() + } else if (bufferedElements == n) { + ctx.push(buf.result()) + } else { + if (step > n) { + if (bufferedElements == step) { + buf.clear() + buf.sizeHint(n) + bufferedElements = 0 + ctx.pull() + } else { + ctx.pull() + } + } else { + val emit = buf.result() + buf.clear() + buf.sizeHint(n) + emit.drop(step).foreach(buf += _) + val updatedEmit = buf.result() + bufferedElements = updatedEmit.size + if (bufferedElements == n) ctx.push(updatedEmit) + else ctx.pull() + } + } + } + + override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective = + if (ctx.isFinishing) { + val emit = buf.result() + if (emit.size == n) { + ctx.finish() + } else { + ctx.pushAndFinish(emit) + } + } else ctx.pull() + + override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective = + if (buf.result().isEmpty) ctx.finish() + else ctx.absorbTermination() +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 0c1a629caf..821e7b6330 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -345,6 +345,24 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step + /** + * Apply a sliding window over the stream and return the windows as groups of elements, with the last group + * possibly smaller than requested due to end-of-stream. + * + * `n` must be positive, otherwise IllegalArgumentException is thrown. + * `step` must be positive, otherwise IllegalArgumentException is thrown. + * + * '''Emits when''' enough elements have been collected within the window or upstream completed + * + * '''Backpressures when''' a window has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def sliding(n: Int, step: Int = 1): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + new Flow(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step + /** * Similar to `fold` but is not a terminal operation, * emits its current value which starts at `zero` and then diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 40dfc9d177..035c954409 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -392,6 +392,16 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.grouped(n).map(_.asJava)) + /** + * Apply a sliding window over the stream and return the windows as groups of elements, with the last group + * possibly smaller than requested due to end-of-stream. + * + * @param n must be positive, otherwise [[IllegalArgumentException]] is thrown. + * @param step must be positive, otherwise [[IllegalArgumentException]] is thrown. + */ + def sliding(n: Int, step: Int): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = + new Source(delegate.sliding(n, step).map(_.asJava)) + /** * Similar to `fold` but is not a terminal operation, * emits its current value which starts at `zero` and then diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f9db52ae46..29474686ce 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -568,6 +568,23 @@ trait FlowOps[+Out, +Mat] { */ def grouped(n: Int): Repr[immutable.Seq[Out], Mat] = andThen(Grouped(n)) + /** + * Apply a sliding window over the stream and return the windows as groups of elements, with the last group + * possibly smaller than requested due to end-of-stream. + * + * `n` must be positive, otherwise IllegalArgumentException is thrown. + * `step` must be positive, otherwise IllegalArgumentException is thrown. + * + * '''Emits when''' enough elements have been collected within the window or upstream completed + * + * '''Backpressures when''' a window has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Out], Mat] = andThen(Sliding(n, step)) + /** * Similar to `fold` but is not a terminal operation, * emits its current value which starts at `zero` and then