diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index e98cb37913..cf5b834624 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -78,7 +78,7 @@ class HttpServerExampleSpec case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ println("Accepted new connection from " + remoteAddress) - Flow(requestProducer).map(requestHandler).produceTo(materializer, responseConsumer) + Flow(requestProducer).map(requestHandler).produceTo(responseConsumer, materializer) }.consume(materializer) } //#full-server-example diff --git a/akka-docs-dev/rst/scala/code/docs/http/ModelSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/ModelSpec.scala index 8d1f9e7ea7..9ff297119a 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/ModelSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/ModelSpec.scala @@ -10,7 +10,6 @@ import akka.util.ByteString import akka.http.model.headers.{ GenericHttpCredentials, BasicHttpCredentials } import org.scalatest.MustMatchers - class ModelSpec extends AkkaSpec { "construct request" in { //#construct-request diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala index 725d211e6c..5f45490dac 100644 --- a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala @@ -47,7 +47,7 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin .transform(responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) .transform(errorLogger(log, "Outgoing request stream error")) - .produceTo(materializer, tcpConn.outputStream) + .produceTo(tcpConn.outputStream, materializer) val responsePublisher = Flow(tcpConn.inputStream) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 185f895c80..d21da058ec 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -167,7 +167,7 @@ object HttpEntity { */ def apply(contentType: ContentType, chunks: Publisher[ByteString], materializer: FlowMaterializer): Chunked = Chunked(contentType, Flow(chunks).collect[ChunkStreamPart] { - case b: ByteString if b.nonEmpty => Chunk(b) + case b: ByteString if b.nonEmpty ⇒ Chunk(b) }.toPublisher(materializer)) } diff --git a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala index 0ba396f560..c9c7dd0e15 100644 --- a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala @@ -59,7 +59,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, .transform(responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) .transform(errorLogger(log, "Outgoing response stream error")) - .produceTo(materializer, tcpConn.outputStream) + .produceTo(tcpConn.outputStream, materializer) Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) } diff --git a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java index 29004c10c3..912f544c14 100644 --- a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java @@ -48,7 +48,7 @@ public abstract class JavaTestServer { return JavaApiTestCases.handleRequest(request); } }) - .produceTo(materializer, conn.getResponseSubscriber()); + .produceTo(conn.getResponseSubscriber(), materializer); } }).consume(materializer); } diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index a1f7064673..6dd9c76fc8 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -37,7 +37,7 @@ object TestClient extends App { } yield response.header[headers.Server] def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = { - Flow(List(HttpRequest() -> 'NoContext)).produceTo(materializer, connection.processor) + Flow(List(HttpRequest() -> 'NoContext)).produceTo(connection.processor, materializer) Flow(connection.processor).map(_._1).toFuture(materializer) } diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index 205c8d0074..c050ddbb4c 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -39,7 +39,7 @@ object TestServer extends App { Flow(connectionStream).foreach { case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber) ⇒ println("Accepted new connection from " + remoteAddress) - Flow(requestPublisher).map(requestHandler).produceTo(materializer, responseSubscriber) + Flow(requestPublisher).map(requestHandler).produceTo(responseSubscriber, materializer) }.consume(materializer) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index ce2000bc56..8fa1f54dbc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -49,9 +49,9 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops } override def consume(materializer: FlowMaterializer): Unit = - produceTo(materializer, new BlackholeSubscriber(materializer.settings.maximumInputBufferSize)) + produceTo(new BlackholeSubscriber(materializer.settings.maximumInputBufferSize), materializer) - override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit = + override def onComplete(callback: Try[Unit] ⇒ Unit, materializer: FlowMaterializer): Unit = transform(new Transformer[O, Unit] { override def onNext(in: O) = Nil override def onError(e: Throwable) = { @@ -66,7 +66,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops override def toPublisher(materializer: FlowMaterializer): Publisher[O] = materializer.toPublisher(publisherNode, ops) - override def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[_ >: O]): Unit = + override def produceTo(subscriber: Subscriber[_ >: O], materializer: FlowMaterializer): Unit = toPublisher(materializer).subscribe(subscriber.asInstanceOf[Subscriber[O]]) } @@ -86,13 +86,13 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U] = copy(ops = duct.ops ++: ops) - override def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[Out]): Subscriber[In] = + override def produceTo(subscriber: Subscriber[Out], materializer: FlowMaterializer): Subscriber[In] = materializer.ductProduceTo(subscriber, ops) override def consume(materializer: FlowMaterializer): Subscriber[In] = - produceTo(materializer, new BlackholeSubscriber(materializer.settings.maximumInputBufferSize)) + produceTo(new BlackholeSubscriber(materializer.settings.maximumInputBufferSize), materializer) - override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Subscriber[In] = + override def onComplete(callback: Try[Unit] ⇒ Unit, materializer: FlowMaterializer): Subscriber[In] = transform(new Transformer[Out, Unit] { override def onNext(in: Out) = Nil override def onError(e: Throwable) = { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 1e0768a1aa..ca7590b4f4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -287,7 +287,7 @@ abstract class Duct[In, Out] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[Out]): Subscriber[In] + def produceTo(subscriber: Subscriber[Out], materializer: FlowMaterializer): Subscriber[In] /** * Attaches a subscriber to this stream which will just discard all received @@ -309,7 +309,7 @@ abstract class Duct[In, Out] { * * *This operation materializes the flow and initiates its execution.* */ - def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Subscriber[In] + def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Subscriber[In] /** * Materialize this `Duct` into a `Subscriber` representing the input side of the `Duct` @@ -414,17 +414,18 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def append[U](duct: Duct[_ >: T, U]): Duct[In, U] = new DuctAdapter(delegate.appendJava(duct)) - override def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[T]): Subscriber[In] = - delegate.produceTo(materializer, subscriber) + override def produceTo(subscriber: Subscriber[T], materializer: FlowMaterializer): Subscriber[In] = + delegate.produceTo(subscriber, materializer) override def consume(materializer: FlowMaterializer): Subscriber[In] = delegate.consume(materializer) - override def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Subscriber[In] = - delegate.onComplete(materializer) { + override def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Subscriber[In] = + delegate.onComplete({ + case Success(_) ⇒ callback.onComplete(null) case Failure(e) ⇒ callback.onComplete(e) - } + }, materializer) override def build(materializer: FlowMaterializer): Pair[Subscriber[In], Publisher[T]] = { val (in, out) = delegate.build(materializer) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f1599d79ec..62a99231ab 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -366,7 +366,7 @@ abstract class Flow[T] { * * *This operation materializes the flow and initiates its execution.* */ - def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit + def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Unit /** * Materialize this flow and return the downstream-most @@ -388,7 +388,7 @@ abstract class Flow[T] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[_ >: T]): Unit + def produceTo(subscriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit } @@ -487,16 +487,16 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def consume(materializer: FlowMaterializer): Unit = delegate.consume(materializer) - override def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit = - delegate.onComplete(materializer) { + override def onComplete(callback: OnCompleteCallback, materializer: FlowMaterializer): Unit = + delegate.onComplete({ case Success(_) ⇒ callback.onComplete(null) case Failure(e) ⇒ callback.onComplete(e) - } + }, materializer) override def toPublisher(materializer: FlowMaterializer): Publisher[T] = delegate.toPublisher(materializer) - override def produceTo(materializer: FlowMaterializer, subsriber: Subscriber[_ >: T]): Unit = - delegate.produceTo(materializer, subsriber) + override def produceTo(subsriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit = + delegate.produceTo(subsriber, materializer) } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 6315c1095a..04a493155a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -281,7 +281,7 @@ trait Duct[In, +Out] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[Out] @uncheckedVariance): Subscriber[In] + def produceTo(subscriber: Subscriber[Out] @uncheckedVariance, materializer: FlowMaterializer): Subscriber[In] /** * Attaches a subscriber to this stream which will just discard all received @@ -303,7 +303,7 @@ trait Duct[In, +Out] { * * *This operation materializes the flow and initiates its execution.* */ - def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Subscriber[In] + def onComplete(callback: Try[Unit] ⇒ Unit, materializer: FlowMaterializer): Subscriber[In] /** * Materialize this `Duct` into a `Subscriber` representing the input side of the `Duct` diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0bdee671ac..eb880fa132 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -368,7 +368,7 @@ trait Flow[+T] { * * *This operation materializes the flow and initiates its execution.* */ - def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit + def onComplete(callback: Try[Unit] ⇒ Unit, materializer: FlowMaterializer): Unit /** * Materialize this flow and return the downstream-most @@ -390,7 +390,7 @@ trait Flow[+T] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def produceTo(materializer: FlowMaterializer, subscriber: Subscriber[_ >: T]): Unit + def produceTo(subscriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit } diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java index fba9608405..6896e9d22d 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java @@ -96,7 +96,7 @@ public class DuctTest { } }).consume(materializer); - Subscriber inSubscriber = Duct.create(String.class).produceTo(materializer, subscriber); + Subscriber inSubscriber = Duct.create(String.class).produceTo(subscriber, materializer); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); @@ -160,7 +160,7 @@ public class DuctTest { } }).consume(materializer); - Flow.create(Arrays.asList(1, 2, 3)).produceTo(materializer, ductInSubscriber); + Flow.create(Arrays.asList(1, 2, 3)).produceTo(ductInSubscriber, materializer); probe.expectMsgEquals("elem-12"); probe.expectMsgEquals("elem-14"); @@ -175,7 +175,7 @@ public class DuctTest { public String apply(Integer elem) { return elem.toString(); } - }).onComplete(materializer, new OnCompleteCallback() { + }).onComplete(new OnCompleteCallback() { @Override public void onComplete(Throwable e) { if (e == null) @@ -183,7 +183,7 @@ public class DuctTest { else probe.getRef().tell(e, ActorRef.noSender()); } - }); + }, materializer); Publisher publisher = Flow.create(Arrays.asList(1, 2, 3)).toPublisher(materializer); publisher.subscribe(inSubscriber); @@ -204,7 +204,7 @@ public class DuctTest { }).consume(materializer); final java.lang.Iterable input = Arrays.asList("a", "b", "c"); - Flow.create(input).produceTo(materializer, c); + Flow.create(input).produceTo(c, materializer); probe.expectMsgEquals("A"); probe.expectMsgEquals("B"); probe.expectMsgEquals("C"); diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 1590fe453b..fee3e6c279 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -355,15 +355,12 @@ public class FlowTest { public void mustBeAbleToUseOnCompleteSuccess() { final JavaTestKit probe = new JavaTestKit(system); final java.lang.Iterable input = Arrays.asList("A", "B", "C"); - Flow.create(input).onComplete(materializer, new OnCompleteCallback() { + Flow.create(input).onComplete(new OnCompleteCallback() { @Override public void onComplete(Throwable e) { - if (e == null) - probe.getRef().tell("done", ActorRef.noSender()); - else - probe.getRef().tell(e, ActorRef.noSender()); + probe.getRef().tell( (e == null) ? "done" : e, ActorRef.noSender()); } - }); + }, materializer); probe.expectMsgEquals("done"); } @@ -376,7 +373,7 @@ public class FlowTest { public String apply(String arg0) throws Exception { throw new RuntimeException("simulated err"); } - }).onComplete(materializer, new OnCompleteCallback() { + }).onComplete(new OnCompleteCallback() { @Override public void onComplete(Throwable e) { if (e == null) @@ -384,7 +381,7 @@ public class FlowTest { else probe.getRef().tell(e.getMessage(), ActorRef.noSender()); } - }); + }, materializer); probe.expectMsgEquals("simulated err"); } diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index 027c7145bd..2aeb413d49 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -78,7 +78,7 @@ class DuctSpec extends AkkaSpec { "subscribe Subscriber" in { val duct: Duct[String, String] = Duct[String] val c1 = StreamTestKit.SubscriberProbe[String]() - val c2: Subscriber[String] = duct.produceTo(materializer, c1) + val c2: Subscriber[String] = duct.produceTo(c1, materializer) val source: Publisher[String] = Flow(List("1", "2", "3")).toPublisher(materializer) source.subscribe(c2) @@ -117,7 +117,7 @@ class DuctSpec extends AkkaSpec { "perform transformation operation and subscribe Subscriber" in { val duct = Duct[Int].map(_.toString) val c1 = StreamTestKit.SubscriberProbe[String]() - val c2: Subscriber[Int] = duct.produceTo(materializer, c1) + val c2: Subscriber[Int] = duct.produceTo(c1, materializer) val sub1 = c1.expectSubscription sub1.request(3) @@ -135,7 +135,7 @@ class DuctSpec extends AkkaSpec { "perform multiple transformation operations and subscribe Subscriber" in { val duct = Duct[Int].map(_.toString).map("elem-" + _) val c1 = StreamTestKit.SubscriberProbe[String]() - val c2 = duct.produceTo(materializer, c1) + val c2 = duct.produceTo(c1, materializer) val sub1 = c1.expectSubscription sub1.request(3) @@ -152,10 +152,10 @@ class DuctSpec extends AkkaSpec { "call onComplete callback when done" in { val duct = Duct[Int].map(i ⇒ { testActor ! i.toString; i.toString }) - val c = duct.onComplete(materializer) { + val c = duct.onComplete({ case Success(_) ⇒ testActor ! "DONE" case Failure(e) ⇒ testActor ! e - } + }, materializer) val source = Flow(List(1, 2, 3)).toPublisher(materializer) source.subscribe(c) @@ -169,7 +169,7 @@ class DuctSpec extends AkkaSpec { "be appendable to a Flow" in { val c = StreamTestKit.SubscriberProbe[String]() val duct = Duct[Int].map(_ + 10).map(_.toString) - Flow(List(1, 2, 3)).map(_ * 2).append(duct).map((s: String) ⇒ "elem-" + s).produceTo(materializer, c) + Flow(List(1, 2, 3)).map(_ * 2).append(duct).map((s: String) ⇒ "elem-" + s).produceTo(c, materializer) val sub = c.expectSubscription sub.request(3) @@ -186,9 +186,9 @@ class DuctSpec extends AkkaSpec { .map { i ⇒ (i * 2).toString } .append(duct1) .map { i ⇒ "elem-" + (i + 10) } - .produceTo(materializer, c) + .produceTo(c, materializer) - Flow(List(1, 2, 3)).produceTo(materializer, ductInSubscriber) + Flow(List(1, 2, 3)).produceTo(ductInSubscriber, materializer) val sub = c.expectSubscription sub.request(3) diff --git a/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala index d35e4138e5..a9a63961e7 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala @@ -47,7 +47,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).produceTo(materializer, subscriber) + Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -67,7 +67,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).produceTo(materializer, subscriber) + Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -95,7 +95,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).produceTo(materializer, subscriber) + Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -126,7 +126,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).produceTo(materializer, subscriber) + Flow(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -157,7 +157,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).buffer(1, overflowStrategy = strategy).produceTo(materializer, subscriber) + Flow(publisher).buffer(1, overflowStrategy = strategy).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala index 0314c31f8d..20298158bf 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala @@ -44,7 +44,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the current open substream and signal error" in { val publisher = StreamTestKit.PublisherProbe[Publisher[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).flatten(FlattenStrategy.concat).produceTo(m, subscriber) + Flow(publisher).flatten(FlattenStrategy.concat).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -63,7 +63,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on open substream, cancel the master stream and signal error " in { val publisher = StreamTestKit.PublisherProbe[Publisher[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).flatten(FlattenStrategy.concat).produceTo(m, subscriber) + Flow(publisher).flatten(FlattenStrategy.concat).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -82,7 +82,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the current open substream and the master stream" in { val publisher = StreamTestKit.PublisherProbe[Publisher[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).flatten(FlattenStrategy.concat).produceTo(m, subscriber) + Flow(publisher).flatten(FlattenStrategy.concat).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index e36a743778..056351b04e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -95,7 +95,7 @@ class FlowConcatSpec extends TwoStreamsSetup { val promise = Promise[Int]() val flow = Flow(List(1, 2, 3)).concat(Flow(promise.future).toPublisher(materializer)) val subscriber = StreamTestKit.SubscriberProbe[Int]() - flow.produceTo(materializer, subscriber) + flow.produceTo(subscriber, materializer) val subscription = subscriber.expectSubscription() subscription.request(4) subscriber.expectNext(1) diff --git a/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala index 07ec36037b..5475013c7c 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala @@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, subscriber) + Flow(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, subscriber) + Flow(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -67,12 +67,12 @@ class FlowConflateSpec extends AkkaSpec { } "backpressure subscriber when upstream is slower" in { - val oublisher = StreamTestKit.PublisherProbe[Int]() + val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(oublisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, subscriber) + Flow(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(subscriber, materializer) - val autoPublisher = new StreamTestKit.AutoPublisher(oublisher) + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() sub.request(1) diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala index b859a905a6..fe01dc529b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala @@ -20,7 +20,7 @@ class FlowDropWithinSpec extends AkkaSpec { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() - Flow(p).dropWithin(1.second).produceTo(materializer, c) + Flow(p).dropWithin(1.second).produceTo(c, materializer) val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) diff --git a/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala index 7ff525bcfa..0c8f51d0de 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala @@ -25,7 +25,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Flow(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, subscriber) + Flow(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -45,7 +45,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Flow(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, subscriber) + Flow(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -78,7 +78,7 @@ class FlowExpandSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, subscriber) + Flow(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(subscriber, materializer) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala index 22ec2ad136..97c3d27e8f 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala @@ -23,7 +23,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Flow(p).groupedWithin(1000, 1.second).produceTo(materializer, c) + Flow(p).groupedWithin(1000, 1.second).produceTo(c, materializer) val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) @@ -48,7 +48,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(materializer, c) + Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(c, materializer) val cSub = c.expectSubscription cSub.request(100) c.expectNext((1 to 3).toList) @@ -60,7 +60,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Flow(p).groupedWithin(1000, 1.second).produceTo(materializer, c) + Flow(p).groupedWithin(1000, 1.second).produceTo(c, materializer) val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(1) @@ -80,7 +80,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "drop empty groups" in { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Flow(p).groupedWithin(1000, 500.millis).produceTo(materializer, c) + Flow(p).groupedWithin(1000, 500.millis).produceTo(c, materializer) val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(2) @@ -102,7 +102,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Flow(p).groupedWithin(3, 2.second).produceTo(materializer, c) + Flow(p).groupedWithin(3, 2.second).produceTo(c, materializer) val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(4) diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala index bcf4736da7..e954c356bb 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala @@ -25,7 +25,7 @@ class FlowMapFutureSpec extends AkkaSpec { "produce future elements" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Flow(1 to 3).mapFuture(n ⇒ Future(n)).produceTo(materializer, c) + val p = Flow(1 to 3).mapFuture(n ⇒ Future(n)).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(2) c.expectNext(1) @@ -42,7 +42,7 @@ class FlowMapFutureSpec extends AkkaSpec { val p = Flow(1 to 50).mapFuture(n ⇒ Future { Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) n - }).produceTo(materializer, c) + }).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(1000) for (n ← 1 to 50) c.expectNext(n) @@ -56,7 +56,7 @@ class FlowMapFutureSpec extends AkkaSpec { val p = Flow(1 to 20).mapFuture(n ⇒ Future { probe.ref ! n n - }).produceTo(materializer, c) + }).produceTo(c, materializer) val sub = c.expectSubscription() // nothing before requested probe.expectNoMsg(500.millis) @@ -84,7 +84,7 @@ class FlowMapFutureSpec extends AkkaSpec { Await.ready(latch, 10.seconds) n } - }).produceTo(materializer, c) + }).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err1") @@ -103,7 +103,7 @@ class FlowMapFutureSpec extends AkkaSpec { n } }). - produceTo(materializer, c) + produceTo(c, materializer) val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err2") diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index 6e7d17a8a6..e18a3f6c88 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -31,7 +31,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback on normal completion" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() - Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } + Flow(p).onComplete({ onCompleteProbe.ref ! _ }, materializer) val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) @@ -43,7 +43,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() - Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } + Flow(p).onComplete({ onCompleteProbe.ref ! _ }, materializer) val proc = p.expectSubscription proc.expectRequest() val ex = new RuntimeException("ex") with NoStackTrace @@ -55,7 +55,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback for an empty stream" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() - Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } + Flow(p).onComplete({ onCompleteProbe.ref ! _ }, materializer) val proc = p.expectSubscription proc.expectRequest() proc.sendComplete() @@ -71,7 +71,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { x }.foreach { x ⇒ onCompleteProbe.ref ! ("foreach-" + x) - }.onComplete(materializer) { onCompleteProbe.ref ! _ } + }.onComplete({ onCompleteProbe.ref ! _ }, materializer) val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) diff --git a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala index 8afd5ae4c8..9ce4fcfc03 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala @@ -49,7 +49,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(10).toFuture(m), 3.seconds) takes should be(1 to 10) val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(tail).produceTo(m, subscriber) + Flow(tail).produceTo(subscriber, m) subscriber.expectCompletedOrSubscriptionFollowedByComplete() } @@ -57,7 +57,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]() - Flow(publisher).prefixAndTail(3).produceTo(m, subscriber) + Flow(publisher).prefixAndTail(3).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -75,7 +75,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]() - Flow(publisher).prefixAndTail(1).produceTo(m, subscriber) + Flow(publisher).prefixAndTail(1).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -90,7 +90,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectComplete() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(tail).produceTo(m, substreamSubscriber) + Flow(tail).produceTo(substreamSubscriber, m) substreamSubscriber.expectSubscription() upstream.sendError(testException) @@ -102,7 +102,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]() - Flow(publisher).prefixAndTail(3).produceTo(m, subscriber) + Flow(publisher).prefixAndTail(3).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -120,7 +120,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(Seq[Int], Publisher[Int])]() - Flow(publisher).prefixAndTail(1).produceTo(m, subscriber) + Flow(publisher).prefixAndTail(1).produceTo(subscriber, m) val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -135,7 +135,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectComplete() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(tail).produceTo(m, substreamSubscriber) + Flow(tail).produceTo(substreamSubscriber, m) substreamSubscriber.expectSubscription().cancel() upstream.expectCancellation() diff --git a/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala index c4b92316f0..012022e213 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowProduceToSubscriberSpec.scala @@ -15,7 +15,7 @@ class FlowProduceToSubscriberSpec extends AkkaSpec { "produce elements to the subscriber" in { val c = StreamTestKit.SubscriberProbe[Int]() - Flow(List(1, 2, 3)).produceTo(materializer, c) + Flow(List(1, 2, 3)).produceTo(c, materializer) val s = c.expectSubscription() s.request(3) c.expectNext(1) diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala index c1a8d80090..b423fb2777 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala @@ -20,7 +20,7 @@ class FlowTakeWithinSpec extends AkkaSpec { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() - Flow(p).takeWithin(1.second).produceTo(materializer, c) + Flow(p).takeWithin(1.second).produceTo(c, materializer) val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) @@ -40,7 +40,7 @@ class FlowTakeWithinSpec extends AkkaSpec { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[Int]() - Flow(1 to 3).takeWithin(1.second).produceTo(materializer, c) + Flow(1 to 3).takeWithin(1.second).produceTo(c, materializer) val cSub = c.expectSubscription c.expectNoMsg(200.millis) cSub.request(100) diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index aa864632ef..3266ca7acf 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -359,7 +359,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d case _ ⇒ Nil } } - }).produceTo(materializer, subscriber) + }).produceTo(subscriber, materializer) val subscription = subscriber.expectSubscription() subscription.request(10) diff --git a/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala index a9255f4006..3c75b02661 100644 --- a/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala @@ -19,7 +19,7 @@ class TickPublisherSpec extends AkkaSpec { "produce ticks" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(materializer, c) + Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(3) c.expectNext("tick-1") @@ -34,7 +34,7 @@ class TickPublisherSpec extends AkkaSpec { "drop ticks when not requested" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(materializer, c) + Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(2) c.expectNext("tick-1") @@ -75,7 +75,7 @@ class TickPublisherSpec extends AkkaSpec { "signal onError when tick closure throws" in { val c = StreamTestKit.SubscriberProbe[String]() - Flow(1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).produceTo(materializer, c) + Flow(1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(3) c.expectError.getMessage should be("tick err") @@ -84,7 +84,7 @@ class TickPublisherSpec extends AkkaSpec { "be usable with zip for a simple form of rate limiting" in { val c = StreamTestKit.SubscriberProbe[Int]() val rate = Flow(1.second, () ⇒ "tick").toPublisher(materializer) - Flow(1 to 100).zip(rate).map { case (n, _) ⇒ n }.produceTo(materializer, c) + Flow(1 to 100).zip(rate).map { case (n, _) ⇒ n }.produceTo(c, materializer) val sub = c.expectSubscription() sub.request(1000) c.expectNext(1) diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index da3bfac65c..ce381b1071 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -232,7 +232,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val rcv = system.actorOf(receiverProps(probe.ref)) Flow(ActorPublisher[Int](snd)).collect { case n if n % 2 == 0 ⇒ "elem-" + n - }.produceTo(materializer, ActorSubscriber(rcv)) + }.produceTo(ActorSubscriber(rcv), materializer) (1 to 3) foreach { snd ! _ } probe.expectMsg("elem-2") diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 602f0e4914..d5803cb223 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -106,7 +106,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "receive requested elements" in { val ref = system.actorOf(manualSubscriberProps(testActor)) - Flow(List(1, 2, 3)).produceTo(materializer, ActorSubscriber(ref)) + Flow(List(1, 2, 3)).produceTo(ActorSubscriber(ref), materializer) expectNoMsg(200.millis) ref ! "ready" // requesting 2 expectMsg(OnNext(1)) @@ -120,14 +120,14 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "signal error" in { val ref = system.actorOf(manualSubscriberProps(testActor)) val e = new RuntimeException("simulated") with NoStackTrace - Flow(() ⇒ throw e).produceTo(materializer, ActorSubscriber(ref)) + Flow(() ⇒ throw e).produceTo(ActorSubscriber(ref), materializer) ref ! "ready" expectMsg(OnError(e)) } "remember requested after restart" in { val ref = system.actorOf(manualSubscriberProps(testActor)) - Flow(1 to 7).produceTo(materializer, ActorSubscriber(ref)) + Flow(1 to 7).produceTo(ActorSubscriber(ref), materializer) ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) @@ -145,7 +145,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "not deliver more after cancel" in { val ref = system.actorOf(manualSubscriberProps(testActor)) - Flow(1 to 5).produceTo(materializer, ActorSubscriber(ref)) + Flow(1 to 5).produceTo(ActorSubscriber(ref), materializer) ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) @@ -155,14 +155,14 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "work with OneByOneRequestStrategy" in { val ref = system.actorOf(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy)) - Flow(1 to 17).produceTo(materializer, ActorSubscriber(ref)) + Flow(1 to 17).produceTo(ActorSubscriber(ref), materializer) for (n ← 1 to 17) expectMsg(OnNext(n)) expectMsg(OnComplete) } "work with WatermarkRequestStrategy" in { val ref = system.actorOf(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10))) - Flow(1 to 17).produceTo(materializer, ActorSubscriber(ref)) + Flow(1 to 17).produceTo(ActorSubscriber(ref), materializer) for (n ← 1 to 17) expectMsg(OnNext(n)) expectMsg(OnComplete) } @@ -170,7 +170,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "suport custom max in flight request strategy with child workers" in { val ref = system.actorOf(streamerProps) val N = 117 - Flow(1 to N).map(Msg(_, testActor)).produceTo(materializer, ActorSubscriber(ref)) + Flow(1 to N).map(Msg(_, testActor)).produceTo(ActorSubscriber(ref), materializer) receiveN(N).toSet should be((1 to N).map(Done(_)).toSet) } diff --git a/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index 1f1ce10216..0067b42b15 100644 --- a/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -83,7 +83,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { val duct: Duct[Int, Long] = Duct[Int].map(_.toLong).timedIntervalBetween(in ⇒ in % 2 == 1, d ⇒ probe.ref ! d) val c1 = StreamTestKit.SubscriberProbe[Long]() - val c2: Subscriber[Int] = duct.produceTo(materializer, c1) + val c2: Subscriber[Int] = duct.produceTo(c1, materializer) val p = Flow(List(1, 2, 3)).toPublisher(materializer) p.subscribe(c2)