!str #15950 Add runWith and remove toX

* runWith(drainWithKey) and runWith(tapWithKey) returning the
  materialized drain/tap
* remove toPublisher, toSubscriber, and friends, in favor of using
  runWith
* MaterializedMap is the return type for both flow and graph
* Source[T] return type for Source.apply methods
This commit is contained in:
Patrik Nordwall 2014-10-02 13:34:27 +02:00
parent dc4d121f48
commit 14d3501f92
50 changed files with 379 additions and 419 deletions

View file

@ -62,7 +62,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte
entity = HttpEntity.Default(
contentType = ContentTypes.`application/json`,
contentLength = 5,
Source(List(ByteString("hello"))).toPublisher()))
Source(List(ByteString("hello"))).runWith(PublisherDrain())))
// Our library uses a simple model of headers: a Seq[(String, String)].
// The body is represented as an Array[Byte]. To get the headers in
@ -141,7 +141,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte
// convert the body into a Publisher[ByteString].
val byteStringBody = ByteString(byteArrayBody)
val publisherBody = Source(List(byteStringBody)).toPublisher()
val publisherBody = Source(List(byteStringBody)).runWith(PublisherDrain())
// Finally we can create our HttpResponse.

View file

@ -9,17 +9,17 @@ import akka.stream.scaladsl2.{ FlowMaterializer, Source, Flow }
import akka.stream.testkit.StreamTestKit._
import org.reactivestreams.Publisher
import org.scalatest.Matchers
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.stream.scaladsl2.PublisherDrain
trait ScriptedTest extends Matchers {
class ScriptException(msg: String) extends RuntimeException(msg)
def toPublisher[In, Out]: (Source[Out], FlowMaterializer) Publisher[Out] =
(f, m) f.toPublisher()(m)
(f, m) f.runWith(PublisherDrain())(m)
object Script {
def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = {

View file

@ -41,7 +41,7 @@ abstract class TwoStreamsSetup extends AkkaSpec {
def completedPublisher[T]: Publisher[T] = StreamTestKit.emptyPublisher[T]
def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Source(elems).toPublisher()
def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Source(elems).runWith(PublisherDrain())
def soonToFailPublisher[T]: Publisher[T] = StreamTestKit.lazyErrorPublisher[T](TestException)

View file

@ -17,7 +17,7 @@ class FlowAppendSpec extends AkkaSpec with River {
"Flow" should {
"append Flow" in riverOf[String] { subscriber
val flow = Flow[Int].connect(otherFlow)
Source(elements).connect(flow).publishTo(subscriber)
Source(elements).connect(flow).connect(SubscriberDrain(subscriber)).run()
}
"append Sink" in riverOf[String] { subscriber
@ -30,7 +30,7 @@ class FlowAppendSpec extends AkkaSpec with River {
"append Flow" in riverOf[String] { subscriber
Source(elements)
.connect(otherFlow)
.publishTo(subscriber)
.connect(SubscriberDrain(subscriber)).run()
}
"append Sink" in riverOf[String] { subscriber

View file

@ -8,6 +8,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.MaterializerSettings
import akka.stream.OverflowStrategy
import scala.concurrent.Future
class FlowBufferSpec extends AkkaSpec {
@ -20,24 +21,20 @@ class FlowBufferSpec extends AkkaSpec {
"Buffer" must {
"pass elements through normally in backpressured mode" in {
val futureDrain = FutureDrain[Seq[Int]]
val mf = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
connect(futureDrain).run()
val future = futureDrain.future(mf)
val future: Future[Seq[Int]] = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
runWith(FutureDrain())
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through normally in backpressured mode with buffer size one" in {
val futureDrain = FutureDrain[Seq[Int]]
val mf = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
connect(futureDrain).run()
val future = futureDrain.future(mf)
val future = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
runWith(FutureDrain())
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through a chain of backpressured buffers of different size" in {
val futureDrain = FutureDrain[Seq[Int]]
val mf = Source((1 to 1000).iterator)
val future = Source((1 to 1000).iterator)
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.buffer(256, overflowStrategy = OverflowStrategy.backpressure)
@ -45,8 +42,7 @@ class FlowBufferSpec extends AkkaSpec {
.buffer(5, overflowStrategy = OverflowStrategy.backpressure)
.buffer(128, overflowStrategy = OverflowStrategy.backpressure)
.grouped(1001)
.connect(futureDrain).run()
val future = futureDrain.future(mf)
.runWith(FutureDrain())
Await.result(future, 3.seconds) should be(1 to 1000)
}
@ -54,7 +50,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).publishTo(subscriber)
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -74,7 +70,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).publishTo(subscriber)
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -102,7 +98,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).publishTo(subscriber)
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -133,7 +129,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).publishTo(subscriber)
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -164,7 +160,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(1, overflowStrategy = strategy).publishTo(subscriber)
Source(publisher).buffer(1, overflowStrategy = strategy).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -32,7 +32,7 @@ class FlowConcatAllSpec extends AkkaSpec {
val main = Source(List(s1, s2, s3, s4, s5))
val subscriber = StreamTestKit.SubscriberProbe[Int]()
main.flatten(FlattenStrategy.concat).publishTo(subscriber)
main.flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run()
val subscription = subscriber.expectSubscription()
subscription.request(10)
subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_)))
@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"work together with SplitWhen" in {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).publishTo(subscriber)
Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run()
val subscription = subscriber.expectSubscription()
subscription.request(10)
subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_)))
@ -53,7 +53,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the current open substream and signal error" in {
val publisher = StreamTestKit.PublisherProbe[Source[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).publishTo(subscriber)
Source(publisher).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -73,7 +73,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on open substream, cancel the master stream and signal error " in {
val publisher = StreamTestKit.PublisherProbe[Source[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).publishTo(subscriber)
Source(publisher).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -93,7 +93,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the current open substream and the master stream" in {
val publisher = StreamTestKit.PublisherProbe[Source[Int]]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).publishTo(subscriber)
Source(publisher).flatten(FlattenStrategy.concat).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()

View file

@ -23,7 +23,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).publishTo(subscriber)
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -41,7 +41,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).publishTo(subscriber)
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -57,12 +57,10 @@ class FlowConflateSpec extends AkkaSpec {
"work on a variable rate chain" in {
val foldDrain = FoldDrain[Int, Int](0)(_ + _)
val mf = Source((1 to 1000).iterator)
val future = Source((1 to 1000).iterator)
.conflate[Int](seed = i i, aggregate = (sum, i) sum + i)
.map { i if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.connect(foldDrain)
.run()
val future = foldDrain.future(mf)
.runWith(FoldDrain[Int, Int](0)(_ + _))
Await.result(future, 10.seconds) should be(500500)
}
@ -70,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).publishTo(subscriber)
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -16,7 +16,7 @@ class FlowDispatcherSpec extends AkkaSpec {
val probe = TestProbe()
val p = Source(List(1, 2, 3)).map(i
{ probe.ref ! Thread.currentThread().getName(); i }).
consume()
connect(BlackholeDrain).run()
probe.receiveN(3) foreach {
case s: String s should startWith(system.name + "-akka.test.stream-dispatcher")
}

View file

@ -29,7 +29,7 @@ class FlowDropSpec extends AkkaSpec with ScriptedTest {
"not drop anything for negative n" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1, 2, 3)).drop(-1).publishTo(probe)
Source(List(1, 2, 3)).drop(-1).connect(SubscriberDrain(probe)).run()
probe.expectSubscription().request(10)
probe.expectNext(1)
probe.expectNext(2)

View file

@ -18,7 +18,7 @@ class FlowDropWithinSpec extends AkkaSpec {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
Source(p).dropWithin(1.second).publishTo(c)
Source(p).dropWithin(1.second).connect(SubscriberDrain(c)).run()
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(100)

View file

@ -24,7 +24,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).publishTo(subscriber)
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -44,7 +44,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).publishTo(subscriber)
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -64,13 +64,10 @@ class FlowExpandSpec extends AkkaSpec {
}
"work on a variable rate chain" in {
val foldDrain = FoldDrain[Set[Int], Int](Set.empty[Int])(_ + _)
val mf = Source((1 to 100).iterator)
val future = Source((1 to 100).iterator)
.map { i if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.expand[Int, Int](seed = i i, extrapolate = i (i, i))
.connect(foldDrain)
.run()
val future = foldDrain.future(mf)
.runWith(FoldDrain[Set[Int], Int](Set.empty[Int])(_ + _))
Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100))
}
@ -79,7 +76,7 @@ class FlowExpandSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).publishTo(subscriber)
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).connect(SubscriberDrain(subscriber)).run()
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -30,7 +30,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
toPublisher().subscribe(probe)
connect(SubscriberDrain(probe)).run()
val subscription = probe.expectSubscription()
for (_ 1 to 10000) {

View file

@ -15,18 +15,14 @@ class FlowFoldSpec extends AkkaSpec with DefaultTimeout {
"fold" in {
val input = 1 to 100
val foldDrain = FoldDrain[Int, Int](0)(_ + _)
val mf = Source(input).connect(foldDrain).run()
val future = foldDrain.future(mf)
val future = Source(input).runWith(FoldDrain[Int, Int](0)(_ + _))
val expected = input.fold(0)(_ + _)
Await.result(future, timeout.duration) should be(expected)
}
"propagate an error" in {
val error = new Exception with NoStackTrace
val foldSink = FoldDrain[Unit, Unit](())((_, _) ())
val mf = Source[Unit](() throw error).connect(foldSink).run()
val future = foldSink.future(mf)
val future = Source[Unit](() throw error).runWith(FoldDrain[Unit, Unit](())((_, _) ()))
the[Exception] thrownBy Await.result(future, timeout.duration) should be(error)
}

View file

@ -16,9 +16,7 @@ class FlowForeachSpec extends AkkaSpec {
"A Foreach" must {
"call the procedure for each element" in {
val foreachDrain = ForeachDrain[Int](testActor ! _)
val mf = Source(1 to 3).connect(foreachDrain).run()
foreachDrain.future(mf).onSuccess {
Source(1 to 3).runWith(ForeachDrain[Int](testActor ! _)) onSuccess {
case _ testActor ! "done"
}
expectMsg(1)
@ -29,8 +27,7 @@ class FlowForeachSpec extends AkkaSpec {
"complete the future for an empty stream" in {
val foreachDrain = ForeachDrain[Int](testActor ! _)
val mf = Source(Nil).connect(foreachDrain).run()
foreachDrain.future(mf).onSuccess {
val mf = Source(Nil).runWith(ForeachDrain[Int](testActor ! _)) onSuccess {
case _ testActor ! "done"
}
expectMsg("done")
@ -39,8 +36,7 @@ class FlowForeachSpec extends AkkaSpec {
"yield the first error" in {
val p = StreamTestKit.PublisherProbe[Int]()
val foreachDrain = ForeachDrain[Int](testActor ! _)
val mf = Source(p).connect(foreachDrain).run()
foreachDrain.future(mf).onFailure {
val mf = Source(p).runWith(ForeachDrain[Int](testActor ! _)) onFailure {
case ex testActor ! ex
}
val proc = p.expectSubscription

View file

@ -17,7 +17,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"A Flow based on a Future" must {
"produce one element from already successful Future" in {
val p = Source(Future.successful(1)).toPublisher()
val p = Source(Future.successful(1)).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -29,7 +29,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce error from already failed Future" in {
val ex = new RuntimeException("test") with NoStackTrace
val p = Source(Future.failed[Int](ex)).toPublisher()
val p = Source(Future.failed[Int](ex)).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectError(ex)
@ -37,7 +37,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce one element when Future is completed" in {
val promise = Promise[Int]()
val p = Source(promise.future).toPublisher()
val p = Source(promise.future).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -51,7 +51,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce one element when Future is completed but not before request" in {
val promise = Promise[Int]()
val p = Source(promise.future).toPublisher()
val p = Source(promise.future).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -64,7 +64,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce elements with multiple subscribers" in {
val promise = Promise[Int]()
val p = Source(promise.future).toPublisher()
val p = Source(promise.future).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -82,7 +82,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce elements to later subscriber" in {
val promise = Promise[Int]()
val p = Source(promise.future).toPublisher()
val p = Source(promise.future).runWith(PublisherDrain())
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
@ -103,7 +103,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"allow cancel before receiving element" in {
val promise = Promise[Int]()
val p = Source(promise.future).toPublisher()
val p = Source(promise.future).runWith(PublisherDrain())
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(keepAlive)

View file

@ -190,7 +190,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
b.attachSink(undefinedSink1, out1)
}.run()
out1.publisher(mg) should not be (null)
mg.materializedDrain(out1) should not be (null)
}
"build partial flow graphs" in {

View file

@ -32,8 +32,8 @@ class FlowGroupBySpec extends AkkaSpec {
}
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
val tap = Source((1 to elementCount).iterator).toPublisher()
val groupStream = Source(tap).groupBy(_ % groupCount).toPublisher()
val tap = Source((1 to elementCount).iterator).runWith(PublisherDrain())
val groupStream = Source(tap).groupBy(_ % groupCount).runWith(PublisherDrain())
val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
groupStream.subscribe(masterSubscriber)
@ -56,7 +56,7 @@ class FlowGroupBySpec extends AkkaSpec {
"groupBy" must {
"work in the happy case" in new SubstreamsSupport(groupCount = 2) {
val s1 = StreamPuppet(getSubFlow(1).toPublisher())
val s1 = StreamPuppet(getSubFlow(1).runWith(PublisherDrain()))
masterSubscriber.expectNoMsg(100.millis)
s1.expectNoMsg(100.millis)
@ -64,7 +64,7 @@ class FlowGroupBySpec extends AkkaSpec {
s1.expectNext(1)
s1.expectNoMsg(100.millis)
val s2 = StreamPuppet(getSubFlow(0).toPublisher())
val s2 = StreamPuppet(getSubFlow(0).runWith(PublisherDrain()))
s2.expectNoMsg(100.millis)
s2.request(2)
@ -92,9 +92,9 @@ class FlowGroupBySpec extends AkkaSpec {
}
"accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) {
StreamPuppet(getSubFlow(1).toPublisher()).cancel()
StreamPuppet(getSubFlow(1).runWith(PublisherDrain())).cancel()
val substream = StreamPuppet(getSubFlow(0).toPublisher())
val substream = StreamPuppet(getSubFlow(0).runWith(PublisherDrain()))
substream.request(2)
substream.expectNext(2)
substream.expectNext(4)
@ -110,7 +110,7 @@ class FlowGroupBySpec extends AkkaSpec {
"accept cancellation of master stream when not consumed anything" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).toPublisher()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
@ -141,7 +141,7 @@ class FlowGroupBySpec extends AkkaSpec {
}
"work with empty input stream" in {
val publisher = Source(List.empty[Int]).groupBy(_ % 2).toPublisher()
val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
@ -150,7 +150,7 @@ class FlowGroupBySpec extends AkkaSpec {
"abort on onError from upstream" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).toPublisher()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
@ -167,7 +167,7 @@ class FlowGroupBySpec extends AkkaSpec {
"abort on onError from upstream when substreams are running" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).toPublisher()
val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
@ -179,7 +179,7 @@ class FlowGroupBySpec extends AkkaSpec {
upstreamSubscription.sendNext(1)
val (_, substream) = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.toPublisher())
val substreamPuppet = StreamPuppet(substream.runWith(PublisherDrain()))
substreamPuppet.request(1)
substreamPuppet.expectNext(1)

View file

@ -24,7 +24,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(1000, 1.second).publishTo(c)
Source(p).groupedWithin(1000, 1.second).connect(SubscriberDrain(c)).run()
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(100)
@ -49,7 +49,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
"deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(1 to 3).groupedWithin(1000, 10.second).publishTo(c)
Source(1 to 3).groupedWithin(1000, 10.second).connect(SubscriberDrain(c)).run()
val cSub = c.expectSubscription
cSub.request(100)
c.expectNext((1 to 3).toList)
@ -61,7 +61,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(1000, 1.second).publishTo(c)
Source(p).groupedWithin(1000, 1.second).connect(SubscriberDrain(c)).run()
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(1)
@ -81,7 +81,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
"drop empty groups" in {
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(1000, 500.millis).publishTo(c)
Source(p).groupedWithin(1000, 500.millis).connect(SubscriberDrain(c)).run()
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(2)
@ -103,7 +103,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(3, 2.second).publishTo(c)
Source(p).groupedWithin(3, 2.second).connect(SubscriberDrain(c)).run()
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(4)

View file

@ -18,7 +18,7 @@ class FlowIterableSpec extends AkkaSpec {
"A Flow based on an iterable" must {
"produce elements" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -32,7 +32,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"complete empty" in {
val p = Source(List.empty[Int]).toPublisher()
val p = Source(List.empty[Int]).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
@ -44,7 +44,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with multiple subscribers" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -68,7 +68,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -94,7 +94,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = Source(List(1, 2, 3)).map(_ * 2).toPublisher()
val p = Source(List(1, 2, 3)).map(_ * 2).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -106,7 +106,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with two transformation steps" in {
val p = Source(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher()
val p = Source(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -118,7 +118,7 @@ class FlowIterableSpec extends AkkaSpec {
"allow cancel before receiving all elements" in {
val count = 100000
val p = Source(1 to count).toPublisher()
val p = Source(1 to count).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -134,19 +134,19 @@ class FlowIterableSpec extends AkkaSpec {
}
"have value equality of publisher" in {
val p1 = Source(List(1, 2, 3)).toPublisher()
val p2 = Source(List(1, 2, 3)).toPublisher()
val p1 = Source(List(1, 2, 3)).runWith(PublisherDrain())
val p2 = Source(List(1, 2, 3)).runWith(PublisherDrain())
p1 should be(p2)
p2 should be(p1)
val p3 = Source(List(1, 2, 3, 4)).toPublisher()
val p3 = Source(List(1, 2, 3, 4)).runWith(PublisherDrain())
p1 should not be (p3)
p3 should not be (p1)
val p4 = Source(Vector.empty[String]).toPublisher()
val p5 = Source(Set.empty[String]).toPublisher()
val p4 = Source(Vector.empty[String]).runWith(PublisherDrain())
val p5 = Source(Set.empty[String]).runWith(PublisherDrain())
p1 should not be (p4)
p4 should be(p5)
p5 should be(p4)
val p6 = Source(List(1, 2, 3).iterator).toPublisher()
val p6 = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
p1 should not be (p6)
p6 should not be (p1)
}

View file

@ -22,7 +22,7 @@ class FlowIteratorSpec extends AkkaSpec {
"A Flow based on an iterator" must {
"produce elements" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -36,7 +36,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"complete empty" in {
val p = Source(List.empty[Int].iterator).toPublisher()
val p = Source(List.empty[Int].iterator).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
@ -48,7 +48,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with multiple subscribers" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -72,7 +72,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -95,7 +95,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = Source(List(1, 2, 3).iterator).map(_ * 2).toPublisher()
val p = Source(List(1, 2, 3).iterator).map(_ * 2).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -107,7 +107,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with two transformation steps" in {
val p = Source(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher()
val p = Source(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -119,7 +119,7 @@ class FlowIteratorSpec extends AkkaSpec {
"allow cancel before receiving all elements" in {
val count = 100000
val p = Source((1 to count).iterator).toPublisher()
val p = Source((1 to count).iterator).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()

View file

@ -23,7 +23,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 3).mapAsync(n Future(n)).publishTo(c)
val p = Source(1 to 3).mapAsync(n Future(n)).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(2)
c.expectNext(1)
@ -40,7 +40,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val p = Source(1 to 50).mapAsync(n Future {
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n
}).publishTo(c)
}).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(1000)
for (n 1 to 50) c.expectNext(n)
@ -54,7 +54,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val p = Source(1 to 20).mapAsync(n Future {
probe.ref ! n
n
}).publishTo(c)
}).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
// nothing before requested
probe.expectNoMsg(500.millis)
@ -82,7 +82,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
Await.ready(latch, 10.seconds)
n
}
}).publishTo(c)
}).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be("err1")
@ -101,7 +101,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
n
}
}).
publishTo(c)
connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be("err2")

View file

@ -26,7 +26,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val p = Source(1 to 4).mapAsyncUnordered(n Future {
Await.ready(latch(n), 5.seconds)
n
}).publishTo(c)
}).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(5)
latch(2).countDown()
@ -47,7 +47,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val p = Source(1 to 20).mapAsyncUnordered(n Future {
probe.ref ! n
n
}).publishTo(c)
}).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
// nothing before requested
probe.expectNoMsg(500.millis)
@ -76,7 +76,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
Await.ready(latch, 10.seconds)
n
}
}).publishTo(c)
}).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be("err1")
@ -95,7 +95,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
n
}
}).
publishTo(c)
connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be("err2")

View file

@ -28,7 +28,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1).iterator).
map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).
toPublisher().subscribe(probe)
runWith(PublisherDrain()).subscribe(probe)
val subscription = probe.expectSubscription()
for (_ 1 to 10000) {

View file

@ -66,11 +66,11 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
val foreachDrain = ForeachDrain[Int] {
x onCompleteProbe.ref ! ("foreach-" + x)
}
val mf = Source(p).map { x
val future = Source(p).map { x
onCompleteProbe.ref ! ("map-" + x)
x
}.connect(foreachDrain).run()
foreachDrain.future(mf) onComplete { onCompleteProbe.ref ! _ }
}.runWith(foreachDrain)
future onComplete { onCompleteProbe.ref ! _ }
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)

View file

@ -29,74 +29,65 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
"work on empty input" in {
val futureDrain = newFutureDrain
val mf = Source(Nil).prefixAndTail(10).connect(futureDrain).run()
val fut = futureDrain.future(mf)
val fut = Source(Nil).prefixAndTail(10).runWith(futureDrain)
val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(Nil)
val tailSubscriber = SubscriberProbe[Int]
tailFlow.publishTo(tailSubscriber)
tailFlow.connect(SubscriberDrain(tailSubscriber)).run()
tailSubscriber.expectComplete()
}
"work on short input" in {
val futureDrain = newFutureDrain
val mf = Source(List(1, 2, 3)).prefixAndTail(10).connect(futureDrain).run()
val fut = futureDrain.future(mf)
val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureDrain)
val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(List(1, 2, 3))
val tailSubscriber = SubscriberProbe[Int]
tailFlow.publishTo(tailSubscriber)
tailFlow.connect(SubscriberDrain(tailSubscriber)).run()
tailSubscriber.expectComplete()
}
"work on longer inputs" in {
val futureDrain = newFutureDrain
val mf = Source((1 to 10).iterator).prefixAndTail(5).connect(futureDrain).run()
val fut = futureDrain.future(mf)
val fut = Source((1 to 10).iterator).prefixAndTail(5).runWith(futureDrain)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(1 to 5)
val futureDrain2 = FutureDrain[immutable.Seq[Int]]
val mf2 = tail.grouped(6).connect(futureDrain2).run()
val fut2 = futureDrain2.future(mf2)
val fut2 = tail.grouped(6).runWith(futureDrain2)
Await.result(fut2, 3.seconds) should be(6 to 10)
}
"handle zero take count" in {
val futureDrain = newFutureDrain
val mf = Source((1 to 10).iterator).prefixAndTail(0).connect(futureDrain).run()
val fut = futureDrain.future(mf)
val fut = Source((1 to 10).iterator).prefixAndTail(0).runWith(futureDrain)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Nil)
val futureDrain2 = FutureDrain[immutable.Seq[Int]]
val mf2 = tail.grouped(11).connect(futureDrain2).run()
val fut2 = futureDrain2.future(mf2)
val fut2 = tail.grouped(11).runWith(futureDrain2)
Await.result(fut2, 3.seconds) should be(1 to 10)
}
"handle negative take count" in {
val futureDrain = newFutureDrain
val mf = Source((1 to 10).iterator).prefixAndTail(-1).connect(futureDrain).run()
val fut = futureDrain.future(mf)
val fut = Source((1 to 10).iterator).prefixAndTail(-1).runWith(futureDrain)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Nil)
val futureDrain2 = FutureDrain[immutable.Seq[Int]]
val mf2 = tail.grouped(11).connect(futureDrain2).run()
val fut2 = futureDrain2.future(mf2)
val fut2 = tail.grouped(11).runWith(futureDrain2)
Await.result(fut2, 3.seconds) should be(1 to 10)
}
"work if size of take is equal to stream size" in {
val futureDrain = newFutureDrain
val mf = Source((1 to 10).iterator).prefixAndTail(10).connect(futureDrain).run()
val fut = futureDrain.future(mf)
val fut = Source((1 to 10).iterator).prefixAndTail(10).runWith(futureDrain)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(1 to 10)
val subscriber = StreamTestKit.SubscriberProbe[Int]()
tail.publishTo(subscriber)
tail.connect(SubscriberDrain(subscriber)).run()
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
}
@ -104,7 +95,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(3).publishTo(subscriber)
Source(publisher).prefixAndTail(3).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -122,7 +113,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(1).publishTo(subscriber)
Source(publisher).prefixAndTail(1).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -137,7 +128,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
subscriber.expectComplete()
val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]()
tail.publishTo(substreamSubscriber)
tail.connect(SubscriberDrain(substreamSubscriber)).run()
substreamSubscriber.expectSubscription()
upstream.sendError(testException)
@ -149,7 +140,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(3).publishTo(subscriber)
Source(publisher).prefixAndTail(3).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -167,7 +158,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[(immutable.Seq[Int], Source[Int])]()
Source(publisher).prefixAndTail(1).publishTo(subscriber)
Source(publisher).prefixAndTail(1).connect(SubscriberDrain(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
@ -182,7 +173,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
subscriber.expectComplete()
val substreamSubscriber = StreamTestKit.SubscriberProbe[Int]()
tail.publishTo(substreamSubscriber)
tail.connect(SubscriberDrain(substreamSubscriber)).run()
substreamSubscriber.expectSubscription().cancel()
upstream.expectCancellation()

View file

@ -83,15 +83,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val identity2: Flow[Any, Any] Flow[Any, Any] = in identity(in)
val toPublisher: (Source[Any], FlowMaterializer) Publisher[Any] =
(f, m) f.toPublisher()(m)
(f, m) f.runWith(PublisherDrain())(m)
def toFanoutPublisher[In, Out](initialBufferSize: Int, maximumBufferSize: Int): (Source[Out], FlowMaterializer) Publisher[Out] =
(f, m) f.toFanoutPublisher(initialBufferSize, maximumBufferSize)(m)
(f, m) f.runWith(FanoutPublisherDrain(initialBufferSize, maximumBufferSize))(m)
def materializeIntoSubscriberAndPublisher[In, Out](flow: Flow[In, Out]): (Subscriber[In], Publisher[Out]) = {
val tap = SubscriberTap[In]
val drain = PublisherDrain[Out]
val mf = tap.connect(flow).connect(drain).run()
(tap.subscriber(mf), drain.publisher(mf))
flow.runWith(tap, drain)
}
"A Flow" must {
@ -174,7 +173,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val c1 = StreamTestKit.SubscriberProbe[String]()
flowOut.subscribe(c1)
val tap: Publisher[String] = Source(List("1", "2", "3")).toPublisher()
val tap: Publisher[String] = Source(List("1", "2", "3")).runWith(PublisherDrain())
tap.subscribe(flowIn)
val sub1 = c1.expectSubscription
@ -195,7 +194,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
sub1.request(3)
c1.expectNoMsg(200.millis)
val tap: Publisher[Int] = Source(List(1, 2, 3)).toPublisher()
val tap: Publisher[Int] = Source(List(1, 2, 3)).runWith(PublisherDrain())
tap.subscribe(flowIn)
c1.expectNext("1")
@ -214,7 +213,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
sub1.request(3)
c1.expectNoMsg(200.millis)
val tap: Publisher[Int] = Source(List(1, 2, 3)).toPublisher()
val tap: Publisher[Int] = Source(List(1, 2, 3)).runWith(PublisherDrain())
tap.subscribe(flowIn)
c1.expectNext("elem-1")
@ -227,7 +226,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val flow: Flow[String, String] = Flow[String]
val c1 = StreamTestKit.SubscriberProbe[String]()
val sink: Sink[String] = flow.connect(SubscriberDrain(c1))
val publisher: Publisher[String] = Source(List("1", "2", "3")).toPublisher()
val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(PublisherDrain())
Source(publisher).connect(sink).run()
val sub1 = c1.expectSubscription
@ -241,8 +240,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"perform transformation operation" in {
val flow = Flow[Int].map(i { testActor ! i.toString; i.toString })
val publisher = Source(List(1, 2, 3)).toPublisher()
Source(publisher).connect(flow).consume()
val publisher = Source(List(1, 2, 3)).runWith(PublisherDrain())
Source(publisher).connect(flow).connect(BlackholeDrain).run()
expectMsg("1")
expectMsg("2")
@ -253,7 +252,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val flow = Flow[Int].map(_.toString)
val c1 = StreamTestKit.SubscriberProbe[String]()
val sink: Sink[Int] = flow.connect(SubscriberDrain(c1))
val publisher: Publisher[Int] = Source(List(1, 2, 3)).toPublisher()
val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(PublisherDrain())
Source(publisher).connect(sink).run()
val sub1 = c1.expectSubscription
@ -266,8 +265,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"be materializable several times with fanout publisher" in {
val flow = Source(List(1, 2, 3)).map(_.toString)
val p1 = flow.toFanoutPublisher(2, 2)
val p2 = flow.toFanoutPublisher(2, 2)
val p1 = flow.runWith(FanoutPublisherDrain(2, 2))
val p2 = flow.runWith(FanoutPublisherDrain(2, 2))
val s1 = StreamTestKit.SubscriberProbe[String]()
val s2 = StreamTestKit.SubscriberProbe[String]()
val s3 = StreamTestKit.SubscriberProbe[String]()
@ -299,7 +298,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"be covariant" in {
val f1: Source[Fruit] = Source[Fruit](() Some(new Apple))
val p1: Publisher[Fruit] = Source[Fruit](() Some(new Apple)).toPublisher()
val p1: Publisher[Fruit] = Source[Fruit](() Some(new Apple)).runWith(PublisherDrain())
val f2: Source[Source[Fruit]] = Source[Fruit](() Some(new Apple)).splitWhen(_ true)
val f3: Source[(Boolean, Source[Fruit])] = Source[Fruit](() Some(new Apple)).groupBy(_ true)
val f4: Source[(immutable.Seq[Fruit], Source[Fruit])] = Source[Fruit](() Some(new Apple)).prefixAndTail(1)

View file

@ -32,7 +32,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
val tap = Source((1 to elementCount).iterator)
val groupStream = tap.splitWhen(_ == splitWhen).toPublisher()
val groupStream = tap.splitWhen(_ == splitWhen).runWith(PublisherDrain())
val masterSubscriber = StreamTestKit.SubscriberProbe[Source[Int]]()
groupStream.subscribe(masterSubscriber)
@ -53,7 +53,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
"splitWhen" must {
"work in the happy case" in new SubstreamsSupport(elementCount = 4) {
val s1 = StreamPuppet(getSubFlow().toPublisher())
val s1 = StreamPuppet(getSubFlow().runWith(PublisherDrain()))
masterSubscriber.expectNoMsg(100.millis)
s1.request(2)
@ -62,7 +62,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
s1.request(1)
s1.expectComplete()
val s2 = StreamPuppet(getSubFlow().toPublisher())
val s2 = StreamPuppet(getSubFlow().runWith(PublisherDrain()))
s2.request(1)
s2.expectNext(3)
@ -77,9 +77,9 @@ class FlowSplitWhenSpec extends AkkaSpec {
}
"support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().toPublisher())
val s1 = StreamPuppet(getSubFlow().runWith(PublisherDrain()))
s1.cancel()
val s2 = StreamPuppet(getSubFlow().toPublisher())
val s2 = StreamPuppet(getSubFlow().runWith(PublisherDrain()))
s2.request(4)
s2.expectNext(5)
@ -94,7 +94,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
}
"support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubFlow().toPublisher())
val s1 = StreamPuppet(getSubFlow().runWith(PublisherDrain()))
masterSubscription.cancel()
s1.request(4)
s1.expectNext(1)

View file

@ -34,7 +34,7 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest {
"not take anything for negative n" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1, 2, 3)).take(-1).publishTo(probe)
Source(List(1, 2, 3)).take(-1).connect(SubscriberDrain(probe)).run()
probe.expectSubscription().request(10)
probe.expectComplete()
}

View file

@ -18,7 +18,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
Source(p).takeWithin(1.second).publishTo(c)
Source(p).takeWithin(1.second).connect(SubscriberDrain(c)).run()
val pSub = p.expectSubscription()
val cSub = c.expectSubscription()
cSub.request(100)
@ -38,7 +38,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
"deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[Int]()
Source(1 to 3).takeWithin(1.second).publishTo(c)
Source(1 to 3).takeWithin(1.second).connect(SubscriberDrain(c)).run()
val cSub = c.expectSubscription()
c.expectNoMsg(200.millis)
cSub.request(100)

View file

@ -18,7 +18,7 @@ class FlowThunkSpec extends AkkaSpec {
"produce elements" in {
val iter = List(1, 2, 3).iterator
val p = Source(() if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).toPublisher()
val p = Source(() if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -32,7 +32,7 @@ class FlowThunkSpec extends AkkaSpec {
}
"complete empty" in {
val p = Source(() None).toPublisher()
val p = Source(() None).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -44,7 +44,7 @@ class FlowThunkSpec extends AkkaSpec {
"allow cancel before receiving all elements" in {
val count = 100000
val iter = (1 to count).iterator
val p = Source(() if (iter.hasNext) Some(iter.next()) else None).toPublisher()
val p = Source(() if (iter.hasNext) Some(iter.next()) else None).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()

View file

@ -28,7 +28,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
}
override def isComplete: Boolean = !isTimerActive("tick")
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -54,7 +54,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
}
override def isComplete: Boolean = !isTimerActive("tick")
}).
consume()
connect(BlackholeDrain).run()
val pSub = p.expectSubscription()
expectMsg("tick-1")
expectMsg("tick-2")
@ -72,7 +72,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
def onNext(element: Int) = Nil
override def onTimer(timerKey: Any) =
throw exception
}).toPublisher()
}).runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)

View file

@ -10,6 +10,7 @@ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
import scala.util.Failure
import akka.stream.MaterializerSettings
import scala.concurrent.Future
class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
@ -19,16 +20,15 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
implicit val materializer = FlowMaterializer(settings)
"A Flow with toFuture" must {
"A Flow with FutureDrain" must {
"yield the first value" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f = FutureDrain[Int]
val m = Source(p).connect(f).run()
val f: Future[Int] = Source(p).map(identity).runWith(FutureDrain())
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)
Await.result(f.future(m), 100.millis) should be(42)
Await.result(f, 100.millis) should be(42)
proc.expectCancellation()
}
@ -37,37 +37,33 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
val f = FutureDrain[Int]
val s = SubscriberTap[Int]
val m = s.connect(f).run()
p.subscribe(s.subscriber(m))
p.subscribe(m.materializedTap(s))
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)
Await.result(f.future(m), 100.millis) should be(42)
Await.result(m.materializedDrain(f), 100.millis) should be(42)
proc.expectCancellation()
}
"yield the first error" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f = FutureDrain[Int]
val m = Source(p).connect(f).run()
val f = Source(p).runWith(FutureDrain())
val proc = p.expectSubscription
proc.expectRequest()
val ex = new RuntimeException("ex")
proc.sendError(ex)
val future = f.future(m)
Await.ready(future, 100.millis)
future.value.get should be(Failure(ex))
Await.ready(f, 100.millis)
f.value.get should be(Failure(ex))
}
"yield NoSuchElementExcption for empty stream" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f = FutureDrain[Int]
val m = Source(p).connect(f).run()
val f = Source(p).runWith(FutureDrain())
val proc = p.expectSubscription
proc.expectRequest()
proc.sendComplete()
val future = f.future(m)
Await.ready(future, 100.millis)
future.value.get match {
Await.ready(f, 100.millis)
f.value.get match {
case Failure(e: NoSuchElementException) e.getMessage() should be("empty stream")
case x fail("expected NoSuchElementException, got " + x)
}

View file

@ -40,7 +40,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"A Flow with transformRecover operations" must {
"produce one-to-one transformation as expected" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -54,7 +54,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
case Some(_) List(-1)
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -68,7 +68,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce one-to-several transformation as expected" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -82,7 +82,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
case Some(_) List(-1)
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -99,7 +99,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce dropping transformation as expected" in {
val p = Source(List(1, 2, 3, 4).iterator).toPublisher()
val p = Source(List(1, 2, 3, 4).iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -113,7 +113,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
case Some(_) List(-1)
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -127,7 +127,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce multi-step transformation as expected" in {
val p = Source(List("a", "bc", "def").iterator).toPublisher()
val p = Source(List("a", "bc", "def").iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new TryRecoveryTransformer[String, Int] {
var concat = ""
@ -147,7 +147,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
case None Nil
case Some(_) List(-1)
}
}).toFanoutPublisher(1, 1)
}).runWith(FanoutPublisherDrain(1, 1))
val c1 = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c1)
val sub1 = c1.expectSubscription()
@ -170,7 +170,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"invoke onComplete when done" in {
val p = Source(List("a").iterator).toPublisher()
val p = Source(List("a").iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new TryRecoveryTransformer[String, String] {
var s = ""
@ -180,7 +180,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
override def onTermination(e: Option[Throwable]) = List(s + "B")
}).
toPublisher()
runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c)
val s = c.expectSubscription()
@ -200,7 +200,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
override def isComplete = s == "Success(1)"
}).
toPublisher()
runWith(PublisherDrain())
val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c)
@ -225,7 +225,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
override def isComplete = s == "Success(1)"
override def onTermination(e: Option[Throwable]) = List(s.length + 10)
}).
toPublisher()
runWith(PublisherDrain())
val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c)
@ -240,7 +240,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"report error when exception is thrown" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = {
@ -249,7 +249,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
override def onError(e: Throwable) = List(-1)
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -279,7 +279,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
case Some(_) List(-1, -2, -3)
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -332,7 +332,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
}
}).
toPublisher()
runWith(PublisherDrain())
val proc = p.expectSubscription()
val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c)
@ -353,7 +353,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
override def onNext(in: Int) = List(in)
override def onError(e: Throwable) = throw e
}).
toPublisher()
runWith(PublisherDrain())
val proc = p.expectSubscription()
val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c)
@ -366,13 +366,13 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"support cancel as expected" in {
val p = Source(List(1, 2, 3).iterator).toPublisher()
val p = Source(List(1, 2, 3).iterator).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem, elem)
override def onError(e: Throwable) = List(-1)
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()

View file

@ -23,7 +23,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"A Flow with transform operations" must {
"produce one-to-one transformation as expected" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -32,7 +32,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
List(tot)
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -46,7 +46,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"produce one-to-several transformation as expected" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -55,7 +55,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
Vector.fill(elem)(tot)
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -72,7 +72,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"produce dropping transformation as expected" in {
val p = Source(List(1, 2, 3, 4)).toPublisher()
val p = Source(List(1, 2, 3, 4)).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -85,7 +85,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -99,7 +99,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"produce multi-step transformation as expected" in {
val p = Source(List("a", "bc", "def")).toPublisher()
val p = Source(List("a", "bc", "def")).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[String, Int] {
var concat = ""
@ -115,7 +115,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
List(tot)
}
}).
toFanoutPublisher(2, 2)
runWith(FanoutPublisherDrain(2, 2))
val c1 = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c1)
val sub1 = c1.expectSubscription()
@ -138,7 +138,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"invoke onComplete when done" in {
val p = Source(List("a")).toPublisher()
val p = Source(List("a")).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[String, String] {
var s = ""
@ -148,7 +148,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
override def onTermination(e: Option[Throwable]) = List(s + "B")
}).
toPublisher()
runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c)
val s = c.expectSubscription()
@ -159,7 +159,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done" in {
val cleanupProbe = TestProbe()
val p = Source(List("a")).toPublisher()
val p = Source(List("a")).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[String, String] {
var s = ""
@ -170,7 +170,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
override def onTermination(e: Option[Throwable]) = List(s + "B")
override def cleanup() = cleanupProbe.ref ! s
}).
toPublisher()
runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c)
val s = c.expectSubscription()
@ -182,7 +182,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done consume" in {
val cleanupProbe = TestProbe()
val p = Source(List("a")).toPublisher()
val p = Source(List("a")).runWith(PublisherDrain())
Source(p).
transform("transform", () new Transformer[String, String] {
var s = "x"
@ -198,7 +198,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done after error" in {
val cleanupProbe = TestProbe()
val p = Source(List("a", "b", "c")).toPublisher()
val p = Source(List("a", "b", "c")).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[String, String] {
var s = ""
@ -214,7 +214,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
override def onTermination(e: Option[Throwable]) = List(s + "B")
override def cleanup() = cleanupProbe.ref ! s
}).
toPublisher()
runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c)
val s = c.expectSubscription()
@ -236,7 +236,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
override def isComplete = s == "1"
}).
toPublisher()
runWith(PublisherDrain())
val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c)
@ -263,7 +263,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
override def onTermination(e: Option[Throwable]) = List(s.length + 10)
override def cleanup() = cleanupProbe.ref ! s
}).
toPublisher()
runWith(PublisherDrain())
val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c)
@ -279,7 +279,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"report error when exception is thrown" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = {
@ -290,7 +290,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
}
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -304,12 +304,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"support cancel as expected" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem, elem)
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -323,13 +323,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"support producing elements from empty inputs" in {
val p = Source(List.empty[Int]).toPublisher()
val p = Source(List.empty[Int]).runWith(PublisherDrain())
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = Nil
override def onTermination(e: Option[Throwable]) = List(1, 2, 3)
}).
toPublisher()
runWith(PublisherDrain())
val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
@ -363,7 +363,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
case _ Nil
}
}
}).publishTo(subscriber)
}).connect(SubscriberDrain(subscriber)).run()
val subscription = subscriber.expectSubscription()
subscription.request(10)
@ -386,13 +386,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
})
val s1 = StreamTestKit.SubscriberProbe[Int]()
flow.publishTo(s1)
flow.connect(SubscriberDrain(s1)).run()
s1.expectSubscription().request(3)
s1.expectNext(1, 2, 3)
s1.expectComplete()
val s2 = StreamTestKit.SubscriberProbe[Int]()
flow.publishTo(s2)
flow.connect(SubscriberDrain(s2)).run()
s2.expectSubscription().request(3)
s2.expectNext(1, 2, 3)
s2.expectComplete()

View file

@ -59,7 +59,7 @@ class GraphBalanceSpec extends AkkaSpec {
balance ~> Flow[Int].grouped(15) ~> f5
}.run()
Set(f1, f2, f3, f4, f5) flatMap (sink Await.result(sink.future(g), 3.seconds)) should be((0 to 14).toSet)
Set(f1, f2, f3, f4, f5) flatMap (sink Await.result(g.materializedDrain(sink), 3.seconds)) should be((0 to 14).toSet)
}
"fairly balance between three outputs" in {
@ -73,7 +73,7 @@ class GraphBalanceSpec extends AkkaSpec {
}.run()
Seq(f1, f2, f3) map { sink
Await.result(sink.future(g), 3.seconds) should be(numElementsForSink +- 1000)
Await.result(g.materializedDrain(sink), 3.seconds) should be(numElementsForSink +- 1000)
}
}

View file

@ -62,11 +62,11 @@ class GraphBroadcastSpec extends AkkaSpec {
bcast ~> Flow[Int].grouped(5) ~> f5
}.run()
Await.result(g.getDrainFor(f1), 3.seconds) should be(List(1, 2, 3))
Await.result(g.getDrainFor(f2), 3.seconds) should be(List(1, 2, 3))
Await.result(g.getDrainFor(f3), 3.seconds) should be(List(1, 2, 3))
Await.result(g.getDrainFor(f4), 3.seconds) should be(List(1, 2, 3))
Await.result(g.getDrainFor(f5), 3.seconds) should be(List(1, 2, 3))
Await.result(g.materializedDrain(f1), 3.seconds) should be(List(1, 2, 3))
Await.result(g.materializedDrain(f2), 3.seconds) should be(List(1, 2, 3))
Await.result(g.materializedDrain(f3), 3.seconds) should be(List(1, 2, 3))
Await.result(g.materializedDrain(f4), 3.seconds) should be(List(1, 2, 3))
Await.result(g.materializedDrain(f5), 3.seconds) should be(List(1, 2, 3))
}
"produce to other even though downstream cancels" in {

View file

@ -76,7 +76,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
merge ~> Flow[Int].grouped(10) ~> resultFuture
}.run()
Await.result(g.getDrainFor(resultFuture), 3.seconds).sorted should be(List(1, 2, 3, 4, 5, 6))
Await.result(g.materializedDrain(resultFuture), 3.seconds).sorted should be(List(1, 2, 3, 4, 5, 6))
}
"support balance - merge (parallelization) layouts" in {
@ -96,7 +96,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
balance ~> f ~> merge ~> Flow[Int].grouped(elements.size * 2) ~> out
}.run()
Await.result(out.future(g), 3.seconds).sorted should be(elements)
Await.result(g.materializedDrain(out), 3.seconds).sorted should be(elements)
}
"support wikipedia Topological_sorting 2" in {
@ -142,9 +142,9 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
}.run()
Await.result(g.getDrainFor(resultFuture2), 3.seconds).sorted should be(List(5, 7))
Await.result(g.getDrainFor(resultFuture9), 3.seconds).sorted should be(List(3, 5, 7, 7))
Await.result(g.getDrainFor(resultFuture10), 3.seconds).sorted should be(List(3, 5, 7))
Await.result(g.materializedDrain(resultFuture2), 3.seconds).sorted should be(List(5, 7))
Await.result(g.materializedDrain(resultFuture9), 3.seconds).sorted should be(List(3, 5, 7, 7))
Await.result(g.materializedDrain(resultFuture10), 3.seconds).sorted should be(List(3, 5, 7))
}
@ -161,11 +161,11 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
merge ~> Flow[Int].grouped(10).connect(resultFuture)
}.run()
Await.result(g.getDrainFor(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9))
Await.result(g.materializedDrain(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9))
}
"be able to run plain flow" in {
val p = Source(List(1, 2, 3)).toPublisher()
val p = Source(List(1, 2, 3)).runWith(PublisherDrain())
val s = SubscriberProbe[Int]
val flow = Flow[Int].map(_ * 2)
FlowGraph { implicit builder
@ -215,7 +215,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
val lego1 = Lego(Flow[String].filter(_.length > 3).map(s s" $s "))
val lego2 = Lego(Flow[String].map(_.toUpperCase))
val lego3 = lego1.connect(lego2, Flow[ByteString].map(_.utf8String))
val source = PublisherTap(Source(List("green ", "blue", "red", "yellow", "black")).toPublisher)
val source = Source(List("green ", "blue", "red", "yellow", "black"))
val s = SubscriberProbe[ByteString]
val sink = SubscriberDrain(s)
lego3.run(source, sink)

View file

@ -23,8 +23,7 @@ object ImplicitFlowMaterializerSpec {
// run takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer
import context.dispatcher
val foldDrain = FoldDrain[String, String]("")(_ + _)
val mf = flow.connect(foldDrain).run()
foldDrain.future(mf) pipeTo sender()
flow.runWith(foldDrain) pipeTo sender()
}
}
}

View file

@ -17,7 +17,7 @@ class TickPublisherSpec extends AkkaSpec {
"produce ticks" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
Source(1.second, 500.millis, () "tick-" + tickGen.next()).publishTo(c)
Source(1.second, 500.millis, () "tick-" + tickGen.next()).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
@ -33,7 +33,7 @@ class TickPublisherSpec extends AkkaSpec {
"drop ticks when not requested" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
Source(1.second, 1.second, () "tick-" + tickGen.next()).publishTo(c)
Source(1.second, 1.second, () "tick-" + tickGen.next()).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(2)
c.expectNext("tick-1")
@ -50,7 +50,7 @@ class TickPublisherSpec extends AkkaSpec {
"produce ticks with multiple subscribers" in {
val tickGen = Iterator from 1
val p = Source(1.second, 1.second, () "tick-" + tickGen.next()).toPublisher()
val p = Source(1.second, 1.second, () "tick-" + tickGen.next()).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2 = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c1)
@ -74,7 +74,7 @@ class TickPublisherSpec extends AkkaSpec {
"signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]()
Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).publishTo(c)
Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).connect(SubscriberDrain(c)).run()
val sub = c.expectSubscription()
sub.request(3)
c.expectError.getMessage should be("tick err")
@ -83,8 +83,8 @@ class TickPublisherSpec extends AkkaSpec {
// FIXME enable this test again when zip is back
"be usable with zip for a simple form of rate limiting" ignore {
// val c = StreamTestKit.SubscriberProbe[Int]()
// val rate = Source(1.second, 1.second, () "tick").toPublisher()
// Source(1 to 100).zip(rate).map { case (n, _) n }.publishTo(c)
// val rate = Source(1.second, 1.second, () "tick").runWith(PublisherDrain())
// Source(1 to 100).zip(rate).map { case (n, _) n }.connect(SubscriberDrain(c)).run()
// val sub = c.expectSubscription()
// sub.request(1000)
// c.expectNext(1)

View file

@ -136,36 +136,36 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
}
// Ops come in reverse order
override def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedPipe = {
override def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedMap = {
val flowName = createFlowName()
def attachDrain(pub: Publisher[Out]) = drain match {
case s: SimpleDrain[Out] s.attach(pub, this, flowName)
case s: DrainWithKey[Out, _] s.attach(pub, this, flowName)
case _ throw new MaterializationException("unknown Drain type " + drain.getClass)
case s: SimpleDrain[Out] s.attach(pub, this, flowName)
case s: DrainWithKey[Out] s.attach(pub, this, flowName)
case _ throw new MaterializationException("unknown Drain type " + drain.getClass)
}
def attachTap(sub: Subscriber[In]) = tap match {
case s: SimpleTap[In] s.attach(sub, this, flowName)
case s: TapWithKey[In, _] s.attach(sub, this, flowName)
case _ throw new MaterializationException("unknown Tap type " + drain.getClass)
case s: SimpleTap[In] s.attach(sub, this, flowName)
case s: TapWithKey[In] s.attach(sub, this, flowName)
case _ throw new MaterializationException("unknown Tap type " + drain.getClass)
}
def createDrain() = drain.asInstanceOf[Drain[In]] match {
case s: SimpleDrain[In] s.create(this, flowName) -> (())
case s: DrainWithKey[In, _] s.create(this, flowName)
case _ throw new MaterializationException("unknown Drain type " + drain.getClass)
case s: SimpleDrain[In] s.create(this, flowName) -> (())
case s: DrainWithKey[In] s.create(this, flowName)
case _ throw new MaterializationException("unknown Drain type " + drain.getClass)
}
def createTap() = tap.asInstanceOf[Tap[Out]] match {
case s: SimpleTap[Out] s.create(this, flowName) -> (())
case s: TapWithKey[Out, _] s.create(this, flowName)
case _ throw new MaterializationException("unknown Tap type " + drain.getClass)
case s: SimpleTap[Out] s.create(this, flowName) -> (())
case s: TapWithKey[Out] s.create(this, flowName)
case _ throw new MaterializationException("unknown Tap type " + drain.getClass)
}
def isActive(s: AnyRef) = s match {
case tap: SimpleTap[_] tap.isActive
case tap: TapWithKey[_, _] tap.isActive
case drain: SimpleDrain[_] drain.isActive
case drain: DrainWithKey[_, _] drain.isActive
case _: Tap[_] throw new MaterializationException("unknown Tap type " + drain.getClass)
case _: Drain[_] throw new MaterializationException("unknown Drain type " + drain.getClass)
case tap: SimpleTap[_] tap.isActive
case tap: TapWithKey[_] tap.isActive
case drain: SimpleDrain[_] drain.isActive
case drain: DrainWithKey[_] drain.isActive
case _: Tap[_] throw new MaterializationException("unknown Tap type " + drain.getClass)
case _: Drain[_] throw new MaterializationException("unknown Drain type " + drain.getClass)
}
val (tapValue, drainValue) =

View file

@ -7,6 +7,7 @@ import akka.stream.impl.TransferPhase
import akka.stream.impl.MultiStreamInputProcessor
import akka.stream.scaladsl2.Source
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.PublisherDrain
/**
* INTERNAL API
@ -17,8 +18,8 @@ private[akka] class ConcatAllImpl(materializer: FlowMaterializer)
import MultiStreamInputProcessor._
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val flow = primaryInputs.dequeueInputElement().asInstanceOf[Source[Any]]
val publisher = flow.toPublisher()(materializer)
val source = primaryInputs.dequeueInputElement().asInstanceOf[Source[Any]]
val publisher = source.runWith(PublisherDrain())(materializer)
// FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now)
val inputs = createAndSubscribeSubstreamInput(publisher)
nextPhase(streamSubstream(inputs))

View file

@ -25,7 +25,10 @@ import java.util.concurrent.atomic.AtomicReference
* FlowMaterializers can be used but must then implement the functionality of these
* Drain nodes themselves (or construct an ActorBasedFlowMaterializer).
*/
trait Drain[-In] extends Sink[In]
trait Drain[-In] extends Sink[In] {
override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType =
tap.connect(this).run().materializedTap(tap)
}
/**
* A drain that does not need to create a user-accessible object during materialization.
@ -63,7 +66,10 @@ trait SimpleDrain[-In] extends Drain[In] {
* to retrieve in order to access aspects of this drain (could be a completion Future
* or a cancellation handle, etc.)
*/
trait DrainWithKey[-In, T] extends Drain[In] {
trait DrainWithKey[-In] extends Drain[In] {
type MaterializedType
/**
* Attach this drain to the given [[org.reactivestreams.Publisher]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this drain belongs to
@ -75,12 +81,12 @@ trait DrainWithKey[-In, T] extends Drain[In] {
* @param materializer a FlowMaterializer that may be used for creating flows
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): T
def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType
/**
* This method is only used for Drains that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, T) =
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, MaterializedType) =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Drain can create a Subscriber instead of being
@ -102,20 +108,21 @@ trait DrainWithKey[-In, T] extends Drain[In] {
*/
object PublisherDrain {
private val instance = new PublisherDrain[Nothing]
def apply[T]: PublisherDrain[T] = instance.asInstanceOf[PublisherDrain[T]]
def apply[T](): PublisherDrain[T] = instance.asInstanceOf[PublisherDrain[T]]
def withFanout[T](initialBufferSize: Int, maximumBufferSize: Int): FanoutPublisherDrain[T] =
new FanoutPublisherDrain[T](initialBufferSize, maximumBufferSize)
}
class PublisherDrain[In] extends DrainWithKey[In, Publisher[In]] {
class PublisherDrain[In] extends DrainWithKey[In] {
type MaterializedType = Publisher[In]
def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = flowPublisher
def publisher(m: MaterializedDrain): Publisher[In] = m.getDrainFor(this)
override def toString: String = "PublisherDrain"
}
class FanoutPublisherDrain[In](initialBufferSize: Int, maximumBufferSize: Int) extends DrainWithKey[In, Publisher[In]] {
def publisher(m: MaterializedDrain): Publisher[In] = m.getDrainFor(this)
final case class FanoutPublisherDrain[In](initialBufferSize: Int, maximumBufferSize: Int) extends DrainWithKey[In] {
type MaterializedType = Publisher[In]
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = {
val fanoutActor = materializer.actorOf(
Props(new FanoutProcessorImpl(materializer.settings, initialBufferSize, maximumBufferSize)), s"$flowName-fanoutPublisher")
@ -123,12 +130,10 @@ class FanoutPublisherDrain[In](initialBufferSize: Int, maximumBufferSize: Int) e
flowPublisher.subscribe(fanoutProcessor)
fanoutProcessor
}
override def toString: String = "Fanout"
}
object FutureDrain {
def apply[T]: FutureDrain[T] = new FutureDrain[T]
def apply[T](): FutureDrain[T] = new FutureDrain[T]
}
/**
@ -138,7 +143,8 @@ object FutureDrain {
* the Future into the corresponding failed state) or the end-of-stream
* (failing the Future with a NoSuchElementException).
*/
class FutureDrain[In] extends DrainWithKey[In, Future[In]] {
class FutureDrain[In] extends DrainWithKey[In] {
type MaterializedType = Future[In]
def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Future[In] = {
val (sub, f) = create(materializer, flowName)
flowPublisher.subscribe(sub)
@ -159,8 +165,6 @@ class FutureDrain[In] extends DrainWithKey[In, Future[In]] {
(sub, p.future)
}
def future(m: MaterializedDrain): Future[In] = m.getDrainFor(this)
override def toString: String = "FutureDrain"
}
@ -207,7 +211,7 @@ final case class OnCompleteDrain[In](callback: Try[Unit] ⇒ Unit) extends Simpl
}
Nil
}
}).consume()(materializer.withNamePrefix(flowName))
}).connect(BlackholeDrain).run()(materializer.withNamePrefix(flowName))
}
/**
@ -215,7 +219,9 @@ final case class OnCompleteDrain[In](callback: Try[Unit] ⇒ Unit) extends Simpl
* that will be completed with `Success` when reaching the normal end of the stream, or completed
* with `Failure` if there is an error is signaled in the stream.
*/
final case class ForeachDrain[In](f: In Unit) extends DrainWithKey[In, Future[Unit]] {
final case class ForeachDrain[In](f: In Unit) extends DrainWithKey[In] {
type MaterializedType = Future[Unit]
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Unit] = {
val promise = Promise[Unit]()
Source(flowPublisher).transform("foreach", () new Transformer[In, Unit] {
@ -228,10 +234,9 @@ final case class ForeachDrain[In](f: In ⇒ Unit) extends DrainWithKey[In, Futur
}
Nil
}
}).consume()(materializer.withNamePrefix(flowName))
}).connect(BlackholeDrain).run()(materializer.withNamePrefix(flowName))
promise.future
}
def future(m: MaterializedDrain): Future[Unit] = m.getDrainFor(this)
}
/**
@ -241,7 +246,9 @@ final case class ForeachDrain[In](f: In ⇒ Unit) extends DrainWithKey[In, Futur
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
*/
final case class FoldDrain[U, In](zero: U)(f: (U, In) U) extends DrainWithKey[In, Future[U]] {
final case class FoldDrain[U, In](zero: U)(f: (U, In) U) extends DrainWithKey[In] {
type MaterializedType = Future[U]
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): Future[U] = {
val promise = Promise[U]()
@ -256,16 +263,9 @@ final case class FoldDrain[U, In](zero: U)(f: (U, In) ⇒ U) extends DrainWithKe
}
Nil
}
}).consume()(materializer.withNamePrefix(flowName))
}).connect(BlackholeDrain).run()(materializer.withNamePrefix(flowName))
promise.future
}
def future(m: MaterializedDrain): Future[U] = m.getDrainFor(this)
}
trait MaterializedDrain {
/**
* Do not call directly. Use accessor method in the concrete `Drain`, e.g. [[PublisherDrain#publisher]].
*/
def getDrainFor[T](drainKey: DrainWithKey[_, T]): T
}

View file

@ -27,6 +27,14 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* Connect this flow to a sink, concatenating the processing steps of both.
*/
def connect(sink: Sink[Out]): Sink[In]
/**
*
* Connect the `Tap` to this `Flow` and then connect it to the `Drain` and run it. The returned tuple contains
* the materialized values of the `Tap` and `Drain`, e.g. the `Subscriber` of a [[SubscriberTap]] and
* and `Publisher` of a [[PublisherDrain]].
*/
def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType)
}
object Flow {
@ -41,16 +49,9 @@ object Flow {
* Flow with attached input and output, can be executed.
*/
trait RunnableFlow {
def run()(implicit materializer: FlowMaterializer): MaterializedFlow
def run()(implicit materializer: FlowMaterializer): MaterializedMap
}
/**
* Returned by [[RunnableFlow#run]] and can be used as parameter to the
* accessor method to retrieve the materialized `Tap` or `Drain`, e.g.
* [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]].
*/
trait MaterializedFlow extends MaterializedTap with MaterializedDrain
/**
* Scala API: Operations offered by Flows and Sources with a free output side: the DSL flows left-to-right only.
*/
@ -434,3 +435,4 @@ private[scaladsl2] object FlowOps {
override def onNext(elem: Any) = List(elem)
}
}

View file

@ -836,7 +836,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
/**
* Materialize the `FlowGraph` and attach all sinks and sources.
*/
def run()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = {
def run()(implicit materializer: FlowMaterializer): MaterializedMap = {
val edges = graph.edges
if (edges.size == 1) {
val edge = edges.head
@ -854,20 +854,20 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
/**
* Run FlowGraph that only contains one edge from a `Source` to a `Sink`.
*/
private def runSimple(tapVertex: TapVertex, drainVertex: DrainVertex, pipe: Pipe[Any, Nothing])(implicit materializer: FlowMaterializer): MaterializedFlowGraph = {
private def runSimple(tapVertex: TapVertex, drainVertex: DrainVertex, pipe: Pipe[Any, Nothing])(implicit materializer: FlowMaterializer): MaterializedMap = {
val mf = pipe.withTap(tapVertex.tap).withDrain(drainVertex.drain).run()
val materializedSources: Map[TapWithKey[_, _], Any] = tapVertex match {
case TapVertex(tap: TapWithKey[_, _]) Map(tap -> mf.getTapFor(tap))
case _ Map.empty
val materializedSources: Map[TapWithKey[_], Any] = tapVertex match {
case TapVertex(tap: TapWithKey[_]) Map(tap -> mf.materializedTap(tap))
case _ Map.empty
}
val materializedSinks: Map[DrainWithKey[_, _], Any] = drainVertex match {
case DrainVertex(drain: DrainWithKey[_, _]) Map(drain -> mf.getDrainFor(drain))
case _ Map.empty
val materializedSinks: Map[DrainWithKey[_], Any] = drainVertex match {
case DrainVertex(drain: DrainWithKey[_]) Map(drain -> mf.materializedDrain(drain))
case _ Map.empty
}
new MaterializedFlowGraph(materializedSources, materializedSinks)
}
private def runGraph()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = {
private def runGraph()(implicit materializer: FlowMaterializer): MaterializedMap = {
import scalax.collection.GraphTraversal._
// start with drains
@ -877,7 +877,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
downstreamSubscriber: Map[graph.EdgeT, Subscriber[Any]] = Map.empty,
upstreamPublishers: Map[graph.EdgeT, Publisher[Any]] = Map.empty,
taps: Map[TapVertex, SinkPipe[Any]] = Map.empty,
materializedDrains: Map[DrainWithKey[_, _], Any] = Map.empty)
materializedDrains: Map[DrainWithKey[_], Any] = Map.empty)
val result = startingNodes.foldLeft(Memo()) {
case (memo, start)
@ -892,12 +892,12 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
val pipe = edge.label.pipe
// returns the materialized drain, if any
def connectToDownstream(publisher: Publisher[Any]): Option[(DrainWithKey[_, _], Any)] = {
def connectToDownstream(publisher: Publisher[Any]): Option[(DrainWithKey[_], Any)] = {
val f = pipe.withTap(PublisherTap(publisher))
edge.to.value match {
case DrainVertex(drain: DrainWithKey[_, _])
case DrainVertex(drain: DrainWithKey[_])
val mf = f.withDrain(drain).run()
Some(drain -> mf.getDrainFor(drain))
Some(drain -> mf.materializedDrain(drain))
case DrainVertex(drain)
f.withDrain(drain).run()
None
@ -948,12 +948,12 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
}
// connect all input taps as the last thing
val materializedTaps = result.taps.foldLeft(Map.empty[TapWithKey[_, _], Any]) {
val materializedTaps = result.taps.foldLeft(Map.empty[TapWithKey[_], Any]) {
case (acc, (TapVertex(tap), pipe))
val mf = pipe.withTap(tap).run()
tap match {
case tapKey: TapWithKey[_, _] acc.updated(tapKey, mf.getTapFor(tapKey))
case _ acc
case tapKey: TapWithKey[_] acc.updated(tapKey, mf.materializedTap(tapKey))
case _ acc
}
}
@ -1018,29 +1018,22 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl
}
/**
* Returned by [[FlowGraph#run]] and can be used as parameter to the
* accessor method to retrieve the materialized `Tap` or `Drain`, e.g.
* [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]].
* Returned by [[FlowGraph#run]] and can be used to retrieve the materialized
* `Tap` inputs or `Drain` outputs.
*/
class MaterializedFlowGraph(materializedTaps: Map[TapWithKey[_, _], Any], materializedDrains: Map[DrainWithKey[_, _], Any])
extends MaterializedTap with MaterializedDrain {
private[scaladsl2] class MaterializedFlowGraph(materializedTaps: Map[TapWithKey[_], Any], materializedDrains: Map[DrainWithKey[_], Any])
extends MaterializedMap {
/**
* Do not call directly. Use accessor method in the concrete `Tap`, e.g. [[SubscriberTap#subscriber]].
*/
override def getTapFor[T](key: TapWithKey[_, T]): T =
override def materializedTap(key: TapWithKey[_]): key.MaterializedType =
materializedTaps.get(key) match {
case Some(matTap) matTap.asInstanceOf[T]
case Some(matTap) matTap.asInstanceOf[key.MaterializedType]
case None
throw new IllegalArgumentException(s"Tap key [$key] doesn't exist in this flow graph")
}
/**
* Do not call directly. Use accessor method in the concrete `Drain`, e.g. [[PublisherDrain#publisher]].
*/
def getDrainFor[T](key: DrainWithKey[_, T]): T =
def materializedDrain(key: DrainWithKey[_]): key.MaterializedType =
materializedDrains.get(key) match {
case Some(matDrain) matDrain.asInstanceOf[T]
case Some(matDrain) matDrain.asInstanceOf[key.MaterializedType]
case None
throw new IllegalArgumentException(s"Drain key [$key] doesn't exist in this flow graph")
}

View file

@ -138,7 +138,7 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) {
* stream. The result can be highly implementation specific, ranging from
* local actor chains to remote-deployed processing networks.
*/
def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedPipe
def materialize[In, Out](tap: Tap[In], drain: Drain[Out], ops: List[Ast.AstNode]): MaterializedMap
/**
* Create publishers and subscribers for fan-in and fan-out operations.

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
/**
* Returned by [[RunnableFlow#run]] and [[FlowGraph#run]] and can be used to retrieve the materialized
* `Tap` inputs or `Drain` outputs, e.g. [[SubscriberTap]] or [[PublisherDrain]].
*/
trait MaterializedMap {
/**
* Retrieve a materialized `Tap`, e.g. the `Subscriber` of a [[SubscriberTap]].
*/
def materializedTap(key: TapWithKey[_]): key.MaterializedType
/**
* Retrieve a materialized `Drain`, e.g. the `Publisher` of a [[PublisherDrain]].
*/
def materializedDrain(key: DrainWithKey[_]): key.MaterializedType
}

View file

@ -18,7 +18,7 @@ private[scaladsl2] object Pipe {
}
/**
* Flow with one open input and one open output..
* Flow with one open input and one open output.
*/
private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] {
override type Repr[+O] = Pipe[In @uncheckedVariance, O]
@ -40,6 +40,11 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
}
override def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) = {
val m = tap.connect(this).connect(drain).run()
(m.materializedTap(tap), m.materializedDrain(drain))
}
private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops)
}
@ -52,11 +57,9 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As
private[scaladsl2] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
override def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = {
val subIn = SubscriberTap[In]()
val mf = withTap(subIn).run()
subIn.subscriber(mf)
}
override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType =
tap.connect(this).run().materializedTap(tap)
}
/**
@ -82,50 +85,29 @@ private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[As
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
}
override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = {
val pubOut = PublisherDrain[Out]
val mf = withDrain(pubOut).run()
pubOut.publisher(mf)
}
override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType =
withDrain(drain).run().materializedDrain(drain)
override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = {
val pubOut = PublisherDrain.withFanout[Out](initialBufferSize, maximumBufferSize)
val mf = withDrain(pubOut).run()
pubOut.publisher(mf)
}
override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit =
toPublisher().subscribe(subscriber)
override def consume()(implicit materializer: FlowMaterializer): Unit =
withDrain(BlackholeDrain).run()
}
/**
* Pipe with attached input and output, can be executed.
*/
private[scaladsl2] final case class RunnablePipe(input: Tap[_], output: Drain[_], ops: List[AstNode]) extends RunnableFlow {
def run()(implicit materializer: FlowMaterializer): MaterializedPipe =
def run()(implicit materializer: FlowMaterializer): MaterializedMap =
materializer.materialize(input, output, ops)
}
/**
* Returned by [[RunnablePipe#run]] and can be used as parameter to the
* accessor method to retrieve the materialized `Tap` or `Drain`, e.g.
* [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]].
* Returned by [[RunnablePipe#run]] and can be used as parameter to retrieve the materialized
* `Tap` input or `Drain` output.
*/
private[stream] class MaterializedPipe(tapKey: AnyRef, matTap: Any, drainKey: AnyRef, matDrain: Any) extends MaterializedFlow {
/**
* Do not call directly. Use accessor method in the concrete `Tap`, e.g. [[SubscriberTap#subscriber]].
*/
override def getTapFor[T](key: TapWithKey[_, T]): T =
if (key == tapKey) matTap.asInstanceOf[T]
private[stream] class MaterializedPipe(tapKey: AnyRef, matTap: Any, drainKey: AnyRef, matDrain: Any) extends MaterializedMap {
override def materializedTap(key: TapWithKey[_]): key.MaterializedType =
if (key == tapKey) matTap.asInstanceOf[key.MaterializedType]
else throw new IllegalArgumentException(s"Tap key [$key] doesn't match the tap [$tapKey] of this flow")
/**
* Do not call directly. Use accessor method in the concrete `Drain`, e.g. [[PublisherDrain#publisher]].
*/
def getDrainFor[T](key: DrainWithKey[_, T]): T =
if (key == drainKey) matDrain.asInstanceOf[T]
override def materializedDrain(key: DrainWithKey[_]): key.MaterializedType =
if (key == drainKey) matDrain.asInstanceOf[key.MaterializedType]
else throw new IllegalArgumentException(s"Drain key [$key] doesn't match the drain [$drainKey] of this flow")
}

View file

@ -13,6 +13,9 @@ import scala.annotation.unchecked.uncheckedVariance
* Can be used as a `Subscriber`
*/
trait Sink[-In] {
def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] =
Flow[In].connect(this).toSubscriber()
/**
* Connect this `Sink` to a `Tap` and run it. The returned value is the materialized value
* of the `Tap`, e.g. the `Subscriber` of a [[SubscriberTap]].
*/
def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType
}

View file

@ -30,13 +30,12 @@ trait Source[+Out] extends FlowOps[Out] {
*/
def connect(sink: Sink[Out]): RunnableFlow
def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance]
/**
* Connect this `Source` to a `Drain` and run it. The returned value is the materialized value
* of the `Drain`, e.g. the `Publisher` of a [[PublisherDrain]].
*/
def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType
def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance]
def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer)
def consume()(implicit materializer: FlowMaterializer): Unit
}
object Source {
@ -48,7 +47,7 @@ object Source {
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def apply[T](publisher: Publisher[T]): Tap[T] = PublisherTap(publisher)
def apply[T](publisher: Publisher[T]): Source[T] = PublisherTap(publisher)
/**
* Helper to create [[Source]] from `Iterator`.
@ -60,7 +59,7 @@ object Source {
* in accordance with the demand coming from the downstream transformation
* steps.
*/
def apply[T](iterator: Iterator[T]): Tap[T] = IteratorTap(iterator)
def apply[T](iterator: Iterator[T]): Source[T] = IteratorTap(iterator)
/**
* Helper to create [[Source]] from `Iterable`.
@ -71,14 +70,14 @@ object Source {
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Tap[T] = IterableTap(iterable)
def apply[T](iterable: immutable.Iterable[T]): Source[T] = IterableTap(iterable)
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def apply[T](f: () Option[T]): Tap[T] = ThunkTap(f)
def apply[T](f: () Option[T]): Source[T] = ThunkTap(f)
/**
* Start a new `Source` from the given `Future`. The stream will consist of
@ -86,7 +85,7 @@ object Source {
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): Tap[T] = FutureTap(future)
def apply[T](future: Future[T]): Source[T] = FutureTap(future)
/**
* Elements are produced from the tick closure periodically with the specified interval.
@ -95,6 +94,7 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Tap[T] =
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] =
TickTap(initialDelay, interval, tick)
}

View file

@ -34,17 +34,8 @@ trait Tap[+Out] extends Source[Out] {
override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink)
override def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] =
sourcePipe.toPublisher()(materializer)
override def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] =
sourcePipe.toFanoutPublisher(initialBufferSize, maximumBufferSize)(materializer)
override def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit =
sourcePipe.publishTo(subscriber)(materializer)
override def consume()(implicit materializer: FlowMaterializer): Unit =
sourcePipe.consume()
override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType =
connect(drain).run().materializedDrain(drain)
/** INTERNAL API */
override protected def andThen[U](op: AstNode) = SourcePipe(this, List(op))
@ -89,7 +80,10 @@ trait SimpleTap[+Out] extends Tap[Out] {
* to retrieve in order to access aspects of this tap (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait TapWithKey[+Out, T] extends Tap[Out] {
trait TapWithKey[+Out] extends Tap[Out] {
type MaterializedType
/**
* Attach this tap to the given [[org.reactivestreams.Subscriber]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this tap belongs to
@ -101,12 +95,12 @@ trait TapWithKey[+Out, T] extends Tap[Out] {
* @param materializer a FlowMaterializer that may be used for creating flows
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): T
def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType
/**
* This method is only used for Taps that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[Out] @uncheckedVariance, T) =
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[Out] @uncheckedVariance, MaterializedType) =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Tap can create a Publisher instead of being
@ -124,11 +118,12 @@ trait TapWithKey[+Out, T] extends Tap[Out] {
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
final case class SubscriberTap[Out]() extends TapWithKey[Out, Subscriber[Out]] {
final case class SubscriberTap[Out]() extends TapWithKey[Out] {
type MaterializedType = Subscriber[Out]
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] =
flowSubscriber
def subscriber(m: MaterializedTap): Subscriber[Out] = m.getTapFor(this)
}
/**
@ -230,9 +225,3 @@ final case class TickTap[Out](initialDelay: FiniteDuration, interval: FiniteDura
name = s"$flowName-0-tick"))
}
trait MaterializedTap {
/**
* Do not call directly. Use accessor method in the concrete `Tap`, e.g. [[SubscriberTap#subscriber]].
*/
def getTapFor[T](tapKey: TapWithKey[_, T]): T
}