+str #16093 Add missing runWith variant

This commit is contained in:
Patrik Nordwall 2014-10-30 14:58:44 +01:00
parent 292afaa4d0
commit 5e240367f8
20 changed files with 90 additions and 110 deletions

View file

@ -122,7 +122,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"remember requested after restart" in {
// creating actor with default supervision, because stream supervisor default strategy is to stop
val ref = system.actorOf(manualSubscriberProps(testActor))
Source(1 to 7).to(Sink(ActorSubscriber[Int](ref))).run()
Source(1 to 7).runWith(Sink(ActorSubscriber[Int](ref)))
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))

View file

@ -79,7 +79,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
val flow: Flow[Int, Long] = Flow[Int].map(_.toLong).timedIntervalBetween(in in % 2 == 1, d probe.ref ! d)
val c1 = StreamTestKit.SubscriberProbe[Long]()
Source(List(1, 2, 3)).via(flow).to(Sink(c1)).run()
Source(List(1, 2, 3)).via(flow).runWith(Sink(c1))
val s = c1.expectSubscription()
s.request(100)

View file

@ -120,7 +120,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type =
val ssession = Await.result(ssessionf, duration)
val sdata = ssession.data
Source(sdata).map(bs ByteString(bs.decodeString("utf-8").split('\n').head.toUpperCase + '\n')).
to(Sink(scipher.plainTextOutbound)).run()
runWith(Sink(scipher.plainTextOutbound))
}
def replyFirstLineInUpperCase(clientConnection: JavaSslConnection): Unit = {
@ -129,7 +129,7 @@ class SslTlsFlowSpec extends AkkaSpec("akka.actor.default-mailbox.mailbox-type =
def sendLineAndReceiveResponse(ccipher: SslTlsCipher, message: String): String = {
val csessionf = Source(ccipher.sessionInbound).runWith(Sink.future)
Source(List(ByteString(message + '\n'))).to(Sink(ccipher.plainTextOutbound)).run()
Source(List(ByteString(message + '\n'))).runWith(Sink(ccipher.plainTextOutbound))
val csession = Await.result(csessionf, duration)
val cdata = csession.data
Await.result(Source(cdata).map(_.decodeString("utf-8").split('\n').head).runWith(Sink.future), duration)

View file

@ -48,7 +48,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
serverConnection.read(256)
Source(tcpProcessor).to(Sink.ignore).run()
Source(tcpProcessor).runWith(Sink.ignore)
Source(testInput).runWith(Sink.publisher).subscribe(tcpProcessor)
serverConnection.waitRead() should be(expectedOutput)
@ -158,7 +158,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).to(Sink(conn.outputStream)).run()
Source(testInput).runWith(Sink(conn.outputStream))
val resultFuture = Source(conn.inputStream).fold(ByteString.empty)((acc, in) acc ++ in)
Await.result(resultFuture, 3.seconds) should be(expectedOutput)
@ -178,7 +178,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).to(Sink(conn1.outputStream)).run()
Source(testInput).runWith(Sink(conn1.outputStream))
conn1.inputStream.subscribe(conn2.outputStream)
conn2.inputStream.subscribe(conn3.outputStream)
val resultFuture = Source(conn3.inputStream).fold(ByteString.empty)((acc, in) acc ++ in)

View file

@ -48,9 +48,9 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
serverConnection.read(256)
Source(tcpPublisher).to(Sink.ignore).run()
Source(tcpPublisher).runWith(Sink.ignore)
Source(testInput).to(Sink(tcpSubscriber)).run()
Source(testInput).runWith(Sink(tcpSubscriber))
serverConnection.waitRead() should be(expectedOutput)
server.close()
@ -162,7 +162,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).to(Sink(tcpSubscriber)).run()
Source(testInput).runWith(Sink(tcpSubscriber))
val resultFuture = Source(tcpPublisher).fold(ByteString.empty) { case (res, elem) res ++ elem }
Await.result(resultFuture, 3.seconds) should be(expectedOutput)
@ -181,7 +181,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = Iterator.range(0, 256).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).to(Sink(tcpSubscriber1)).run()
Source(testInput).runWith(Sink(tcpSubscriber1))
tcpPublisher1.subscribe(tcpSubscriber2)
tcpPublisher2.subscribe(tcpSubscriber3)
val resultFuture = Source(tcpPublisher3).fold(ByteString.empty) { case (res, elem) res ++ elem }

View file

@ -223,7 +223,7 @@ trait TcpHelper { this: TestKitBase ⇒
def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): EchoServer = {
val foreachSink = Sink.foreach[IncomingTcpConnection] { conn
conn.inbound.to(conn.outbound).run()
conn.inbound.runWith(conn.outbound)
}
val binding = bind(Flow[IncomingTcpConnection].to(foreachSink), serverAddress)
new EchoServer(binding.connection.get(foreachSink), binding)

View file

@ -53,7 +53,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).to(Sink(subscriber)).run()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -73,7 +73,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).to(Sink(subscriber)).run()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -101,7 +101,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).to(Sink(subscriber)).run()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -132,7 +132,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).to(Sink(subscriber)).run()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -160,7 +160,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).to(Sink(subscriber)).run()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -189,7 +189,7 @@ class FlowBufferSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(1, overflowStrategy = strategy).to(Sink(subscriber)).run()
Source(publisher).buffer(1, overflowStrategy = strategy).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"work together with SplitWhen" in {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run()
Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber))
val subscription = subscriber.expectSubscription()
subscription.request(10)
subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_)))

View file

@ -24,7 +24,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).to(Sink(subscriber)).run()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -42,7 +42,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).to(Sink(subscriber)).run()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -68,7 +68,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).to(Sink(subscriber)).run()
Source(publisher).conflate[Int](seed = i i, aggregate = (sum, i) sum + i).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -27,7 +27,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).to(Sink(subscriber)).run()
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -47,7 +47,7 @@ class FlowExpandSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
// Simply repeat the last element as an extrapolation step
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).to(Sink(subscriber)).run()
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()
@ -79,7 +79,7 @@ class FlowExpandSpec extends AkkaSpec {
val publisher = StreamTestKit.PublisherProbe[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).to(Sink(subscriber)).run()
Source(publisher).expand[Int, Int](seed = i i, extrapolate = i (i, i)).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -31,7 +31,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
to(Sink(probe)).run()
runWith(Sink(probe))
val subscription = probe.expectSubscription()
for (_ 1 to 10000) {

View file

@ -25,7 +25,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 3).mapAsync(n Future(n)).to(Sink(c)).run()
val p = Source(1 to 3).mapAsync(n Future(n)).runWith(Sink(c))
val sub = c.expectSubscription()
sub.request(2)
c.expectNext(1)

View file

@ -16,7 +16,6 @@ import akka.stream.impl.{ ActorBasedFlowMaterializer, ActorProcessorFactory, Fan
import java.util.concurrent.atomic.AtomicReference
sealed trait ActorFlowSink[-In] extends Sink[In] {
type MaterializedType
/**
* Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given
@ -64,7 +63,7 @@ trait SimpleActorFlowSink[-In] extends ActorFlowSink[In] {
*/
trait KeyedActorFlowSink[-In] extends ActorFlowSink[In] with KeyedSink[In]
private[scaladsl] object PublisherSink {
object PublisherSink {
def apply[T](): PublisherSink[T] = new PublisherSink[T]
def withFanout[T](initialBufferSize: Int, maximumBufferSize: Int): FanoutPublisherSink[T] =
new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize)
@ -76,7 +75,7 @@ private[scaladsl] object PublisherSink {
* elements to fill the internal buffers it will assert back-pressure until
* a subscriber connects and creates demand for elements to be emitted.
*/
private[scaladsl] class PublisherSink[In] extends KeyedActorFlowSink[In] {
class PublisherSink[In] extends KeyedActorFlowSink[In] {
type MaterializedType = Publisher[In]
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = flowPublisher
@ -84,7 +83,7 @@ private[scaladsl] class PublisherSink[In] extends KeyedActorFlowSink[In] {
override def toString: String = "PublisherSink"
}
private[scaladsl] final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In] {
final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSize: Int) extends KeyedActorFlowSink[In] {
type MaterializedType = Publisher[In]
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
@ -96,7 +95,7 @@ private[scaladsl] final case class FanoutPublisherSink[In](initialBufferSize: In
}
}
private[scaladsl] object FutureSink {
object FutureSink {
def apply[T](): FutureSink[T] = new FutureSink[T]
}
@ -107,7 +106,7 @@ private[scaladsl] object FutureSink {
* the Future into the corresponding failed state) or the end-of-stream
* (failing the Future with a NoSuchElementException).
*/
private[scaladsl] class FutureSink[In] extends KeyedActorFlowSink[In] {
class FutureSink[In] extends KeyedActorFlowSink[In] {
type MaterializedType = Future[In]
@ -138,7 +137,7 @@ private[scaladsl] class FutureSink[In] extends KeyedActorFlowSink[In] {
* Attaches a subscriber to this stream which will just discard all received
* elements.
*/
private[scaladsl] final case object BlackholeSink extends SimpleActorFlowSink[Any] {
final case object BlackholeSink extends SimpleActorFlowSink[Any] {
override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
flowPublisher.subscribe(create(materializer, flowName)._1)
override def isActive: Boolean = true
@ -149,14 +148,14 @@ private[scaladsl] final case object BlackholeSink extends SimpleActorFlowSink[An
/**
* Attaches a subscriber to this stream.
*/
private[scaladsl] final case class SubscriberSink[In](subscriber: Subscriber[In]) extends SimpleActorFlowSink[In] {
final case class SubscriberSink[In](subscriber: Subscriber[In]) extends SimpleActorFlowSink[In] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) =
flowPublisher.subscribe(subscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = (subscriber, ())
}
private[scaladsl] object OnCompleteSink {
object OnCompleteSink {
private val SuccessUnit = Success[Unit](())
}
@ -165,7 +164,7 @@ private[scaladsl] object OnCompleteSink {
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/
private[scaladsl] final case class OnCompleteSink[In](callback: Try[Unit] Unit) extends SimpleActorFlowSink[In] {
final case class OnCompleteSink[In](callback: Try[Unit] Unit) extends SimpleActorFlowSink[In] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) =
Source(flowPublisher).transform("onCompleteSink", () new Transformer[In, Unit] {
@ -186,7 +185,7 @@ private[scaladsl] final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Un
* that will be completed with `Success` when reaching the normal end of the stream, or completed
* with `Failure` if there is an error is signaled in the stream.
*/
private[scaladsl] final case class ForeachSink[In](f: In Unit) extends KeyedActorFlowSink[In] {
final case class ForeachSink[In](f: In Unit) extends KeyedActorFlowSink[In] {
override type MaterializedType = Future[Unit]
@ -214,7 +213,7 @@ private[scaladsl] final case class ForeachSink[In](f: In ⇒ Unit) extends Keyed
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
*/
private[scaladsl] final case class FoldSink[U, In](zero: U)(f: (U, In) U) extends KeyedActorFlowSink[In] {
final case class FoldSink[U, In](zero: U)(f: (U, In) U) extends KeyedActorFlowSink[In] {
type MaterializedType = Future[U]
@ -241,7 +240,7 @@ private[scaladsl] final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) ex
/**
* A sink that immediately cancels its upstream upon materialization.
*/
private[scaladsl] final case object CancelSink extends SimpleActorFlowSink[Any] {
final case object CancelSink extends SimpleActorFlowSink[Any] {
override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = {
flowPublisher.subscribe(new Subscriber[Any] {
@ -257,7 +256,7 @@ private[scaladsl] final case object CancelSink extends SimpleActorFlowSink[Any]
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
private[scaladsl] final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] {
final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] {
type MaterializedType = ActorRef

View file

@ -19,7 +19,6 @@ import scala.util.Failure
import scala.util.Success
sealed trait ActorFlowSource[+Out] extends Source[Out] {
type MaterializedType
/**
* Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given
@ -82,7 +81,7 @@ trait KeyedActorFlowSource[+Out] extends ActorFlowSource[Out] with KeyedSource[O
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
private[scaladsl] final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out] {
final case class SubscriberSource[Out]() extends KeyedActorFlowSource[Out] {
override type MaterializedType = Subscriber[Out]
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] =
@ -96,7 +95,7 @@ private[scaladsl] final case class SubscriberSource[Out]() extends KeyedActorFlo
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
private[scaladsl] final case class PublisherSource[Out](p: Publisher[Out]) extends SimpleActorFlowSource[Out] {
final case class PublisherSource[Out](p: Publisher[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
p.subscribe(flowSubscriber)
override def isActive: Boolean = true
@ -110,7 +109,7 @@ private[scaladsl] final case class PublisherSource[Out](p: Publisher[Out]) exten
* in accordance with the demand coming from the downstream transformation
* steps.
*/
private[scaladsl] final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActorFlowSource[Out] {
final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
@ -126,7 +125,7 @@ private[scaladsl] final case class IteratorSource[Out](iterator: Iterator[Out])
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
private[scaladsl] final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends SimpleActorFlowSource[Out] {
final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
@ -141,7 +140,7 @@ private[scaladsl] final case class IterableSource[Out](iterable: immutable.Itera
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
private[scaladsl] final case class ThunkSource[Out](f: () Option[Out]) extends SimpleActorFlowSource[Out] {
final case class ThunkSource[Out](f: () Option[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
@ -159,7 +158,7 @@ private[scaladsl] final case class ThunkSource[Out](f: () ⇒ Option[Out]) exten
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
private[scaladsl] final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowSource[Out] {
final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
@ -183,7 +182,7 @@ private[scaladsl] final case class FutureSource[Out](future: Future[Out]) extend
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
private[scaladsl] final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends SimpleActorFlowSource[Out] {
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
@ -197,7 +196,7 @@ private[scaladsl] final case class TickSource[Out](initialDelay: FiniteDuration,
* completely, then draining the elements arriving from the second Source. If the first Source is infinite then the
* second Source will be never drained.
*/
private[scaladsl] final case class ConcatSource[Out](source1: Source[Out], source2: Source[Out]) extends SimpleActorFlowSource[Out] {
final case class ConcatSource[Out](source1: Source[Out], source2: Source[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val concatter = Concat[Out]
@ -216,7 +215,7 @@ private[scaladsl] final case class ConcatSource[Out](source1: Source[Out], sourc
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
private[scaladsl] final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] {
final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] {
override type MaterializedType = ActorRef
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {

View file

@ -35,34 +35,11 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a [[SubscriberSource]] and
* and `Publisher` of a [[PublisherSink]].
*/
def runWith(source: KeyedSource[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = {
def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): (source.MaterializedType, sink.MaterializedType) = {
val m = source.via(this).to(sink).run()
(m.get(source), m.get(sink))
}
/**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
*
* The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a [[PublisherSink]].
*/
def runWith(source: Source[In], sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
source.via(this).runWith(sink)
/**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
*
* The returned value will contain the materialized value of the `SourceWithKey`, e.g. `Subscriber` of a [[SubscriberSource]].
*/
def runWith(source: KeyedSource[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): source.MaterializedType =
source.via(this).to(sink).run().get(source)
/**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
*
* As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload.
*/
def runWith(source: Source[In], sink: Sink[Out])(implicit materializer: FlowMaterializer): Unit =
source.via(this).to(sink).run()
}
object Flow {

View file

@ -1320,18 +1320,24 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl
private[scaladsl] class MaterializedFlowGraph(materializedSources: Map[KeyedSource[_], Any], materializedSinks: Map[KeyedSink[_], Any])
extends MaterializedMap {
override def get(key: KeyedSource[_]): key.MaterializedType =
materializedSources.get(key) match {
case Some(matSource) matSource.asInstanceOf[key.MaterializedType]
case None
throw new IllegalArgumentException(s"Source key [$key] doesn't exist in this flow graph")
override def get(key: Source[_]): key.MaterializedType =
key match {
case k: KeyedSource[_] materializedSources.get(k) match {
case Some(matSource) matSource.asInstanceOf[key.MaterializedType]
case None
throw new IllegalArgumentException(s"Source key [$key] doesn't exist in this flow graph")
}
case _ ().asInstanceOf[key.MaterializedType]
}
def get(key: KeyedSink[_]): key.MaterializedType =
materializedSinks.get(key) match {
case Some(matSink) matSink.asInstanceOf[key.MaterializedType]
case None
throw new IllegalArgumentException(s"Sink key [$key] doesn't exist in this flow graph")
def get(key: Sink[_]): key.MaterializedType =
key match {
case k: KeyedSink[_] materializedSinks.get(k) match {
case Some(matSink) matSink.asInstanceOf[key.MaterializedType]
case None
throw new IllegalArgumentException(s"Sink key [$key] doesn't exist in this flow graph")
}
case _ ().asInstanceOf[key.MaterializedType]
}
}

View file

@ -12,10 +12,10 @@ trait MaterializedMap {
/**
* Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[SubscriberSource]].
*/
def get(key: KeyedSource[_]): key.MaterializedType
def get(key: Source[_]): key.MaterializedType
/**
* Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[PublisherSink]].
*/
def get(key: KeyedSink[_]): key.MaterializedType
def get(key: Sink[_]): key.MaterializedType
}

View file

@ -48,8 +48,6 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod
private[stream] def withSource(in: Source[In]): RunnablePipe = RunnablePipe(in, output, ops)
private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
override def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit =
source.to(this).run()
}
@ -91,11 +89,19 @@ private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_],
* `Source` input or `Sink` output.
*/
private[stream] class MaterializedPipe(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedMap {
override def get(key: KeyedSource[_]): key.MaterializedType =
if (key == sourceKey) matSource.asInstanceOf[key.MaterializedType]
else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow")
override def get(key: Source[_]): key.MaterializedType =
key match {
case _: KeyedSource[_]
if (key == sourceKey) matSource.asInstanceOf[key.MaterializedType]
else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow")
case _ ().asInstanceOf[key.MaterializedType]
}
override def get(key: KeyedSink[_]): key.MaterializedType =
if (key == sinkKey) matSink.asInstanceOf[key.MaterializedType]
else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow")
override def get(key: Sink[_]): key.MaterializedType =
key match {
case _: KeyedSink[_]
if (key == sinkKey) matSink.asInstanceOf[key.MaterializedType]
else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow")
case _ ().asInstanceOf[key.MaterializedType]
}
}

View file

@ -13,19 +13,15 @@ import akka.stream.FlowMaterializer
* Can be used as a `Subscriber`
*/
trait Sink[-In] {
/**
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
* of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]].
*/
def runWith(source: KeyedSource[In])(implicit materializer: FlowMaterializer): source.MaterializedType =
source.to(this).run().get(source)
type MaterializedType
/**
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
* of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]].
*/
def runWith(source: Source[In])(implicit materializer: FlowMaterializer): Unit =
source.to(this).run()
def runWith(source: Source[In])(implicit materializer: FlowMaterializer): source.MaterializedType =
source.to(this).run().get(source)
}
object Sink {
@ -118,6 +114,4 @@ object Sink {
* to retrieve in order to access aspects of this sink (could be a completion Future
* or a cancellation handle, etc.)
*/
trait KeyedSink[-In] extends Sink[In] {
type MaterializedType
}
trait KeyedSink[-In] extends Sink[In]

View file

@ -17,6 +17,7 @@ import akka.stream.FlowMaterializer
* Can be used as a `Publisher`
*/
trait Source[+Out] extends FlowOps[Out] {
type MaterializedType
override type Repr[+O] <: Source[O]
/**
@ -33,7 +34,7 @@ trait Source[+Out] extends FlowOps[Out] {
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[Sink.fanoutPublisher]].
*/
def runWith(sink: KeyedSink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
def runWith(sink: Sink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
to(sink).run().get(sink)
/**
@ -195,6 +196,4 @@ object Source {
* to retrieve in order to access aspects of this source (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait KeyedSource[+Out] extends Source[Out] {
type MaterializedType
}
trait KeyedSource[+Out] extends Source[Out]