diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala new file mode 100644 index 0000000000..df7574318b --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -0,0 +1,194 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor + +import language.postfixOps + +import akka.testkit.{ AkkaSpec, EventFilter } +import ActorDSL._ +import akka.event.Logging.Warning +import scala.concurrent.{ Await, Future } +import scala.concurrent.util.duration._ +import java.util.concurrent.TimeoutException + +class ActorDSLSpec extends AkkaSpec { + + val echo = system.actorOf(Props(new Actor { + def receive = { + case x ⇒ sender ! x + } + })) + + "An Inbox" must { + + "function as implicit sender" in { + implicit val i = inbox() + echo ! "hello" + i.receive() must be("hello") + } + + "support queueing multiple queries" in { + val i = inbox() + import system.dispatcher + val res = Future.sequence(Seq( + Future { i.receive() } recover { case x ⇒ x }, + Future { Thread.sleep(100); i.select() { case "world" ⇒ 1 } } recover { case x ⇒ x }, + Future { Thread.sleep(200); i.select() { case "hello" ⇒ 2 } } recover { case x ⇒ x })) + Thread.sleep(1000) + res.isCompleted must be(false) + i.receiver ! 42 + i.receiver ! "hello" + i.receiver ! "world" + Await.result(res, 5 second) must be(Seq(42, 1, 2)) + } + + "support selective receives" in { + val i = inbox() + i.receiver ! "hello" + i.receiver ! "world" + val result = i.select() { + case "world" ⇒ true + } + result must be(true) + i.receive() must be("hello") + } + + "have a maximum queue size" in { + val i = inbox() + system.eventStream.subscribe(testActor, classOf[Warning]) + for (_ ← 1 to 1000) i.receiver ! 0 + expectNoMsg(1 second) + EventFilter.warning(start = "dropping message", occurrences = 1) intercept { + i.receiver ! 42 + } + expectMsgType[Warning] + i.receiver ! 42 + expectNoMsg(1 second) + val gotit = for (_ ← 1 to 1000) yield i.receive() + gotit must be((1 to 1000) map (_ ⇒ 0)) + intercept[TimeoutException] { + i.receive(1 second) + } + } + + "have a default and custom timeouts" in { + val i = inbox() + within(5 seconds, 6 seconds) { + intercept[TimeoutException](i.receive()) + } + within(1 second) { + intercept[TimeoutException](i.receive(100 millis)) + } + } + + } + + "A lightweight creator" must { + + "support creating regular actors" in { + val a = actor(new Act { + become { + case "hello" ⇒ sender ! "hi" + } + }) + + implicit val i = inbox() + a ! "hello" + i.receive() must be("hi") + } + + "support setup/teardown" in { + val a = actor(new Act { + whenStarting { testActor ! "started" } + whenStopping { testActor ! "stopped" } + }) + + system stop a + expectMsg("started") + expectMsg("stopped") + } + + "support restart" in { + val a = actor(new Act { + become { + case "die" ⇒ throw new Exception + } + whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) } + whenRestarted { cause ⇒ testActor ! cause } + }) + + EventFilter[Exception](occurrences = 1) intercept { + a ! "die" + } + expectMsgPF() { case (x: Exception, Some("die")) ⇒ } + expectMsgPF() { case _: Exception ⇒ } + } + + "support superviseWith" in { + val a = actor(new Act { + val system = null // shadow the implicit system + superviseWith(OneForOneStrategy() { + case e: Exception if e.getMessage == "hello" ⇒ SupervisorStrategy.Stop + case _: Exception ⇒ SupervisorStrategy.Resume + }) + val child = actor("child")(new Act { + whenFailing { (_, _) ⇒ } + become { + case ref: ActorRef ⇒ whenStopping(ref ! "stopped") + case ex: Exception ⇒ throw ex + } + }) + become { + case x ⇒ child ! x + } + }) + a ! testActor + EventFilter[Exception](occurrences = 1) intercept { + a ! new Exception + } + expectNoMsg(1 second) + EventFilter[Exception]("hello", occurrences = 1) intercept { + a ! new Exception("hello") + } + expectMsg("stopped") + } + + "supported nested declaration" in { + val system = this.system + val a = actor(system, "fred")(new Act { + val b = actor("barney")(new Act { + whenStarting { context.parent ! s"hello from $self" } + }) + become { + case x ⇒ testActor ! x + } + }) + expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]") + lastSender must be(a) + } + + "support Stash" in { + val a = actor(new ActWithStash { + become { + case 1 ⇒ stash() + case 2 ⇒ + testActor ! 2; unstashAll(); become { + case 1 ⇒ testActor ! 1; unbecome() + } + } + }) + + a ! 1 + a ! 2 + expectMsg(2) + expectMsg(1) + a ! 1 + a ! 2 + expectMsg(2) + expectMsg(1) + } + + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 723347ce98..459c9bd216 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -157,6 +157,11 @@ akka { } } + # Default dispatcher for Actors that extend Stash + default-stash-dispatcher { + mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" + } + default-dispatcher { # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of @@ -302,6 +307,16 @@ akka { serialization-bindings { "java.io.Serializable" = java } + + # Configuration items which are used by the akka.actor.ActorDSL._ methods + dsl { + # Maximum queue size of the actor created by newInbox(); this protects against + # faulty programs which use select() and consistently miss messages + inbox-size = 1000 + + # Default timeout to assume for operations like Inbox.receive et al + default-timeout = 5s + } } # Used to set the behavior of the scheduler. diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala new file mode 100644 index 0000000000..196c794f27 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor + +import scala.collection.mutable.Queue +import scala.concurrent.util.Duration +import scala.concurrent.util.duration._ +import akka.pattern.ask +import scala.concurrent.Await +import akka.util.Timeout +import scala.collection.immutable.TreeSet +import scala.concurrent.util.Deadline +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.TimeUnit + +/** + * This object contains elements which make writing actors and related code + * more concise, e.g. when trying out actors in the REPL. + * + * For the communication of non-actor code with actors, you may use anonymous + * actors tailored to this job: + * + * {{{ + * import ActorDSL._ + * import concurrent.util.duration._ + * + * implicit val system: ActorSystem = ... + * + * implicit val i = inbox() + * someActor ! someMsg // replies will go to `i` + * + * val reply = i.receive() + * val transformedReply = i.select(5 seconds) { + * case x: Int => 2 * x + * } + * }}} + * + * The `receive` and `select` methods are synchronous, i.e. they block the + * calling thread until an answer from the actor is received or the timeout + * expires. The default timeout is taken from configuration item + * `akka.actor.dsl.default-timeout`. + * + * When defining actors in the REPL, say, you may want to have a look at the + * `Act` trait: + * + * {{{ + * import ActorDSL._ + * + * val system: ActorSystem = ... + * + * val a = actor(system, "fred")(new Act { + * val b = actor("barney")(new Act { + * ... + * }) + * + * become { + * case msg => ... + * } + * }) + * }}} + * + * Note that `actor` can be used with an implicit [[akka.actor.ActorRefFactory]] + * as shown with `"barney"` (where the [[akka.actor.ActorContext serves this + * purpose), but since nested declarations share the same + * lexical context `"fred"`’s ActorContext would be ambiguous + * if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also + * be circumvented by shadowing the name `system` within `"fred"`). + * + * Note: If you want to use an `Act with Stash`, you should use the + * `ActWithStash` trait in order to have the actor run on a special dispatcher + * (`"akka.actor.default-stash-dispatcher"`) which has the necessary deque-based + * mailbox setting. + */ +object ActorDSL extends dsl.Inbox with dsl.Creators { + + protected object Extension extends ExtensionKey[Extension] + + protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension { + + val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props.empty, "dsl").asInstanceOf[RepointableActorRef] + + { + val timeout = system.settings.CreationTimeout.duration + val deadline = Deadline.now + timeout + while (!boss.isStarted) { + if (deadline.hasTimeLeft) + if (system.isTerminated) throw new IllegalStateException("actor system is already shutdown") + else Thread.sleep(10) + else throw new TimeoutException("failed to create /system/dsl actor within " + timeout) + } + } + + lazy val config = system.settings.config.getConfig("akka.actor.dsl") + + val DSLDefaultTimeout = Duration(config.getMilliseconds("default-timeout"), TimeUnit.MILLISECONDS) + + def mkChild(p: Props, name: String) = boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true) + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d10f7ba29c..82fe2000b4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -502,11 +502,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, protected def systemImpl: ActorSystemImpl = this - private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name) + private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true) - def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name) + def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false) - def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props) + def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props, systemService = false) def stop(actor: ActorRef): Unit = { val path = actor.path diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index b2761519fe..89c506fd75 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -26,10 +26,14 @@ private[akka] trait Children { this: ActorCell ⇒ final def children: Iterable[ActorRef] = childrenRefs.children final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava - def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false) - def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false) - private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true) - private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true) + def actorOf(props: Props): ActorRef = + makeChild(this, props, randomName(), async = false, systemService = false) + def actorOf(props: Props, name: String): ActorRef = + makeChild(this, props, checkName(name), async = false, systemService = false) + private[akka] def attachChild(props: Props, systemService: Boolean): ActorRef = + makeChild(this, props, randomName(), async = true, systemService = systemService) + private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef = + makeChild(this, props, checkName(name), async = true, systemService = systemService) @volatile private var _nextNameDoNotCallMeDirectly = 0L final protected def randomName(): String = { @@ -163,7 +167,7 @@ private[akka] trait Children { this: ActorCell ⇒ } } - private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = { + private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(cell.system) ser.serialize(props.creator) match { @@ -185,7 +189,7 @@ private[akka] trait Children { this: ActorCell ⇒ val actor = try { cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, - systemService = false, deploy = None, lookupDeploy = true, async = async) + systemService = systemService, deploy = None, lookupDeploy = true, async = async) } catch { case NonFatal(e) ⇒ unreserveChild(name) diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala new file mode 100644 index 0000000000..b81f733013 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.dsl + +import scala.concurrent.Await +import akka.actor.ActorLogging +import scala.concurrent.util.Deadline +import scala.collection.immutable.TreeSet +import scala.concurrent.util.{ Duration, FiniteDuration } +import scala.concurrent.util.duration._ +import akka.actor.Cancellable +import akka.actor.{ Actor, Stash, SupervisorStrategy } +import scala.collection.mutable.Queue +import akka.actor.{ ActorSystem, ActorRefFactory } +import akka.actor.ActorRef +import akka.util.Timeout +import akka.actor.Status +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger +import akka.pattern.ask +import akka.actor.ActorDSL +import akka.actor.Props +import scala.reflect.ClassTag + +trait Creators { this: ActorDSL.type ⇒ + + /** + * This trait provides a DSL for writing the inner workings of an actor, e.g. + * for quickly trying things out in the REPL. It makes the following keywords + * available: + * + * - `become` mapped to `context.become(_, discardOld = false)` + * + * - `unbecome` mapped to `context.unbecome` + * + * - `setup` for implementing `preStart()` + * + * - `whenFailing` for implementing `preRestart()` + * + * - `whenRestarted` for implementing `postRestart()` + * + * - `teardown` for implementing `postStop` + * + * Using the life-cycle keywords multiple times results in replacing the + * content of the respective hook. + */ + trait Act extends Actor { + + private[this] var preStartFun: () ⇒ Unit = null + private[this] var postStopFun: () ⇒ Unit = null + private[this] var preRestartFun: (Throwable, Option[Any]) ⇒ Unit = null + private[this] var postRestartFun: Throwable ⇒ Unit = null + private[this] var strategy: SupervisorStrategy = null + + /** + * Add the given behavior on top of the behavior stack for this actor. This + * stack is cleared upon restart. Use `unbecome()` to pop an element off + * this stack. + */ + def become(r: Receive) = context.become(r, discardOld = false) + + /** + * Pop the active behavior from the behavior stack of this actor. This stack + * is cleared upon restart. + */ + def unbecome(): Unit = context.unbecome() + + /** + * Set the supervisor strategy of this actor, i.e. how it supervises its children. + */ + def superviseWith(s: SupervisorStrategy): Unit = strategy = s + + /** + * Replace the `preStart` action with the supplied thunk. Default action + * is to call `super.preStart()` + */ + def whenStarting(body: ⇒ Unit): Unit = preStartFun = () ⇒ body + + /** + * Replace the `preRestart` action with the supplied function. Default + * action is to call `super.preRestart()`, which will kill all children + * and invoke `postStop()`. + */ + def whenFailing(body: (Throwable, Option[Any]) ⇒ Unit): Unit = preRestartFun = body + + /** + * Replace the `postRestart` action with the supplied function. Default + * action is to call `super.postRestart` which will call `preStart()`. + */ + def whenRestarted(body: Throwable ⇒ Unit): Unit = postRestartFun = body + + /** + * Replace the `postStop` action with the supplied thunk. Default action + * is to call `super.postStop`. + */ + def whenStopping(body: ⇒ Unit): Unit = postStopFun = () ⇒ body + + override def preStart(): Unit = if (preStartFun != null) preStartFun() else super.preStart() + override def preRestart(cause: Throwable, msg: Option[Any]): Unit = if (preRestartFun != null) preRestartFun(cause, msg) else super.preRestart(cause, msg) + override def postRestart(cause: Throwable): Unit = if (postRestartFun != null) postRestartFun(cause) else super.postRestart(cause) + override def postStop(): Unit = if (postStopFun != null) postStopFun() else super.postStop() + override def supervisorStrategy: SupervisorStrategy = if (strategy != null) strategy else super.supervisorStrategy + + /** + * Default behavior of the actor is empty, use `become` to change this. + */ + override def receive: Receive = Actor.emptyBehavior + } + + /** + * Use this trait when defining an [[akka.actor.Actor]] with [[akka.actor.Stash]], + * since just using `actor()(new Act with Stash{})` will not be able to see the + * Stash component due to type erasure. + */ + trait ActWithStash extends Act with Stash + + private def mkProps(classOfActor: Class[_], ctor: () ⇒ Actor): Props = + if (classOf[Stash].isAssignableFrom(classOfActor)) + Props(creator = ctor, dispatcher = "akka.actor.default-stash-dispatcher") + else + Props(creator = ctor) + + /** + * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. + * + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { + // configure dispatcher/mailbox based on runtime class + val classOfActor = implicitly[ClassTag[T]].runtimeClass + val props = mkProps(classOfActor, () ⇒ ctor) + factory.actorOf(props) + } + + /** + * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. + * + * @param name is the name, which must be unique within the context of its + * parent. + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](name: String)(ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { + // configure dispatcher/mailbox based on runtime class + val classOfActor = implicitly[ClassTag[T]].runtimeClass + val props = mkProps(classOfActor, () ⇒ ctor) + + if (name == null) factory.actorOf(props) + else factory.actorOf(props, name) + } + + /** + * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. + * + * @param name is the name, which must be unique within the context of its + * parent; defaults to `null` which will assign a name automatically. + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: ⇒ T): ActorRef = + actor(name)(ctor)(implicitly[ClassTag[T]], factory) + + /** + * Create an actor with an automatically generated name from the given thunk + * which must produce an [[akka.actor.Actor]]. + * + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](factory: ActorRefFactory)(ctor: ⇒ T): ActorRef = + actor(null: String)(ctor)(implicitly[ClassTag[T]], factory) + +} diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala new file mode 100644 index 0000000000..9857023e82 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -0,0 +1,203 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.dsl + +import scala.concurrent.Await +import akka.actor.ActorLogging +import scala.concurrent.util.Deadline +import scala.collection.immutable.TreeSet +import scala.concurrent.util.{ Duration, FiniteDuration } +import scala.concurrent.util.duration._ +import akka.actor.Cancellable +import akka.actor.Actor +import scala.collection.mutable.Queue +import akka.actor.ActorSystem +import akka.actor.ActorRef +import akka.util.Timeout +import akka.actor.Status +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger +import akka.pattern.ask +import akka.actor.ActorDSL +import akka.actor.Props + +trait Inbox { this: ActorDSL.type ⇒ + + protected trait InboxExtension { this: Extension ⇒ + val DSLInboxQueueSize = config.getInt("inbox-size") + + val inboxNr = new AtomicInteger + val inboxProps = Props(new InboxActor(DSLInboxQueueSize)) + + def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet) + } + + private sealed trait Query { + def deadline: Deadline + def withClient(c: ActorRef): Query + def client: ActorRef + } + private case class Get(deadline: Deadline, client: ActorRef = null) extends Query { + def withClient(c: ActorRef) = copy(client = c) + } + private case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query { + def withClient(c: ActorRef) = copy(client = c) + } + private case object Kick + private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { + def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time + } + + private class InboxActor(size: Int) extends Actor with ActorLogging { + var clients = Queue.empty[Query] + val messages = Queue.empty[Any] + var clientsByTimeout = TreeSet.empty[Query] + var printedWarning = false + + def enqueueQuery(q: Query) { + val query = q withClient sender + clients enqueue query + clientsByTimeout += query + } + + def enqueueMessage(msg: Any) { + if (messages.size < size) messages enqueue msg + else { + if (!printedWarning) { + log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size) + printedWarning = true + } + } + } + + var currentMsg: Any = _ + val clientPredicate: (Query) ⇒ Boolean = { + case _: Get ⇒ true + case Select(_, p, _) ⇒ p isDefinedAt currentMsg + case _ ⇒ false + } + + var currentSelect: Select = _ + val messagePredicate: (Any ⇒ Boolean) = (msg) ⇒ currentSelect.predicate.isDefinedAt(msg) + + var currentDeadline: Option[(Deadline, Cancellable)] = None + + def receive = ({ + case g: Get ⇒ + if (messages.isEmpty) enqueueQuery(g) + else sender ! messages.dequeue() + case s @ Select(_, predicate, _) ⇒ + if (messages.isEmpty) enqueueQuery(s) + else { + currentSelect = s + messages.dequeueFirst(messagePredicate) match { + case Some(msg) ⇒ sender ! msg + case None ⇒ enqueueQuery(s) + } + currentSelect = null + } + case Kick ⇒ + val now = Deadline.now + val pred = (q: Query) ⇒ q.deadline.time < now.time + val overdue = clientsByTimeout.iterator.takeWhile(pred) + while (overdue.hasNext) { + val toKick = overdue.next() + toKick.client ! Status.Failure(new TimeoutException("deadline passed")) + } + // TODO: this wants to lose the `Queue.empty ++=` part when SI-6208 is fixed + clients = Queue.empty ++= clients.filterNot(pred) + clientsByTimeout = clientsByTimeout.from(Get(now)) + case msg ⇒ + if (clients.isEmpty) enqueueMessage(msg) + else { + currentMsg = msg + clients.dequeueFirst(clientPredicate) match { + case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg + case None ⇒ enqueueMessage(msg) + } + currentMsg = null + } + }: Receive) andThen { _ ⇒ + if (clients.isEmpty) { + if (currentDeadline.isDefined) { + currentDeadline.get._2.cancel() + currentDeadline = None + } + } else { + val next = clientsByTimeout.head.deadline + import context.dispatcher + if (currentDeadline.isEmpty) { + currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) + } else if (currentDeadline.get._1 != next) { + currentDeadline.get._2.cancel() + currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) + } + } + } + } + + /* + * make sure that AskTimeout does not accidentally mess up message reception + * by adding this extra time to the real timeout + */ + private val extraTime = 1.minute + + /** + * Create a new actor which will internally queue up messages it gets so that + * they can be interrogated with the [[akka.actor.dsl.Inbox!.Inbox!.receive]] + * and [[akka.actor.dsl.Inbox!.Inbox!.select]] methods. It will be created as + * a system actor in the ActorSystem which is implicitly (or explicitly) + * supplied. + */ + def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system) + + class Inbox(system: ActorSystem) { + + val receiver: ActorRef = Extension(system).newReceiver + private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout + + /** + * Receive a single message from the internal `receiver` actor. The supplied + * timeout is used for cleanup purposes and its precision is subject to the + * resolution of the system’s scheduler (usually 100ms, but configurable). + * + * Warning: This method blocks the current thread until a message is + * received, thus it can introduce dead-locks (directly as well as + * indirectly by causing starvation of the thread pool). Do not use + * this method within an actor! + */ + def receive(timeout: FiniteDuration = defaultTimeout): Any = { + implicit val t = Timeout(timeout + extraTime) + Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf) + } + + /** + * Receive a single message for which the given partial function is defined + * and return the transformed result, using the internal `receiver` actor. + * The supplied timeout is used for cleanup purposes and its precision is + * subject to the resolution of the system’s scheduler (usually 100ms, but + * configurable). + * + * Warning: This method blocks the current thread until a message is + * received, thus it can introduce dead-locks (directly as well as + * indirectly by causing starvation of the thread pool). Do not use + * this method within an actor! + */ + def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = { + implicit val t = Timeout(timeout + extraTime) + predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) + } + + /** + * Overridden finalizer which will try to stop the actor once this Inbox + * is no longer referenced. + */ + override def finalize() { + system.stop(receiver) + } + } + + implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 47f58bdce6..8527b28f8e 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -40,13 +40,15 @@ object AkkaBuild extends Build { initialCommands in ThisBuild := """|import language.postfixOps |import akka.actor._ - |import akka.dispatch._ + |import ActorDSL._ + |import scala.concurrent._ |import com.typesafe.config.ConfigFactory |import scala.concurrent.util.duration._ |import akka.util.Timeout |val config = ConfigFactory.parseString("akka.stdout-loglevel=INFO,akka.loglevel=DEBUG") |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=RemoteActorRefProvider").withFallback(config) |var system: ActorSystem = null + |implicit def _system = system |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") } |implicit def ec = system.dispatcher |implicit val timeout = Timeout(5 seconds)