Fixed several memory and thread leaks. See #1404

* Dispatchers need Scheduler to be able to shutdown themselves. Stop Scheduler after dispatchers.
* Changed CallingThreadDispatcher global object to Extension, since it holds map of references to mailboxes. Will be GC:ed when system is GC:ed.
* Made testActor lazy, since it is not used in all tests, and it creates CallingThreadDispatcher.
* Activated some java tests that were not running
* Many tests were not stopping created ActorSystems. VERY IMPORTANT TO STOP ActorSystem in tests. Use AkkaSpec as much as possible.
* Used profiler to verify (and find) dangling ActorSystemImpl and threads from dispatchers.
* FutureSpec creates ForkJoinPool threads that are not cleared, but number of threads don't grow so it's not a problem.
This commit is contained in:
Patrik Nordwall 2011-11-30 15:16:20 +01:00
parent 035f514843
commit b488d70f54
31 changed files with 232 additions and 105 deletions

View file

@ -2,21 +2,36 @@ package akka.actor;
import akka.actor.ActorSystem;
import akka.japi.Creator;
import akka.testkit.AkkaSpec;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class JavaAPI {
private ActorSystem system = ActorSystem.create();
private static ActorSystem system;
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("JavaAPI", AkkaSpec.testConf());
}
@AfterClass
public static void afterAll() {
system.stop();
system = null;
}
@Test
void mustBeAbleToCreateActorRefFromClass() {
public void mustBeAbleToCreateActorRefFromClass() {
ActorRef ref = system.actorOf(JavaAPITestActor.class);
assertNotNull(ref);
}
@Test
void mustBeAbleToCreateActorRefFromFactory() {
public void mustBeAbleToCreateActorRefFromFactory() {
ActorRef ref = system.actorOf(new Props().withCreator(new Creator<Actor>() {
public Actor create() {
return new JavaAPITestActor();
@ -26,7 +41,7 @@ public class JavaAPI {
}
@Test
void mustAcceptSingleArgTell() {
public void mustAcceptSingleArgTell() {
ActorRef ref = system.actorOf(JavaAPITestActor.class);
ref.tell("hallo");
ref.tell("hallo", ref);

View file

@ -3,8 +3,12 @@
*/
package akka.actor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.Config;
@ -13,7 +17,9 @@ import static org.junit.Assert.*;
public class JavaExtension {
static class Provider implements ExtensionIdProvider {
public ExtensionId<TestExtension> lookup() { return defaultInstance; }
public ExtensionId<TestExtension> lookup() {
return defaultInstance;
}
}
public final static TestExtensionId defaultInstance = new TestExtensionId();
@ -32,9 +38,20 @@ public class JavaExtension {
}
}
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]");
private static ActorSystem system;
private ActorSystem system = ActorSystem.create("JavaExtension", c);
@BeforeClass
public static void beforeAll() {
Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]").withFallback(
AkkaSpec.testConf());
system = ActorSystem.create("JavaExtension", c);
}
@AfterClass
public static void afterAll() {
system.stop();
system = null;
}
@Test
public void mustBeAccessible() {

View file

@ -2,6 +2,9 @@ package akka.dispatch;
import akka.actor.Timeout;
import akka.actor.ActorSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.concurrent.Callable;
@ -14,15 +17,30 @@ import akka.japi.Function;
import akka.japi.Function2;
import akka.japi.Procedure;
import akka.japi.Option;
import akka.testkit.AkkaSpec;
public class JavaFutureTests {
private final ActorSystem system = ActorSystem.create();
private final Timeout t = system.settings().ActorTimeout();
private final FutureFactory ff = new FutureFactory(system.dispatcher(), t);
private static ActorSystem system;
private static FutureFactory ff;
private static Timeout t;
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf());
t = system.settings().ActorTimeout();
ff = new FutureFactory(system.dispatcher(), t);
}
@AfterClass
public static void afterAll() {
system.stop();
system = null;
}
@Test
public void mustBeAbleToMapAFuture() {
Future<String> f1 = ff.future(new Callable<String>() {
public String call() {
return "Hello";

View file

@ -7,7 +7,8 @@ import org.junit.Test;
public class JavaDuration {
@Test void testCreation() {
@Test
public void testCreation() {
final Duration fivesec = Duration.create(5, "seconds");
final Duration threemillis = Duration.parse("3 millis");
final Duration diff = fivesec.minus(threemillis);

View file

@ -9,17 +9,15 @@ import com.typesafe.config.ConfigFactory
class JavaExtensionSpec extends JavaExtension with JUnitSuite
object ActorSystemSpec {
object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
def lookup = this
def createExtension(s: ActorSystemImpl) = new TestExtension(s)
}
class TestExtension(val system: ActorSystemImpl) extends Extension
object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
def lookup = this
def createExtension(s: ActorSystemImpl) = new TestExtension(s)
}
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension$"]""") {
import ActorSystemSpec._
// Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains
class TestExtension(val system: ActorSystemImpl) extends Extension
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") {
"An ActorSystem" must {

View file

@ -107,9 +107,10 @@ object Chameneos {
def run {
// System.setProperty("akka.config", "akka.conf")
Chameneos.start = System.currentTimeMillis
ActorSystem().actorOf(new Mall(1000000, 4))
val system = ActorSystem().actorOf(new Mall(1000000, 4))
Thread.sleep(10000)
println("Elapsed: " + (end - start))
system.stop()
}
def main(args: Array[String]): Unit = run

View file

@ -160,9 +160,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
nr-of-instances = boom
}
}
""", ConfigParseOptions.defaults)
""", ConfigParseOptions.defaults).withFallback(AkkaSpec.testConf)
ActorSystem("invalid", invalidDeployerConf)
ActorSystem("invalid", invalidDeployerConf).stop()
}
}

View file

@ -194,41 +194,46 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
"log events and transitions if asked to do so" in {
import scala.collection.JavaConverters._
val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG",
"akka.actor.debug.fsm" -> true).asJava)
new TestKit(ActorSystem("fsm event", config)) {
EventFilter.debug() intercept {
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
startWith(1, null)
when(1) {
case Ev("go")
setTimer("t", Shutdown, 1.5 seconds, false)
goto(2)
"akka.actor.debug.fsm" -> true).asJava).withFallback(AkkaSpec.testConf)
val fsmEventSystem = ActorSystem("fsm event", config)
try {
new TestKit(fsmEventSystem) {
EventFilter.debug() intercept {
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
startWith(1, null)
when(1) {
case Ev("go")
setTimer("t", Shutdown, 1.5 seconds, false)
goto(2)
}
when(2) {
case Ev("stop")
cancelTimer("t")
stop
}
onTermination {
case StopEvent(r, _, _) testActor ! r
}
})
val name = fsm.toString
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
}
when(2) {
case Ev("stop")
cancelTimer("t")
stop
expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
}
onTermination {
case StopEvent(r, _, _) testActor ! r
}
})
val name = fsm.toString
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}
expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
}
expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}
} finally {
fsmEventSystem.stop()
}
}

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import org.scalatest.junit.JUnitSuite
class JavaAPISpec extends JavaAPI with JUnitSuite

View file

@ -32,7 +32,7 @@ object LoggingReceiveSpec {
class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAndAfterAll {
import LoggingReceiveSpec._
val config = ConfigFactory.parseMap(Map("akka.logLevel" -> "DEBUG").asJava)
val config = ConfigFactory.parseMap(Map("akka.logLevel" -> "DEBUG").asJava).withFallback(AkkaSpec.testConf)
val appLogging = ActorSystem("logging", ConfigFactory.parseMap(Map("akka.actor.debug.receive" -> true).asJava).withFallback(config))
val appAuto = ActorSystem("autoreceive", ConfigFactory.parseMap(Map("akka.actor.debug.autoreceive" -> true).asJava).withFallback(config))
val appLifecycle = ActorSystem("lifecycle", ConfigFactory.parseMap(Map("akka.actor.debug.lifecycle" -> true).asJava).withFallback(config))

View file

@ -34,7 +34,7 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec {
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatcherFactory.prerequisites, name, 10000,
Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig())
Duration.Zero, UnboundedMailbox(), config, Duration(1, TimeUnit.SECONDS)), ThreadPoolConfig())
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients * 2)
.build

View file

@ -14,7 +14,7 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig())
Duration.Zero, UnboundedMailbox(), config, 1 seconds), ThreadPoolConfig())
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build

View file

@ -14,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig())
Duration.Zero, UnboundedMailbox(), config, 1 seconds), ThreadPoolConfig())
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build

View file

@ -18,7 +18,7 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec {
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig())
Duration.Zero, UnboundedMailbox(), config, Duration(1, TimeUnit.SECONDS)), ThreadPoolConfig())
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(1)
.build

View file

@ -97,15 +97,19 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
"serialize DeadLetterActorRef" in {
val outbuf = new ByteArrayOutputStream()
val out = new ObjectOutputStream(outbuf)
val a = ActorSystem()
out.writeObject(a.deadLetters)
out.flush()
out.close()
val a = ActorSystem("SerializeDeadLeterActorRef", AkkaSpec.testConf)
try {
out.writeObject(a.deadLetters)
out.flush()
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
(deadLetters eq a.deadLetters) must be(true)
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
(deadLetters eq a.deadLetters) must be(true)
}
} finally {
a.stop()
}
}
}

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import org.scalatest.junit.JUnitSuite
class JavaDurationSpec extends JavaDuration with JUnitSuite

View file

@ -378,8 +378,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// TODO shutdown all that other stuff, whatever that may be
def stop() {
guardian.stop()
try terminationFuture.await(10 seconds) catch {
case _: FutureTimeoutException log.warning("Failed to stop [{}] within 10 seconds", name)
}
// Dispatchers shutdown themselves, but requires the scheduler
terminationFuture onComplete (_ stopScheduler())
terminationFuture onComplete (_ dispatcher.shutdown())
}
protected def createScheduler(): Scheduler = {
@ -400,8 +403,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
protected def stopScheduler(): Unit = scheduler match {
case x: DefaultScheduler x.stop()
case _
case x: DefaultScheduler
// Let dispatchers shutdown first.
// Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay.
x.scheduleOnce(() { x.stop; dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4)
case _
}
private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef]

View file

@ -136,7 +136,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
shutdownScheduleUpdater.get(this) match {
case UNSCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
scheduler.scheduleOnce(shutdownAction, shutdownTimeout)
()
} else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED
@ -211,7 +211,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
scheduler.scheduleOnce(this, shutdownTimeout)
else run()
}
}

View file

@ -81,7 +81,7 @@ trait LoggingBus extends ActorEventBus {
loggers = Seq(StandardOutLogger)
_logLevel = level
}
publish(Info(simpleName(this), "StandardOutLogger started"))
publish(Debug(simpleName(this), "StandardOutLogger started"))
}
private[akka] def startDefaultLoggers(system: ActorSystemImpl) {
@ -114,7 +114,7 @@ trait LoggingBus extends ActorEventBus {
loggers = myloggers
_logLevel = level
}
publish(Info(simpleName(this), "Default Loggers started"))
publish(Debug(simpleName(this), "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLoggerName)) {
unsubscribe(StandardOutLogger)
}
@ -154,7 +154,7 @@ trait LoggingBus extends ActorEventBus {
if (response != LoggerInitialized)
throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response)
AllLogLevels filter (level >= _) foreach (l subscribe(actor, classFor(l)))
publish(Info(simpleName(this), "logger " + name + " started"))
publish(Debug(simpleName(this), "logger " + name + " started"))
actor
}

View file

@ -148,5 +148,7 @@ object QDumper {
println("Queue: " + filename)
new QueueDumper(filename, system.log)()
}
system.stop()
}
}

View file

@ -44,7 +44,8 @@ abstract class DurableMailboxSpec(val backendName: String, val mailboxType: Dura
sender ! PoisonPill
}
"handle reply to ! for multiple messages" in {
// FIXME ignored due to zookeeper issue, ticket #1423
"handle reply to ! for multiple messages" ignore {
val latch = new CountDownLatch(5)
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
val sender = actorOf(Props(new Sender(latch)))

View file

@ -19,7 +19,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe
}
override def atTermination() {
zkServer.shutdown
zkServer.shutdown()
super.atTermination()
}
}

View file

@ -1,5 +1,8 @@
package akka.stm.example;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import akka.actor.ActorSystem;
import akka.stm.*;
import akka.actor.*;
@ -7,11 +10,8 @@ import akka.testkit.AkkaSpec;
public class RetryExample {
public static void main(String[] args) {
System.out.println();
System.out.println("Retry example");
System.out.println();
ActorSystem application = ActorSystem.create("RetryExample", AkkaSpec.testConf());
ActorSystem application = ActorSystem.create("RetryExample", AkkaSpec.testConf());
final Ref<Double> account1 = new Ref<Double>(100.0);
final Ref<Double> account2 = new Ref<Double>(100.0);
@ -47,5 +47,7 @@ public class RetryExample {
// Account 2: 600.0
transferer.stop();
application.stop();
}
}

View file

@ -9,11 +9,8 @@ import akka.transactor.Coordinated;
public class UntypedCoordinatedExample {
public static void main(String[] args) throws InterruptedException {
System.out.println();
System.out.println("Untyped transactor example");
System.out.println();
ActorSystem application = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf());
ActorSystem application = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf());
ActorRef counter1 = application.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class));
ActorRef counter2 = application.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class));
@ -45,5 +42,7 @@ public class UntypedCoordinatedExample {
counter1.stop();
counter2.stop();
application.stop();
}
}

View file

@ -8,11 +8,8 @@ import akka.testkit.AkkaSpec;
public class UntypedTransactorExample {
public static void main(String[] args) throws InterruptedException {
System.out.println();
System.out.println("Untyped transactor example");
System.out.println();
ActorSystem application = ActorSystem.create("UntypedTransactorExample", AkkaSpec.testConf());
ActorSystem application = ActorSystem.create("UntypedTransactorExample", AkkaSpec.testConf());
ActorRef counter1 = application.actorOf(new Props().withCreator(UntypedCounter.class));
ActorRef counter2 = application.actorOf(new Props().withCreator(UntypedCounter.class));
@ -44,5 +41,7 @@ public class UntypedTransactorExample {
counter1.stop();
counter2.stop();
application.stop();
}
}

View file

@ -3,6 +3,8 @@ package akka.transactor.test;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Before;
@ -31,7 +33,20 @@ import scala.collection.JavaConverters;
import scala.collection.Seq;
public class UntypedCoordinatedIncrementTest {
ActorSystem application = ActorSystem.create("UntypedCoordinatedIncrementTest", AkkaSpec.testConf());
ActorSystem application = ActorSystem.create("UntypedCoordinatedIncrementTest", AkkaSpec.testConf());
private static ActorSystem system;
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("UntypedTransactorTest", AkkaSpec.testConf());
}
@AfterClass
public static void afterAll() {
system.stop();
system = null;
}
List<ActorRef> counters;
ActorRef failer;

View file

@ -1,6 +1,9 @@
package akka.transactor.test;
import static org.junit.Assert.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Before;
@ -28,7 +31,19 @@ import scala.collection.Seq;
import akka.testkit.AkkaSpec;
public class UntypedTransactorTest {
ActorSystem application = ActorSystem.create("UntypedTransactorTest", AkkaSpec.testConf());
private static ActorSystem system;
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("UntypedTransactorTest", AkkaSpec.testConf());
}
@AfterClass
public static void afterAll() {
system.stop();
system = null;
}
List<ActorRef> counters;
ActorRef failer;
@ -42,14 +57,14 @@ public class UntypedTransactorTest {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;
ActorRef counter = application.actorOf(new Props().withCreator(new UntypedActorFactory() {
ActorRef counter = system.actorOf(new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedCounter(name);
}
}));
counters.add(counter);
}
failer = application.actorOf(new Props().withCreator(UntypedFailer.class));
failer = system.actorOf(new Props().withCreator(UntypedFailer.class));
}
@Test
@ -80,7 +95,7 @@ public class UntypedTransactorTest {
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
application.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer);

View file

@ -16,6 +16,10 @@ import akka.actor.Scheduler
import akka.event.EventStream
import akka.util.Duration
import java.util.concurrent.TimeUnit
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl
import akka.actor.Extension
/*
* Locking rules:
@ -34,7 +38,12 @@ import java.util.concurrent.TimeUnit
* within one of its methods taking a closure argument.
*/
private[testkit] object CallingThreadDispatcher {
private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider {
override def lookup = CallingThreadDispatcherQueues
override def createExtension(system: ActorSystemImpl): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues
}
private[testkit] class CallingThreadDispatcherQueues extends Extension {
// PRIVATE DATA
@ -127,7 +136,7 @@ class CallingThreadDispatcher(
protected[akka] override def throughputDeadlineTime = Duration.Zero
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false
protected[akka] override def shutdownTimeout = Duration(100L, TimeUnit.MILLISECONDS)
protected[akka] override def shutdownTimeout = Duration(1000L, TimeUnit.MILLISECONDS)
override def suspend(actor: ActorCell) {
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
@ -139,7 +148,7 @@ class CallingThreadDispatcher(
val queue = mbox.queue
val wasActive = queue.isActive
val switched = mbox.suspendSwitch.switchOff {
gatherFromAllOtherQueues(mbox, queue)
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue)
}
if (switched && !wasActive) {
runQueue(mbox, queue)
@ -267,7 +276,7 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = {
val queue = new NestingQueue
CallingThreadDispatcher.registerQueue(CallingThreadMailbox.this, queue)
CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue)
queue
}
}

View file

@ -92,7 +92,7 @@ class TestKit(_system: ActorSystem) {
* ActorRef of the test actor. Access is provided to enable e.g.
* registration as message target.
*/
val testActor: ActorRef = {
lazy val testActor: ActorRef = {
val impl = system.asInstanceOf[ActorSystemImpl]
impl.systemActorOf(Props(new TestActor(queue))
.copy(dispatcher = new CallingThreadDispatcher(system.dispatcherFactory.prerequisites)),

View file

@ -25,8 +25,7 @@ object AkkaSpec {
stdout-loglevel = "WARNING"
actor {
default-dispatcher {
core-pool-size = 4
max-pool-size = 32
core-pool-size-factor = 2
}
}
}
@ -53,7 +52,7 @@ abstract class AkkaSpec(_system: ActorSystem = ActorSystem(getClass.getSimpleNam
final override def afterAll {
system.stop()
try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch {
case _: FutureTimeoutException system.log.warning("failed to stop within 5 seconds")
case _: FutureTimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()
}

View file

@ -5,15 +5,20 @@ package akka.tutorial.first.scala
import org.junit.runner.RunWith
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import akka.testkit.TestActorRef
import akka.tutorial.first.scala.Pi.Worker
import akka.actor.ActorSystem
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class WorkerSpec extends WordSpec with MustMatchers {
class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
implicit def system = ActorSystem()
implicit val system = ActorSystem()
override def afterAll {
system.stop()
}
"Worker" must {
"calculate pi correctly" in {
@ -23,4 +28,4 @@ class WorkerSpec extends WordSpec with MustMatchers {
actor.calculatePiFor(1, 1) must be(-1.3333333333333333 plusOrMinus 0.0000000001)
}
}
}
}