add akka-typed project with generic ActorRef
This is the first step towards more type-safety in Actor interactions, comprising: * generic ActorRef[T] that only accepts T messages * generic ActorSystem[T] extends ActorRef[T] (sending to the guardian, whose Props[T] are provided for ActorSystem construction) * removed the Actor trait: everything in there has been made into messages and signals * new Behavior[T] abstraction that consumes messages (of type T) or Signals (lifecycle hooks, Terminated, ReceiveTimeout, Failed), producing the next Behavior[T] as the result each time * the ask pattern is provided and yields properly typed Futures * variants of ActorContext are provided for synchronous testing of Behaviors All of this is implemented without touching code outside akka-typed (apart from making guardianProps configurable), creating wrapper objects around ActorRef, ActorContext, ActorSystem, Props and providing an Actor implementation that just runs a Behavior.
This commit is contained in:
parent
50d1569f37
commit
d9efd041f7
40 changed files with 4724 additions and 21 deletions
|
|
@ -138,7 +138,7 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
|
|||
if (timeout.duration.length <= 0)
|
||||
Future.failed[Any](new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message, sender)
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
|
||||
actorRef.tell(message, a)
|
||||
a.result.future
|
||||
}
|
||||
|
|
@ -159,7 +159,7 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
|
|||
Future.failed[Any](
|
||||
new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message, sender)
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender)
|
||||
actorSel.tell(message, a)
|
||||
a.result.future
|
||||
}
|
||||
|
|
@ -175,12 +175,15 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
|
|||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any])
|
||||
private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any], _mcn: String)
|
||||
extends MinimalActorRef {
|
||||
import PromiseActorRef._
|
||||
import AbstractPromiseActorRef.stateOffset
|
||||
import AbstractPromiseActorRef.watchedByOffset
|
||||
|
||||
// This is necessary for weaving the PromiseActorRef into the asked message, i.e. the replyTo pattern.
|
||||
@volatile var messageClassName = _mcn
|
||||
|
||||
/**
|
||||
* As an optimization for the common (local) case we only register this PromiseActorRef
|
||||
* with the provider when the `path` member is actually queried, which happens during
|
||||
|
|
@ -330,15 +333,14 @@ private[akka] object PromiseActorRef {
|
|||
|
||||
private val ActorStopResult = Failure(new ActorKilledException("Stopped"))
|
||||
|
||||
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, message: Any, sender: ActorRef = Actor.noSender): PromiseActorRef = {
|
||||
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String, sender: ActorRef = Actor.noSender): PromiseActorRef = {
|
||||
val result = Promise[Any]()
|
||||
val scheduler = provider.guardian.underlying.system.scheduler
|
||||
val a = new PromiseActorRef(provider, result)
|
||||
val a = new PromiseActorRef(provider, result, messageClassName)
|
||||
implicit val ec = a.internalCallingThreadExecutionContext
|
||||
val messageClassName = message.getClass.getName
|
||||
val f = scheduler.scheduleOnce(timeout.duration) {
|
||||
result tryComplete Failure(
|
||||
new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "$messageClassName"."""))
|
||||
new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
|
||||
}
|
||||
result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
|
||||
a
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue