=str - 19149 - Corrects the behavior of FlowOps.sliding

Generatively attempts to verify that it behaves exactly like the
collections library

Removes the use of Vector.builder for an easier-to-follow Vector
implementation while waiting for this to be reimplemented as a
GraphStage.

Also, the Builder implementation relies on undefined behavior:

"Builder.result() — Produces a collection from the added elements.
The builder's contents are undefined after this operation." -
http://www.scala-lang.org/api/current/#scala.collection.mutable.Builder
This commit is contained in:
Viktor Klang 2015-12-12 14:02:13 +01:00
parent 15cc65ce9d
commit 108f6749de
2 changed files with 49 additions and 146 deletions

View file

@ -318,55 +318,32 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
* INTERNAL API
*/
private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] {
private val buf = {
val b = Vector.newBuilder[T]
b.sizeHint(n)
b
}
var bufferedElements = 0
private var buf = Vector.empty[T]
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
buf += elem
bufferedElements += 1
if (bufferedElements < n) {
buf :+= elem
if (buf.size < n) {
ctx.pull()
} else if (buf.size == n) {
ctx.push(buf)
} else if (step > n) {
if (buf.size == step)
buf = Vector.empty
ctx.pull()
} else if (bufferedElements == n) {
ctx.push(buf.result())
} else {
if (step > n) {
if (bufferedElements == step) {
buf.clear()
buf.sizeHint(n)
bufferedElements = 0
ctx.pull()
} else {
ctx.pull()
}
} else {
val emit = buf.result()
buf.clear()
buf.sizeHint(n)
emit.drop(step).foreach(buf += _)
val updatedEmit = buf.result()
bufferedElements = updatedEmit.size
if (bufferedElements == n) ctx.push(updatedEmit)
else ctx.pull()
}
buf = buf.drop(step)
if (buf.size == n) ctx.push(buf)
else ctx.pull()
}
}
override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
if (ctx.isFinishing) {
val emit = buf.result()
if (emit.size == n) {
ctx.finish()
} else {
ctx.pushAndFinish(emit)
}
} else ctx.pull()
if (!ctx.isFinishing) ctx.pull()
else if (buf.size >= n) ctx.finish()
else ctx.pushAndFinish(buf)
override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective =
if (buf.result().isEmpty) ctx.finish()
if (buf.isEmpty) ctx.finish()
else ctx.absorbTermination()
}