diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala index b60c360bad..19379ae10e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala @@ -21,7 +21,7 @@ private[akka] object ConstantFun { def scalaIdentityFunction[T]: T ⇒ T = conforms - def returnZero[T](t: T): Long = 0L + val zeroLong = (_: Any) ⇒ 0L - def returnOne[T](t: T): Long = 1L + val oneLong = (_: Any) ⇒ 1L } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index ca239ad4ca..c1074b6c45 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -478,6 +478,9 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O override def restart(): Conflate[In, Out] = copy() } +/** + * INTERNAL API + */ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f3bbf6cdc6..339e9b94ba 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -906,8 +906,9 @@ trait FlowOps[+Out, +Mat] { * * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] */ - def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = //andThen(Conflate(seed, aggregate)) - via(Batch(1L, ConstantFun.returnZero[Out], seed, aggregate).withAttributes(DefaultAttributes.conflate)) + def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) + //FIXME: conflate can be expressed as a batch + //via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches @@ -932,7 +933,7 @@ trait FlowOps[+Out, +Mat] { * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ def batch[S](max: Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = - via(Batch(max, ConstantFun.returnOne[Out], seed, aggregate).withAttributes(DefaultAttributes.batch)) + via(Batch(max, ConstantFun.oneLong, seed, aggregate).withAttributes(DefaultAttributes.batch)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches