Merge pull request #19153 from akka/wip-19149-fix-flowOps-sliding-√

=str - 19149 - Corrects the behavior of FlowOps.sliding
This commit is contained in:
Roland Kuhn 2015-12-13 16:45:59 +01:00
commit 1c18eafcbe
2 changed files with 49 additions and 146 deletions

View file

@ -3,128 +3,54 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit._ import akka.stream.testkit._
import com.typesafe.config.ConfigFactory import org.scalacheck.Gen
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import akka.pattern.pipe
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._
class FlowSlidingSpec extends AkkaSpec { class FlowSlidingSpec extends AkkaSpec with GeneratorDrivenPropertyChecks {
import system.dispatcher import system.dispatcher
val settings = ActorMaterializerSettings(system) implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val mat = ActorMaterializer(settings)
"Sliding" must { "Sliding" must {
"work with n = 3, step = 1" in assertAllStagesStopped { import org.scalacheck.Shrink.shrinkAny
Source(1 to 6).sliding(n = 3, step = 1).runForeach(testActor ! _) onSuccess { def check(gen: Gen[(Int, Int, Int)]): Unit =
case _ testActor ! "done" forAll(gen, MinSize(1000), MaxSize(1000)) {
case (len, win, step)
val af = Source(() Iterator.from(0).take(len)).sliding(win, step).runFold(Seq.empty[Seq[Int]])(_ :+ _)
val cf = Source(() Iterator.from(0).take(len).sliding(win, step)).runFold(Seq.empty[Seq[Int]])(_ :+ _)
Await.result(af, remaining) should be(Await.result(cf, remaining))
} }
expectMsg(Vector(1, 2, 3))
expectMsg(Vector(2, 3, 4)) "behave just like collections sliding with step < window" in assertAllStagesStopped {
expectMsg(Vector(3, 4, 5)) check(for {
expectMsg(Vector(4, 5, 6)) len Gen.choose(0, 31)
expectMsg("done") win Gen.choose(1, 61)
step Gen.choose(1, win - 1)
} yield (len, win, step))
} }
"work with n = 3, step = 1, 7 elements" in assertAllStagesStopped { "behave just like collections sliding with step == window" in assertAllStagesStopped {
Source(1 to 7).sliding(n = 3, step = 1).runForeach(testActor ! _) onSuccess { check(for {
case _ testActor ! "done" len Gen.choose(0, 31)
} win Gen.choose(1, 61)
expectMsg(Vector(1, 2, 3)) step Gen.const(win)
expectMsg(Vector(2, 3, 4)) } yield (len, win, step))
expectMsg(Vector(3, 4, 5))
expectMsg(Vector(4, 5, 6))
expectMsg(Vector(5, 6, 7))
expectMsg("done")
} }
"work with n = 3, step = 2" in assertAllStagesStopped { "behave just like collections sliding with step > window" in assertAllStagesStopped {
Source(1 to 6).sliding(n = 3, step = 2).runForeach(testActor ! _) onSuccess { check(for {
case _ testActor ! "done" len Gen.choose(0, 31)
} win Gen.choose(1, 61)
expectMsg(Vector(1, 2, 3)) step Gen.choose(win + 1, 127)
expectMsg(Vector(3, 4, 5)) } yield (len, win, step))
expectMsg(Vector(5, 6))
expectMsg("done")
}
"work with n = 3, step = 2, complete group" in assertAllStagesStopped {
Source(1 to 7).sliding(n = 3, step = 2).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2, 3))
expectMsg(Vector(3, 4, 5))
expectMsg(Vector(5, 6, 7))
expectMsg("done")
}
"work with n = 3, step = 3" in assertAllStagesStopped {
Source(1 to 6).sliding(n = 3, step = 3).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2, 3))
expectMsg(Vector(4, 5, 6))
expectMsg("done")
}
"work with n = 2, step = 3" in assertAllStagesStopped {
Source(1 to 6).sliding(n = 2, step = 3).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2))
expectMsg(Vector(4, 5))
expectMsg("done")
}
"work with n = 2, step = 1" in assertAllStagesStopped {
Source(1 to 6).sliding(n = 2, step = 1).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2))
expectMsg(Vector(2, 3))
expectMsg(Vector(3, 4))
expectMsg(Vector(4, 5))
expectMsg(Vector(5, 6))
expectMsg("done")
}
"work with n = 3, step = 4" in assertAllStagesStopped {
Source(1 to 12).sliding(n = 3, step = 4).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2, 3))
expectMsg(Vector(5, 6, 7))
expectMsg(Vector(9, 10, 11))
expectMsg("done")
}
"work with n = 3, step = 6" in assertAllStagesStopped {
Source(1 to 12).sliding(n = 3, step = 6).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2, 3))
expectMsg(Vector(7, 8, 9))
expectMsg("done")
}
"work with n = 3, step = 10, incomplete group" in assertAllStagesStopped {
Source(1 to 12).sliding(n = 3, step = 10).runForeach(testActor ! _) onSuccess {
case _ testActor ! "done"
}
expectMsg(Vector(1, 2, 3))
expectMsg(Vector(11, 12))
expectMsg("done")
} }
"work with empty sources" in assertAllStagesStopped { "work with empty sources" in assertAllStagesStopped {
Source.empty.sliding(1).runForeach(testActor ! _) onSuccess { Source.empty.sliding(1).runForeach(testActor ! _).map(_ "done") pipeTo testActor
case _ testActor ! "done"
}
expectMsg("done") expectMsg("done")
} }
} }

View file

@ -319,55 +319,32 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] { private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] {
private val buf = { private var buf = Vector.empty[T]
val b = Vector.newBuilder[T]
b.sizeHint(n)
b
}
var bufferedElements = 0
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = { override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
buf += elem buf :+= elem
bufferedElements += 1 if (buf.size < n) {
if (bufferedElements < n) { ctx.pull()
} else if (buf.size == n) {
ctx.push(buf)
} else if (step > n) {
if (buf.size == step)
buf = Vector.empty
ctx.pull() ctx.pull()
} else if (bufferedElements == n) {
ctx.push(buf.result())
} else { } else {
if (step > n) { buf = buf.drop(step)
if (bufferedElements == step) { if (buf.size == n) ctx.push(buf)
buf.clear() else ctx.pull()
buf.sizeHint(n)
bufferedElements = 0
ctx.pull()
} else {
ctx.pull()
}
} else {
val emit = buf.result()
buf.clear()
buf.sizeHint(n)
emit.drop(step).foreach(buf += _)
val updatedEmit = buf.result()
bufferedElements = updatedEmit.size
if (bufferedElements == n) ctx.push(updatedEmit)
else ctx.pull()
}
} }
} }
override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective = override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
if (ctx.isFinishing) { if (!ctx.isFinishing) ctx.pull()
val emit = buf.result() else if (buf.size >= n) ctx.finish()
if (emit.size == n) { else ctx.pushAndFinish(buf)
ctx.finish()
} else {
ctx.pushAndFinish(emit)
}
} else ctx.pull()
override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective = override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective =
if (buf.result().isEmpty) ctx.finish() if (buf.isEmpty) ctx.finish()
else ctx.absorbTermination() else ctx.absorbTermination()
} }