!str #16066 rename connect to via/to

* add missing implicit conversions for ~>
* tests for all combinations when using ~>
This commit is contained in:
Martynas Mickevičius 2014-10-31 10:43:42 +02:00
parent 81bc5c76bc
commit 412003c11e
52 changed files with 308 additions and 240 deletions

View file

@ -76,7 +76,7 @@ class HttpServerExampleSpec
case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer)
println("Accepted new connection from " + remoteAddress) println("Accepted new connection from " + remoteAddress)
Source(requestProducer).map(requestHandler).connect(Sink(responseConsumer)).run() Source(requestProducer).map(requestHandler).to(Sink(responseConsumer)).run()
}) })
} }
//#full-server-example //#full-server-example

View file

@ -35,8 +35,8 @@ private object RenderSupport {
// materializes // materializes
private case class CancelSecond[T](first: Source[T], second: Source[T]) extends SimpleActorFlowSource[T] { private case class CancelSecond[T](first: Source[T], second: Source[T]) extends SimpleActorFlowSource[T] {
override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = { override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = {
first.connect(Sink(flowSubscriber)).run()(materializer) first.to(Sink(flowSubscriber)).run()(materializer)
second.connect(Sink.cancelled).run()(materializer) second.to(Sink.cancelled).run()(materializer)
} }
} }

View file

@ -39,7 +39,7 @@ object TestClient extends App {
def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = { def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
Source(List(HttpRequest() -> 'NoContext)) Source(List(HttpRequest() -> 'NoContext))
.connect(Sink(connection.requestSubscriber)) .to(Sink(connection.requestSubscriber))
.run() .run()
Source(connection.responsePublisher).map(_._1).runWith(Sink.future) Source(connection.responsePublisher).map(_._1).runWith(Sink.future)
} }

View file

@ -39,7 +39,7 @@ object TestServer extends App {
Source(connectionStream).foreach { Source(connectionStream).foreach {
case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber) case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber)
println("Accepted new connection from " + remoteAddress) println("Accepted new connection from " + remoteAddress)
Source(requestPublisher).map(requestHandler).connect(Sink(responseSubscriber)).run() Source(requestPublisher).map(requestHandler).to(Sink(responseSubscriber)).run()
} }
} }

View file

@ -40,7 +40,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ByteString] val dataProbe = StreamTestKit.SubscriberProbe[ByteString]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNoMsg(50.millis) dataProbe.expectNoMsg(50.millis)
@ -76,7 +76,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(Chunk(ByteString("abcdef"))) dataProbe.expectNext(Chunk(ByteString("abcdef")))
@ -112,7 +112,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ByteString] val dataProbe = StreamTestKit.SubscriberProbe[ByteString]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(ByteString("abcdef")) dataProbe.expectNext(ByteString("abcdef"))
@ -134,7 +134,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(Chunk(ByteString("abcdef"))) dataProbe.expectNext(Chunk(ByteString("abcdef")))
@ -182,7 +182,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ByteString] val dataProbe = StreamTestKit.SubscriberProbe[ByteString]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(ByteString("abcdef")) dataProbe.expectNext(ByteString("abcdef"))
@ -218,7 +218,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(Chunk(ByteString("abcdef"))) dataProbe.expectNext(Chunk(ByteString("abcdef")))
@ -254,7 +254,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ByteString] val dataProbe = StreamTestKit.SubscriberProbe[ByteString]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(ByteString("abcdef")) dataProbe.expectNext(ByteString("abcdef"))
@ -276,7 +276,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(Chunk(ByteString("abcdef"))) dataProbe.expectNext(Chunk(ByteString("abcdef")))
@ -298,7 +298,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Default(_, 12, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ByteString] val dataProbe = StreamTestKit.SubscriberProbe[ByteString]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(ByteString("abcdef")) dataProbe.expectNext(ByteString("abcdef"))
@ -320,7 +320,7 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
inside(expectRequest) { inside(expectRequest) {
case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _) case HttpRequest(HttpMethods.POST, _, _, HttpEntity.Chunked(_, data), _)
val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart] val dataProbe = StreamTestKit.SubscriberProbe[ChunkStreamPart]
data.connect(Sink(dataProbe)).run() data.to(Sink(dataProbe)).run()
val sub = dataProbe.expectSubscription() val sub = dataProbe.expectSubscription()
sub.request(10) sub.request(10)
dataProbe.expectNext(Chunk(ByteString("abcdef"))) dataProbe.expectNext(Chunk(ByteString("abcdef")))

View file

@ -56,7 +56,7 @@ trait ScalaRoutingDSL extends Directives {
val runner = f(setup) val runner = f(setup)
Source(requestProducer) Source(requestProducer)
.mapAsync(request runner(request)) .mapAsync(request runner(request))
.connect(Sink(responseConsumer)).run()(fm) .to(Sink(responseConsumer)).run()(fm)
} }
} }
} }

View file

@ -20,7 +20,7 @@ class ChainSetup[In, Out](
val upstream = StreamTestKit.PublisherProbe[In]() val upstream = StreamTestKit.PublisherProbe[In]()
val downstream = StreamTestKit.SubscriberProbe[Out]() val downstream = StreamTestKit.SubscriberProbe[Out]()
private val s = Source(upstream).connect(stream(Flow[In])) private val s = Source(upstream).via(stream(Flow[In]))
val publisher = toPublisher(s, materializer) val publisher = toPublisher(s, materializer)
val upstreamSubscription = upstream.expectSubscription() val upstreamSubscription = upstream.expectSubscription()
publisher.subscribe(downstream) publisher.subscribe(downstream)

View file

@ -245,7 +245,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val mat = source.collect { val mat = source.collect {
case n if n % 2 == 0 "elem-" + n case n if n % 2 == 0 "elem-" + n
}.connect(sink).run() }.to(sink).run()
val snd = mat.get(source) val snd = mat.get(source)
val rcv = mat.get(sink) val rcv = mat.get(sink)

View file

@ -122,7 +122,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"remember requested after restart" in { "remember requested after restart" in {
// creating actor with default supervision, because stream supervisor default strategy is to stop // creating actor with default supervision, because stream supervisor default strategy is to stop
val ref = system.actorOf(manualSubscriberProps(testActor)) val ref = system.actorOf(manualSubscriberProps(testActor))
Source(1 to 7).connect(Sink(ActorSubscriber[Int](ref))).run() Source(1 to 7).to(Sink(ActorSubscriber[Int](ref))).run()
ref ! "ready" ref ! "ready"
expectMsg(OnNext(1)) expectMsg(OnNext(1))
expectMsg(OnNext(2)) expectMsg(OnNext(2))

View file

@ -79,7 +79,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
val flow: Flow[Int, Long] = Flow[Int].map(_.toLong).timedIntervalBetween(in in % 2 == 1, d probe.ref ! d) val flow: Flow[Int, Long] = Flow[Int].map(_.toLong).timedIntervalBetween(in in % 2 == 1, d probe.ref ! d)
val c1 = StreamTestKit.SubscriberProbe[Long]() val c1 = StreamTestKit.SubscriberProbe[Long]()
Source(List(1, 2, 3)).connect(flow).connect(Sink(c1)).run() Source(List(1, 2, 3)).via(flow).to(Sink(c1)).run()
val s = c1.expectSubscription() val s = c1.expectSubscription()
s.request(100) s.request(100)

View file

@ -120,7 +120,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type =
val ssession = Await.result(ssessionf, duration) val ssession = Await.result(ssessionf, duration)
val sdata = ssession.data val sdata = ssession.data
Source(sdata).map(bs ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')). Source(sdata).map(bs ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')).
connect(Sink(scipher.plainTextOutbound)).run() to(Sink(scipher.plainTextOutbound)).run()
} }
def replyFirstLineInUpperCase(clientConnection: JavaSslConnection): Unit = { def replyFirstLineInUpperCase(clientConnection: JavaSslConnection): Unit = {
@ -129,7 +129,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type =
def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = { def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = {
val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future) val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future)
Source(List(ByteString(message + '\n'))).connect(Sink(ccipher.plainTextOutbound)).run() Source(List(ByteString(message + '\n'))).to(Sink(ccipher.plainTextOutbound)).run()
val csession = Await.result(csessionf, duration) val csession = Await.result(csessionf, duration)
val cdata = csession.data val cdata = csession.data
Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration) Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration)

