diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala index 77324635e7..c8414eeaa6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSlidingSpec.scala @@ -3,128 +3,54 @@ */ package akka.stream.scaladsl -import akka.actor.ActorSystem import akka.stream.testkit.Utils._ -import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ -import com.typesafe.config.ConfigFactory +import org.scalacheck.Gen +import org.scalatest.prop.GeneratorDrivenPropertyChecks +import akka.pattern.pipe + import scala.concurrent.Await -import scala.concurrent.duration._ -class FlowSlidingSpec extends AkkaSpec { +class FlowSlidingSpec extends AkkaSpec with GeneratorDrivenPropertyChecks { import system.dispatcher - val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialSize = 2, maxSize = 16) - - implicit val mat = ActorMaterializer(settings) - + implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) "Sliding" must { - "work with n = 3, step = 1" in assertAllStagesStopped { - Source(1 to 6).sliding(n = 3, step = 1).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" + import org.scalacheck.Shrink.shrinkAny + def check(gen: Gen[(Int, Int, Int)]): Unit = + forAll(gen, MinSize(1000), MaxSize(1000)) { + case (len, win, step) ⇒ + val af = Source(() ⇒ Iterator.from(0).take(len)).sliding(win, step).runFold(Seq.empty[Seq[Int]])(_ :+ _) + val cf = Source(() ⇒ Iterator.from(0).take(len).sliding(win, step)).runFold(Seq.empty[Seq[Int]])(_ :+ _) + Await.result(af, remaining) should be(Await.result(cf, remaining)) } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(2, 3, 4)) - expectMsg(Vector(3, 4, 5)) - expectMsg(Vector(4, 5, 6)) - expectMsg("done") + + "behave just like collections sliding with step < window" in assertAllStagesStopped { + check(for { + len ← Gen.choose(0, 31) + win ← Gen.choose(1, 61) + step ← Gen.choose(1, win - 1) + } yield (len, win, step)) } - "work with n = 3, step = 1, 7 elements" in assertAllStagesStopped { - Source(1 to 7).sliding(n = 3, step = 1).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(2, 3, 4)) - expectMsg(Vector(3, 4, 5)) - expectMsg(Vector(4, 5, 6)) - expectMsg(Vector(5, 6, 7)) - expectMsg("done") + "behave just like collections sliding with step == window" in assertAllStagesStopped { + check(for { + len ← Gen.choose(0, 31) + win ← Gen.choose(1, 61) + step ← Gen.const(win) + } yield (len, win, step)) } - "work with n = 3, step = 2" in assertAllStagesStopped { - Source(1 to 6).sliding(n = 3, step = 2).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(3, 4, 5)) - expectMsg(Vector(5, 6)) - expectMsg("done") - } - - "work with n = 3, step = 2, complete group" in assertAllStagesStopped { - Source(1 to 7).sliding(n = 3, step = 2).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(3, 4, 5)) - expectMsg(Vector(5, 6, 7)) - expectMsg("done") - } - - "work with n = 3, step = 3" in assertAllStagesStopped { - Source(1 to 6).sliding(n = 3, step = 3).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(4, 5, 6)) - expectMsg("done") - } - - "work with n = 2, step = 3" in assertAllStagesStopped { - Source(1 to 6).sliding(n = 2, step = 3).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2)) - expectMsg(Vector(4, 5)) - expectMsg("done") - } - - "work with n = 2, step = 1" in assertAllStagesStopped { - Source(1 to 6).sliding(n = 2, step = 1).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2)) - expectMsg(Vector(2, 3)) - expectMsg(Vector(3, 4)) - expectMsg(Vector(4, 5)) - expectMsg(Vector(5, 6)) - expectMsg("done") - } - - "work with n = 3, step = 4" in assertAllStagesStopped { - Source(1 to 12).sliding(n = 3, step = 4).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(5, 6, 7)) - expectMsg(Vector(9, 10, 11)) - expectMsg("done") - } - - "work with n = 3, step = 6" in assertAllStagesStopped { - Source(1 to 12).sliding(n = 3, step = 6).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(7, 8, 9)) - expectMsg("done") - } - - "work with n = 3, step = 10, incomplete group" in assertAllStagesStopped { - Source(1 to 12).sliding(n = 3, step = 10).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } - expectMsg(Vector(1, 2, 3)) - expectMsg(Vector(11, 12)) - expectMsg("done") + "behave just like collections sliding with step > window" in assertAllStagesStopped { + check(for { + len ← Gen.choose(0, 31) + win ← Gen.choose(1, 61) + step ← Gen.choose(win + 1, 127) + } yield (len, win, step)) } "work with empty sources" in assertAllStagesStopped { - Source.empty.sliding(1).runForeach(testActor ! _) onSuccess { - case _ ⇒ testActor ! "done" - } + Source.empty.sliding(1).runForeach(testActor ! _).map(_ ⇒ "done") pipeTo testActor expectMsg("done") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 2dc1437da3..8ef091e4c7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -318,55 +318,32 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut * INTERNAL API */ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] { - private val buf = { - val b = Vector.newBuilder[T] - b.sizeHint(n) - b - } - var bufferedElements = 0 + private var buf = Vector.empty[T] override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = { - buf += elem - bufferedElements += 1 - if (bufferedElements < n) { + buf :+= elem + if (buf.size < n) { + ctx.pull() + } else if (buf.size == n) { + ctx.push(buf) + } else if (step > n) { + if (buf.size == step) + buf = Vector.empty ctx.pull() - } else if (bufferedElements == n) { - ctx.push(buf.result()) } else { - if (step > n) { - if (bufferedElements == step) { - buf.clear() - buf.sizeHint(n) - bufferedElements = 0 - ctx.pull() - } else { - ctx.pull() - } - } else { - val emit = buf.result() - buf.clear() - buf.sizeHint(n) - emit.drop(step).foreach(buf += _) - val updatedEmit = buf.result() - bufferedElements = updatedEmit.size - if (bufferedElements == n) ctx.push(updatedEmit) - else ctx.pull() - } + buf = buf.drop(step) + if (buf.size == n) ctx.push(buf) + else ctx.pull() } } override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective = - if (ctx.isFinishing) { - val emit = buf.result() - if (emit.size == n) { - ctx.finish() - } else { - ctx.pushAndFinish(emit) - } - } else ctx.pull() + if (!ctx.isFinishing) ctx.pull() + else if (buf.size >= n) ctx.finish() + else ctx.pushAndFinish(buf) override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective = - if (buf.result().isEmpty) ctx.finish() + if (buf.isEmpty) ctx.finish() else ctx.absorbTermination() }