+str: Sliding window operation
This commit is contained in:
parent
c117dd6fab
commit
ac007efc0d
8 changed files with 243 additions and 0 deletions
|
|
@ -265,6 +265,62 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
|
|||
else ctx.absorbTermination()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] {
|
||||
private val buf = {
|
||||
val b = Vector.newBuilder[T]
|
||||
b.sizeHint(n)
|
||||
b
|
||||
}
|
||||
var bufferedElements = 0
|
||||
|
||||
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
|
||||
buf += elem
|
||||
bufferedElements += 1
|
||||
if (bufferedElements < n) {
|
||||
ctx.pull()
|
||||
} else if (bufferedElements == n) {
|
||||
ctx.push(buf.result())
|
||||
} else {
|
||||
if (step > n) {
|
||||
if (bufferedElements == step) {
|
||||
buf.clear()
|
||||
buf.sizeHint(n)
|
||||
bufferedElements = 0
|
||||
ctx.pull()
|
||||
} else {
|
||||
ctx.pull()
|
||||
}
|
||||
} else {
|
||||
val emit = buf.result()
|
||||
buf.clear()
|
||||
buf.sizeHint(n)
|
||||
emit.drop(step).foreach(buf += _)
|
||||
val updatedEmit = buf.result()
|
||||
bufferedElements = updatedEmit.size
|
||||
if (bufferedElements == n) ctx.push(updatedEmit)
|
||||
else ctx.pull()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
|
||||
if (ctx.isFinishing) {
|
||||
val emit = buf.result()
|
||||
if (emit.size == n) {
|
||||
ctx.finish()
|
||||
} else {
|
||||
ctx.pushAndFinish(emit)
|
||||
}
|
||||
} else ctx.pull()
|
||||
|
||||
override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective =
|
||||
if (buf.result().isEmpty) ctx.finish()
|
||||
else ctx.absorbTermination()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue