pekko/akka-docs/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala

90 lines
2.2 KiB
Scala
Raw Normal View History

2014-12-19 15:22:45 +01:00
/**
 * Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
 */
package docs.stream
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.ActorRefRoutee
import akka.routing.RoundRobinRoutingLogic
import akka.routing.Router
import akka.stream.ActorMaterializer
2014-12-19 15:22:45 +01:00
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.MaxInFlightRequestStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.AkkaSpec
object ActorSubscriberDocSpec {
//#worker-pool
/** Message protocol and `Props` factory for the `WorkerPool` actor. */
object WorkerPool {
  /** `Props` that create a fresh `WorkerPool` instance. */
  def props: Props = Props(new WorkerPool)

  /** Stream element: job `id`, plus the actor that is told `Done(id)` once the job is finished. */
  case class Msg(id: Int, replyTo: ActorRef)

  /** Job handed from the pool to one of its workers. */
  case class Work(id: Int)

  /** Answer a worker sends back to the pool for job `id`. */
  case class Reply(id: Int)

  /** Notification the pool sends to the `replyTo` of job `id`. */
  case class Done(id: Int)
}
/**
 * Stream subscriber that hands every incoming `Msg` to one of three `Worker`
 * actors (round-robin) and tells the message's `replyTo` `Done(id)` when the
 * worker has replied.
 *
 * Jobs that were routed but not yet answered are kept in `queue`; its size is
 * what the request strategy reports as being in flight, with `MaxQueueSize`
 * as the upper bound.
 *
 * The actor stops itself once the stream has completed or failed and no job
 * is outstanding any more.
 */
class WorkerPool extends ActorSubscriber {
  import WorkerPool._
  import ActorSubscriberMessage._

  val MaxQueueSize = 10

  // Outstanding jobs: job id -> actor to notify with Done(id).
  var queue = Map.empty[Int, ActorRef]

  val router = {
    val routees = Vector.fill(3) {
      ActorRefRoutee(context.actorOf(Props[Worker]))
    }
    Router(RoundRobinRoutingLogic(), routees)
  }

  override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) {
    override def inFlightInternally: Int = queue.size
  }

  def receive = {
    case OnNext(Msg(id, replyTo)) =>
      queue += (id -> replyTo)
      assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
      router.route(Work(id), self)

    case Reply(id) =>
      // A reply for an id that is no longer queued (e.g. a duplicate) is ignored
      // instead of failing the actor with a NoSuchElementException.
      queue.get(id).foreach(_ ! Done(id))
      queue -= id
      // `canceled` is true once the upstream has terminated (see ActorSubscriber);
      // the last outstanding reply is then the signal to shut down.
      if (canceled) stopWhenDrained()

    case OnComplete | OnError(_) =>
      // No further elements will arrive; stop right away unless jobs are still pending,
      // in which case the Reply branch above stops the actor later.
      stopWhenDrained()
  }

  /** Stops this actor (and with it its child workers) if no job is outstanding. */
  private def stopWhenDrained(): Unit =
    if (queue.isEmpty) context.stop(self)
}
/** Worker actor: answers every `Work(id)` with a `Reply(id)` to the sender. */
class Worker extends Actor {
  import WorkerPool.{ Reply, Work }

  def receive: Receive = {
    case Work(jobId) =>
      // ...
      sender() ! Reply(jobId)
  }
}
//#worker-pool
}
/**
 * Documentation spec showing how a `Source` is connected to the `WorkerPool`
 * actor through `Sink.actorSubscriber`.
 */
class ActorSubscriberDocSpec extends AkkaSpec {
  import ActorSubscriberDocSpec._

  implicit val materializer: ActorMaterializer = ActorMaterializer()

  "illustrate usage of ActorSubscriber" in {
    // Done notifications are addressed to the test actor so they can be asserted on below.
    val replyTo = testActor

    //#actor-subscriber-usage
    val N = 117
    Source(1 to N).map(WorkerPool.Msg(_, replyTo))
      .runWith(Sink.actorSubscriber(WorkerPool.props))
    //#actor-subscriber-usage

    // Exactly one Done per emitted id is expected; compared as sets, so arrival order is ignored.
    val expectedDone = (1 to N).map(id => WorkerPool.Done(id)).toSet
    receiveN(N).toSet should be(expectedDone)
  }
}