From 946faedd9578e04603766a076b019568b2b6e04d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 30 Mar 2015 14:42:30 +0200 Subject: [PATCH] !str #16521 Add ActorRefSink * also rename the factory for ActorSubscriber props Sink, from apply to actorSubscriber --- .../rst/java/stream-integrations.rst | 22 +++++++- .../docs/stream/ActorSubscriberDocSpec.scala | 4 +- .../rst/scala/stream-integrations.rst | 23 ++++++++- .../java/akka/stream/javadsl/SinkTest.java | 31 ++++++++---- .../stream/actor/ActorPublisherSpec.scala | 4 +- .../stream/actor/ActorSubscriberSpec.scala | 12 ++--- .../stream/scaladsl/ActorRefSinkSpec.scala | 50 +++++++++++++++++++ .../akka/stream/impl/ActorRefSinkActor.scala | 45 +++++++++++++++++ .../main/scala/akka/stream/impl/Sinks.scala | 29 +++++++++-- .../main/scala/akka/stream/javadsl/Sink.scala | 35 ++++++++++--- .../scala/akka/stream/scaladsl/Sink.scala | 35 ++++++++++--- 11 files changed, 247 insertions(+), 43 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala diff --git a/akka-docs-dev/rst/java/stream-integrations.rst b/akka-docs-dev/rst/java/stream-integrations.rst index 65d31c87af..5b93cf8abf 100644 --- a/akka-docs-dev/rst/java/stream-integrations.rst +++ b/akka-docs-dev/rst/java/stream-integrations.rst @@ -7,8 +7,12 @@ Integration Integrating with Actors ======================= -:class:`AbstractActorPublisher` and :class:`AbstractActorSubscriber` are two traits that provides support for -implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`. +For piping the elements of a stream as messages to an ordinary actor you can use the +``Sink.actorRef``. + +For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are +provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with +an :class:`Actor`. These can be consumed by other Reactive Stream libraries or used as a Akka Streams :class:`Source` or :class:`Sink`. @@ -24,6 +28,20 @@ Akka Streams :class:`Source` or :class:`Sink`. prior to 8, Akka provides :class:`UntypedActorPublisher` and :class:`UntypedActorSubscriber` which can be used easily from any language level. +Sink.actorRef +^^^^^^^^^^^^^ + +The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates +the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage`` +will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure`` +message will be sent to the destination actor. + +.. warning:: + + There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming + the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors + it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + limiting stage in front of this stage. ActorPublisher ^^^^^^^^^^^^^^ diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala index bb5dccde98..d75e728c91 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala @@ -80,10 +80,10 @@ class ActorSubscriberDocSpec extends AkkaSpec { //#actor-subscriber-usage val N = 117 Source(1 to N).map(WorkerPool.Msg(_, replyTo)) - .runWith(Sink(WorkerPool.props)) + .runWith(Sink.actorSubscriber(WorkerPool.props)) //#actor-subscriber-usage receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet) } -} \ No newline at end of file +} diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst index eecfe8d29a..7016c3070f 100644 --- a/akka-docs-dev/rst/scala/stream-integrations.rst +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -7,8 +7,12 @@ Integration Integrating with Actors ======================= -:class:`ActorPublisher` and :class:`ActorSubscriber` are two traits that provides support for -implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`. +For piping the elements of a stream as messages to an ordinary actor you can use the +``Sink.actorRef``. + +For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are +provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with +an :class:`Actor`. These can be consumed by other Reactive Stream libraries or used as a Akka Streams :class:`Source` or :class:`Sink`. @@ -19,6 +23,21 @@ Akka Streams :class:`Source` or :class:`Sink`. because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the the stream may deadlock. +Sink.actorRef +^^^^^^^^^^^^^ + +The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates +the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage`` +will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure`` +message will be sent to the destination actor. + +.. warning:: + + There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming + the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors + it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + limiting stage in front of this stage. + ActorPublisher ^^^^^^^^^^^^^^ diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 10233f4e9a..081bd43bdf 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -3,12 +3,11 @@ */ package akka.stream.javadsl; -import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; -import akka.stream.StreamTest; -import akka.stream.javadsl.japi.Function2; -import akka.stream.testkit.AkkaSpec; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -16,8 +15,10 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import java.util.ArrayList; -import java.util.List; +import akka.stream.StreamTest; +import akka.stream.javadsl.japi.Function2; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; public class SinkTest extends StreamTest { public SinkTest() { @@ -31,14 +32,14 @@ public class SinkTest extends StreamTest { @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { final Sink> pubSink = Sink.fanoutPublisher(2, 2); + @SuppressWarnings("unused") final Publisher publisher = Source.from(new ArrayList()).runWith(pubSink, materializer); } @Test public void mustBeAbleToUseFuture() throws Exception { final Sink> futSink = Sink.head(); - final List list = new ArrayList(); - list.add(1); + final List list = Collections.singletonList(1); final Future future = Source.from(list).runWith(futSink, materializer); assert Await.result(future, Duration.create("1 second")).equals(1); } @@ -50,7 +51,19 @@ public class SinkTest extends StreamTest { return arg1 + arg2; } }); + @SuppressWarnings("unused") Future integerFuture = Source.from(new ArrayList()).runWith(foldSink, materializer); } + + @Test + public void mustBeAbleToUseActorRefSink() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Sink actorRefSink = Sink.actorRef(probe.getRef(), "done"); + Source.from(Arrays.asList(1, 2, 3)).runWith(actorRefSink, materializer); + probe.expectMsgEquals(1); + probe.expectMsgEquals(2); + probe.expectMsgEquals(3); + probe.expectMsgEquals("done"); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 7548d514ce..79a96679ba 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -254,7 +254,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val probe = TestProbe() val source = Source[Int](senderProps) - val sink = Sink[String](receiverProps(probe.ref)) + val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref)) val (snd, rcv) = source.collect { case n if n % 2 == 0 ⇒ "elem-" + n @@ -284,7 +284,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val source1 = Source(ActorPublisher[Int](senderRef1)) val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref)))) - val sink2 = Sink[String](receiverProps(probe2.ref)) + val sink2: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe2.ref)) val senderRef2 = FlowGraph.closed(Source[Int](senderProps)) { implicit b ⇒ source2 ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 3c8277fa66..faf5b3d498 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -100,7 +100,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "An ActorSubscriber" must { "receive requested elements" in { - val ref = Source(List(1, 2, 3)).runWith(Sink(manualSubscriberProps(testActor))) + val ref = Source(List(1, 2, 3)).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) expectNoMsg(200.millis) ref ! "ready" // requesting 2 expectMsg(OnNext(1)) @@ -113,7 +113,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { "signal error" in { val e = new RuntimeException("simulated") with NoStackTrace - val ref = Source(() ⇒ throw e).runWith(Sink(manualSubscriberProps(testActor))) + val ref = Source(() ⇒ throw e).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) ref ! "ready" expectMsg(OnError(e)) } @@ -138,7 +138,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { } "not deliver more after cancel" in { - val ref = Source(1 to 5).runWith(Sink(manualSubscriberProps(testActor))) + val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) ref ! "ready" expectMsg(OnNext(1)) expectMsg(OnNext(2)) @@ -147,20 +147,20 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { } "work with OneByOneRequestStrategy" in { - Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))) + Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))) for (n ← 1 to 17) expectMsg(OnNext(n)) expectMsg(OnComplete) } "work with WatermarkRequestStrategy" in { - Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10)))) + Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10)))) for (n ← 1 to 17) expectMsg(OnNext(n)) expectMsg(OnComplete) } "suport custom max in flight request strategy with child workers" in { val N = 117 - Source(1 to N).map(Msg(_, testActor)).runWith(Sink(streamerProps)) + Source(1 to N).map(Msg(_, testActor)).runWith(Sink.actorSubscriber(streamerProps)) receiveN(N).toSet should be((1 to N).map(Done).toSet) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala new file mode 100644 index 0000000000..6a54803b0a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorFlowMaterializer +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props + +object ActorRefSinkSpec { + case class Fw(ref: ActorRef) extends Actor { + def receive = { + case msg ⇒ ref forward msg + } + } +} + +class ActorRefSinkSpec extends AkkaSpec { + import ActorRefSinkSpec._ + implicit val mat = ActorFlowMaterializer() + + "A ActorRefSink" must { + + "send the elements to the ActorRef" in { + Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done")) + expectMsg(1) + expectMsg(2) + expectMsg(3) + expectMsg("done") + } + + "cancel stream when actor terminates" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val fw = system.actorOf(Props(classOf[Fw], testActor).withDispatcher("akka.test.stream-dispatcher")) + Source(publisher).runWith(Sink.actorRef(fw, onCompleteMessage = "done")) + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + autoPublisher.sendNext(1) + autoPublisher.sendNext(2) + expectMsg(1) + expectMsg(2) + system.stop(fw) + autoPublisher.subscription.expectCancellation() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala new file mode 100644 index 0000000000..faaec3722c --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.actor.ActorSubscriber +import akka.actor.ActorRef +import akka.stream.actor.ActorSubscriberMessage +import akka.actor.Status +import akka.stream.actor.WatermarkRequestStrategy +import akka.actor.Props +import akka.actor.Terminated + +/** + * INTERNAL API + */ +private[akka] object ActorRefSinkActor { + def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props = + Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage)) +} + +/** + * INTERNAL API + */ +private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber { + import ActorSubscriberMessage._ + + override val requestStrategy = WatermarkRequestStrategy(highWatermark) + + context.watch(ref) + + def receive = { + case OnNext(elem) ⇒ + ref ! elem + case OnError(cause) ⇒ + ref ! Status.Failure(cause) + context.stop(self) + case OnComplete ⇒ + ref ! onCompleteMessage + context.stop(self) + case Terminated(`ref`) ⇒ + context.stop(self) // will cancel upstream + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 63fc280cf2..161ccb7214 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -190,13 +190,34 @@ private[akka] final class CancelSink(val attributes: OperationAttributes, shape: * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. */ -private[akka] final class PropsSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { +private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { - val subscriberRef = materializer.actorOf(props, name = s"$flowName-props") + val subscriberRef = materializer.actorOf(props, name = s"$flowName-actorSubscriber") (akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef) } - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new PropsSink[In](props, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new PropsSink[In](props, attr, amendShape(attr)) + override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape) + override def withAttributes(attr: OperationAttributes): Module = new ActorSubscriberSink[In](props, attr, amendShape(attr)) } + +/** + * INTERNAL API + */ +private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, + val attributes: OperationAttributes, + shape: SinkShape[In]) extends SinkModule[In, Unit](shape) { + + override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { + val subscriberRef = materializer.actorOf( + ActorRefSinkActor.props(ref, materializer.settings.maxInputBufferSize, onCompleteMessage), + name = s"$flowName-actorRef") + (akka.stream.actor.ActorSubscriber[In](subscriberRef), ()) + } + + override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = + new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) + override def withAttributes(attr: OperationAttributes): Module = + new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) +} + diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 96dc483c39..86362cd658 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -39,14 +39,6 @@ object Sink { def create[In](subs: Subscriber[In]): Sink[In, Unit] = new Sink(scaladsl.Sink(subs)) - /** - * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorSubscriber]]. - */ - def create[T](props: Props): Sink[T, ActorRef] = - new Sink(scaladsl.Sink.apply(props)) - /** * A `Sink` that immediately cancels its upstream after materialization. */ @@ -96,6 +88,33 @@ object Sink { def head[In](): Sink[In, Future[In]] = new Sink(scaladsl.Sink.head[In]) + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be cancelled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure a [[akka.actor.Status.Failure]] + * message will be sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this stage. + * + */ + def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, Unit] = + new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage)) + + /** + * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorSubscriber]]. + */ + def actorSubscriber[T](props: Props): Sink[T, ActorRef] = + new Sink(scaladsl.Sink.actorSubscriber(props)) + /** * A graph with the shape of a sink logically is a sink, this method makes * it so also in type. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 00a573f2fd..e19dfc21cc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -65,14 +65,6 @@ object Sink extends SinkApply { def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] = new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink"))) - /** - * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorSubscriber]]. - */ - def apply[T](props: Props): Sink[T, ActorRef] = - new Sink(new PropsSink(props, none, shape("PropsSink"))) - /** * A `Sink` that immediately cancels its upstream after materialization. */ @@ -206,4 +198,31 @@ object Sink extends SinkApply { Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("OnCompleteSink") } + + /** + * Sends the elements of the stream to the given `ActorRef`. + * If the target actor terminates the stream will be cancelled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure a [[akka.actor.Status.Failure]] + * message will be sent to the destination actor. + * + * It will request at most `maxInputBufferSize` number of elements from + * upstream, but there is no back-pressure signal from the destination actor, + * i.e. if the actor is not consuming the messages fast enough the mailbox + * of the actor will grow. For potentially slow consumer actors it is recommended + * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate + * limiting stage in front of this stage. + */ + def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] = + new Sink(new ActorRefSink(ref, onCompleteMessage, none, shape("ActorRefSink"))) + + /** + * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorSubscriber]]. + */ + def actorSubscriber[T](props: Props): Sink[T, ActorRef] = + new Sink(new ActorSubscriberSink(props, none, shape("ActorSubscriberSink"))) + }