From 4dcb0d9834954b7f8e67726e9866c814da1a8e4e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 19 Dec 2014 15:22:45 +0100 Subject: [PATCH] =str #16549 doc: ActorPublisher --- .../docs/stream/ActorPublisherDocSpec.scala | 94 +++++++++++++++++++ .../docs/stream/ActorSubscriberDocSpec.scala | 89 ++++++++++++++++++ .../rst/scala/stream-integration-actor.rst | 91 ++++++++++++++++++ akka-docs-dev/rst/scala/stream.rst | 11 +-- 4 files changed, 275 insertions(+), 10 deletions(-) create mode 100644 akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala create mode 100644 akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala create mode 100644 akka-docs-dev/rst/scala/stream-integration-actor.rst diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala new file mode 100644 index 0000000000..ac7b73e5d6 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import scala.annotation.tailrec +import akka.actor.Props +import akka.stream.FlowMaterializer +import akka.stream.actor.ActorPublisher +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.testkit.AkkaSpec + +object ActorPublisherDocSpec { + + //#job-manager + object JobManager { + def props: Props = Props[JobManager] + + final case class Job(payload: String) + case object JobAccepted + case object JobDenied + } + + class JobManager extends ActorPublisher[JobManager.Job] { + import akka.stream.actor.ActorPublisherMessage._ + import JobManager._ + + val MaxBufferSize = 100 + var buf = Vector.empty[Job] + + def receive = { + case job: Job if buf.size == MaxBufferSize => + sender() ! JobDenied + case job: Job => + sender() ! JobAccepted + if (buf.isEmpty && totalDemand > 0) + onNext(job) + else { + buf :+= job + deliverBuf() + } + case Request(_) => + deliverBuf() + case Cancel => + context.stop(self) + } + + @tailrec final def deliverBuf(): Unit = + if (totalDemand > 0) { + if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach onNext + } else { + val (use, keep) = buf.splitAt(Int.MaxValue) + buf = keep + use foreach onNext + deliverBuf() + } + } + } + //#job-manager +} + +class ActorPublisherDocSpec extends AkkaSpec { + import ActorPublisherDocSpec._ + + implicit val mat = FlowMaterializer() + + "illustrate usage of ActorPublisher" in { + def println(s: String): Unit = + testActor ! s + + //#actor-publisher-usage + val jobManagerSource = Source[JobManager.Job](JobManager.props) + val materializedMap = jobManagerSource + .map(_.payload.toUpperCase) + .map { elem => println(elem); elem } + .to(Sink.ignore) + .run() + + val ref = materializedMap.get(jobManagerSource) + ref ! JobManager.Job("a") + ref ! JobManager.Job("b") + ref ! JobManager.Job("c") + //#actor-publisher-usage + + expectMsg("A") + expectMsg("B") + expectMsg("C") + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala new file mode 100644 index 0000000000..7bc1dc656b --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.routing.ActorRefRoutee +import akka.routing.RoundRobinRoutingLogic +import akka.routing.Router +import akka.stream.FlowMaterializer +import akka.stream.actor.ActorSubscriber +import akka.stream.actor.ActorSubscriberMessage +import akka.stream.actor.MaxInFlightRequestStrategy +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.testkit.AkkaSpec + +object ActorSubscriberDocSpec { + //#worker-pool + object WorkerPool { + case class Msg(id: Int, replyTo: ActorRef) + case class Work(id: Int) + case class Reply(id: Int) + case class Done(id: Int) + + def props: Props = Props(new WorkerPool) + } + + class WorkerPool extends ActorSubscriber { + import WorkerPool._ + import ActorSubscriberMessage._ + + val MaxQueueSize = 10 + var queue = Map.empty[Int, ActorRef] + + val router = { + val routees = Vector.fill(3) { + ActorRefRoutee(context.actorOf(Props[Worker])) + } + Router(RoundRobinRoutingLogic(), routees) + } + + override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) { + override def inFlightInternally: Int = queue.size + } + + def receive = { + case OnNext(Msg(id, replyTo)) ⇒ + queue += (id -> replyTo) + assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}") + router.route(Work(id), self) + case Reply(id) ⇒ + queue(id) ! Done(id) + queue -= id + } + } + + class Worker extends Actor { + import WorkerPool._ + def receive = { + case Work(id) ⇒ + // ... + sender() ! Reply(id) + } + } + //#worker-pool + +} + +class ActorSubscriberDocSpec extends AkkaSpec { + import ActorSubscriberDocSpec._ + + implicit val mat = FlowMaterializer() + + "illustrate usage of ActorSubscriber" in { + val replyTo = testActor + + //#actor-subscriber-usage + val N = 117 + Source(1 to N).map(WorkerPool.Msg(_, replyTo)) + .runWith(Sink(WorkerPool.props)) + //#actor-subscriber-usage + + receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet) + } + +} \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-integration-actor.rst b/akka-docs-dev/rst/scala/stream-integration-actor.rst new file mode 100644 index 0000000000..203004f985 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-integration-actor.rst @@ -0,0 +1,91 @@ +.. _stream-integration-actor-scala: + +Integrating with Actors +======================= + +:class:`ActorPublisher` and :class:`ActorSubscriber` are two traits that provides support for +implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`. + +These can be consumed by other Reactive Stream libraries or used as a +Akka Streams :class:`Source` or :class:`Sink`. + +.. warning:: + + :class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors, + because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the + the stream may deadlock. + +ActorPublisher +^^^^^^^^^^^^^^ + +Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a +stream publisher that keeps track of the subscription life cycle and requested elements. + +Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: + +.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#job-manager + +You send elements to the stream by calling ``onNext``. You are allowed to send as many +elements as have been requested by the stream subscriber. This amount can be inquired with +``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``, +otherwise ``onNext`` will throw ``IllegalStateException``. + +When the stream subscriber requests more elements the ``ActorPublisher.Request`` message +is delivered to this actor, and you can act on that event. The ``totalDemand`` +is updated automatically. + +When the stream subscriber cancels the subscription the ``ActorPublisher.Cancel`` message +is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored. + +You can complete the stream by calling ``onComplete``. After that you are not allowed to +call ``onNext``, ``onError`` and ``onComplete``. + +You can terminate the stream with failure by calling ``onError``. After that you are not allowed to +call ``onNext``, ``onError`` and ``onComplete``. + +If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout`` +method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when +the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform +cleanup and stop itself. + +If the actor is stopped the stream will be completed, unless it was not already terminated with +failure, completed or canceled. + +More detailed information can be found in the API documentation. + +This is how it can be used as input :class:`Source` to a :class:`Flow`: + +.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage + +You can only attach one subscriber to this publisher. Use ``Sink.fanoutPublisher`` to enable +multiple subscribers. + +ActorSubscriber +^^^^^^^^^^^^^^^ + +Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a +stream subscriber with full control of stream back pressure. It will receive +``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` +messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. + +Here is an example of such an actor. It dispatches incoming jobs to child worker actors: + +.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool + +Subclass must define the ``RequestStrategy`` to control stream back pressure. +After each incoming message the ``ActorSubscriber`` will automatically invoke +the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream. + +* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself. +* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or + delegated to other actors. +* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with + ``ZeroRequestStrategy`` or some other strategy. In that case + you must also call ``request`` when the actor is started or when it is ready, otherwise + it will not receive any elements. + +More detailed information can be found in the API documentation. + +This is how it can be used as output :class:`Sink` to a :class:`Flow`: + +.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index 4ec77c3a25..c052b9a85c 100644 --- a/akka-docs-dev/rst/scala/stream.rst +++ b/akka-docs-dev/rst/scala/stream.rst @@ -26,6 +26,7 @@ and for best results we recommend the following approach: stream-integration-external stream-integration-reactive-streams + stream-integration-actor Motivation ========== @@ -448,18 +449,8 @@ Flexi Route Integrating with Actors ======================= -// TODO: Source.subscriber - -// TODO: Sink.publisher - // TODO: Use the ImplicitFlowMaterializer if you have streams starting from inside actors. // TODO: how do I create my own sources / sinks? -ActorPublisher -^^^^^^^^^^^^^^ -ActorSubscriber -^^^^^^^^^^^^^^^ - -// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActoSubscriber???