DOC: Improved scheduler doc. Split into Java/Scala samples
This commit is contained in:
parent
c57b2732e7
commit
fabe475f64
15 changed files with 188 additions and 57 deletions
257
akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala
Normal file
257
akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala
Normal file
|
|
@ -0,0 +1,257 @@
|
|||
package akka.docs.actor
|
||||
|
||||
//#imports1
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Props
|
||||
import akka.event.Logging
|
||||
import akka.dispatch.Future
|
||||
|
||||
//#imports1
|
||||
|
||||
//#imports2
|
||||
import akka.actor.ActorSystem
|
||||
//#imports2
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
//#my-actor
|
||||
class MyActor extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
def receive = {
|
||||
case "test" ⇒ log.info("received test")
|
||||
case _ ⇒ log.info("received unknown message")
|
||||
}
|
||||
}
|
||||
//#my-actor
|
||||
|
||||
case class DoIt(msg: ImmutableMessage)
|
||||
case class Message(s: String)
|
||||
|
||||
//#context-actorOf
|
||||
class FirstActor extends Actor {
|
||||
val myActor = context.actorOf(Props[MyActor])
|
||||
//#context-actorOf
|
||||
//#anonymous-actor
|
||||
def receive = {
|
||||
case m: DoIt ⇒
|
||||
context.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case DoIt(msg) ⇒
|
||||
val replyMsg = doSomeDangerousWork(msg)
|
||||
sender ! replyMsg
|
||||
context.stop(self)
|
||||
}
|
||||
def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" }
|
||||
})) ! m
|
||||
|
||||
case replyMsg: String ⇒ sender ! replyMsg
|
||||
}
|
||||
//#anonymous-actor
|
||||
}
|
||||
|
||||
//#system-actorOf
|
||||
object Main extends App {
|
||||
val system = ActorSystem("MySystem")
|
||||
val myActor = system.actorOf(Props[MyActor])
|
||||
//#system-actorOf
|
||||
}
|
||||
|
||||
class ReplyException extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
//#reply-exception
|
||||
try {
|
||||
val result = operation()
|
||||
sender ! result
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
sender ! akka.actor.Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
//#reply-exception
|
||||
}
|
||||
|
||||
def operation(): String = { "Hi" }
|
||||
|
||||
}
|
||||
|
||||
//#swapper
|
||||
case object Swap
|
||||
class Swapper extends Actor {
|
||||
import context._
|
||||
val log = Logging(system, this)
|
||||
|
||||
def receive = {
|
||||
case Swap ⇒
|
||||
log.info("Hi")
|
||||
become {
|
||||
case Swap ⇒
|
||||
log.info("Ho")
|
||||
unbecome() // resets the latest 'become' (just for fun)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object SwapperApp extends App {
|
||||
val system = ActorSystem("SwapperSystem")
|
||||
val swap = system.actorOf(Props[Swapper])
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
swap ! Swap // logs Hi
|
||||
swap ! Swap // logs Ho
|
||||
}
|
||||
//#swapper
|
||||
|
||||
//#receive-orElse
|
||||
import akka.actor.Actor.Receive
|
||||
|
||||
abstract class GenericActor extends Actor {
|
||||
// to be defined in subclassing actor
|
||||
def specificMessageHandler: Receive
|
||||
|
||||
// generic message handler
|
||||
def genericMessageHandler: Receive = {
|
||||
case event ⇒ printf("generic: %s\n", event)
|
||||
}
|
||||
|
||||
def receive = specificMessageHandler orElse genericMessageHandler
|
||||
}
|
||||
|
||||
class SpecificActor extends GenericActor {
|
||||
def specificMessageHandler = {
|
||||
case event: MyMsg ⇒ printf("specific: %s\n", event.subject)
|
||||
}
|
||||
}
|
||||
|
||||
case class MyMsg(subject: String)
|
||||
//#receive-orElse
|
||||
|
||||
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||
|
||||
"import context" in {
|
||||
//#import-context
|
||||
class FirstActor extends Actor {
|
||||
import context._
|
||||
val myActor = actorOf(Props[MyActor])
|
||||
def receive = {
|
||||
case x ⇒ myActor ! x
|
||||
}
|
||||
}
|
||||
//#import-context
|
||||
|
||||
val first = system.actorOf(Props(new FirstActor))
|
||||
system.stop(first)
|
||||
|
||||
}
|
||||
|
||||
"creating actor with AkkaSpec.actorOf" in {
|
||||
val myActor = system.actorOf(Props[MyActor])
|
||||
|
||||
// testing the actor
|
||||
|
||||
// TODO: convert docs to AkkaSpec(Map(...))
|
||||
val filter = EventFilter.custom {
|
||||
case e: Logging.Info ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
system.eventStream.publish(TestEvent.Mute(filter))
|
||||
system.eventStream.subscribe(testActor, classOf[Logging.Info])
|
||||
|
||||
myActor ! "test"
|
||||
expectMsgPF(1 second) { case Logging.Info(_, "received test") ⇒ true }
|
||||
|
||||
myActor ! "unknown"
|
||||
expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") ⇒ true }
|
||||
|
||||
system.eventStream.unsubscribe(testActor)
|
||||
system.eventStream.publish(TestEvent.UnMute(filter))
|
||||
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"creating actor with constructor" in {
|
||||
class MyActor(arg: String) extends Actor {
|
||||
def receive = { case _ ⇒ () }
|
||||
}
|
||||
|
||||
//#creating-constructor
|
||||
// allows passing in arguments to the MyActor constructor
|
||||
val myActor = system.actorOf(Props(new MyActor("...")))
|
||||
//#creating-constructor
|
||||
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"creating actor with Props" in {
|
||||
//#creating-props
|
||||
import akka.actor.Props
|
||||
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
||||
//#creating-props
|
||||
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"using ask" in {
|
||||
//#using-ask
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case x: String ⇒ sender ! x.toUpperCase
|
||||
case n: Int ⇒ sender ! (n + 1)
|
||||
}
|
||||
}
|
||||
|
||||
val myActor = system.actorOf(Props(new MyActor))
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val future = myActor ? "hello"
|
||||
for (x ← future) println(x) //Prints "hello"
|
||||
|
||||
val result: Future[Int] = for (x ← (myActor ? 3).mapTo[Int]) yield { 2 * x }
|
||||
//#using-ask
|
||||
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"using receiveTimeout" in {
|
||||
//#receive-timeout
|
||||
import akka.actor.ReceiveTimeout
|
||||
import akka.util.duration._
|
||||
class MyActor extends Actor {
|
||||
context.setReceiveTimeout(30 milliseconds)
|
||||
def receive = {
|
||||
case "Hello" ⇒ //...
|
||||
case ReceiveTimeout ⇒ throw new RuntimeException("received timeout")
|
||||
}
|
||||
}
|
||||
//#receive-timeout
|
||||
}
|
||||
|
||||
"using hot-swap" in {
|
||||
//#hot-swap-actor
|
||||
class HotSwapActor extends Actor {
|
||||
import context._
|
||||
def angry: Receive = {
|
||||
case "foo" ⇒ sender ! "I am already angry?"
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
|
||||
def happy: Receive = {
|
||||
case "bar" ⇒ sender ! "I am already happy :-)"
|
||||
case "foo" ⇒ become(angry)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "foo" ⇒ become(angry)
|
||||
case "bar" ⇒ become(happy)
|
||||
}
|
||||
}
|
||||
//#hot-swap-actor
|
||||
|
||||
val actor = system.actorOf(Props(new HotSwapActor))
|
||||
}
|
||||
}
|
||||
53
akka-docs/scala/code/akka/docs/actor/SchedulerDocSpec.scala
Normal file
53
akka-docs/scala/code/akka/docs/actor/SchedulerDocSpec.scala
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
package akka.docs.actor
|
||||
|
||||
//#imports1
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Props
|
||||
import akka.util.duration._
|
||||
|
||||
//#imports1
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit._
|
||||
|
||||
class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||
"schedule a one-off task" in {
|
||||
//#schedule-one-off-message
|
||||
//Schedules to send the "foo"-message to the testActor after 50ms
|
||||
system.scheduler.scheduleOnce(50 milliseconds, testActor, "foo")
|
||||
//#schedule-one-off-message
|
||||
|
||||
expectMsg(1 second, "foo")
|
||||
|
||||
//#schedule-one-off-thunk
|
||||
//Schedules a function to be executed (send the current time) to the testActor after 50ms
|
||||
system.scheduler.scheduleOnce(50 milliseconds) {
|
||||
testActor ! System.currentTimeMillis
|
||||
}
|
||||
//#schedule-one-off-thunk
|
||||
|
||||
}
|
||||
|
||||
"schedule a recurring task" in {
|
||||
//#schedule-recurring
|
||||
val Tick = "tick"
|
||||
val tickActor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case Tick ⇒ //Do something
|
||||
}
|
||||
}))
|
||||
//This will schedule to send the Tick-message
|
||||
//to the tickActor after 0ms repeating every 50ms
|
||||
val cancellable =
|
||||
system.scheduler.schedule(0 milliseconds,
|
||||
50 milliseconds,
|
||||
tickActor,
|
||||
Tick)
|
||||
|
||||
//This cancels further Ticks to be sent
|
||||
cancellable.cancel()
|
||||
//#schedule-recurring
|
||||
system.stop(tickActor)
|
||||
}
|
||||
}
|
||||
48
akka-docs/scala/code/akka/docs/actor/UnnestedReceives.scala
Normal file
48
akka-docs/scala/code/akka/docs/actor/UnnestedReceives.scala
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
package akka.docs.actor
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
/**
|
||||
* Requirements are as follows:
|
||||
* The first thing the actor needs to do, is to subscribe to a channel of events,
|
||||
* Then it must replay (process) all "old" events
|
||||
* Then it has to wait for a GoAhead signal to begin processing the new events
|
||||
* It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal
|
||||
*/
|
||||
class UnnestedReceives extends Actor {
|
||||
import context.become
|
||||
//If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)]
|
||||
val queue = new ListBuffer[Any]()
|
||||
|
||||
//This message processes a message/event
|
||||
def process(msg: Any): Unit = println("processing: " + msg)
|
||||
//This method subscribes the actor to the event bus
|
||||
def subscribe() {} //Your external stuff
|
||||
//This method retrieves all prior messages/events
|
||||
def allOldMessages() = List()
|
||||
|
||||
override def preStart {
|
||||
//We override preStart to be sure that the first message the actor gets is
|
||||
//'Replay, that message will start to be processed _after_ the actor is started
|
||||
self ! 'Replay
|
||||
//Then we subscribe to the stream of messages/events
|
||||
subscribe()
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case 'Replay ⇒ //Our first message should be a 'Replay message, all others are invalid
|
||||
allOldMessages() foreach process //Process all old messages/events
|
||||
become { //Switch behavior to look for the GoAhead signal
|
||||
case 'GoAhead ⇒ //When we get the GoAhead signal we process all our buffered messages/events
|
||||
queue foreach process
|
||||
queue.clear
|
||||
become { //Then we change behaviour to process incoming messages/events as they arrive
|
||||
case msg ⇒ process(msg)
|
||||
}
|
||||
case msg ⇒ //While we haven't gotten the GoAhead signal, buffer all incoming messages
|
||||
queue += msg //Here you have full control, you can handle overflow etc
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue