2012-01-18 11:52:35 +01:00
/* *
2017-01-04 17:37:10 +01:00
* Copyright ( C ) 2009 - 2017 Lightbend Inc . < http : //www.lightbend.com>
2012-01-18 11:52:35 +01:00
*/
package akka.pattern
import java.util.concurrent.TimeoutException
2015-03-30 16:17:12 +02:00
2012-03-24 23:02:37 +01:00
import akka.actor._
2013-03-05 16:19:54 +01:00
import akka.dispatch.sysmsg._
2012-07-22 15:33:18 +02:00
import akka.util. { Timeout , Unsafe }
2015-03-30 16:17:12 +02:00
import scala.annotation.tailrec
import scala.concurrent. { ExecutionContext , Future , Promise }
import scala.language.implicitConversions
import scala.util. { Failure , Success }
2012-01-18 11:52:35 +01:00
/**
 * Failure used to complete the Future returned from an ask/? call
 * when no reply arrived before the timeout expired.
 *
 * @param message description of the failed ask
 * @param cause   underlying reason, or `null` when there is none
 */
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {

  /** Creates the exception without an underlying cause. */
  def this(message: String) = this(message, null: Throwable)

  // TimeoutException is only given the message, so the cause is exposed by overriding the accessor.
  override def getCause(): Throwable = cause
}
2012-01-23 18:23:34 +01:00
/**
 * This trait contains the entry points of the “ask” pattern for
 * [[akka.actor.ActorRef]] and [[akka.actor.ActorSelection]] targets.
 */
trait AskSupport {

  /**
   * Import this implicit conversion to gain `?` and `ask` methods on
   * [[akka.actor.ActorRef]], which will defer to the
   * `ask(actorRef, message)(timeout)` method defined here.
   *
   * {{{
   * import akka.pattern.ask
   *
   * val future = actor ? message             // => ask(actor, message)
   * val future = actor ask message           // => ask(actor, message)
   * val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)
   * }}}
   *
   * All of the above use an implicit [[akka.util.Timeout]].
   */
  implicit def ask(actorRef: ActorRef): AskableActorRef =
    new AskableActorRef(actorRef)

  /**
   * Sends a message asynchronously and returns a [[scala.concurrent.Future]]
   * holding the eventual reply message; this means that the target actor
   * needs to send the result to the `sender` reference provided. The Future
   * will be completed with an [[akka.pattern.AskTimeoutException]] after the
   * given timeout has expired; this is independent from any timeout applied
   * while awaiting a result for this future (i.e. in
   * `Await.result(..., timeout)`).
   *
   * <b>Warning:</b>
   * When using future callbacks inside actors, you need to carefully avoid closing over
   * the containing actor’s object, i.e. do not call methods or access mutable state
   * on the enclosing actor from within the callback. This would break the actor
   * encapsulation and may introduce synchronization bugs and race conditions because
   * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
   * there is not yet a way to detect these illegal accesses at compile time.
   *
   * <b>Recommended usage:</b>
   *
   * {{{
   *   val f = ask(worker, request)(timeout)
   *   f.map { response =>
   *     EnrichedMessage(response)
   *   } pipeTo nextActor
   * }}}
   *
   * This overload asks without an explicit sender (`ActorRef.noSender`).
   */
  def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = {
    actorRef.internalAsk(message, timeout, ActorRef.noSender)
  }

  /**
   * Same as `ask(actorRef, message)(timeout)`, but passes the given `sender`
   * along to the underlying ask implementation.
   */
  def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = {
    actorRef.internalAsk(message, timeout, sender)
  }

  /**
   * Import this implicit conversion to gain `?` and `ask` methods on
   * [[akka.actor.ActorSelection]], which will defer to the
   * `ask(actorSelection, message)(timeout)` method defined here.
   *
   * {{{
   * import akka.pattern.ask
   *
   * val future = selection ? message             // => ask(selection, message)
   * val future = selection ask message           // => ask(selection, message)
   * val future = selection.ask(message)(timeout) // => ask(selection, message)(timeout)
   * }}}
   *
   * All of the above use an implicit [[akka.util.Timeout]].
   */
  implicit def ask(actorSelection: ActorSelection): AskableActorSelection =
    new AskableActorSelection(actorSelection)

