Merge pull request #16582 from akka/wip-16549-actor-integration-patriknw

=str #16549 doc: ActorPublisher and ActorSubscriber
This commit is contained in:
drewhk 2014-12-20 12:03:37 +01:00
commit 6445632a9d
4 changed files with 275 additions and 10 deletions

View file

@ -0,0 +1,94 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import scala.annotation.tailrec
import akka.actor.Props
import akka.stream.FlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.AkkaSpec
object ActorPublisherDocSpec {
//#job-manager
object JobManager {
def props: Props = Props[JobManager]
final case class Job(payload: String)
case object JobAccepted
case object JobDenied
}
class JobManager extends ActorPublisher[JobManager.Job] {
import akka.stream.actor.ActorPublisherMessage._
import JobManager._
val MaxBufferSize = 100
var buf = Vector.empty[Job]
def receive = {
case job: Job if buf.size == MaxBufferSize =>
sender() ! JobDenied
case job: Job =>
sender() ! JobAccepted
if (buf.isEmpty && totalDemand > 0)
onNext(job)
else {
buf :+= job
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
//#job-manager
}
class ActorPublisherDocSpec extends AkkaSpec {
import ActorPublisherDocSpec._
implicit val mat = FlowMaterializer()
"illustrate usage of ActorPublisher" in {
def println(s: String): Unit =
testActor ! s
//#actor-publisher-usage
val jobManagerSource = Source[JobManager.Job](JobManager.props)
val materializedMap = jobManagerSource
.map(_.payload.toUpperCase)
.map { elem => println(elem); elem }
.to(Sink.ignore)
.run()
val ref = materializedMap.get(jobManagerSource)
ref ! JobManager.Job("a")
ref ! JobManager.Job("b")
ref ! JobManager.Job("c")
//#actor-publisher-usage
expectMsg("A")
expectMsg("B")
expectMsg("C")
}
}

View file

@ -0,0 +1,89 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.ActorRefRoutee
import akka.routing.RoundRobinRoutingLogic
import akka.routing.Router
import akka.stream.FlowMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.MaxInFlightRequestStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.AkkaSpec
object ActorSubscriberDocSpec {
//#worker-pool
object WorkerPool {
case class Msg(id: Int, replyTo: ActorRef)
case class Work(id: Int)
case class Reply(id: Int)
case class Done(id: Int)
def props: Props = Props(new WorkerPool)
}
class WorkerPool extends ActorSubscriber {
import WorkerPool._
import ActorSubscriberMessage._
val MaxQueueSize = 10
var queue = Map.empty[Int, ActorRef]
val router = {
val routees = Vector.fill(3) {
ActorRefRoutee(context.actorOf(Props[Worker]))
}
Router(RoundRobinRoutingLogic(), routees)
}
override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) {
override def inFlightInternally: Int = queue.size
}
def receive = {
case OnNext(Msg(id, replyTo))
queue += (id -> replyTo)
assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
router.route(Work(id), self)
case Reply(id)
queue(id) ! Done(id)
queue -= id
}
}
class Worker extends Actor {
import WorkerPool._
def receive = {
case Work(id)
// ...
sender() ! Reply(id)
}
}
//#worker-pool
}
class ActorSubscriberDocSpec extends AkkaSpec {
import ActorSubscriberDocSpec._
implicit val mat = FlowMaterializer()
"illustrate usage of ActorSubscriber" in {
val replyTo = testActor
//#actor-subscriber-usage
val N = 117
Source(1 to N).map(WorkerPool.Msg(_, replyTo))
.runWith(Sink(WorkerPool.props))
//#actor-subscriber-usage
receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet)
}
}

View file

@ -0,0 +1,91 @@
.. _stream-integration-actor-scala:
Integrating with Actors
=======================
:class:`ActorPublisher` and :class:`ActorSubscriber` are two traits that provides support for
implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`.
These can be consumed by other Reactive Stream libraries or used as a
Akka Streams :class:`Source` or :class:`Sink`.
.. warning::
:class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
ActorPublisher
^^^^^^^^^^^^^^
Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#job-manager
You send elements to the stream by calling ``onNext``. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``,
otherwise ``onNext`` will throw ``IllegalStateException``.
When the stream subscriber requests more elements the ``ActorPublisher.Request`` message
is delivered to this actor, and you can act on that event. The ``totalDemand``
is updated automatically.
When the stream subscriber cancels the subscription the ``ActorPublisher.Cancel`` message
is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored.
You can complete the stream by calling ``onComplete``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
You can terminate the stream with failure by calling ``onError``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout``
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input :class:`Source` to a :class:`Flow`:
.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage
You can only attach one subscriber to this publisher. Use ``Sink.fanoutPublisher`` to enable
multiple subscribers.
ActorSubscriber
^^^^^^^^^^^^^^^
Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
stream subscriber with full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool
Subclass must define the ``RequestStrategy`` to control stream back pressure.
After each incoming message the ``ActorSubscriber`` will automatically invoke
the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream.
* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with
``ZeroRequestStrategy`` or some other strategy. In that case
you must also call ``request`` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output :class:`Sink` to a :class:`Flow`:
.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage

View file

@ -26,6 +26,7 @@ and for best results we recommend the following approach:
stream-integration-external stream-integration-external
stream-integration-reactive-streams stream-integration-reactive-streams
stream-integration-actor
Motivation Motivation
========== ==========
@ -448,18 +449,8 @@ Flexi Route
Integrating with Actors Integrating with Actors
======================= =======================
// TODO: Source.subscriber
// TODO: Sink.publisher
// TODO: Use the ImplicitFlowMaterializer if you have streams starting from inside actors. // TODO: Use the ImplicitFlowMaterializer if you have streams starting from inside actors.
// TODO: how do I create my own sources / sinks? // TODO: how do I create my own sources / sinks?
ActorPublisher
^^^^^^^^^^^^^^
ActorSubscriber
^^^^^^^^^^^^^^^
// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActoSubscriber???