diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 4640951322..4f70bc52f5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 milliseconds - collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(countDownLatch2.countDown())) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -44,7 +44,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { "should stop continuous scheduling if the receiving actor has been terminated" in { // run immediately and then every 100 milliseconds - collectCancellable(system.scheduler.schedule(testActor, "msg", 0 milliseconds, 100 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, testActor, "msg")) // stop the actor and, hence, the continuous messaging from happening testActor ! PoisonPill @@ -59,14 +59,18 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) - // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50 milliseconds)) + // run after 300 millisec + collectCancellable(system.scheduler.scheduleOnce(300 milliseconds, tickActor, Tick)) + collectCancellable(system.scheduler.scheduleOnce(300 milliseconds)(countDownLatch.countDown())) + + // should not be run immediately + assert(countDownLatch.await(100, TimeUnit.MILLISECONDS) == false) + countDownLatch.getCount must be(3) // after 1 second the wait should fail - assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) + assert(countDownLatch.await(1, TimeUnit.SECONDS) == false) // should still be 1 left - assert(countDownLatch.getCount == 1) + countDownLatch.getCount must be(1) } /** @@ -81,7 +85,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) + val timeout = collectCancellable(system.scheduler.scheduleOnce(1 second, actor, Ping)) timeout.cancel() } @@ -109,10 +113,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) + collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping)) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash)) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -136,7 +140,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime), 10 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(10 milliseconds, actor, Msg(System.nanoTime))) Thread.sleep(5) } @@ -155,7 +159,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val startTime = System.nanoTime() - val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) + val cancellable = system.scheduler.schedule(1 second, 100 milliseconds, actor, Msg) ticks.await(3, TimeUnit.SECONDS) val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index f07ed9dfa1..c601a42700 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -470,42 +470,6 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { def dispatcherType = "Balancing Dispatcher" - "A " + dispatcherType must { - "process messages in parallel" in { - implicit val dispatcher = newInterceptedDispatcher - val aStart, aStop, bParallel = new CountDownLatch(1) - val a, b = newTestActor(dispatcher) - - a ! Meet(aStart, aStop) - assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") - - b ! CountDown(bParallel) - assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel") - - aStop.countDown() - - a.stop - b.stop - - while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination - - assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) - assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FJDispatcherModelSpec extends ActorModelSpec { - import ActorModelSpec._ - - def newInterceptedDispatcher = - (new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput, - system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, - new ForkJoinPoolConfig(), system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor).asInstanceOf[MessageDispatcherInterceptor] - - def dispatcherType = "FJDispatcher" - "A " + dispatcherType must { "process messages in parallel" in { implicit val dispatcher = newInterceptedDispatcher diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 07d3cf4d1a..f36df2c352 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -207,8 +207,10 @@ trait Actor { throw new ActorInitializationException( "\n\tYou cannot create an instance of " + getClass.getName + " explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + - "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") + "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + + "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem), or" + + "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + + "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem)") if (contextStack.isEmpty) noContextError val c = contextStack.head diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c103de4eb6..7105621bbc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -410,7 +410,7 @@ private[akka] class ActorCell( if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout._1, TimeUnit.MILLISECONDS))) + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) } else cancelReceiveTimeout() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 92a49ee7c7..51b2bd48af 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -566,22 +566,20 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable { import org.jboss.netty.akka.util.{ Timeout ⇒ HWTimeout } - private def schedule(task: TimerTask, delay: Duration): HWTimeout = hashedWheelTimer.newTimeout(task, delay) + def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, receiver, message), initialDelay)) - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(schedule(createContinuousTask(receiver, message, delay), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay)) - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(schedule(createContinuousTask(f, delay), initialDelay)) + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) - def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = - new DefaultCancellable(schedule(createSingleTask(runnable), delay)) + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = - new DefaultCancellable(schedule(createSingleTask(receiver, message), delay)) - - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = - new DefaultCancellable(schedule(createSingleTask(f), delay)) + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { @@ -597,14 +595,14 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createSingleTask(f: () ⇒ Unit): TimerTask = + private def createSingleTask(f: ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(f) + dispatcher.dispatchTask(() ⇒ f) } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { + private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { // Check if the receiver is still alive and kicking before sending it a message and reschedule the task @@ -620,10 +618,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { + private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(f) + dispatcher.dispatchTask(() ⇒ f) try timeout.getTimer.newTimeout(this, delay) catch { case _: IllegalStateException ⇒ // stop recurring if timer is stopped } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index f1195d19c8..f9d79e32e5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -58,18 +58,11 @@ object ActorSystem { class Settings(cfg: Config, val name: String) { - // Verify that the Config is sane and has our reference config. - val config: Config = - try { - cfg.checkValid(ConfigFactory.defaultReference, "akka") - cfg - } catch { - case e: ConfigException ⇒ - // try again with added defaultReference - val cfg2 = cfg.withFallback(ConfigFactory.defaultReference) - cfg2.checkValid(ConfigFactory.defaultReference, "akka") - cfg2 - } + val config: Config = { + val config = cfg.withFallback(ConfigFactory.defaultReference) + config.checkValid(ConfigFactory.defaultReference, "akka") + config + } import scala.collection.JavaConverters._ import config._ diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 76495843fc..0dc961ab0b 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -34,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) + ref = Some(system.scheduler.schedule(timeout, timeout, actor, this)) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) + ref = Some(system.scheduler.scheduleOnce(timeout, actor, this)) } } @@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) + timeoutFuture = Some(system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 52a63f1730..5b32a86a60 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -20,29 +20,29 @@ trait Scheduler { * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) */ - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable + def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and a frequency. * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) */ - def schedule(f: () ⇒ Unit, initialDelay: Duration, frequency: Duration): Cancellable + def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable /** * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. */ - def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable /** * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. */ - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable /** * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. */ - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable } trait Cancellable { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 0dba2805ad..56ef4dd336 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -137,7 +137,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, shutdownTimeout) + scheduler.scheduleOnce(shutdownTimeout, shutdownAction) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -212,7 +212,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - try scheduler.scheduleOnce(this, shutdownTimeout) catch { + try scheduler.scheduleOnce(shutdownTimeout, this) catch { case _: IllegalStateException ⇒ shutdown() } else run() diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index cd8c22de6e..a09f28f6a9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -14,7 +14,7 @@ import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } import scala.util.continuations._ import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS } +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } @@ -853,7 +853,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec private def awaitUnsafe(waitTimeNanos: Long): Boolean = { if (value.isEmpty && waitTimeNanos > 0) { - val ms = NANOS.toMillis(waitTimeNanos) + val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val start = currentTimeInNanos try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } @@ -877,7 +877,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi else Long.MaxValue //If both are infinite, use Long.MaxValue if (awaitUnsafe(waitNanos)) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") + else throw new FutureTimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds") } def await = await(timeout.duration) @@ -957,7 +957,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def run() { if (!isCompleted) { if (!isExpired) - try dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + try dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS), this) catch { case _: IllegalStateException ⇒ func(DefaultPromise.this) } @@ -965,7 +965,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } } } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) + val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -990,7 +990,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val done = if (!isExpired) try { - dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS), this) true } catch { case _: IllegalStateException ⇒ false @@ -1001,7 +1001,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } } } - dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) + dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) promise } } else this @@ -1012,7 +1012,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } @inline - private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? + private def currentTimeInNanos: Long = MILLISECONDS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs. @inline private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index c45cc74593..77663c780e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -187,52 +187,6 @@ class MonitorableThread(runnable: Runnable, name: String) } } -case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { - final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { - def createExecutorService: ExecutorService = { - new ForkJoinPool(targetParallelism) with ExecutorService { - setAsyncMode(true) - setMaintainsParallelism(true) - - override final def execute(r: Runnable) { - r match { - case fjmbox: FJMailbox ⇒ - //fjmbox.fjTask.reinitialize() - Thread.currentThread match { - case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ - fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected - case _ ⇒ super.execute[Unit](fjmbox.fjTask) - } - case _ ⇒ - super.execute(r) - } - } - - import java.util.{ Collection ⇒ JCollection } - - def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = - throw new UnsupportedOperationException("invokeAny. NOT!") - - def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = - throw new UnsupportedOperationException("invokeAny. NOT!") - - def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = - throw new UnsupportedOperationException("invokeAny. NOT!") - } - } - } -} - -trait FJMailbox { self: Mailbox ⇒ - final val fjTask = new ForkJoinTask[Unit] with Runnable { - private[this] var result: Unit = () - final def getRawResult() = result - final def setRawResult(v: Unit) { result = v } - final def exec() = { self.run(); true } - final def run() { invoke() } - } -} - /** * As the name says */ @@ -273,3 +227,54 @@ class SaneRejectedExecutionHandler extends RejectedExecutionHandler { else runnable.run() } } + +/** + * Commented out pending discussion with Doug Lea + * + * case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { + * final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { + * def createExecutorService: ExecutorService = { + * new ForkJoinPool(targetParallelism) with ExecutorService { + * setAsyncMode(true) + * setMaintainsParallelism(true) + * + * override final def execute(r: Runnable) { + * r match { + * case fjmbox: FJMailbox ⇒ + * //fjmbox.fjTask.reinitialize() + * Thread.currentThread match { + * case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ + * fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected + * case _ ⇒ super.execute[Unit](fjmbox.fjTask) + * } + * case _ ⇒ + * super.execute(r) + * } + * } + * + * import java.util.{ Collection ⇒ JCollection } + * + * def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = + * throw new UnsupportedOperationException("invokeAny. NOT!") + * + * def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + * throw new UnsupportedOperationException("invokeAny. NOT!") + * + * def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + * throw new UnsupportedOperationException("invokeAny. NOT!") + * } + * } + * } + * } + * + * trait FJMailbox { self: Mailbox ⇒ + * final val fjTask = new ForkJoinTask[Unit] with Runnable { + * private[this] var result: Unit = () + * final def getRawResult() = result + * final def setRawResult(v: Unit) { result = v } + * final def exec() = { self.run(); true } + * final def run() { invoke() } + * } + * } + * + */ diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 274ac6e9dc..356a4461bd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -297,7 +297,7 @@ class DefaultClusterNode private[akka] ( :: Nil)).start() lazy val remoteService: RemoteSupport = { - val remote = new akka.cluster.netty.NettyRemoteSupport + val remote = new akka.remote.netty.NettyRemoteSupport remote.start(hostname, port) remote.register(RemoteClusterDaemon.Address, remoteDaemon) remote.addListener(RemoteFailureDetector.sender) diff --git a/akka-docs/general/code/ConfigDocSpec.scala b/akka-docs/general/code/ConfigDocSpec.scala index 4b3de65e65..3180fa3b8a 100644 --- a/akka-docs/general/code/ConfigDocSpec.scala +++ b/akka-docs/general/code/ConfigDocSpec.scala @@ -9,7 +9,7 @@ import com.typesafe.config.ConfigFactory //#imports -class ConfigDocSpec extends WordSpec { +class ConfigDocSpec extends WordSpec with MustMatchers { "programmatically configure ActorSystem" in { //#custom-config @@ -21,7 +21,9 @@ class ConfigDocSpec extends WordSpec { } } """) - val system = ActorSystem("MySystem", ConfigFactory.systemProperties.withFallback(customConf)) + // ConfigFactory.load sandwiches customConfig between default reference + // config and default overrides, and then resolves it. + val system = ActorSystem("MySystem", ConfigFactory.load(customConf)) //#custom-config system.stop() diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index 58dfbd5edd..0e96f8165e 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -5,8 +5,6 @@ Configuration .. contents:: :local: -.. _-Dakka.config: -.. _-Dakka.home: Specifying the configuration file --------------------------------- @@ -16,23 +14,36 @@ configuration files that you see below. You can specify your own configuration f property in the reference config. You only have to define the properties that differ from the default configuration. -FIXME: These default locations has changed +By default the ``ConfigFactory.load`` method is used, which will load all ``application.conf`` (and +``application.json`` and ``application.properties``) from the root of the classpath, if they exists. +It uses ``ConfigFactory.defaultOverrides``, i.e. system properties, before falling back to +application and reference configuration. -The location of the config file to use can be specified in various ways: +Note that *all* ``application.{conf,json,properties}`` classpath resources, from all directories and +jar files, are loaded and merged. Therefore it is a good practice to define separate sub-trees in the +configuration for each actor system, and grab the specific configuration when instantiating the ActorSystem. -* Define the ``-Dakka.config=...`` system property parameter with a file path to configuration file. +:: + + myapp1 { + akka.logLevel = WARNING + } + myapp2 { + akka.logLevel = ERROR + } -* Put an ``akka.conf`` file in the root of the classpath. +.. code-block:: scala -* Define the ``AKKA_HOME`` environment variable pointing to the root of the Akka - distribution. The config is taken from the ``AKKA_HOME/config/akka.conf``. You - can also point to the AKKA_HOME by specifying the ``-Dakka.home=...`` system - property parameter. + val app1 = ActorSystem("MyApp1", ConfigFactory.load.getConfig("myapp1")) + val app2 = ActorSystem("MyApp2", ConfigFactory.load.getConfig("myapp2")) -If several of these ways to specify the config file are used at the same time the precedence is the order as given above, -i.e. you can always redefine the location with the ``-Dakka.config=...`` system property. +If the system properties ``config.resource``, ``config.file``, or ``config.url`` are set, then the +classpath resource, file, or URL specified in those properties will be used rather than the default +``application.{conf,json,properties}`` classpath resources. Note that classpath resource names start +with ``/``. ``-Dconfig.resource=/dev.conf`` will load the ``dev.conf`` from the root of the classpath. -You may also specify the configuration programmatically when instantiating the ``ActorSystem``. +You may also specify and parse the configuration programmatically in other ways when instantiating +the ``ActorSystem``. .. includecode:: code/ConfigDocSpec.scala :include: imports,custom-config @@ -84,7 +95,7 @@ Each Akka module has a reference configuration file with the default values. .. literalinclude:: ../../akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/reference.conf :language: none -A custom ``akka.conf`` might look like this:: +A custom ``application.conf`` might look like this:: # In this file you can override any option defined in the reference files. # Copy in parts of the reference files and modify as you please. @@ -125,48 +136,29 @@ Config file format The configuration file syntax is described in the `HOCON `_ specification. Note that it supports three formats; conf, json, and properties. -.. _-Dakka.mode: - -Specifying files for different modes ------------------------------------- - -FIXME: mode doesn't exist, or will it? - -You can use different configuration files for different purposes by specifying a mode option, either as -``-Dakka.mode=...`` system property or as ``AKKA_MODE=...`` environment variable. For example using DEBUG log level -when in development mode. Run with ``-Dakka.mode=dev`` and place the following ``akka.dev.conf`` in the root of -the classpath. - -akka.dev.conf: - -:: - - akka { - loglevel = "DEBUG" - } - -The mode option works in the same way when using configuration files in ``AKKA_HOME/config/`` directory. - -The mode option is not used when specifying the configuration file with ``-Dakka.config=...`` system property. Including files --------------- -FIXME: The include syntax has changed +Sometimes it can be useful to include another configuration file, for example if you have one ``application.conf`` with all +environment independent settings and then override some settings for specific environments. -Sometimes it can be useful to include another configuration file, for example if you have one ``akka.conf`` with all -environment independent settings and then override some settings for specific modes. +Specifying system property with ``-Dconfig.resource=/dev.conf`` will load the ``dev.conf`` file, which includes the ``application.conf`` -akka.dev.conf: +dev.conf: :: - include "akka.conf" + include "application" akka { loglevel = "DEBUG" } +More advanced include and substitution mechanisms are explained in the `HOCON `_ +specification. + + .. _-Dakka.logConfigOnStart: Logging of Configuration @@ -175,10 +167,3 @@ Logging of Configuration If the system or config property ``akka.logConfigOnStart`` is set to ``on``, then the complete configuration at INFO level when the actor system is started. This is useful when you are uncertain of what configuration is used. - -Summary of System Properties ----------------------------- - -* :ref:`akka.home <-Dakka.home>` (``AKKA_HOME``): where Akka searches for configuration -* :ref:`akka.config <-Dakka.config>`: explicit configuration file location -* :ref:`akka.mode <-Dakka.mode>` (``AKKA_MODE``): modify configuration file name for multiple profiles diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a80c4fa6a7..41eaf2e117 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -8,7 +8,7 @@ akka { remote { - transport = "akka.cluster.netty.NettyRemoteSupport" + transport = "akka.remote.netty.NettyRemoteSupport" use-compression = off diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index e03e5e2685..20a047952f 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -130,8 +130,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) - system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) + system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(initateGossip()) + system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(scrutinize()) } /** diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 665dc91f83..4bf96bd823 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -71,13 +71,24 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { lazy val eventStream = new NetworkEventStream(system) lazy val server: RemoteSupport = { - val remote = new akka.remote.netty.NettyRemoteSupport(system, this) - remote.start() //TODO Any application loader here? + val arguments = Seq( + classOf[ActorSystem] -> system, + classOf[Remote] -> this) + val types: Array[Class[_]] = arguments map (_._1) toArray + val values: Array[AnyRef] = arguments map (_._2) toArray - system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) - system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) + ReflectiveAccess.createInstance[RemoteSupport](remoteExtension.RemoteTransport, types, values) match { + case Left(problem) ⇒ + log.error(problem, "Could not load remote transport layer") + throw problem + case Right(remote) ⇒ + remote.start(None) //TODO Any application loader here? - remote + system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) + system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) + + remote + } } def start() { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala new file mode 100644 index 0000000000..878b7840b0 --- /dev/null +++ b/akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala @@ -0,0 +1,134 @@ +// package akka.remote + +// import akka.actor.Actor +// import akka.remote._ +// import akka.routing._ +// import akka.routing.Routing.Broadcast + +// object GossipMembershipMultiJvmSpec { +// val NrOfNodes = 4 +// class SomeActor extends Actor with Serializable { +// def receive = { +// case "hit" ⇒ sender ! system.nodename +// case "end" ⇒ self.stop() +// } +// } + +// import com.typesafe.config.ConfigFactory +// val commonConfig = ConfigFactory.parseString(""" +// akka { +// loglevel = "WARNING" +// cluster { +// seed-nodes = ["localhost:9991"] +// } +// remote.server.hostname = "localhost" +// }""") + +// val node1Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9991" +// cluster.nodename = "node1" +// }""") withFallback commonConfig + +// val node2Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9992" +// cluster.nodename = "node2" +// }""") withFallback commonConfig + +// val node3Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9993" +// cluster.nodename = "node3" +// }""") withFallback commonConfig + +// val node4Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9994" +// cluster.nodename = "node4" +// }""") withFallback commonConfig +// } + +// class GossipMembershipMultiJvmNode1 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node1Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "A cluster" must { +// "allow new node to join and should reach convergence with new membership table" in { + +// barrier("setup") +// remote.start() + +// barrier("start") +// val actor = system.actorOf[SomeActor]("service-hello") +// actor.isInstanceOf[RoutedActorRef] must be(true) + +// val connectionCount = NrOfNodes - 1 +// val iterationCount = 10 + +// var replies = Map( +// "node1" -> 0, +// "node2" -> 0, +// "node3" -> 0) + +// for (i ← 0 until iterationCount) { +// for (k ← 0 until connectionCount) { +// val nodeName = (actor ? "hit").as[String].getOrElse(fail("No id returned by actor")) +// replies = replies + (nodeName -> (replies(nodeName) + 1)) +// } +// } + +// barrier("broadcast-end") +// actor ! Broadcast("end") + +// barrier("end") +// replies.values foreach { _ must be > (0) } + +// barrier("done") +// } +// } +// } + +// class GossipMembershipMultiJvmNode2 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node2Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "___" must { +// "___" in { +// barrier("setup") +// remote.start() +// barrier("start") +// barrier("broadcast-end") +// barrier("end") +// barrier("done") +// } +// } +// } + +// class GossipMembershipMultiJvmNode3 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node3Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "___" must { +// "___" in { +// barrier("setup") +// remote.start() +// barrier("start") +// barrier("broadcast-end") +// barrier("end") +// barrier("done") +// } +// } +// } + +// class GossipMembershipMultiJvmNode4 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node4Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "___" must { +// "___" in { +// barrier("setup") +// remote.start() +// barrier("start") +// barrier("broadcast-end") +// barrier("end") +// barrier("done") +// } +// } +// } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index d11d07ddd3..87a213e0eb 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -12,7 +12,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { import config._ //akka.remote - getString("akka.remote.transport") must equal("akka.cluster.netty.NettyRemoteSupport") + getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport") getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index a13e034cae..b0dbcbc9ad 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -80,7 +80,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s and starts to eat".format(name, left.path.name, right.path.name)) become(eating) - system.scheduler.scheduleOnce(self, Think, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Think) case Busy(chopstick) ⇒ become(thinking) @@ -109,7 +109,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think".format(name)) - system.scheduler.scheduleOnce(self, Eat, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Eat) } //All hakkers start in a non-eating state @@ -117,7 +117,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think".format(name)) become(thinking) - system.scheduler.scheduleOnce(self, Eat, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Eat) } } diff --git a/akka-spring/src/test/resources/akka-test.conf b/akka-spring/src/test/resources/akka-test.conf index e6c019e24c..806783d217 100644 --- a/akka-spring/src/test/resources/akka-test.conf +++ b/akka-spring/src/test/resources/akka-test.conf @@ -128,7 +128,7 @@ akka { # secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie' secure-cookie = "" - layer = "akka.cluster.netty.NettyRemoteSupport" + layer = "akka.remote.netty.NettyRemoteSupport" server { hostname = "localhost" # The hostname or IP that clients should connect to diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index 9b9f428d3d..f2cc5a288d 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -15,7 +15,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext import org.springframework.core.io.{ ClassPathResource, Resource } import org.scalatest.{ BeforeAndAfterAll, FeatureSpec } import java.util.concurrent.CountDownLatch -import akka.cluster.netty.NettyRemoteSupport +import akka.remote.netty.NettyRemoteSupport import akka.actor._ import akka.actor.Actor._ diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index 6c7a0156e7..66ca68dba7 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -9,7 +9,7 @@ import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import org.springframework.context.support.ClassPathXmlApplicationContext -import akka.cluster.netty.NettyRemoteSupport +import akka.remote.netty.NettyRemoteSupport import org.scalatest.{ BeforeAndAfterAll, FeatureSpec } import java.util.concurrent.CountDownLatch diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index bf81a29e3f..66e86a476a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -21,8 +21,7 @@ import akka.actor.DeadLetter object TimingTest extends Tag("timing") object AkkaSpec { - val testConf = { - val cfg = ConfigFactory.parseString(""" + val testConf: Config = ConfigFactory.parseString(""" akka { event-handlers = ["akka.testkit.TestEventListener"] loglevel = "WARNING" @@ -34,8 +33,6 @@ object AkkaSpec { } } """) - ConfigFactory.load(cfg) - } def mapToConfig(map: Map[String, Any]): Config = { import scala.collection.JavaConverters._ @@ -123,28 +120,41 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { } "must enqueue unread messages from testActor to deadLetters" in { - val system = ActorSystem("AkkaSpec2", AkkaSpec.testConf) + val system, otherSystem = ActorSystem("AkkaSpec3", AkkaSpec.testConf) - var locker = Seq.empty[DeadLetter] - implicit val timeout = system.settings.ActorTimeout - implicit val davyJones = (system.actorFor("/") ? CreateChild(Props(new Actor { - def receive = { - case m: DeadLetter ⇒ locker :+= m + try { + var locker = Seq.empty[DeadLetter] + implicit val timeout = system.settings.ActorTimeout + implicit val davyJones = otherSystem.actorOf(Props(new Actor { + def receive = { + case m: DeadLetter ⇒ locker :+= m + } + }), "davyJones") + + system.eventStream.subscribe(davyJones, classOf[DeadLetter]) + + val probe = new TestProbe(system) + probe.ref ! 42 + /* + * this will ensure that the message is actually received, otherwise it + * may happen that the system.stop() suspends the testActor before it had + * a chance to put the message into its private queue + */ + probe.receiveWhile(1 second) { + case null ⇒ } - }), "davyJones")).as[ActorRef].get - system.eventStream.subscribe(davyJones, classOf[DeadLetter]) + val latch = new TestLatch(1)(system) + system.registerOnTermination(latch.countDown()) + system.stop() + latch.await(2 seconds) - val probe = new TestProbe(system) - probe.ref ! 42 - - val latch = new TestLatch(1)(system) - system.registerOnTermination(latch.countDown()) - system.stop() - latch.await(2 seconds) - - // this will typically also contain log messages which were sent after the logger shutdown - locker must contain(DeadLetter(42, davyJones, probe.ref)) + // this will typically also contain log messages which were sent after the logger shutdown + locker must contain(DeadLetter(42, davyJones, probe.ref)) + } finally { + system.stop() + otherSystem.stop() + } } } diff --git a/src/main/ls/2.0.json b/src/main/ls/2.0.json new file mode 100644 index 0000000000..067332ed73 --- /dev/null +++ b/src/main/ls/2.0.json @@ -0,0 +1,15 @@ + +{ + "organization":"com.typesafe.akka", + "name":"akka", + "version":"2.0-SNAPSHOT", + "description":"Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM.", + "site":"", + "tags":["actors","stm","concurrency","distributed","fault-tolerance","scala","java","futures","dataflow","remoting"], + "docs":"", + "licenses": [], + "resolvers": ["http://akka.io/repository/"], + "dependencies": [], + "scalas": ["2.9.1"], + "sbt": false +} \ No newline at end of file