diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index 5678fcbc82..471698934d 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -76,7 +76,7 @@ class HttpServerExampleSpec case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ println("Accepted new connection from " + remoteAddress) - Source(requestProducer).map(requestHandler).connect(Sink(responseConsumer)).run() + Source(requestProducer).map(requestHandler).to(Sink(responseConsumer)).run() }) } //#full-server-example diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala index 05f01f2980..f0edc41315 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala @@ -35,8 +35,8 @@ private object RenderSupport { // materializes private case class CancelSecond[T](first: Source[T], second: Source[T]) extends SimpleActorFlowSource[T] { override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = { - first.connect(Sink(flowSubscriber)).run()(materializer) - second.connect(Sink.cancelled).run()(materializer) + first.to(Sink(flowSubscriber)).run()(materializer) + second.to(Sink.cancelled).run()(materializer) } } diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 44150d4fc2..4df0849114 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -39,7 +39,7 @@ object TestClient extends App { def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = { Source(List(HttpRequest() -> 'NoContext)) - .connect(Sink(connection.requestSubscriber)) + .to(Sink(connection.requestSubscriber)) .run() Source(connection.responsePublisher).map(_._1).runWith(Sink.future) } diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index da15eaa9f2..29ed192a07 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -39,7 +39,7 @@ object TestServer extends App { Source(connectionStream).foreach { case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber) ⇒ println("Accepted new connection from " + remoteAddress) - Source(requestPublisher).map(requestHandler).connect(Sink(responseSubscriber)).run() + Source(requestPublisher).map(requestHandler).to(Sink(responseSubscriber)).run() } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala index 8183ed3b7c..2b4be61c2b 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala @@ -40,7 +40,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNoMsg(50.millis) @@ -76,7 +76,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -112,7 +112,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -134,7 +134,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -182,7 +182,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -218,7 +218,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -254,7 +254,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -276,7 +276,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) @@ -298,7 +298,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ByteString] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(ByteString("abcdef")) @@ -320,7 +320,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA inside(expectRequest) { case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] - data.connect(Sink(dataProbe)).run() + data.to(Sink(dataProbe)).run() val sub = dataProbe.expectSubscription() sub.request(10) dataProbe.expectNext(Chunk(ByteString("abcdef"))) diff --git a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala index 5f63dfac63..5c2924fa86 100644 --- a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala +++ b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala @@ -56,7 +56,7 @@ trait ScalaRoutingDSL extends Directives { val runner = f(setup) Source(requestProducer) .mapAsync(request ⇒ runner(request)) - .connect(Sink(responseConsumer)).run()(fm) + .to(Sink(responseConsumer)).run()(fm) } } } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala index 660aabd86c..f5e5cc5cc6 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -20,7 +20,7 @@ class ChainSetup[In, Out]( val upstream = StreamTestKit.PublisherProbe[In]() val downstream = StreamTestKit.SubscriberProbe[Out]() - private val s = Source(upstream).connect(stream(Flow[In])) + private val s = Source(upstream).via(stream(Flow[In])) val publisher = toPublisher(s, materializer) val upstreamSubscription = upstream.expectSubscription() publisher.subscribe(downstream) diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index c15c3e9dab..4b279728c5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -245,7 +245,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val mat = source.collect { case n if n % 2 == 0 ⇒ "elem-" + n - }.connect(sink).run() + }.to(sink).run() val snd = mat.get(source) val rcv = mat.get(sink) diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 7c2c6fd0b2..8e455f7bf4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -122,7 +122,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "remember requested after restart" in { // creating actor with default supervision, because stream supervisor default strategy is to stop val ref = system.actorOf(manualSubscriberProps(testActor)) - Source(1 to 7).connect(Sink(ActorSubscriber[Int](ref))).run() + Source(1 to 7).to(Sink(ActorSubscriber[Int](ref))).run() ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index 24171808cc..31deddd88a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -79,7 +79,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { val flow: Flow[Int, Long] = Flow[Int].map(_.toLong).timedIntervalBetween(in ⇒ in % 2 == 1, d ⇒ probe.ref ! d) val c1 = StreamTestKit.SubscriberProbe[Long]() - Source(List(1, 2, 3)).connect(flow).connect(Sink(c1)).run() + Source(List(1, 2, 3)).via(flow).to(Sink(c1)).run() val s = c1.expectSubscription() s.request(100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala index 67d45907c5..b606c3199a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala @@ -120,7 +120,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = val ssession = Await.result(ssessionf, duration) val sdata = ssession.data Source(sdata).map(bs ⇒ ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')). - connect(Sink(scipher.plainTextOutbound)).run() + to(Sink(scipher.plainTextOutbound)).run() } def replyFirstLineInUpperCase(clientConnection: JavaSslConnection): Unit = { @@ -129,7 +129,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = { val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future) - Source(List(ByteString(message + '\n'))).connect(Sink(ccipher.plainTextOutbound)).run() + Source(List(ByteString(message + '\n'))).to(Sink(ccipher.plainTextOutbound)).run() val csession = Await.result(csessionf, duration) val cdata = csession.data Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala index 5689f25bd6..1457f0a552 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -48,7 +48,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) serverConnection.read(256) - Source(tcpProcessor).connect(Sink.ignore).run() + Source(tcpProcessor).to(Sink.ignore).run() Source(testInput).runWith(Sink.publisher).subscribe(tcpProcessor) serverConnection.waitRead() should be(expectedOutput) @@ -158,7 +158,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).connect(Sink(conn.outputStream)).run() + Source(testInput).to(Sink(conn.outputStream)).run() val resultFuture = Source(conn.inputStream).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) Await.result(resultFuture, 3.seconds) should be(expectedOutput) @@ -178,7 +178,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).connect(Sink(conn1.outputStream)).run() + Source(testInput).to(Sink(conn1.outputStream)).run() conn1.inputStream.subscribe(conn2.outputStream) conn2.inputStream.subscribe(conn3.outputStream) val resultFuture = Source(conn3.inputStream).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala index cb60015d0d..04c3337e06 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala @@ -48,9 +48,9 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) serverConnection.read(256) - Source(tcpPublisher).connect(Sink.ignore).run() + Source(tcpPublisher).to(Sink.ignore).run() - Source(testInput).connect(Sink(tcpSubscriber)).run() + Source(testInput).to(Sink(tcpSubscriber)).run() serverConnection.waitRead() should be(expectedOutput) server.close() @@ -162,7 +162,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).connect(Sink(tcpSubscriber)).run() + Source(testInput).to(Sink(tcpSubscriber)).run() val resultFuture = Source(tcpPublisher).fold(ByteString.empty) { case (res, elem) ⇒ res ++ elem } Await.result(resultFuture, 3.seconds) should be(expectedOutput) @@ -181,7 +181,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).connect(Sink(tcpSubscriber1)).run() + Source(testInput).to(Sink(tcpSubscriber1)).run() tcpPublisher1.subscribe(tcpSubscriber2) tcpPublisher2.subscribe(tcpSubscriber3) val resultFuture = Source(tcpPublisher3).fold(ByteString.empty) { case (res, elem) ⇒ res ++ elem } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala index 1e3e2f74e5..9d2de18906 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala @@ -223,9 +223,9 @@ trait TcpHelper { this: TestKitBase ⇒ def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): EchoServer = { val foreachSink = Sink.foreach[IncomingTcpConnection] { conn ⇒ - conn.inbound.connect(conn.outbound).run() + conn.inbound.to(conn.outbound).run() } - val binding = bind(Flow[IncomingTcpConnection].connect(foreachSink), serverAddress) + val binding = bind(Flow[IncomingTcpConnection].to(foreachSink), serverAddress) new EchoServer(binding.connection.get(foreachSink), binding) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala index 466ea64dcf..fa3a203f24 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala @@ -17,26 +17,26 @@ class FlowAppendSpec extends AkkaSpec with River { "Flow" should { "append Flow" in riverOf[String] { subscriber ⇒ - val flow = Flow[Int].connect(otherFlow) - Source(elements).connect(flow).connect(Sink(subscriber)).run() + val flow = Flow[Int].via(otherFlow) + Source(elements).via(flow).to(Sink(subscriber)).run() } "append Sink" in riverOf[String] { subscriber ⇒ - val sink = Flow[Int].connect(otherFlow.connect(Sink(subscriber))) - Source(elements).connect(sink).run() + val sink = Flow[Int].to(otherFlow.to(Sink(subscriber))) + Source(elements).to(sink).run() } } "Source" should { "append Flow" in riverOf[String] { subscriber ⇒ Source(elements) - .connect(otherFlow) - .connect(Sink(subscriber)).run() + .via(otherFlow) + .to(Sink(subscriber)).run() } "append Sink" in riverOf[String] { subscriber ⇒ Source(elements) - .connect(otherFlow.connect(Sink(subscriber))) + .to(otherFlow.to(Sink(subscriber))) .run() } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 6ae529673c..3b28496fe0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -53,7 +53,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).connect(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -73,7 +73,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).connect(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -101,7 +101,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).connect(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -132,7 +132,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).connect(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -160,7 +160,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).connect(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -189,7 +189,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(1, overflowStrategy = strategy).connect(Sink(subscriber)).run() + Source(publisher).buffer(1, overflowStrategy = strategy).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index 53659a40f7..dd090209fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -25,49 +25,49 @@ class FlowCompileSpec extends AkkaSpec { "open.run()" shouldNot compile } "accept Iterable" in { - val f: Source[Int] = intSeq.connect(Flow[Int]) + val f: Source[Int] = intSeq.via(Flow[Int]) } "accept Future" in { - val f: Source[Int] = intFut.connect(Flow[Int]) + val f: Source[Int] = intFut.via(Flow[Int]) } "append Flow" in { val open1: Flow[Int, String] = Flow[Int].map(_.toString) val open2: Flow[String, Int] = Flow[String].map(_.hashCode) - val open3: Flow[Int, Int] = open1.connect(open2) + val open3: Flow[Int, Int] = open1.via(open2) "open3.run()" shouldNot compile - val closedSource: Source[Int] = intSeq.connect(open3) + val closedSource: Source[Int] = intSeq.via(open3) "closedSource.run()" shouldNot compile - val closedSink: Sink[Int] = open3.connect(Sink.publisher[Int]) + val closedSink: Sink[Int] = open3.to(Sink.publisher[Int]) "closedSink.run()" shouldNot compile - closedSource.connect(Sink.publisher[Int]).run() - intSeq.connect(closedSink).run() + closedSource.to(Sink.publisher[Int]).run() + intSeq.to(closedSink).run() } "append Sink" in { val open: Flow[Int, String] = Flow[Int].map(_.toString) - val closedSink: Sink[String] = Flow[String].map(_.hashCode).connect(Sink.publisher[Int]) - val appended: Sink[Int] = open.connect(closedSink) + val closedSink: Sink[String] = Flow[String].map(_.hashCode).to(Sink.publisher[Int]) + val appended: Sink[Int] = open.to(closedSink) "appended.run()" shouldNot compile "appended.connect(Sink.future[Int])" shouldNot compile - intSeq.connect(appended).run + intSeq.to(appended).run } "be appended to Source" in { val open: Flow[Int, String] = Flow[Int].map(_.toString) - val closedSource: Source[Int] = strSeq.connect(Flow[String].map(_.hashCode)) - val closedSource2: Source[String] = closedSource.connect(open) + val closedSource: Source[Int] = strSeq.via(Flow[String].map(_.hashCode)) + val closedSource2: Source[String] = closedSource.via(open) "closedSource2.run()" shouldNot compile "strSeq.connect(closedSource2)" shouldNot compile - closedSource2.connect(Sink.publisher[String]).run + closedSource2.to(Sink.publisher[String]).run } } "Sink" should { val openSource: Sink[Int] = - Flow[Int].map(_.toString).connect(Sink.publisher[String]) + Flow[Int].map(_.toString).to(Sink.publisher[String]) "accept Source" in { - intSeq.connect(openSource) + intSeq.to(openSource) } "not accept Sink" in { "openSource.connect(Sink.future[String])" shouldNot compile @@ -81,7 +81,7 @@ class FlowCompileSpec extends AkkaSpec { val openSource: Source[String] = Source(Seq(1, 2, 3)).map(_.toString) "accept Sink" in { - openSource.connect(Sink.publisher[String]) + openSource.to(Sink.publisher[String]) } "not be accepted by Source" in { "openSource.connect(intSeq)" shouldNot compile @@ -94,7 +94,7 @@ class FlowCompileSpec extends AkkaSpec { "RunnableFlow" should { Sink.future[String] val closed: RunnableFlow = - Source(Seq(1, 2, 3)).map(_.toString).connect(Sink.publisher[String]) + Source(Seq(1, 2, 3)).map(_.toString).to(Sink.publisher[String]) "run" in { closed.run() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index 4b6a853828..a09ca25716 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -32,7 +32,7 @@ class FlowConcatAllSpec extends AkkaSpec { val main = Source(List(s1, s2, s3, s4, s5)) val subscriber = StreamTestKit.SubscriberProbe[Int]() - main.flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() + main.flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) @@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec { "work together with SplitWhen" in { val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() + Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) @@ -53,7 +53,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the current open substream and signal error" in { val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() + Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -73,7 +73,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on open substream, cancel the master stream and signal error " in { val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() + Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -93,7 +93,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the current open substream and the master stream" in { val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() + Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index ffad9cfb0a..d159ed5c5b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).connect(Sink(subscriber)).run() + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).connect(Sink(subscriber)).run() + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -68,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).connect(Sink(subscriber)).run() + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala index f3b5a96399..7945ca52ae 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala @@ -17,7 +17,7 @@ class FlowDispatcherSpec extends AkkaSpec { val probe = TestProbe() val p = Source(List(1, 2, 3)).map(i ⇒ { probe.ref ! Thread.currentThread().getName(); i }). - connect(Sink.ignore).run() + to(Sink.ignore).run() probe.receiveN(3) foreach { case s: String ⇒ s should startWith(system.name + "-akka.test.stream-dispatcher") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropSpec.scala index 06f0723b26..3661954d9a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropSpec.scala @@ -31,7 +31,7 @@ class FlowDropSpec extends AkkaSpec with ScriptedTest { "not drop anything for negative n" in { val probe = StreamTestKit.SubscriberProbe[Int]() - Source(List(1, 2, 3)).drop(-1).connect(Sink(probe)).run() + Source(List(1, 2, 3)).drop(-1).to(Sink(probe)).run() probe.expectSubscription().request(10) probe.expectNext(1) probe.expectNext(2) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala index cfc6b0e7e3..14ce63aa3c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala @@ -20,7 +20,7 @@ class FlowDropWithinSpec extends AkkaSpec { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() - Source(p).dropWithin(1.second).connect(Sink(c)).run() + Source(p).dropWithin(1.second).to(Sink(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index a6f1d9b3ac..690f51ccc6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -27,7 +27,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).connect(Sink(subscriber)).run() + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -47,7 +47,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).connect(Sink(subscriber)).run() + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -79,7 +79,7 @@ class FlowExpandSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).connect(Sink(subscriber)).run() + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).to(Sink(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala index b43b4cd33e..8109e8b664 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala @@ -31,7 +31,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { val probe = StreamTestKit.SubscriberProbe[Int]() Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). - connect(Sink(probe)).run() + to(Sink(probe)).run() val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 953de8a882..e868b74f45 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -222,15 +222,15 @@ class FlowGraphCompileSpec extends AkkaSpec { }.run() FlowGraph(partial2) { b ⇒ - b.attachSink(undefinedSink1, f1.connect(out1)) - b.attachSink(UndefinedSink[String]("sink2"), f2.connect(out2)) + b.attachSink(undefinedSink1, f1.to(out1)) + b.attachSink(UndefinedSink[String]("sink2"), f2.to(out2)) }.run() FlowGraph(partial1) { implicit b ⇒ import FlowGraphImplicits._ - b.attachSink(undefinedSink1, f1.connect(out1)) - b.attachSource(undefinedSource1, Source(List("a", "b", "c")).connect(f1)) - b.attachSource(undefinedSource2, Source(List("d", "e", "f")).connect(f2)) + b.attachSink(undefinedSink1, f1.to(out1)) + b.attachSource(undefinedSource1, Source(List("a", "b", "c")).via(f1)) + b.attachSource(undefinedSource2, Source(List("d", "e", "f")).via(f2)) bcast ~> f5 ~> out2 }.run() } @@ -363,10 +363,10 @@ class FlowGraphCompileSpec extends AkkaSpec { b.addEdge(in1, f1, out1) }.run() FlowGraph { b ⇒ - b.addEdge(in1, f1, f2.connect(out1)) + b.addEdge(in1, f1, f2.to(out1)) }.run() FlowGraph { b ⇒ - b.addEdge(in1.connect(f1), f2, out1) + b.addEdge(in1.via(f1), f2, out1) }.run() FlowGraph { implicit b ⇒ import FlowGraphImplicits._ @@ -378,18 +378,81 @@ class FlowGraphCompileSpec extends AkkaSpec { }.run() FlowGraph { implicit b ⇒ import FlowGraphImplicits._ - in1 ~> f1.connect(out1) + in1 ~> f1.to(out1) }.run() FlowGraph { implicit b ⇒ import FlowGraphImplicits._ - in1.connect(f1) ~> out1 + in1.via(f1) ~> out1 }.run() FlowGraph { implicit b ⇒ import FlowGraphImplicits._ - in1.connect(f1) ~> f2.connect(out1) + in1.via(f1) ~> f2.to(out1) }.run() } + "build all combinations with implicits" when { + + "Source is connected directly" in { + PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + Source.empty[Int] ~> Flow[Int] + Source.empty[Int] ~> Broadcast[Int] + Source.empty[Int] ~> Sink.ignore + Source.empty[Int] ~> UndefinedSink[Int] + } + } + + "Source is connected through flow" in { + PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + Source.empty[Int] ~> Flow[Int] ~> Flow[Int] + Source.empty[Int] ~> Flow[Int] ~> Broadcast[Int] + Source.empty[Int] ~> Flow[Int] ~> Sink.ignore + Source.empty[Int] ~> Flow[Int] ~> UndefinedSink[Int] + } + } + + "Junction is connected directly" in { + PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + Broadcast[Int] ~> Flow[Int] + Broadcast[Int] ~> Broadcast[Int] + Broadcast[Int] ~> Sink.ignore + Broadcast[Int] ~> UndefinedSink[Int] + } + } + + "Junction is connected through flow" in { + PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + Broadcast[Int] ~> Flow[Int] ~> Flow[Int] + Broadcast[Int] ~> Flow[Int] ~> Broadcast[Int] + Broadcast[Int] ~> Flow[Int] ~> Sink.ignore + Broadcast[Int] ~> Flow[Int] ~> UndefinedSink[Int] + } + } + + "UndefinedSource is connected directly" in { + PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + UndefinedSource[Int] ~> Flow[Int] + UndefinedSource[Int] ~> Broadcast[Int] + UndefinedSource[Int] ~> Sink.ignore + UndefinedSource[Int] ~> UndefinedSink[Int] + } + } + + "UndefinedSource is connected through flow" in { + PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + UndefinedSource[Int] ~> Flow[Int] ~> Flow[Int] + UndefinedSource[Int] ~> Flow[Int] ~> Broadcast[Int] + UndefinedSource[Int] ~> Flow[Int] ~> Sink.ignore + UndefinedSource[Int] ~> Flow[Int] ~> UndefinedSink[Int] + } + } + } + "build partial with only undefined sources and sinks" in { PartialFlowGraph { b ⇒ b.addEdge(UndefinedSource[String], f1, UndefinedSink[String]) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index c1b23ebd22..f3ec5db06a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -26,7 +26,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(1000, 1.second).connect(Sink(c)).run() + Source(p).groupedWithin(1000, 1.second).to(Sink(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) @@ -51,7 +51,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(1 to 3).groupedWithin(1000, 10.second).connect(Sink(c)).run() + Source(1 to 3).groupedWithin(1000, 10.second).to(Sink(c)).run() val cSub = c.expectSubscription cSub.request(100) c.expectNext((1 to 3).toList) @@ -63,7 +63,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(1000, 1.second).connect(Sink(c)).run() + Source(p).groupedWithin(1000, 1.second).to(Sink(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(1) @@ -83,7 +83,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "drop empty groups" in { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(1000, 500.millis).connect(Sink(c)).run() + Source(p).groupedWithin(1000, 500.millis).to(Sink(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(2) @@ -105,7 +105,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(3, 2.second).connect(Sink(c)).run() + Source(p).groupedWithin(3, 2.second).to(Sink(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(4) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 18d5d757fd..8c1994f67f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -25,7 +25,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "produce future elements" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).connect(Sink(c)).run() + val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(2) c.expectNext(1) @@ -42,7 +42,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val p = Source(1 to 50).mapAsync(n ⇒ Future { Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) n - }).connect(Sink(c)).run() + }).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(1000) for (n ← 1 to 50) c.expectNext(n) @@ -56,7 +56,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val p = Source(1 to 20).mapAsync(n ⇒ Future { probe.ref ! n n - }).connect(Sink(c)).run() + }).to(Sink(c)).run() val sub = c.expectSubscription() // nothing before requested probe.expectNoMsg(500.millis) @@ -84,7 +84,7 @@ class FlowMapAsyncSpec extends AkkaSpec { Await.ready(latch, 10.seconds) n } - }).connect(Sink(c)).run() + }).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err1") @@ -103,7 +103,7 @@ class FlowMapAsyncSpec extends AkkaSpec { n } }). - connect(Sink(c)).run() + to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index fa34c8df0f..f21615a96c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -27,7 +27,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val p = Source(1 to 4).mapAsyncUnordered(n ⇒ Future { Await.ready(latch(n), 5.seconds) n - }).connect(Sink(c)).run() + }).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(5) latch(2).countDown() @@ -48,7 +48,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val p = Source(1 to 20).mapAsyncUnordered(n ⇒ Future { probe.ref ! n n - }).connect(Sink(c)).run() + }).to(Sink(c)).run() val sub = c.expectSubscription() // nothing before requested probe.expectNoMsg(500.millis) @@ -77,7 +77,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { Await.ready(latch, 10.seconds) n } - }).connect(Sink(c)).run() + }).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err1") @@ -96,7 +96,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { n } }). - connect(Sink(c)).run() + to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala index 44504bdb64..5b036efbec 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala @@ -28,7 +28,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback on normal completion" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() - Source(p).connect(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() + Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) @@ -40,7 +40,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() - Source(p).connect(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() + Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() val proc = p.expectSubscription proc.expectRequest() val ex = new RuntimeException("ex") with NoStackTrace @@ -52,7 +52,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback for an empty stream" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() - Source(p).connect(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() + Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() val proc = p.expectSubscription proc.expectRequest() proc.sendComplete() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 1e337a0282..a3cd2fa889 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -33,7 +33,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(Nil) val tailSubscriber = SubscriberProbe[Int] - tailFlow.connect(Sink(tailSubscriber)).run() + tailFlow.to(Sink(tailSubscriber)).run() tailSubscriber.expectComplete() } @@ -43,7 +43,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(List(1, 2, 3)) val tailSubscriber = SubscriberProbe[Int] - tailFlow.connect(Sink(tailSubscriber)).run() + tailFlow.to(Sink(tailSubscriber)).run() tailSubscriber.expectComplete() } @@ -87,7 +87,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { takes should be(1 to 10) val subscriber = StreamTestKit.SubscriberProbe[Int]() - tail.connect(Sink(subscriber)).run() + tail.to(Sink(subscriber)).run() subscriber.expectCompletedOrSubscriptionFollowedByComplete() } @@ -95,7 +95,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(3).connect(Sink(subscriber)).run() + Source(publisher).prefixAndTail(3).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -113,7 +113,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(1).connect(Sink(subscriber)).run() + Source(publisher).prefixAndTail(1).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -128,7 +128,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectComplete() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() - tail.connect(Sink(substreamSubscriber)).run() + tail.to(Sink(substreamSubscriber)).run() substreamSubscriber.expectSubscription() upstream.sendError(testException) @@ -140,7 +140,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(3).connect(Sink(subscriber)).run() + Source(publisher).prefixAndTail(3).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -158,7 +158,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(1).connect(Sink(subscriber)).run() + Source(publisher).prefixAndTail(1).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -173,7 +173,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectComplete() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() - tail.connect(Sink(substreamSubscriber)).run() + tail.to(Sink(substreamSubscriber)).run() substreamSubscriber.expectSubscription().cancel() upstream.expectCancellation() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 3ef2b6213d..826fd1a890 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -228,9 +228,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "subscribe Subscriber" in { val flow: Flow[String, String] = Flow[String] val c1 = StreamTestKit.SubscriberProbe[String]() - val sink: Sink[String] = flow.connect(Sink(c1)) + val sink: Sink[String] = flow.to(Sink(c1)) val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) - Source(publisher).connect(sink).run() + Source(publisher).to(sink).run() val sub1 = c1.expectSubscription sub1.request(3) @@ -244,7 +244,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString }) val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher) - Source(publisher).connect(flow).connect(Sink.ignore).run() + Source(publisher).via(flow).to(Sink.ignore).run() expectMsg("1") expectMsg("2") @@ -254,9 +254,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "perform transformation operation and subscribe Subscriber" in { val flow = Flow[Int].map(_.toString) val c1 = StreamTestKit.SubscriberProbe[String]() - val sink: Sink[Int] = flow.connect(Sink(c1)) + val sink: Sink[Int] = flow.to(Sink(c1)) val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) - Source(publisher).connect(sink).run() + Source(publisher).to(sink).run() val sub1 = c1.expectSubscription sub1.request(3) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala index fb789f3285..2d9cd86fed 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala @@ -36,7 +36,7 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest { "not take anything for negative n" in { val probe = StreamTestKit.SubscriberProbe[Int]() - Source(List(1, 2, 3)).take(-1).connect(Sink(probe)).run() + Source(List(1, 2, 3)).take(-1).to(Sink(probe)).run() probe.expectSubscription().request(10) probe.expectComplete() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala index 858656ed81..31aef32bc2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala @@ -20,7 +20,7 @@ class FlowTakeWithinSpec extends AkkaSpec { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() - Source(p).takeWithin(1.second).connect(Sink(c)).run() + Source(p).takeWithin(1.second).to(Sink(c)).run() val pSub = p.expectSubscription() val cSub = c.expectSubscription() cSub.request(100) @@ -40,7 +40,7 @@ class FlowTakeWithinSpec extends AkkaSpec { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[Int]() - Source(1 to 3).takeWithin(1.second).connect(Sink(c)).run() + Source(1 to 3).takeWithin(1.second).to(Sink(c)).run() val cSub = c.expectSubscription() c.expectNoMsg(200.millis) cSub.request(100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala index e31b3a6d85..e647e4097e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala @@ -57,7 +57,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { } override def isComplete: Boolean = !isTimerActive("tick") }). - connect(Sink.ignore).run() + to(Sink.ignore).run() val pSub = p.expectSubscription() expectMsg("tick-1") expectMsg("tick-2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformSpec.scala index 86b55bb639..f2c34692be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformSpec.scala @@ -194,7 +194,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def cleanup() = cleanupProbe.ref ! s }). - connect(Sink.ignore).run() + to(Sink.ignore).run() cleanupProbe.expectMsg("a") } @@ -365,7 +365,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d case _ ⇒ Nil } } - }).connect(Sink(subscriber)).run() + }).to(Sink(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) @@ -388,13 +388,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d }) val s1 = StreamTestKit.SubscriberProbe[Int]() - flow.connect(Sink(s1)).run() + flow.to(Sink(s1)).run() s1.expectSubscription().request(3) s1.expectNext(1, 2, 3) s1.expectComplete() val s2 = StreamTestKit.SubscriberProbe[Int]() - flow.connect(Sink(s2)).run() + flow.to(Sink(s2)).run() s2.expectSubscription().request(3) s2.expectNext(1, 2, 3) s2.expectComplete() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala index b0de2f32a2..46c8fd22be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureSinkSpec.scala @@ -37,7 +37,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest { val p = StreamTestKit.PublisherProbe[Int]() val f = Sink.future[Int] val s = Source.subscriber[Int] - val m = s.connect(f).run() + val m = s.to(f).run() p.subscribe(m.get(s)) val proc = p.expectSubscription proc.expectRequest() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala index f216b141f3..bffe0d86cd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala @@ -68,7 +68,7 @@ class GraphFlowSpec extends AkkaSpec { in -> out } - source1.connect(flow).connect(Sink(probe)).run() + source1.via(flow).to(Sink(probe)).run() validateProbe(probe, stdRequests, stdResult) } @@ -86,7 +86,7 @@ class GraphFlowSpec extends AkkaSpec { in -> out } - source1.connect(flow).map(_.toInt).connect(Sink(probe)).run() + source1.via(flow).map(_.toInt).to(Sink(probe)).run() validateProbe(probe, stdRequests, stdResult) } @@ -113,7 +113,7 @@ class GraphFlowSpec extends AkkaSpec { in2 -> out2 } - source1.connect(flow1).connect(flow2).connect(Sink(probe)).run() + source1.via(flow1).via(flow2).to(Sink(probe)).run() validateProbe(probe, stdRequests, stdResult) } @@ -150,7 +150,7 @@ class GraphFlowSpec extends AkkaSpec { out } - source.connect(Sink(probe)).run() + source.to(Sink(probe)).run() validateProbe(probe, stdRequests, stdResult) } @@ -167,7 +167,7 @@ class GraphFlowSpec extends AkkaSpec { out } - source.map(_.toInt).connect(Sink(probe)).run() + source.map(_.toInt).to(Sink(probe)).run() validateProbe(probe, stdRequests, stdResult) } @@ -193,7 +193,7 @@ class GraphFlowSpec extends AkkaSpec { in2 -> out2 } - source.connect(flow).connect(Sink(probe)).run() + source.via(flow).to(Sink(probe)).run() validateProbe(probe, stdRequests, stdResult) } @@ -230,7 +230,7 @@ class GraphFlowSpec extends AkkaSpec { in } - source1.connect(sink).run() + source1.to(sink).run() validateProbe(probe, stdRequests, stdResult) } @@ -247,8 +247,8 @@ class GraphFlowSpec extends AkkaSpec { in } - val iSink = Flow[Int].map(_.toString).connect(sink) - source1.connect(iSink).run() + val iSink = Flow[Int].map(_.toString).to(sink) + source1.to(iSink).run() validateProbe(probe, stdRequests, stdResult) } @@ -274,7 +274,7 @@ class GraphFlowSpec extends AkkaSpec { in2 } - source1.connect(flow).connect(sink).run() + source1.via(flow).to(sink).run() validateProbe(probe, stdRequests, stdResult) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index df86443004..9e425928e5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -158,7 +158,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { Source(List(1, 2, 3)) ~> Flow[Int].map(_ * 2) ~> bcast bcast ~> merge bcast ~> Flow[Int].map(_ + 3) ~> merge - merge ~> Flow[Int].grouped(10).connect(resultFuture) + merge ~> Flow[Int].grouped(10).to(resultFuture) }.run() Await.result(g.get(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9)) @@ -194,8 +194,8 @@ class GraphOpsIntegrationSpec extends AkkaSpec { val s2 = SubscriberProbe[String] FlowGraph(partial) { builder ⇒ builder.attachSource(input1, Source(List(0, 1, 2).map(_ + 1))) - builder.attachSink(output1, Flow[Int].filter(n ⇒ (n % 2) != 0).connect(Sink(s1))) - builder.attachSink(output2, Flow[String].map(_.toUpperCase).connect(Sink(s2))) + builder.attachSink(output1, Flow[Int].filter(n ⇒ (n % 2) != 0).to(Sink(s1))) + builder.attachSink(output2, Flow[String].map(_.toUpperCase).to(Sink(s2))) }.run() val sub1 = s1.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala index 8888210ba4..74638edc42 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSinkSpec.scala @@ -20,7 +20,7 @@ class SubscriberSinkSpec extends AkkaSpec { "publish elements to the subscriber" in { val c = StreamTestKit.SubscriberProbe[Int]() - Source(List(1, 2, 3)).connect(Sink(c)).run() + Source(List(1, 2, 3)).to(Sink(c)).run() val s = c.expectSubscription() s.request(3) c.expectNext(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 999f1322b8..125bcf96df 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -19,7 +19,7 @@ class TickSourceSpec extends AkkaSpec { "produce ticks" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).connect(Sink(c)).run() + Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectNoMsg(600.millis) @@ -35,7 +35,7 @@ class TickSourceSpec extends AkkaSpec { "drop ticks when not requested" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).connect(Sink(c)).run() + Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(2) c.expectNext("tick-1") @@ -76,7 +76,7 @@ class TickSourceSpec extends AkkaSpec { "signal onError when tick closure throws" in { val c = StreamTestKit.SubscriberProbe[String]() - Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).connect(Sink(c)).run() + Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectError.getMessage should be("tick err") diff --git a/akka-stream/src/main/scala/akka/stream/io2/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io2/TcpConnectionStream.scala index b465bfc298..cbb0214e64 100644 --- a/akka-stream/src/main/scala/akka/stream/io2/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io2/TcpConnectionStream.scala @@ -211,8 +211,8 @@ private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requeste connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) tcpOutputs.setConnection(connection) tcpInputs.setConnection(connection) - val obmf = outbound.connect(Sink(processor)).run() - val ibmf = Source(processor).connect(inbound).run() + val obmf = outbound.to(Sink(processor)).run() + val ibmf = Source(processor).to(inbound).run() requester ! StreamTcp.OutgoingTcpConnection(remoteAddress, localAddress)(obmf, ibmf) context.become(super.receive) case f: CommandFailed ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/io2/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io2/TcpListenStreamActor.scala index 1307999af0..f72a7dd4bb 100644 --- a/akka-stream/src/main/scala/akka/stream/io2/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io2/TcpListenStreamActor.scala @@ -109,7 +109,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, nextPhase(runningPhase) listener ! ResumeAccepting(1) val publisher = ActorPublisher[IncomingTcpConnection](self) - val mf = Source(publisher).connect(connectionHandler).run() + val mf = Source(publisher).to(connectionHandler).run() val target = self requester ! StreamTcp.TcpServerBinding(localAddress)(mf, Some(new Closeable { override def close() = target ! Unbind diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 7a3ae31455..45aeaaaca0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -62,21 +62,17 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { /** Converts this Flow to it's Scala DSL counterpart */ def asScala: scaladsl.Flow[In, Out] = delegate - // CONNECT // + /** + * Transform this [[Flow]] by appending the given processing steps. + */ + def via[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] = + new Flow(delegate.via(flow.asScala)) /** - * Transform this flow by appending the given processing steps. + * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. */ - def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] = - new Flow(delegate.connect(flow.asScala)) - - /** - * Connect this flow to a sink, concatenating the processing steps of both. - */ - def connect(sink: javadsl.Sink[Out]): javadsl.Sink[In] = - new Sink(delegate.connect(sink.asScala)) - - // RUN WITH // + def to(sink: javadsl.Sink[Out]): javadsl.Sink[In] = + new Sink(delegate.to(sink.asScala)) /** * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. @@ -120,8 +116,6 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { def runWith(source: javadsl.Source[In], sink: javadsl.Sink[Out], materializer: FlowMaterializer): Unit = delegate.runWith(source.asScala, sink.asScala)(materializer) - // COMMON OPS // - /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index cd18aa5bed..9f9c476a2b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -175,21 +175,17 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { /** Converts this Java DSL element to it's Scala DSL counterpart. */ def asScala: scaladsl.Source[Out] = delegate - // CONNECT // + /** + * Transform this [[Source]] by appending the given processing stages. + */ + def via[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] = + new Source(delegate.via(flow.asScala)) /** - * Transform this source by appending the given processing stages. + * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] = - new Source(delegate.connect(flow.asScala)) - - /** - * Connect this `Source` to a `Sink`, concatenating the processing steps of both. - */ - def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow = - new RunnableFlowAdapter(delegate.connect(sink.asScala)) - - // RUN WITH // + def to(sink: javadsl.Sink[Out]): javadsl.RunnableFlow = + new RunnableFlowAdapter(delegate.to(sink.asScala)) /** * Connect this `Source` to a `KeyedSink` and run it. @@ -206,9 +202,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. */ def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit = - delegate.connect(sink.asScala).run()(materializer) - - // OPS // + delegate.to(sink.asScala).run()(materializer) /** * Shortcut for running this `Source` with a fold function. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala index 14cdbc1672..58b5e8cc66 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala @@ -178,7 +178,7 @@ private[scaladsl] final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Un } Nil } - }).connect(BlackholeSink).run()(materializer.withNamePrefix(flowName)) + }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName)) } /** @@ -202,7 +202,7 @@ private[scaladsl] final case class ForeachSink[In](f: In ⇒ Unit) extends Keyed } Nil } - }).connect(BlackholeSink).run()(materializer.withNamePrefix(flowName)) + }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName)) promise.future } } @@ -232,7 +232,7 @@ private[scaladsl] final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) ex } Nil } - }).connect(BlackholeSink).run()(materializer.withNamePrefix(flowName)) + }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName)) promise.future } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index e34d4d34d0..dbdca2782c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -56,9 +56,9 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] { private def sourcePipe = Pipe.empty[Out].withSource(this) - override def connect[T](flow: Flow[Out, T]): Source[T] = sourcePipe.connect(flow) + override def via[T](flow: Flow[Out, T]): Source[T] = sourcePipe.via(flow) - override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink) + override def to(sink: Sink[Out]): RunnableFlow = sourcePipe.to(sink) /** INTERNAL API */ override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 13a2e86248..1186f112f0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -20,14 +20,14 @@ trait Flow[-In, +Out] extends FlowOps[Out] { override type Repr[+O] <: Flow[In, O] /** - * Transform this flow by appending the given processing steps. + * Transform this [[Flow]] by appending the given processing steps. */ - def connect[T](flow: Flow[Out, T]): Flow[In, T] + def via[T](flow: Flow[Out, T]): Flow[In, T] /** - * Connect this flow to a sink, concatenating the processing steps of both. + * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. */ - def connect(sink: Sink[Out]): Sink[In] + def to(sink: Sink[Out]): Sink[In] /** * @@ -36,7 +36,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * and `Publisher` of a [[PublisherSink]]. */ def runWith(source: KeyedSource[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = { - val m = source.connect(this).connect(sink).run() + val m = source.via(this).to(sink).run() (m.get(source), m.get(sink)) } @@ -46,7 +46,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a [[PublisherSink]]. */ def runWith(source: Source[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = - source.connect(this).runWith(sink) + source.via(this).runWith(sink) /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. @@ -54,7 +54,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * The returned value will contain the materialized value of the `SourceWithKey`, e.g. `Subscriber` of a [[SubscriberSource]]. */ def runWith(source: KeyedSource[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): source.MaterializedType = - source.connect(this).connect(sink).run().get(source) + source.via(this).to(sink).run().get(source) /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. @@ -62,7 +62,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload. */ def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): Unit = - source.connect(this).connect(sink).run() + source.via(this).to(sink).run() } object Flow { @@ -101,7 +101,7 @@ trait RunnableFlow { } /** - * Scala API: Operations offered by Flows and Sources with a free output side: the DSL flows left-to-right only. + * Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only. */ trait FlowOps[+Out] { import FlowOps._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index d09f62ada7..5e22fe6b2c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -656,15 +656,15 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph (source, flow, sink) match { case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) ⇒ val src = sourcePipe.input - val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops)) + val newPipe = Pipe(sourcePipe.ops).via(pipe).via(Pipe(sinkPipe.ops)) val snk = sinkPipe.output addEdge(src, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sink: Sink[Out]) ⇒ val src = sourcePipe.input - val newPipe = Pipe(sourcePipe.ops).connect(pipe) + val newPipe = Pipe(sourcePipe.ops).via(pipe) addEdge(src, newPipe, sink) // recursive, but now it is a Source-Pipe-Sink case (source: Source[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) ⇒ - val newPipe = pipe.connect(Pipe(sinkPipe.ops)) + val newPipe = pipe.via(Pipe(sinkPipe.ops)) val snk = sinkPipe.output addEdge(source, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink case (_, gflow: GraphFlow[In, _, _, Out], _) ⇒ @@ -1291,6 +1291,8 @@ private[scaladsl] class MaterializedFlowGraph(materializedSources: Map[KeyedSour /** * Implicit conversions that provides syntactic sugar for building flow graphs. + * Every method in *Ops classes should have an implicit builder parameter to prevent + * using conversions where builder is not available (e.g. outside FlowGraph scope). */ object FlowGraphImplicits { @@ -1304,20 +1306,23 @@ object FlowGraphImplicits { junctionIn.next } + def ~>(sink: UndefinedSink[Out])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(source, sink) + def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit = builder.addEdge(source, sink) } class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { - def ~>[O](otherflow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[In, O] = - new SourceNextStep(source, flow.connect(otherflow), builder) + def ~>[O](otherflow: Flow[Out, O]): SourceNextStep[In, O] = + new SourceNextStep(source, flow.via(otherflow), builder) def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { builder.addEdge(source, flow, junctionIn) junctionIn.next } - def ~>(sink: UndefinedSink[Out])(implicit builder: FlowGraphBuilder): Unit = + def ~>(sink: UndefinedSink[Out]): Unit = builder.addEdge(source, flow, sink) def ~>(sink: Sink[Out]): Unit = @@ -1328,30 +1333,32 @@ object FlowGraphImplicits { def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = new JunctionNextStep(junction, flow, builder) - def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit = - builder.addEdge(junction, Pipe.empty[In], sink) - def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { builder.addEdge(junction, junctionIn) junctionIn.next } - def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = builder.addEdge(junction, sink) + def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(junction, Pipe.empty[In], sink) + + def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(junction, sink) } - class JunctionNextStep[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + class JunctionNextStep[In, Out](junction: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + def ~>[O](otherFlow: Flow[Out, O]): JunctionNextStep[In, O] = + new JunctionNextStep(junction, flow.via(otherFlow), builder) + def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { - builder.addEdge(junctionOut, flow, junctionIn) + builder.addEdge(junction, flow, junctionIn) junctionIn.next } - def ~>(sink: Sink[Out]): Unit = { - builder.addEdge(junctionOut, flow, sink) - } + def ~>(sink: UndefinedSink[Out]): Unit = + builder.addEdge(junction, flow, sink) - def ~>(sink: UndefinedSink[Out]): Unit = { - builder.addEdge(junctionOut, flow, sink) - } + def ~>(sink: Sink[Out]): Unit = + builder.addEdge(junction, flow, sink) } implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal { @@ -1363,23 +1370,26 @@ object FlowGraphImplicits { junctionIn.next } + def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(source, sink) + + def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(source, sink) } class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + def ~>[T](otherFlow: Flow[Out, T]): UndefinedSourceNextStep[In, T] = + new UndefinedSourceNextStep(source, flow.via(otherFlow), builder) + def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { builder.addEdge(source, flow, junctionIn) junctionIn.next } - def ~>[T](otherFlow: Flow[Out, T])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, T] = - new UndefinedSourceNextStep(source, flow.connect(otherFlow), builder) - - def ~>(sink: Sink[Out]): Unit = { + def ~>(sink: UndefinedSink[Out]): Unit = builder.addEdge(source, flow, sink) - } - def ~>(sink: UndefinedSink[Out]): Unit = { + def ~>(sink: Sink[Out]): Unit = builder.addEdge(source, flow, sink) - } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala index 7bfcf01290..2e5805680b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala @@ -7,7 +7,14 @@ import akka.stream.impl.Ast.AstNode import scala.annotation.unchecked.uncheckedVariance -private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Flow[In, Out] { +private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( + inPipe: Pipe[In, CIn], + in: UndefinedSource[CIn], + graph: PartialFlowGraph, + out: UndefinedSink[COut], + outPipe: Pipe[COut, Out]) + extends Flow[In, Out] { + override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O] private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe)) @@ -32,30 +39,30 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](inPipe: Pipe[In, CI builder.connect(nOut, outPipe, oIn) } - def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { + def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match { case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) case gFlow: GraphFlow[Out, _, _, T] ⇒ val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ val (oIn, oOut) = gFlow.remap(b) - b.connect(out, outPipe.connect(gFlow.inPipe), oIn) + b.connect(out, outPipe.via(gFlow.inPipe), oIn) (b.partialBuild(), oOut) } GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe) } - override def connect(sink: Sink[Out]) = sink match { + override def to(sink: Sink[Out]) = sink match { case sinkPipe: SinkPipe[Out] ⇒ val newGraph = PartialFlowGraph(this.graph) { builder ⇒ - builder.attachSink(out, outPipe.connect(sinkPipe)) + builder.attachSink(out, outPipe.to(sinkPipe)) } GraphSink(inPipe, in, newGraph) case gSink: GraphSink[Out, Out] ⇒ val newGraph = PartialFlowGraph(graph) { b ⇒ val oIn = gSink.remap(b) - b.connect(out, outPipe.connect(gSink.inPipe), oIn) + b.connect(out, outPipe.via(gSink.inPipe), oIn) } GraphSink(inPipe, in, newGraph) - case sink: Sink[Out] ⇒ connect(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe + case sink: Sink[Out] ⇒ to(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe } override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) @@ -75,29 +82,29 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou builder.connect(nOut, outPipe, oIn) } - override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { + override def via[T](flow: Flow[Out, T]): Source[T] = flow match { case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) case gFlow: GraphFlow[Out, _, _, T] ⇒ val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ val (oIn, oOut) = gFlow.remap(b) - b.connect(out, outPipe.connect(gFlow.inPipe), oIn) + b.connect(out, outPipe.via(gFlow.inPipe), oIn) (b.partialBuild(), oOut) } GraphSource(newGraph, nOut, gFlow.outPipe) } - override def connect(sink: Sink[Out]): RunnableFlow = sink match { + override def to(sink: Sink[Out]): RunnableFlow = sink match { case sinkPipe: SinkPipe[Out] ⇒ FlowGraph(this.graph) { implicit builder ⇒ - builder.attachSink(out, outPipe.connect(sinkPipe)) + builder.attachSink(out, outPipe.to(sinkPipe)) } case gSink: GraphSink[Out, _] ⇒ FlowGraph(graph) { b ⇒ val oIn = gSink.remap(b) - b.connect(out, outPipe.connect(gSink.inPipe), oIn) + b.connect(out, outPipe.via(gSink.inPipe), oIn) } case sink: Sink[Out] ⇒ - connect(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe + to(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe } override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) @@ -113,7 +120,7 @@ private[scaladsl] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: Unde private[scaladsl] def prepend(pipe: SourcePipe[In]): FlowGraph = { FlowGraph(this.graph) { b ⇒ - b.attachSource(in, pipe.connect(inPipe)) + b.attachSource(in, pipe.via(inPipe)) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index 6932c3eeb1..15ee52c14c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -25,13 +25,13 @@ private[stream] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flo private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops) - override def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { + override def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match { case p: Pipe[T, In] ⇒ Pipe(p.ops ++: ops) case gf: GraphFlow[Out, _, _, T] ⇒ gf.prepend(this) case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } - override def connect(sink: Sink[Out]): Sink[In] = sink match { + override def to(sink: Sink[Out]): Sink[In] = sink match { case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) case gs: GraphSink[Out, _] ⇒ gs.prepend(this) case d: Sink[Out] ⇒ this.withSink(d) @@ -49,7 +49,7 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) override def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit = - source.connect(this).run() + source.to(this).run() } @@ -65,13 +65,13 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) - override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { + override def via[T](flow: Flow[Out, T]): Source[T] = flow match { case p: Pipe[Out, T] ⇒ appendPipe(p) case g: GraphFlow[Out, _, _, T] ⇒ g.prepend(this) case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } - override def connect(sink: Sink[Out]): RunnableFlow = sink match { + override def to(sink: Sink[Out]): RunnableFlow = sink match { case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops) case g: GraphSink[Out, _] ⇒ g.prepend(this) case d: Sink[Out] ⇒ this.withSink(d) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 7441a0f4f0..0f7fc2d43a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -18,14 +18,14 @@ trait Sink[-In] { * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. */ def runWith(source: KeyedSource[In])(implicit materializer: FlowMaterializer): source.MaterializedType = - source.connect(this).run().get(source) + source.to(this).run().get(source) /** * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. */ def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit = - source.connect(this).run() + source.to(this).run() } object Sink { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index e9e6e80ae4..4a44b83e0a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -20,21 +20,21 @@ trait Source[+Out] extends FlowOps[Out] { override type Repr[+O] <: Source[O] /** - * Transform this source by appending the given processing stages. + * Transform this [[Source]] by appending the given processing stages. */ - def connect[T](flow: Flow[Out, T]): Source[T] + def via[T](flow: Flow[Out, T]): Source[T] /** - * Connect this source to a sink, concatenating the processing steps of both. + * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def connect(sink: Sink[Out]): RunnableFlow + def to(sink: Sink[Out]): RunnableFlow /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[Sink.fanoutPublisher]]. */ def runWith(sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = - connect(sink).run().get(sink) + to(sink).run().get(sink) /** * Shortcut for running this `Source` with a fold function.