View file

@ -48,7 +48,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
serverConnection.read(256) serverConnection.read(256)
Source(tcpProcessor).connect(Sink.ignore).run() Source(tcpProcessor).to(Sink.ignore).run()
Source(testInput).runWith(Sink.publisher).subscribe(tcpProcessor) Source(testInput).runWith(Sink.publisher).subscribe(tcpProcessor)
serverConnection.waitRead() should be(expectedOutput) serverConnection.waitRead() should be(expectedOutput)
@ -158,7 +158,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_)) val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).connect(Sink(conn.outputStream)).run() Source(testInput).to(Sink(conn.outputStream)).run()
val resultFuture = Source(conn.inputStream).fold(ByteString.empty)((acc, in) acc ++ in) val resultFuture = Source(conn.inputStream).fold(ByteString.empty)((acc, in) acc ++ in)
Await.result(resultFuture, 3.seconds) should be(expectedOutput) Await.result(resultFuture, 3.seconds) should be(expectedOutput)
@ -178,7 +178,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_)) val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).connect(Sink(conn1.outputStream)).run() Source(testInput).to(Sink(conn1.outputStream)).run()
conn1.inputStream.subscribe(conn2.outputStream) conn1.inputStream.subscribe(conn2.outputStream)
conn2.inputStream.subscribe(conn3.outputStream) conn2.inputStream.subscribe(conn3.outputStream)
val resultFuture = Source(conn3.inputStream).fold(ByteString.empty)((acc, in) acc ++ in) val resultFuture = Source(conn3.inputStream).fold(ByteString.empty)((acc, in) acc ++ in)

View file

@ -48,9 +48,9 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
serverConnection.read(256) serverConnection.read(256)
Source(tcpPublisher).connect(Sink.ignore).run() Source(tcpPublisher).to(Sink.ignore).run()
Source(testInput).connect(Sink(tcpSubscriber)).run() Source(testInput).to(Sink(tcpSubscriber)).run()
serverConnection.waitRead() should be(expectedOutput) serverConnection.waitRead() should be(expectedOutput)
server.close() server.close()
@ -162,7 +162,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_)) val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).connect(Sink(tcpSubscriber)).run() Source(testInput).to(Sink(tcpSubscriber)).run()
val resultFuture = Source(tcpPublisher).fold(ByteString.empty) { case (res, elem) res ++ elem } val resultFuture = Source(tcpPublisher).fold(ByteString.empty) { case (res, elem) res ++ elem }
Await.result(resultFuture, 3.seconds) should be(expectedOutput) Await.result(resultFuture, 3.seconds) should be(expectedOutput)
@ -181,7 +181,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_)) val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).connect(Sink(tcpSubscriber1)).run() Source(testInput).to(Sink(tcpSubscriber1)).run()
tcpPublisher1.subscribe(tcpSubscriber2) tcpPublisher1.subscribe(tcpSubscriber2)
tcpPublisher2.subscribe(tcpSubscriber3) tcpPublisher2.subscribe(tcpSubscriber3)
val resultFuture = Source(tcpPublisher3).fold(ByteString.empty) { case (res, elem) res ++ elem } val resultFuture = Source(tcpPublisher3).fold(ByteString.empty) { case (res, elem) res ++ elem }

View file

@ -223,9 +223,9 @@ trait TcpHelper { this: TestKitBase ⇒
def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): EchoServer = { def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): EchoServer = {
val foreachSink = Sink.foreach[IncomingTcpConnection] { conn val foreachSink = Sink.foreach[IncomingTcpConnection] { conn
conn.inbound.connect(conn.outbound).run() conn.inbound.to(conn.outbound).run()
} }
val binding = bind(Flow[IncomingTcpConnection].connect(foreachSink), serverAddress) val binding = bind(Flow[IncomingTcpConnection].to(foreachSink), serverAddress)
new EchoServer(binding.connection.get(foreachSink), binding) new EchoServer(binding.connection.get(foreachSink), binding)
} }
} }

View file

@ -17,26 +17,26 @@ class FlowAppendSpec extends AkkaSpec with River {
"Flow" should { "Flow" should {
"append Flow" in riverOf[String] { subscriber "append Flow" in riverOf[String] { subscriber
val flow = Flow[Int].connect(otherFlow) val flow = Flow[Int].via(otherFlow)
Source(elements).connect(flow).connect(Sink(subscriber)).run() Source(elements).via(flow).to(Sink(subscriber)).run()
} }
"append Sink" in riverOf[String] { subscriber "append Sink" in riverOf[String] { subscriber
val sink = Flow[Int].connect(otherFlow.connect(Sink(subscriber))) val sink = Flow[Int].to(otherFlow.to(Sink(subscriber)))
Source(elements).connect(sink).run() Source(elements).to(sink).run()
} }
} }
"Source" should { "Source" should {
"append Flow" in riverOf[String] { subscriber "append Flow" in riverOf[String] { subscriber
Source(elements) Source(elements)
.connect(otherFlow) .via(otherFlow)
.connect(Sink(subscriber)).run() .to(Sink(subscriber)).run()
} }
"append Sink" in riverOf[String] { subscriber "append Sink" in riverOf[String] { subscriber
Source(elements) Source(elements)
.connect(otherFlow.connect(Sink(subscriber))) .to(otherFlow.to(Sink(subscriber)))
.run() .run()
} }
} }

View file

@ -53,7 +53,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).connect(Sink(subscriber)).run() Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -73,7 +73,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).connect(Sink(subscriber)).run() Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -101,7 +101,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).connect(Sink(subscriber)).run() Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -132,7 +132,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int] val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).connect(Sink(subscriber)).run() Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -160,7 +160,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int] val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).connect(Sink(subscriber)).run() Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -189,7 +189,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int] val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(1, overflowStrategy = strategy).connect(Sink(subscriber)).run() Source(publisher).buffer(1, overflowStrategy = strategy).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()

View file

@ -25,49 +25,49 @@ class FlowCompileSpec extends AkkaSpec {
"open.run()" shouldNot compile "open.run()" shouldNot compile
} }
"accept Iterable" in { "accept Iterable" in {
val f: Source[Int] = intSeq.connect(Flow[Int]) val f: Source[Int] = intSeq.via(Flow[Int])
} }
"accept Future" in { "accept Future" in {
val f: Source[Int] = intFut.connect(Flow[Int]) val f: Source[Int] = intFut.via(Flow[Int])
} }
"append Flow" in { "append Flow" in {
val open1: Flow[Int, String] = Flow[Int].map(_.toString) val open1: Flow[Int, String] = Flow[Int].map(_.toString)
val open2: Flow[String, Int] = Flow[String].map(_.hashCode) val open2: Flow[String, Int] = Flow[String].map(_.hashCode)
val open3: Flow[Int, Int] = open1.connect(open2) val open3: Flow[Int, Int] = open1.via(open2)
"open3.run()" shouldNot compile "open3.run()" shouldNot compile
val closedSource: Source[Int] = intSeq.connect(open3) val closedSource: Source[Int] = intSeq.via(open3)
"closedSource.run()" shouldNot compile "closedSource.run()" shouldNot compile
val closedSink: Sink[Int] = open3.connect(Sink.publisher[Int]) val closedSink: Sink[Int] = open3.to(Sink.publisher[Int])
"closedSink.run()" shouldNot compile "closedSink.run()" shouldNot compile
closedSource.connect(Sink.publisher[Int]).run() closedSource.to(Sink.publisher[Int]).run()
intSeq.connect(closedSink).run() intSeq.to(closedSink).run()
} }
"append Sink" in { "append Sink" in {
val open: Flow[Int, String] = Flow[Int].map(_.toString) val open: Flow[Int, String] = Flow[Int].map(_.toString)
val closedSink: Sink[String] = Flow[String].map(_.hashCode).connect(Sink.publisher[Int]) val closedSink: Sink[String] = Flow[String].map(_.hashCode).to(Sink.publisher[Int])
val appended: Sink[Int] = open.connect(closedSink) val appended: Sink[Int] = open.to(closedSink)
"appended.run()" shouldNot compile "appended.run()" shouldNot compile
"appended.connect(Sink.future[Int])" shouldNot compile "appended.connect(Sink.future[Int])" shouldNot compile
intSeq.connect(appended).run intSeq.to(appended).run
} }
"be appended to Source" in { "be appended to Source" in {
val open: Flow[Int, String] = Flow[Int].map(_.toString) val open: Flow[Int, String] = Flow[Int].map(_.toString)
val closedSource: Source[Int] = strSeq.connect(Flow[String].map(_.hashCode)) val closedSource: Source[Int] = strSeq.via(Flow[String].map(_.hashCode))
val closedSource2: Source[String] = closedSource.connect(open) val closedSource2: Source[String] = closedSource.via(open)
"closedSource2.run()" shouldNot compile "closedSource2.run()" shouldNot compile
"strSeq.connect(closedSource2)" shouldNot compile "strSeq.connect(closedSource2)" shouldNot compile
closedSource2.connect(Sink.publisher[String]).run closedSource2.to(Sink.publisher[String]).run
} }
} }
"Sink" should { "Sink" should {
val openSource: Sink[Int] = val openSource: Sink[Int] =
Flow[Int].map(_.toString).connect(Sink.publisher[String]) Flow[Int].map(_.toString).to(Sink.publisher[String])
"accept Source" in { "accept Source" in {
intSeq.connect(openSource) intSeq.to(openSource)
} }
"not accept Sink" in { "not accept Sink" in {
"openSource.connect(Sink.future[String])" shouldNot compile "openSource.connect(Sink.future[String])" shouldNot compile
@ -81,7 +81,7 @@ class FlowCompileSpec extends AkkaSpec {
val openSource: Source[String] = val openSource: Source[String] =
Source(Seq(1, 2, 3)).map(_.toString) Source(Seq(1, 2, 3)).map(_.toString)
"accept Sink" in { "accept Sink" in {
openSource.connect(Sink.publisher[String]) openSource.to(Sink.publisher[String])
} }
"not be accepted by Source" in { "not be accepted by Source" in {
"openSource.connect(intSeq)" shouldNot compile "openSource.connect(intSeq)" shouldNot compile
@ -94,7 +94,7 @@ class FlowCompileSpec extends AkkaSpec {
"RunnableFlow" should { "RunnableFlow" should {
Sink.future[String] Sink.future[String]
val closed: RunnableFlow = val closed: RunnableFlow =
Source(Seq(1, 2, 3)).map(_.toString).connect(Sink.publisher[String]) Source(Seq(1, 2, 3)).map(_.toString).to(Sink.publisher[String])
"run" in { "run" in {
closed.run() closed.run()
} }

View file

@ -32,7 +32,7 @@ class FlowConcatAllSpec extends AkkaSpec {
val main = Source(List(s1, s2, s3, s4, s5)) val main = Source(List(s1, s2, s3, s4, s5))
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
main.flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() main.flatten(FlattenStrategy.concat).to(Sink(subscriber)).run()
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_)))
@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"work together with SplitWhen" in { "work together with SplitWhen" in {
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run()
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_)))
@ -53,7 +53,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the current open substream and signal error" in { "on onError on master stream cancel the current open substream and signal error" in {
val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val publisher = StreamTestKit.PublisherProbe[Source[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -73,7 +73,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on open substream, cancel the master stream and signal error " in { "on onError on open substream, cancel the master stream and signal error " in {
val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val publisher = StreamTestKit.PublisherProbe[Source[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -93,7 +93,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the current open substream and the master stream" in { "on cancellation cancel the current open substream and the master stream" in {
val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val publisher = StreamTestKit.PublisherProbe[Source[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).connect(Sink(subscriber)).run() Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()

View file

@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).connect(Sink(subscriber)).run() Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).connect(Sink(subscriber)).run() Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -68,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).connect(Sink(subscriber)).run() Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()

View file

@ -17,7 +17,7 @@ class FlowDispatcherSpec extends AkkaSpec {
val probe = TestProbe() val probe = TestProbe()
val p = Source(List(1, 2, 3)).map(i val p = Source(List(1, 2, 3)).map(i
{ probe.ref ! Thread.currentThread().getName(); i }). { probe.ref ! Thread.currentThread().getName(); i }).
connect(Sink.ignore).run() to(Sink.ignore).run()
probe.receiveN(3) foreach { probe.receiveN(3) foreach {
case s: String s should startWith(system.name + "-akka.test.stream-dispatcher") case s: String s should startWith(system.name + "-akka.test.stream-dispatcher")
} }

View file

@ -31,7 +31,7 @@ class FlowDropSpec extends AkkaSpec with ScriptedTest {
"not drop anything for negative n" in { "not drop anything for negative n" in {
val probe = StreamTestKit.SubscriberProbe[Int]() val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1, 2, 3)).drop(-1).connect(Sink(probe)).run() Source(List(1, 2, 3)).drop(-1).to(Sink(probe)).run()
probe.expectSubscription().request(10) probe.expectSubscription().request(10)
probe.expectNext(1) probe.expectNext(1)
probe.expectNext(2) probe.expectNext(2)

View file

@ -20,7 +20,7 @@ class FlowDropWithinSpec extends AkkaSpec {
val input = Iterator.from(1) val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Source(p).dropWithin(1.second).connect(Sink(c)).run() Source(p).dropWithin(1.second).to(Sink(c)).run()
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(100) cSub.request(100)

View file

@ -27,7 +27,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step // Simply repeat the last element as an extrapolation step
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).connect(Sink(subscriber)).run() Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -47,7 +47,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step // Simply repeat the last element as an extrapolation step
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).connect(Sink(subscriber)).run() Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()
@ -79,7 +79,7 @@ class FlowExpandSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).connect(Sink(subscriber)).run() Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).to(Sink(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription() val sub = subscriber.expectSubscription()

View file

@ -31,7 +31,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest {
val probe = StreamTestKit.SubscriberProbe[Int]() val probe = StreamTestKit.SubscriberProbe[Int]()
Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
connect(Sink(probe)).run() to(Sink(probe)).run()
val subscription = probe.expectSubscription() val subscription = probe.expectSubscription()
for (_ 1 to 10000) { for (_ 1 to 10000) {

View file

@ -222,15 +222,15 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.run() }.run()
FlowGraph(partial2) { b FlowGraph(partial2) { b
b.attachSink(undefinedSink1, f1.connect(out1)) b.attachSink(undefinedSink1, f1.to(out1))
b.attachSink(UndefinedSink[String]("sink2"), f2.connect(out2)) b.attachSink(UndefinedSink[String]("sink2"), f2.to(out2))
}.run() }.run()
FlowGraph(partial1) { implicit b FlowGraph(partial1) { implicit b
import FlowGraphImplicits._ import FlowGraphImplicits._
b.attachSink(undefinedSink1, f1.connect(out1)) b.attachSink(undefinedSink1, f1.to(out1))
b.attachSource(undefinedSource1, Source(List("a", "b", "c")).connect(f1)) b.attachSource(undefinedSource1, Source(List("a", "b", "c")).via(f1))
b.attachSource(undefinedSource2, Source(List("d", "e", "f")).connect(f2)) b.attachSource(undefinedSource2, Source(List("d", "e", "f")).via(f2))
bcast ~> f5 ~> out2 bcast ~> f5 ~> out2
}.run() }.run()
} }
@ -363,10 +363,10 @@ class FlowGraphCompileSpec extends AkkaSpec {
b.addEdge(in1, f1, out1) b.addEdge(in1, f1, out1)
}.run() }.run()
FlowGraph { b FlowGraph { b
b.addEdge(in1, f1, f2.connect(out1)) b.addEdge(in1, f1, f2.to(out1))
}.run() }.run()
FlowGraph { b FlowGraph { b
b.addEdge(in1.connect(f1), f2, out1) b.addEdge(in1.via(f1), f2, out1)
}.run() }.run()
FlowGraph { implicit b FlowGraph { implicit b
import FlowGraphImplicits._ import FlowGraphImplicits._
@ -378,18 +378,81 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.run() }.run()
FlowGraph { implicit b FlowGraph { implicit b
import FlowGraphImplicits._ import FlowGraphImplicits._
in1 ~> f1.connect(out1) in1 ~> f1.to(out1)
}.run() }.run()
FlowGraph { implicit b FlowGraph { implicit b
import FlowGraphImplicits._ import FlowGraphImplicits._
in1.connect(f1) ~> out1 in1.via(f1) ~> out1
}.run() }.run()
FlowGraph { implicit b FlowGraph { implicit b
import FlowGraphImplicits._ import FlowGraphImplicits._
in1.connect(f1) ~> f2.connect(out1) in1.via(f1) ~> f2.to(out1)
}.run() }.run()
} }
"build all combinations with implicits" when {
"Source is connected directly" in {
PartialFlowGraph { implicit b
import FlowGraphImplicits._
Source.empty[Int] ~> Flow[Int]
Source.empty[Int] ~> Broadcast[Int]
Source.empty[Int] ~> Sink.ignore
Source.empty[Int] ~> UndefinedSink[Int]
}
}
"Source is connected through flow" in {
PartialFlowGraph { implicit b
import FlowGraphImplicits._
Source.empty[Int] ~> Flow[Int] ~> Flow[Int]
Source.empty[Int] ~> Flow[Int] ~> Broadcast[Int]
Source.empty[Int] ~> Flow[Int] ~> Sink.ignore
Source.empty[Int] ~> Flow[Int] ~> UndefinedSink[Int]
}
}
"Junction is connected directly" in {
PartialFlowGraph { implicit b
import FlowGraphImplicits._
Broadcast[Int] ~> Flow[Int]
Broadcast[Int] ~> Broadcast[Int]
Broadcast[Int] ~> Sink.ignore
Broadcast[Int] ~> UndefinedSink[Int]
}
}
"Junction is connected through flow" in {
PartialFlowGraph { implicit b
import FlowGraphImplicits._
Broadcast[Int] ~> Flow[Int] ~> Flow[Int]
Broadcast[Int] ~> Flow[Int] ~> Broadcast[Int]
Broadcast[Int] ~> Flow[Int] ~> Sink.ignore
Broadcast[Int] ~> Flow[Int] ~> UndefinedSink[Int]
}
}
"UndefinedSource is connected directly" in {
PartialFlowGraph { implicit b
import FlowGraphImplicits._
UndefinedSource[Int] ~> Flow[Int]
UndefinedSource[Int] ~> Broadcast[Int]
UndefinedSource[Int] ~> Sink.ignore
UndefinedSource[Int] ~> UndefinedSink[Int]
}
}
"UndefinedSource is connected through flow" in {
PartialFlowGraph { implicit b
import FlowGraphImplicits._
UndefinedSource[Int] ~> Flow[Int] ~> Flow[Int]
UndefinedSource[Int] ~> Flow[Int] ~> Broadcast[Int]
UndefinedSource[Int] ~> Flow[Int] ~> Sink.ignore
UndefinedSource[Int] ~> Flow[Int] ~> UndefinedSink[Int]
}
}
}
"build partial with only undefined sources and sinks" in { "build partial with only undefined sources and sinks" in {
PartialFlowGraph { b PartialFlowGraph { b
b.addEdge(UndefinedSource[String], f1, UndefinedSink[String]) b.addEdge(UndefinedSource[String], f1, UndefinedSink[String])

View file

@ -26,7 +26,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1) val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(1000, 1.second).connect(Sink(c)).run() Source(p).groupedWithin(1000, 1.second).to(Sink(c)).run()
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(100) cSub.request(100)
@ -51,7 +51,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
"deliver bufferd elements onComplete before the timeout" in { "deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(1 to 3).groupedWithin(1000, 10.second).connect(Sink(c)).run() Source(1 to 3).groupedWithin(1000, 10.second).to(Sink(c)).run()
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(100) cSub.request(100)
c.expectNext((1 to 3).toList) c.expectNext((1 to 3).toList)
@ -63,7 +63,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1) val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(1000, 1.second).connect(Sink(c)).run() Source(p).groupedWithin(1000, 1.second).to(Sink(c)).run()
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(1) cSub.request(1)
@ -83,7 +83,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
"drop empty groups" in { "drop empty groups" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(1000, 500.millis).connect(Sink(c)).run() Source(p).groupedWithin(1000, 500.millis).to(Sink(c)).run()
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(2) cSub.request(2)
@ -105,7 +105,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1) val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(3, 2.second).connect(Sink(c)).run() Source(p).groupedWithin(3, 2.second).to(Sink(c)).run()
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(4) cSub.request(4)

View file

@ -25,7 +25,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements" in { "produce future elements" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 3).mapAsync(n Future(n)).connect(Sink(c)).run() val p = Source(1 to 3).mapAsync(n Future(n)).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(2) sub.request(2)
c.expectNext(1) c.expectNext(1)
@ -42,7 +42,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val p = Source(1 to 50).mapAsync(n Future { val p = Source(1 to 50).mapAsync(n Future {
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n n
}).connect(Sink(c)).run() }).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(1000) sub.request(1000)
for (n 1 to 50) c.expectNext(n) for (n 1 to 50) c.expectNext(n)
@ -56,7 +56,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val p = Source(1 to 20).mapAsync(n Future { val p = Source(1 to 20).mapAsync(n Future {
probe.ref ! n probe.ref ! n
n n
}).connect(Sink(c)).run() }).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
// nothing before requested // nothing before requested
probe.expectNoMsg(500.millis) probe.expectNoMsg(500.millis)
@ -84,7 +84,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
Await.ready(latch, 10.seconds) Await.ready(latch, 10.seconds)
n n
} }
}).connect(Sink(c)).run() }).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
c.expectError.getMessage should be("err1") c.expectError.getMessage should be("err1")
@ -103,7 +103,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
n n
} }
}). }).
connect(Sink(c)).run() to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
c.expectError.getMessage should be("err2") c.expectError.getMessage should be("err2")

