diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index d0ef5104cd..f8600f3e1b 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -2,21 +2,36 @@ package akka.actor; import akka.actor.ActorSystem; import akka.japi.Creator; +import akka.testkit.AkkaSpec; + +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; public class JavaAPI { - private ActorSystem system = ActorSystem.create(); + private static ActorSystem system; + + @BeforeClass + public static void beforeAll() { + system = ActorSystem.create("JavaAPI", AkkaSpec.testConf()); + } + + @AfterClass + public static void afterAll() { + system.stop(); + system = null; + } @Test - void mustBeAbleToCreateActorRefFromClass() { + public void mustBeAbleToCreateActorRefFromClass() { ActorRef ref = system.actorOf(JavaAPITestActor.class); assertNotNull(ref); } @Test - void mustBeAbleToCreateActorRefFromFactory() { + public void mustBeAbleToCreateActorRefFromFactory() { ActorRef ref = system.actorOf(new Props().withCreator(new Creator() { public Actor create() { return new JavaAPITestActor(); @@ -26,7 +41,7 @@ public class JavaAPI { } @Test - void mustAcceptSingleArgTell() { + public void mustAcceptSingleArgTell() { ActorRef ref = system.actorOf(JavaAPITestActor.class); ref.tell("hallo"); ref.tell("hallo", ref); diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index a08060f52d..20d760c46e 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -3,8 +3,12 @@ */ package akka.actor; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import akka.testkit.AkkaSpec; + import com.typesafe.config.ConfigFactory; import com.typesafe.config.Config; @@ -13,7 +17,9 @@ import static org.junit.Assert.*; public class JavaExtension { static class Provider implements ExtensionIdProvider { - public ExtensionId lookup() { return defaultInstance; } + public ExtensionId lookup() { + return defaultInstance; + } } public final static TestExtensionId defaultInstance = new TestExtensionId(); @@ -32,9 +38,20 @@ public class JavaExtension { } } - private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]"); + private static ActorSystem system; - private ActorSystem system = ActorSystem.create("JavaExtension", c); + @BeforeClass + public static void beforeAll() { + Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]").withFallback( + AkkaSpec.testConf()); + system = ActorSystem.create("JavaExtension", c); + } + + @AfterClass + public static void afterAll() { + system.stop(); + system = null; + } @Test public void mustBeAccessible() { diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 12dbe736d6..d534d87103 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -2,6 +2,9 @@ package akka.dispatch; import akka.actor.Timeout; import akka.actor.ActorSystem; + +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; import java.util.concurrent.Callable; @@ -14,15 +17,30 @@ import akka.japi.Function; import akka.japi.Function2; import akka.japi.Procedure; import akka.japi.Option; +import akka.testkit.AkkaSpec; public class JavaFutureTests { - private final ActorSystem system = ActorSystem.create(); - private final Timeout t = system.settings().ActorTimeout(); - private final FutureFactory ff = new FutureFactory(system.dispatcher(), t); + private static ActorSystem system; + private static FutureFactory ff; + private static Timeout t; + + @BeforeClass + public static void beforeAll() { + system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf()); + t = system.settings().ActorTimeout(); + ff = new FutureFactory(system.dispatcher(), t); + } + + @AfterClass + public static void afterAll() { + system.stop(); + system = null; + } @Test public void mustBeAbleToMapAFuture() { + Future f1 = ff.future(new Callable() { public String call() { return "Hello"; diff --git a/akka-actor-tests/src/test/java/akka/util/JavaDuration.java b/akka-actor-tests/src/test/java/akka/util/JavaDuration.java index 56e7f68bf6..5a833f5b51 100644 --- a/akka-actor-tests/src/test/java/akka/util/JavaDuration.java +++ b/akka-actor-tests/src/test/java/akka/util/JavaDuration.java @@ -7,7 +7,8 @@ import org.junit.Test; public class JavaDuration { - @Test void testCreation() { + @Test + public void testCreation() { final Duration fivesec = Duration.create(5, "seconds"); final Duration threemillis = Duration.parse("3 millis"); final Duration diff = fivesec.minus(threemillis); diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 3565cde2fb..6f8c364ff8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -9,17 +9,15 @@ import com.typesafe.config.ConfigFactory class JavaExtensionSpec extends JavaExtension with JUnitSuite -object ActorSystemSpec { - object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider { - def lookup = this - def createExtension(s: ActorSystemImpl) = new TestExtension(s) - } - - class TestExtension(val system: ActorSystemImpl) extends Extension +object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider { + def lookup = this + def createExtension(s: ActorSystemImpl) = new TestExtension(s) } -class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension$"]""") { - import ActorSystemSpec._ +// Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains +class TestExtension(val system: ActorSystemImpl) extends Extension + +class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") { "An ActorSystem" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index 8e0e3e61fd..52a18e0f3b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -107,9 +107,10 @@ object Chameneos { def run { // System.setProperty("akka.config", "akka.conf") Chameneos.start = System.currentTimeMillis - ActorSystem().actorOf(new Mall(1000000, 4)) + val system = ActorSystem().actorOf(new Mall(1000000, 4)) Thread.sleep(10000) println("Elapsed: " + (end - start)) + system.stop() } def main(args: Array[String]): Unit = run diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 68a81d9797..ecec2c0e3e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -160,9 +160,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { nr-of-instances = boom } } - """, ConfigParseOptions.defaults) + """, ConfigParseOptions.defaults).withFallback(AkkaSpec.testConf) - ActorSystem("invalid", invalidDeployerConf) + ActorSystem("invalid", invalidDeployerConf).stop() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 7d829ec622..aa89ac5c89 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -194,41 +194,46 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im "log events and transitions if asked to do so" in { import scala.collection.JavaConverters._ val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", - "akka.actor.debug.fsm" -> true).asJava) - new TestKit(ActorSystem("fsm event", config)) { - EventFilter.debug() intercept { - val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] { - startWith(1, null) - when(1) { - case Ev("go") ⇒ - setTimer("t", Shutdown, 1.5 seconds, false) - goto(2) + "akka.actor.debug.fsm" -> true).asJava).withFallback(AkkaSpec.testConf) + val fsmEventSystem = ActorSystem("fsm event", config) + try { + new TestKit(fsmEventSystem) { + EventFilter.debug() intercept { + val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] { + startWith(1, null) + when(1) { + case Ev("go") ⇒ + setTimer("t", Shutdown, 1.5 seconds, false) + goto(2) + } + when(2) { + case Ev("stop") ⇒ + cancelTimer("t") + stop + } + onTermination { + case StopEvent(r, _, _) ⇒ testActor ! r + } + }) + val name = fsm.toString + system.eventStream.subscribe(testActor, classOf[Logging.Debug]) + fsm ! "go" + expectMsgPF(1 second, hint = "processing Event(go,null)") { + case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true } - when(2) { - case Ev("stop") ⇒ - cancelTimer("t") - stop + expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) + fsm ! "stop" + expectMsgPF(1 second, hint = "processing Event(stop,null)") { + case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true } - onTermination { - case StopEvent(r, _, _) ⇒ testActor ! r - } - }) - val name = fsm.toString - system.eventStream.subscribe(testActor, classOf[Logging.Debug]) - fsm ! "go" - expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true + expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) + expectNoMsg(1 second) + system.eventStream.unsubscribe(testActor) } - expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) - fsm ! "stop" - expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true - } - expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) - expectNoMsg(1 second) - system.eventStream.unsubscribe(testActor) } + } finally { + fsmEventSystem.stop() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/JavaAPISpec.scala b/akka-actor-tests/src/test/scala/akka/actor/JavaAPISpec.scala new file mode 100644 index 0000000000..49fcc6d638 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/JavaAPISpec.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +import org.scalatest.junit.JUnitSuite + +class JavaAPISpec extends JavaAPI with JUnitSuite diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index f25920240b..20526984b3 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -32,7 +32,7 @@ object LoggingReceiveSpec { class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAndAfterAll { import LoggingReceiveSpec._ - val config = ConfigFactory.parseMap(Map("akka.logLevel" -> "DEBUG").asJava) + val config = ConfigFactory.parseMap(Map("akka.logLevel" -> "DEBUG").asJava).withFallback(AkkaSpec.testConf) val appLogging = ActorSystem("logging", ConfigFactory.parseMap(Map("akka.actor.debug.receive" -> true).asJava).withFallback(config)) val appAuto = ActorSystem("autoreceive", ConfigFactory.parseMap(Map("akka.actor.debug.autoreceive" -> true).asJava).withFallback(config)) val appLifecycle = ActorSystem("lifecycle", ConfigFactory.parseMap(Map("akka.actor.debug.lifecycle" -> true).asJava).withFallback(config)) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala index e47d7987bd..1854a0a32e 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala @@ -34,7 +34,7 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 10000, - Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, Duration(1, TimeUnit.SECONDS)), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients * 2) .build diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala index 52bb3d169b..f5b7b3ae4d 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala @@ -14,7 +14,7 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, 1 seconds), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients) .build diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index a49e837ac4..f2e547ab71 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -14,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, 1 seconds), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients) .build diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala index 0de1e1be2d..eb0eaffc46 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala @@ -18,7 +18,7 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, Duration(1, TimeUnit.SECONDS)), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(1) .build diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 73afd0dc0a..499d214ff8 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -97,15 +97,19 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { "serialize DeadLetterActorRef" in { val outbuf = new ByteArrayOutputStream() val out = new ObjectOutputStream(outbuf) - val a = ActorSystem() - out.writeObject(a.deadLetters) - out.flush() - out.close() + val a = ActorSystem("SerializeDeadLeterActorRef", AkkaSpec.testConf) + try { + out.writeObject(a.deadLetters) + out.flush() + out.close() - val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray)) - Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) { - val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef] - (deadLetters eq a.deadLetters) must be(true) + val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray)) + Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) { + val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef] + (deadLetters eq a.deadLetters) must be(true) + } + } finally { + a.stop() } } } diff --git a/akka-actor-tests/src/test/scala/akka/util/JavaDurationSpec.scala b/akka-actor-tests/src/test/scala/akka/util/JavaDurationSpec.scala new file mode 100644 index 0000000000..aafbf3d133 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/JavaDurationSpec.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.util + +import org.scalatest.junit.JUnitSuite + +class JavaDurationSpec extends JavaDuration with JUnitSuite diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 7534a6de77..0da9137a5a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -378,8 +378,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor // TODO shutdown all that other stuff, whatever that may be def stop() { guardian.stop() + try terminationFuture.await(10 seconds) catch { + case _: FutureTimeoutException ⇒ log.warning("Failed to stop [{}] within 10 seconds", name) + } + // Dispatchers shutdown themselves, but requires the scheduler terminationFuture onComplete (_ ⇒ stopScheduler()) - terminationFuture onComplete (_ ⇒ dispatcher.shutdown()) } protected def createScheduler(): Scheduler = { @@ -400,8 +403,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } protected def stopScheduler(): Unit = scheduler match { - case x: DefaultScheduler ⇒ x.stop() - case _ ⇒ + case x: DefaultScheduler ⇒ + // Let dispatchers shutdown first. + // Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay. + x.scheduleOnce(() ⇒ { x.stop; dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4) + case _ ⇒ } private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef] diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index fd45069e68..99557e33c8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -136,7 +136,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) + scheduler.scheduleOnce(shutdownAction, shutdownTimeout) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -211,7 +211,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) + scheduler.scheduleOnce(this, shutdownTimeout) else run() } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index bae930cb17..1bff28ea44 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -81,7 +81,7 @@ trait LoggingBus extends ActorEventBus { loggers = Seq(StandardOutLogger) _logLevel = level } - publish(Info(simpleName(this), "StandardOutLogger started")) + publish(Debug(simpleName(this), "StandardOutLogger started")) } private[akka] def startDefaultLoggers(system: ActorSystemImpl) { @@ -114,7 +114,7 @@ trait LoggingBus extends ActorEventBus { loggers = myloggers _logLevel = level } - publish(Info(simpleName(this), "Default Loggers started")) + publish(Debug(simpleName(this), "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) } @@ -154,7 +154,7 @@ trait LoggingBus extends ActorEventBus { if (response != LoggerInitialized) throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) - publish(Info(simpleName(this), "logger " + name + " started")) + publish(Debug(simpleName(this), "logger " + name + " started")) actor } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala index eb8a7f9e0b..54c5ba36b6 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala @@ -148,5 +148,7 @@ object QDumper { println("Queue: " + filename) new QueueDumper(filename, system.log)() } + + system.stop() } } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index ab0f0206d3..9d118e3a96 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -44,7 +44,8 @@ abstract class DurableMailboxSpec(val backendName: String, val mailboxType: Dura sender ! PoisonPill } - "handle reply to ! for multiple messages" in { + // FIXME ignored due to zookeeper issue, ticket #1423 + "handle reply to ! for multiple messages" ignore { val latch = new CountDownLatch(5) val queueActor = createMailboxTestActor(backendName + " should handle reply to !") val sender = actorOf(Props(new Sender(latch))) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index da363beaaf..3cdc734830 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -19,7 +19,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe } override def atTermination() { - zkServer.shutdown + zkServer.shutdown() super.atTermination() } } diff --git a/akka-stm/src/test/java/akka/stm/example/RetryExample.java b/akka-stm/src/test/java/akka/stm/example/RetryExample.java index ad86126deb..f15850d232 100644 --- a/akka-stm/src/test/java/akka/stm/example/RetryExample.java +++ b/akka-stm/src/test/java/akka/stm/example/RetryExample.java @@ -1,5 +1,8 @@ package akka.stm.example; +import org.junit.AfterClass; +import org.junit.BeforeClass; + import akka.actor.ActorSystem; import akka.stm.*; import akka.actor.*; @@ -7,11 +10,8 @@ import akka.testkit.AkkaSpec; public class RetryExample { public static void main(String[] args) { - System.out.println(); - System.out.println("Retry example"); - System.out.println(); - ActorSystem application = ActorSystem.create("RetryExample", AkkaSpec.testConf()); + ActorSystem application = ActorSystem.create("RetryExample", AkkaSpec.testConf()); final Ref account1 = new Ref(100.0); final Ref account2 = new Ref(100.0); @@ -47,5 +47,7 @@ public class RetryExample { // Account 2: 600.0 transferer.stop(); + + application.stop(); } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 344c98dfee..9baf0f1485 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -9,11 +9,8 @@ import akka.transactor.Coordinated; public class UntypedCoordinatedExample { public static void main(String[] args) throws InterruptedException { - System.out.println(); - System.out.println("Untyped transactor example"); - System.out.println(); - ActorSystem application = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf()); + ActorSystem application = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf()); ActorRef counter1 = application.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); ActorRef counter2 = application.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); @@ -45,5 +42,7 @@ public class UntypedCoordinatedExample { counter1.stop(); counter2.stop(); + + application.stop(); } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 882d5b7b1f..55e28f872f 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -8,11 +8,8 @@ import akka.testkit.AkkaSpec; public class UntypedTransactorExample { public static void main(String[] args) throws InterruptedException { - System.out.println(); - System.out.println("Untyped transactor example"); - System.out.println(); - ActorSystem application = ActorSystem.create("UntypedTransactorExample", AkkaSpec.testConf()); + ActorSystem application = ActorSystem.create("UntypedTransactorExample", AkkaSpec.testConf()); ActorRef counter1 = application.actorOf(new Props().withCreator(UntypedCounter.class)); ActorRef counter2 = application.actorOf(new Props().withCreator(UntypedCounter.class)); @@ -44,5 +41,7 @@ public class UntypedTransactorExample { counter1.stop(); counter2.stop(); + + application.stop(); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index 0d44d16496..a90e0a1952 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -3,6 +3,8 @@ package akka.transactor.test; import static org.junit.Assert.*; import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.Before; @@ -31,7 +33,20 @@ import scala.collection.JavaConverters; import scala.collection.Seq; public class UntypedCoordinatedIncrementTest { - ActorSystem application = ActorSystem.create("UntypedCoordinatedIncrementTest", AkkaSpec.testConf()); + ActorSystem application = ActorSystem.create("UntypedCoordinatedIncrementTest", AkkaSpec.testConf()); + + private static ActorSystem system; + + @BeforeClass + public static void beforeAll() { + system = ActorSystem.create("UntypedTransactorTest", AkkaSpec.testConf()); + } + + @AfterClass + public static void afterAll() { + system.stop(); + system = null; + } List counters; ActorRef failer; diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 8d2a3e4db8..528a2a14f8 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -1,6 +1,9 @@ package akka.transactor.test; import static org.junit.Assert.*; + +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.Before; @@ -28,7 +31,19 @@ import scala.collection.Seq; import akka.testkit.AkkaSpec; public class UntypedTransactorTest { - ActorSystem application = ActorSystem.create("UntypedTransactorTest", AkkaSpec.testConf()); + + private static ActorSystem system; + + @BeforeClass + public static void beforeAll() { + system = ActorSystem.create("UntypedTransactorTest", AkkaSpec.testConf()); + } + + @AfterClass + public static void afterAll() { + system.stop(); + system = null; + } List counters; ActorRef failer; @@ -42,14 +57,14 @@ public class UntypedTransactorTest { counters = new ArrayList(); for (int i = 1; i <= numCounters; i++) { final String name = "counter" + i; - ActorRef counter = application.actorOf(new Props().withCreator(new UntypedActorFactory() { + ActorRef counter = system.actorOf(new Props().withCreator(new UntypedActorFactory() { public UntypedActor create() { return new UntypedCounter(name); } })); counters.add(counter); } - failer = application.actorOf(new Props().withCreator(UntypedFailer.class)); + failer = system.actorOf(new Props().withCreator(UntypedFailer.class)); } @Test @@ -80,7 +95,7 @@ public class UntypedTransactorTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - application.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); + system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index edbc589099..82e38511d3 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -16,6 +16,10 @@ import akka.actor.Scheduler import akka.event.EventStream import akka.util.Duration import java.util.concurrent.TimeUnit +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.ActorSystemImpl +import akka.actor.Extension /* * Locking rules: @@ -34,7 +38,12 @@ import java.util.concurrent.TimeUnit * within one of its methods taking a closure argument. */ -private[testkit] object CallingThreadDispatcher { +private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider { + override def lookup = CallingThreadDispatcherQueues + override def createExtension(system: ActorSystemImpl): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues +} + +private[testkit] class CallingThreadDispatcherQueues extends Extension { // PRIVATE DATA @@ -127,7 +136,7 @@ class CallingThreadDispatcher( protected[akka] override def throughputDeadlineTime = Duration.Zero protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false - protected[akka] override def shutdownTimeout = Duration(100L, TimeUnit.MILLISECONDS) + protected[akka] override def shutdownTimeout = Duration(1000L, TimeUnit.MILLISECONDS) override def suspend(actor: ActorCell) { getMailbox(actor) foreach (_.suspendSwitch.switchOn) @@ -139,7 +148,7 @@ class CallingThreadDispatcher( val queue = mbox.queue val wasActive = queue.isActive val switched = mbox.suspendSwitch.switchOff { - gatherFromAllOtherQueues(mbox, queue) + CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue) } if (switched && !wasActive) { runQueue(mbox, queue) @@ -267,7 +276,7 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with private val q = new ThreadLocal[NestingQueue]() { override def initialValue = { val queue = new NestingQueue - CallingThreadDispatcher.registerQueue(CallingThreadMailbox.this, queue) + CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue) queue } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index dba8437ef6..c0476a74cc 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -92,7 +92,7 @@ class TestKit(_system: ActorSystem) { * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - val testActor: ActorRef = { + lazy val testActor: ActorRef = { val impl = system.asInstanceOf[ActorSystemImpl] impl.systemActorOf(Props(new TestActor(queue)) .copy(dispatcher = new CallingThreadDispatcher(system.dispatcherFactory.prerequisites)), diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index e43ea378cb..ce8dad2b4c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -25,8 +25,7 @@ object AkkaSpec { stdout-loglevel = "WARNING" actor { default-dispatcher { - core-pool-size = 4 - max-pool-size = 32 + core-pool-size-factor = 2 } } } @@ -53,7 +52,7 @@ abstract class AkkaSpec(_system: ActorSystem = ActorSystem(getClass.getSimpleNam final override def afterAll { system.stop() try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch { - case _: FutureTimeoutException ⇒ system.log.warning("failed to stop within 5 seconds") + case _: FutureTimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) } atTermination() } diff --git a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala index de5851bfe7..a752b3c783 100644 --- a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala +++ b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala @@ -5,15 +5,20 @@ package akka.tutorial.first.scala import org.junit.runner.RunWith import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll import org.scalatest.WordSpec import akka.testkit.TestActorRef import akka.tutorial.first.scala.Pi.Worker import akka.actor.ActorSystem @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class WorkerSpec extends WordSpec with MustMatchers { +class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { - implicit def system = ActorSystem() + implicit val system = ActorSystem() + + override def afterAll { + system.stop() + } "Worker" must { "calculate pi correctly" in { @@ -23,4 +28,4 @@ class WorkerSpec extends WordSpec with MustMatchers { actor.calculatePiFor(1, 1) must be(-1.3333333333333333 plusOrMinus 0.0000000001) } } -} \ No newline at end of file +}