diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala index 2c51bbc629..dd0c8b2797 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala @@ -24,7 +24,7 @@ class AggregateWithBoundarySpec extends StreamSpec { val groupSize = 3 val result = Source(stream) .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => { - buffer.addOne(i) + buffer += i (buffer, buffer.size >= groupSize) }, harvest = buffer => buffer.toSeq, emitOnTimer = None) .runWith(Sink.collection) @@ -39,7 +39,7 @@ class AggregateWithBoundarySpec extends StreamSpec { val result = Source(stream) .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( aggregate = (buffer, i) => { - buffer.addOne(i) + buffer += i (buffer, buffer.size >= groupSize) }, harvest = buffer => buffer.toSeq :+ -1, // append -1 to output to demonstrate the effect of harvest @@ -56,7 +56,7 @@ class AggregateWithBoundarySpec extends StreamSpec { val result = Source(stream) .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => { - buffer.addOne(i) + buffer += i (buffer, buffer.sum >= weight) }, harvest = buffer => buffer.toSeq, emitOnTimer = None) .runWith(Sink.collection) @@ -145,7 +145,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with .fromPublisher(p) .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])( aggregate = (buffer, i) => { - buffer.addOne(i) + buffer += i (buffer, false) }, harvest = seq => seq.toSeq, @@ -189,7 +189,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with .fromPublisher(p) .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])( aggregate = (buffer, i) => { - buffer.addOne(i) + buffer += i (buffer, false) }, harvest = seq => seq.toSeq, @@ -230,7 +230,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with .fromPublisher(upstream) .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])( aggregate = (buffer, i) => { - buffer.addOne(i) + buffer += i (buffer, false) }, harvest = buffer => buffer.toSeq,