+str #16610 applied the changes suggested by @drewhk

This commit is contained in:
Gilad Hoch 2016-01-19 23:03:36 +02:00
parent d690067fc9
commit b420d6a472
9 changed files with 245 additions and 137 deletions

View file

@ -478,6 +478,97 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
override def restart(): Conflate[In, Out] = copy()
}
private[akka] sealed abstract class AbstractBatch[In, Out](max: Long, costFn: In Long, seed: In Out,
aggregate: (Out, In) Out, val in: Inlet[In],
val out: Outlet[Out]) extends GraphStage[FlowShape[In, Out]] {
override val shape: FlowShape[In, Out] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var agg: Any = null
private var left: Long = max
private var pending: Any = null
private def flush(): Unit = {
push(out, agg.asInstanceOf[Out])
left = max
if (pending != null) {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
left -= costFn(elem)
pending = null
} else {
agg = null
}
}
override def preStart() = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
val cost = costFn(elem)
if (agg == null) {
left -= cost
agg = seed(elem)
} else if (left < cost) {
pending = elem
} else {
left -= cost
agg = aggregate(agg.asInstanceOf[Out], elem)
}
if (isAvailable(out)) flush()
if (pending == null) pull(in)
}
override def onUpstreamFinish(): Unit = {
if (agg == null) completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
//if upstream finished, we still might emit up to 2 more elements (whatever agg is + possibly a pending heavy element)
if (isClosed(in)) {
if (agg == null) completeStage()
else {
push(out, agg.asInstanceOf[Out])
if (pending == null) completeStage()
else {
agg = seed(pending.asInstanceOf[In])
pending = null
}
}
} else if (agg == null) {
if (!hasBeenPulled(in))
pull(in)
} else {
flush()
if (!hasBeenPulled(in)) pull(in)
}
}
})
}
}
private[akka] final case class BatchWeighted[I, O](max: Long, costFn: I Long, seed: I O, aggregate: (O, I) O)
extends AbstractBatch(max, costFn, seed, aggregate, Inlet[I]("BatchWeighted.in"), Outlet[O]("BatchWeighted.out")) {
override def initialAttributes = Attributes.name("BatchWeighted")
}
private[akka] final case class Batch[I, O](max: Long, seed: I O, aggregate: (O, I) O)
extends AbstractBatch(max, { _: I 1L }, seed, aggregate, Inlet[I]("Batch.in"), Outlet[O]("Batch.out")) {
override def initialAttributes = Attributes.name("Batch")
}
//private[akka] final case class Conflate[I, O](seed: I O, aggregate: (O, I) O)
// extends AbstractBatch(Long.MaxValue, { _: I 0L }, seed, aggregate, Inlet[I]("Conflate.in"), Outlet[O]("Conflate.out")) {
// override def initialAttributes = Attributes.name("Conflate")
//}
/**
* INTERNAL API
*/