convert typed IntroSpec to new API, #22293
This commit is contained in:
parent
4368bed37a
commit
b2b4f64d97
4 changed files with 67 additions and 73 deletions
|
|
@ -30,10 +30,10 @@ private final case class SchedulerException(msg: String) extends akka.AkkaExcept
|
||||||
* 1) the system’s com.typesafe.config.Config (from system.settings.config)
|
* 1) the system’s com.typesafe.config.Config (from system.settings.config)
|
||||||
* 2) a akka.event.LoggingAdapter
|
* 2) a akka.event.LoggingAdapter
|
||||||
* 3) a java.util.concurrent.ThreadFactory
|
* 3) a java.util.concurrent.ThreadFactory
|
||||||
*
|
*
|
||||||
* Please note that this scheduler implementation is higly optimised for high-throughput
|
* Please note that this scheduler implementation is higly optimised for high-throughput
|
||||||
* and high-frequency events. It is not to be confused with long-term schedulers such as
|
* and high-frequency events. It is not to be confused with long-term schedulers such as
|
||||||
* Quartz. The scheduler will throw an exception if attempts are made to schedule too far
|
* Quartz. The scheduler will throw an exception if attempts are made to schedule too far
|
||||||
* into the future (which by default is around 8 months (`Int.MaxValue` seconds).
|
* into the future (which by default is around 8 months (`Int.MaxValue` seconds).
|
||||||
*/
|
*/
|
||||||
trait Scheduler {
|
trait Scheduler {
|
||||||
|
|
@ -96,9 +96,9 @@ trait Scheduler {
|
||||||
* If the `Runnable` throws an exception the repeated scheduling is aborted,
|
* If the `Runnable` throws an exception the repeated scheduling is aborted,
|
||||||
* i.e. the function will not be invoked any more.
|
* i.e. the function will not be invoked any more.
|
||||||
*
|
*
|
||||||
* @throws IllegalArgumentException if the given delays exceed the maximum
|
* @throws IllegalArgumentException if the given delays exceed the maximum
|
||||||
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
||||||
*
|
*
|
||||||
* Java API
|
* Java API
|
||||||
*/
|
*/
|
||||||
def schedule(
|
def schedule(
|
||||||
|
|
@ -110,9 +110,9 @@ trait Scheduler {
|
||||||
* Schedules a message to be sent once with a delay, i.e. a time period that has
|
* Schedules a message to be sent once with a delay, i.e. a time period that has
|
||||||
* to pass before the message is sent.
|
* to pass before the message is sent.
|
||||||
*
|
*
|
||||||
* @throws IllegalArgumentException if the given delays exceed the maximum
|
* @throws IllegalArgumentException if the given delays exceed the maximum
|
||||||
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
||||||
*
|
*
|
||||||
* Java & Scala API
|
* Java & Scala API
|
||||||
*/
|
*/
|
||||||
final def scheduleOnce(
|
final def scheduleOnce(
|
||||||
|
|
@ -129,9 +129,9 @@ trait Scheduler {
|
||||||
* Schedules a function to be run once with a delay, i.e. a time period that has
|
* Schedules a function to be run once with a delay, i.e. a time period that has
|
||||||
* to pass before the function is run.
|
* to pass before the function is run.
|
||||||
*
|
*
|
||||||
* @throws IllegalArgumentException if the given delays exceed the maximum
|
* @throws IllegalArgumentException if the given delays exceed the maximum
|
||||||
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
||||||
*
|
*
|
||||||
* Scala API
|
* Scala API
|
||||||
*/
|
*/
|
||||||
final def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(
|
final def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(
|
||||||
|
|
@ -143,9 +143,9 @@ trait Scheduler {
|
||||||
* Schedules a Runnable to be run once with a delay, i.e. a time period that
|
* Schedules a Runnable to be run once with a delay, i.e. a time period that
|
||||||
* has to pass before the runnable is executed.
|
* has to pass before the runnable is executed.
|
||||||
*
|
*
|
||||||
* @throws IllegalArgumentException if the given delays exceed the maximum
|
* @throws IllegalArgumentException if the given delays exceed the maximum
|
||||||
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
|
||||||
*
|
*
|
||||||
* Java & Scala API
|
* Java & Scala API
|
||||||
*/
|
*/
|
||||||
def scheduleOnce(
|
def scheduleOnce(
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ package docs.akka.typed
|
||||||
|
|
||||||
//#imports
|
//#imports
|
||||||
import akka.typed._
|
import akka.typed._
|
||||||
import akka.typed.ScalaDSL._
|
import akka.typed.scaladsl.Actor._
|
||||||
import akka.typed.scaladsl.AskPattern._
|
import akka.typed.scaladsl.AskPattern._
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
@ -21,7 +21,7 @@ object IntroSpec {
|
||||||
final case class Greet(whom: String, replyTo: ActorRef[Greeted])
|
final case class Greet(whom: String, replyTo: ActorRef[Greeted])
|
||||||
final case class Greeted(whom: String)
|
final case class Greeted(whom: String)
|
||||||
|
|
||||||
val greeter = Static[Greet] { msg =>
|
val greeter = Stateless[Greet] { (_, msg) ⇒
|
||||||
println(s"Hello ${msg.whom}!")
|
println(s"Hello ${msg.whom}!")
|
||||||
msg.replyTo ! Greeted(msg.whom)
|
msg.replyTo ! Greeted(msg.whom)
|
||||||
}
|
}
|
||||||
|
|
@ -50,22 +50,21 @@ object IntroSpec {
|
||||||
//#chatroom-protocol
|
//#chatroom-protocol
|
||||||
//#chatroom-behavior
|
//#chatroom-behavior
|
||||||
|
|
||||||
val behavior: Behavior[GetSession] =
|
def chatRoom(sessions: List[ActorRef[SessionEvent]] = List.empty): Behavior[Command] =
|
||||||
ContextAware[Command] { ctx =>
|
Stateful[Command] { (ctx, msg) ⇒
|
||||||
var sessions = List.empty[ActorRef[SessionEvent]]
|
msg match {
|
||||||
|
case GetSession(screenName, client) ⇒
|
||||||
Static {
|
|
||||||
case GetSession(screenName, client) =>
|
|
||||||
sessions ::= client
|
|
||||||
val wrapper = ctx.spawnAdapter {
|
val wrapper = ctx.spawnAdapter {
|
||||||
p: PostMessage => PostSessionMessage(screenName, p.message)
|
p: PostMessage ⇒ PostSessionMessage(screenName, p.message)
|
||||||
}
|
}
|
||||||
client ! SessionGranted(wrapper)
|
client ! SessionGranted(wrapper)
|
||||||
case PostSessionMessage(screenName, message) =>
|
chatRoom(client :: sessions)
|
||||||
|
case PostSessionMessage(screenName, message) ⇒
|
||||||
val mp = MessagePosted(screenName, message)
|
val mp = MessagePosted(screenName, message)
|
||||||
sessions foreach (_ ! mp)
|
sessions foreach (_ ! mp)
|
||||||
|
Same
|
||||||
}
|
}
|
||||||
}.narrow // only expose GetSession to the outside
|
}
|
||||||
//#chatroom-behavior
|
//#chatroom-behavior
|
||||||
}
|
}
|
||||||
//#chatroom-actor
|
//#chatroom-actor
|
||||||
|
|
@ -76,6 +75,7 @@ class IntroSpec extends TypedSpec {
|
||||||
import IntroSpec._
|
import IntroSpec._
|
||||||
|
|
||||||
def `must say hello`(): Unit = {
|
def `must say hello`(): Unit = {
|
||||||
|
// TODO Implicits.global is not something we would like to encourage in docs
|
||||||
//#hello-world
|
//#hello-world
|
||||||
import HelloWorld._
|
import HelloWorld._
|
||||||
// using global pool since we want to run tasks after system.terminate
|
// using global pool since we want to run tasks after system.terminate
|
||||||
|
|
@ -86,8 +86,8 @@ class IntroSpec extends TypedSpec {
|
||||||
val future: Future[Greeted] = system ? (Greet("world", _))
|
val future: Future[Greeted] = system ? (Greet("world", _))
|
||||||
|
|
||||||
for {
|
for {
|
||||||
greeting <- future.recover { case ex => ex.getMessage }
|
greeting ← future.recover { case ex ⇒ ex.getMessage }
|
||||||
done <- { println(s"result: $greeting"); system.terminate() }
|
done ← { println(s"result: $greeting"); system.terminate() }
|
||||||
} println("system terminated")
|
} println("system terminated")
|
||||||
//#hello-world
|
//#hello-world
|
||||||
}
|
}
|
||||||
|
|
@ -96,32 +96,40 @@ class IntroSpec extends TypedSpec {
|
||||||
//#chatroom-gabbler
|
//#chatroom-gabbler
|
||||||
import ChatRoom._
|
import ChatRoom._
|
||||||
|
|
||||||
val gabbler: Behavior[SessionEvent] =
|
val gabbler =
|
||||||
Total {
|
Stateful[SessionEvent] { (_, msg) ⇒
|
||||||
case SessionDenied(reason) =>
|
msg match {
|
||||||
println(s"cannot start chat room session: $reason")
|
case SessionDenied(reason) ⇒
|
||||||
Stopped
|
println(s"cannot start chat room session: $reason")
|
||||||
case SessionGranted(handle) =>
|
Stopped
|
||||||
handle ! PostMessage("Hello World!")
|
case SessionGranted(handle) ⇒
|
||||||
Same
|
handle ! PostMessage("Hello World!")
|
||||||
case MessagePosted(screenName, message) =>
|
Same
|
||||||
println(s"message has been posted by '$screenName': $message")
|
case MessagePosted(screenName, message) ⇒
|
||||||
Stopped
|
println(s"message has been posted by '$screenName': $message")
|
||||||
|
Stopped
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//#chatroom-gabbler
|
//#chatroom-gabbler
|
||||||
|
|
||||||
//#chatroom-main
|
//#chatroom-main
|
||||||
val main: Behavior[akka.NotUsed] =
|
val main: Behavior[akka.NotUsed] =
|
||||||
Full {
|
SignalOrMessage(
|
||||||
case Sig(ctx, PreStart) =>
|
signal = { (ctx, sig) ⇒
|
||||||
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
|
sig match {
|
||||||
val gabblerRef = ctx.spawn(gabbler, "gabbler")
|
case PreStart ⇒
|
||||||
ctx.watch(gabblerRef)
|
val chatRoom = ctx.spawn(ChatRoom.chatRoom(), "chatroom")
|
||||||
chatRoom ! GetSession("ol’ Gabbler", gabblerRef)
|
val gabblerRef = ctx.spawn(gabbler, "gabbler")
|
||||||
Same
|
ctx.watch(gabblerRef)
|
||||||
case Sig(_, Terminated(ref)) =>
|
chatRoom ! GetSession("ol’ Gabbler", gabblerRef)
|
||||||
Stopped
|
Same
|
||||||
}
|
case Terminated(ref) ⇒
|
||||||
|
Stopped
|
||||||
|
case _ ⇒
|
||||||
|
Unhandled
|
||||||
|
}
|
||||||
|
},
|
||||||
|
mesg = (_, _) ⇒ Unhandled)
|
||||||
|
|
||||||
val system = ActorSystem("ChatRoomDemo", main)
|
val system = ActorSystem("ChatRoomDemo", main)
|
||||||
Await.result(system.whenTerminated, 1.second)
|
Await.result(system.whenTerminated, 1.second)
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@ supplies so that the :class:`HelloWorld` Actor can send back the confirmation
|
||||||
message.
|
message.
|
||||||
|
|
||||||
The behavior of the Actor is defined as the :meth:`greeter` value with the help
|
The behavior of the Actor is defined as the :meth:`greeter` value with the help
|
||||||
of the :class:`Static` behavior constructor—there are many different ways of
|
of the :class:`Stateless` behavior constructor—there are many different ways of
|
||||||
formulating behaviors as we shall see in the following. The “static” behavior
|
formulating behaviors as we shall see in the following. The “stateless” behavior
|
||||||
is not capable of changing in response to a message, it will stay the same
|
is not capable of changing in response to a message, it will stay the same
|
||||||
until the Actor is stopped by its parent.
|
until the Actor is stopped by its parent.
|
||||||
|
|
||||||
|
|
@ -175,10 +175,11 @@ as the following:
|
||||||
|
|
||||||
.. includecode:: code/docs/akka/typed/IntroSpec.scala#chatroom-behavior
|
.. includecode:: code/docs/akka/typed/IntroSpec.scala#chatroom-behavior
|
||||||
|
|
||||||
The core of this behavior is again static, the chat room itself does not change
|
The core of this behavior is stateful, the chat room itself does not change
|
||||||
into something else when sessions are established, but we introduce a variable
|
into something else when sessions are established, but we introduce a variable
|
||||||
that tracks the opened sessions. When a new :class:`GetSession` command comes
|
that tracks the opened sessions. Note that by using a method parameter a ``var``
|
||||||
in we add that client to the list and then we need to create the session’s
|
is not needed. When a new :class:`GetSession` command comes in we add that client to the
|
||||||
|
list that is in the returned behavior. Then we also need to create the session’s
|
||||||
:class:`ActorRef` that will be used to post messages. In this case we want to
|
:class:`ActorRef` that will be used to post messages. In this case we want to
|
||||||
create a very simple Actor that just repackages the :class:`PostMessage`
|
create a very simple Actor that just repackages the :class:`PostMessage`
|
||||||
command into a :class:`PostSessionMessage` command which also includes the
|
command into a :class:`PostSessionMessage` command which also includes the
|
||||||
|
|
@ -194,15 +195,8 @@ clients. But we do not want to give the ability to send
|
||||||
:class:`PostSessionMessage` commands to arbitrary clients, we reserve that
|
:class:`PostSessionMessage` commands to arbitrary clients, we reserve that
|
||||||
right to the wrappers we create—otherwise clients could pose as completely
|
right to the wrappers we create—otherwise clients could pose as completely
|
||||||
different screen names (imagine the :class:`GetSession` protocol to include
|
different screen names (imagine the :class:`GetSession` protocol to include
|
||||||
authentication information to further secure this). Therefore we narrow the
|
authentication information to further secure this). Therefore :class:`PostSessionMessage`
|
||||||
behavior down to only accepting :class:`GetSession` commands before exposing it
|
has ``private`` visibility and can't be created outside the actor.
|
||||||
to the world, hence the type of the ``behavior`` value is
|
|
||||||
:class:`Behavior[GetSession]` instead of :class:`Behavior[Command]`.
|
|
||||||
|
|
||||||
Narrowing the type of a behavior is always a safe operation since it only
|
|
||||||
restricts what clients can do. If we were to widen the type then clients could
|
|
||||||
send other messages that were not foreseen while writing the source code for
|
|
||||||
the behavior.
|
|
||||||
|
|
||||||
If we did not care about securing the correspondence between a session and a
|
If we did not care about securing the correspondence between a session and a
|
||||||
screen name then we could change the protocol such that :class:`PostMessage` is
|
screen name then we could change the protocol such that :class:`PostMessage` is
|
||||||
|
|
@ -216,13 +210,6 @@ former simply speaks more languages than the latter. The opposite would be
|
||||||
problematic, so passing an :class:`ActorRef[PostSessionMessage]` where
|
problematic, so passing an :class:`ActorRef[PostSessionMessage]` where
|
||||||
:class:`ActorRef[Command]` is required will lead to a type error.
|
:class:`ActorRef[Command]` is required will lead to a type error.
|
||||||
|
|
||||||
The final piece of this behavior definition is the :class:`ContextAware`
|
|
||||||
decorator that we use in order to obtain access to the :class:`ActorContext`
|
|
||||||
within the :class:`Static` behavior definition. This decorator invokes the
|
|
||||||
provided function when the first message is received and thereby creates the
|
|
||||||
real behavior that will be used going forward—the decorator is discarded after
|
|
||||||
it has done its job.
|
|
||||||
|
|
||||||
Trying it out
|
Trying it out
|
||||||
-------------
|
-------------
|
||||||
|
|
||||||
|
|
@ -261,11 +248,9 @@ Actor will perform its job on its own accord, we do not need to send messages
|
||||||
from the outside, so we declare it to be of type ``NotUsed``. Actors receive not
|
from the outside, so we declare it to be of type ``NotUsed``. Actors receive not
|
||||||
only external messages, they also are notified of certain system events,
|
only external messages, they also are notified of certain system events,
|
||||||
so-called Signals. In order to get access to those we choose to implement this
|
so-called Signals. In order to get access to those we choose to implement this
|
||||||
particular one using the :class:`Full` behavior decorator. The name stems from
|
particular one using the :class:`SignalOrMessage` behavior decorator. The
|
||||||
the fact that within this we have full access to all aspects of the Actor. The
|
provided ``signal`` function will be invoked for signals (subclasses of :class:`Signal`)
|
||||||
provided function will be invoked for signals (wrapped in :class:`Sig`) or user
|
or the ``mesg`` function for user messages.
|
||||||
messages (wrapped in :class:`Msg`) and the wrapper also contains a reference to
|
|
||||||
the :class:`ActorContext`.
|
|
||||||
|
|
||||||
This particular main Actor reacts to two signals: when it is started it will
|
This particular main Actor reacts to two signals: when it is started it will
|
||||||
first receive the :class:`PreStart` signal, upon which the chat room and the
|
first receive the :class:`PreStart` signal, upon which the chat room and the
|
||||||
|
|
|
||||||
|
|
@ -106,6 +106,7 @@ private[typed] class ActorCell[T](
|
||||||
val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
|
val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
|
||||||
val capacity = deployment.firstOrElse(MailboxCapacity(system.settings.DefaultMailboxCapacity))
|
val capacity = deployment.firstOrElse(MailboxCapacity(system.settings.DefaultMailboxCapacity))
|
||||||
val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self)
|
val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self)
|
||||||
|
// TODO uid is still needed
|
||||||
val ref = new LocalActorRef[U](self.path / name, cell)
|
val ref = new LocalActorRef[U](self.path / name, cell)
|
||||||
cell.setSelf(ref)
|
cell.setSelf(ref)
|
||||||
childrenMap = childrenMap.updated(name, ref)
|
childrenMap = childrenMap.updated(name, ref)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue