diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala index 32d520be17..76cabeac23 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipLatestWithSpec.scala @@ -26,7 +26,10 @@ class GraphZipLatestWithSpec extends TwoStreamsSetup { override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { val subscriber = TestSubscriber.probe[Outputs]() - Source.fromPublisher(p1).zipLatestWith(Source.fromPublisher(p2))(_ + _).runWith(Sink.fromSubscriber(subscriber)) + Source + .fromPublisher(p1) + .zipLatestWith(Source.fromPublisher(p2))(_ + _) + .runWith(Sink.fromSubscriber(subscriber)) subscriber } @@ -171,7 +174,7 @@ class GraphZipLatestWithSpec extends TwoStreamsSetup { RunnableGraph .fromGraph(GraphDSL.create() { implicit b ⇒ - val sum19 = (v1: Int, + val sum22 = (v1: Int, v2: String, v3: Int, v4: String, @@ -189,32 +192,38 @@ class GraphZipLatestWithSpec extends TwoStreamsSetup { v16: String, v17: Int, v18: String, - v19: Int) ⇒ + v19: Int, + v20: String, + v21: Int, + v22: String) ⇒ v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 + - v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18 + v19 + v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18 + v19 + v20 + v21 + v22 // odd input ports will be Int, even input ports will be String - val zip = b.add(ZipLatestWith(sum19)) + val zip = b.add(ZipLatestWith(sum22)) - Source.repeat(1) ~> zip.in0 - Source.repeat(2).map(_.toString) ~> zip.in1 - Source.repeat(3) ~> zip.in2 - Source.repeat(4).map(_.toString) ~> zip.in3 - Source.repeat(5) ~> zip.in4 - Source.repeat(6).map(_.toString) ~> zip.in5 - Source.repeat(7) ~> zip.in6 - Source.repeat(8).map(_.toString) ~> zip.in7 - Source.repeat(9) ~> zip.in8 - Source.repeat(10).map(_.toString) ~> zip.in9 - Source.repeat(11) ~> zip.in10 - Source.repeat(12).map(_.toString) ~> zip.in11 - Source.repeat(13) ~> zip.in12 - Source.repeat(14).map(_.toString) ~> zip.in13 - Source.repeat(15) ~> zip.in14 - Source.repeat(16).map(_.toString) ~> zip.in15 - Source.repeat(17) ~> zip.in16 - Source.repeat(18).map(_.toString) ~> zip.in17 - Source.fromPublisher(upstreamProbe) ~> zip.in18 + Source.single(1) ~> zip.in0 + Source.single(2).map(_.toString) ~> zip.in1 + Source.single(3) ~> zip.in2 + Source.single(4).map(_.toString) ~> zip.in3 + Source.single(5) ~> zip.in4 + Source.single(6).map(_.toString) ~> zip.in5 + Source.single(7) ~> zip.in6 + Source.single(8).map(_.toString) ~> zip.in7 + Source.single(9) ~> zip.in8 + Source.single(10).map(_.toString) ~> zip.in9 + Source.single(11) ~> zip.in10 + Source.single(12).map(_.toString) ~> zip.in11 + Source.single(13) ~> zip.in12 + Source.single(14).map(_.toString) ~> zip.in13 + Source.single(15) ~> zip.in14 + Source.single(16).map(_.toString) ~> zip.in15 + Source.single(17) ~> zip.in16 + Source.single(18).map(_.toString) ~> zip.in17 + Source.single(19) ~> zip.in18 + Source.single(20).map(_.toString) ~> zip.in19 + Source.single(21) ~> zip.in20 + Source.fromPublisher(upstreamProbe).map(_.toString) ~> zip.in21 zip.out ~> Sink.fromSubscriber(downstreamProbe) @@ -226,10 +235,9 @@ class GraphZipLatestWithSpec extends TwoStreamsSetup { val upstreamSubscription = upstreamProbe.expectSubscription() downstreamSubscription.request(1) - upstreamSubscription.sendNext(19) + upstreamSubscription.sendNext(22) upstreamSubscription.sendComplete() - downstreamProbe.expectNext((1 to 19).mkString("")) - + downstreamProbe.expectNext((1 to 22).mkString("")) downstreamProbe.expectComplete() } diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template index 0df74ffa24..a1363a6084 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipLatestWith.scala.template @@ -36,7 +36,7 @@ object ZipLatestWith { def create[A, B, Out](f: function.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], NotUsed] = scaladsl.ZipLatestWith(f.apply _) - [3..20#/** Create a new `ZipLatestWith` specialized for 1 inputs. + [3..22#/** Create a new `ZipLatestWith` specialized for 1 inputs. * * @param f zipping-function from the input values to the output value * @param attributes optional attributes for this vertex diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template index 25e481e1e7..34c0d18d0c 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template @@ -9,7 +9,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } trait ZipLatestWithApply { - [2..20#/** + [2..22#/** * Create a new `ZipLatestWith` specialized for 1 inputs. * * @param zipper zipping-function from the input values to the output value @@ -22,7 +22,7 @@ trait ZipLatestWithApply { } -[2..20#/** `ZipLatestWith` specialized for 1 inputs */ +[2..22#/** `ZipLatestWith` specialized for 1 inputs */ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] { override def initialAttributes = Attributes.name("ZipLatestWith1") override val shape: FanInShape1[[#A1#], O] = new FanInShape1[[#A1#], O]("ZipLatestWith1") @@ -57,7 +57,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ } else { waitingForTuple = true } - pullAllIfNeeded() + tryPullAllIfNeeded() } } ) @@ -73,7 +73,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ staleTupleValues = true } - private def pullAllIfNeeded(): Unit = { + private def tryPullAllIfNeeded(): Unit = { [#if (!hasBeenPulled(in0)) { tryPull(in0) }# @@ -91,7 +91,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[ if (outer.waitingForTuple && outer.hasAllValues) { outer.pushOutput() outer.waitingForTuple = false - outer.pullAllIfNeeded() + outer.tryPullAllIfNeeded() } }