diff --git a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala index f2a741ad8d..4753908b5c 100644 --- a/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/io/akka/integrationtest/http/HttpModelIntegrationSpec.scala @@ -62,7 +62,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte entity = HttpEntity.Default( contentType = ContentTypes.`application/json`, contentLength = 5, - Source(List(ByteString("hello"))).toPublisher())) + Source(List(ByteString("hello"))).runWith(PublisherDrain()))) // Our library uses a simple model of headers: a Seq[(String, String)]. // The body is represented as an Array[Byte]. To get the headers in @@ -141,7 +141,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte // convert the body into a Publisher[ByteString]. val byteStringBody = ByteString(byteArrayBody) - val publisherBody = Source(List(byteStringBody)).toPublisher() + val publisherBody = Source(List(byteStringBody)).runWith(PublisherDrain()) // Finally we can create our HttpResponse. diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit2/ScriptedTest.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit2/ScriptedTest.scala index 468c0cf88e..2a947832c6 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit2/ScriptedTest.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit2/ScriptedTest.scala @@ -9,17 +9,17 @@ import akka.stream.scaladsl2.{ FlowMaterializer, Source, Flow } import akka.stream.testkit.StreamTestKit._ import org.reactivestreams.Publisher import org.scalatest.Matchers - import scala.annotation.tailrec import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.stream.scaladsl2.PublisherDrain trait ScriptedTest extends Matchers { class ScriptException(msg: String) extends RuntimeException(msg) def toPublisher[In, Out]: (Source[Out], FlowMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.toPublisher()(m) + (f, m) ⇒ f.runWith(PublisherDrain())(m) object Script { def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = { diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit2/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit2/TwoStreamsSetup.scala index 07619b0714..9c2b316e51 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit2/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit2/TwoStreamsSetup.scala @@ -41,7 +41,7 @@ abstract class TwoStreamsSetup extends AkkaSpec { def completedPublisher[T]: Publisher[T] = StreamTestKit.emptyPublisher[T] - def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Source(elems).toPublisher() + def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Source(elems).runWith(PublisherDrain()) def soonToFailPublisher[T]: Publisher[T] = StreamTestKit.lazyErrorPublisher[T](TestException) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala index fe47548fb5..bc3e8f4546 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala @@ -17,7 +17,7 @@ class FlowAppendSpec extends AkkaSpec with River { "Flow" should { "append Flow" in riverOf[String] { subscriber ⇒ val flow = Flow[Int].connect(otherFlow) - Source(elements).connect(flow).publishTo(subscriber) + Source(elements).connect(flow).connect(SubscriberDrain(subscriber)).run() } "append Sink" in riverOf[String] { subscriber ⇒ @@ -30,7 +30,7 @@ class FlowAppendSpec extends AkkaSpec with River { "append Flow" in riverOf[String] { subscriber ⇒ Source(elements) .connect(otherFlow) - .publishTo(subscriber) + .connect(SubscriberDrain(subscriber)).run() } "append Sink" in riverOf[String] { subscriber ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala index 025f3a761b..dfc928e630 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import akka.stream.MaterializerSettings import akka.stream.OverflowStrategy +import scala.concurrent.Future class FlowBufferSpec extends AkkaSpec { @@ -20,24 +21,20 @@ class FlowBufferSpec extends AkkaSpec { "Buffer" must { "pass elements through normally in backpressured mode" in { - val futureDrain = FutureDrain[Seq[Int]] - val mf = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). - connect(futureDrain).run() - val future = futureDrain.future(mf) + val future: Future[Seq[Int]] = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). + runWith(FutureDrain()) Await.result(future, 3.seconds) should be(1 to 1000) } "pass elements through normally in backpressured mode with buffer size one" in { val futureDrain = FutureDrain[Seq[Int]] - val mf = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). - connect(futureDrain).run() - val future = futureDrain.future(mf) + val future = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). + runWith(FutureDrain()) Await.result(future, 3.seconds) should be(1 to 1000) } "pass elements through a chain of backpressured buffers of different size" in { - val futureDrain = FutureDrain[Seq[Int]] - val mf = Source((1 to 1000).iterator) + val future = Source((1 to 1000).iterator) .buffer(1, overflowStrategy = OverflowStrategy.backpressure) .buffer(10, overflowStrategy = OverflowStrategy.backpressure) .buffer(256, overflowStrategy = OverflowStrategy.backpressure) @@ -45,8 +42,7 @@ class FlowBufferSpec extends AkkaSpec { .buffer(5, overflowStrategy = OverflowStrategy.backpressure) .buffer(128, overflowStrategy = OverflowStrategy.backpressure) .grouped(1001) - .connect(futureDrain).run() - val future = futureDrain.future(mf) + .runWith(FutureDrain()) Await.result(future, 3.seconds) should be(1 to 1000) } @@ -54,7 +50,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).publishTo(subscriber) + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -74,7 +70,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).publishTo(subscriber) + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -102,7 +98,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).publishTo(subscriber) + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -133,7 +129,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).publishTo(subscriber) + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -164,7 +160,7 @@ class FlowBufferSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(1, overflowStrategy = strategy).publishTo(subscriber) + Source(publisher).buffer(1, overflowStrategy = strategy).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConcatAllSpec.scala index dac8263499..5ede7d6b07 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConcatAllSpec.scala @@ -32,7 +32,7 @@ class FlowConcatAllSpec extends AkkaSpec { val main = Source(List(s1, s2, s3, s4, s5)) val subscriber = StreamTestKit.SubscriberProbe[Int]() - main.flatten(FlattenStrategy.concat).publishTo(subscriber) + main.flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) @@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec { "work together with SplitWhen" in { val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).publishTo(subscriber) + Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) @@ -53,7 +53,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the current open substream and signal error" in { val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).publishTo(subscriber) + Source(publisher).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -73,7 +73,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on open substream, cancel the master stream and signal error " in { val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).publishTo(subscriber) + Source(publisher).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -93,7 +93,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the current open substream and the master stream" in { val publisher = StreamTestKit.PublisherProbe[Source[Int]]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).publishTo(subscriber) + Source(publisher).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala index c8b817153c..bfa487b7e8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala @@ -23,7 +23,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).publishTo(subscriber) + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -41,7 +41,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).publishTo(subscriber) + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -57,12 +57,10 @@ class FlowConflateSpec extends AkkaSpec { "work on a variable rate chain" in { val foldDrain = FoldDrain[Int, Int](0)(_ + _) - val mf = Source((1 to 1000).iterator) + val future = Source((1 to 1000).iterator) .conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } - .connect(foldDrain) - .run() - val future = foldDrain.future(mf) + .runWith(FoldDrain[Int, Int](0)(_ + _)) Await.result(future, 10.seconds) should be(500500) } @@ -70,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).publishTo(subscriber) + Source(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDispatcherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDispatcherSpec.scala index 9373e8b08e..8f561e9b72 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDispatcherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDispatcherSpec.scala @@ -16,7 +16,7 @@ class FlowDispatcherSpec extends AkkaSpec { val probe = TestProbe() val p = Source(List(1, 2, 3)).map(i ⇒ { probe.ref ! Thread.currentThread().getName(); i }). - consume() + connect(BlackholeDrain).run() probe.receiveN(3) foreach { case s: String ⇒ s should startWith(system.name + "-akka.test.stream-dispatcher") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala index 1d9740ec34..8a71742310 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala @@ -29,7 +29,7 @@ class FlowDropSpec extends AkkaSpec with ScriptedTest { "not drop anything for negative n" in { val probe = StreamTestKit.SubscriberProbe[Int]() - Source(List(1, 2, 3)).drop(-1).publishTo(probe) + Source(List(1, 2, 3)).drop(-1).connect(SubscriberDrain(probe)).run() probe.expectSubscription().request(10) probe.expectNext(1) probe.expectNext(2) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala index 4c46440d38..1902e3adad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala @@ -18,7 +18,7 @@ class FlowDropWithinSpec extends AkkaSpec { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() - Source(p).dropWithin(1.second).publishTo(c) + Source(p).dropWithin(1.second).connect(SubscriberDrain(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala index d775c47638..3624caede3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala @@ -24,7 +24,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).publishTo(subscriber) + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -44,7 +44,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() // Simply repeat the last element as an extrapolation step - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).publishTo(subscriber) + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() @@ -64,13 +64,10 @@ class FlowExpandSpec extends AkkaSpec { } "work on a variable rate chain" in { - val foldDrain = FoldDrain[Set[Int], Int](Set.empty[Int])(_ + _) - val mf = Source((1 to 100).iterator) + val future = Source((1 to 100).iterator) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)) - .connect(foldDrain) - .run() - val future = foldDrain.future(mf) + .runWith(FoldDrain[Set[Int], Int](Set.empty[Int])(_ + _)) Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100)) } @@ -79,7 +76,7 @@ class FlowExpandSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).publishTo(subscriber) + Source(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).connect(SubscriberDrain(subscriber)).run() val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala index f5ee770eb0..bebe4baa90 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala @@ -30,7 +30,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { val probe = StreamTestKit.SubscriberProbe[Int]() Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). - toPublisher().subscribe(probe) + connect(SubscriberDrain(probe)).run() val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala index 2ee2b2b9eb..d9e64360d4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala @@ -15,18 +15,14 @@ class FlowFoldSpec extends AkkaSpec with DefaultTimeout { "fold" in { val input = 1 to 100 - val foldDrain = FoldDrain[Int, Int](0)(_ + _) - val mf = Source(input).connect(foldDrain).run() - val future = foldDrain.future(mf) + val future = Source(input).runWith(FoldDrain[Int, Int](0)(_ + _)) val expected = input.fold(0)(_ + _) Await.result(future, timeout.duration) should be(expected) } "propagate an error" in { val error = new Exception with NoStackTrace - val foldSink = FoldDrain[Unit, Unit](())((_, _) ⇒ ()) - val mf = Source[Unit](() ⇒ throw error).connect(foldSink).run() - val future = foldSink.future(mf) + val future = Source[Unit](() ⇒ throw error).runWith(FoldDrain[Unit, Unit](())((_, _) ⇒ ())) the[Exception] thrownBy Await.result(future, timeout.duration) should be(error) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala index aee60f2922..9c899831c7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala @@ -16,9 +16,7 @@ class FlowForeachSpec extends AkkaSpec { "A Foreach" must { "call the procedure for each element" in { - val foreachDrain = ForeachDrain[Int](testActor ! _) - val mf = Source(1 to 3).connect(foreachDrain).run() - foreachDrain.future(mf).onSuccess { + Source(1 to 3).runWith(ForeachDrain[Int](testActor ! _)) onSuccess { case _ ⇒ testActor ! "done" } expectMsg(1) @@ -29,8 +27,7 @@ class FlowForeachSpec extends AkkaSpec { "complete the future for an empty stream" in { val foreachDrain = ForeachDrain[Int](testActor ! _) - val mf = Source(Nil).connect(foreachDrain).run() - foreachDrain.future(mf).onSuccess { + val mf = Source(Nil).runWith(ForeachDrain[Int](testActor ! _)) onSuccess { case _ ⇒ testActor ! "done" } expectMsg("done") @@ -39,8 +36,7 @@ class FlowForeachSpec extends AkkaSpec { "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() val foreachDrain = ForeachDrain[Int](testActor ! _) - val mf = Source(p).connect(foreachDrain).run() - foreachDrain.future(mf).onFailure { + val mf = Source(p).runWith(ForeachDrain[Int](testActor ! _)) onFailure { case ex ⇒ testActor ! ex } val proc = p.expectSubscription diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala index fab000f6bd..efde0635ff 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala @@ -17,7 +17,7 @@ class FlowFromFutureSpec extends AkkaSpec { "A Flow based on a Future" must { "produce one element from already successful Future" in { - val p = Source(Future.successful(1)).toPublisher() + val p = Source(Future.successful(1)).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -29,7 +29,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce error from already failed Future" in { val ex = new RuntimeException("test") with NoStackTrace - val p = Source(Future.failed[Int](ex)).toPublisher() + val p = Source(Future.failed[Int](ex)).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectError(ex) @@ -37,7 +37,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed" in { val promise = Promise[Int]() - val p = Source(promise.future).toPublisher() + val p = Source(promise.future).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -51,7 +51,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed but not before request" in { val promise = Promise[Int]() - val p = Source(promise.future).toPublisher() + val p = Source(promise.future).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -64,7 +64,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements with multiple subscribers" in { val promise = Promise[Int]() - val p = Source(promise.future).toPublisher() + val p = Source(promise.future).runWith(PublisherDrain()) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -82,7 +82,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements to later subscriber" in { val promise = Promise[Int]() - val p = Source(promise.future).toPublisher() + val p = Source(promise.future).runWith(PublisherDrain()) val keepAlive = StreamTestKit.SubscriberProbe[Int]() val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -103,7 +103,7 @@ class FlowFromFutureSpec extends AkkaSpec { "allow cancel before receiving element" in { val promise = Promise[Int]() - val p = Source(promise.future).toPublisher() + val p = Source(promise.future).runWith(PublisherDrain()) val keepAlive = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(keepAlive) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index b64fcf14b1..628aae2bf8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -190,7 +190,7 @@ class FlowGraphCompileSpec extends AkkaSpec { b.attachSink(undefinedSink1, out1) }.run() - out1.publisher(mg) should not be (null) + mg.materializedDrain(out1) should not be (null) } "build partial flow graphs" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala index e87391ec8c..5e0b6c9870 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala @@ -32,8 +32,8 @@ class FlowGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val tap = Source((1 to elementCount).iterator).toPublisher() - val groupStream = Source(tap).groupBy(_ % groupCount).toPublisher() + val tap = Source((1 to elementCount).iterator).runWith(PublisherDrain()) + val groupStream = Source(tap).groupBy(_ % groupCount).runWith(PublisherDrain()) val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() groupStream.subscribe(masterSubscriber) @@ -56,7 +56,7 @@ class FlowGroupBySpec extends AkkaSpec { "groupBy" must { "work in the happy case" in new SubstreamsSupport(groupCount = 2) { - val s1 = StreamPuppet(getSubFlow(1).toPublisher()) + val s1 = StreamPuppet(getSubFlow(1).runWith(PublisherDrain())) masterSubscriber.expectNoMsg(100.millis) s1.expectNoMsg(100.millis) @@ -64,7 +64,7 @@ class FlowGroupBySpec extends AkkaSpec { s1.expectNext(1) s1.expectNoMsg(100.millis) - val s2 = StreamPuppet(getSubFlow(0).toPublisher()) + val s2 = StreamPuppet(getSubFlow(0).runWith(PublisherDrain())) s2.expectNoMsg(100.millis) s2.request(2) @@ -92,9 +92,9 @@ class FlowGroupBySpec extends AkkaSpec { } "accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) { - StreamPuppet(getSubFlow(1).toPublisher()).cancel() + StreamPuppet(getSubFlow(1).runWith(PublisherDrain())).cancel() - val substream = StreamPuppet(getSubFlow(0).toPublisher()) + val substream = StreamPuppet(getSubFlow(0).runWith(PublisherDrain())) substream.request(2) substream.expectNext(2) substream.expectNext(4) @@ -110,7 +110,7 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of master stream when not consumed anything" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).toPublisher() + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() publisher.subscribe(subscriber) @@ -141,7 +141,7 @@ class FlowGroupBySpec extends AkkaSpec { } "work with empty input stream" in { - val publisher = Source(List.empty[Int]).groupBy(_ % 2).toPublisher() + val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() publisher.subscribe(subscriber) @@ -150,7 +150,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).toPublisher() + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() publisher.subscribe(subscriber) @@ -167,7 +167,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream when substreams are running" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).toPublisher() + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() publisher.subscribe(subscriber) @@ -179,7 +179,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.toPublisher()) + val substreamPuppet = StreamPuppet(substream.runWith(PublisherDrain())) substreamPuppet.request(1) substreamPuppet.expectNext(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala index cfa4917821..14b988090c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala @@ -24,7 +24,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(1000, 1.second).publishTo(c) + Source(p).groupedWithin(1000, 1.second).connect(SubscriberDrain(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) @@ -49,7 +49,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(1 to 3).groupedWithin(1000, 10.second).publishTo(c) + Source(1 to 3).groupedWithin(1000, 10.second).connect(SubscriberDrain(c)).run() val cSub = c.expectSubscription cSub.request(100) c.expectNext((1 to 3).toList) @@ -61,7 +61,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(1000, 1.second).publishTo(c) + Source(p).groupedWithin(1000, 1.second).connect(SubscriberDrain(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(1) @@ -81,7 +81,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { "drop empty groups" in { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(1000, 500.millis).publishTo(c) + Source(p).groupedWithin(1000, 500.millis).connect(SubscriberDrain(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(2) @@ -103,7 +103,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(3, 2.second).publishTo(c) + Source(p).groupedWithin(3, 2.second).connect(SubscriberDrain(c)).run() val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(4) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala index e2614eb429..03c3c45d27 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala @@ -18,7 +18,7 @@ class FlowIterableSpec extends AkkaSpec { "A Flow based on an iterable" must { "produce elements" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -32,7 +32,7 @@ class FlowIterableSpec extends AkkaSpec { } "complete empty" in { - val p = Source(List.empty[Int]).toPublisher() + val p = Source(List.empty[Int]).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectComplete() @@ -44,7 +44,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -68,7 +68,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -94,7 +94,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Source(List(1, 2, 3)).map(_ * 2).toPublisher() + val p = Source(List(1, 2, 3)).map(_ * 2).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -106,7 +106,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Source(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher() + val p = Source(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -118,7 +118,7 @@ class FlowIterableSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Source(1 to count).toPublisher() + val p = Source(1 to count).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -134,19 +134,19 @@ class FlowIterableSpec extends AkkaSpec { } "have value equality of publisher" in { - val p1 = Source(List(1, 2, 3)).toPublisher() - val p2 = Source(List(1, 2, 3)).toPublisher() + val p1 = Source(List(1, 2, 3)).runWith(PublisherDrain()) + val p2 = Source(List(1, 2, 3)).runWith(PublisherDrain()) p1 should be(p2) p2 should be(p1) - val p3 = Source(List(1, 2, 3, 4)).toPublisher() + val p3 = Source(List(1, 2, 3, 4)).runWith(PublisherDrain()) p1 should not be (p3) p3 should not be (p1) - val p4 = Source(Vector.empty[String]).toPublisher() - val p5 = Source(Set.empty[String]).toPublisher() + val p4 = Source(Vector.empty[String]).runWith(PublisherDrain()) + val p5 = Source(Set.empty[String]).runWith(PublisherDrain()) p1 should not be (p4) p4 should be(p5) p5 should be(p4) - val p6 = Source(List(1, 2, 3).iterator).toPublisher() + val p6 = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) p1 should not be (p6) p6 should not be (p1) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala index d3b39db7bb..38540f13d9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala @@ -22,7 +22,7 @@ class FlowIteratorSpec extends AkkaSpec { "A Flow based on an iterator" must { "produce elements" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -36,7 +36,7 @@ class FlowIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = Source(List.empty[Int].iterator).toPublisher() + val p = Source(List.empty[Int].iterator).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectComplete() @@ -48,7 +48,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -72,7 +72,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -95,7 +95,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Source(List(1, 2, 3).iterator).map(_ * 2).toPublisher() + val p = Source(List(1, 2, 3).iterator).map(_ * 2).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -107,7 +107,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Source(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher() + val p = Source(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -119,7 +119,7 @@ class FlowIteratorSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Source((1 to count).iterator).toPublisher() + val p = Source((1 to count).iterator).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala index 0148119ec5..6497d972c2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala @@ -23,7 +23,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "produce future elements" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).publishTo(c) + val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(2) c.expectNext(1) @@ -40,7 +40,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val p = Source(1 to 50).mapAsync(n ⇒ Future { Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) n - }).publishTo(c) + }).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(1000) for (n ← 1 to 50) c.expectNext(n) @@ -54,7 +54,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val p = Source(1 to 20).mapAsync(n ⇒ Future { probe.ref ! n n - }).publishTo(c) + }).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() // nothing before requested probe.expectNoMsg(500.millis) @@ -82,7 +82,7 @@ class FlowMapAsyncSpec extends AkkaSpec { Await.ready(latch, 10.seconds) n } - }).publishTo(c) + }).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err1") @@ -101,7 +101,7 @@ class FlowMapAsyncSpec extends AkkaSpec { n } }). - publishTo(c) + connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala index 1595502fdb..abaf2ce7da 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala @@ -26,7 +26,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val p = Source(1 to 4).mapAsyncUnordered(n ⇒ Future { Await.ready(latch(n), 5.seconds) n - }).publishTo(c) + }).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(5) latch(2).countDown() @@ -47,7 +47,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val p = Source(1 to 20).mapAsyncUnordered(n ⇒ Future { probe.ref ! n n - }).publishTo(c) + }).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() // nothing before requested probe.expectNoMsg(500.millis) @@ -76,7 +76,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { Await.ready(latch, 10.seconds) n } - }).publishTo(c) + }).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err1") @@ -95,7 +95,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { n } }). - publishTo(c) + connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be("err2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapSpec.scala index 35c4a90832..14269c3399 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapSpec.scala @@ -28,7 +28,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { val probe = StreamTestKit.SubscriberProbe[Int]() Source(List(1).iterator). map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1). - toPublisher().subscribe(probe) + runWith(PublisherDrain()).subscribe(probe) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala index 842d808c72..ff3a35bca4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala @@ -66,11 +66,11 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { val foreachDrain = ForeachDrain[Int] { x ⇒ onCompleteProbe.ref ! ("foreach-" + x) } - val mf = Source(p).map { x ⇒ + val future = Source(p).map { x ⇒ onCompleteProbe.ref ! ("map-" + x) x - }.connect(foreachDrain).run() - foreachDrain.future(mf) onComplete { onCompleteProbe.ref ! _ } + }.runWith(foreachDrain) + future onComplete { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala index c48d607fd0..aa78ba20d2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala @@ -29,74 +29,65 @@ class FlowPrefixAndTailSpec extends AkkaSpec { "work on empty input" in { val futureDrain = newFutureDrain - val mf = Source(Nil).prefixAndTail(10).connect(futureDrain).run() - val fut = futureDrain.future(mf) + val fut = Source(Nil).prefixAndTail(10).runWith(futureDrain) val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(Nil) val tailSubscriber = SubscriberProbe[Int] - tailFlow.publishTo(tailSubscriber) + tailFlow.connect(SubscriberDrain(tailSubscriber)).run() tailSubscriber.expectComplete() } "work on short input" in { val futureDrain = newFutureDrain - val mf = Source(List(1, 2, 3)).prefixAndTail(10).connect(futureDrain).run() - val fut = futureDrain.future(mf) + val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureDrain) val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(List(1, 2, 3)) val tailSubscriber = SubscriberProbe[Int] - tailFlow.publishTo(tailSubscriber) + tailFlow.connect(SubscriberDrain(tailSubscriber)).run() tailSubscriber.expectComplete() } "work on longer inputs" in { val futureDrain = newFutureDrain - val mf = Source((1 to 10).iterator).prefixAndTail(5).connect(futureDrain).run() - val fut = futureDrain.future(mf) + val fut = Source((1 to 10).iterator).prefixAndTail(5).runWith(futureDrain) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 5) val futureDrain2 = FutureDrain[immutable.Seq[Int]] - val mf2 = tail.grouped(6).connect(futureDrain2).run() - val fut2 = futureDrain2.future(mf2) + val fut2 = tail.grouped(6).runWith(futureDrain2) Await.result(fut2, 3.seconds) should be(6 to 10) } "handle zero take count" in { val futureDrain = newFutureDrain - val mf = Source((1 to 10).iterator).prefixAndTail(0).connect(futureDrain).run() - val fut = futureDrain.future(mf) + val fut = Source((1 to 10).iterator).prefixAndTail(0).runWith(futureDrain) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) val futureDrain2 = FutureDrain[immutable.Seq[Int]] - val mf2 = tail.grouped(11).connect(futureDrain2).run() - val fut2 = futureDrain2.future(mf2) + val fut2 = tail.grouped(11).runWith(futureDrain2) Await.result(fut2, 3.seconds) should be(1 to 10) } "handle negative take count" in { val futureDrain = newFutureDrain - val mf = Source((1 to 10).iterator).prefixAndTail(-1).connect(futureDrain).run() - val fut = futureDrain.future(mf) + val fut = Source((1 to 10).iterator).prefixAndTail(-1).runWith(futureDrain) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) val futureDrain2 = FutureDrain[immutable.Seq[Int]] - val mf2 = tail.grouped(11).connect(futureDrain2).run() - val fut2 = futureDrain2.future(mf2) + val fut2 = tail.grouped(11).runWith(futureDrain2) Await.result(fut2, 3.seconds) should be(1 to 10) } "work if size of take is equal to stream size" in { val futureDrain = newFutureDrain - val mf = Source((1 to 10).iterator).prefixAndTail(10).connect(futureDrain).run() - val fut = futureDrain.future(mf) + val fut = Source((1 to 10).iterator).prefixAndTail(10).runWith(futureDrain) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 10) val subscriber = StreamTestKit.SubscriberProbe[Int]() - tail.publishTo(subscriber) + tail.connect(SubscriberDrain(subscriber)).run() subscriber.expectCompletedOrSubscriptionFollowedByComplete() } @@ -104,7 +95,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(3).publishTo(subscriber) + Source(publisher).prefixAndTail(3).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -122,7 +113,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(1).publishTo(subscriber) + Source(publisher).prefixAndTail(1).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -137,7 +128,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectComplete() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() - tail.publishTo(substreamSubscriber) + tail.connect(SubscriberDrain(substreamSubscriber)).run() substreamSubscriber.expectSubscription() upstream.sendError(testException) @@ -149,7 +140,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(3).publishTo(subscriber) + Source(publisher).prefixAndTail(3).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -167,7 +158,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val publisher = StreamTestKit.PublisherProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]() - Source(publisher).prefixAndTail(1).publishTo(subscriber) + Source(publisher).prefixAndTail(1).connect(SubscriberDrain(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -182,7 +173,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { subscriber.expectComplete() val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]() - tail.publishTo(substreamSubscriber) + tail.connect(SubscriberDrain(substreamSubscriber)).run() substreamSubscriber.expectSubscription().cancel() upstream.expectCancellation() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index 9990ece187..f1e4ddfd6e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -83,15 +83,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val identity2: Flow[Any, Any] ⇒ Flow[Any, Any] = in ⇒ identity(in) val toPublisher: (Source[Any], FlowMaterializer) ⇒ Publisher[Any] = - (f, m) ⇒ f.toPublisher()(m) + (f, m) ⇒ f.runWith(PublisherDrain())(m) def toFanoutPublisher[In, Out](initialBufferSize: Int, maximumBufferSize: Int): (Source[Out], FlowMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.toFanoutPublisher(initialBufferSize, maximumBufferSize)(m) + (f, m) ⇒ f.runWith(FanoutPublisherDrain(initialBufferSize, maximumBufferSize))(m) def materializeIntoSubscriberAndPublisher[In, Out](flow: Flow[In, Out]): (Subscriber[In], Publisher[Out]) = { val tap = SubscriberTap[In] val drain = PublisherDrain[Out] - val mf = tap.connect(flow).connect(drain).run() - (tap.subscriber(mf), drain.publisher(mf)) + flow.runWith(tap, drain) } "A Flow" must { @@ -174,7 +173,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val c1 = StreamTestKit.SubscriberProbe[String]() flowOut.subscribe(c1) - val tap: Publisher[String] = Source(List("1", "2", "3")).toPublisher() + val tap: Publisher[String] = Source(List("1", "2", "3")).runWith(PublisherDrain()) tap.subscribe(flowIn) val sub1 = c1.expectSubscription @@ -195,7 +194,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val tap: Publisher[Int] = Source(List(1, 2, 3)).toPublisher() + val tap: Publisher[Int] = Source(List(1, 2, 3)).runWith(PublisherDrain()) tap.subscribe(flowIn) c1.expectNext("1") @@ -214,7 +213,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val tap: Publisher[Int] = Source(List(1, 2, 3)).toPublisher() + val tap: Publisher[Int] = Source(List(1, 2, 3)).runWith(PublisherDrain()) tap.subscribe(flowIn) c1.expectNext("elem-1") @@ -227,7 +226,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow: Flow[String, String] = Flow[String] val c1 = StreamTestKit.SubscriberProbe[String]() val sink: Sink[String] = flow.connect(SubscriberDrain(c1)) - val publisher: Publisher[String] = Source(List("1", "2", "3")).toPublisher() + val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(PublisherDrain()) Source(publisher).connect(sink).run() val sub1 = c1.expectSubscription @@ -241,8 +240,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "perform transformation operation" in { val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString }) - val publisher = Source(List(1, 2, 3)).toPublisher() - Source(publisher).connect(flow).consume() + val publisher = Source(List(1, 2, 3)).runWith(PublisherDrain()) + Source(publisher).connect(flow).connect(BlackholeDrain).run() expectMsg("1") expectMsg("2") @@ -253,7 +252,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow = Flow[Int].map(_.toString) val c1 = StreamTestKit.SubscriberProbe[String]() val sink: Sink[Int] = flow.connect(SubscriberDrain(c1)) - val publisher: Publisher[Int] = Source(List(1, 2, 3)).toPublisher() + val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(PublisherDrain()) Source(publisher).connect(sink).run() val sub1 = c1.expectSubscription @@ -266,8 +265,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be materializable several times with fanout publisher" in { val flow = Source(List(1, 2, 3)).map(_.toString) - val p1 = flow.toFanoutPublisher(2, 2) - val p2 = flow.toFanoutPublisher(2, 2) + val p1 = flow.runWith(FanoutPublisherDrain(2, 2)) + val p2 = flow.runWith(FanoutPublisherDrain(2, 2)) val s1 = StreamTestKit.SubscriberProbe[String]() val s2 = StreamTestKit.SubscriberProbe[String]() val s3 = StreamTestKit.SubscriberProbe[String]() @@ -299,7 +298,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be covariant" in { val f1: Source[Fruit] = Source[Fruit](() ⇒ Some(new Apple)) - val p1: Publisher[Fruit] = Source[Fruit](() ⇒ Some(new Apple)).toPublisher() + val p1: Publisher[Fruit] = Source[Fruit](() ⇒ Some(new Apple)).runWith(PublisherDrain()) val f2: Source[Source[Fruit]] = Source[Fruit](() ⇒ Some(new Apple)).splitWhen(_ ⇒ true) val f3: Source[(Boolean, Source[Fruit])] = Source[Fruit](() ⇒ Some(new Apple)).groupBy(_ ⇒ true) val f4: Source[(immutable.Seq[Fruit], Source[Fruit])] = Source[Fruit](() ⇒ Some(new Apple)).prefixAndTail(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala index 5e279d7aef..560f5883d1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSplitWhenSpec.scala @@ -32,7 +32,7 @@ class FlowSplitWhenSpec extends AkkaSpec { class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { val tap = Source((1 to elementCount).iterator) - val groupStream = tap.splitWhen(_ == splitWhen).toPublisher() + val groupStream = tap.splitWhen(_ == splitWhen).runWith(PublisherDrain()) val masterSubscriber = StreamTestKit.SubscriberProbe[Source[Int]]() groupStream.subscribe(masterSubscriber) @@ -53,7 +53,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "splitWhen" must { "work in the happy case" in new SubstreamsSupport(elementCount = 4) { - val s1 = StreamPuppet(getSubFlow().toPublisher()) + val s1 = StreamPuppet(getSubFlow().runWith(PublisherDrain())) masterSubscriber.expectNoMsg(100.millis) s1.request(2) @@ -62,7 +62,7 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.request(1) s1.expectComplete() - val s2 = StreamPuppet(getSubFlow().toPublisher()) + val s2 = StreamPuppet(getSubFlow().runWith(PublisherDrain())) s2.request(1) s2.expectNext(3) @@ -77,9 +77,9 @@ class FlowSplitWhenSpec extends AkkaSpec { } "support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().toPublisher()) + val s1 = StreamPuppet(getSubFlow().runWith(PublisherDrain())) s1.cancel() - val s2 = StreamPuppet(getSubFlow().toPublisher()) + val s2 = StreamPuppet(getSubFlow().runWith(PublisherDrain())) s2.request(4) s2.expectNext(5) @@ -94,7 +94,7 @@ class FlowSplitWhenSpec extends AkkaSpec { } "support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().toPublisher()) + val s1 = StreamPuppet(getSubFlow().runWith(PublisherDrain())) masterSubscription.cancel() s1.request(4) s1.expectNext(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala index 4fe953c96c..81f231cc34 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala @@ -34,7 +34,7 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest { "not take anything for negative n" in { val probe = StreamTestKit.SubscriberProbe[Int]() - Source(List(1, 2, 3)).take(-1).publishTo(probe) + Source(List(1, 2, 3)).take(-1).connect(SubscriberDrain(probe)).run() probe.expectSubscription().request(10) probe.expectComplete() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala index 198ac1c090..2cbf617f1c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala @@ -18,7 +18,7 @@ class FlowTakeWithinSpec extends AkkaSpec { val input = Iterator.from(1) val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() - Source(p).takeWithin(1.second).publishTo(c) + Source(p).takeWithin(1.second).connect(SubscriberDrain(c)).run() val pSub = p.expectSubscription() val cSub = c.expectSubscription() cSub.request(100) @@ -38,7 +38,7 @@ class FlowTakeWithinSpec extends AkkaSpec { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[Int]() - Source(1 to 3).takeWithin(1.second).publishTo(c) + Source(1 to 3).takeWithin(1.second).connect(SubscriberDrain(c)).run() val cSub = c.expectSubscription() c.expectNoMsg(200.millis) cSub.request(100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala index 1e048d32c1..50fda95a6e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala @@ -18,7 +18,7 @@ class FlowThunkSpec extends AkkaSpec { "produce elements" in { val iter = List(1, 2, 3).iterator - val p = Source(() ⇒ if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).toPublisher() + val p = Source(() ⇒ if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -32,7 +32,7 @@ class FlowThunkSpec extends AkkaSpec { } "complete empty" in { - val p = Source(() ⇒ None).toPublisher() + val p = Source(() ⇒ None).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -44,7 +44,7 @@ class FlowThunkSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 val iter = (1 to count).iterator - val p = Source(() ⇒ if (iter.hasNext) Some(iter.next()) else None).toPublisher() + val p = Source(() ⇒ if (iter.hasNext) Some(iter.next()) else None).runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTimerTransformerSpec.scala index 71b786a859..41bcf18730 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTimerTransformerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTimerTransformerSpec.scala @@ -28,7 +28,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { } override def isComplete: Boolean = !isTimerActive("tick") }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -54,7 +54,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { } override def isComplete: Boolean = !isTimerActive("tick") }). - consume() + connect(BlackholeDrain).run() val pSub = p.expectSubscription() expectMsg("tick-1") expectMsg("tick-2") @@ -72,7 +72,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { def onNext(element: Int) = Nil override def onTimer(timerKey: Any) = throw exception - }).toPublisher() + }).runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala index 768a5b01bc..8c3c54c01b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } import scala.util.Failure import akka.stream.MaterializerSettings +import scala.concurrent.Future class FlowToFutureSpec extends AkkaSpec with ScriptedTest { @@ -19,16 +20,15 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { implicit val materializer = FlowMaterializer(settings) - "A Flow with toFuture" must { + "A Flow with FutureDrain" must { "yield the first value" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = FutureDrain[Int] - val m = Source(p).connect(f).run() + val f: Future[Int] = Source(p).map(identity).runWith(FutureDrain()) val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) - Await.result(f.future(m), 100.millis) should be(42) + Await.result(f, 100.millis) should be(42) proc.expectCancellation() } @@ -37,37 +37,33 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { val f = FutureDrain[Int] val s = SubscriberTap[Int] val m = s.connect(f).run() - p.subscribe(s.subscriber(m)) + p.subscribe(m.materializedTap(s)) val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) - Await.result(f.future(m), 100.millis) should be(42) + Await.result(m.materializedDrain(f), 100.millis) should be(42) proc.expectCancellation() } "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = FutureDrain[Int] - val m = Source(p).connect(f).run() + val f = Source(p).runWith(FutureDrain()) val proc = p.expectSubscription proc.expectRequest() val ex = new RuntimeException("ex") proc.sendError(ex) - val future = f.future(m) - Await.ready(future, 100.millis) - future.value.get should be(Failure(ex)) + Await.ready(f, 100.millis) + f.value.get should be(Failure(ex)) } "yield NoSuchElementExcption for empty stream" in { val p = StreamTestKit.PublisherProbe[Int]() - val f = FutureDrain[Int] - val m = Source(p).connect(f).run() + val f = Source(p).runWith(FutureDrain()) val proc = p.expectSubscription proc.expectRequest() proc.sendComplete() - val future = f.future(m) - Await.ready(future, 100.millis) - future.value.get match { + Await.ready(f, 100.millis) + f.value.get match { case Failure(e: NoSuchElementException) ⇒ e.getMessage() should be("empty stream") case x ⇒ fail("expected NoSuchElementException, got " + x) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformRecoverSpec.scala index 853d2c0112..7ffb2246e1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformRecoverSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformRecoverSpec.scala @@ -40,7 +40,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -54,7 +54,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { case Some(_) ⇒ List(-1) } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -68,7 +68,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce one-to-several transformation as expected" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -82,7 +82,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { case Some(_) ⇒ List(-1) } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -99,7 +99,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4).iterator).toPublisher() + val p = Source(List(1, 2, 3, 4).iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -113,7 +113,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { case Some(_) ⇒ List(-1) } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -127,7 +127,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def").iterator).toPublisher() + val p = Source(List("a", "bc", "def").iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new TryRecoveryTransformer[String, Int] { var concat = "" @@ -147,7 +147,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { case None ⇒ Nil case Some(_) ⇒ List(-1) } - }).toFanoutPublisher(1, 1) + }).runWith(FanoutPublisherDrain(1, 1)) val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription() @@ -170,7 +170,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "invoke onComplete when done" in { - val p = Source(List("a").iterator).toPublisher() + val p = Source(List("a").iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new TryRecoveryTransformer[String, String] { var s = "" @@ -180,7 +180,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } override def onTermination(e: Option[Throwable]) = List(s + "B") }). - toPublisher() + runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -200,7 +200,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } override def isComplete = s == "Success(1)" }). - toPublisher() + runWith(PublisherDrain()) val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -225,7 +225,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { override def isComplete = s == "Success(1)" override def onTermination(e: Option[Throwable]) = List(s.length + 10) }). - toPublisher() + runWith(PublisherDrain()) val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -240,7 +240,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "report error when exception is thrown" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { @@ -249,7 +249,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } override def onError(e: Throwable) = List(-1) }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -279,7 +279,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { case Some(_) ⇒ List(-1, -2, -3) } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -332,7 +332,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } } }). - toPublisher() + runWith(PublisherDrain()) val proc = p.expectSubscription() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) @@ -353,7 +353,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { override def onNext(in: Int) = List(in) override def onError(e: Throwable) = throw e }). - toPublisher() + runWith(PublisherDrain()) val proc = p.expectSubscription() val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -366,13 +366,13 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "support cancel as expected" in { - val p = Source(List(1, 2, 3).iterator).toPublisher() + val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) override def onError(e: Throwable) = List(-1) }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala index 293839d6cc..6642f802c7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -23,7 +23,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -32,7 +32,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -46,7 +46,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce one-to-several transformation as expected" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -55,7 +55,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d Vector.fill(elem)(tot) } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -72,7 +72,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4)).toPublisher() + val p = Source(List(1, 2, 3, 4)).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -85,7 +85,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -99,7 +99,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def")).toPublisher() + val p = Source(List("a", "bc", "def")).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[String, Int] { var concat = "" @@ -115,7 +115,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - toFanoutPublisher(2, 2) + runWith(FanoutPublisherDrain(2, 2)) val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription() @@ -138,7 +138,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "invoke onComplete when done" in { - val p = Source(List("a")).toPublisher() + val p = Source(List("a")).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -148,7 +148,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def onTermination(e: Option[Throwable]) = List(s + "B") }). - toPublisher() + runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -159,7 +159,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done" in { val cleanupProbe = TestProbe() - val p = Source(List("a")).toPublisher() + val p = Source(List("a")).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -170,7 +170,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). - toPublisher() + runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -182,7 +182,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done consume" in { val cleanupProbe = TestProbe() - val p = Source(List("a")).toPublisher() + val p = Source(List("a")).runWith(PublisherDrain()) Source(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "x" @@ -198,7 +198,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done after error" in { val cleanupProbe = TestProbe() - val p = Source(List("a", "b", "c")).toPublisher() + val p = Source(List("a", "b", "c")).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -214,7 +214,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). - toPublisher() + runWith(PublisherDrain()) val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -236,7 +236,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def isComplete = s == "1" }). - toPublisher() + runWith(PublisherDrain()) val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -263,7 +263,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s.length + 10) override def cleanup() = cleanupProbe.ref ! s }). - toPublisher() + runWith(PublisherDrain()) val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -279,7 +279,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "report error when exception is thrown" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { @@ -290,7 +290,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } } }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -304,12 +304,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support cancel as expected" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -323,13 +323,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support producing elements from empty inputs" in { - val p = Source(List.empty[Int]).toPublisher() + val p = Source(List.empty[Int]).runWith(PublisherDrain()) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = Nil override def onTermination(e: Option[Throwable]) = List(1, 2, 3) }). - toPublisher() + runWith(PublisherDrain()) val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -363,7 +363,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d case _ ⇒ Nil } } - }).publishTo(subscriber) + }).connect(SubscriberDrain(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) @@ -386,13 +386,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d }) val s1 = StreamTestKit.SubscriberProbe[Int]() - flow.publishTo(s1) + flow.connect(SubscriberDrain(s1)).run() s1.expectSubscription().request(3) s1.expectNext(1, 2, 3) s1.expectComplete() val s2 = StreamTestKit.SubscriberProbe[Int]() - flow.publishTo(s2) + flow.connect(SubscriberDrain(s2)).run() s2.expectSubscription().request(3) s2.expectNext(1, 2, 3) s2.expectComplete() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala index 459c4a7529..c23bf2d878 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala @@ -59,7 +59,7 @@ class GraphBalanceSpec extends AkkaSpec { balance ~> Flow[Int].grouped(15) ~> f5 }.run() - Set(f1, f2, f3, f4, f5) flatMap (sink ⇒ Await.result(sink.future(g), 3.seconds)) should be((0 to 14).toSet) + Set(f1, f2, f3, f4, f5) flatMap (sink ⇒ Await.result(g.materializedDrain(sink), 3.seconds)) should be((0 to 14).toSet) } "fairly balance between three outputs" in { @@ -73,7 +73,7 @@ class GraphBalanceSpec extends AkkaSpec { }.run() Seq(f1, f2, f3) map { sink ⇒ - Await.result(sink.future(g), 3.seconds) should be(numElementsForSink +- 1000) + Await.result(g.materializedDrain(sink), 3.seconds) should be(numElementsForSink +- 1000) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBroadcastSpec.scala index b987793436..a1b1a74b54 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBroadcastSpec.scala @@ -62,11 +62,11 @@ class GraphBroadcastSpec extends AkkaSpec { bcast ~> Flow[Int].grouped(5) ~> f5 }.run() - Await.result(g.getDrainFor(f1), 3.seconds) should be(List(1, 2, 3)) - Await.result(g.getDrainFor(f2), 3.seconds) should be(List(1, 2, 3)) - Await.result(g.getDrainFor(f3), 3.seconds) should be(List(1, 2, 3)) - Await.result(g.getDrainFor(f4), 3.seconds) should be(List(1, 2, 3)) - Await.result(g.getDrainFor(f5), 3.seconds) should be(List(1, 2, 3)) + Await.result(g.materializedDrain(f1), 3.seconds) should be(List(1, 2, 3)) + Await.result(g.materializedDrain(f2), 3.seconds) should be(List(1, 2, 3)) + Await.result(g.materializedDrain(f3), 3.seconds) should be(List(1, 2, 3)) + Await.result(g.materializedDrain(f4), 3.seconds) should be(List(1, 2, 3)) + Await.result(g.materializedDrain(f5), 3.seconds) should be(List(1, 2, 3)) } "produce to other even though downstream cancels" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala index 4dab3c4783..9a0a6ebe24 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala @@ -76,7 +76,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { merge ~> Flow[Int].grouped(10) ~> resultFuture }.run() - Await.result(g.getDrainFor(resultFuture), 3.seconds).sorted should be(List(1, 2, 3, 4, 5, 6)) + Await.result(g.materializedDrain(resultFuture), 3.seconds).sorted should be(List(1, 2, 3, 4, 5, 6)) } "support balance - merge (parallelization) layouts" in { @@ -96,7 +96,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { balance ~> f ~> merge ~> Flow[Int].grouped(elements.size * 2) ~> out }.run() - Await.result(out.future(g), 3.seconds).sorted should be(elements) + Await.result(g.materializedDrain(out), 3.seconds).sorted should be(elements) } "support wikipedia Topological_sorting 2" in { @@ -142,9 +142,9 @@ class GraphOpsIntegrationSpec extends AkkaSpec { }.run() - Await.result(g.getDrainFor(resultFuture2), 3.seconds).sorted should be(List(5, 7)) - Await.result(g.getDrainFor(resultFuture9), 3.seconds).sorted should be(List(3, 5, 7, 7)) - Await.result(g.getDrainFor(resultFuture10), 3.seconds).sorted should be(List(3, 5, 7)) + Await.result(g.materializedDrain(resultFuture2), 3.seconds).sorted should be(List(5, 7)) + Await.result(g.materializedDrain(resultFuture9), 3.seconds).sorted should be(List(3, 5, 7, 7)) + Await.result(g.materializedDrain(resultFuture10), 3.seconds).sorted should be(List(3, 5, 7)) } @@ -161,11 +161,11 @@ class GraphOpsIntegrationSpec extends AkkaSpec { merge ~> Flow[Int].grouped(10).connect(resultFuture) }.run() - Await.result(g.getDrainFor(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9)) + Await.result(g.materializedDrain(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9)) } "be able to run plain flow" in { - val p = Source(List(1, 2, 3)).toPublisher() + val p = Source(List(1, 2, 3)).runWith(PublisherDrain()) val s = SubscriberProbe[Int] val flow = Flow[Int].map(_ * 2) FlowGraph { implicit builder ⇒ @@ -215,7 +215,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { val lego1 = Lego(Flow[String].filter(_.length > 3).map(s ⇒ s" $s ")) val lego2 = Lego(Flow[String].map(_.toUpperCase)) val lego3 = lego1.connect(lego2, Flow[ByteString].map(_.utf8String)) - val source = PublisherTap(Source(List("green ", "blue", "red", "yellow", "black")).toPublisher) + val source = Source(List("green ", "blue", "red", "yellow", "black")) val s = SubscriberProbe[ByteString] val sink = SubscriberDrain(s) lego3.run(source, sink) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala index 5495d7fc65..f78478c8e6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala @@ -23,8 +23,7 @@ object ImplicitFlowMaterializerSpec { // run takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer import context.dispatcher val foldDrain = FoldDrain[String, String]("")(_ + _) - val mf = flow.connect(foldDrain).run() - foldDrain.future(mf) pipeTo sender() + flow.runWith(foldDrain) pipeTo sender() } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala index 3fde6450f9..3d5bd3ce3a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala @@ -17,7 +17,7 @@ class TickPublisherSpec extends AkkaSpec { "produce ticks" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).publishTo(c) + Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectNoMsg(600.millis) @@ -33,7 +33,7 @@ class TickPublisherSpec extends AkkaSpec { "drop ticks when not requested" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).publishTo(c) + Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(2) c.expectNext("tick-1") @@ -50,7 +50,7 @@ class TickPublisherSpec extends AkkaSpec { "produce ticks with multiple subscribers" in { val tickGen = Iterator from 1 - val p = Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).toPublisher() + val p = Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).runWith(PublisherDrain()) val c1 = StreamTestKit.SubscriberProbe[String]() val c2 = StreamTestKit.SubscriberProbe[String]() p.subscribe(c1) @@ -74,7 +74,7 @@ class TickPublisherSpec extends AkkaSpec { "signal onError when tick closure throws" in { val c = StreamTestKit.SubscriberProbe[String]() - Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).publishTo(c) + Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).connect(SubscriberDrain(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectError.getMessage should be("tick err") @@ -83,8 +83,8 @@ class TickPublisherSpec extends AkkaSpec { // FIXME enable this test again when zip is back "be usable with zip for a simple form of rate limiting" ignore { // val c = StreamTestKit.SubscriberProbe[Int]() - // val rate = Source(1.second, 1.second, () ⇒ "tick").toPublisher() - // Source(1 to 100).zip(rate).map { case (n, _) ⇒ n }.publishTo(c) + // val rate = Source(1.second, 1.second, () ⇒ "tick").runWith(PublisherDrain()) + // Source(1 to 100).zip(rate).map { case (n, _) ⇒ n }.connect(SubscriberDrain(c)).run() // val sub = c.expectSubscription() // sub.request(1000) // c.expectNext(1) diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index a79895fe1b..4b4762dabc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -136,36 +136,36 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting } // Ops come in reverse order - override def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedPipe = { + override def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedMap = { val flowName = createFlowName() def attachDrain(pub: Publisher[Out]) = drain match { - case s: SimpleDrain[Out] ⇒ s.attach(pub, this, flowName) - case s: DrainWithKey[Out, _] ⇒ s.attach(pub, this, flowName) - case _ ⇒ throw new MaterializationException("unknown Drain type " + drain.getClass) + case s: SimpleDrain[Out] ⇒ s.attach(pub, this, flowName) + case s: DrainWithKey[Out] ⇒ s.attach(pub, this, flowName) + case _ ⇒ throw new MaterializationException("unknown Drain type " + drain.getClass) } def attachTap(sub: Subscriber[In]) = tap match { - case s: SimpleTap[In] ⇒ s.attach(sub, this, flowName) - case s: TapWithKey[In, _] ⇒ s.attach(sub, this, flowName) - case _ ⇒ throw new MaterializationException("unknown Tap type " + drain.getClass) + case s: SimpleTap[In] ⇒ s.attach(sub, this, flowName) + case s: TapWithKey[In] ⇒ s.attach(sub, this, flowName) + case _ ⇒ throw new MaterializationException("unknown Tap type " + drain.getClass) } def createDrain() = drain.asInstanceOf[Drain[In]] match { - case s: SimpleDrain[In] ⇒ s.create(this, flowName) -> (()) - case s: DrainWithKey[In, _] ⇒ s.create(this, flowName) - case _ ⇒ throw new MaterializationException("unknown Drain type " + drain.getClass) + case s: SimpleDrain[In] ⇒ s.create(this, flowName) -> (()) + case s: DrainWithKey[In] ⇒ s.create(this, flowName) + case _ ⇒ throw new MaterializationException("unknown Drain type " + drain.getClass) } def createTap() = tap.asInstanceOf[Tap[Out]] match { - case s: SimpleTap[Out] ⇒ s.create(this, flowName) -> (()) - case s: TapWithKey[Out, _] ⇒ s.create(this, flowName) - case _ ⇒ throw new MaterializationException("unknown Tap type " + drain.getClass) + case s: SimpleTap[Out] ⇒ s.create(this, flowName) -> (()) + case s: TapWithKey[Out] ⇒ s.create(this, flowName) + case _ ⇒ throw new MaterializationException("unknown Tap type " + drain.getClass) } def isActive(s: AnyRef) = s match { - case tap: SimpleTap[_] ⇒ tap.isActive - case tap: TapWithKey[_, _] ⇒ tap.isActive - case drain: SimpleDrain[_] ⇒ drain.isActive - case drain: DrainWithKey[_, _] ⇒ drain.isActive - case _: Tap[_] ⇒ throw new MaterializationException("unknown Tap type " + drain.getClass) - case _: Drain[_] ⇒ throw new MaterializationException("unknown Drain type " + drain.getClass) + case tap: SimpleTap[_] ⇒ tap.isActive + case tap: TapWithKey[_] ⇒ tap.isActive + case drain: SimpleDrain[_] ⇒ drain.isActive + case drain: DrainWithKey[_] ⇒ drain.isActive + case _: Tap[_] ⇒ throw new MaterializationException("unknown Tap type " + drain.getClass) + case _: Drain[_] ⇒ throw new MaterializationException("unknown Drain type " + drain.getClass) } val (tapValue, drainValue) = diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala index 1db54f09fd..71233efa2d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala @@ -7,6 +7,7 @@ import akka.stream.impl.TransferPhase import akka.stream.impl.MultiStreamInputProcessor import akka.stream.scaladsl2.Source import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.scaladsl2.PublisherDrain /** * INTERNAL API @@ -17,8 +18,8 @@ private[akka] class ConcatAllImpl(materializer: FlowMaterializer) import MultiStreamInputProcessor._ val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val flow = primaryInputs.dequeueInputElement().asInstanceOf[Source[Any]] - val publisher = flow.toPublisher()(materializer) + val source = primaryInputs.dequeueInputElement().asInstanceOf[Source[Any]] + val publisher = source.runWith(PublisherDrain())(materializer) // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala index d0be176fe9..f38d7eb854 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala @@ -25,7 +25,10 @@ import java.util.concurrent.atomic.AtomicReference * FlowMaterializers can be used but must then implement the functionality of these * Drain nodes themselves (or construct an ActorBasedFlowMaterializer). */ -trait Drain[-In] extends Sink[In] +trait Drain[-In] extends Sink[In] { + override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType = + tap.connect(this).run().materializedTap(tap) +} /** * A drain that does not need to create a user-accessible object during materialization. @@ -63,7 +66,10 @@ trait SimpleDrain[-In] extends Drain[In] { * to retrieve in order to access aspects of this drain (could be a completion Future * or a cancellation handle, etc.) */ -trait DrainWithKey[-In, T] extends Drain[In] { +trait DrainWithKey[-In] extends Drain[In] { + + type MaterializedType + /** * Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given * [[FlowMaterializer]] is completely optional, especially if this drain belongs to @@ -75,12 +81,12 @@ trait DrainWithKey[-In, T] extends Drain[In] { * @param materializer a FlowMaterializer that may be used for creating flows * @param flowName the name of the current flow, which should be used in log statements or error messages */ - def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): T + def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType /** * This method is only used for Drains that return true from [[#isActive]], which then must * implement it. */ - def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, T) = + def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, MaterializedType) = throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") /** * This method indicates whether this Drain can create a Subscriber instead of being @@ -102,20 +108,21 @@ trait DrainWithKey[-In, T] extends Drain[In] { */ object PublisherDrain { private val instance = new PublisherDrain[Nothing] - def apply[T]: PublisherDrain[T] = instance.asInstanceOf[PublisherDrain[T]] + def apply[T](): PublisherDrain[T] = instance.asInstanceOf[PublisherDrain[T]] def withFanout[T](initialBufferSize: Int, maximumBufferSize: Int): FanoutPublisherDrain[T] = new FanoutPublisherDrain[T](initialBufferSize, maximumBufferSize) } -class PublisherDrain[In] extends DrainWithKey[In, Publisher[In]] { +class PublisherDrain[In] extends DrainWithKey[In] { + type MaterializedType = Publisher[In] def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = flowPublisher - def publisher(m: MaterializedDrain): Publisher[In] = m.getDrainFor(this) override def toString: String = "PublisherDrain" } -class FanoutPublisherDrain[In](initialBufferSize: Int, maximumBufferSize: Int) extends DrainWithKey[In, Publisher[In]] { - def publisher(m: MaterializedDrain): Publisher[In] = m.getDrainFor(this) +final case class FanoutPublisherDrain[In](initialBufferSize: Int, maximumBufferSize: Int) extends DrainWithKey[In] { + type MaterializedType = Publisher[In] + override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = { val fanoutActor = materializer.actorOf( Props(new FanoutProcessorImpl(materializer.settings, initialBufferSize, maximumBufferSize)), s"$flowName-fanoutPublisher") @@ -123,12 +130,10 @@ class FanoutPublisherDrain[In](initialBufferSize: Int, maximumBufferSize: Int) e flowPublisher.subscribe(fanoutProcessor) fanoutProcessor } - - override def toString: String = "Fanout" } object FutureDrain { - def apply[T]: FutureDrain[T] = new FutureDrain[T] + def apply[T](): FutureDrain[T] = new FutureDrain[T] } /** @@ -138,7 +143,8 @@ object FutureDrain { * the Future into the corresponding failed state) or the end-of-stream * (failing the Future with a NoSuchElementException). */ -class FutureDrain[In] extends DrainWithKey[In, Future[In]] { +class FutureDrain[In] extends DrainWithKey[In] { + type MaterializedType = Future[In] def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Future[In] = { val (sub, f) = create(materializer, flowName) flowPublisher.subscribe(sub) @@ -159,8 +165,6 @@ class FutureDrain[In] extends DrainWithKey[In, Future[In]] { (sub, p.future) } - def future(m: MaterializedDrain): Future[In] = m.getDrainFor(this) - override def toString: String = "FutureDrain" } @@ -207,7 +211,7 @@ final case class OnCompleteDrain[In](callback: Try[Unit] ⇒ Unit) extends Simpl } Nil } - }).consume()(materializer.withNamePrefix(flowName)) + }).connect(BlackholeDrain).run()(materializer.withNamePrefix(flowName)) } /** @@ -215,7 +219,9 @@ final case class OnCompleteDrain[In](callback: Try[Unit] ⇒ Unit) extends Simpl * that will be completed with `Success` when reaching the normal end of the stream, or completed * with `Failure` if there is an error is signaled in the stream. */ -final case class ForeachDrain[In](f: In ⇒ Unit) extends DrainWithKey[In, Future[Unit]] { +final case class ForeachDrain[In](f: In ⇒ Unit) extends DrainWithKey[In] { + type MaterializedType = Future[Unit] + override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Unit] = { val promise = Promise[Unit]() Source(flowPublisher).transform("foreach", () ⇒ new Transformer[In, Unit] { @@ -228,10 +234,9 @@ final case class ForeachDrain[In](f: In ⇒ Unit) extends DrainWithKey[In, Futur } Nil } - }).consume()(materializer.withNamePrefix(flowName)) + }).connect(BlackholeDrain).run()(materializer.withNamePrefix(flowName)) promise.future } - def future(m: MaterializedDrain): Future[Unit] = m.getDrainFor(this) } /** @@ -241,7 +246,9 @@ final case class ForeachDrain[In](f: In ⇒ Unit) extends DrainWithKey[In, Futur * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ -final case class FoldDrain[U, In](zero: U)(f: (U, In) ⇒ U) extends DrainWithKey[In, Future[U]] { +final case class FoldDrain[U, In](zero: U)(f: (U, In) ⇒ U) extends DrainWithKey[In] { + type MaterializedType = Future[U] + override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Future[U] = { val promise = Promise[U]() @@ -256,16 +263,9 @@ final case class FoldDrain[U, In](zero: U)(f: (U, In) ⇒ U) extends DrainWithKe } Nil } - }).consume()(materializer.withNamePrefix(flowName)) + }).connect(BlackholeDrain).run()(materializer.withNamePrefix(flowName)) promise.future } - def future(m: MaterializedDrain): Future[U] = m.getDrainFor(this) } -trait MaterializedDrain { - /** - * Do not call directly. Use accessor method in the concrete `Drain`, e.g. [[PublisherDrain#publisher]]. - */ - def getDrainFor[T](drainKey: DrainWithKey[_, T]): T -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 4e4ee3f145..e73ae46b1d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -27,6 +27,14 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * Connect this flow to a sink, concatenating the processing steps of both. */ def connect(sink: Sink[Out]): Sink[In] + + /** + * + * Connect the `Tap` to this `Flow` and then connect it to the `Drain` and run it. The returned tuple contains + * the materialized values of the `Tap` and `Drain`, e.g. the `Subscriber` of a [[SubscriberTap]] and + * and `Publisher` of a [[PublisherDrain]]. + */ + def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) } object Flow { @@ -41,16 +49,9 @@ object Flow { * Flow with attached input and output, can be executed. */ trait RunnableFlow { - def run()(implicit materializer: FlowMaterializer): MaterializedFlow + def run()(implicit materializer: FlowMaterializer): MaterializedMap } -/** - * Returned by [[RunnableFlow#run]] and can be used as parameter to the - * accessor method to retrieve the materialized `Tap` or `Drain`, e.g. - * [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]]. - */ -trait MaterializedFlow extends MaterializedTap with MaterializedDrain - /** * Scala API: Operations offered by Flows and Sources with a free output side: the DSL flows left-to-right only. */ @@ -434,3 +435,4 @@ private[scaladsl2] object FlowOps { override def onNext(elem: Any) = List(elem) } } + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index aa290b7944..ea5df15cb1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -836,7 +836,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph /** * Materialize the `FlowGraph` and attach all sinks and sources. */ - def run()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { + def run()(implicit materializer: FlowMaterializer): MaterializedMap = { val edges = graph.edges if (edges.size == 1) { val edge = edges.head @@ -854,20 +854,20 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph /** * Run FlowGraph that only contains one edge from a `Source` to a `Sink`. */ - private def runSimple(tapVertex: TapVertex, drainVertex: DrainVertex, pipe: Pipe[Any, Nothing])(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { + private def runSimple(tapVertex: TapVertex, drainVertex: DrainVertex, pipe: Pipe[Any, Nothing])(implicit materializer: FlowMaterializer): MaterializedMap = { val mf = pipe.withTap(tapVertex.tap).withDrain(drainVertex.drain).run() - val materializedSources: Map[TapWithKey[_, _], Any] = tapVertex match { - case TapVertex(tap: TapWithKey[_, _]) ⇒ Map(tap -> mf.getTapFor(tap)) - case _ ⇒ Map.empty + val materializedSources: Map[TapWithKey[_], Any] = tapVertex match { + case TapVertex(tap: TapWithKey[_]) ⇒ Map(tap -> mf.materializedTap(tap)) + case _ ⇒ Map.empty } - val materializedSinks: Map[DrainWithKey[_, _], Any] = drainVertex match { - case DrainVertex(drain: DrainWithKey[_, _]) ⇒ Map(drain -> mf.getDrainFor(drain)) - case _ ⇒ Map.empty + val materializedSinks: Map[DrainWithKey[_], Any] = drainVertex match { + case DrainVertex(drain: DrainWithKey[_]) ⇒ Map(drain -> mf.materializedDrain(drain)) + case _ ⇒ Map.empty } new MaterializedFlowGraph(materializedSources, materializedSinks) } - private def runGraph()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { + private def runGraph()(implicit materializer: FlowMaterializer): MaterializedMap = { import scalax.collection.GraphTraversal._ // start with drains @@ -877,7 +877,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph downstreamSubscriber: Map[graph.EdgeT, Subscriber[Any]] = Map.empty, upstreamPublishers: Map[graph.EdgeT, Publisher[Any]] = Map.empty, taps: Map[TapVertex, SinkPipe[Any]] = Map.empty, - materializedDrains: Map[DrainWithKey[_, _], Any] = Map.empty) + materializedDrains: Map[DrainWithKey[_], Any] = Map.empty) val result = startingNodes.foldLeft(Memo()) { case (memo, start) ⇒ @@ -892,12 +892,12 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph val pipe = edge.label.pipe // returns the materialized drain, if any - def connectToDownstream(publisher: Publisher[Any]): Option[(DrainWithKey[_, _], Any)] = { + def connectToDownstream(publisher: Publisher[Any]): Option[(DrainWithKey[_], Any)] = { val f = pipe.withTap(PublisherTap(publisher)) edge.to.value match { - case DrainVertex(drain: DrainWithKey[_, _]) ⇒ + case DrainVertex(drain: DrainWithKey[_]) ⇒ val mf = f.withDrain(drain).run() - Some(drain -> mf.getDrainFor(drain)) + Some(drain -> mf.materializedDrain(drain)) case DrainVertex(drain) ⇒ f.withDrain(drain).run() None @@ -948,12 +948,12 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph } // connect all input taps as the last thing - val materializedTaps = result.taps.foldLeft(Map.empty[TapWithKey[_, _], Any]) { + val materializedTaps = result.taps.foldLeft(Map.empty[TapWithKey[_], Any]) { case (acc, (TapVertex(tap), pipe)) ⇒ val mf = pipe.withTap(tap).run() tap match { - case tapKey: TapWithKey[_, _] ⇒ acc.updated(tapKey, mf.getTapFor(tapKey)) - case _ ⇒ acc + case tapKey: TapWithKey[_] ⇒ acc.updated(tapKey, mf.materializedTap(tapKey)) + case _ ⇒ acc } } @@ -1018,29 +1018,22 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl } /** - * Returned by [[FlowGraph#run]] and can be used as parameter to the - * accessor method to retrieve the materialized `Tap` or `Drain`, e.g. - * [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]]. + * Returned by [[FlowGraph#run]] and can be used to retrieve the materialized + * `Tap` inputs or `Drain` outputs. */ -class MaterializedFlowGraph(materializedTaps: Map[TapWithKey[_, _], Any], materializedDrains: Map[DrainWithKey[_, _], Any]) - extends MaterializedTap with MaterializedDrain { +private[scaladsl2] class MaterializedFlowGraph(materializedTaps: Map[TapWithKey[_], Any], materializedDrains: Map[DrainWithKey[_], Any]) + extends MaterializedMap { - /** - * Do not call directly. Use accessor method in the concrete `Tap`, e.g. [[SubscriberTap#subscriber]]. - */ - override def getTapFor[T](key: TapWithKey[_, T]): T = + override def materializedTap(key: TapWithKey[_]): key.MaterializedType = materializedTaps.get(key) match { - case Some(matTap) ⇒ matTap.asInstanceOf[T] + case Some(matTap) ⇒ matTap.asInstanceOf[key.MaterializedType] case None ⇒ throw new IllegalArgumentException(s"Tap key [$key] doesn't exist in this flow graph") } - /** - * Do not call directly. Use accessor method in the concrete `Drain`, e.g. [[PublisherDrain#publisher]]. - */ - def getDrainFor[T](key: DrainWithKey[_, T]): T = + def materializedDrain(key: DrainWithKey[_]): key.MaterializedType = materializedDrains.get(key) match { - case Some(matDrain) ⇒ matDrain.asInstanceOf[T] + case Some(matDrain) ⇒ matDrain.asInstanceOf[key.MaterializedType] case None ⇒ throw new IllegalArgumentException(s"Drain key [$key] doesn't exist in this flow graph") } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index 97802b49cb..978fc86a4a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -138,7 +138,7 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { * stream. The result can be highly implementation specific, ranging from * local actor chains to remote-deployed processing networks. */ - def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedPipe + def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedMap /** * Create publishers and subscribers for fan-in and fan-out operations. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/MaterializedMap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/MaterializedMap.scala new file mode 100644 index 0000000000..ca2fe2a638 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/MaterializedMap.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +/** + * Returned by [[RunnableFlow#run]] and [[FlowGraph#run]] and can be used to retrieve the materialized + * `Tap` inputs or `Drain` outputs, e.g. [[SubscriberTap]] or [[PublisherDrain]]. + */ +trait MaterializedMap { + + /** + * Retrieve a materialized `Tap`, e.g. the `Subscriber` of a [[SubscriberTap]]. + */ + def materializedTap(key: TapWithKey[_]): key.MaterializedType + + /** + * Retrieve a materialized `Drain`, e.g. the `Publisher` of a [[PublisherDrain]]. + */ + def materializedDrain(key: DrainWithKey[_]): key.MaterializedType + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala index 7de9622334..2dcbff2b15 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala @@ -18,7 +18,7 @@ private[scaladsl2] object Pipe { } /** - * Flow with one open input and one open output.. + * Flow with one open input and one open output. */ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] { override type Repr[+O] = Pipe[In @uncheckedVariance, O] @@ -40,6 +40,11 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) } + override def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) = { + val m = tap.connect(this).connect(drain).run() + (m.materializedTap(tap), m.materializedDrain(drain)) + } + private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops) } @@ -52,11 +57,9 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As private[scaladsl2] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) - override def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = { - val subIn = SubscriberTap[In]() - val mf = withTap(subIn).run() - subIn.subscriber(mf) - } + override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType = + tap.connect(this).run().materializedTap(tap) + } /** @@ -82,50 +85,29 @@ private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[As case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) } - override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { - val pubOut = PublisherDrain[Out] - val mf = withDrain(pubOut).run() - pubOut.publisher(mf) - } + override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType = + withDrain(drain).run().materializedDrain(drain) - override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { - val pubOut = PublisherDrain.withFanout[Out](initialBufferSize, maximumBufferSize) - val mf = withDrain(pubOut).run() - pubOut.publisher(mf) - } - - override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = - toPublisher().subscribe(subscriber) - - override def consume()(implicit materializer: FlowMaterializer): Unit = - withDrain(BlackholeDrain).run() } /** * Pipe with attached input and output, can be executed. */ private[scaladsl2] final case class RunnablePipe(input: Tap[_], output: Drain[_], ops: List[AstNode]) extends RunnableFlow { - def run()(implicit materializer: FlowMaterializer): MaterializedPipe = + def run()(implicit materializer: FlowMaterializer): MaterializedMap = materializer.materialize(input, output, ops) } /** - * Returned by [[RunnablePipe#run]] and can be used as parameter to the - * accessor method to retrieve the materialized `Tap` or `Drain`, e.g. - * [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]]. + * Returned by [[RunnablePipe#run]] and can be used as parameter to retrieve the materialized + * `Tap` input or `Drain` output. */ -private[stream] class MaterializedPipe(tapKey: AnyRef, matTap: Any, drainKey: AnyRef, matDrain: Any) extends MaterializedFlow { - /** - * Do not call directly. Use accessor method in the concrete `Tap`, e.g. [[SubscriberTap#subscriber]]. - */ - override def getTapFor[T](key: TapWithKey[_, T]): T = - if (key == tapKey) matTap.asInstanceOf[T] +private[stream] class MaterializedPipe(tapKey: AnyRef, matTap: Any, drainKey: AnyRef, matDrain: Any) extends MaterializedMap { + override def materializedTap(key: TapWithKey[_]): key.MaterializedType = + if (key == tapKey) matTap.asInstanceOf[key.MaterializedType] else throw new IllegalArgumentException(s"Tap key [$key] doesn't match the tap [$tapKey] of this flow") - /** - * Do not call directly. Use accessor method in the concrete `Drain`, e.g. [[PublisherDrain#publisher]]. - */ - def getDrainFor[T](key: DrainWithKey[_, T]): T = - if (key == drainKey) matDrain.asInstanceOf[T] + override def materializedDrain(key: DrainWithKey[_]): key.MaterializedType = + if (key == drainKey) matDrain.asInstanceOf[key.MaterializedType] else throw new IllegalArgumentException(s"Drain key [$key] doesn't match the drain [$drainKey] of this flow") } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index cbcd8a323d..82005c335d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -13,6 +13,9 @@ import scala.annotation.unchecked.uncheckedVariance * Can be used as a `Subscriber` */ trait Sink[-In] { - def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = - Flow[In].connect(this).toSubscriber() + /** + * Connect this `Sink` to a `Tap` and run it. The returned value is the materialized value + * of the `Tap`, e.g. the `Subscriber` of a [[SubscriberTap]]. + */ + def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index 2d8575d223..4b3223c681 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -30,13 +30,12 @@ trait Source[+Out] extends FlowOps[Out] { */ def connect(sink: Sink[Out]): RunnableFlow - def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] + /** + * Connect this `Source` to a `Drain` and run it. The returned value is the materialized value + * of the `Drain`, e.g. the `Publisher` of a [[PublisherDrain]]. + */ + def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType - def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] - - def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer) - - def consume()(implicit materializer: FlowMaterializer): Unit } object Source { @@ -48,7 +47,7 @@ object Source { * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ - def apply[T](publisher: Publisher[T]): Tap[T] = PublisherTap(publisher) + def apply[T](publisher: Publisher[T]): Source[T] = PublisherTap(publisher) /** * Helper to create [[Source]] from `Iterator`. @@ -60,7 +59,7 @@ object Source { * in accordance with the demand coming from the downstream transformation * steps. */ - def apply[T](iterator: Iterator[T]): Tap[T] = IteratorTap(iterator) + def apply[T](iterator: Iterator[T]): Source[T] = IteratorTap(iterator) /** * Helper to create [[Source]] from `Iterable`. @@ -71,14 +70,14 @@ object Source { * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ - def apply[T](iterable: immutable.Iterable[T]): Tap[T] = IterableTap(iterable) + def apply[T](iterable: immutable.Iterable[T]): Source[T] = IterableTap(iterable) /** * Define the sequence of elements to be produced by the given closure. * The stream ends normally when evaluation of the closure returns a `None`. * The stream ends exceptionally when an exception is thrown from the closure. */ - def apply[T](f: () ⇒ Option[T]): Tap[T] = ThunkTap(f) + def apply[T](f: () ⇒ Option[T]): Source[T] = ThunkTap(f) /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -86,7 +85,7 @@ object Source { * may happen before or after materializing the `Flow`. * The stream terminates with an error if the `Future` is completed with a failure. */ - def apply[T](future: Future[T]): Tap[T] = FutureTap(future) + def apply[T](future: Future[T]): Source[T] = FutureTap(future) /** * Elements are produced from the tick closure periodically with the specified interval. @@ -95,6 +94,7 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Tap[T] = + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = TickTap(initialDelay, interval, tick) + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala index edcac2250d..cd0b3895da 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala @@ -34,17 +34,8 @@ trait Tap[+Out] extends Source[Out] { override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink) - override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = - sourcePipe.toPublisher()(materializer) - - override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = - sourcePipe.toFanoutPublisher(initialBufferSize, maximumBufferSize)(materializer) - - override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = - sourcePipe.publishTo(subscriber)(materializer) - - override def consume()(implicit materializer: FlowMaterializer): Unit = - sourcePipe.consume() + override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType = + connect(drain).run().materializedDrain(drain) /** INTERNAL API */ override protected def andThen[U](op: AstNode) = SourcePipe(this, List(op)) @@ -89,7 +80,10 @@ trait SimpleTap[+Out] extends Tap[Out] { * to retrieve in order to access aspects of this tap (could be a Subscriber, a * Future/Promise, etc.). */ -trait TapWithKey[+Out, T] extends Tap[Out] { +trait TapWithKey[+Out] extends Tap[Out] { + + type MaterializedType + /** * Attach this tap to the given [[org.reactivestreams.Subscriber]]. Using the given * [[FlowMaterializer]] is completely optional, especially if this tap belongs to @@ -101,12 +95,12 @@ trait TapWithKey[+Out, T] extends Tap[Out] { * @param materializer a FlowMaterializer that may be used for creating flows * @param flowName the name of the current flow, which should be used in log statements or error messages */ - def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): T + def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType /** * This method is only used for Taps that return true from [[#isActive]], which then must * implement it. */ - def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[Out] @uncheckedVariance, T) = + def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[Out] @uncheckedVariance, MaterializedType) = throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") /** * This method indicates whether this Tap can create a Publisher instead of being @@ -124,11 +118,12 @@ trait TapWithKey[+Out, T] extends Tap[Out] { * Holds a `Subscriber` representing the input side of the flow. * The `Subscriber` can later be connected to an upstream `Publisher`. */ -final case class SubscriberTap[Out]() extends TapWithKey[Out, Subscriber[Out]] { +final case class SubscriberTap[Out]() extends TapWithKey[Out] { + type MaterializedType = Subscriber[Out] + override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = flowSubscriber - def subscriber(m: MaterializedTap): Subscriber[Out] = m.getTapFor(this) } /** @@ -230,9 +225,3 @@ final case class TickTap[Out](initialDelay: FiniteDuration, interval: FiniteDura name = s"$flowName-0-tick")) } -trait MaterializedTap { - /** - * Do not call directly. Use accessor method in the concrete `Tap`, e.g. [[SubscriberTap#subscriber]]. - */ - def getTapFor[T](tapKey: TapWithKey[_, T]): T -}