Merge branch 'wip-2362-ActorDSL-∂π'
This commit is contained in:
commit
daefaaa75d
8 changed files with 723 additions and 10 deletions
194
akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala
Normal file
194
akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.testkit.{ AkkaSpec, EventFilter }
|
||||
import ActorDSL._
|
||||
import akka.event.Logging.Warning
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.util.duration._
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
class ActorDSLSpec extends AkkaSpec {
|
||||
|
||||
val echo = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x ⇒ sender ! x
|
||||
}
|
||||
}))
|
||||
|
||||
"An Inbox" must {
|
||||
|
||||
"function as implicit sender" in {
|
||||
implicit val i = inbox()
|
||||
echo ! "hello"
|
||||
i.receive() must be("hello")
|
||||
}
|
||||
|
||||
"support queueing multiple queries" in {
|
||||
val i = inbox()
|
||||
import system.dispatcher
|
||||
val res = Future.sequence(Seq(
|
||||
Future { i.receive() } recover { case x ⇒ x },
|
||||
Future { Thread.sleep(100); i.select() { case "world" ⇒ 1 } } recover { case x ⇒ x },
|
||||
Future { Thread.sleep(200); i.select() { case "hello" ⇒ 2 } } recover { case x ⇒ x }))
|
||||
Thread.sleep(1000)
|
||||
res.isCompleted must be(false)
|
||||
i.receiver ! 42
|
||||
i.receiver ! "hello"
|
||||
i.receiver ! "world"
|
||||
Await.result(res, 5 second) must be(Seq(42, 1, 2))
|
||||
}
|
||||
|
||||
"support selective receives" in {
|
||||
val i = inbox()
|
||||
i.receiver ! "hello"
|
||||
i.receiver ! "world"
|
||||
val result = i.select() {
|
||||
case "world" ⇒ true
|
||||
}
|
||||
result must be(true)
|
||||
i.receive() must be("hello")
|
||||
}
|
||||
|
||||
"have a maximum queue size" in {
|
||||
val i = inbox()
|
||||
system.eventStream.subscribe(testActor, classOf[Warning])
|
||||
for (_ ← 1 to 1000) i.receiver ! 0
|
||||
expectNoMsg(1 second)
|
||||
EventFilter.warning(start = "dropping message", occurrences = 1) intercept {
|
||||
i.receiver ! 42
|
||||
}
|
||||
expectMsgType[Warning]
|
||||
i.receiver ! 42
|
||||
expectNoMsg(1 second)
|
||||
val gotit = for (_ ← 1 to 1000) yield i.receive()
|
||||
gotit must be((1 to 1000) map (_ ⇒ 0))
|
||||
intercept[TimeoutException] {
|
||||
i.receive(1 second)
|
||||
}
|
||||
}
|
||||
|
||||
"have a default and custom timeouts" in {
|
||||
val i = inbox()
|
||||
within(5 seconds, 6 seconds) {
|
||||
intercept[TimeoutException](i.receive())
|
||||
}
|
||||
within(1 second) {
|
||||
intercept[TimeoutException](i.receive(100 millis))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"A lightweight creator" must {
|
||||
|
||||
"support creating regular actors" in {
|
||||
val a = actor(new Act {
|
||||
become {
|
||||
case "hello" ⇒ sender ! "hi"
|
||||
}
|
||||
})
|
||||
|
||||
implicit val i = inbox()
|
||||
a ! "hello"
|
||||
i.receive() must be("hi")
|
||||
}
|
||||
|
||||
"support setup/teardown" in {
|
||||
val a = actor(new Act {
|
||||
whenStarting { testActor ! "started" }
|
||||
whenStopping { testActor ! "stopped" }
|
||||
})
|
||||
|
||||
system stop a
|
||||
expectMsg("started")
|
||||
expectMsg("stopped")
|
||||
}
|
||||
|
||||
"support restart" in {
|
||||
val a = actor(new Act {
|
||||
become {
|
||||
case "die" ⇒ throw new Exception
|
||||
}
|
||||
whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) }
|
||||
whenRestarted { cause ⇒ testActor ! cause }
|
||||
})
|
||||
|
||||
EventFilter[Exception](occurrences = 1) intercept {
|
||||
a ! "die"
|
||||
}
|
||||
expectMsgPF() { case (x: Exception, Some("die")) ⇒ }
|
||||
expectMsgPF() { case _: Exception ⇒ }
|
||||
}
|
||||
|
||||
"support superviseWith" in {
|
||||
val a = actor(new Act {
|
||||
val system = null // shadow the implicit system
|
||||
superviseWith(OneForOneStrategy() {
|
||||
case e: Exception if e.getMessage == "hello" ⇒ SupervisorStrategy.Stop
|
||||
case _: Exception ⇒ SupervisorStrategy.Resume
|
||||
})
|
||||
val child = actor("child")(new Act {
|
||||
whenFailing { (_, _) ⇒ }
|
||||
become {
|
||||
case ref: ActorRef ⇒ whenStopping(ref ! "stopped")
|
||||
case ex: Exception ⇒ throw ex
|
||||
}
|
||||
})
|
||||
become {
|
||||
case x ⇒ child ! x
|
||||
}
|
||||
})
|
||||
a ! testActor
|
||||
EventFilter[Exception](occurrences = 1) intercept {
|
||||
a ! new Exception
|
||||
}
|
||||
expectNoMsg(1 second)
|
||||
EventFilter[Exception]("hello", occurrences = 1) intercept {
|
||||
a ! new Exception("hello")
|
||||
}
|
||||
expectMsg("stopped")
|
||||
}
|
||||
|
||||
"supported nested declaration" in {
|
||||
val system = this.system
|
||||
val a = actor(system, "fred")(new Act {
|
||||
val b = actor("barney")(new Act {
|
||||
whenStarting { context.parent ! s"hello from $self" }
|
||||
})
|
||||
become {
|
||||
case x ⇒ testActor ! x
|
||||
}
|
||||
})
|
||||
expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]")
|
||||
lastSender must be(a)
|
||||
}
|
||||
|
||||
"support Stash" in {
|
||||
val a = actor(new ActWithStash {
|
||||
become {
|
||||
case 1 ⇒ stash()
|
||||
case 2 ⇒
|
||||
testActor ! 2; unstashAll(); become {
|
||||
case 1 ⇒ testActor ! 1; unbecome()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
a ! 1
|
||||
a ! 2
|
||||
expectMsg(2)
|
||||
expectMsg(1)
|
||||
a ! 1
|
||||
a ! 2
|
||||
expectMsg(2)
|
||||
expectMsg(1)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -157,6 +157,11 @@ akka {
|
|||
}
|
||||
}
|
||||
|
||||
# Default dispatcher for Actors that extend Stash
|
||||
default-stash-dispatcher {
|
||||
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
|
||||
}
|
||||
|
||||
default-dispatcher {
|
||||
# Must be one of the following
|
||||
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of
|
||||
|
|
@ -302,6 +307,16 @@ akka {
|
|||
serialization-bindings {
|
||||
"java.io.Serializable" = java
|
||||
}
|
||||
|
||||
# Configuration items which are used by the akka.actor.ActorDSL._ methods
|
||||
dsl {
|
||||
# Maximum queue size of the actor created by newInbox(); this protects against
|
||||
# faulty programs which use select() and consistently miss messages
|
||||
inbox-size = 1000
|
||||
|
||||
# Default timeout to assume for operations like Inbox.receive et al
|
||||
default-timeout = 5s
|
||||
}
|
||||
}
|
||||
|
||||
# Used to set the behavior of the scheduler.
|
||||
|
|
|
|||
103
akka-actor/src/main/scala/akka/actor/ActorDSL.scala
Normal file
103
akka-actor/src/main/scala/akka/actor/ActorDSL.scala
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import scala.collection.mutable.Queue
|
||||
import scala.concurrent.util.Duration
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.pattern.ask
|
||||
import scala.concurrent.Await
|
||||
import akka.util.Timeout
|
||||
import scala.collection.immutable.TreeSet
|
||||
import scala.concurrent.util.Deadline
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* This object contains elements which make writing actors and related code
|
||||
* more concise, e.g. when trying out actors in the REPL.
|
||||
*
|
||||
* For the communication of non-actor code with actors, you may use anonymous
|
||||
* actors tailored to this job:
|
||||
*
|
||||
* {{{
|
||||
* import ActorDSL._
|
||||
* import concurrent.util.duration._
|
||||
*
|
||||
* implicit val system: ActorSystem = ...
|
||||
*
|
||||
* implicit val i = inbox()
|
||||
* someActor ! someMsg // replies will go to `i`
|
||||
*
|
||||
* val reply = i.receive()
|
||||
* val transformedReply = i.select(5 seconds) {
|
||||
* case x: Int => 2 * x
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* The `receive` and `select` methods are synchronous, i.e. they block the
|
||||
* calling thread until an answer from the actor is received or the timeout
|
||||
* expires. The default timeout is taken from configuration item
|
||||
* `akka.actor.dsl.default-timeout`.
|
||||
*
|
||||
* When defining actors in the REPL, say, you may want to have a look at the
|
||||
* `Act` trait:
|
||||
*
|
||||
* {{{
|
||||
* import ActorDSL._
|
||||
*
|
||||
* val system: ActorSystem = ...
|
||||
*
|
||||
* val a = actor(system, "fred")(new Act {
|
||||
* val b = actor("barney")(new Act {
|
||||
* ...
|
||||
* })
|
||||
*
|
||||
* become {
|
||||
* case msg => ...
|
||||
* }
|
||||
* })
|
||||
* }}}
|
||||
*
|
||||
* Note that `actor` can be used with an implicit [[akka.actor.ActorRefFactory]]
|
||||
* as shown with `"barney"` (where the [[akka.actor.ActorContext serves this
|
||||
* purpose), but since nested declarations share the same
|
||||
* lexical context `"fred"`’s ActorContext would be ambiguous
|
||||
* if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also
|
||||
* be circumvented by shadowing the name `system` within `"fred"`).
|
||||
*
|
||||
* <b>Note:</b> If you want to use an `Act with Stash`, you should use the
|
||||
* `ActWithStash` trait in order to have the actor run on a special dispatcher
|
||||
* (`"akka.actor.default-stash-dispatcher"`) which has the necessary deque-based
|
||||
* mailbox setting.
|
||||
*/
|
||||
object ActorDSL extends dsl.Inbox with dsl.Creators {
|
||||
|
||||
protected object Extension extends ExtensionKey[Extension]
|
||||
|
||||
protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension {
|
||||
|
||||
val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props.empty, "dsl").asInstanceOf[RepointableActorRef]
|
||||
|
||||
{
|
||||
val timeout = system.settings.CreationTimeout.duration
|
||||
val deadline = Deadline.now + timeout
|
||||
while (!boss.isStarted) {
|
||||
if (deadline.hasTimeLeft)
|
||||
if (system.isTerminated) throw new IllegalStateException("actor system is already shutdown")
|
||||
else Thread.sleep(10)
|
||||
else throw new TimeoutException("failed to create /system/dsl actor within " + timeout)
|
||||
}
|
||||
}
|
||||
|
||||
lazy val config = system.settings.config.getConfig("akka.actor.dsl")
|
||||
|
||||
val DSLDefaultTimeout = Duration(config.getMilliseconds("default-timeout"), TimeUnit.MILLISECONDS)
|
||||
|
||||
def mkChild(p: Props, name: String) = boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -502,11 +502,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
|||
|
||||
protected def systemImpl: ActorSystemImpl = this
|
||||
|
||||
private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name)
|
||||
private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true)
|
||||
|
||||
def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name)
|
||||
def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false)
|
||||
|
||||
def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props)
|
||||
def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props, systemService = false)
|
||||
|
||||
def stop(actor: ActorRef): Unit = {
|
||||
val path = actor.path
|
||||
|
|
|
|||
|
|
@ -26,10 +26,14 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
final def children: Iterable[ActorRef] = childrenRefs.children
|
||||
final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava
|
||||
|
||||
def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false)
|
||||
def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false)
|
||||
private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true)
|
||||
private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true)
|
||||
def actorOf(props: Props): ActorRef =
|
||||
makeChild(this, props, randomName(), async = false, systemService = false)
|
||||
def actorOf(props: Props, name: String): ActorRef =
|
||||
makeChild(this, props, checkName(name), async = false, systemService = false)
|
||||
private[akka] def attachChild(props: Props, systemService: Boolean): ActorRef =
|
||||
makeChild(this, props, randomName(), async = true, systemService = systemService)
|
||||
private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef =
|
||||
makeChild(this, props, checkName(name), async = true, systemService = systemService)
|
||||
|
||||
@volatile private var _nextNameDoNotCallMeDirectly = 0L
|
||||
final protected def randomName(): String = {
|
||||
|
|
@ -163,7 +167,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
}
|
||||
}
|
||||
|
||||
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = {
|
||||
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
|
||||
if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
|
||||
val ser = SerializationExtension(cell.system)
|
||||
ser.serialize(props.creator) match {
|
||||
|
|
@ -185,7 +189,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
val actor =
|
||||
try {
|
||||
cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name,
|
||||
systemService = false, deploy = None, lookupDeploy = true, async = async)
|
||||
systemService = systemService, deploy = None, lookupDeploy = true, async = async)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
unreserveChild(name)
|
||||
|
|
|
|||
192
akka-actor/src/main/scala/akka/actor/dsl/Creators.scala
Normal file
192
akka-actor/src/main/scala/akka/actor/dsl/Creators.scala
Normal file
|
|
@ -0,0 +1,192 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor.dsl
|
||||
|
||||
import scala.concurrent.Await
|
||||
import akka.actor.ActorLogging
|
||||
import scala.concurrent.util.Deadline
|
||||
import scala.collection.immutable.TreeSet
|
||||
import scala.concurrent.util.{ Duration, FiniteDuration }
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.Cancellable
|
||||
import akka.actor.{ Actor, Stash, SupervisorStrategy }
|
||||
import scala.collection.mutable.Queue
|
||||
import akka.actor.{ ActorSystem, ActorRefFactory }
|
||||
import akka.actor.ActorRef
|
||||
import akka.util.Timeout
|
||||
import akka.actor.Status
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.pattern.ask
|
||||
import akka.actor.ActorDSL
|
||||
import akka.actor.Props
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
trait Creators { this: ActorDSL.type ⇒
|
||||
|
||||
/**
|
||||
* This trait provides a DSL for writing the inner workings of an actor, e.g.
|
||||
* for quickly trying things out in the REPL. It makes the following keywords
|
||||
* available:
|
||||
*
|
||||
* - `become` mapped to `context.become(_, discardOld = false)`
|
||||
*
|
||||
* - `unbecome` mapped to `context.unbecome`
|
||||
*
|
||||
* - `setup` for implementing `preStart()`
|
||||
*
|
||||
* - `whenFailing` for implementing `preRestart()`
|
||||
*
|
||||
* - `whenRestarted` for implementing `postRestart()`
|
||||
*
|
||||
* - `teardown` for implementing `postStop`
|
||||
*
|
||||
* Using the life-cycle keywords multiple times results in replacing the
|
||||
* content of the respective hook.
|
||||
*/
|
||||
trait Act extends Actor {
|
||||
|
||||
private[this] var preStartFun: () ⇒ Unit = null
|
||||
private[this] var postStopFun: () ⇒ Unit = null
|
||||
private[this] var preRestartFun: (Throwable, Option[Any]) ⇒ Unit = null
|
||||
private[this] var postRestartFun: Throwable ⇒ Unit = null
|
||||
private[this] var strategy: SupervisorStrategy = null
|
||||
|
||||
/**
|
||||
* Add the given behavior on top of the behavior stack for this actor. This
|
||||
* stack is cleared upon restart. Use `unbecome()` to pop an element off
|
||||
* this stack.
|
||||
*/
|
||||
def become(r: Receive) = context.become(r, discardOld = false)
|
||||
|
||||
/**
|
||||
* Pop the active behavior from the behavior stack of this actor. This stack
|
||||
* is cleared upon restart.
|
||||
*/
|
||||
def unbecome(): Unit = context.unbecome()
|
||||
|
||||
/**
|
||||
* Set the supervisor strategy of this actor, i.e. how it supervises its children.
|
||||
*/
|
||||
def superviseWith(s: SupervisorStrategy): Unit = strategy = s
|
||||
|
||||
/**
|
||||
* Replace the `preStart` action with the supplied thunk. Default action
|
||||
* is to call `super.preStart()`
|
||||
*/
|
||||
def whenStarting(body: ⇒ Unit): Unit = preStartFun = () ⇒ body
|
||||
|
||||
/**
|
||||
* Replace the `preRestart` action with the supplied function. Default
|
||||
* action is to call `super.preRestart()`, which will kill all children
|
||||
* and invoke `postStop()`.
|
||||
*/
|
||||
def whenFailing(body: (Throwable, Option[Any]) ⇒ Unit): Unit = preRestartFun = body
|
||||
|
||||
/**
|
||||
* Replace the `postRestart` action with the supplied function. Default
|
||||
* action is to call `super.postRestart` which will call `preStart()`.
|
||||
*/
|
||||
def whenRestarted(body: Throwable ⇒ Unit): Unit = postRestartFun = body
|
||||
|
||||
/**
|
||||
* Replace the `postStop` action with the supplied thunk. Default action
|
||||
* is to call `super.postStop`.
|
||||
*/
|
||||
def whenStopping(body: ⇒ Unit): Unit = postStopFun = () ⇒ body
|
||||
|
||||
override def preStart(): Unit = if (preStartFun != null) preStartFun() else super.preStart()
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = if (preRestartFun != null) preRestartFun(cause, msg) else super.preRestart(cause, msg)
|
||||
override def postRestart(cause: Throwable): Unit = if (postRestartFun != null) postRestartFun(cause) else super.postRestart(cause)
|
||||
override def postStop(): Unit = if (postStopFun != null) postStopFun() else super.postStop()
|
||||
override def supervisorStrategy: SupervisorStrategy = if (strategy != null) strategy else super.supervisorStrategy
|
||||
|
||||
/**
|
||||
* Default behavior of the actor is empty, use `become` to change this.
|
||||
*/
|
||||
override def receive: Receive = Actor.emptyBehavior
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this trait when defining an [[akka.actor.Actor]] with [[akka.actor.Stash]],
|
||||
* since just using `actor()(new Act with Stash{})` will not be able to see the
|
||||
* Stash component due to type erasure.
|
||||
*/
|
||||
trait ActWithStash extends Act with Stash
|
||||
|
||||
private def mkProps(classOfActor: Class[_], ctor: () ⇒ Actor): Props =
|
||||
if (classOf[Stash].isAssignableFrom(classOfActor))
|
||||
Props(creator = ctor, dispatcher = "akka.actor.default-stash-dispatcher")
|
||||
else
|
||||
Props(creator = ctor)
|
||||
|
||||
/**
|
||||
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]].
|
||||
*
|
||||
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
|
||||
* factory; <b>do not make the generated object accessible to code
|
||||
* outside and do not return the same object upon subsequent invocations.</b>
|
||||
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
|
||||
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
|
||||
* where the latter is always implicitly available within an [[akka.actor.Actor]].
|
||||
*/
|
||||
def actor[T <: Actor: ClassTag](ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = {
|
||||
// configure dispatcher/mailbox based on runtime class
|
||||
val classOfActor = implicitly[ClassTag[T]].runtimeClass
|
||||
val props = mkProps(classOfActor, () ⇒ ctor)
|
||||
factory.actorOf(props)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]].
|
||||
*
|
||||
* @param name is the name, which must be unique within the context of its
|
||||
* parent.
|
||||
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
|
||||
* factory; <b>do not make the generated object accessible to code
|
||||
* outside and do not return the same object upon subsequent invocations.</b>
|
||||
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
|
||||
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
|
||||
* where the latter is always implicitly available within an [[akka.actor.Actor]].
|
||||
*/
|
||||
def actor[T <: Actor: ClassTag](name: String)(ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = {
|
||||
// configure dispatcher/mailbox based on runtime class
|
||||
val classOfActor = implicitly[ClassTag[T]].runtimeClass
|
||||
val props = mkProps(classOfActor, () ⇒ ctor)
|
||||
|
||||
if (name == null) factory.actorOf(props)
|
||||
else factory.actorOf(props, name)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an actor from the given thunk which must produce an [[akka.actor.Actor]].
|
||||
*
|
||||
* @param name is the name, which must be unique within the context of its
|
||||
* parent; defaults to `null` which will assign a name automatically.
|
||||
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
|
||||
* factory; <b>do not make the generated object accessible to code
|
||||
* outside and do not return the same object upon subsequent invocations.</b>
|
||||
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
|
||||
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
|
||||
* where the latter is always implicitly available within an [[akka.actor.Actor]].
|
||||
*/
|
||||
def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: ⇒ T): ActorRef =
|
||||
actor(name)(ctor)(implicitly[ClassTag[T]], factory)
|
||||
|
||||
/**
|
||||
* Create an actor with an automatically generated name from the given thunk
|
||||
* which must produce an [[akka.actor.Actor]].
|
||||
*
|
||||
* @param ctor is a by-name argument which captures an [[akka.actor.Actor]]
|
||||
* factory; <b>do not make the generated object accessible to code
|
||||
* outside and do not return the same object upon subsequent invocations.</b>
|
||||
* @param factory is an implicit [[akka.actor.ActorRefFactory]], which can
|
||||
* either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]],
|
||||
* where the latter is always implicitly available within an [[akka.actor.Actor]].
|
||||
*/
|
||||
def actor[T <: Actor: ClassTag](factory: ActorRefFactory)(ctor: ⇒ T): ActorRef =
|
||||
actor(null: String)(ctor)(implicitly[ClassTag[T]], factory)
|
||||
|
||||
}
|
||||
203
akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala
Normal file
203
akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala
Normal file
|
|
@ -0,0 +1,203 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor.dsl
|
||||
|
||||
import scala.concurrent.Await
|
||||
import akka.actor.ActorLogging
|
||||
import scala.concurrent.util.Deadline
|
||||
import scala.collection.immutable.TreeSet
|
||||
import scala.concurrent.util.{ Duration, FiniteDuration }
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.Cancellable
|
||||
import akka.actor.Actor
|
||||
import scala.collection.mutable.Queue
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ActorRef
|
||||
import akka.util.Timeout
|
||||
import akka.actor.Status
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.pattern.ask
|
||||
import akka.actor.ActorDSL
|
||||
import akka.actor.Props
|
||||
|
||||
trait Inbox { this: ActorDSL.type ⇒
|
||||
|
||||
protected trait InboxExtension { this: Extension ⇒
|
||||
val DSLInboxQueueSize = config.getInt("inbox-size")
|
||||
|
||||
val inboxNr = new AtomicInteger
|
||||
val inboxProps = Props(new InboxActor(DSLInboxQueueSize))
|
||||
|
||||
def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet)
|
||||
}
|
||||
|
||||
private sealed trait Query {
|
||||
def deadline: Deadline
|
||||
def withClient(c: ActorRef): Query
|
||||
def client: ActorRef
|
||||
}
|
||||
private case class Get(deadline: Deadline, client: ActorRef = null) extends Query {
|
||||
def withClient(c: ActorRef) = copy(client = c)
|
||||
}
|
||||
private case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query {
|
||||
def withClient(c: ActorRef) = copy(client = c)
|
||||
}
|
||||
private case object Kick
|
||||
private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] {
|
||||
def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time
|
||||
}
|
||||
|
||||
private class InboxActor(size: Int) extends Actor with ActorLogging {
|
||||
var clients = Queue.empty[Query]
|
||||
val messages = Queue.empty[Any]
|
||||
var clientsByTimeout = TreeSet.empty[Query]
|
||||
var printedWarning = false
|
||||
|
||||
def enqueueQuery(q: Query) {
|
||||
val query = q withClient sender
|
||||
clients enqueue query
|
||||
clientsByTimeout += query
|
||||
}
|
||||
|
||||
def enqueueMessage(msg: Any) {
|
||||
if (messages.size < size) messages enqueue msg
|
||||
else {
|
||||
if (!printedWarning) {
|
||||
log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size)
|
||||
printedWarning = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var currentMsg: Any = _
|
||||
val clientPredicate: (Query) ⇒ Boolean = {
|
||||
case _: Get ⇒ true
|
||||
case Select(_, p, _) ⇒ p isDefinedAt currentMsg
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
var currentSelect: Select = _
|
||||
val messagePredicate: (Any ⇒ Boolean) = (msg) ⇒ currentSelect.predicate.isDefinedAt(msg)
|
||||
|
||||
var currentDeadline: Option[(Deadline, Cancellable)] = None
|
||||
|
||||
def receive = ({
|
||||
case g: Get ⇒
|
||||
if (messages.isEmpty) enqueueQuery(g)
|
||||
else sender ! messages.dequeue()
|
||||
case s @ Select(_, predicate, _) ⇒
|
||||
if (messages.isEmpty) enqueueQuery(s)
|
||||
else {
|
||||
currentSelect = s
|
||||
messages.dequeueFirst(messagePredicate) match {
|
||||
case Some(msg) ⇒ sender ! msg
|
||||
case None ⇒ enqueueQuery(s)
|
||||
}
|
||||
currentSelect = null
|
||||
}
|
||||
case Kick ⇒
|
||||
val now = Deadline.now
|
||||
val pred = (q: Query) ⇒ q.deadline.time < now.time
|
||||
val overdue = clientsByTimeout.iterator.takeWhile(pred)
|
||||
while (overdue.hasNext) {
|
||||
val toKick = overdue.next()
|
||||
toKick.client ! Status.Failure(new TimeoutException("deadline passed"))
|
||||
}
|
||||
// TODO: this wants to lose the `Queue.empty ++=` part when SI-6208 is fixed
|
||||
clients = Queue.empty ++= clients.filterNot(pred)
|
||||
clientsByTimeout = clientsByTimeout.from(Get(now))
|
||||
case msg ⇒
|
||||
if (clients.isEmpty) enqueueMessage(msg)
|
||||
else {
|
||||
currentMsg = msg
|
||||
clients.dequeueFirst(clientPredicate) match {
|
||||
case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg
|
||||
case None ⇒ enqueueMessage(msg)
|
||||
}
|
||||
currentMsg = null
|
||||
}
|
||||
}: Receive) andThen { _ ⇒
|
||||
if (clients.isEmpty) {
|
||||
if (currentDeadline.isDefined) {
|
||||
currentDeadline.get._2.cancel()
|
||||
currentDeadline = None
|
||||
}
|
||||
} else {
|
||||
val next = clientsByTimeout.head.deadline
|
||||
import context.dispatcher
|
||||
if (currentDeadline.isEmpty) {
|
||||
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
|
||||
} else if (currentDeadline.get._1 != next) {
|
||||
currentDeadline.get._2.cancel()
|
||||
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* make sure that AskTimeout does not accidentally mess up message reception
|
||||
* by adding this extra time to the real timeout
|
||||
*/
|
||||
private val extraTime = 1.minute
|
||||
|
||||
/**
|
||||
* Create a new actor which will internally queue up messages it gets so that
|
||||
* they can be interrogated with the [[akka.actor.dsl.Inbox!.Inbox!.receive]]
|
||||
* and [[akka.actor.dsl.Inbox!.Inbox!.select]] methods. It will be created as
|
||||
* a system actor in the ActorSystem which is implicitly (or explicitly)
|
||||
* supplied.
|
||||
*/
|
||||
def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system)
|
||||
|
||||
class Inbox(system: ActorSystem) {
|
||||
|
||||
val receiver: ActorRef = Extension(system).newReceiver
|
||||
private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout
|
||||
|
||||
/**
|
||||
* Receive a single message from the internal `receiver` actor. The supplied
|
||||
* timeout is used for cleanup purposes and its precision is subject to the
|
||||
* resolution of the system’s scheduler (usually 100ms, but configurable).
|
||||
*
|
||||
* <b>Warning:</b> This method blocks the current thread until a message is
|
||||
* received, thus it can introduce dead-locks (directly as well as
|
||||
* indirectly by causing starvation of the thread pool). <b>Do not use
|
||||
* this method within an actor!</b>
|
||||
*/
|
||||
def receive(timeout: FiniteDuration = defaultTimeout): Any = {
|
||||
implicit val t = Timeout(timeout + extraTime)
|
||||
Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf)
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive a single message for which the given partial function is defined
|
||||
* and return the transformed result, using the internal `receiver` actor.
|
||||
* The supplied timeout is used for cleanup purposes and its precision is
|
||||
* subject to the resolution of the system’s scheduler (usually 100ms, but
|
||||
* configurable).
|
||||
*
|
||||
* <b>Warning:</b> This method blocks the current thread until a message is
|
||||
* received, thus it can introduce dead-locks (directly as well as
|
||||
* indirectly by causing starvation of the thread pool). <b>Do not use
|
||||
* this method within an actor!</b>
|
||||
*/
|
||||
def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = {
|
||||
implicit val t = Timeout(timeout + extraTime)
|
||||
predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf))
|
||||
}
|
||||
|
||||
/**
|
||||
* Overridden finalizer which will try to stop the actor once this Inbox
|
||||
* is no longer referenced.
|
||||
*/
|
||||
override def finalize() {
|
||||
system.stop(receiver)
|
||||
}
|
||||
}
|
||||
|
||||
implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver
|
||||
}
|
||||
|
|
@ -40,13 +40,15 @@ object AkkaBuild extends Build {
|
|||
initialCommands in ThisBuild :=
|
||||
"""|import language.postfixOps
|
||||
|import akka.actor._
|
||||
|import akka.dispatch._
|
||||
|import ActorDSL._
|
||||
|import scala.concurrent._
|
||||
|import com.typesafe.config.ConfigFactory
|
||||
|import scala.concurrent.util.duration._
|
||||
|import akka.util.Timeout
|
||||
|val config = ConfigFactory.parseString("akka.stdout-loglevel=INFO,akka.loglevel=DEBUG")
|
||||
|val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=RemoteActorRefProvider").withFallback(config)
|
||||
|var system: ActorSystem = null
|
||||
|implicit def _system = system
|
||||
|def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") }
|
||||
|implicit def ec = system.dispatcher
|
||||
|implicit val timeout = Timeout(5 seconds)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue