diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index 2e4d05137e..dc0eb56c1c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) + Source(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) + Source(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -58,7 +58,7 @@ class FlowConflateSpec extends AkkaSpec { "work on a variable rate chain" in { val future = Source((1 to 1000).iterator) - .conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i) + .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .fold(0)(_ + _) Await.result(future, 10.seconds) should be(500500) @@ -68,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) + Source(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 45aeaaaca0..23a5b94318 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -240,7 +240,7 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S] = - new Flow(delegate.conflate(seed.apply, aggregate.apply)) + new Flow(delegate.conflate(seed.apply)(aggregate.apply)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9f9c476a2b..a5853eaa82 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -357,7 +357,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Source[S] = - new Source(delegate.conflate(seed.apply, aggregate.apply)) + new Source(delegate.conflate(seed.apply)(aggregate.apply)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 23cef6f6b5..616bb21645 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -306,7 +306,7 @@ trait FlowOps[+Out] { * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ - def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[S] = + def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) /**