  /**
   * Sends a message asynchronously and returns a [[scala.concurrent.Future]]
   * holding the eventual reply message; this means that the target actor
   * needs to send the result to the `sender` reference provided. The Future
   * will be completed with an [[akka.pattern.AskTimeoutException]] after the
   * given timeout has expired; this is independent from any timeout applied
   * while awaiting a result for this future (i.e. in
   * `Await.result(..., timeout)`).
   *
   * <b>Warning:</b>
   * When using future callbacks inside actors, you need to carefully avoid closing over
   * the containing actor’s object, i.e. do not call methods or access mutable state
   * on the enclosing actor from within the callback. This would break the actor
   * encapsulation and may introduce synchronization bugs and race conditions because
   * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
   * there is not yet a way to detect these illegal accesses at compile time.
   *
   * <b>Recommended usage:</b>
   *
   * {{{
   *   val f = ask(worker, request)(timeout)
   *   f.map { response =>
   *     EnrichedMessage(response)
   *   } pipeTo nextActor
   * }}}
   *
   * This overload asks without an explicit sender (`ActorRef.noSender`).
   */
  def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = {
    actorSelection.internalAsk(message, timeout, ActorRef.noSender)
  }

  /**
   * Same as `ask(actorSelection, message)(timeout)`, but passes the given
   * `sender` along to the underlying ask implementation.
   */
  def ask(actorSelection: ActorSelection, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = {
    actorSelection.internalAsk(message, timeout, sender)
  }
}
2015-10-25 14:38:10 +03:00
/**
 * This trait contains the entry points of the “ask” pattern variant
 * that can be combined with the "replyTo" pattern: instead of a message, the
 * caller supplies a factory that receives the reply-to reference and builds
 * the message from it.
 */
trait ExplicitAskSupport {

  /**
   * Import this implicit conversion to gain `?` and `ask` methods on
   * [[akka.actor.ActorRef]], which will defer to the
   * `ask(actorRef, askSender => message)(timeout)` method defined here.
   *
   * {{{
   * import akka.pattern.ask
   *
   * // same as `ask(actor, askSender => Request(askSender))`
   * val future = actor ? { askSender => Request(askSender) }
   *
   * // same as `ask(actor, Request(_))`
   * val future = actor ? (Request(_))
   *
   * // same as `ask(actor, Request(_))(timeout)`
   * val future = actor ? (Request(_))(timeout)
   * }}}
   *
   * All of the above use a required implicit [[akka.util.Timeout]] and optional implicit
   * sender [[akka.actor.ActorRef]].
   */
  implicit def ask(actorRef: ActorRef): ExplicitlyAskableActorRef =
    new ExplicitlyAskableActorRef(actorRef)

  /**
   * Sends a message asynchronously and returns a [[scala.concurrent.Future]]
   * holding the eventual reply message; this means that the target actor
   * needs to send the result to the `sender` reference provided. The Future
   * will be completed with an [[akka.pattern.AskTimeoutException]] after the
   * given timeout has expired; this is independent from any timeout applied
   * while awaiting a result for this future (i.e. in
   * `Await.result(..., timeout)`).
   *
   * <b>Warning:</b>
   * When using future callbacks inside actors, you need to carefully avoid closing over
   * the containing actor’s object, i.e. do not call methods or access mutable state
   * on the enclosing actor from within the callback. This would break the actor
   * encapsulation and may introduce synchronization bugs and race conditions because
   * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
   * there is not yet a way to detect these illegal accesses at compile time.
   *
   * <b>Recommended usage:</b>
   *
   * {{{
   *   val f = ask(worker, replyTo => Request(replyTo))(timeout)
   *   f.map { response =>
   *     EnrichedMessage(response)
   *   } pipeTo nextActor
   * }}}
   *
   * This overload asks without an explicit sender (`ActorRef.noSender`).
   */
  def ask(actorRef: ActorRef, messageFactory: ActorRef ⇒ Any)(implicit timeout: Timeout): Future[Any] = {
    actorRef.internalAsk(messageFactory, timeout, ActorRef.noSender)
  }

  /**
   * Same as `ask(actorRef, messageFactory)(timeout)`, but passes the given
   * `sender` along to the underlying ask implementation.
   */
  def ask(actorRef: ActorRef, messageFactory: ActorRef ⇒ Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = {
    actorRef.internalAsk(messageFactory, timeout, sender)
  }

  /**
   * Import this implicit conversion to gain `?` and `ask` methods on
   * [[akka.actor.ActorSelection]], which will defer to the
   * `ask(actorSelection, askSender => message)(timeout)` method defined here.
   *
   * {{{
   * import akka.pattern.ask
   *
   * // same as `ask(selection, askSender => Request(askSender))`
   * val future = selection ? { askSender => Request(askSender) }
   *
   * // same as `ask(selection, Request(_))`
   * val future = selection ? (Request(_))
   *
   * // same as `ask(selection, Request(_))(timeout)`
   * val future = selection ? (Request(_))(timeout)
   * }}}
   *
   * All of the above use a required implicit [[akka.util.Timeout]] and optional implicit
   * sender [[akka.actor.ActorRef]].
   */
  implicit def ask(actorSelection: ActorSelection): ExplicitlyAskableActorSelection =
    new ExplicitlyAskableActorSelection(actorSelection)

  /**
   * Sends a message asynchronously and returns a [[scala.concurrent.Future]]
   * holding the eventual reply message; this means that the target actor
   * needs to send the result to the `sender` reference provided. The Future
   * will be completed with an [[akka.pattern.AskTimeoutException]] after the
   * given timeout has expired; this is independent from any timeout applied
   * while awaiting a result for this future (i.e. in
   * `Await.result(..., timeout)`).
   *
   * <b>Warning:</b>
   * When using future callbacks inside actors, you need to carefully avoid closing over
   * the containing actor’s object, i.e. do not call methods or access mutable state
   * on the enclosing actor from within the callback. This would break the actor
   * encapsulation and may introduce synchronization bugs and race conditions because
   * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
   * there is not yet a way to detect these illegal accesses at compile time.
   *
   * <b>Recommended usage:</b>
   *
   * {{{
   *   val f = ask(worker, replyTo => Request(replyTo))(timeout)
   *   f.map { response =>
   *     EnrichedMessage(response)
   *   } pipeTo nextActor
   * }}}
   *
   * This overload asks without an explicit sender (`ActorRef.noSender`).
   */
  def ask(actorSelection: ActorSelection, messageFactory: ActorRef ⇒ Any)(implicit timeout: Timeout): Future[Any] = {
    actorSelection.internalAsk(messageFactory, timeout, ActorRef.noSender)
  }

  /**
   * Same as `ask(actorSelection, messageFactory)(timeout)`, but passes the
   * given `sender` along to the underlying ask implementation.
   */
  def ask(actorSelection: ActorSelection, messageFactory: ActorRef ⇒ Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = {
    actorSelection.internalAsk(messageFactory, timeout, sender)
  }
}
2015-04-30 09:23:18 +02:00
object AskableActorRef {

  /**
   * INTERNAL API: for binary compatibility
   *
   * Forwards to `internalAsk` without an explicit sender.
   */
  private[pattern] def ask$extension(actorRef: ActorRef, message: Any, timeout: Timeout): Future[Any] = {
    actorRef.internalAsk(message, timeout, ActorRef.noSender)
  }

  /**
   * INTERNAL API: for binary compatibility
   *
   * Forwards to `internalAsk` without an explicit sender.
   */
  private[pattern] def $qmark$extension(actorRef: ActorRef, message: Any, timeout: Timeout): Future[Any] = {
    actorRef.internalAsk(message, timeout, ActorRef.noSender)
  }
}
/*
 * Implementation class of the “ask” pattern enrichment of ActorRef
 */
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {

  /**
   * INTERNAL API: for binary compatibility
   */
  protected def ask(message: Any, timeout: Timeout): Future[Any] =
    internalAsk(message, timeout, ActorRef.noSender)

  /**
   * Asks the wrapped reference with `message`.
   *
   * @param timeout implicit timeout handed to `internalAsk`
   * @param sender  implicit sender, defaulting to `Actor.noSender`
   * @return the Future produced by `internalAsk`
   */
  def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
    internalAsk(message, timeout, sender)

  /**
   * INTERNAL API: for binary compatibility
   */
  protected def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
    internalAsk(message, timeout, ActorRef.noSender)

  /** Symbolic alias of `ask(message)`; same parameters and result. */
  def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
    internalAsk(message, timeout, sender)

  /**
   * INTERNAL API: for binary compatibility
   *
   * Performs the ask, distinguishing three kinds of recipient:
   *  - an already terminated `InternalActorRef`: the message is still passed on with `!`
   *    and the returned Future is failed with an [[akka.pattern.AskTimeoutException]];
   *  - any other `InternalActorRef`: a non-positive timeout yields a Future failed with an
   *    `IllegalArgumentException` and nothing is sent; otherwise a `PromiseActorRef` is
   *    created, used as the sender of the message, and its Future is returned;
   *  - anything else: a Future failed with an `IllegalArgumentException`.
   *
   * The result type is declared explicitly, like the other `internalAsk`
   * implementations in this file, instead of being left to inference.
   */
  private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef): Future[Any] = actorRef match {
    case ref: InternalActorRef if ref.isTerminated ⇒
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s""" Recipient[ $actorRef ] had already been terminated. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
    case ref: InternalActorRef ⇒
      if (timeout.duration.length <= 0)
        Future.failed[Any](new IllegalArgumentException(s""" Timeout length must be positive, question not sent to [ $actorRef ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
      else {
        // The temporary promise-backed ref becomes the sender, so the reply completes the Future.
        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
        actorRef.tell(message, a)
        a.result.future
      }
    case _ ⇒ Future.failed[Any](new IllegalArgumentException(s""" Unsupported recipient ActorRef type, question not sent to [ $actorRef ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
  }
}
2015-10-25 14:38:10 +03:00
/*
 * Implementation class of the “ask” with explicit sender pattern enrichment of ActorRef
 */
final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal {

  /**
   * Asks the wrapped reference with a message built by the `message` factory,
   * which receives the reference that replies should be sent to.
   */
  def ask(message: ActorRef ⇒ Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = {
    internalAsk(message, timeout, sender)
  }

  /** Symbolic alias of `ask(message)`; same parameters and result. */
  def ?(message: ActorRef ⇒ Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = {
    internalAsk(message, timeout, sender)
  }

  /**
   * INTERNAL API: for binary compatibility
   *
   * Performs the ask. The factory is always invoked to obtain the message:
   * with the freshly created `PromiseActorRef` on the success path, and with
   * the provider's dead letters reference on every failure path that reports
   * the message type.
   */
  private[pattern] def internalAsk(messageFactory: ActorRef ⇒ Any, timeout: Timeout, sender: ActorRef): Future[Any] = actorRef match {
    case ref: InternalActorRef if ref.isTerminated ⇒
      val message = messageFactory(ref.provider.deadLetters)
      // The message is still handed to the terminated recipient before the ask is failed.
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s""" Recipient[ $actorRef ] had already been terminated. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))

    case ref: InternalActorRef ⇒
      if (timeout.duration.length > 0) {
        // The message type is only known once the factory ran, hence the "unknown" placeholder.
        val promiseRef = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender)
        val message = messageFactory(promiseRef)
        promiseRef.messageClassName = message.getClass.getName
        actorRef.tell(message, promiseRef)
        promiseRef.result.future
      } else {
        val message = messageFactory(ref.provider.deadLetters)
        Future.failed[Any](new IllegalArgumentException(s""" Timeout length must be positive, question not sent to [ $actorRef ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
      }

    case _ if sender eq null ⇒
      // Without a sender there is no provider to obtain dead letters from for the factory.
      Future.failed[Any](new IllegalArgumentException(s""" No recipient provided, question not sent to [ $actorRef ]. """))

    case _ ⇒
      val message = messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
      Future.failed[Any](new IllegalArgumentException(s""" Unsupported recipient ActorRef type, question not sent to [ $actorRef ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
  }
}
2015-04-30 09:23:18 +02:00
object AskableActorSelection {

  /**
   * INTERNAL API: for binary compatibility
   *
   * Forwards to `internalAsk` without an explicit sender.
   */
  private[pattern] def ask$extension(actorSel: ActorSelection, message: Any, timeout: Timeout): Future[Any] = {
    actorSel.internalAsk(message, timeout, ActorRef.noSender)
  }

  /**
   * INTERNAL API: for binary compatibility
   *
   * Forwards to `internalAsk` without an explicit sender.
   */
  private[pattern] def $qmark$extension(actorSel: ActorSelection, message: Any, timeout: Timeout): Future[Any] = {
    actorSel.internalAsk(message, timeout, ActorRef.noSender)
  }
}
/*
 * Implementation class of the “ask” pattern enrichment of ActorSelection
 */
final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {

  /**
   * INTERNAL API: for binary compatibility
   */
  protected def ask(message: Any, timeout: Timeout): Future[Any] = {
    internalAsk(message, timeout, ActorRef.noSender)
  }

  /**
   * Asks the wrapped selection with `message`.
   *
   * @param timeout implicit timeout handed to `internalAsk`
   * @param sender  implicit sender, defaulting to `Actor.noSender`
   */
  def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = {
    internalAsk(message, timeout, sender)
  }

  /**
   * INTERNAL API: for binary compatibility
   */
  protected def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
    internalAsk(message, timeout, ActorRef.noSender)
  }

  /** Symbolic alias of `ask(message)`; same parameters and result. */
  def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = {
    internalAsk(message, timeout, sender)
  }

  /**
   * INTERNAL API: for binary compatibility
   *
   * Performs the ask based on the selection's anchor: for an `InternalActorRef`
   * anchor a positive timeout leads to the message being sent with a fresh
   * `PromiseActorRef` as sender, whose Future is returned; a non-positive
   * timeout or any other anchor type yields a Future failed with an
   * `IllegalArgumentException`.
   */
  private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef): Future[Any] = actorSel.anchor match {
    case ref: InternalActorRef ⇒
      if (timeout.duration.length > 0) {
        val promiseRef = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender)
        actorSel.tell(message, promiseRef)
        promiseRef.result.future
      } else {
        Future.failed[Any](
          new IllegalArgumentException(s""" Timeout length must be positive, question not sent to [ $actorSel ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
      }

    case _ ⇒
      Future.failed[Any](new IllegalArgumentException(s""" Unsupported recipient ActorRef type, question not sent to [ $actorSel ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
  }
}
2015-10-25 14:38:10 +03:00
/*
 * Implementation class of the “ask” with explicit sender pattern enrichment of ActorSelection
 */
final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extends AnyVal {

  /**
   * Asks the wrapped selection with a message built by the `message` factory,
   * which receives the reference that replies should be sent to.
   */
  def ask(message: ActorRef ⇒ Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = {
    internalAsk(message, timeout, sender)
  }

  /** Symbolic alias of `ask(message)`; same parameters and result. */
  def ?(message: ActorRef ⇒ Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = {
    internalAsk(message, timeout, sender)
  }

  /**
   * INTERNAL API: for binary compatibility
   *
   * Performs the ask based on the selection's anchor. The factory is invoked
   * with the freshly created `PromiseActorRef` on the success path, and with
   * the provider's dead letters reference on every failure path that reports
   * the message type.
   */
  private[pattern] def internalAsk(messageFactory: ActorRef ⇒ Any, timeout: Timeout, sender: ActorRef): Future[Any] = actorSel.anchor match {
    case ref: InternalActorRef ⇒
      if (timeout.duration.length > 0) {
        // The message type is only known once the factory ran, hence the "unknown" placeholder.
        val promiseRef = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender)
        val message = messageFactory(promiseRef)
        promiseRef.messageClassName = message.getClass.getName
        actorSel.tell(message, promiseRef)
        promiseRef.result.future
      } else {
        val message = messageFactory(ref.provider.deadLetters)
        Future.failed[Any](
          new IllegalArgumentException(s""" Timeout length must be positive, question not sent to [ $actorSel ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
      }

    case _ if sender eq null ⇒
      // Without a sender there is no provider to obtain dead letters from for the factory.
      Future.failed[Any](new IllegalArgumentException(s""" No recipient provided, question not sent to [ $actorSel ]. """))

    case _ ⇒
      val message = messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
      Future.failed[Any](new IllegalArgumentException(s""" Unsupported recipient ActorRef type, question not sent to [ $actorSel ]. Sender[ $sender ] sent the message of type " ${ message . getClass . getName } " . """))
  }
}
2012-03-23 21:35:52 +01:00
/**
 * Akka private optimized representation of the temporary actor spawned to
 * receive the reply to an "ask" operation.
 *
 * INTERNAL API
 */
private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any], _mcn: String)
  extends MinimalActorRef {
  import AbstractPromiseActorRef.{ stateOffset, watchedByOffset }
  import PromiseActorRef._

  @deprecated("Use the full constructor", "2.4")
  def this(provider: ActorRefProvider, result: Promise[Any]) = this(provider, result, "unknown")

  // This is necessary for weaving the PromiseActorRef into the asked message, i.e. the replyTo pattern.
  @volatile var messageClassName = _mcn

  /**
   * As an optimization for the common (local) case we only register this PromiseActorRef
   * with the provider when the `path` member is actually queried, which happens during
   * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
   *
   * Defined states:
   * null                  => started, path not yet created
   * Registering           => currently creating temp path and registering it
   * path: ActorPath       => path is available and was registered
   * StoppedWithPath(path) => stopped, path available
   * Stopped               => stopped, path not yet created
   */
  // Only read and written through Unsafe using `stateOffset` (see `state`, `updateState`, `setState`).
  @volatile
  private[this] var _stateDoNotCallMeDirectly: AnyRef = _

  // Only read and written through Unsafe using `watchedByOffset`; `null` marks "watchers cleared".
  @volatile
  private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet

  /** Volatile read of the current watcher set (`null` once the watchers have been cleared). */
  @inline
  private[this] def watchedBy: Set[ActorRef] =
    Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]

  /** Compare-and-swap of the watcher set; `true` when the swap took place. */
  @inline
  private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
    Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)

  @tailrec // Returns false if the Promise is already completed
  private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
    case null ⇒ false
    case current ⇒
      // Retry until the CAS wins against concurrent modifications.
      if (updateWatchedBy(current, current + watcher)) true else addWatcher(watcher)
  }

  /** Removes `watcher`, retrying on CAS failure; a no-op once the watchers have been cleared. */
  @tailrec
  private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
    case null ⇒ ()
    case current ⇒ if (!updateWatchedBy(current, current - watcher)) remWatcher(watcher)
  }

  /** Swaps the watcher set for `null` and returns the set that was registered until then. */
  @tailrec
  private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
    case null ⇒ ActorCell.emptyActorRefSet
    case current ⇒ if (updateWatchedBy(current, null)) current else clearWatchers()
  }

  /** Volatile read of the state field. */
  @inline
  private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)

  /** Compare-and-swap of the state field; `true` when the swap took place. */
  @inline
  private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
    Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)

  /** Unconditional volatile write of the state field. */
  @inline
  private[this] def setState(newState: AnyRef): Unit =
    Unsafe.instance.putObjectVolatile(this, stateOffset, newState)

  override def getParent: InternalActorRef = provider.tempContainer

  def internalCallingThreadExecutionContext: ExecutionContext =
    provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext

  /**
   * Contract of this method:
   * Must always return the same ActorPath, which must have
   * been registered if we haven't been stopped yet.
   */
  @tailrec
  def path: ActorPath = state match {
    case null ⇒
      if (updateState(null, Registering)) {
        var created: ActorPath = null
        try {
          created = provider.tempPath()
          provider.registerTempActor(this, created)
          created
        } finally {
          // Leaves the Registering state in any case; `created` is still null if tempPath() threw.
          setState(created)
        }
      } else path
    case p: ActorPath       ⇒ p
    case StoppedWithPath(p) ⇒ p
    case Stopped ⇒
      // even if we are already stopped we still need to produce a proper path
      updateState(Stopped, StoppedWithPath(provider.tempPath()))
      path
    case Registering ⇒ path // spin until registration is completed
  }

  /**
   * Completes the promise with the received message: `Status.Success` and
   * `Status.Failure` are unwrapped, anything else is a successful result.
   * Messages arriving after stop, or after the promise was already completed,
   * are passed on to dead letters.
   */
  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message
    case _ ⇒
      if (message == null) throw new InvalidMessageException("Message is null")
      val outcome: scala.util.Try[Any] = message match {
        case Status.Success(r) ⇒ Success(r)
        case Status.Failure(f) ⇒ Failure(f)
        case other             ⇒ Success(other)
      }
      if (!result.tryComplete(outcome)) provider.deadLetters ! message
  }

  /**
   * Handles the system messages this reference reacts to: termination,
   * death watch notifications and watch/unwatch registration. All other
   * system messages are ignored.
   */
  override def sendSystemMessage(message: SystemMessage): Unit = message match {
    case _: Terminate ⇒ stop()
    case DeathWatchNotification(a, ec, at) ⇒
      this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
    case Watch(watchee, watcher) ⇒
      if (watchee == this && watcher != this) {
        // addWatcher fails once the watchers were cleared, so the watcher is notified right away.
        if (!addWatcher(watcher))
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
      } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
    case Unwatch(watchee, watcher) ⇒
      if (watchee == this && watcher != this) remWatcher(watcher)
      else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
    case _ ⇒
  }

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
  override private[akka] def isTerminated: Boolean = state match {
    case Stopped | _: StoppedWithPath ⇒ true
    case _                            ⇒ false
  }

  /**
   * Moves this reference into one of the stopped states, fails the promise if
   * it is still open, notifies all watchers and unregisters a previously
   * registered temp path. Calling it again once stopped does nothing.
   */
  @tailrec
  override def stop(): Unit = {
    def ensureCompleted(): Unit = {
      result.tryComplete(ActorStopResult)
      val watchers = clearWatchers()
      if (watchers.nonEmpty) {
        watchers.foreach { watcher ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          watcher.asInstanceOf[InternalActorRef]
            .sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false))
        }
      }
    }
    state match {
      case null ⇒ // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
        if (updateState(null, Stopped)) ensureCompleted() else stop()
      case p: ActorPath ⇒
        if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
      case Stopped | _: StoppedWithPath ⇒ // already stopped
      case Registering                  ⇒ stop() // spin until registration is completed before stopping
    }
  }
}
2012-01-18 11:52:35 +01:00
2012-05-18 16:41:19 +02:00
/**
 * INTERNAL API
 *
 * Holds the marker states used by `PromiseActorRef`, the result used to fail the
 * promise when the ref is stopped, and the factory methods that create a
 * `PromiseActorRef` guarded by a timeout.
 */
private[akka] object PromiseActorRef {
  // Marker values for the ref's state field, alongside `null` and a plain `ActorPath`.
  private case object Registering
  private case object Stopped
  private final case class StoppedWithPath(path: ActorPath)

  // Offered to the promise when the ref is stopped.
  private val ActorStopResult = Failure(new ActorKilledException("Stopped"))

  /**
   * Creates a `PromiseActorRef` backed by a fresh `Promise[Any]` and arms a timer
   * that tries to fail the promise with an [[AskTimeoutException]] once `timeout`
   * has elapsed. As soon as the promise is completed, by whatever means, the ref is
   * stopped and the timer is cancelled.
   *
   * @param provider         provider whose guardian's system supplies the scheduler; also passed to the new ref
   * @param timeout          how long to wait before the promise is failed
   * @param targetName       description of the asked target, only used in the timeout message
   * @param messageClassName name of the message type, passed to the new ref and used in the timeout message
   * @param sender           sender of the ask, only used in the timeout message
   * @return the newly created ref
   */
  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String, sender: ActorRef = Actor.noSender): PromiseActorRef = {
    val result = Promise[Any]()
    val scheduler = provider.guardian.underlying.system.scheduler
    val a = new PromiseActorRef(provider, result, messageClassName)
    // Used both for the scheduled timeout task and for the completion callback below.
    implicit val ec: ExecutionContext = a.internalCallingThreadExecutionContext
    val f = scheduler.scheduleOnce(timeout.duration) {
      // tryComplete: losing the race against a real reply is fine, the timeout is then simply dropped.
      result tryComplete Failure(
        new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
    }
    // Whatever completed the promise: stop the ref, and always cancel the timer afterwards.
    result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
    a
  }

  /**
   * Variant without message type and sender information; delegates to the full
   * `apply` with `"unknown"` as message class name and `Actor.noSender` as sender.
   */
  @deprecated("Use apply with messageClassName and sender parameters", "2.4")
  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): PromiseActorRef =
    apply(provider, timeout, targetName, "unknown", Actor.noSender)
}