View file

@ -27,7 +27,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val p = Source(1 to 4).mapAsyncUnordered(n Future { val p = Source(1 to 4).mapAsyncUnordered(n Future {
Await.ready(latch(n), 5.seconds) Await.ready(latch(n), 5.seconds)
n n
}).connect(Sink(c)).run() }).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(5) sub.request(5)
latch(2).countDown() latch(2).countDown()
@ -48,7 +48,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val p = Source(1 to 20).mapAsyncUnordered(n Future { val p = Source(1 to 20).mapAsyncUnordered(n Future {
probe.ref ! n probe.ref ! n
n n
}).connect(Sink(c)).run() }).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
// nothing before requested // nothing before requested
probe.expectNoMsg(500.millis) probe.expectNoMsg(500.millis)
@ -77,7 +77,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
Await.ready(latch, 10.seconds) Await.ready(latch, 10.seconds)
n n
} }
}).connect(Sink(c)).run() }).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
c.expectError.getMessage should be("err1") c.expectError.getMessage should be("err1")
@ -96,7 +96,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
n n
} }
}). }).
connect(Sink(c)).run() to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
c.expectError.getMessage should be("err2") c.expectError.getMessage should be("err2")

View file

@ -28,7 +28,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"invoke callback on normal completion" in { "invoke callback on normal completion" in {
val onCompleteProbe = TestProbe() val onCompleteProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
Source(p).connect(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run()
val proc = p.expectSubscription val proc = p.expectSubscription
proc.expectRequest() proc.expectRequest()
proc.sendNext(42) proc.sendNext(42)
@ -40,7 +40,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"yield the first error" in { "yield the first error" in {
val onCompleteProbe = TestProbe() val onCompleteProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
Source(p).connect(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run()
val proc = p.expectSubscription val proc = p.expectSubscription
proc.expectRequest() proc.expectRequest()
val ex = new RuntimeException("ex") with NoStackTrace val ex = new RuntimeException("ex") with NoStackTrace
@ -52,7 +52,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"invoke callback for an empty stream" in { "invoke callback for an empty stream" in {
val onCompleteProbe = TestProbe() val onCompleteProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
Source(p).connect(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run() Source(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run()
val proc = p.expectSubscription val proc = p.expectSubscription
proc.expectRequest() proc.expectRequest()
proc.sendComplete() proc.sendComplete()

View file

@ -33,7 +33,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val (prefix, tailFlow) = Await.result(fut, 3.seconds) val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(Nil) prefix should be(Nil)
val tailSubscriber = SubscriberProbe[Int] val tailSubscriber = SubscriberProbe[Int]
tailFlow.connect(Sink(tailSubscriber)).run() tailFlow.to(Sink(tailSubscriber)).run()
tailSubscriber.expectComplete() tailSubscriber.expectComplete()
} }
@ -43,7 +43,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val (prefix, tailFlow) = Await.result(fut, 3.seconds) val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(List(1, 2, 3)) prefix should be(List(1, 2, 3))
val tailSubscriber = SubscriberProbe[Int] val tailSubscriber = SubscriberProbe[Int]
tailFlow.connect(Sink(tailSubscriber)).run() tailFlow.to(Sink(tailSubscriber)).run()
tailSubscriber.expectComplete() tailSubscriber.expectComplete()
} }
@ -87,7 +87,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
takes should be(1 to 10) takes should be(1 to 10)
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
tail.connect(Sink(subscriber)).run() tail.to(Sink(subscriber)).run()
subscriber.expectCompletedOrSubscriptionFollowedByComplete() subscriber.expectCompletedOrSubscriptionFollowedByComplete()
} }
@ -95,7 +95,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(3).connect(Sink(subscriber)).run() Source(publisher).prefixAndTail(3).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -113,7 +113,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(1).connect(Sink(subscriber)).run() Source(publisher).prefixAndTail(1).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -128,7 +128,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
subscriber.expectComplete() subscriber.expectComplete()
val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]()
tail.connect(Sink(substreamSubscriber)).run() tail.to(Sink(substreamSubscriber)).run()
substreamSubscriber.expectSubscription() substreamSubscriber.expectSubscription()
upstream.sendError(testException) upstream.sendError(testException)
@ -140,7 +140,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(3).connect(Sink(subscriber)).run() Source(publisher).prefixAndTail(3).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -158,7 +158,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]() val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(1).connect(Sink(subscriber)).run() Source(publisher).prefixAndTail(1).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -173,7 +173,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
subscriber.expectComplete() subscriber.expectComplete()
val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]()
tail.connect(Sink(substreamSubscriber)).run() tail.to(Sink(substreamSubscriber)).run()
substreamSubscriber.expectSubscription().cancel() substreamSubscriber.expectSubscription().cancel()
upstream.expectCancellation() upstream.expectCancellation()

View file

@ -228,9 +228,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"subscribe Subscriber" in { "subscribe Subscriber" in {
val flow: Flow[String, String] = Flow[String] val flow: Flow[String, String] = Flow[String]
val c1 = StreamTestKit.SubscriberProbe[String]() val c1 = StreamTestKit.SubscriberProbe[String]()
val sink: Sink[String] = flow.connect(Sink(c1)) val sink: Sink[String] = flow.to(Sink(c1))
val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
Source(publisher).connect(sink).run() Source(publisher).to(sink).run()
val sub1 = c1.expectSubscription val sub1 = c1.expectSubscription
sub1.request(3) sub1.request(3)
@ -244,7 +244,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val flow = Flow[Int].map(i { testActor ! i.toString; i.toString }) val flow = Flow[Int].map(i { testActor ! i.toString; i.toString })
val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher) val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher)
Source(publisher).connect(flow).connect(Sink.ignore).run() Source(publisher).via(flow).to(Sink.ignore).run()
expectMsg("1") expectMsg("1")
expectMsg("2") expectMsg("2")
@ -254,9 +254,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"perform transformation operation and subscribe Subscriber" in { "perform transformation operation and subscribe Subscriber" in {
val flow = Flow[Int].map(_.toString) val flow = Flow[Int].map(_.toString)
val c1 = StreamTestKit.SubscriberProbe[String]() val c1 = StreamTestKit.SubscriberProbe[String]()
val sink: Sink[Int] = flow.connect(Sink(c1)) val sink: Sink[Int] = flow.to(Sink(c1))
val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
Source(publisher).connect(sink).run() Source(publisher).to(sink).run()
val sub1 = c1.expectSubscription val sub1 = c1.expectSubscription
sub1.request(3) sub1.request(3)

View file

@ -36,7 +36,7 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest {
"not take anything for negative n" in { "not take anything for negative n" in {
val probe = StreamTestKit.SubscriberProbe[Int]() val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1, 2, 3)).take(-1).connect(Sink(probe)).run() Source(List(1, 2, 3)).take(-1).to(Sink(probe)).run()
probe.expectSubscription().request(10) probe.expectSubscription().request(10)
probe.expectComplete() probe.expectComplete()
} }

View file

@ -20,7 +20,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
val input = Iterator.from(1) val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Source(p).takeWithin(1.second).connect(Sink(c)).run() Source(p).takeWithin(1.second).to(Sink(c)).run()
val pSub = p.expectSubscription() val pSub = p.expectSubscription()
val cSub = c.expectSubscription() val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
@ -40,7 +40,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
"deliver bufferd elements onComplete before the timeout" in { "deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Source(1 to 3).takeWithin(1.second).connect(Sink(c)).run() Source(1 to 3).takeWithin(1.second).to(Sink(c)).run()
val cSub = c.expectSubscription() val cSub = c.expectSubscription()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
cSub.request(100) cSub.request(100)

View file

@ -57,7 +57,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
} }
override def isComplete: Boolean = !isTimerActive("tick") override def isComplete: Boolean = !isTimerActive("tick")
}). }).
connect(Sink.ignore).run() to(Sink.ignore).run()
val pSub = p.expectSubscription() val pSub = p.expectSubscription()
expectMsg("tick-1") expectMsg("tick-1")
expectMsg("tick-2") expectMsg("tick-2")

