From a4c6ea2f1f200ac48e879b31a654bae9f177c24a Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 8 Aug 2012 22:38:50 +0200 Subject: [PATCH 1/9] add ActorDSL object with receiver methods, see #2362 tests will follow --- .../src/main/scala/akka/actor/ActorDSL.scala | 170 ++++++++++++++++++ project/AkkaBuild.scala | 4 +- 2 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 akka-actor/src/main/scala/akka/actor/ActorDSL.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala new file mode 100644 index 0000000000..549a6d5cf0 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -0,0 +1,170 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor + +import scala.collection.mutable.Queue +import scala.concurrent.util.Duration +import akka.pattern.ask +import scala.concurrent.Await +import akka.util.Timeout +import scala.collection.immutable.TreeSet +import scala.concurrent.util.Deadline +import java.util.concurrent.TimeoutException + +/** + * This object contains elements which make writing actors and related code + * more concise, e.g. when trying out actors in the REPL. + * + * For the communication of non-actor code with actors, you may use anonymous + * actors tailored to this job: + * + * {{{ + * import ActorDSL._ + * import concurrent.util.duration._ + * + * implicit val system: ActorSystem = ... + * + * implicit val recv = newReceiver() + * someActor ! someMsg // replies will go to `recv` + * + * val reply = receiveMessage(recv, 5 seconds) + * val transformedReply = selectMessage(recv, 5 seconds) { + * case x: Int => 2 * x + * } + * }}} + */ +object ActorDSL { + + private sealed trait Query { + def deadline: Deadline + def withClient(c: ActorRef): Query + def client: ActorRef + } + private case class Get(deadline: Deadline, client: ActorRef = null) extends Query { + def withClient(c: ActorRef) = copy(client = c) + } + private case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query { + def withClient(c: ActorRef) = copy(client = c) + } + private case object Kick + private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { + def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time + } + + private class Receiver extends Actor { + var clients = Queue.empty[Query] + val messages = Queue.empty[Any] + var clientsByTimeout = TreeSet.empty[Query] + + def enqueue(q: Query) { + val query = q withClient sender + clients enqueue query + clientsByTimeout += query + } + + var currentMsg: Any = _ + val clientPredicate: (Query) ⇒ Boolean = { + case _: Get ⇒ true + case Select(_, p, _) ⇒ p isDefinedAt currentMsg + case _ ⇒ false + } + + var currentSelect: Select = _ + val messagePredicate: (Any ⇒ Boolean) = (msg) ⇒ currentSelect.predicate.isDefinedAt(msg) + + var currentDeadline: Option[(Deadline, Cancellable)] = None + + def receive = ({ + case g: Get ⇒ + if (messages.isEmpty) enqueue(g) + else sender ! messages.dequeue() + case s @ Select(_, predicate, _) ⇒ + if (messages.isEmpty) enqueue(s) + else { + currentSelect = s + messages.dequeueFirst(messagePredicate) match { + case Some(msg) ⇒ sender ! msg + case None ⇒ enqueue(s) + } + currentSelect = null + } + case Kick ⇒ + val now = Deadline.now + val pred = (q: Query) ⇒ q.deadline.time < now.time + val overdue = clientsByTimeout.iterator.takeWhile(pred) + while (overdue.hasNext) { + val toKick = overdue.next() + toKick.client ! Status.Failure(new TimeoutException("deadline passed")) + } + // TODO: this wants to lose the `Queue.empty ++=` part when SI-6208 is fixed + clients = Queue.empty ++= clients.filterNot(pred) + clientsByTimeout = clientsByTimeout.from(Get(now)) + case msg ⇒ + if (clients.isEmpty) messages.enqueue(msg) + else { + currentMsg = msg + clients.dequeueFirst(clientPredicate) match { + case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg + case None ⇒ messages.enqueue(msg) + } + currentMsg = null + } + }: Receive) andThen { _ ⇒ + if (clients.isEmpty) { + if (currentDeadline.isDefined) { + currentDeadline.get._2.cancel() + currentDeadline = None + } + } else { + val next = clientsByTimeout.head.deadline + if (currentDeadline.isEmpty) { + currentDeadline = Some(next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)) + } else if (currentDeadline.get._1 != next) { + currentDeadline.get._2.cancel() + currentDeadline = Some(next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)) + } + } + } + } + + private val receiverProps = Props(new Receiver) + + /** + * Create a new actor which will internally queue up messages it gets so that + * they can be interrogated with the `receiveMessage()` and `selectMessage()` + * methods below. It will be created as top-level actor in the ActorSystem + * which is implicitly (or explicitly) supplied. + * + * IMPORTANT: + * + * Be sure to terminate this actor using `system.stop(ref)` where `system` is + * the actor system with which the actor was created. + */ + def newReceiver()(implicit system: ActorSystem): ActorRef = system.actorOf(receiverProps) + + /** + * Receive a single message using the actor reference supplied; this must be + * an actor created using `newReceiver()` above. The supplied timeout is used + * for cleanup purposes and its precision is subject to the resolution of the + * system’s scheduler (usually 100ms, but configurable). + */ + def receiveMessage(receiver: ActorRef, timeout: Duration): Any = { + implicit val t = Timeout(timeout * 2) + Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf) + } + + /** + * Receive a single message for which the given partial function is defined + * and return the transformed result, using the actor reference supplied; + * this must be an actor created using `newReceiver()` above. The supplied + * timeout is used for cleanup purposes and its precision is subject to the + * resolution of the system’s scheduler (usually 100ms, but configurable). + */ + def selectMessage[T](receiver: ActorRef, timeout: Duration)(predicate: PartialFunction[Any, T]): T = { + implicit val t = Timeout(timeout * 2) + predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) + } + +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a29f0ed290..0c59619c9a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -40,13 +40,15 @@ object AkkaBuild extends Build { initialCommands in ThisBuild := """|import language.postfixOps |import akka.actor._ - |import akka.dispatch._ + |import ActorDSL._ + |import scala.concurrent._ |import com.typesafe.config.ConfigFactory |import scala.concurrent.util.duration._ |import akka.util.Timeout |val config = ConfigFactory.parseString("akka.stdout-loglevel=INFO,akka.loglevel=DEBUG") |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=RemoteActorRefProvider").withFallback(config) |var system: ActorSystem = null + |implicit def _system = system |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") } |implicit def ec = system.dispatcher |implicit val timeout = Timeout(5 seconds) From bb790922628e5c92c893140cb05a30b4a547307d Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 9 Aug 2012 22:26:52 +0200 Subject: [PATCH 2/9] =?UTF-8?q?work=20some=20more=20on=20ActorDSL=E2=80=99?= =?UTF-8?q?s=20Inbox=20feature,=20see=20#2362?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - re-enable the systemService flag for systemActorOf (just to be safe) - attach inbox actors below path /system/dsl by means of an Extension - add queue size limit - change names to newInbox()/receive()/select() --- akka-actor/src/main/resources/reference.conf | 7 ++ .../src/main/scala/akka/actor/ActorDSL.scala | 95 ++++++++++++++----- .../main/scala/akka/actor/ActorSystem.scala | 6 +- .../main/scala/akka/actor/cell/Children.scala | 16 ++-- 4 files changed, 89 insertions(+), 35 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 7bedfa3f02..879796db0e 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -302,6 +302,13 @@ akka { serialization-bindings { "java.io.Serializable" = java } + + # Configuration items which are used by the akka.actor.ActorDSL._ methods + dsl { + # Maximum queue size of the actor created by newInbox(); this protects against + # faulty programs which use select() and consistently miss messages + inbox-size = 1000 + } } # Used to set the behavior of the scheduler. diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 549a6d5cf0..02605be8fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -6,34 +6,40 @@ package akka.actor import scala.collection.mutable.Queue import scala.concurrent.util.Duration +import scala.concurrent.util.duration._ import akka.pattern.ask import scala.concurrent.Await import akka.util.Timeout import scala.collection.immutable.TreeSet import scala.concurrent.util.Deadline import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger /** * This object contains elements which make writing actors and related code * more concise, e.g. when trying out actors in the REPL. - * + * * For the communication of non-actor code with actors, you may use anonymous * actors tailored to this job: - * + * * {{{ * import ActorDSL._ * import concurrent.util.duration._ - * + * * implicit val system: ActorSystem = ... - * - * implicit val recv = newReceiver() + * + * implicit val recv = newInbox() * someActor ! someMsg // replies will go to `recv` - * - * val reply = receiveMessage(recv, 5 seconds) - * val transformedReply = selectMessage(recv, 5 seconds) { + * + * val reply = receive(recv, 5 seconds) + * val transformedReply = select(recv, 5 seconds) { * case x: Int => 2 * x * } * }}} + * + * The `receive` and `select` methods are synchronous, i.e. they block the + * calling thread until an answer from the actor is received or the timeout + * expires. */ object ActorDSL { @@ -53,17 +59,28 @@ object ActorDSL { def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time } - private class Receiver extends Actor { + private class Inbox(size: Int) extends Actor with ActorLogging { var clients = Queue.empty[Query] val messages = Queue.empty[Any] var clientsByTimeout = TreeSet.empty[Query] + var printedWarning = false - def enqueue(q: Query) { + def enqueueQuery(q: Query) { val query = q withClient sender clients enqueue query clientsByTimeout += query } + def enqueueMessage(msg: Any) { + if (messages.size < size) messages enqueue msg + else { + if (!printedWarning) { + log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size) + printedWarning = true + } + } + } + var currentMsg: Any = _ val clientPredicate: (Query) ⇒ Boolean = { case _: Get ⇒ true @@ -78,15 +95,15 @@ object ActorDSL { def receive = ({ case g: Get ⇒ - if (messages.isEmpty) enqueue(g) + if (messages.isEmpty) enqueueQuery(g) else sender ! messages.dequeue() case s @ Select(_, predicate, _) ⇒ - if (messages.isEmpty) enqueue(s) + if (messages.isEmpty) enqueueQuery(s) else { currentSelect = s messages.dequeueFirst(messagePredicate) match { case Some(msg) ⇒ sender ! msg - case None ⇒ enqueue(s) + case None ⇒ enqueueQuery(s) } currentSelect = null } @@ -102,12 +119,12 @@ object ActorDSL { clients = Queue.empty ++= clients.filterNot(pred) clientsByTimeout = clientsByTimeout.from(Get(now)) case msg ⇒ - if (clients.isEmpty) messages.enqueue(msg) + if (clients.isEmpty) enqueueMessage(msg) else { currentMsg = msg clients.dequeueFirst(clientPredicate) match { case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg - case None ⇒ messages.enqueue(msg) + case None ⇒ enqueueMessage(msg) } currentMsg = null } @@ -129,20 +146,44 @@ object ActorDSL { } } - private val receiverProps = Props(new Receiver) + private object Extension extends ExtensionKey[Extension] + private class Extension(system: ExtendedActorSystem) extends akka.actor.Extension { + val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props.empty, "dsl").asInstanceOf[RepointableActorRef] + while (!boss.isStarted) Thread.sleep(10) + + val DSLInboxQueueSize = system.settings.config.getInt("akka.actor.dsl.inbox-size") + + val inboxNr = new AtomicInteger + val inboxProps = Props(new Inbox(DSLInboxQueueSize)) + + def newInbox(): ActorRef = + boss.underlying.asInstanceOf[ActorCell] + .attachChild(inboxProps, "inbox-" + inboxNr.incrementAndGet(), systemService = true) + } + + /* + * make sure that AskTimeout does not accidentally mess up message reception + * by adding this extra time to the real timeout + */ + private val extraTime = 1.minute + + private val pathPrefix = Seq("system", "dsl") + private def mustBeInbox(inbox: ActorRef) { + require(inbox.path.elements.take(2) == pathPrefix, "can only use select/receive with references obtained from newInbox()") + } /** * Create a new actor which will internally queue up messages it gets so that * they can be interrogated with the `receiveMessage()` and `selectMessage()` * methods below. It will be created as top-level actor in the ActorSystem * which is implicitly (or explicitly) supplied. - * + * * IMPORTANT: - * + * * Be sure to terminate this actor using `system.stop(ref)` where `system` is * the actor system with which the actor was created. */ - def newReceiver()(implicit system: ActorSystem): ActorRef = system.actorOf(receiverProps) + def newInbox()(implicit system: ActorSystem): ActorRef = Extension(system).newInbox() /** * Receive a single message using the actor reference supplied; this must be @@ -150,21 +191,23 @@ object ActorDSL { * for cleanup purposes and its precision is subject to the resolution of the * system’s scheduler (usually 100ms, but configurable). */ - def receiveMessage(receiver: ActorRef, timeout: Duration): Any = { - implicit val t = Timeout(timeout * 2) - Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf) + def receive(inbox: ActorRef, timeout: Duration): Any = { + mustBeInbox(inbox) + implicit val t = Timeout(timeout + extraTime) + Await.result(inbox ? Get(Deadline.now + timeout), Duration.Inf) } /** * Receive a single message for which the given partial function is defined - * and return the transformed result, using the actor reference supplied; + * and return the transformed result, using the actor reference supplied; * this must be an actor created using `newReceiver()` above. The supplied * timeout is used for cleanup purposes and its precision is subject to the * resolution of the system’s scheduler (usually 100ms, but configurable). */ - def selectMessage[T](receiver: ActorRef, timeout: Duration)(predicate: PartialFunction[Any, T]): T = { - implicit val t = Timeout(timeout * 2) - predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) + def select[T](inbox: ActorRef, timeout: Duration)(predicate: PartialFunction[Any, T]): T = { + mustBeInbox(inbox) + implicit val t = Timeout(timeout + extraTime) + predicate(Await.result(inbox ? Select(Deadline.now + timeout, predicate), Duration.Inf)) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index ef23305b83..18d0eebb79 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -502,11 +502,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, protected def systemImpl: ActorSystemImpl = this - private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name) + private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true) - def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name) + def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false) - def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props) + def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props, systemService = false) def stop(actor: ActorRef): Unit = { val path = actor.path diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 2b0fa76db4..4ea0035054 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -27,10 +27,14 @@ private[akka] trait Children { this: ActorCell ⇒ final def children: Iterable[ActorRef] = childrenRefs.children final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava - def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false) - def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false) - private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true) - private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true) + def actorOf(props: Props): ActorRef = + makeChild(this, props, randomName(), async = false, systemService = false) + def actorOf(props: Props, name: String): ActorRef = + makeChild(this, props, checkName(name), async = false, systemService = false) + private[akka] def attachChild(props: Props, systemService: Boolean): ActorRef = + makeChild(this, props, randomName(), async = true, systemService = systemService) + private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef = + makeChild(this, props, checkName(name), async = true, systemService = systemService) @volatile private var _nextNameDoNotCallMeDirectly = 0L final protected def randomName(): String = { @@ -160,7 +164,7 @@ private[akka] trait Children { this: ActorCell ⇒ } } - private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = { + private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(cell.system) ser.serialize(props.creator) match { @@ -182,7 +186,7 @@ private[akka] trait Children { this: ActorCell ⇒ val actor = try { cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, - systemService = false, deploy = None, lookupDeploy = true, async = async) + systemService = systemService, deploy = None, lookupDeploy = true, async = async) } catch { case NonFatal(e) ⇒ unreserveChild(name) From 4632b40a71a92076214a842463671e45530235fc Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Aug 2012 14:53:30 +0200 Subject: [PATCH 3/9] rework ActorDSL.Inbox, see #2362 - only visible DSL element is inbox(), which produces an Inbox - there is an implicit conversion from implicit Inbox to ActorRef, enabling the use as implicit sender - select() and receive() methods with (default) timeouts are provided on Inbox only, not to muddy the waters namespace-wise - split up internal structure to make a cake for the different constituents of the overall DSL (where this shall be only a small part) --- .../test/scala/akka/actor/ActorDSLSpec.scala | 88 ++++++++ akka-actor/src/main/resources/reference.conf | 3 + .../src/main/scala/akka/actor/ActorDSL.scala | 168 +-------------- .../src/main/scala/akka/actor/dsl/Inbox.scala | 194 ++++++++++++++++++ 4 files changed, 293 insertions(+), 160 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala create mode 100644 akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala new file mode 100644 index 0000000000..2f2dfd1054 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor + +import language.postfixOps + +import akka.testkit.{ AkkaSpec, EventFilter } +import ActorDSL._ +import akka.event.Logging.Warning +import scala.concurrent.{ Await, Future } +import scala.concurrent.util.duration._ +import java.util.concurrent.TimeoutException + +class ActorDSLSpec extends AkkaSpec { + + val echo = system.actorOf(Props(new Actor { + def receive = { + case x ⇒ sender ! x + } + })) + + "An Inbox" must { + + "function as implicit sender" in { + implicit val i = inbox() + echo ! "hello" + i.receive() must be("hello") + } + + "support queueing multiple queries" in { + val i = inbox() + import system.dispatcher + val res = Future.sequence(Seq( + Future { i.receive() } recover { case x ⇒ x }, + Future { Thread.sleep(100); i.select() { case "world" ⇒ 1 } } recover { case x ⇒ x }, + Future { Thread.sleep(200); i.select() { case "hello" ⇒ 2 } } recover { case x ⇒ x })) + Thread.sleep(1000) + res.isCompleted must be(false) + i.receiver ! 42 + i.receiver ! "hello" + i.receiver ! "world" + Await.result(res, 5 second) must be(Seq(42, 1, 2)) + } + + "support selective receives" in { + val i = inbox() + i.receiver ! "hello" + i.receiver ! "world" + val result = i.select() { + case "world" ⇒ true + } + result must be(true) + i.receive() must be("hello") + } + + "have a maximum queue size" in { + val i = inbox() + system.eventStream.subscribe(testActor, classOf[Warning]) + for (_ ← 1 to 1000) i.receiver ! 0 + expectNoMsg(1 second) + EventFilter.warning(start = "dropping message", occurrences = 1) intercept { + i.receiver ! 42 + } + expectMsgType[Warning] + i.receiver ! 42 + expectNoMsg(1 second) + val gotit = for (_ ← 1 to 1000) yield i.receive() + gotit must be((1 to 1000) map (_ ⇒ 0)) + intercept[TimeoutException] { + i.receive(1 second) + } + } + + "have a default and custom timeouts" in { + val i = inbox() + within(5 seconds, 6 seconds) { + intercept[TimeoutException](i.receive()) + } + within(1 second) { + intercept[TimeoutException](i.receive(100 millis)) + } + } + + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index ac4ff84a85..61cc2995b2 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -308,6 +308,9 @@ akka { # Maximum queue size of the actor created by newInbox(); this protects against # faulty programs which use select() and consistently miss messages inbox-size = 1000 + + # Default timeout to assume for operations like Inbox.receive et al + default-timeout = 5s } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 02605be8fb..05dbe3861c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -14,6 +14,7 @@ import scala.collection.immutable.TreeSet import scala.concurrent.util.Deadline import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.TimeUnit /** * This object contains elements which make writing actors and related code @@ -36,178 +37,25 @@ import java.util.concurrent.atomic.AtomicInteger * case x: Int => 2 * x * } * }}} - * + * * The `receive` and `select` methods are synchronous, i.e. they block the * calling thread until an answer from the actor is received or the timeout * expires. */ -object ActorDSL { +object ActorDSL extends dsl.Inbox { - private sealed trait Query { - def deadline: Deadline - def withClient(c: ActorRef): Query - def client: ActorRef - } - private case class Get(deadline: Deadline, client: ActorRef = null) extends Query { - def withClient(c: ActorRef) = copy(client = c) - } - private case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query { - def withClient(c: ActorRef) = copy(client = c) - } - private case object Kick - private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { - def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time - } + protected object Extension extends ExtensionKey[Extension] - private class Inbox(size: Int) extends Actor with ActorLogging { - var clients = Queue.empty[Query] - val messages = Queue.empty[Any] - var clientsByTimeout = TreeSet.empty[Query] - var printedWarning = false + protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension { - def enqueueQuery(q: Query) { - val query = q withClient sender - clients enqueue query - clientsByTimeout += query - } - - def enqueueMessage(msg: Any) { - if (messages.size < size) messages enqueue msg - else { - if (!printedWarning) { - log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size) - printedWarning = true - } - } - } - - var currentMsg: Any = _ - val clientPredicate: (Query) ⇒ Boolean = { - case _: Get ⇒ true - case Select(_, p, _) ⇒ p isDefinedAt currentMsg - case _ ⇒ false - } - - var currentSelect: Select = _ - val messagePredicate: (Any ⇒ Boolean) = (msg) ⇒ currentSelect.predicate.isDefinedAt(msg) - - var currentDeadline: Option[(Deadline, Cancellable)] = None - - def receive = ({ - case g: Get ⇒ - if (messages.isEmpty) enqueueQuery(g) - else sender ! messages.dequeue() - case s @ Select(_, predicate, _) ⇒ - if (messages.isEmpty) enqueueQuery(s) - else { - currentSelect = s - messages.dequeueFirst(messagePredicate) match { - case Some(msg) ⇒ sender ! msg - case None ⇒ enqueueQuery(s) - } - currentSelect = null - } - case Kick ⇒ - val now = Deadline.now - val pred = (q: Query) ⇒ q.deadline.time < now.time - val overdue = clientsByTimeout.iterator.takeWhile(pred) - while (overdue.hasNext) { - val toKick = overdue.next() - toKick.client ! Status.Failure(new TimeoutException("deadline passed")) - } - // TODO: this wants to lose the `Queue.empty ++=` part when SI-6208 is fixed - clients = Queue.empty ++= clients.filterNot(pred) - clientsByTimeout = clientsByTimeout.from(Get(now)) - case msg ⇒ - if (clients.isEmpty) enqueueMessage(msg) - else { - currentMsg = msg - clients.dequeueFirst(clientPredicate) match { - case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg - case None ⇒ enqueueMessage(msg) - } - currentMsg = null - } - }: Receive) andThen { _ ⇒ - if (clients.isEmpty) { - if (currentDeadline.isDefined) { - currentDeadline.get._2.cancel() - currentDeadline = None - } - } else { - val next = clientsByTimeout.head.deadline - if (currentDeadline.isEmpty) { - currentDeadline = Some(next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)) - } else if (currentDeadline.get._1 != next) { - currentDeadline.get._2.cancel() - currentDeadline = Some(next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)) - } - } - } - } - - private object Extension extends ExtensionKey[Extension] - private class Extension(system: ExtendedActorSystem) extends akka.actor.Extension { val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props.empty, "dsl").asInstanceOf[RepointableActorRef] while (!boss.isStarted) Thread.sleep(10) - val DSLInboxQueueSize = system.settings.config.getInt("akka.actor.dsl.inbox-size") + lazy val config = system.settings.config.getConfig("akka.actor.dsl") - val inboxNr = new AtomicInteger - val inboxProps = Props(new Inbox(DSLInboxQueueSize)) + val DSLDefaultTimeout = Duration(config.getMilliseconds("default-timeout"), TimeUnit.MILLISECONDS) - def newInbox(): ActorRef = - boss.underlying.asInstanceOf[ActorCell] - .attachChild(inboxProps, "inbox-" + inboxNr.incrementAndGet(), systemService = true) - } - - /* - * make sure that AskTimeout does not accidentally mess up message reception - * by adding this extra time to the real timeout - */ - private val extraTime = 1.minute - - private val pathPrefix = Seq("system", "dsl") - private def mustBeInbox(inbox: ActorRef) { - require(inbox.path.elements.take(2) == pathPrefix, "can only use select/receive with references obtained from newInbox()") - } - - /** - * Create a new actor which will internally queue up messages it gets so that - * they can be interrogated with the `receiveMessage()` and `selectMessage()` - * methods below. It will be created as top-level actor in the ActorSystem - * which is implicitly (or explicitly) supplied. - * - * IMPORTANT: - * - * Be sure to terminate this actor using `system.stop(ref)` where `system` is - * the actor system with which the actor was created. - */ - def newInbox()(implicit system: ActorSystem): ActorRef = Extension(system).newInbox() - - /** - * Receive a single message using the actor reference supplied; this must be - * an actor created using `newReceiver()` above. The supplied timeout is used - * for cleanup purposes and its precision is subject to the resolution of the - * system’s scheduler (usually 100ms, but configurable). - */ - def receive(inbox: ActorRef, timeout: Duration): Any = { - mustBeInbox(inbox) - implicit val t = Timeout(timeout + extraTime) - Await.result(inbox ? Get(Deadline.now + timeout), Duration.Inf) - } - - /** - * Receive a single message for which the given partial function is defined - * and return the transformed result, using the actor reference supplied; - * this must be an actor created using `newReceiver()` above. The supplied - * timeout is used for cleanup purposes and its precision is subject to the - * resolution of the system’s scheduler (usually 100ms, but configurable). - */ - def select[T](inbox: ActorRef, timeout: Duration)(predicate: PartialFunction[Any, T]): T = { - mustBeInbox(inbox) - implicit val t = Timeout(timeout + extraTime) - predicate(Await.result(inbox ? Select(Deadline.now + timeout, predicate), Duration.Inf)) + def mkChild(p: Props, name: String) = boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala new file mode 100644 index 0000000000..2431bfd766 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -0,0 +1,194 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.dsl + +import scala.concurrent.Await +import akka.actor.ActorLogging +import scala.concurrent.util.Deadline +import scala.collection.immutable.TreeSet +import scala.concurrent.util.{ Duration, FiniteDuration } +import scala.concurrent.util.duration._ +import akka.actor.Cancellable +import akka.actor.Actor +import scala.collection.mutable.Queue +import akka.actor.ActorSystem +import akka.actor.ActorRef +import akka.util.Timeout +import akka.actor.Status +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger +import akka.pattern.ask +import akka.actor.ActorDSL +import akka.actor.Props + +trait Inbox { this: ActorDSL.type ⇒ + + protected trait InboxExtension { this: Extension ⇒ + val DSLInboxQueueSize = config.getInt("inbox-size") + + val inboxNr = new AtomicInteger + val inboxProps = Props(new InboxActor(DSLInboxQueueSize)) + + def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet) + } + + private sealed trait Query { + def deadline: Deadline + def withClient(c: ActorRef): Query + def client: ActorRef + } + private case class Get(deadline: Deadline, client: ActorRef = null) extends Query { + def withClient(c: ActorRef) = copy(client = c) + } + private case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query { + def withClient(c: ActorRef) = copy(client = c) + } + private case object Kick + private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { + def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time + } + + private class InboxActor(size: Int) extends Actor with ActorLogging { + var clients = Queue.empty[Query] + val messages = Queue.empty[Any] + var clientsByTimeout = TreeSet.empty[Query] + var printedWarning = false + + def enqueueQuery(q: Query) { + val query = q withClient sender + clients enqueue query + clientsByTimeout += query + } + + def enqueueMessage(msg: Any) { + if (messages.size < size) messages enqueue msg + else { + if (!printedWarning) { + log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size) + printedWarning = true + } + } + } + + var currentMsg: Any = _ + val clientPredicate: (Query) ⇒ Boolean = { + case _: Get ⇒ true + case Select(_, p, _) ⇒ p isDefinedAt currentMsg + case _ ⇒ false + } + + var currentSelect: Select = _ + val messagePredicate: (Any ⇒ Boolean) = (msg) ⇒ currentSelect.predicate.isDefinedAt(msg) + + var currentDeadline: Option[(Deadline, Cancellable)] = None + + def receive = ({ + case g: Get ⇒ + if (messages.isEmpty) enqueueQuery(g) + else sender ! messages.dequeue() + case s @ Select(_, predicate, _) ⇒ + if (messages.isEmpty) enqueueQuery(s) + else { + currentSelect = s + messages.dequeueFirst(messagePredicate) match { + case Some(msg) ⇒ sender ! msg + case None ⇒ enqueueQuery(s) + } + currentSelect = null + } + case Kick ⇒ + val now = Deadline.now + val pred = (q: Query) ⇒ q.deadline.time < now.time + val overdue = clientsByTimeout.iterator.takeWhile(pred) + while (overdue.hasNext) { + val toKick = overdue.next() + toKick.client ! Status.Failure(new TimeoutException("deadline passed")) + } + // TODO: this wants to lose the `Queue.empty ++=` part when SI-6208 is fixed + clients = Queue.empty ++= clients.filterNot(pred) + clientsByTimeout = clientsByTimeout.from(Get(now)) + case msg ⇒ + if (clients.isEmpty) enqueueMessage(msg) + else { + currentMsg = msg + clients.dequeueFirst(clientPredicate) match { + case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg + case None ⇒ enqueueMessage(msg) + } + currentMsg = null + } + }: Receive) andThen { _ ⇒ + if (clients.isEmpty) { + if (currentDeadline.isDefined) { + currentDeadline.get._2.cancel() + currentDeadline = None + } + } else { + val next = clientsByTimeout.head.deadline + import context.dispatcher + if (currentDeadline.isEmpty) { + currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) + } else if (currentDeadline.get._1 != next) { + currentDeadline.get._2.cancel() + currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) + } + } + } + } + + /* + * make sure that AskTimeout does not accidentally mess up message reception + * by adding this extra time to the real timeout + */ + private val extraTime = 1.minute + + /** + * Create a new actor which will internally queue up messages it gets so that + * they can be interrogated with the `receiveMessage()` and `selectMessage()` + * methods below. It will be created as top-level actor in the ActorSystem + * which is implicitly (or explicitly) supplied. + * + * IMPORTANT: + * + * Be sure to terminate this actor using `system.stop(ref)` where `system` is + * the actor system with which the actor was created. + */ + def inbox()(implicit system: ActorSystem) = new Inbox(system) + + class Inbox(system: ActorSystem) { + + val receiver: ActorRef = Extension(system).newReceiver + private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout + + /** + * Receive a single message using the actor reference supplied; this must be + * an actor created using `newReceiver()` above. The supplied timeout is used + * for cleanup purposes and its precision is subject to the resolution of the + * system’s scheduler (usually 100ms, but configurable). + */ + def receive(timeout: FiniteDuration = defaultTimeout): Any = { + implicit val t = Timeout(timeout + extraTime) + Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf) + } + + /** + * Receive a single message for which the given partial function is defined + * and return the transformed result, using the actor reference supplied; + * this must be an actor created using `newReceiver()` above. The supplied + * timeout is used for cleanup purposes and its precision is subject to the + * resolution of the system’s scheduler (usually 100ms, but configurable). + */ + def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = { + implicit val t = Timeout(timeout + extraTime) + predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) + } + + override def finalize() { + system.stop(receiver) + } + } + + implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver +} \ No newline at end of file From 8d1b4a3bbf3ce80984bb0301037b30035a71774a Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Aug 2012 15:15:01 +0200 Subject: [PATCH 4/9] bring ScalaDoc up to date, see #2362 --- .../src/main/scala/akka/actor/ActorDSL.scala | 9 ++--- .../src/main/scala/akka/actor/dsl/Inbox.scala | 33 +++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 05dbe3861c..8ff0905411 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -29,18 +29,19 @@ import java.util.concurrent.TimeUnit * * implicit val system: ActorSystem = ... * - * implicit val recv = newInbox() + * implicit val i = inbox() * someActor ! someMsg // replies will go to `recv` * - * val reply = receive(recv, 5 seconds) - * val transformedReply = select(recv, 5 seconds) { + * val reply = i.receive() + * val transformedReply = i.select(5 seconds) { * case x: Int => 2 * x * } * }}} * * The `receive` and `select` methods are synchronous, i.e. they block the * calling thread until an answer from the actor is received or the timeout - * expires. + * expires. The default timeout is taken from configuration item + * `akka.actor.dsl.default-timeout`. */ object ActorDSL extends dsl.Inbox { diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index 2431bfd766..0aa68f1dbc 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -146,16 +146,12 @@ trait Inbox { this: ActorDSL.type ⇒ /** * Create a new actor which will internally queue up messages it gets so that - * they can be interrogated with the `receiveMessage()` and `selectMessage()` - * methods below. It will be created as top-level actor in the ActorSystem - * which is implicitly (or explicitly) supplied. - * - * IMPORTANT: - * - * Be sure to terminate this actor using `system.stop(ref)` where `system` is - * the actor system with which the actor was created. + * they can be interrogated with the [[akka.actor.dsl.Inbox!.Inbox!.receive]] + * and [[akka.actor.dsl.Inbox!.Inbox!.select]] methods. It will be created as + * a system actor in the ActorSystem which is implicitly (or explicitly) + * supplied. */ - def inbox()(implicit system: ActorSystem) = new Inbox(system) + def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system) class Inbox(system: ActorSystem) { @@ -163,10 +159,9 @@ trait Inbox { this: ActorDSL.type ⇒ private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout /** - * Receive a single message using the actor reference supplied; this must be - * an actor created using `newReceiver()` above. The supplied timeout is used - * for cleanup purposes and its precision is subject to the resolution of the - * system’s scheduler (usually 100ms, but configurable). + * Receive a single message from the internal `receiver` actor. The supplied + * timeout is used for cleanup purposes and its precision is subject to the + * resolution of the system’s scheduler (usually 100ms, but configurable). */ def receive(timeout: FiniteDuration = defaultTimeout): Any = { implicit val t = Timeout(timeout + extraTime) @@ -175,16 +170,20 @@ trait Inbox { this: ActorDSL.type ⇒ /** * Receive a single message for which the given partial function is defined - * and return the transformed result, using the actor reference supplied; - * this must be an actor created using `newReceiver()` above. The supplied - * timeout is used for cleanup purposes and its precision is subject to the - * resolution of the system’s scheduler (usually 100ms, but configurable). + * and return the transformed result, using the internal `receiver` actor. + * The supplied timeout is used for cleanup purposes and its precision is + * subject to the resolution of the system’s scheduler (usually 100ms, but + * configurable). */ def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = { implicit val t = Timeout(timeout + extraTime) predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) } + /** + * Overridden finalizer which will try to stop the actor once this Inbox + * is no longer referenced. + */ override def finalize() { system.stop(receiver) } From 9fb82ee0ad8a91418d95000a32246b4d44c048d6 Mon Sep 17 00:00:00 2001 From: phaller Date: Fri, 10 Aug 2012 16:38:18 +0200 Subject: [PATCH 5/9] Some initial work on lightweight actor creators, see #2362 - add Creators and Act traits - check for dispatcher config based on ClassTag - support for become, unbecome, setup, and teardown --- .../test/scala/akka/actor/ActorDSLSpec.scala | 16 +++- akka-actor/src/main/resources/reference.conf | 5 ++ .../src/main/scala/akka/actor/ActorDSL.scala | 4 +- .../main/scala/akka/actor/dsl/Creators.scala | 77 +++++++++++++++++++ 4 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/actor/dsl/Creators.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 2f2dfd1054..27639eecda 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -85,4 +85,18 @@ class ActorDSLSpec extends AkkaSpec { } -} \ No newline at end of file + "A lightweight creator" must { + + "support creating regular actors" in { + val a = actor()(new Act { + become { + case "hello" ⇒ sender ! "hi" + } + }) + implicit val i = inbox() + a ! "hello" + i.receive() must be("hi") + } + + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 61cc2995b2..459c9bd216 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -157,6 +157,11 @@ akka { } } + # Default dispatcher for Actors that extend Stash + default-stash-dispatcher { + mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" + } + default-dispatcher { # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 8ff0905411..14706f8b04 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit * expires. The default timeout is taken from configuration item * `akka.actor.dsl.default-timeout`. */ -object ActorDSL extends dsl.Inbox { +object ActorDSL extends dsl.Inbox with dsl.Creators { protected object Extension extends ExtensionKey[Extension] @@ -59,4 +59,4 @@ object ActorDSL extends dsl.Inbox { def mkChild(p: Props, name: String) = boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true) } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala new file mode 100644 index 0000000000..b9b230f1c7 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.dsl + +import scala.concurrent.Await +import akka.actor.ActorLogging +import scala.concurrent.util.Deadline +import scala.collection.immutable.TreeSet +import scala.concurrent.util.{ Duration, FiniteDuration } +import scala.concurrent.util.duration._ +import akka.actor.Cancellable +import akka.actor.{ Actor, Stash } +import scala.collection.mutable.Queue +import akka.actor.{ ActorSystem, ActorRefFactory } +import akka.actor.ActorRef +import akka.util.Timeout +import akka.actor.Status +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger +import akka.pattern.ask +import akka.actor.ActorDSL +import akka.actor.Props +import scala.reflect.ClassTag + +trait Creators { this: ActorDSL.type ⇒ + + trait Act extends Actor { + /* + whenFailing { (cause, optMsg) => ... } // preRestart + whenRestarted { cause => ... } // postRestart + */ + private[this] var preStartFun: () ⇒ Unit = null + private[this] var postStopFun: () ⇒ Unit = null + private[this] var preRestartFun: (Throwable, Option[Any]) ⇒ Unit = null + private[this] var postRestartFun: Throwable ⇒ Unit = null + + def become(r: Receive) = + context.become(r, false) + + def unbecome(): Unit = + context.unbecome() + + def setup(body: ⇒ Unit): Unit = + preStartFun = () ⇒ body + + def teardown(body: ⇒ Unit): Unit = + postStopFun = () ⇒ body + + override def preStart(): Unit = + if (preStartFun != null) preStartFun() + + override def postStop(): Unit = + if (postStopFun != null) postStopFun() + + override def receive: Receive = { + case _ ⇒ /* do nothing */ + } + } + + private def mkProps(classOfActor: Class[_], ctor: () ⇒ Actor): Props = + if (classOf[Stash].isAssignableFrom(classOfActor)) + Props(creator = ctor, dispatcher = "akka.actor.default-stash-dispatcher") + else + Props(creator = ctor) + + def actor[T <: Actor: ClassTag](name: String = null)(ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { + // configure dispatcher/mailbox based on runtime class + val classOfActor = implicitly[ClassTag[T]].runtimeClass + val props = mkProps(classOfActor, () ⇒ ctor) + factory.actorOf(props, if (name == null) "anonymous-actor" else name) //TODO: attach ID + } + + def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: ⇒ T): ActorRef = null + +} From 82e96ba55dc3f71c826577b0b61561332faa1fa0 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Aug 2012 19:13:11 +0200 Subject: [PATCH 6/9] adding tests and comments, see #2362 --- .../test/scala/akka/actor/ActorDSLSpec.scala | 42 ++++++ .../src/main/scala/akka/actor/ActorDSL.scala | 25 ++++ .../main/scala/akka/actor/dsl/Creators.scala | 126 +++++++++++++++--- 3 files changed, 172 insertions(+), 21 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 27639eecda..72092b05db 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -93,10 +93,52 @@ class ActorDSLSpec extends AkkaSpec { case "hello" ⇒ sender ! "hi" } }) + implicit val i = inbox() a ! "hello" i.receive() must be("hi") } + "support setup/teardown" in { + val a = actor()(new Act { + setup { testActor ! "started" } + teardown { testActor ! "stopped" } + }) + + system stop a + expectMsg("started") + expectMsg("stopped") + } + + "support restart" in { + val a = actor()(new Act { + become { + case "die" ⇒ throw new Exception + } + whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) } + whenRestarted { cause ⇒ testActor ! cause } + }) + + EventFilter[Exception](occurrences = 1) intercept { + a ! "die" + } + expectMsgPF() { case (x: Exception, Some("die")) ⇒ } + expectMsgPF() { case _: Exception ⇒ } + } + + "supported nested declaration" in { + val system = this.system + val a = actor(system, "fred")(new Act { + val b = actor("barney")(new Act { + setup { context.parent ! s"hello from ${self}" } + }) + become { + case x => testActor ! x + } + }) + expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]") + lastSender must be(a) + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 14706f8b04..1881716dba 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -42,6 +42,31 @@ import java.util.concurrent.TimeUnit * calling thread until an answer from the actor is received or the timeout * expires. The default timeout is taken from configuration item * `akka.actor.dsl.default-timeout`. + * + * When defining actors in the REPL, say, you may want to have a look at the + * `Act` trait: + * + * {{{ + * import ActorDSL._ + * + * val system: ActorSystem = ... + * + * val a = actor(system, "fred")(new Act { + * val b = actor("barney")(new Act { + * ... + * }) + * + * become { + * case msg => ... + * } + * }) + * }}} + * + * Note that `actor` can be used with an implicit [[akka.actor.ActorRefFactory]] + * as shown with `"barney"`, but since nested declarations share the same + * lexical context `"fred"`’s [[akka.actor.ActorContext]] would be ambiguous + * if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also + * be circumvented by shadowing the name `system` within `"fred"`). */ object ActorDSL extends dsl.Inbox with dsl.Creators { diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala index b9b230f1c7..acaf5df64c 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -26,37 +26,80 @@ import scala.reflect.ClassTag trait Creators { this: ActorDSL.type ⇒ + /** + * This trait provides a DSL for writing the inner workings of an actor, e.g. + * for quickly trying things out in the REPL. It makes the following keywords + * available: + * + * - `become` mapped to `context.become(_, discardOld = false)` + * + * - `unbecome` mapped to `context.unbecome` + * + * - `setup` for implementing `preStart()` + * + * - `whenFailing` for implementing `preRestart()` + * + * - `whenRestarted` for implementing `postRestart()` + * + * - `teardown` for implementing `postStop` + * + * Using the life-cycle keywords multiple times results in replacing the + * content of the respective hook. + */ trait Act extends Actor { - /* - whenFailing { (cause, optMsg) => ... } // preRestart - whenRestarted { cause => ... } // postRestart - */ + private[this] var preStartFun: () ⇒ Unit = null private[this] var postStopFun: () ⇒ Unit = null private[this] var preRestartFun: (Throwable, Option[Any]) ⇒ Unit = null private[this] var postRestartFun: Throwable ⇒ Unit = null - def become(r: Receive) = - context.become(r, false) + /** + * Add the given behavior on top of the behavior stack for this actor. This + * stack is cleared upon restart. Use `unbecome()` to pop an element off + * this stack. + */ + def become(r: Receive) = context.become(r, discardOld = false) - def unbecome(): Unit = - context.unbecome() + /** + * Pop the active behavior from the behavior stack of this actor. This stack + * is cleared upon restart. + */ + def unbecome(): Unit = context.unbecome() - def setup(body: ⇒ Unit): Unit = - preStartFun = () ⇒ body + /** + * Replace the `preStart` action with the supplied thunk. Default action + * is to call `super.preStart()` + */ + def setup(body: ⇒ Unit): Unit = preStartFun = () ⇒ body - def teardown(body: ⇒ Unit): Unit = - postStopFun = () ⇒ body + /** + * Replace the `preRestart` action with the supplied function. Default + * action is to call `super.preRestart()`, which will kill all children + * and invoke `postStop()`. + */ + def whenFailing(body: (Throwable, Option[Any]) ⇒ Unit): Unit = preRestartFun = body - override def preStart(): Unit = - if (preStartFun != null) preStartFun() + /** + * Replace the `postRestart` action with the supplied function. Default + * action is to call `super.postRestart` which will call `preStart()`. + */ + def whenRestarted(body: Throwable ⇒ Unit): Unit = postRestartFun = body - override def postStop(): Unit = - if (postStopFun != null) postStopFun() + /** + * Replace the `postStop` action with the supplied thunk. Default action + * is to call `super.postStop`. + */ + def teardown(body: ⇒ Unit): Unit = postStopFun = () ⇒ body - override def receive: Receive = { - case _ ⇒ /* do nothing */ - } + override def preStart(): Unit = if (preStartFun != null) preStartFun() else super.preStart() + override def preRestart(cause: Throwable, msg: Option[Any]): Unit = if (preRestartFun != null) preRestartFun(cause, msg) else super.preRestart(cause, msg) + override def postRestart(cause: Throwable): Unit = if (postRestartFun != null) postRestartFun(cause) else super.postRestart(cause) + override def postStop(): Unit = if (postStopFun != null) postStopFun() else super.postStop() + + /** + * Default behavior of the actor is empty, use `become` to change this. + */ + override def receive: Receive = Actor.emptyBehavior } private def mkProps(classOfActor: Class[_], ctor: () ⇒ Actor): Props = @@ -65,13 +108,54 @@ trait Creators { this: ActorDSL.type ⇒ else Props(creator = ctor) + /** + * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. + * + * @param name is the name, which must be unique within the context of its + * parent; defaults to `null` which will assign a name automatically. + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ def actor[T <: Actor: ClassTag](name: String = null)(ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { // configure dispatcher/mailbox based on runtime class val classOfActor = implicitly[ClassTag[T]].runtimeClass val props = mkProps(classOfActor, () ⇒ ctor) - factory.actorOf(props, if (name == null) "anonymous-actor" else name) //TODO: attach ID + + if (name == null) factory.actorOf(props) + else factory.actorOf(props, name) } - def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: ⇒ T): ActorRef = null + /** + * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. + * + * @param name is the name, which must be unique within the context of its + * parent; defaults to `null` which will assign a name automatically. + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](factory: ActorRefFactory, name: String)(ctor: ⇒ T): ActorRef = + actor(name)(ctor)(implicitly[ClassTag[T]], factory) + + /** + * Create an actor with an automatically generated name from the given thunk + * which must produce an [[akka.actor.Actor]]. + * + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](factory: ActorRefFactory)(ctor: ⇒ T): ActorRef = + actor(null: String)(ctor)(implicitly[ClassTag[T]], factory) } From 15aae34ffa95190b6ed6c12b2aa94ebb839c107f Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Aug 2012 19:47:53 +0200 Subject: [PATCH 7/9] fix the DSL with stash, see #2364 --- .../test/scala/akka/actor/ActorDSLSpec.scala | 22 ++++++++++++++++++- .../src/main/scala/akka/actor/ActorDSL.scala | 5 +++++ .../main/scala/akka/actor/dsl/Creators.scala | 7 ++++++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 72092b05db..37cc49a0b9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -130,7 +130,7 @@ class ActorDSLSpec extends AkkaSpec { val system = this.system val a = actor(system, "fred")(new Act { val b = actor("barney")(new Act { - setup { context.parent ! s"hello from ${self}" } + setup { context.parent ! s"hello from $self" } }) become { case x => testActor ! x @@ -139,6 +139,26 @@ class ActorDSLSpec extends AkkaSpec { expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]") lastSender must be(a) } + + "support Stash" in { + val a = actor()(new ActWithStash { + become { + case 1 => stash() + case 2 => testActor ! 2; unstashAll(); become { + case 1 => testActor ! 1; unbecome() + } + } + }) + + a ! 1 + a ! 2 + expectMsg(2) + expectMsg(1) + a ! 1 + a ! 2 + expectMsg(2) + expectMsg(1) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 1881716dba..5906da3175 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -67,6 +67,11 @@ import java.util.concurrent.TimeUnit * lexical context `"fred"`’s [[akka.actor.ActorContext]] would be ambiguous * if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also * be circumvented by shadowing the name `system` within `"fred"`). + * + * Note: If you want to use an `Act with Stash`, you should use the + * `ActWithStash` trait in order to have the actor run on a special dispatcher + * (`"akka.actor.default-stash-dispatcher"`) which has the necessary deque-based + * mailbox setting. */ object ActorDSL extends dsl.Inbox with dsl.Creators { diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala index acaf5df64c..bba225b23b 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -102,6 +102,13 @@ trait Creators { this: ActorDSL.type ⇒ override def receive: Receive = Actor.emptyBehavior } + /** + * Use this trait when defining an [[akka.actor.Actor]] with [[akka.actor.Stash]], + * since just using `actor()(new Act with Stash{})` will not be able to see the + * Stash component due to type erasure. + */ + trait ActWithStash extends Act with Stash + private def mkProps(classOfActor: Class[_], ctor: () ⇒ Actor): Props = if (classOf[Stash].isAssignableFrom(classOfActor)) Props(creator = ctor, dispatcher = "akka.actor.default-stash-dispatcher") From 23b044275b42ca6dace2299c6a64b34e25d2f38d Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Aug 2012 17:31:35 +0200 Subject: [PATCH 8/9] incorporate review comments - rename hooks to whenStarting/whenStopping - remove need for empty parens in anonymous actor declarations - add superviseWith and its test --- .../test/scala/akka/actor/ActorDSLSpec.scala | 66 ++++++++++++++----- .../src/main/scala/akka/actor/ActorDSL.scala | 23 +++++-- .../main/scala/akka/actor/dsl/Creators.scala | 36 ++++++++-- 3 files changed, 95 insertions(+), 30 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 37cc49a0b9..df7574318b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -88,68 +88,98 @@ class ActorDSLSpec extends AkkaSpec { "A lightweight creator" must { "support creating regular actors" in { - val a = actor()(new Act { + val a = actor(new Act { become { case "hello" ⇒ sender ! "hi" } }) - + implicit val i = inbox() a ! "hello" i.receive() must be("hi") } "support setup/teardown" in { - val a = actor()(new Act { - setup { testActor ! "started" } - teardown { testActor ! "stopped" } + val a = actor(new Act { + whenStarting { testActor ! "started" } + whenStopping { testActor ! "stopped" } }) - + system stop a expectMsg("started") expectMsg("stopped") } "support restart" in { - val a = actor()(new Act { + val a = actor(new Act { become { case "die" ⇒ throw new Exception } whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) } whenRestarted { cause ⇒ testActor ! cause } }) - + EventFilter[Exception](occurrences = 1) intercept { a ! "die" } expectMsgPF() { case (x: Exception, Some("die")) ⇒ } expectMsgPF() { case _: Exception ⇒ } } - + + "support superviseWith" in { + val a = actor(new Act { + val system = null // shadow the implicit system + superviseWith(OneForOneStrategy() { + case e: Exception if e.getMessage == "hello" ⇒ SupervisorStrategy.Stop + case _: Exception ⇒ SupervisorStrategy.Resume + }) + val child = actor("child")(new Act { + whenFailing { (_, _) ⇒ } + become { + case ref: ActorRef ⇒ whenStopping(ref ! "stopped") + case ex: Exception ⇒ throw ex + } + }) + become { + case x ⇒ child ! x + } + }) + a ! testActor + EventFilter[Exception](occurrences = 1) intercept { + a ! new Exception + } + expectNoMsg(1 second) + EventFilter[Exception]("hello", occurrences = 1) intercept { + a ! new Exception("hello") + } + expectMsg("stopped") + } + "supported nested declaration" in { val system = this.system val a = actor(system, "fred")(new Act { val b = actor("barney")(new Act { - setup { context.parent ! s"hello from $self" } + whenStarting { context.parent ! s"hello from $self" } }) become { - case x => testActor ! x + case x ⇒ testActor ! x } }) expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]") lastSender must be(a) } - + "support Stash" in { - val a = actor()(new ActWithStash { + val a = actor(new ActWithStash { become { - case 1 => stash() - case 2 => testActor ! 2; unstashAll(); become { - case 1 => testActor ! 1; unbecome() - } + case 1 ⇒ stash() + case 2 ⇒ + testActor ! 2; unstashAll(); become { + case 1 ⇒ testActor ! 1; unbecome() + } } }) - + a ! 1 a ! 2 expectMsg(2) diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 5906da3175..196c794f27 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit * implicit val system: ActorSystem = ... * * implicit val i = inbox() - * someActor ! someMsg // replies will go to `recv` + * someActor ! someMsg // replies will go to `i` * * val reply = i.receive() * val transformedReply = i.select(5 seconds) { @@ -63,12 +63,13 @@ import java.util.concurrent.TimeUnit * }}} * * Note that `actor` can be used with an implicit [[akka.actor.ActorRefFactory]] - * as shown with `"barney"`, but since nested declarations share the same - * lexical context `"fred"`’s [[akka.actor.ActorContext]] would be ambiguous + * as shown with `"barney"` (where the [[akka.actor.ActorContext serves this + * purpose), but since nested declarations share the same + * lexical context `"fred"`’s ActorContext would be ambiguous * if the [[akka.actor.ActorSystem]] were declared `implicit` (this could also * be circumvented by shadowing the name `system` within `"fred"`). - * - * Note: If you want to use an `Act with Stash`, you should use the + * + * Note: If you want to use an `Act with Stash`, you should use the * `ActWithStash` trait in order to have the actor run on a special dispatcher * (`"akka.actor.default-stash-dispatcher"`) which has the necessary deque-based * mailbox setting. @@ -80,7 +81,17 @@ object ActorDSL extends dsl.Inbox with dsl.Creators { protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension { val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props.empty, "dsl").asInstanceOf[RepointableActorRef] - while (!boss.isStarted) Thread.sleep(10) + + { + val timeout = system.settings.CreationTimeout.duration + val deadline = Deadline.now + timeout + while (!boss.isStarted) { + if (deadline.hasTimeLeft) + if (system.isTerminated) throw new IllegalStateException("actor system is already shutdown") + else Thread.sleep(10) + else throw new TimeoutException("failed to create /system/dsl actor within " + timeout) + } + } lazy val config = system.settings.config.getConfig("akka.actor.dsl") diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala index bba225b23b..b81f733013 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -11,7 +11,7 @@ import scala.collection.immutable.TreeSet import scala.concurrent.util.{ Duration, FiniteDuration } import scala.concurrent.util.duration._ import akka.actor.Cancellable -import akka.actor.{ Actor, Stash } +import akka.actor.{ Actor, Stash, SupervisorStrategy } import scala.collection.mutable.Queue import akka.actor.{ ActorSystem, ActorRefFactory } import akka.actor.ActorRef @@ -52,6 +52,7 @@ trait Creators { this: ActorDSL.type ⇒ private[this] var postStopFun: () ⇒ Unit = null private[this] var preRestartFun: (Throwable, Option[Any]) ⇒ Unit = null private[this] var postRestartFun: Throwable ⇒ Unit = null + private[this] var strategy: SupervisorStrategy = null /** * Add the given behavior on top of the behavior stack for this actor. This @@ -66,11 +67,16 @@ trait Creators { this: ActorDSL.type ⇒ */ def unbecome(): Unit = context.unbecome() + /** + * Set the supervisor strategy of this actor, i.e. how it supervises its children. + */ + def superviseWith(s: SupervisorStrategy): Unit = strategy = s + /** * Replace the `preStart` action with the supplied thunk. Default action * is to call `super.preStart()` */ - def setup(body: ⇒ Unit): Unit = preStartFun = () ⇒ body + def whenStarting(body: ⇒ Unit): Unit = preStartFun = () ⇒ body /** * Replace the `preRestart` action with the supplied function. Default @@ -89,12 +95,13 @@ trait Creators { this: ActorDSL.type ⇒ * Replace the `postStop` action with the supplied thunk. Default action * is to call `super.postStop`. */ - def teardown(body: ⇒ Unit): Unit = postStopFun = () ⇒ body + def whenStopping(body: ⇒ Unit): Unit = postStopFun = () ⇒ body override def preStart(): Unit = if (preStartFun != null) preStartFun() else super.preStart() override def preRestart(cause: Throwable, msg: Option[Any]): Unit = if (preRestartFun != null) preRestartFun(cause, msg) else super.preRestart(cause, msg) override def postRestart(cause: Throwable): Unit = if (postRestartFun != null) postRestartFun(cause) else super.postRestart(cause) override def postStop(): Unit = if (postStopFun != null) postStopFun() else super.postStop() + override def supervisorStrategy: SupervisorStrategy = if (strategy != null) strategy else super.supervisorStrategy /** * Default behavior of the actor is empty, use `become` to change this. @@ -118,8 +125,6 @@ trait Creators { this: ActorDSL.type ⇒ /** * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. * - * @param name is the name, which must be unique within the context of its - * parent; defaults to `null` which will assign a name automatically. * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] * factory; do not make the generated object accessible to code * outside and do not return the same object upon subsequent invocations. @@ -127,7 +132,26 @@ trait Creators { this: ActorDSL.type ⇒ * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], * where the latter is always implicitly available within an [[akka.actor.Actor]]. */ - def actor[T <: Actor: ClassTag](name: String = null)(ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { + def actor[T <: Actor: ClassTag](ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { + // configure dispatcher/mailbox based on runtime class + val classOfActor = implicitly[ClassTag[T]].runtimeClass + val props = mkProps(classOfActor, () ⇒ ctor) + factory.actorOf(props) + } + + /** + * Create an actor from the given thunk which must produce an [[akka.actor.Actor]]. + * + * @param name is the name, which must be unique within the context of its + * parent. + * @param ctor is a by-name argument which captures an [[akka.actor.Actor]] + * factory; do not make the generated object accessible to code + * outside and do not return the same object upon subsequent invocations. + * @param factory is an implicit [[akka.actor.ActorRefFactory]], which can + * either be an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]], + * where the latter is always implicitly available within an [[akka.actor.Actor]]. + */ + def actor[T <: Actor: ClassTag](name: String)(ctor: ⇒ T)(implicit factory: ActorRefFactory): ActorRef = { // configure dispatcher/mailbox based on runtime class val classOfActor = implicitly[ClassTag[T]].runtimeClass val props = mkProps(classOfActor, () ⇒ ctor) From 9973b864f1903852c5443b13edb7202849e5480c Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 17 Aug 2012 08:42:21 +0200 Subject: [PATCH 9/9] add deadlock warning to Inbox methods --- akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index 0aa68f1dbc..9857023e82 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -162,6 +162,11 @@ trait Inbox { this: ActorDSL.type ⇒ * Receive a single message from the internal `receiver` actor. The supplied * timeout is used for cleanup purposes and its precision is subject to the * resolution of the system’s scheduler (usually 100ms, but configurable). + * + * Warning: This method blocks the current thread until a message is + * received, thus it can introduce dead-locks (directly as well as + * indirectly by causing starvation of the thread pool). Do not use + * this method within an actor! */ def receive(timeout: FiniteDuration = defaultTimeout): Any = { implicit val t = Timeout(timeout + extraTime) @@ -174,6 +179,11 @@ trait Inbox { this: ActorDSL.type ⇒ * The supplied timeout is used for cleanup purposes and its precision is * subject to the resolution of the system’s scheduler (usually 100ms, but * configurable). + * + * Warning: This method blocks the current thread until a message is + * received, thus it can introduce dead-locks (directly as well as + * indirectly by causing starvation of the thread pool). Do not use + * this method within an actor! */ def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = { implicit val t = Timeout(timeout + extraTime) @@ -190,4 +200,4 @@ trait Inbox { this: ActorDSL.type ⇒ } implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver -} \ No newline at end of file +}