From bbbcaccd2c2b23d2b71275fbb41a5e2462e980be Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 14 Oct 2024 09:28:14 +0100 Subject: [PATCH] revert zipWithIndex changes (#1526) * revert zipWithIndex changes * scalafmt * Update FlowZipWithIndexSpec.scala --- .../scaladsl/FlowZipWithIndexSpec.scala | 36 ++++++++++++++++++- .../apache/pekko/stream/scaladsl/Flow.scala | 14 +++++--- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala index 86a2354da0..7b696020ea 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithIndexSpec.scala @@ -14,7 +14,7 @@ package org.apache.pekko.stream.scaladsl import org.apache.pekko -import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, Materializer } +import pekko.stream.{ ActorMaterializer, ActorMaterializerSettings, ClosedShape, Materializer, UniformFanInShape } import pekko.stream.testkit.{ StreamSpec, TestSubscriber } import scala.annotation.nowarn @@ -57,5 +57,39 @@ class FlowZipWithIndexSpec extends StreamSpec { // #zip-with-index } + "support junction output ports" in { + // https://github.com/apache/pekko/issues/1525 + import GraphDSL.Implicits._ + + val pickMaxOfThree = GraphDSL.create() { implicit b => + val zip1 = b.add(ZipWith[Int, Int, Int](math.max _)) + val zip2 = b.add(ZipWith[Int, Int, Int](math.max _)) + zip1.out ~> zip2.in0 + + UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1) + } + + val probe = TestSubscriber.manualProbe[(Int, AnyVal)]() + val resultSink = Sink.fromSubscriber(probe) + + val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink => + // importing the partial graph will return its shape (inlets & outlets) + val pm3 = b.add(pickMaxOfThree) + + Source.single(1) ~> pm3.in(0) + Source.single(2) ~> pm3.in(1) + Source.single(3) ~> pm3.in(2) + pm3.out.zipWithIndex ~> sink.in + ClosedShape + }) + + g.run() + + val subscription = probe.expectSubscription() + subscription.request(1) + probe.expectNext((3, 0)) + probe.expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 3bf8e436ba..9c66727d51 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3304,10 +3304,16 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zipWithIndex: Repr[(Out, Long)] = - statefulMap(() => 0L)((index, out) => - (index + 1L, (out, index)), _ => None) - .withAttributes(DefaultAttributes.zipWithIndex) + def zipWithIndex: Repr[(Out, Long)] = { + statefulMapConcat[(Out, Long)] { () => + var index: Long = 0L + elem => { + val zipped = (elem, index) + index += 1 + immutable.Iterable[(Out, Long)](zipped) + } + } + } /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].