View file

@ -194,7 +194,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
override def cleanup() = cleanupProbe.ref ! s override def cleanup() = cleanupProbe.ref ! s
}). }).
connect(Sink.ignore).run() to(Sink.ignore).run()
cleanupProbe.expectMsg("a") cleanupProbe.expectMsg("a")
} }
@ -365,7 +365,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
case _ Nil case _ Nil
} }
} }
}).connect(Sink(subscriber)).run() }).to(Sink(subscriber)).run()
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
@ -388,13 +388,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}) })
val s1 = StreamTestKit.SubscriberProbe[Int]() val s1 = StreamTestKit.SubscriberProbe[Int]()
flow.connect(Sink(s1)).run() flow.to(Sink(s1)).run()
s1.expectSubscription().request(3) s1.expectSubscription().request(3)
s1.expectNext(1, 2, 3) s1.expectNext(1, 2, 3)
s1.expectComplete() s1.expectComplete()
val s2 = StreamTestKit.SubscriberProbe[Int]() val s2 = StreamTestKit.SubscriberProbe[Int]()
flow.connect(Sink(s2)).run() flow.to(Sink(s2)).run()
s2.expectSubscription().request(3) s2.expectSubscription().request(3)
s2.expectNext(1, 2, 3) s2.expectNext(1, 2, 3)
s2.expectComplete() s2.expectComplete()

View file

@ -37,7 +37,7 @@ class FutureSinkSpec extends AkkaSpec with ScriptedTest {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val f = Sink.future[Int] val f = Sink.future[Int]
val s = Source.subscriber[Int] val s = Source.subscriber[Int]
val m = s.connect(f).run() val m = s.to(f).run()
p.subscribe(m.get(s)) p.subscribe(m.get(s))
val proc = p.expectSubscription val proc = p.expectSubscription
proc.expectRequest() proc.expectRequest()

View file

@ -68,7 +68,7 @@ class GraphFlowSpec extends AkkaSpec {
in -> out in -> out
} }
source1.connect(flow).connect(Sink(probe)).run() source1.via(flow).to(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -86,7 +86,7 @@ class GraphFlowSpec extends AkkaSpec {
in -> out in -> out
} }
source1.connect(flow).map(_.toInt).connect(Sink(probe)).run() source1.via(flow).map(_.toInt).to(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -113,7 +113,7 @@ class GraphFlowSpec extends AkkaSpec {
in2 -> out2 in2 -> out2
} }
source1.connect(flow1).connect(flow2).connect(Sink(probe)).run() source1.via(flow1).via(flow2).to(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -150,7 +150,7 @@ class GraphFlowSpec extends AkkaSpec {
out out
} }
source.connect(Sink(probe)).run() source.to(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -167,7 +167,7 @@ class GraphFlowSpec extends AkkaSpec {
out out
} }
source.map(_.toInt).connect(Sink(probe)).run() source.map(_.toInt).to(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -193,7 +193,7 @@ class GraphFlowSpec extends AkkaSpec {
in2 -> out2 in2 -> out2
} }
source.connect(flow).connect(Sink(probe)).run() source.via(flow).to(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -230,7 +230,7 @@ class GraphFlowSpec extends AkkaSpec {
in in
} }
source1.connect(sink).run() source1.to(sink).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -247,8 +247,8 @@ class GraphFlowSpec extends AkkaSpec {
in in
} }
val iSink = Flow[Int].map(_.toString).connect(sink) val iSink = Flow[Int].map(_.toString).to(sink)
source1.connect(iSink).run() source1.to(iSink).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }
@ -274,7 +274,7 @@ class GraphFlowSpec extends AkkaSpec {
in2 in2
} }
source1.connect(flow).connect(sink).run() source1.via(flow).to(sink).run()
validateProbe(probe, stdRequests, stdResult) validateProbe(probe, stdRequests, stdResult)
} }

