+str #16610 added javadsl tests + conflate now delegates to Batch + most of the changes @drewhk & @rkuhn suggested

This commit is contained in:
Gilad Hoch 2016-01-20 18:20:12 +02:00
parent b420d6a472
commit fde0c11fff
9 changed files with 93 additions and 128 deletions

View file

@ -478,27 +478,29 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
override def restart(): Conflate[In, Out] = copy()
}
private[akka] sealed abstract class AbstractBatch[In, Out](max: Long, costFn: In Long, seed: In Out,
aggregate: (Out, In) Out, val in: Inlet[In],
val out: Outlet[Out]) extends GraphStage[FlowShape[In, Out]] {
private[akka] final case class Batch[In, Out](max: Long, costFn: In Long, seed: In Out, aggregate: (Out, In) Out)
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Batch.in")
val out = Outlet[Out]("Batch.out")
override val shape: FlowShape[In, Out] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var agg: Any = null
private var agg: Out = null.asInstanceOf[Out]
private var left: Long = max
private var pending: Any = null
private var pending: In = null.asInstanceOf[In]
private def flush(): Unit = {
push(out, agg.asInstanceOf[Out])
push(out, agg)
left = max
if (pending != null) {
val elem = pending.asInstanceOf[In]
val elem = pending
agg = seed(elem)
left -= costFn(elem)
pending = null
pending = null.asInstanceOf[In]
} else {
agg = null
agg = null.asInstanceOf[Out]
}
}
@ -516,7 +518,7 @@ private[akka] sealed abstract class AbstractBatch[In, Out](max: Long, costFn: In
pending = elem
} else {
left -= cost
agg = aggregate(agg.asInstanceOf[Out], elem)
agg = aggregate(agg, elem)
}
if (isAvailable(out)) flush()
@ -531,20 +533,16 @@ private[akka] sealed abstract class AbstractBatch[In, Out](max: Long, costFn: In
setHandler(out, new OutHandler {
override def onPull(): Unit = {
//if upstream finished, we still might emit up to 2 more elements (whatever agg is + possibly a pending heavy element)
if (isClosed(in)) {
if (agg == null) completeStage()
if (agg == null) {
if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in)
} else if (isClosed(in)) {
push(out, agg)
if (pending == null) completeStage()
else {
push(out, agg.asInstanceOf[Out])
if (pending == null) completeStage()
else {
agg = seed(pending.asInstanceOf[In])
pending = null
}
agg = seed(pending)
pending = null.asInstanceOf[In]
}
} else if (agg == null) {
if (!hasBeenPulled(in))
pull(in)
} else {
flush()
if (!hasBeenPulled(in)) pull(in)
@ -554,101 +552,6 @@ private[akka] sealed abstract class AbstractBatch[In, Out](max: Long, costFn: In
}
}
private[akka] final case class BatchWeighted[I, O](max: Long, costFn: I Long, seed: I O, aggregate: (O, I) O)
extends AbstractBatch(max, costFn, seed, aggregate, Inlet[I]("BatchWeighted.in"), Outlet[O]("BatchWeighted.out")) {
override def initialAttributes = Attributes.name("BatchWeighted")
}
private[akka] final case class Batch[I, O](max: Long, seed: I O, aggregate: (O, I) O)
extends AbstractBatch(max, { _: I 1L }, seed, aggregate, Inlet[I]("Batch.in"), Outlet[O]("Batch.out")) {
override def initialAttributes = Attributes.name("Batch")
}
//private[akka] final case class Conflate[I, O](seed: I O, aggregate: (O, I) O)
// extends AbstractBatch(Long.MaxValue, { _: I 0L }, seed, aggregate, Inlet[I]("Conflate.in"), Outlet[O]("Conflate.out")) {
// override def initialAttributes = Attributes.name("Conflate")
//}
/**
* INTERNAL API
*/
private[akka] final case class AggregateWeighted[In, Out](max: Long, costFn: In Long, seed: In Out,
aggregate: (Out, In) Out,
decider: Supervision.Decider) extends DetachedStage[In, Out] {
private var agg: Any = null
private var left: Long = max
private var pending: Any = null
private[this] def flush(ctx: DetachedContext[Out]) = {
val result = agg.asInstanceOf[Out]
agg = null
left = max
if (pending != null) {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
left -= costFn(elem)
pending = null
}
ctx.pushAndPull(result)
}
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
val cost = costFn(elem)
if (agg == null) {
left -= cost
agg = seed(elem)
} else if (left <= 0 || left - cost < 0) {
pending = elem
} else {
left -= cost
agg = aggregate(agg.asInstanceOf[Out], elem)
}
if (!ctx.isHoldingDownstream && pending == null) ctx.pull()
else if (!ctx.isHoldingDownstream) ctx.holdUpstream()
else flush(ctx)
}
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
//if ctx.isFinishing, we still might emit up to 2 more elements (whatever agg is + possibly a pending heavy element)
if (ctx.isFinishing) {
//agg != null since we already checked it in onUpstreamFinish
val result = agg.asInstanceOf[Out]
if (pending == null) ctx.pushAndFinish(result)
else {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
pending = null
ctx.push(result)
}
} else if (ctx.isHoldingBoth) flush(ctx)
else if (agg == null) ctx.holdDownstream()
else {
val result = agg.asInstanceOf[Out]
left = max
if (pending != null) {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
left -= costFn(elem)
pending = null
} else {
agg = null
}
if (ctx.isHoldingUpstream) ctx.pushAndPull(result)
else ctx.push(result)
}
}
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = {
if (agg == null) ctx.finish()
else ctx.absorbTermination()
}
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): AggregateWeighted[In, Out] = copy()
}
/**
* INTERNAL API
*/