diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 8e455f7bf4..0405efa6fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -122,7 +122,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "remember requested after restart" in { // creating actor with default supervision, because stream supervisor default strategy is to stop val ref = system.actorOf(manualSubscriberProps(testActor)) - Source(1 to 7).to(Sink(ActorSubscriber[Int](ref))).run() + Source(1 to 7).runWith(Sink(ActorSubscriber[Int](ref))) ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index 31deddd88a..a9a8b411ad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -79,7 +79,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { val flow: Flow[Int, Long] = Flow[Int].map(_.toLong).timedIntervalBetween(in ⇒ in % 2 == 1, d ⇒ probe.ref ! d) val c1 = StreamTestKit.SubscriberProbe[Long]() - Source(List(1, 2, 3)).via(flow).to(Sink(c1)).run() + Source(List(1, 2, 3)).via(flow).runWith(Sink(c1)) val s = c1.expectSubscription() s.request(100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala index b606c3199a..7f582c2ef6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SslTlsFlowSpec.scala @@ -120,7 +120,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = val ssession = Await.result(ssessionf, duration) val sdata = ssession.data Source(sdata).map(bs ⇒ ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')). - to(Sink(scipher.plainTextOutbound)).run() + runWith(Sink(scipher.plainTextOutbound)) } def replyFirstLineInUpperCase(clientConnection: JavaSslConnection): Unit = { @@ -129,7 +129,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type = def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = { val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future) - Source(List(ByteString(message + '\n'))).to(Sink(ccipher.plainTextOutbound)).run() + Source(List(ByteString(message + '\n'))).runWith(Sink(ccipher.plainTextOutbound)) val csession = Await.result(csessionf, duration) val cdata = csession.data Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala index 1457f0a552..db796d4d52 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -48,7 +48,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) serverConnection.read(256) - Source(tcpProcessor).to(Sink.ignore).run() + Source(tcpProcessor).runWith(Sink.ignore) Source(testInput).runWith(Sink.publisher).subscribe(tcpProcessor) serverConnection.waitRead() should be(expectedOutput) @@ -158,7 +158,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).to(Sink(conn.outputStream)).run() + Source(testInput).runWith(Sink(conn.outputStream)) val resultFuture = Source(conn.inputStream).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) Await.result(resultFuture, 3.seconds) should be(expectedOutput) @@ -178,7 +178,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).to(Sink(conn1.outputStream)).run() + Source(testInput).runWith(Sink(conn1.outputStream)) conn1.inputStream.subscribe(conn2.outputStream) conn2.inputStream.subscribe(conn3.outputStream) val resultFuture = Source(conn3.inputStream).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala index 04c3337e06..4483ff6cbf 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpFlowSpec.scala @@ -48,9 +48,9 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) serverConnection.read(256) - Source(tcpPublisher).to(Sink.ignore).run() + Source(tcpPublisher).runWith(Sink.ignore) - Source(testInput).to(Sink(tcpSubscriber)).run() + Source(testInput).runWith(Sink(tcpSubscriber)) serverConnection.waitRead() should be(expectedOutput) server.close() @@ -162,7 +162,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).to(Sink(tcpSubscriber)).run() + Source(testInput).runWith(Sink(tcpSubscriber)) val resultFuture = Source(tcpPublisher).fold(ByteString.empty) { case (res, elem) ⇒ res ++ elem } Await.result(resultFuture, 3.seconds) should be(expectedOutput) @@ -181,7 +181,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = Iterator.range(0, 256).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).to(Sink(tcpSubscriber1)).run() + Source(testInput).runWith(Sink(tcpSubscriber1)) tcpPublisher1.subscribe(tcpSubscriber2) tcpPublisher2.subscribe(tcpSubscriber3) val resultFuture = Source(tcpPublisher3).fold(ByteString.empty) { case (res, elem) ⇒ res ++ elem } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala index 9d2de18906..de8be30db4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io2/TcpHelper.scala @@ -223,7 +223,7 @@ trait TcpHelper { this: TestKitBase ⇒ def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): EchoServer = { val foreachSink = Sink.foreach[IncomingTcpConnection] { conn ⇒ - conn.inbound.to(conn.outbound).run() + conn.inbound.runWith(conn.outbound) } val binding = bind(Flow[IncomingTcpConnection].to(foreachSink), serverAddress) new EchoServer(binding.connection.get(foreachSink), binding) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 3b28496fe0..ee7b327dbc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -53,7 +53,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).to(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -73,7 +73,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).to(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -101,7 +101,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).to(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -132,7 +132,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).to(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -160,7 +160,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).to(Sink(subscriber)).run() + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -189,7 +189,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(1, overflowStrategy = strategy).to(Sink(subscriber)).run() + Source(publisher).buffer(1, overflowStrategy = strategy).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index a09ca25716..9678152922 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec { "work together with SplitWhen" in { val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber)) val subscription = subscriber.expectSubscription() subscription.request(10) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index d159ed5c5b..2e4d05137e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).to(Sink(subscriber)).run() + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).to(Sink(subscriber)).run() + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -68,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).to(Sink(subscriber)).run() + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index 690f51ccc6..6c956b7070 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -27,7 +27,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).to(Sink(subscriber)).run() + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -47,7 +47,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).to(Sink(subscriber)).run() + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -79,7 +79,7 @@ class FlowExpandSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).to(Sink(subscriber)).run() + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala index 8109e8b664..41a3c0b3ba 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala @@ -31,7 +31,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { val probe = StreamTestKit.SubscriberProbe[Int]() Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). - to(Sink(probe)).run() + runWith(Sink(probe)) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 8c1994f67f..234e65e2d5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -25,7 +25,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "produce future elements" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).to(Sink(c)).run() + val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).runWith(Sink(c)) val sub = c.expectSubscription() sub.request(2) c.expectNext(1) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala index 58b5e8cc66..81fc4213fb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala @@ -16,7 +16,6 @@ import akka.stream.impl.{ ActorBasedFlowMaterializer, ActorProcessorFactory, Fan import java.util.concurrent.atomic.AtomicReference sealed trait ActorFlowSink[-In] extends Sink[In] { - type MaterializedType /** * Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given @@ -64,7 +63,7 @@ trait SimpleActorFlowSink[-In] extends ActorFlowSink[In] { */ trait KeyedActorFlowSink[-In] extends ActorFlowSink[In] with KeyedSink[In] -private[scaladsl] object PublisherSink { +object PublisherSink { def apply[T](): PublisherSink[T] = new PublisherSink[T] def withFanout[T](initialBufferSize: Int, maximumBufferSize: Int): FanoutPublisherSink[T] = new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize) @@ -76,7 +75,7 @@ private[scaladsl] object PublisherSink { * elements to fill the internal buffers it will assert back-pressure until * a subscriber connects and creates demand for elements to be emitted. */ -private[scaladsl] class PublisherSink[In] extends KeyedActorFlowSink[In] { +class PublisherSink[In] extends KeyedActorFlowSink[In] { type MaterializedType = Publisher[In] override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = flowPublisher @@ -84,7 +83,7 @@ private[scaladsl] class PublisherSink[In] extends KeyedActorFlowSink[In] { override def toString: String = "PublisherSink" } -private[scaladsl] final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In] { +final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In] { type MaterializedType = Publisher[In] override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { @@ -96,7 +95,7 @@ private[scaladsl] final case class FanoutPublisherSink[In](initialBufferSize: In } } -private[scaladsl] object FutureSink { +object FutureSink { def apply[T](): FutureSink[T] = new FutureSink[T] } @@ -107,7 +106,7 @@ private[scaladsl] object FutureSink { * the Future into the corresponding failed state) or the end-of-stream * (failing the Future with a NoSuchElementException). */ -private[scaladsl] class FutureSink[In] extends KeyedActorFlowSink[In] { +class FutureSink[In] extends KeyedActorFlowSink[In] { type MaterializedType = Future[In] @@ -138,7 +137,7 @@ private[scaladsl] class FutureSink[In] extends KeyedActorFlowSink[In] { * Attaches a subscriber to this stream which will just discard all received * elements. */ -private[scaladsl] final case object BlackholeSink extends SimpleActorFlowSink[Any] { +final case object BlackholeSink extends SimpleActorFlowSink[Any] { override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = flowPublisher.subscribe(create(materializer, flowName)._1) override def isActive: Boolean = true @@ -149,14 +148,14 @@ private[scaladsl] final case object BlackholeSink extends SimpleActorFlowSink[An /** * Attaches a subscriber to this stream. */ -private[scaladsl] final case class SubscriberSink[In](subscriber: Subscriber[In]) extends SimpleActorFlowSink[In] { +final case class SubscriberSink[In](subscriber: Subscriber[In]) extends SimpleActorFlowSink[In] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = flowPublisher.subscribe(subscriber) override def isActive: Boolean = true override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = (subscriber, ()) } -private[scaladsl] object OnCompleteSink { +object OnCompleteSink { private val SuccessUnit = Success[Unit](()) } @@ -165,7 +164,7 @@ private[scaladsl] object OnCompleteSink { * completion, apply the provided function with [[scala.util.Success]] * or [[scala.util.Failure]]. */ -private[scaladsl] final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends SimpleActorFlowSink[In] { +final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends SimpleActorFlowSink[In] { override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = Source(flowPublisher).transform("onCompleteSink", () ⇒ new Transformer[In, Unit] { @@ -186,7 +185,7 @@ private[scaladsl] final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Un * that will be completed with `Success` when reaching the normal end of the stream, or completed * with `Failure` if there is an error is signaled in the stream. */ -private[scaladsl] final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In] { +final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In] { override type MaterializedType = Future[Unit] @@ -214,7 +213,7 @@ private[scaladsl] final case class ForeachSink[In](f: In ⇒ Unit) extends Keyed * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ -private[scaladsl] final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFlowSink[In] { +final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFlowSink[In] { type MaterializedType = Future[U] @@ -241,7 +240,7 @@ private[scaladsl] final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) ex /** * A sink that immediately cancels its upstream upon materialization. */ -private[scaladsl] final case object CancelSink extends SimpleActorFlowSink[Any] { +final case object CancelSink extends SimpleActorFlowSink[Any] { override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = { flowPublisher.subscribe(new Subscriber[Any] { @@ -257,7 +256,7 @@ private[scaladsl] final case object CancelSink extends SimpleActorFlowSink[Any] * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. */ -private[scaladsl] final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] { +final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] { type MaterializedType = ActorRef diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index af267cc568..b7711d7644 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -19,7 +19,6 @@ import scala.util.Failure import scala.util.Success sealed trait ActorFlowSource[+Out] extends Source[Out] { - type MaterializedType /** * Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given @@ -82,7 +81,7 @@ trait KeyedActorFlowSource[+Out] extends ActorFlowSource[Out] with KeyedSource[O * Holds a `Subscriber` representing the input side of the flow. * The `Subscriber` can later be connected to an upstream `Publisher`. */ -private[scaladsl] final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out] { +final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out] { override type MaterializedType = Subscriber[Out] override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = @@ -96,7 +95,7 @@ private[scaladsl] final case class SubscriberSource[Out]() extends KeyedActorFlo * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ -private[scaladsl] final case class PublisherSource[Out](p: Publisher[Out]) extends SimpleActorFlowSource[Out] { +final case class PublisherSource[Out](p: Publisher[Out]) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = p.subscribe(flowSubscriber) override def isActive: Boolean = true @@ -110,7 +109,7 @@ private[scaladsl] final case class PublisherSource[Out](p: Publisher[Out]) exten * in accordance with the demand coming from the downstream transformation * steps. */ -private[scaladsl] final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActorFlowSource[Out] { +final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true @@ -126,7 +125,7 @@ private[scaladsl] final case class IteratorSource[Out](iterator: Iterator[Out]) * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ -private[scaladsl] final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends SimpleActorFlowSource[Out] { +final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true @@ -141,7 +140,7 @@ private[scaladsl] final case class IterableSource[Out](iterable: immutable.Itera * The stream ends normally when evaluation of the closure returns a `None`. * The stream ends exceptionally when an exception is thrown from the closure. */ -private[scaladsl] final case class ThunkSource[Out](f: () ⇒ Option[Out]) extends SimpleActorFlowSource[Out] { +final case class ThunkSource[Out](f: () ⇒ Option[Out]) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true @@ -159,7 +158,7 @@ private[scaladsl] final case class ThunkSource[Out](f: () ⇒ Option[Out]) exten * may happen before or after materializing the `Flow`. * The stream terminates with an error if the `Future` is completed with a failure. */ -private[scaladsl] final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowSource[Out] { +final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true @@ -183,7 +182,7 @@ private[scaladsl] final case class FutureSource[Out](future: Future[Out]) extend * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ -private[scaladsl] final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends SimpleActorFlowSource[Out] { +final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true @@ -197,7 +196,7 @@ private[scaladsl] final case class TickSource[Out](initialDelay: FiniteDuration, * completely, then draining the elements arriving from the second Source. If the first Source is infinite then the * second Source will be never drained. */ -private[scaladsl] final case class ConcatSource[Out](source1: Source[Out], source2: Source[Out]) extends SimpleActorFlowSource[Out] { +final case class ConcatSource[Out](source1: Source[Out], source2: Source[Out]) extends SimpleActorFlowSource[Out] { override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { val concatter = Concat[Out] @@ -216,7 +215,7 @@ private[scaladsl] final case class ConcatSource[Out](source1: Source[Out], sourc * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. */ -private[scaladsl] final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] { +final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] { override type MaterializedType = ActorRef override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1186f112f0..23cef6f6b5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -35,34 +35,11 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a [[SubscriberSource]] and * and `Publisher` of a [[PublisherSink]]. */ - def runWith(source: KeyedSource[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = { + def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = { val m = source.via(this).to(sink).run() (m.get(source), m.get(sink)) } - /** - * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. - * - * The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a [[PublisherSink]]. - */ - def runWith(source: Source[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = - source.via(this).runWith(sink) - - /** - * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. - * - * The returned value will contain the materialized value of the `SourceWithKey`, e.g. `Subscriber` of a [[SubscriberSource]]. - */ - def runWith(source: KeyedSource[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): source.MaterializedType = - source.via(this).to(sink).run().get(source) - - /** - * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. - * - * As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload. - */ - def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): Unit = - source.via(this).to(sink).run() } object Flow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index f2499e02e4..941b576de3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -1320,18 +1320,24 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl private[scaladsl] class MaterializedFlowGraph(materializedSources: Map[KeyedSource[_], Any], materializedSinks: Map[KeyedSink[_], Any]) extends MaterializedMap { - override def get(key: KeyedSource[_]): key.MaterializedType = - materializedSources.get(key) match { - case Some(matSource) ⇒ matSource.asInstanceOf[key.MaterializedType] - case None ⇒ - throw new IllegalArgumentException(s"Source key [$key] doesn't exist in this flow graph") + override def get(key: Source[_]): key.MaterializedType = + key match { + case k: KeyedSource[_] ⇒ materializedSources.get(k) match { + case Some(matSource) ⇒ matSource.asInstanceOf[key.MaterializedType] + case None ⇒ + throw new IllegalArgumentException(s"Source key [$key] doesn't exist in this flow graph") + } + case _ ⇒ ().asInstanceOf[key.MaterializedType] } - def get(key: KeyedSink[_]): key.MaterializedType = - materializedSinks.get(key) match { - case Some(matSink) ⇒ matSink.asInstanceOf[key.MaterializedType] - case None ⇒ - throw new IllegalArgumentException(s"Sink key [$key] doesn't exist in this flow graph") + def get(key: Sink[_]): key.MaterializedType = + key match { + case k: KeyedSink[_] ⇒ materializedSinks.get(k) match { + case Some(matSink) ⇒ matSink.asInstanceOf[key.MaterializedType] + case None ⇒ + throw new IllegalArgumentException(s"Sink key [$key] doesn't exist in this flow graph") + } + case _ ⇒ ().asInstanceOf[key.MaterializedType] } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala index 59e731795d..3436900d4c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala @@ -12,10 +12,10 @@ trait MaterializedMap { /** * Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. */ - def get(key: KeyedSource[_]): key.MaterializedType + def get(key: Source[_]): key.MaterializedType /** * Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[PublisherSink]]. */ - def get(key: KeyedSink[_]): key.MaterializedType + def get(key: Sink[_]): key.MaterializedType } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index 15ee52c14c..36080f13f7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -48,8 +48,6 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod private[stream] def withSource(in: Source[In]): RunnablePipe = RunnablePipe(in, output, ops) private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) - override def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit = - source.to(this).run() } @@ -91,11 +89,19 @@ private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_], * `Source` input or `Sink` output. */ private[stream] class MaterializedPipe(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedMap { - override def get(key: KeyedSource[_]): key.MaterializedType = - if (key == sourceKey) matSource.asInstanceOf[key.MaterializedType] - else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") + override def get(key: Source[_]): key.MaterializedType = + key match { + case _: KeyedSource[_] ⇒ + if (key == sourceKey) matSource.asInstanceOf[key.MaterializedType] + else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") + case _ ⇒ ().asInstanceOf[key.MaterializedType] + } - override def get(key: KeyedSink[_]): key.MaterializedType = - if (key == sinkKey) matSink.asInstanceOf[key.MaterializedType] - else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") + override def get(key: Sink[_]): key.MaterializedType = + key match { + case _: KeyedSink[_] ⇒ + if (key == sinkKey) matSink.asInstanceOf[key.MaterializedType] + else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") + case _ ⇒ ().asInstanceOf[key.MaterializedType] + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 0f7fc2d43a..5c161e1498 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -13,19 +13,15 @@ import akka.stream.FlowMaterializer * Can be used as a `Subscriber` */ trait Sink[-In] { - /** - * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value - * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. - */ - def runWith(source: KeyedSource[In])(implicit materializer: FlowMaterializer): source.MaterializedType = - source.to(this).run().get(source) + type MaterializedType /** * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. */ - def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit = - source.to(this).run() + def runWith(source: Source[In])(implicit materializer: FlowMaterializer): source.MaterializedType = + source.to(this).run().get(source) + } object Sink { @@ -118,6 +114,4 @@ object Sink { * to retrieve in order to access aspects of this sink (could be a completion Future * or a cancellation handle, etc.) */ -trait KeyedSink[-In] extends Sink[In] { - type MaterializedType -} +trait KeyedSink[-In] extends Sink[In] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 4a44b83e0a..7726cc660a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -17,6 +17,7 @@ import akka.stream.FlowMaterializer * Can be used as a `Publisher` */ trait Source[+Out] extends FlowOps[Out] { + type MaterializedType override type Repr[+O] <: Source[O] /** @@ -33,7 +34,7 @@ trait Source[+Out] extends FlowOps[Out] { * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[Sink.fanoutPublisher]]. */ - def runWith(sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = + def runWith(sink: Sink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = to(sink).run().get(sink) /** @@ -195,6 +196,4 @@ object Source { * to retrieve in order to access aspects of this source (could be a Subscriber, a * Future/Promise, etc.). */ -trait KeyedSource[+Out] extends Source[Out] { - type MaterializedType -} +trait KeyedSource[+Out] extends Source[Out]