revert zipWithIndex changes (#1526)

* revert zipWithIndex changes

* scalafmt

* Update FlowZipWithIndexSpec.scala
This commit is contained in:
PJ Fanning 2024-10-14 09:28:14 +01:00 committed by GitHub
parent d20f1028ff
commit bbbcaccd2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 45 additions and 5 deletions

View file

@ -14,7 +14,7 @@
package org.apache.pekko.stream.scaladsl
import org.apache.pekko
import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, Materializer }
import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, ClosedShape, Materializer, UniformFanInShape }
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import scala.annotation.nowarn
@ -57,5 +57,39 @@ class FlowZipWithIndexSpec extends StreamSpec {
// #zip-with-index
}
"support junction output ports" in {
// https://github.com/apache/pekko/issues/1525
import GraphDSL.Implicits._
val pickMaxOfThree = GraphDSL.create() { implicit b =>
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
zip1.out ~> zip2.in0
UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
}
val probe = TestSubscriber.manualProbe[(Int, AnyVal)]()
val resultSink = Sink.fromSubscriber(probe)
val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
// importing the partial graph will return its shape (inlets & outlets)
val pm3 = b.add(pickMaxOfThree)
Source.single(1) ~> pm3.in(0)
Source.single(2) ~> pm3.in(1)
Source.single(3) ~> pm3.in(2)
pm3.out.zipWithIndex ~> sink.in
ClosedShape
})
g.run()
val subscription = probe.expectSubscription()
subscription.request(1)
probe.expectNext((3, 0))
probe.expectComplete()
}
}
}

View file

@ -3304,10 +3304,16 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def zipWithIndex: Repr[(Out, Long)] =
statefulMap(() => 0L)((index, out) =>
(index + 1L, (out, index)), _ => None)
.withAttributes(DefaultAttributes.zipWithIndex)
def zipWithIndex: Repr[(Out, Long)] = {
statefulMapConcat[(Out, Long)] { () =>
var index: Long = 0L
elem => {
val zipped = (elem, index)
index += 1
immutable.Iterable[(Out, Long)](zipped)
}
}
}
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].