View file

@ -158,7 +158,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
Source(List(1, 2, 3)) ~> Flow[Int].map(_ * 2) ~> bcast Source(List(1, 2, 3)) ~> Flow[Int].map(_ * 2) ~> bcast
bcast ~> merge bcast ~> merge
bcast ~> Flow[Int].map(_ + 3) ~> merge bcast ~> Flow[Int].map(_ + 3) ~> merge
merge ~> Flow[Int].grouped(10).connect(resultFuture) merge ~> Flow[Int].grouped(10).to(resultFuture)
}.run() }.run()
Await.result(g.get(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9)) Await.result(g.get(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9))
@ -194,8 +194,8 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
val s2 = SubscriberProbe[String] val s2 = SubscriberProbe[String]
FlowGraph(partial) { builder FlowGraph(partial) { builder
builder.attachSource(input1, Source(List(0, 1, 2).map(_ + 1))) builder.attachSource(input1, Source(List(0, 1, 2).map(_ + 1)))
builder.attachSink(output1, Flow[Int].filter(n (n % 2) != 0).connect(Sink(s1))) builder.attachSink(output1, Flow[Int].filter(n (n % 2) != 0).to(Sink(s1)))
builder.attachSink(output2, Flow[String].map(_.toUpperCase).connect(Sink(s2))) builder.attachSink(output2, Flow[String].map(_.toUpperCase).to(Sink(s2)))
}.run() }.run()
val sub1 = s1.expectSubscription() val sub1 = s1.expectSubscription()

View file

@ -20,7 +20,7 @@ class SubscriberSinkSpec extends AkkaSpec {
"publish elements to the subscriber" in { "publish elements to the subscriber" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Source(List(1, 2, 3)).connect(Sink(c)).run() Source(List(1, 2, 3)).to(Sink(c)).run()
val s = c.expectSubscription() val s = c.expectSubscription()
s.request(3) s.request(3)
c.expectNext(1) c.expectNext(1)

View file

@ -19,7 +19,7 @@ class TickSourceSpec extends AkkaSpec {
"produce ticks" in { "produce ticks" in {
val tickGen = Iterator from 1 val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
Source(1.second, 500.millis, () "tick-" + tickGen.next()).connect(Sink(c)).run() Source(1.second, 500.millis, () "tick-" + tickGen.next()).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(3) sub.request(3)
c.expectNoMsg(600.millis) c.expectNoMsg(600.millis)
@ -35,7 +35,7 @@ class TickSourceSpec extends AkkaSpec {
"drop ticks when not requested" in { "drop ticks when not requested" in {
val tickGen = Iterator from 1 val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
Source(1.second, 1.second, () "tick-" + tickGen.next()).connect(Sink(c)).run() Source(1.second, 1.second, () "tick-" + tickGen.next()).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(2) sub.request(2)
c.expectNext("tick-1") c.expectNext("tick-1")
@ -76,7 +76,7 @@ class TickSourceSpec extends AkkaSpec {
"signal onError when tick closure throws" in { "signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).connect(Sink(c)).run() Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(3) sub.request(3)
c.expectError.getMessage should be("tick err") c.expectError.getMessage should be("tick err")

View file

@ -211,8 +211,8 @@ private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requeste
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
tcpOutputs.setConnection(connection) tcpOutputs.setConnection(connection)
tcpInputs.setConnection(connection) tcpInputs.setConnection(connection)
val obmf = outbound.connect(Sink(processor)).run() val obmf = outbound.to(Sink(processor)).run()
val ibmf = Source(processor).connect(inbound).run() val ibmf = Source(processor).to(inbound).run()
requester ! StreamTcp.OutgoingTcpConnection(remoteAddress, localAddress)(obmf, ibmf) requester ! StreamTcp.OutgoingTcpConnection(remoteAddress, localAddress)(obmf, ibmf)
context.become(super.receive) context.become(super.receive)
case f: CommandFailed case f: CommandFailed

View file

@ -109,7 +109,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind,
nextPhase(runningPhase) nextPhase(runningPhase)
listener ! ResumeAccepting(1) listener ! ResumeAccepting(1)
val publisher = ActorPublisher[IncomingTcpConnection](self) val publisher = ActorPublisher[IncomingTcpConnection](self)
val mf = Source(publisher).connect(connectionHandler).run() val mf = Source(publisher).to(connectionHandler).run()
val target = self val target = self
requester ! StreamTcp.TcpServerBinding(localAddress)(mf, Some(new Closeable { requester ! StreamTcp.TcpServerBinding(localAddress)(mf, Some(new Closeable {
override def close() = target ! Unbind override def close() = target ! Unbind

View file

@ -62,21 +62,17 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
/** Converts this Flow to it's Scala DSL counterpart */ /** Converts this Flow to it's Scala DSL counterpart */
def asScala: scaladsl.Flow[In, Out] = delegate def asScala: scaladsl.Flow[In, Out] = delegate
// CONNECT // /**
* Transform this [[Flow]] by appending the given processing steps.
*/
def via[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] =
new Flow(delegate.via(flow.asScala))
/** /**
* Transform this flow by appending the given processing steps. * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
*/ */
def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] = def to(sink: javadsl.Sink[Out]): javadsl.Sink[In] =
new Flow(delegate.connect(flow.asScala)) new Sink(delegate.to(sink.asScala))
/**
* Connect this flow to a sink, concatenating the processing steps of both.
*/
def connect(sink: javadsl.Sink[Out]): javadsl.Sink[In] =
new Sink(delegate.connect(sink.asScala))
// RUN WITH //
/** /**
* Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it.
@ -120,8 +116,6 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
def runWith(source: javadsl.Source[In], sink: javadsl.Sink[Out], materializer: FlowMaterializer): Unit = def runWith(source: javadsl.Source[In], sink: javadsl.Sink[Out], materializer: FlowMaterializer): Unit =
delegate.runWith(source.asScala, sink.asScala)(materializer) delegate.runWith(source.asScala, sink.asScala)(materializer)
// COMMON OPS //
/** /**
* Transform this stream by applying the given function to each of the elements * Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. * as they pass through this processing step.

View file

@ -175,21 +175,17 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
/** Converts this Java DSL element to it's Scala DSL counterpart. */ /** Converts this Java DSL element to it's Scala DSL counterpart. */
def asScala: scaladsl.Source[Out] = delegate def asScala: scaladsl.Source[Out] = delegate
// CONNECT // /**
* Transform this [[Source]] by appending the given processing stages.
*/
def via[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] =
new Source(delegate.via(flow.asScala))
/** /**
* Transform this source by appending the given processing stages. * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
*/ */
def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] = def to(sink: javadsl.Sink[Out]): javadsl.RunnableFlow =
new Source(delegate.connect(flow.asScala)) new RunnableFlowAdapter(delegate.to(sink.asScala))
/**
* Connect this `Source` to a `Sink`, concatenating the processing steps of both.
*/
def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow =
new RunnableFlowAdapter(delegate.connect(sink.asScala))
// RUN WITH //
/** /**
* Connect this `Source` to a `KeyedSink` and run it. * Connect this `Source` to a `KeyedSink` and run it.
@ -206,9 +202,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
* of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`.
*/ */
def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit = def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit =
delegate.connect(sink.asScala).run()(materializer) delegate.to(sink.asScala).run()(materializer)
// OPS //
/** /**
* Shortcut for running this `Source` with a fold function. * Shortcut for running this `Source` with a fold function.

View file

@ -178,7 +178,7 @@ private[scaladsl] final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Un
} }
Nil Nil
} }
}).connect(BlackholeSink).run()(materializer.withNamePrefix(flowName)) }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName))
} }
/** /**
@ -202,7 +202,7 @@ private[scaladsl] final case class ForeachSink[In](f: In ⇒ Unit) extends Keyed
} }
Nil Nil
} }
}).connect(BlackholeSink).run()(materializer.withNamePrefix(flowName)) }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName))
promise.future promise.future
} }
} }
@ -232,7 +232,7 @@ private[scaladsl] final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) ex
} }
Nil Nil
} }
}).connect(BlackholeSink).run()(materializer.withNamePrefix(flowName)) }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName))
promise.future promise.future
} }

View file

@ -56,9 +56,9 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] {
private def sourcePipe = Pipe.empty[Out].withSource(this) private def sourcePipe = Pipe.empty[Out].withSource(this)
override def connect[T](flow: Flow[Out, T]): Source[T] = sourcePipe.connect(flow) override def via[T](flow: Flow[Out, T]): Source[T] = sourcePipe.via(flow)
override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink) override def to(sink: Sink[Out]): RunnableFlow = sourcePipe.to(sink)
/** INTERNAL API */ /** INTERNAL API */
override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op)) override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op))

View file

@ -20,14 +20,14 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
override type Repr[+O] <: Flow[In, O] override type Repr[+O] <: Flow[In, O]
/** /**
* Transform this flow by appending the given processing steps. * Transform this [[Flow]] by appending the given processing steps.
*/ */
def connect[T](flow: Flow[Out, T]): Flow[In, T] def via[T](flow: Flow[Out, T]): Flow[In, T]
/** /**
* Connect this flow to a sink, concatenating the processing steps of both. * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
*/ */
def connect(sink: Sink[Out]): Sink[In] def to(sink: Sink[Out]): Sink[In]
/** /**
* *
@ -36,7 +36,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* and `Publisher` of a [[PublisherSink]]. * and `Publisher` of a [[PublisherSink]].
*/ */
def runWith(source: KeyedSource[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = { def runWith(source: KeyedSource[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = {
val m = source.connect(this).connect(sink).run() val m = source.via(this).to(sink).run()
(m.get(source), m.get(sink)) (m.get(source), m.get(sink))
} }
@ -46,7 +46,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a [[PublisherSink]]. * The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a [[PublisherSink]].
*/ */
def runWith(source: Source[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = def runWith(source: Source[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
source.connect(this).runWith(sink) source.via(this).runWith(sink)
/** /**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
@ -54,7 +54,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* The returned value will contain the materialized value of the `SourceWithKey`, e.g. `Subscriber` of a [[SubscriberSource]]. * The returned value will contain the materialized value of the `SourceWithKey`, e.g. `Subscriber` of a [[SubscriberSource]].
*/ */
def runWith(source: KeyedSource[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): source.MaterializedType = def runWith(source: KeyedSource[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): source.MaterializedType =
source.connect(this).connect(sink).run().get(source) source.via(this).to(sink).run().get(source)
/** /**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
@ -62,7 +62,7 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload. * As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload.
*/ */
def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): Unit = def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): Unit =
source.connect(this).connect(sink).run() source.via(this).to(sink).run()
} }
object Flow { object Flow {
@ -101,7 +101,7 @@ trait RunnableFlow {
} }
/** /**
* Scala API: Operations offered by Flows and Sources with a free output side: the DSL flows left-to-right only. * Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only.
*/ */
trait FlowOps[+Out] { trait FlowOps[+Out] {
import FlowOps._ import FlowOps._

View file

@ -656,15 +656,15 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
(source, flow, sink) match { (source, flow, sink) match {
case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out])
val src = sourcePipe.input val src = sourcePipe.input
val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops)) val newPipe = Pipe(sourcePipe.ops).via(pipe).via(Pipe(sinkPipe.ops))
val snk = sinkPipe.output val snk = sinkPipe.output
addEdge(src, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink addEdge(src, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink
case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sink: Sink[Out]) case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sink: Sink[Out])
val src = sourcePipe.input val src = sourcePipe.input
val newPipe = Pipe(sourcePipe.ops).connect(pipe) val newPipe = Pipe(sourcePipe.ops).via(pipe)
addEdge(src, newPipe, sink) // recursive, but now it is a Source-Pipe-Sink addEdge(src, newPipe, sink) // recursive, but now it is a Source-Pipe-Sink
case (source: Source[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) case (source: Source[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out])
val newPipe = pipe.connect(Pipe(sinkPipe.ops)) val newPipe = pipe.via(Pipe(sinkPipe.ops))
val snk = sinkPipe.output val snk = sinkPipe.output
addEdge(source, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink addEdge(source, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink
case (_, gflow: GraphFlow[In, _, _, Out], _) case (_, gflow: GraphFlow[In, _, _, Out], _)
@ -1291,6 +1291,8 @@ private[scaladsl] class MaterializedFlowGraph(materializedSources: Map[KeyedSour
/** /**
* Implicit conversions that provides syntactic sugar for building flow graphs. * Implicit conversions that provides syntactic sugar for building flow graphs.
* Every method in *Ops classes should have an implicit builder parameter to prevent
* using conversions where builder is not available (e.g. outside FlowGraph scope).
*/ */
object FlowGraphImplicits { object FlowGraphImplicits {
@ -1304,20 +1306,23 @@ object FlowGraphImplicits {
junctionIn.next junctionIn.next
} }
def ~>(sink: UndefinedSink[Out])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, sink)
def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit = def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, sink) builder.addEdge(source, sink)
} }
class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>[O](otherflow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[In, O] = def ~>[O](otherflow: Flow[Out, O]): SourceNextStep[In, O] =
new SourceNextStep(source, flow.connect(otherflow), builder) new SourceNextStep(source, flow.via(otherflow), builder)
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, flow, junctionIn) builder.addEdge(source, flow, junctionIn)
junctionIn.next junctionIn.next
} }
def ~>(sink: UndefinedSink[Out])(implicit builder: FlowGraphBuilder): Unit = def ~>(sink: UndefinedSink[Out]): Unit =
builder.addEdge(source, flow, sink) builder.addEdge(source, flow, sink)
def ~>(sink: Sink[Out]): Unit = def ~>(sink: Sink[Out]): Unit =
@ -1328,30 +1333,32 @@ object FlowGraphImplicits {
def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] =
new JunctionNextStep(junction, flow, builder) new JunctionNextStep(junction, flow, builder)
def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, Pipe.empty[In], sink)
def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(junction, junctionIn) builder.addEdge(junction, junctionIn)
junctionIn.next junctionIn.next
} }
def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = builder.addEdge(junction, sink) def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, Pipe.empty[In], sink)
def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, sink)
} }
class JunctionNextStep[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { class JunctionNextStep[In, Out](junction: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>[O](otherFlow: Flow[Out, O]): JunctionNextStep[In, O] =
new JunctionNextStep(junction, flow.via(otherFlow), builder)
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(junctionOut, flow, junctionIn) builder.addEdge(junction, flow, junctionIn)
junctionIn.next junctionIn.next
} }
def ~>(sink: Sink[Out]): Unit = { def ~>(sink: UndefinedSink[Out]): Unit =
builder.addEdge(junctionOut, flow, sink) builder.addEdge(junction, flow, sink)
}
def ~>(sink: UndefinedSink[Out]): Unit = { def ~>(sink: Sink[Out]): Unit =
builder.addEdge(junctionOut, flow, sink) builder.addEdge(junction, flow, sink)
}
} }
implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal { implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal {
@ -1363,23 +1370,26 @@ object FlowGraphImplicits {
junctionIn.next junctionIn.next
} }
def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, sink)
def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, sink)
} }
class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>[T](otherFlow: Flow[Out, T]): UndefinedSourceNextStep[In, T] =
new UndefinedSourceNextStep(source, flow.via(otherFlow), builder)
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, flow, junctionIn) builder.addEdge(source, flow, junctionIn)
junctionIn.next junctionIn.next
} }
def ~>[T](otherFlow: Flow[Out, T])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, T] = def ~>(sink: UndefinedSink[Out]): Unit =
new UndefinedSourceNextStep(source, flow.connect(otherFlow), builder)
def ~>(sink: Sink[Out]): Unit = {
builder.addEdge(source, flow, sink) builder.addEdge(source, flow, sink)
}
def ~>(sink: UndefinedSink[Out]): Unit = { def ~>(sink: Sink[Out]): Unit =
builder.addEdge(source, flow, sink) builder.addEdge(source, flow, sink)
}
} }
} }

View file

@ -7,7 +7,14 @@ import akka.stream.impl.Ast.AstNode
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Flow[In, Out] { private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](
inPipe: Pipe[In, CIn],
in: UndefinedSource[CIn],
graph: PartialFlowGraph,
out: UndefinedSink[COut],
outPipe: Pipe[COut, Out])
extends Flow[In, Out] {
override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O] override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O]
private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe)) private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe))
@ -32,30 +39,30 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](inPipe: Pipe[In, CI
builder.connect(nOut, outPipe, oIn) builder.connect(nOut, outPipe, oIn)
} }
def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe)) case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T] case gFlow: GraphFlow[Out, _, _, T]
val (newGraph, nOut) = FlowGraphBuilder(graph) { b val (newGraph, nOut) = FlowGraphBuilder(graph) { b
val (oIn, oOut) = gFlow.remap(b) val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.connect(gFlow.inPipe), oIn) b.connect(out, outPipe.via(gFlow.inPipe), oIn)
(b.partialBuild(), oOut) (b.partialBuild(), oOut)
} }
GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe) GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe)
} }
override def connect(sink: Sink[Out]) = sink match { override def to(sink: Sink[Out]) = sink match {
case sinkPipe: SinkPipe[Out] case sinkPipe: SinkPipe[Out]
val newGraph = PartialFlowGraph(this.graph) { builder val newGraph = PartialFlowGraph(this.graph) { builder
builder.attachSink(out, outPipe.connect(sinkPipe)) builder.attachSink(out, outPipe.to(sinkPipe))
} }
GraphSink(inPipe, in, newGraph) GraphSink(inPipe, in, newGraph)
case gSink: GraphSink[Out, Out] case gSink: GraphSink[Out, Out]
val newGraph = PartialFlowGraph(graph) { b val newGraph = PartialFlowGraph(graph) { b
val oIn = gSink.remap(b) val oIn = gSink.remap(b)
b.connect(out, outPipe.connect(gSink.inPipe), oIn) b.connect(out, outPipe.via(gSink.inPipe), oIn)
} }
GraphSink(inPipe, in, newGraph) GraphSink(inPipe, in, newGraph)
case sink: Sink[Out] connect(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe case sink: Sink[Out] to(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe
} }
override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
@ -75,29 +82,29 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou
builder.connect(nOut, outPipe, oIn) builder.connect(nOut, outPipe, oIn)
} }
override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { override def via[T](flow: Flow[Out, T]): Source[T] = flow match {
case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe)) case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T] case gFlow: GraphFlow[Out, _, _, T]
val (newGraph, nOut) = FlowGraphBuilder(graph) { b val (newGraph, nOut) = FlowGraphBuilder(graph) { b
val (oIn, oOut) = gFlow.remap(b) val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.connect(gFlow.inPipe), oIn) b.connect(out, outPipe.via(gFlow.inPipe), oIn)
(b.partialBuild(), oOut) (b.partialBuild(), oOut)
} }
GraphSource(newGraph, nOut, gFlow.outPipe) GraphSource(newGraph, nOut, gFlow.outPipe)
} }
override def connect(sink: Sink[Out]): RunnableFlow = sink match { override def to(sink: Sink[Out]): RunnableFlow = sink match {
case sinkPipe: SinkPipe[Out] case sinkPipe: SinkPipe[Out]
FlowGraph(this.graph) { implicit builder FlowGraph(this.graph) { implicit builder
builder.attachSink(out, outPipe.connect(sinkPipe)) builder.attachSink(out, outPipe.to(sinkPipe))
} }
case gSink: GraphSink[Out, _] case gSink: GraphSink[Out, _]
FlowGraph(graph) { b FlowGraph(graph) { b
val oIn = gSink.remap(b) val oIn = gSink.remap(b)
b.connect(out, outPipe.connect(gSink.inPipe), oIn) b.connect(out, outPipe.via(gSink.inPipe), oIn)
} }
case sink: Sink[Out] case sink: Sink[Out]
connect(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe to(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe
} }
override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
@ -113,7 +120,7 @@ private[scaladsl] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: Unde
private[scaladsl] def prepend(pipe: SourcePipe[In]): FlowGraph = { private[scaladsl] def prepend(pipe: SourcePipe[In]): FlowGraph = {
FlowGraph(this.graph) { b FlowGraph(this.graph) { b
b.attachSource(in, pipe.connect(inPipe)) b.attachSource(in, pipe.via(inPipe))
} }
} }

View file

@ -25,13 +25,13 @@ private[stream] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flo
private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops) private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops)
override def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { override def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case p: Pipe[T, In] Pipe(p.ops ++: ops) case p: Pipe[T, In] Pipe(p.ops ++: ops)
case gf: GraphFlow[Out, _, _, T] gf.prepend(this) case gf: GraphFlow[Out, _, _, T] gf.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x) case x FlowGraphInternal.throwUnsupportedValue(x)
} }
override def connect(sink: Sink[Out]): Sink[In] = sink match { override def to(sink: Sink[Out]): Sink[In] = sink match {
case sp: SinkPipe[Out] sp.prependPipe(this) case sp: SinkPipe[Out] sp.prependPipe(this)
case gs: GraphSink[Out, _] gs.prepend(this) case gs: GraphSink[Out, _] gs.prepend(this)
case d: Sink[Out] this.withSink(d) case d: Sink[Out] this.withSink(d)
@ -49,7 +49,7 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod
private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
override def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit = override def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit =
source.connect(this).run() source.to(this).run()
} }
@ -65,13 +65,13 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops)
override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { override def via[T](flow: Flow[Out, T]): Source[T] = flow match {
case p: Pipe[Out, T] appendPipe(p) case p: Pipe[Out, T] appendPipe(p)
case g: GraphFlow[Out, _, _, T] g.prepend(this) case g: GraphFlow[Out, _, _, T] g.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x) case x FlowGraphInternal.throwUnsupportedValue(x)
} }
override def connect(sink: Sink[Out]): RunnableFlow = sink match { override def to(sink: Sink[Out]): RunnableFlow = sink match {
case sp: SinkPipe[Out] RunnablePipe(input, sp.output, sp.ops ++: ops) case sp: SinkPipe[Out] RunnablePipe(input, sp.output, sp.ops ++: ops)
case g: GraphSink[Out, _] g.prepend(this) case g: GraphSink[Out, _] g.prepend(this)
case d: Sink[Out] this.withSink(d) case d: Sink[Out] this.withSink(d)

View file

@ -18,14 +18,14 @@ trait Sink[-In] {
* of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]].
*/ */
def runWith(source: KeyedSource[In])(implicit materializer: FlowMaterializer): source.MaterializedType = def runWith(source: KeyedSource[In])(implicit materializer: FlowMaterializer): source.MaterializedType =
source.connect(this).run().get(source) source.to(this).run().get(source)
/** /**
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
* of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]].
*/ */
def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit = def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit =
source.connect(this).run() source.to(this).run()
} }
object Sink { object Sink {

View file

@ -20,21 +20,21 @@ trait Source[+Out] extends FlowOps[Out] {
override type Repr[+O] <: Source[O] override type Repr[+O] <: Source[O]
/** /**
* Transform this source by appending the given processing stages. * Transform this [[Source]] by appending the given processing stages.
*/ */
def connect[T](flow: Flow[Out, T]): Source[T] def via[T](flow: Flow[Out, T]): Source[T]
/** /**
* Connect this source to a sink, concatenating the processing steps of both. * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
*/ */
def connect(sink: Sink[Out]): RunnableFlow def to(sink: Sink[Out]): RunnableFlow
/** /**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[Sink.fanoutPublisher]]. * of the `Sink`, e.g. the `Publisher` of a [[Sink.fanoutPublisher]].
*/ */
def runWith(sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = def runWith(sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
connect(sink).run().get(sink) to(sink).run().get(sink)
/** /**
* Shortcut for running this `Source` with a fold function. * Shortcut for running this `Source` with a fold function.