+str #16610 minor fixes

This commit is contained in:
Gilad Hoch 2016-01-20 19:29:50 +02:00
parent fde0c11fff
commit 7e488001cc
3 changed files with 9 additions and 5 deletions

View file

@ -21,7 +21,7 @@ private[akka] object ConstantFun {
def scalaIdentityFunction[T]: T T = conforms def scalaIdentityFunction[T]: T T = conforms
def returnZero[T](t: T): Long = 0L val zeroLong = (_: Any) 0L
def returnOne[T](t: T): Long = 1L val oneLong = (_: Any) 1L
} }

View file

@ -478,6 +478,9 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
override def restart(): Conflate[In, Out] = copy() override def restart(): Conflate[In, Out] = copy()
} }
/**
* INTERNAL API
*/
private[akka] final case class Batch[In, Out](max: Long, costFn: In Long, seed: In Out, aggregate: (Out, In) Out) private[akka] final case class Batch[In, Out](max: Long, costFn: In Long, seed: In Out, aggregate: (Out, In) Out)
extends GraphStage[FlowShape[In, Out]] { extends GraphStage[FlowShape[In, Out]] {

View file

@ -906,8 +906,9 @@ trait FlowOps[+Out, +Mat] {
* *
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]]
*/ */
def conflate[S](seed: Out S)(aggregate: (S, Out) S): Repr[S] = //andThen(Conflate(seed, aggregate)) def conflate[S](seed: Out S)(aggregate: (S, Out) S): Repr[S] = andThen(Conflate(seed, aggregate))
via(Batch(1L, ConstantFun.returnZero[Out], seed, aggregate).withAttributes(DefaultAttributes.conflate)) //FIXME: conflate can be expressed as a batch
//via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate))
/** /**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
@ -932,7 +933,7 @@ trait FlowOps[+Out, +Mat] {
* @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate
*/ */
def batch[S](max: Long, seed: Out S)(aggregate: (S, Out) S): Repr[S] = def batch[S](max: Long, seed: Out S)(aggregate: (S, Out) S): Repr[S] =
via(Batch(max, ConstantFun.returnOne[Out], seed, aggregate).withAttributes(DefaultAttributes.batch)) via(Batch(max, ConstantFun.oneLong, seed, aggregate).withAttributes(DefaultAttributes.batch))
/** /**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches