diff --git a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala index e08883c6ed..93787d43a6 100644 --- a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala +++ b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala @@ -11,7 +11,7 @@ import akka.util.Timeout import akka.util.duration._ @deprecated("use ActorSystem instead", "2.0") -object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) { +object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig, OldConfigurationLoader.oldClassLoader) { start() /** @@ -26,11 +26,12 @@ object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfiguratio */ @deprecated("use default config location or write your own configuration loader", "2.0") object OldConfigurationLoader { + val oldClassLoader: ClassLoader = ActorSystem.findClassLoader() val defaultConfig: Config = { val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig - val config = cfg.withFallback(ConfigFactory.defaultReference) - config.checkValid(ConfigFactory.defaultReference, "akka") + val config = cfg.withFallback(ConfigFactory.defaultReference(oldClassLoader)) + config.checkValid(ConfigFactory.defaultReference(oldClassLoader), "akka") config } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index f644842591..92af540a9a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -10,6 +10,7 @@ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import akka.pattern.ask import akka.util.duration._ +import akka.util.NonFatal object SupervisorMiscSpec { val config = """ @@ -77,5 +78,69 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul expectMsg("preStart") a.isTerminated must be(false) } + + "be able to recreate child when old child is Terminated" in { + val parent = system.actorOf(Props(new Actor { + val kid = context.watch(context.actorOf(Props.empty, "foo")) + def receive = { + case Terminated(`kid`) ⇒ + try { + val newKid = context.actorOf(Props.empty, "foo") + val result = + if (newKid eq kid) "Failure: context.actorOf returned the same instance!" + else if (!kid.isTerminated) "Kid is zombie" + else if (newKid.isTerminated) "newKid was stillborn" + else if (kid.path != newKid.path) "The kids do not share the same path" + else "green" + testActor ! result + } catch { + case NonFatal(e) ⇒ testActor ! e + } + case "engage" ⇒ context.stop(kid) + } + })) + parent ! "engage" + expectMsg("green") + } + + "not be able to recreate child when old child is alive" in { + val parent = system.actorOf(Props(new Actor { + def receive = { + case "engage" ⇒ + try { + val kid = context.actorOf(Props.empty, "foo") + context.stop(kid) + context.actorOf(Props.empty, "foo") + testActor ! "red" + } catch { + case e: InvalidActorNameException ⇒ testActor ! "green" + } + } + })) + parent ! "engage" + expectMsg("green") + } + + "be able to create a similar kid in the fault handling strategy" in { + val parent = system.actorOf(Props(new Actor { + + override val supervisorStrategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) { + override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { + val newKid = context.actorOf(Props.empty, child.path.name) + testActor ! { + if ((newKid ne child) && newKid.path == child.path) "green" + else "red" + } + } + } + + def receive = { + case "engage" ⇒ context.stop(context.actorOf(Props.empty, "Robert")) + } + })) + parent ! "engage" + expectMsg("green") + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 127907412e..d24b2c4806 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -9,9 +9,10 @@ import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ import akka.util.duration._ import akka.util.Duration +import akka.actor.ActorSystem @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { +class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.findClassLoader())) { "The default configuration file (i.e. reference.conf)" must { "contain all configuration properties for akka-actor that are used in code with their correct defaults" in { @@ -22,8 +23,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { { import config._ - getString("akka.version") must equal("2.0-SNAPSHOT") - settings.ConfigVersion must equal("2.0-SNAPSHOT") + getString("akka.version") must equal("2.1-SNAPSHOT") + settings.ConfigVersion must equal("2.1-SNAPSHOT") getBoolean("akka.daemonic") must equal(false) getBoolean("akka.actor.serialize-messages") must equal(false) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala index b3ce0108dd..7ac129e254 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala @@ -95,6 +95,17 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { assert(Await.result(c, timeout.duration) === 4) } + "map futures" in { + val q = PromiseStream[String]() + flow { + q << (Future("a"), Future("b"), Future("c")) + } + val a, b, c = q.dequeue + Await.result(a, timeout.duration) must be("a") + Await.result(b, timeout.duration) must be("b") + Await.result(c, timeout.duration) must be("c") + } + "not fail under concurrent stress" in { implicit val timeout = Timeout(60 seconds) val q = PromiseStream[Long](timeout.duration.toMillis) diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index a7218230e4..787f29f93a 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -200,9 +200,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } system.stop(supervisor) - expectMsg(Logging.Debug(sname, `sclass`, "stopping")) - expectMsg(Logging.Debug(aname, `aclass`, "stopped")) - expectMsg(Logging.Debug(sname, `sclass`, "stopped")) + expectMsgAllOf( + Logging.Debug(aname, aclass, "stopped"), + Logging.Debug(sname, sclass, "stopping"), + Logging.Debug(sname, sclass, "stopped")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 89b6f5274a..2ae32cfcf5 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -118,7 +118,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "use configured nr-of-instances when FromConfig" in { val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) + watch(router) system.stop(router) + expectMsgType[Terminated] } "use configured nr-of-instances when router is specified" in { diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java b/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java index 1b819ab4c5..981708ae55 100755 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigFactory.java @@ -11,7 +11,6 @@ import java.util.Map; import java.util.Properties; import com.typesafe.config.impl.ConfigImpl; -import com.typesafe.config.impl.ConfigImplUtil; import com.typesafe.config.impl.Parseable; /** @@ -41,6 +40,12 @@ public final class ConfigFactory { * {@link Class#getResource}). * *

+ * Resources are loaded from the current thread's + * {@link Thread#getContextClassLoader()}. In general, a library needs its + * configuration to come from the class loader used to load that library, so + * the proper "reference.conf" are present. + * + *

* The loaded object will already be resolved (substitutions have already * been processed). As a result, if you add more fallbacks then they won't * be seen by substitutions. Substitutions are the "${foo.bar}" syntax. If @@ -49,16 +54,29 @@ public final class ConfigFactory { * * @param resourceBasename * name (optionally without extension) of a resource on classpath - * @return configuration for an application + * @return configuration for an application relative to context class loader */ public static Config load(String resourceBasename) { - return load(resourceBasename, ConfigParseOptions.defaults(), + return load(Thread.currentThread().getContextClassLoader(), resourceBasename); + } + + /** + * Like {@link #load(String)} but uses the supplied class loader instead of + * the current thread's context class loader. + * + * @param loader + * @param resourceBasename + * @return configuration for an application relative to given class loader + */ + public static Config load(ClassLoader loader, String resourceBasename) { + return load(loader, resourceBasename, ConfigParseOptions.defaults(), ConfigResolveOptions.defaults()); } /** * Like {@link #load(String)} but allows you to specify parse and resolve * options. + * * @param resourceBasename * the classpath resource name with optional extension * @param parseOptions @@ -69,9 +87,29 @@ public final class ConfigFactory { */ public static Config load(String resourceBasename, ConfigParseOptions parseOptions, ConfigResolveOptions resolveOptions) { - Config appConfig = ConfigFactory.parseResourcesAnySyntax(ConfigFactory.class, "/" - + resourceBasename, parseOptions); - return load(appConfig, resolveOptions); + return load(Thread.currentThread().getContextClassLoader(), resourceBasename, parseOptions, + resolveOptions); + } + + /** + * Like {@link #load(String,ConfigParseOptions,ConfigResolveOptions)} but + * allows you to specify a class loader + * + * @param loader + * class loader in which to find resources + * @param resourceBasename + * the classpath resource name with optional extension + * @param parseOptions + * options to use when parsing the resource + * @param resolveOptions + * options to use when resolving the stack + * @return configuration for an application + */ + public static Config load(ClassLoader loader, String resourceBasename, + ConfigParseOptions parseOptions, ConfigResolveOptions resolveOptions) { + Config appConfig = ConfigFactory.parseResourcesAnySyntax(loader, resourceBasename, + parseOptions); + return load(loader, appConfig, resolveOptions); } /** @@ -85,7 +123,11 @@ public final class ConfigFactory { * @return resolved configuration with overrides and fallbacks added */ public static Config load(Config config) { - return load(config, ConfigResolveOptions.defaults()); + return load(Thread.currentThread().getContextClassLoader(), config); + } + + public static Config load(ClassLoader loader, Config config) { + return load(loader, config, ConfigResolveOptions.defaults()); } /** @@ -99,53 +141,66 @@ public final class ConfigFactory { * @return resolved configuration with overrides and fallbacks added */ public static Config load(Config config, ConfigResolveOptions resolveOptions) { - return defaultOverrides().withFallback(config).withFallback(defaultReference()) + return load(Thread.currentThread().getContextClassLoader(), config, resolveOptions); + } + + /** + * Like {@link #load(Config,ConfigResolveOptions)} but allows you to specify + * a class loader other than the context class loader. + * + * @param loader + * class loader to use when looking up override and reference + * configs + * @param config + * the application's portion of the configuration + * @param resolveOptions + * options for resolving the assembled config stack + * @return resolved configuration with overrides and fallbacks added + */ + public static Config load(ClassLoader loader, Config config, ConfigResolveOptions resolveOptions) { + return defaultOverrides(loader).withFallback(config).withFallback(defaultReference(loader)) .resolve(resolveOptions); } - private static class DefaultConfigHolder { + private static Config loadDefaultConfig(ClassLoader loader) { + int specified = 0; - private static Config loadDefaultConfig() { - int specified = 0; + // override application.conf with config.file, config.resource, + // config.url if requested. + String resource = System.getProperty("config.resource"); + if (resource != null) + specified += 1; + String file = System.getProperty("config.file"); + if (file != null) + specified += 1; + String url = System.getProperty("config.url"); + if (url != null) + specified += 1; - // override application.conf with config.file, config.resource, - // config.url if requested. - String resource = System.getProperty("config.resource"); - if (resource != null) - specified += 1; - String file = System.getProperty("config.file"); - if (file != null) - specified += 1; - String url = System.getProperty("config.url"); - if (url != null) - specified += 1; - - if (specified == 0) { - return load("application"); - } else if (specified > 1) { - throw new ConfigException.Generic("You set more than one of config.file='" + file - + "', config.url='" + url + "', config.resource='" + resource - + "'; don't know which one to use!"); + if (specified == 0) { + return load(loader, "application"); + } else if (specified > 1) { + throw new ConfigException.Generic("You set more than one of config.file='" + file + + "', config.url='" + url + "', config.resource='" + resource + + "'; don't know which one to use!"); + } else { + if (resource != null) { + if (resource.startsWith("/")) + resource = resource.substring(1); + // this deliberately does not parseResourcesAnySyntax; if + // people want that they can use an include statement. + return load(loader, parseResources(loader, resource)); + } else if (file != null) { + return load(loader, parseFile(new File(file))); } else { - if (resource != null) { - // this deliberately does not parseResourcesAnySyntax; if - // people want that they can use an include statement. - return load(parseResources(ConfigFactory.class, resource)); - } else if (file != null) { - return load(parseFile(new File(file))); - } else { - try { - return load(parseURL(new URL(url))); - } catch (MalformedURLException e) { - throw new ConfigException.Generic( - "Bad URL in config.url system property: '" + url + "': " - + e.getMessage(), e); - } + try { + return load(loader, parseURL(new URL(url))); + } catch (MalformedURLException e) { + throw new ConfigException.Generic("Bad URL in config.url system property: '" + + url + "': " + e.getMessage(), e); } } } - - static final Config defaultConfig = loadDefaultConfig(); } /** @@ -176,11 +231,19 @@ public final class ConfigFactory { * @return configuration for an application */ public static Config load() { - try { - return DefaultConfigHolder.defaultConfig; - } catch (ExceptionInInitializerError e) { - throw ConfigImplUtil.extractInitializerError(e); - } + return load(Thread.currentThread().getContextClassLoader()); + } + + /** + * Like {@link #load()} but allows specifying a class loader other than the + * thread's current context class loader. + * + * @param loader + * class loader for finding resources + * @return configuration for an application + */ + public static Config load(ClassLoader loader) { + return loadDefaultConfig(loader); } /** @@ -194,6 +257,13 @@ public final class ConfigFactory { * jar. * *

+ * The reference config must be looked up in the class loader that contains + * the libraries that you want to use with this config, so the + * "reference.conf" for each library can be found. Use + * {@link #defaultReference(ClassLoader)} if the context class loader is not + * suitable. + * + *

* The {@link #load()} methods merge this configuration for you * automatically. * @@ -202,10 +272,21 @@ public final class ConfigFactory { * is not guaranteed that this method only looks at * "reference.conf". * - * @return the default reference config + * @return the default reference config for context class loader */ public static Config defaultReference() { - return ConfigImpl.defaultReference(); + return defaultReference(Thread.currentThread().getContextClassLoader()); + } + + /** + * Like {@link #defaultReference()} but allows you to specify a class loader + * to use rather than the current context class loader. + * + * @param loader + * @return the default reference config for this class loader + */ + public static Config defaultReference(ClassLoader loader) { + return ConfigImpl.defaultReference(loader); } /** @@ -227,6 +308,17 @@ public final class ConfigFactory { return systemProperties(); } + /** + * Like {@link #defaultOverrides()} but allows you to specify a class loader + * to use rather than the current context class loader. + * + * @param loader + * @return the default override configuration + */ + public static Config defaultOverrides(ClassLoader loader) { + return systemProperties(); + } + /** * Gets an empty configuration. See also {@link #empty(String)} to create an * empty configuration with a description, which may improve user-visible @@ -301,12 +393,12 @@ public final class ConfigFactory { * string values. If you have both "a=foo" and "a.b=bar" in your properties * file, so "a" is both the object containing "b" and the string "foo", then * the string value is dropped. - * + * *

* If you want to have System.getProperties() as a * ConfigObject, it's better to use the {@link #systemProperties()} method * which returns a cached global singleton. - * + * * @param properties * a Java Properties object * @param options @@ -465,6 +557,106 @@ public final class ConfigFactory { return parseResourcesAnySyntax(klass, resourceBasename, ConfigParseOptions.defaults()); } + /** + * Parses all resources on the classpath with the given name and merges them + * into a single Config. + * + *

+ * This works like {@link java.lang.ClassLoader#getResource}, not like + * {@link java.lang.Class#getResource}, so the name never begins with a + * slash. + * + *

+ * See {@link #parseResources(Class,String,ConfigParseOptions)} for full + * details. + * + * @param loader + * will be used to load resources + * @param resource + * resource to look up + * @param options + * parse options + * @return the parsed configuration + */ + public static Config parseResources(ClassLoader loader, String resource, + ConfigParseOptions options) { + return Parseable.newResources(loader, resource, options).parse().toConfig(); + } + + public static Config parseResources(ClassLoader loader, String resource) { + return parseResources(loader, resource, ConfigParseOptions.defaults()); + } + + /** + * Parses classpath resources with a flexible extension. In general, this + * method has the same behavior as + * {@link #parseFileAnySyntax(File,ConfigParseOptions)} but for classpath + * resources instead, as in + * {@link #parseResources(ClassLoader,String,ConfigParseOptions)}. + * + *

+ * {@link #parseResourcesAnySyntax(Class,String,ConfigParseOptions)} differs + * in the syntax for the resource name, but otherwise see + * {@link #parseResourcesAnySyntax(Class,String,ConfigParseOptions)} for + * some details and caveats on this method. + * + * @param loader + * class loader to look up resources in + * @param resourceBasename + * a resource name as in + * {@link java.lang.ClassLoader#getResource}, with or without + * extension + * @param options + * parse options + * @return the parsed configuration + */ + public static Config parseResourcesAnySyntax(ClassLoader loader, String resourceBasename, + ConfigParseOptions options) { + return ConfigImpl.parseResourcesAnySyntax(loader, resourceBasename, options).toConfig(); + } + + public static Config parseResourcesAnySyntax(ClassLoader loader, String resourceBasename) { + return parseResourcesAnySyntax(loader, resourceBasename, ConfigParseOptions.defaults()); + } + + /** + * Like {@link #parseResources(ClassLoader,String,ConfigParseOptions)} but + * uses thread's current context class loader. + */ + public static Config parseResources(String resource, ConfigParseOptions options) { + return Parseable + .newResources(Thread.currentThread().getContextClassLoader(), resource, options) + .parse().toConfig(); + } + + /** + * Like {@link #parseResources(ClassLoader,String)} but uses thread's + * current context class loader. + */ + public static Config parseResources(String resource) { + return parseResources(Thread.currentThread().getContextClassLoader(), resource, + ConfigParseOptions.defaults()); + } + + /** + * Like + * {@link #parseResourcesAnySyntax(ClassLoader,String,ConfigParseOptions)} + * but uses thread's current context class loader. + */ + public static Config parseResourcesAnySyntax(String resourceBasename, ConfigParseOptions options) { + return ConfigImpl.parseResourcesAnySyntax(Thread.currentThread().getContextClassLoader(), + resourceBasename, options).toConfig(); + } + + /** + * Like {@link #parseResourcesAnySyntax(ClassLoader,String)} but uses + * thread's current context class loader. + */ + public static Config parseResourcesAnySyntax(String resourceBasename) { + return parseResourcesAnySyntax(Thread.currentThread().getContextClassLoader(), + resourceBasename, ConfigParseOptions.defaults()); + } + public static Config parseString(String s, ConfigParseOptions options) { return Parseable.newString(s, options).parse().toConfig(); } diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java index 73ddfdce5a..fd41dda6a4 100755 --- a/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/ConfigImpl.java @@ -118,6 +118,18 @@ public class ConfigImpl { return fromBasename(source, resourceBasename, baseOptions); } + /** For use ONLY by library internals, DO NOT TOUCH not guaranteed ABI */ + public static ConfigObject parseResourcesAnySyntax(final ClassLoader loader, + String resourceBasename, final ConfigParseOptions baseOptions) { + NameSource source = new NameSource() { + @Override + public ConfigParseable nameToParseable(String name) { + return Parseable.newResources(loader, name, baseOptions); + } + }; + return fromBasename(source, resourceBasename, baseOptions); + } + /** For use ONLY by library internals, DO NOT TOUCH not guaranteed ABI */ public static ConfigObject parseFileAnySyntax(final File basename, final ConfigParseOptions baseOptions) { @@ -397,20 +409,11 @@ public class ConfigImpl { return envVariablesAsConfigObject().toConfig(); } - private static class ReferenceHolder { - private static final Config unresolvedResources = Parseable - .newResources(ConfigImpl.class, "/reference.conf", ConfigParseOptions.defaults()) - .parse().toConfig(); - static final Config referenceConfig = systemPropertiesAsConfig().withFallback( - unresolvedResources).resolve(); - } - /** For use ONLY by library internals, DO NOT TOUCH not guaranteed ABI */ - public static Config defaultReference() { - try { - return ReferenceHolder.referenceConfig; - } catch (ExceptionInInitializerError e) { - throw ConfigImplUtil.extractInitializerError(e); - } + public static Config defaultReference(ClassLoader loader) { + Config unresolvedResources = Parseable + .newResources(loader, "reference.conf", ConfigParseOptions.defaults()).parse() + .toConfig(); + return systemPropertiesAsConfig().withFallback(unresolvedResources).resolve(); } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 480826763f..6e747f8121 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -7,7 +7,7 @@ akka { # Akka version, checked against the runtime version of Akka. - version = "2.0-SNAPSHOT" + version = "2.1-SNAPSHOT" # Home directory of Akka, modules in the deploy directory will be loaded home = "" diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index e583180af7..9adff5594e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -189,8 +189,6 @@ trait Actor { "\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + "\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" + - "\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem), or" + - "\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" + "\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)") if (contextStack.isEmpty) noContextError diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 22c82bfb6c..3b068d5c2b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -161,19 +161,142 @@ trait UntypedActorContext extends ActorContext { } +/** + * Everything in here is completely Akka PRIVATE. You will not find any + * supported APIs in this place. This is not the API you were looking + * for! (waves hand) + */ private[akka] object ActorCell { val contextStack = new ThreadLocal[Stack[ActorContext]] { override def initialValue = Stack[ActorContext]() } - val emptyChildrenRefs = TreeMap[String, ChildRestartStats]() - final val emptyCancellable: Cancellable = new Cancellable { def isCancelled = false def cancel() {} } final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) + + trait SuspendReason + case object UserRequest extends SuspendReason + case class Recreation(cause: Throwable) extends SuspendReason + case object Termination extends SuspendReason + + trait ChildrenContainer { + def add(child: ActorRef): ChildrenContainer + def remove(child: ActorRef): ChildrenContainer + def getByName(name: String): Option[ChildRestartStats] + def getByRef(actor: ActorRef): Option[ChildRestartStats] + def children: Iterable[ActorRef] + def stats: Iterable[ChildRestartStats] + def shallDie(actor: ActorRef): ChildrenContainer + } + + trait EmptyChildrenContainer extends ChildrenContainer { + val emptyStats = TreeMap.empty[String, ChildRestartStats] + def add(child: ActorRef): ChildrenContainer = + new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) + def remove(child: ActorRef): ChildrenContainer = this + def getByName(name: String): Option[ChildRestartStats] = None + def getByRef(actor: ActorRef): Option[ChildRestartStats] = None + def children: Iterable[ActorRef] = Nil + def stats: Iterable[ChildRestartStats] = Nil + def shallDie(actor: ActorRef): ChildrenContainer = this + override def toString = "no children" + } + + /** + * This is the empty container, shared among all leaf actors. + */ + object EmptyChildrenContainer extends EmptyChildrenContainer + + /** + * This is the empty container which is installed after the last child has + * terminated while stopping; it is necessary to distinguish from the normal + * empty state while calling handleChildTerminated() for the last time. + */ + object TerminatedChildrenContainer extends EmptyChildrenContainer { + override def add(child: ActorRef): ChildrenContainer = this + } + + /** + * Normal children container: we do have at least one child, but none of our + * children are currently terminating (which is the time period between + * calling context.stop(child) and processing the ChildTerminated() system + * message). + */ + class NormalChildrenContainer(c: TreeMap[String, ChildRestartStats]) extends ChildrenContainer { + + def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + + def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) + + def getByName(name: String): Option[ChildRestartStats] = c get name + + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { + case c @ Some(crs) if (crs.child == actor) ⇒ c + case _ ⇒ None + } + + def children: Iterable[ActorRef] = c.values.view.map(_.child) + + def stats: Iterable[ChildRestartStats] = c.values + + def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest) + + override def toString = + if (c.size > 20) c.size + " children" + else c.mkString("children:\n ", "\n ", "") + } + + object NormalChildrenContainer { + def apply(c: TreeMap[String, ChildRestartStats]): ChildrenContainer = + if (c.isEmpty) EmptyChildrenContainer + else new NormalChildrenContainer(c) + } + + /** + * Waiting state: there are outstanding termination requests (i.e. context.stop(child) + * was called but the corresponding ChildTerminated() system message has not yet been + * processed). There could be no specific reason (UserRequested), we could be Restarting + * or Terminating. + * + * Removing the last child which was supposed to be terminating will return a different + * type of container, depending on whether or not children are left and whether or not + * the reason was “Terminating”. + */ + case class TerminatingChildrenContainer(c: TreeMap[String, ChildRestartStats], toDie: Set[ActorRef], reason: SuspendReason) + extends ChildrenContainer { + + def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) + + def remove(child: ActorRef): ChildrenContainer = { + val t = toDie - child + if (t.isEmpty) reason match { + case Termination ⇒ TerminatedChildrenContainer + case _ ⇒ NormalChildrenContainer(c - child.path.name) + } + else copy(c - child.path.name, t) + } + + def getByName(name: String): Option[ChildRestartStats] = c get name + + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { + case c @ Some(crs) if (crs.child == actor) ⇒ c + case _ ⇒ None + } + + def children: Iterable[ActorRef] = c.values.view.map(_.child) + + def stats: Iterable[ChildRestartStats] = c.values + + def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor) + + override def toString = + if (c.size > 20) c.size + " children" + else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie + } } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) @@ -221,7 +344,18 @@ private[akka] class ActorCell( var receiveTimeoutData: (Long, Cancellable) = if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData - var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs + @volatile + var childrenRefs: ChildrenContainer = EmptyChildrenContainer + + private def isTerminating = childrenRefs match { + case TerminatingChildrenContainer(_, _, Termination) ⇒ true + case TerminatedChildrenContainer ⇒ true + case _ ⇒ false + } + private def isNormal = childrenRefs match { + case TerminatingChildrenContainer(_, _, Termination | _: Recreation) ⇒ false + case _ ⇒ true + } private def _actorOf(props: Props, name: String): ActorRef = { if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { @@ -234,9 +368,13 @@ private[akka] class ActorCell( } } } - val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) - childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor)) - actor + // in case we are currently terminating, swallow creation requests and return EmptyLocalActorRef + if (isTerminating) provider.actorFor(self, Seq(name)) + else { + val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) + childrenRefs = childrenRefs.add(actor) + actor + } } def actorOf(props: Props): ActorRef = _actorOf(props, randomName()) @@ -249,27 +387,21 @@ private[akka] class ActorCell( case ElementRegex() ⇒ // this is fine case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) } - if (childrenRefs contains name) - throw new InvalidActorNameException("actor name " + name + " is not unique!") - _actorOf(props, name) + childrenRefs.getByName(name) match { + case None ⇒ _actorOf(props, name) + case _ ⇒ throw new InvalidActorNameException("actor name " + name + " is not unique!") + } } final def stop(actor: ActorRef): Unit = { - val a = actor.asInstanceOf[InternalActorRef] - if (childrenRefs contains actor.path.name) { - system.locker ! a - childrenRefs -= actor.path.name - handleChildTerminated(actor) - } - a.stop() + if (childrenRefs.getByRef(actor).isDefined) childrenRefs = childrenRefs.shallDie(actor) + actor.asInstanceOf[InternalActorRef].stop() } var currentMessage: Envelope = null var actor: Actor = _ - var stopping = false - @volatile //This must be volatile since it isn't protected by the mailbox status var mailbox: Mailbox = _ @@ -329,7 +461,7 @@ private[akka] class ActorCell( subject } - final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) + final def children: Iterable[ActorRef] = childrenRefs.children /** * Impl UntypedActorContext @@ -369,100 +501,97 @@ private[akka] class ActorCell( //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def systemInvoke(message: SystemMessage) { - def create(): Unit = try { - val created = newActor() - actor = created - created.preStart() - checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) - } catch { - case NonFatal(e) ⇒ - try { + def create(): Unit = if (isNormal) { + try { + val created = newActor() + actor = created + created.preStart() + checkReceiveTimeout + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) + } catch { + case NonFatal(e) ⇒ + try { + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + } finally { + parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self) + } + } + } + + def recreate(cause: Throwable): Unit = if (isNormal) { + try { + val failedActor = actor + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) + if (failedActor ne null) { + val c = currentMessage //One read only plz + try { + if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) + } finally { + clearActorFields() + } + } + childrenRefs match { + case ct: TerminatingChildrenContainer ⇒ + childrenRefs = ct.copy(reason = Recreation(cause)) + dispatcher suspend this + case _ ⇒ + doRecreate(cause) + } + } catch { + case NonFatal(e) ⇒ try { dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { - parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self) + parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) } - } - - def recreate(cause: Throwable): Unit = try { - val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) - if (failedActor ne null) { - val c = currentMessage //One read only plz - try { - failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) - } finally { - clearActorFields() - } - } - val freshActor = newActor() // this must happen after failedActor.preRestart (to scrap those children) - actor = freshActor // this must happen before postRestart has a chance to fail - freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) - - dispatcher.resume(this) //FIXME should this be moved down? - - actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) - } catch { - case NonFatal(e) ⇒ try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - } finally { - parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) } } - def suspend(): Unit = dispatcher suspend this + def suspend(): Unit = if (isNormal) dispatcher suspend this - def resume(): Unit = dispatcher resume this + def resume(): Unit = if (isNormal) dispatcher resume this + + def link(subject: ActorRef): Unit = if (!isTerminating) { + system.deathWatch.subscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) + } + + def unlink(subject: ActorRef): Unit = if (!isTerminating) { + system.deathWatch.unsubscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) + } def terminate() { setReceiveTimeout(None) cancelReceiveTimeout - val c = children - if (c.isEmpty) doTerminate() - else { - // do not process normal messages while waiting for all children to terminate - dispatcher suspend this - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) - // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them - for (child ← c) child.asInstanceOf[InternalActorRef].stop() - stopping = true + // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) + children foreach stop + + childrenRefs match { + case ct: TerminatingChildrenContainer ⇒ + childrenRefs = ct.copy(reason = Termination) + // do not process normal messages while waiting for all children to terminate + dispatcher suspend this + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) + case _ ⇒ doTerminate() } } - def supervise(child: ActorRef): Unit = { - childrenRefs.get(child.path.name) match { - case None ⇒ - childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - case Some(ChildRestartStats(`child`, _, _)) ⇒ - // this is the nominal case where we created the child and entered it in actorCreated() above - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - case Some(ChildRestartStats(c, _, _)) ⇒ - system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) - } + def supervise(child: ActorRef): Unit = if (!isTerminating) { + if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } try { - if (stopping) message match { - case Terminate() ⇒ terminate() // to allow retry - case ChildTerminated(child) ⇒ handleChildTerminated(child) - case _ ⇒ - } - else message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Link(subject) ⇒ - system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) - case Unlink(subject) ⇒ - system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) + message match { + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Link(subject) ⇒ link(subject) + case Unlink(subject) ⇒ unlink(subject) case Suspend() ⇒ suspend() case Resume() ⇒ resume() case Terminate() ⇒ terminate() @@ -472,7 +601,6 @@ private[akka] class ActorCell( } catch { case NonFatal(e) ⇒ dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message), e)) - //TODO FIXME How should problems here be handled??? throw e } } @@ -545,7 +673,7 @@ private[akka] class ActorCell( case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ if (childrenRefs contains name) childrenRefs(name).child.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ for (c ← childrenRefs getByName name) c.child.tell(m, msg.sender) case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } @@ -567,22 +695,61 @@ private[akka] class ActorCell( } finally { if (a ne null) a.clearBehaviorStack() clearActorFields() + actor = null } } } - final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { - case Some(stats) if stats.child == child ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause - case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) - case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + private def doRecreate(cause: Throwable): Unit = try { + // after all killed children have terminated, recreate the rest, then go on to start the new instance + actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) + + val freshActor = newActor() + actor = freshActor // this must happen before postRestart has a chance to fail + + freshActor.postRestart(cause) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + + dispatcher.resume(this) + } catch { + case NonFatal(e) ⇒ try { + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + actor.supervisorStrategy.handleSupervisorFailing(self, children) + } finally { + parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) + } } - final def handleChildTerminated(child: ActorRef): Unit = { - if (childrenRefs contains child.path.name) { - childrenRefs -= child.path.name - actor.supervisorStrategy.handleChildTerminated(this, child, children) - if (stopping && childrenRefs.isEmpty) doTerminate() - } else system.locker ! ChildTerminated(child) + final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match { + case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause + case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + } + + final def handleChildTerminated(child: ActorRef): Unit = try { + childrenRefs match { + case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ + val n = tc.remove(child) + childrenRefs = n + actor.supervisorStrategy.handleChildTerminated(this, child, children) + if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { + case Recreation(cause) ⇒ doRecreate(cause) + case Termination ⇒ doTerminate() + case _ ⇒ + } + case _ ⇒ + childrenRefs = childrenRefs.remove(child) + actor.supervisorStrategy.handleChildTerminated(this, child, children) + } + } catch { + case NonFatal(e) ⇒ + try { + dispatcher suspend this + actor.supervisorStrategy.handleSupervisorFailing(self, children) + } finally { + parent.tell(Failed(e), self) + } } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ @@ -609,7 +776,6 @@ private[akka] class ActorCell( final def clearActorFields(): Unit = { setActorFields(context = null, self = system.deadLetters) currentMessage = null - actor = null } final def setActorFields(context: ActorContext, self: ActorRef) { @@ -640,3 +806,4 @@ private[akka] class ActorCell( private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass } + diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bd2c0cf196..668731e4bb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -284,14 +284,11 @@ private[akka] class LocalActorRef private[akka] ( * Method for looking up a single child beneath this actor. Override in order * to inject “synthetic” actor paths like “/temp”. */ - protected def getSingleChild(name: String): InternalActorRef = { - if (actorCell.isTerminated) Nobody // read of the mailbox status ensures we get the latest childrenRefs - else { - val children = actorCell.childrenRefs - if (children contains name) children(name).child.asInstanceOf[InternalActorRef] - else Nobody + protected def getSingleChild(name: String): InternalActorRef = + actorCell.childrenRefs.getByName(name) match { + case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] + case None ⇒ Nobody } - } def getChild(names: Iterator[String]): InternalActorRef = { /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d9f710b533..b22218d6a3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -10,7 +10,6 @@ import akka.routing._ import akka.AkkaException import akka.util.{ Switch, Helpers } import akka.event._ -import com.typesafe.config.ConfigFactory /** * Interface for all ActorRef providers to implement. @@ -44,12 +43,6 @@ trait ActorRefProvider { */ def deathWatch: DeathWatch - /** - * Care-taker of actor refs which await final termination but cannot be kept - * in their parent’s children list because the name shall be freed. - */ - def locker: Locker - /** * The root path for all actors within this actor system, including remote * address if enabled. @@ -275,10 +268,7 @@ trait ActorRefFactory { /** * Stop the actor pointed to by the given [[akka.actor.ActorRef]]; this is - * an asynchronous operation, i.e. involves a message send, but if invoked - * on an [[akka.actor.ActorContext]] if operating on a child of that - * context it will free up the name for immediate reuse. - * + * an asynchronous operation, i.e. involves a message send. * When invoked on [[akka.actor.ActorSystem]] for a top-level actor, this * method sends a message to the guardian actor and blocks waiting for a reply, * see `akka.actor.creation-timeout` in the `reference.conf`. @@ -333,8 +323,6 @@ class LocalActorRefProvider( val deathWatch = new LocalDeathWatch(1024) //TODO make configrable - val locker: Locker = new Locker(scheduler, settings.ReaperInterval, this, rootPath / "locker", deathWatch) - /* * generate name for temporary actor refs */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index de1689e730..2c217a9109 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -23,7 +23,7 @@ import collection.immutable.Stack object ActorSystem { - val Version = "2.0-SNAPSHOT" + val Version = "2.1-SNAPSHOT" val EnvHome = System.getenv("AKKA_HOME") match { case null | "" | "." ⇒ None @@ -37,27 +37,76 @@ object ActorSystem { val GlobalHome = SystemHome orElse EnvHome - def create(name: String, config: Config): ActorSystem = apply(name, config) - def apply(name: String, config: Config): ActorSystem = new ActorSystemImpl(name, config).start() + /** + * Creates a new ActorSystem with the name "default", + * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, + * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader + * associated with the ActorSystem class. + * Then it loads the default reference configuration using the ClassLoader. + */ + def create(): ActorSystem = apply() /** - * Uses the standard default Config from ConfigFactory.load(), since none is provided. + * Creates a new ActorSystem with the specified name, + * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, + * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader + * associated with the ActorSystem class. + * Then it loads the default reference configuration using the ClassLoader. */ def create(name: String): ActorSystem = apply(name) /** - * Uses the standard default Config from ConfigFactory.load(), since none is provided. + * Creates a new ActorSystem with the name "default", and the specified Config, then + * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, + * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader + * associated with the ActorSystem class. */ - def apply(name: String): ActorSystem = apply(name, ConfigFactory.load()) + def create(name: String, config: Config): ActorSystem = apply(name, config) - def create(): ActorSystem = apply() + /** + * Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader + */ + def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader) + + /** + * Creates a new ActorSystem with the name "default", + * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, + * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader + * associated with the ActorSystem class. + * Then it loads the default reference configuration using the ClassLoader. + */ def apply(): ActorSystem = apply("default") - class Settings(cfg: Config, final val name: String) { + /** + * Creates a new ActorSystem with the specified name, + * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, + * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader + * associated with the ActorSystem class. + * Then it loads the default reference configuration using the ClassLoader. + */ + def apply(name: String): ActorSystem = { + val classLoader = findClassLoader() + apply(name, ConfigFactory.load(classLoader), classLoader) + } + + /** + * Creates a new ActorSystem with the name "default", and the specified Config, then + * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, + * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader + * associated with the ActorSystem class. + */ + def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader()) + + /** + * Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader + */ + def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = new ActorSystemImpl(name, config, classLoader).start() + + class Settings(classLoader: ClassLoader, cfg: Config, final val name: String) { final val config: Config = { - val config = cfg.withFallback(ConfigFactory.defaultReference) - config.checkValid(ConfigFactory.defaultReference, "akka") + val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader)) + config.checkValid(ConfigFactory.defaultReference(classLoader), "akka") config } @@ -98,6 +147,27 @@ object ActorSystem { override def toString: String = config.root.render } + + /** + * INTERNAL + */ + private[akka] def findClassLoader(): ClassLoader = { + def findCaller(get: Int ⇒ Class[_]): ClassLoader = + Iterator.from(2 /*is the magic number, promise*/ ).map(get) dropWhile { c ⇒ + c != null && + (c.getName.startsWith("akka.actor.ActorSystem") || + c.getName.startsWith("scala.Option") || + c.getName.startsWith("scala.collection.Iterator") || + c.getName.startsWith("akka.util.Reflect")) + } next () match { + case null ⇒ getClass.getClassLoader + case c ⇒ c.getClassLoader + } + + Option(Thread.currentThread.getContextClassLoader) orElse + (Reflect.getCallerClass map findCaller) getOrElse + getClass.getClassLoader + } } /** @@ -344,14 +414,14 @@ abstract class ExtendedActorSystem extends ActorSystem { def dynamicAccess: DynamicAccess } -class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Config) extends ExtendedActorSystem { +class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem { if (!name.matches("""^\w+$""")) throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])") import ActorSystem._ - final val settings: Settings = new Settings(applicationConfig, name) + final val settings: Settings = new Settings(classLoader, applicationConfig, name) protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { @@ -366,33 +436,13 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf } final val threadFactory: MonitorableThreadFactory = - MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader), uncaughtExceptionHandler) + MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler) /** * This is an extension point: by overriding this method, subclasses can * control all reflection activities of an actor system. */ - protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(findClassLoader) - - protected def findClassLoader: ClassLoader = { - def findCaller(get: Int ⇒ Class[_]): ClassLoader = { - val frames = Iterator.from(2).map(get) - frames dropWhile { c ⇒ - c != null && - (c.getName.startsWith("akka.actor.ActorSystem") || - c.getName.startsWith("scala.Option") || - c.getName.startsWith("scala.collection.Iterator") || - c.getName.startsWith("akka.util.Reflect")) - } next () match { - case null ⇒ getClass.getClassLoader - case c ⇒ c.getClassLoader - } - } - - Option(Thread.currentThread.getContextClassLoader) orElse - (Reflect.getCallerClass map findCaller) getOrElse - getClass.getClassLoader - } + protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(classLoader) private val _pm: DynamicAccess = createDynamicAccess() def dynamicAccess: DynamicAccess = _pm @@ -478,8 +528,6 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf def hasSystemMessages = false } - def locker: Locker = provider.locker - val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala deleted file mode 100644 index 4f9caaeedc..0000000000 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor - -import akka.dispatch._ -import akka.util.Duration -import java.util.concurrent.ConcurrentHashMap -import akka.event.DeathWatch - -/** - * Internal implementation detail for disposing of orphaned actors. - */ -private[akka] class Locker( - scheduler: Scheduler, - period: Duration, - val provider: ActorRefProvider, - val path: ActorPath, - val deathWatch: DeathWatch) extends MinimalActorRef { - - class DavyJones extends Runnable { - def run = { - val iter = heap.entrySet.iterator - while (iter.hasNext) { - val soul = iter.next() - deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere - soul.getKey match { - case _: LocalRef ⇒ // nothing to do, they know what they signed up for - case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure - } - } - } - } - - private val heap = new ConcurrentHashMap[InternalActorRef, Long] - - scheduler.schedule(period, period, new DavyJones) - - override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg) - - override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case Terminated(soul) ⇒ heap.remove(soul) - case ChildTerminated(soul) ⇒ heap.remove(soul) - case soul: InternalActorRef ⇒ - heap.put(soul, 0l) // wanted to put System.nanoTime and do something intelligent, but forgot what that was - deathWatch.subscribe(this, soul) - // now re-bind the soul so that it does not drown its parent - soul match { - case local: LocalActorRef ⇒ - val cell = local.underlying - cell.parent = this - case _ ⇒ - } - case _ ⇒ // ignore - } - -} diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index e8389c69fc..6ba3c4e14e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -16,6 +16,7 @@ import akka.serialization.SerializationExtension import akka.util.NonFatal import akka.event.Logging.LogEventException import akka.jsr166y.{ ForkJoinTask, ForkJoinPool } +import akka.util.Index final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -180,6 +181,29 @@ object MessageDispatcher { val SCHEDULED = 1 val RESCHEDULED = 2 + // dispatcher debugging helper using println (see below) + // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1) + final val debug = false + lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _) + def printActors: Unit = if (debug) { + for { + d ← actors.keys + val c = println(d + " inhabitants: " + d.inhabitants) + a ← actors.valueIterator(d) + } { + val status = if (a.isTerminated) " (terminated)" else " (alive)" + val messages = a match { + case l: LocalActorRef ⇒ " " + l.underlying.mailbox.numberOfMessages + " messages" + case _ ⇒ " " + a.getClass + } + val parent = a match { + case i: InternalActorRef ⇒ ", parent: " + i.getParent + case _ ⇒ "" + } + println(" -> " + a + status + messages + parent) + } + } + implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher } @@ -267,6 +291,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext * If you override it, you must call it. But only ever once. See "attach" for only invocation. */ protected[akka] def register(actor: ActorCell) { + if (debug) actors.put(this, actor.self) inhabitantsUpdater.incrementAndGet(this) } @@ -274,6 +299,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext * If you override it, you must call it. But only ever once. See "detach" for the only invocation */ protected[akka] def unregister(actor: ActorCell) { + if (debug) actors.remove(this, actor.self) inhabitantsUpdater.decrementAndGet(this) val mailBox = actor.mailbox mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 2d929cdded..322b50b900 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -187,10 +187,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes var nextMessage = systemDrain() try { while ((nextMessage ne null) && !isClosed) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with " + - (if (actor.childrenRefs.isEmpty) "no children" - else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children" - else actor.childrenRefs.mkString("children:\n ", "\n ", ""))) + if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! diff --git a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala index b1c25f55e1..882219f84d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala @@ -246,7 +246,10 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elem map (a ⇒ cont(this += a)) } final def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ Future.flow(this << elem1 << elem2 <<< Future.sequence(elems.toSeq)) map cont } + shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ + val seq = Future.sequence(elem1 +: elem2 +: elems) + seq map (a ⇒ cont(this ++= a)) + } final def <<<(elems: Traversable[A]): PromiseStream[A] @cps[Future[Any]] = shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this ++= elems) } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 368dd9110a..d35a761964 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -15,8 +15,9 @@ import akka.util.Timeout * This is what is used to complete a Future that is returned from an ask/? call, * when it times out. */ -class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) { def this(message: String) = this(message, null: Throwable) + override def getCause(): Throwable = cause } /** diff --git a/akka-docs/_sphinx/static/favicon.ico b/akka-docs/_sphinx/static/favicon.ico new file mode 100644 index 0000000000..858e2dbebd Binary files /dev/null and b/akka-docs/_sphinx/static/favicon.ico differ diff --git a/akka-docs/_sphinx/themes/akka/layout.html b/akka-docs/_sphinx/themes/akka/layout.html index 0d46ef708e..6f975aad81 100644 --- a/akka-docs/_sphinx/themes/akka/layout.html +++ b/akka-docs/_sphinx/themes/akka/layout.html @@ -4,60 +4,130 @@ #} {% extends "basic/layout.html" %} -{% set script_files = script_files + ['_static/theme_extras.js'] %} -{% set css_files = css_files + ['_static/print.css'] %} -{% set is_snapshot = version.endswith("-SNAPSHOT") %} +{% set script_files = script_files + ['_static/toc.js'] %} +{% set script_files = script_files + ['_static/prettify.js'] %} +{% set script_files = script_files + ['_static/highlightCode.js'] %} +{% set script_files = script_files + ['_static/effects.core.js'] %} +{% set script_files = script_files + ['_static/effects.highlight.js'] %} +{% set script_files = script_files + ['_static/scrollTo.js'] %} +{% set script_files = script_files + ['_static/contentsFix.js'] %} +{% set script_files = script_files + ['_static/ga.js'] %} +{% set css_files = css_files + ['_static/prettify.css'] %} +{% set css_files = css_files + ['_static/base.css'] %} +{% set css_files = css_files + ['_static/docs.css'] %} +{% set css_files = css_files + ['http://fonts.googleapis.com/css?family=Exo:300,400,600,700'] %} {# do not display relbars #} {% block relbar1 %}{% endblock %} {% block relbar2 %}{% endblock %} -{% macro nav() %} -

- {%- block akkarel1 %} - {%- endblock %} - {%- if prev %} - «  {{ prev.title }} -   ::   - {%- endif %} - {{ _('Contents') }} - {%- if next %} -   ::   - {{ next.title }}  » - {%- endif %} - {%- block akkarel2 %} - {%- endblock %} -

-{% endmacro %} - {% block content %} -
- {%- block akkaheader %} - {%- if logo -%} - - {%- endif -%} -

{{ shorttitle|e }}

-

Version {{ version|e }}

- {%- if is_snapshot -%} -

PDF

- {%- else -%} -

PDF

- {%- endif -%} - {%- endblock %} + {%- block akkaheader %} +