Fix compilation of stream tests on Scala 2.12 (#31062)
This commit is contained in:
parent
1f2e15d944
commit
9d0684d875
1 changed files with 6 additions and 6 deletions
|
|
@ -24,7 +24,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
|
|||
val groupSize = 3
|
||||
val result = Source(stream)
|
||||
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => {
|
||||
buffer.addOne(i)
|
||||
buffer += i
|
||||
(buffer, buffer.size >= groupSize)
|
||||
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
|
||||
.runWith(Sink.collection)
|
||||
|
|
@ -39,7 +39,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
|
|||
val result = Source(stream)
|
||||
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
|
||||
aggregate = (buffer, i) => {
|
||||
buffer.addOne(i)
|
||||
buffer += i
|
||||
(buffer, buffer.size >= groupSize)
|
||||
},
|
||||
harvest = buffer => buffer.toSeq :+ -1, // append -1 to output to demonstrate the effect of harvest
|
||||
|
|
@ -56,7 +56,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
|
|||
|
||||
val result = Source(stream)
|
||||
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => {
|
||||
buffer.addOne(i)
|
||||
buffer += i
|
||||
(buffer, buffer.sum >= weight)
|
||||
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
|
||||
.runWith(Sink.collection)
|
||||
|
|
@ -145,7 +145,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
|
|||
.fromPublisher(p)
|
||||
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
|
||||
aggregate = (buffer, i) => {
|
||||
buffer.addOne(i)
|
||||
buffer += i
|
||||
(buffer, false)
|
||||
},
|
||||
harvest = seq => seq.toSeq,
|
||||
|
|
@ -189,7 +189,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
|
|||
.fromPublisher(p)
|
||||
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
|
||||
aggregate = (buffer, i) => {
|
||||
buffer.addOne(i)
|
||||
buffer += i
|
||||
(buffer, false)
|
||||
},
|
||||
harvest = seq => seq.toSeq,
|
||||
|
|
@ -230,7 +230,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
|
|||
.fromPublisher(upstream)
|
||||
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
|
||||
aggregate = (buffer, i) => {
|
||||
buffer.addOne(i)
|
||||
buffer += i
|
||||
(buffer, false)
|
||||
},
|
||||
harvest = buffer => buffer.toSeq,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue