diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 63ce043c33..310ead43c3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -6,8 +6,14 @@ package akka.stream.actor import akka.actor.ActorRef import akka.actor.PoisonPill import akka.actor.Props -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.Broadcast +import akka.stream.scaladsl2.Flow +import akka.stream.scaladsl2.FlowGraph +import akka.stream.scaladsl2.FlowGraphImplicits +import akka.stream.scaladsl2.Merge +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.scaladsl2.Sink +import akka.stream.scaladsl2.Source import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.testkit.EventFilter @@ -107,9 +113,9 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) val p = ActorPublisher[String](ref) - val c = StreamTestKit.SubscriberProbe[String]() - p.subscribe(c) - val sub = c.expectSubscription + val s = StreamTestKit.SubscriberProbe[String]() + p.subscribe(s) + val sub = s.expectSubscription sub.request(2) probe.expectMsg(TotalDemand(2)) sub.request(3) @@ -121,68 +127,67 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) val p = ActorPublisher[String](ref) - val c = StreamTestKit.SubscriberProbe[String]() - p.subscribe(c) - val sub = c.expectSubscription + val s = StreamTestKit.SubscriberProbe[String]() + p.subscribe(s) + val sub = s.expectSubscription sub.request(2) ref ! Produce("elem-1") ref ! Produce("elem-2") ref ! Produce("elem-3") - c.expectNext("elem-1") - c.expectNext("elem-2") - c.expectNoMsg(300.millis) + s.expectNext("elem-1") + s.expectNext("elem-2") + s.expectNoMsg(300.millis) sub.cancel() } "signal error" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) - val c = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c) + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) ref ! Err("wrong") - c.expectSubscription - c.expectError.getMessage should be("wrong") + s.expectSubscription + s.expectError.getMessage should be("wrong") } "signal error before subscribe" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) ref ! Err("early err") - val c = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c) - c.expectError.getMessage should be("early err") + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + s.expectError.getMessage should be("early err") } "drop onNext elements after cancel" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) val p = ActorPublisher[String](ref) - val c = StreamTestKit.SubscriberProbe[String]() - p.subscribe(c) - val sub = c.expectSubscription + val s = StreamTestKit.SubscriberProbe[String]() + p.subscribe(s) + val sub = s.expectSubscription sub.request(2) ref ! Produce("elem-1") sub.cancel() ref ! Produce("elem-2") - c.expectNext("elem-1") - c.expectNoMsg(300.millis) - sub.cancel() + s.expectNext("elem-1") + s.expectNoMsg(300.millis) } "remember requested after restart" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) val p = ActorPublisher[String](ref) - val c = StreamTestKit.SubscriberProbe[String]() - p.subscribe(c) - val sub = c.expectSubscription + val s = StreamTestKit.SubscriberProbe[String]() + p.subscribe(s) + val sub = s.expectSubscription sub.request(3) probe.expectMsg(TotalDemand(3)) ref ! Produce("elem-1") ref ! Boom ref ! Produce("elem-2") - c.expectNext("elem-1") - c.expectNext("elem-2") + s.expectNext("elem-1") + s.expectNext("elem-2") sub.request(5) probe.expectMsg(TotalDemand(6)) sub.cancel() @@ -191,54 +196,59 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { "signal onComplete" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) - val c = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c) - val sub = c.expectSubscription + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + val sub = s.expectSubscription sub.request(3) ref ! Produce("elem-1") ref ! Complete - c.expectNext("elem-1") - c.expectComplete + s.expectNext("elem-1") + s.expectComplete } "signal immediate onComplete" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) ref ! Complete - val c = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c) - c.expectComplete + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + s.expectComplete } "only allow one subscriber" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) - val c = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c) - c.expectSubscription - val c2 = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c2) - c2.expectError.getClass should be(classOf[IllegalStateException]) + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + s.expectSubscription + val s2 = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s2) + s2.expectError.getClass should be(classOf[IllegalStateException]) } "signal onCompete when actor is stopped" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) - val c = StreamTestKit.SubscriberProbe[String]() - ActorPublisher[String](ref).subscribe(c) - c.expectSubscription + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + s.expectSubscription ref ! PoisonPill - c.expectComplete + s.expectComplete } "work together with Flow and ActorSubscriber" in { implicit val materializer = FlowMaterializer() val probe = TestProbe() - val snd = system.actorOf(senderProps) - val rcv = system.actorOf(receiverProps(probe.ref)) - Flow(ActorPublisher[Int](snd)).collect { + + val source = Source[Int](senderProps) + val sink = Sink[String](receiverProps(probe.ref)) + + val mat = source.collect { case n if n % 2 == 0 ⇒ "elem-" + n - }.produceTo(ActorSubscriber(rcv)) + }.connect(sink).run() + + val snd = mat.get(source) + val rcv = mat.get(sink) (1 to 3) foreach { snd ! _ } probe.expectMsg("elem-2") @@ -255,6 +265,46 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { expectTerminated(snd) } + "work in a FlowGraph" in { + implicit val materializer = FlowMaterializer() + val probe1 = TestProbe() + val probe2 = TestProbe() + + val senderRef1 = system.actorOf(senderProps) + val source1 = Source(ActorPublisher[Int](senderRef1)) + val source2 = Source[Int](senderProps) + + val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref)))) + val sink2 = Sink[String](receiverProps(probe2.ref)) + + val mat = FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + + val merge = Merge[Int] + val bcast = Broadcast[String] + + source1 ~> merge + source2 ~> merge + + merge ~> Flow[Int].map(_.toString) ~> bcast + + bcast ~> Flow[String].map(_ + "mark") ~> sink1 + bcast ~> sink2 + }.run() + + val senderRef2 = mat.get(source2) + + (0 to 10).foreach { + senderRef1 ! _ + senderRef2 ! _ + } + + (0 to 10).foreach { msg ⇒ + probe1.expectMsg(msg.toString + "mark") + probe2.expectMsg(msg.toString) + } + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 83982c2d3f..2eb1ac4eb0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -5,9 +5,10 @@ package akka.stream.actor import akka.actor.{ Actor, ActorRef, Props } import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router } -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.scaladsl2.Sink +import akka.stream.scaladsl2.Source import akka.stream.testkit.AkkaSpec -import akka.stream.{ FlowMaterializer, MaterializerSettings } import akka.testkit.ImplicitSender import scala.concurrent.duration._ @@ -100,8 +101,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "An ActorSubscriber" must { "receive requested elements" in { - val ref = system.actorOf(manualSubscriberProps(testActor)) - Flow(List(1, 2, 3)).produceTo(ActorSubscriber(ref)) + val ref = Source(List(1, 2, 3)).runWith(Sink(manualSubscriberProps(testActor))) expectNoMsg(200.millis) ref ! "ready" // requesting 2 expectMsg(OnNext(1)) @@ -113,16 +113,16 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { } "signal error" in { - val ref = system.actorOf(manualSubscriberProps(testActor)) val e = new RuntimeException("simulated") with NoStackTrace - Flow(() ⇒ throw e).produceTo(ActorSubscriber(ref)) + val ref = Source(() ⇒ throw e).runWith(Sink(manualSubscriberProps(testActor))) ref ! "ready" expectMsg(OnError(e)) } "remember requested after restart" in { + // creating actor with default supervision, because stream supervisor default strategy is to stop val ref = system.actorOf(manualSubscriberProps(testActor)) - Flow(1 to 7).produceTo(ActorSubscriber(ref)) + Source(1 to 7).connect(Sink(ActorSubscriber[Int](ref))).run() ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) @@ -139,8 +139,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { } "not deliver more after cancel" in { - val ref = system.actorOf(manualSubscriberProps(testActor)) - Flow(1 to 5).produceTo(ActorSubscriber(ref)) + val ref = Source(1 to 5).runWith(Sink(manualSubscriberProps(testActor))) ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) @@ -149,24 +148,21 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { } "work with OneByOneRequestStrategy" in { - val ref = system.actorOf(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy)) - Flow(1 to 17).produceTo(ActorSubscriber(ref)) + Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))) for (n ← 1 to 17) expectMsg(OnNext(n)) expectMsg(OnComplete) } "work with WatermarkRequestStrategy" in { - val ref = system.actorOf(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10))) - Flow(1 to 17).produceTo(ActorSubscriber(ref)) + Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10)))) for (n ← 1 to 17) expectMsg(OnNext(n)) expectMsg(OnComplete) } "suport custom max in flight request strategy with child workers" in { - val ref = system.actorOf(streamerProps) val N = 117 - Flow(1 to N).map(Msg(_, testActor)).produceTo(ActorSubscriber(ref)) - receiveN(N).toSet should be((1 to N).map(Done(_)).toSet) + Source(1 to N).map(Msg(_, testActor)).runWith(Sink(streamerProps)) + receiveN(N).toSet should be((1 to N).map(Done).toSet) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSink.scala index 7838bd2d1e..f0f79f4f7b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSink.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl2 +import akka.actor.ActorRef import akka.actor.Props import scala.collection.immutable @@ -252,3 +253,25 @@ private[scaladsl2] final case object CancelSink extends SimpleActorFlowSink[Any] }) } } + +/** + * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, + * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. + */ +private[scaladsl2] final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] { + + type MaterializedType = ActorRef + + override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): ActorRef = { + val (subscriber, subscriberRef) = create(materializer, flowName) + flowPublisher.subscribe(subscriber) + subscriberRef + } + + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { + val subscriberRef = materializer.actorOf(props, name = s"$flowName-props") + (akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSource.scala index 5b54a9416e..e4422f56fc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/ActorFlowSource.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl2 +import akka.actor.ActorRef +import akka.actor.Props import akka.stream.impl._ import akka.stream.impl2.ActorBasedFlowMaterializer import akka.stream.impl2.Ast.AstNode @@ -208,3 +210,22 @@ private[scaladsl2] final case class ConcatSource[Out](source1: Source[Out], sour override def isActive: Boolean = false } + +/** + * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, + * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. + */ +private[scaladsl2] final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] { + override type MaterializedType = ActorRef + + override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { + val (publisher, publisherRef) = create(materializer, flowName) + publisher.subscribe(flowSubscriber) + publisherRef + } + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { + val publisherRef = materializer.actorOf(props, name = s"$flowName-0-props") + (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 492645e381..b30ab41653 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl2 +import akka.actor.Props import org.reactivestreams.Subscriber import scala.util.Try @@ -51,6 +52,13 @@ object Sink { builder.partialBuild().toSink(in) } + /** + * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorSubscriber]]. + */ + def apply[T](props: Props): PropsSink[T] = PropsSink[T](props) + /** * A `Sink` that immediately cancels its upstream after materialization. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index 6cf39e623f..cdfef4b65f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl2 +import akka.actor.Props import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousPublisherFromIterable } import org.reactivestreams.Publisher import scala.collection.immutable @@ -133,23 +134,6 @@ object Source { def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = TickSource(initialDelay, interval, tick) - /** - * Create a `Source` with one element. - * Every connected `Sink` of this stream will see an individual stream consisting of one element. - */ - def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) - - /** - * Create a `Source` with no elements, i.e. an empty stream that is completed immediately - * for every connected `Sink`. - */ - def empty[T](): Source[T] = apply(EmptyPublisher[T]) - - /** - * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. - */ - def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause)) - /** * Creates a `Source` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and * returns the `UndefinedSink`. @@ -169,6 +153,30 @@ object Source { builder.partialBuild().toSource(out) } + /** + * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorPublisher]]. + */ + def apply[T](props: Props): PropsSource[T] = PropsSource(props) + + /** + * Create a `Source` with one element. + * Every connected `Sink` of this stream will see an individual stream consisting of one element. + */ + def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) + + /** + * Create a `Source` with no elements, i.e. an empty stream that is completed immediately + * for every connected `Sink`. + */ + def empty[T](): Source[T] = apply(EmptyPublisher[T]) + + /** + * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. + */ + def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause)) + /** * Concatenates two sources so that the first element * emitted by the second source is emitted after the last element of the first