From af1ee4fb5a0c21a39866bb0fd9fc15a62bb7d26e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 2 Dec 2011 16:08:32 +0100 Subject: [PATCH 1/9] Utilized the optimized withFallback to simplify config checkValid stuff --- .../src/main/scala/akka/actor/ActorSystem.scala | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 01f2d2bafb..9d064834bd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -59,18 +59,11 @@ object ActorSystem { class Settings(cfg: Config) { - // Verify that the Config is sane and has our reference config. - val config: Config = - try { - cfg.checkValid(ConfigFactory.defaultReference, "akka") - cfg - } catch { - case e: ConfigException ⇒ - // try again with added defaultReference - val cfg2 = cfg.withFallback(ConfigFactory.defaultReference) - cfg2.checkValid(ConfigFactory.defaultReference, "akka") - cfg2 - } + val config: Config = { + val config = cfg.withFallback(ConfigFactory.defaultReference) + config.checkValid(ConfigFactory.defaultReference, "akka") + config + } import scala.collection.JavaConverters._ import config._ From 1f665ab4c694706c2c903df01045b50f8769e39f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 2 Dec 2011 17:13:46 +0100 Subject: [PATCH 2/9] Changed signatures of Scheduler for better api of by-name blocks --- .../test/scala/akka/actor/SchedulerSpec.scala | 30 +++++++++++-------- .../src/main/scala/akka/actor/ActorCell.scala | 2 +- .../scala/akka/actor/ActorRefProvider.scala | 24 +++++++-------- .../main/scala/akka/actor/ActorSystem.scala | 5 +++- .../src/main/scala/akka/actor/FSM.scala | 6 ++-- .../src/main/scala/akka/actor/Scheduler.scala | 10 +++---- .../akka/dispatch/AbstractDispatcher.scala | 4 +-- .../src/main/scala/akka/dispatch/Future.scala | 16 +++++----- .../src/main/scala/akka/remote/Gossiper.scala | 4 +-- .../main/scala/DiningHakkersOnBecome.scala | 6 ++-- 10 files changed, 57 insertions(+), 50 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 4640951322..4f70bc52f5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 milliseconds - collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(countDownLatch2.countDown())) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -44,7 +44,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { "should stop continuous scheduling if the receiving actor has been terminated" in { // run immediately and then every 100 milliseconds - collectCancellable(system.scheduler.schedule(testActor, "msg", 0 milliseconds, 100 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, testActor, "msg")) // stop the actor and, hence, the continuous messaging from happening testActor ! PoisonPill @@ -59,14 +59,18 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) - // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50 milliseconds)) + // run after 300 millisec + collectCancellable(system.scheduler.scheduleOnce(300 milliseconds, tickActor, Tick)) + collectCancellable(system.scheduler.scheduleOnce(300 milliseconds)(countDownLatch.countDown())) + + // should not be run immediately + assert(countDownLatch.await(100, TimeUnit.MILLISECONDS) == false) + countDownLatch.getCount must be(3) // after 1 second the wait should fail - assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) + assert(countDownLatch.await(1, TimeUnit.SECONDS) == false) // should still be 1 left - assert(countDownLatch.getCount == 1) + countDownLatch.getCount must be(1) } /** @@ -81,7 +85,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) + val timeout = collectCancellable(system.scheduler.scheduleOnce(1 second, actor, Ping)) timeout.cancel() } @@ -109,10 +113,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) + collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping)) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash)) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -136,7 +140,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime), 10 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(10 milliseconds, actor, Msg(System.nanoTime))) Thread.sleep(5) } @@ -155,7 +159,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val startTime = System.nanoTime() - val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) + val cancellable = system.scheduler.schedule(1 second, 100 milliseconds, actor, Msg) ticks.await(3, TimeUnit.SECONDS) val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6d41aafc5b..c7a37de589 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -404,7 +404,7 @@ private[akka] class ActorCell( if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout._1, TimeUnit.MILLISECONDS))) + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) } else cancelReceiveTimeout() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index c397d67be6..f6e0c19adc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -417,19 +417,19 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { */ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable { - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, receiver, message), initialDelay)) - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay)) - def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = @@ -446,14 +446,14 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createSingleTask(f: () ⇒ Unit): TimerTask = + private def createSingleTask(f: ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(f) + dispatcher.dispatchTask(() ⇒ f) } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { + private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { // Check if the receiver is still alive and kicking before sending it a message and reschedule the task @@ -467,10 +467,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { + private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(f) + dispatcher.dispatchTask(() ⇒ f) timeout.getTimer.newTimeout(this, delay) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 9d064834bd..fef236d5fd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -398,7 +398,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor case x: Closeable ⇒ // Let dispatchers shutdown first. // Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay. - x.scheduleOnce(() ⇒ { x.close(); dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4) + x.scheduleOnce(settings.DispatcherDefaultShutdown * 4) { + x.close() + dispatcher.shutdown() + } case _ ⇒ } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 76495843fc..0dc961ab0b 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -34,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) + ref = Some(system.scheduler.schedule(timeout, timeout, actor, this)) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) + ref = Some(system.scheduler.scheduleOnce(timeout, actor, this)) } } @@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) + timeoutFuture = Some(system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 52a63f1730..5b32a86a60 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -20,29 +20,29 @@ trait Scheduler { * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) */ - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable + def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and a frequency. * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) */ - def schedule(f: () ⇒ Unit, initialDelay: Duration, frequency: Duration): Cancellable + def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable /** * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. */ - def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable /** * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. */ - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable /** * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. */ - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable } trait Cancellable { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 99557e33c8..203520853a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -136,7 +136,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, shutdownTimeout) + scheduler.scheduleOnce(shutdownTimeout, shutdownAction) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -211,7 +211,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, shutdownTimeout) + scheduler.scheduleOnce(shutdownTimeout, this) else run() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b9def99301..1cd0f575cd 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -14,7 +14,7 @@ import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } import scala.util.continuations._ import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS } +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } @@ -853,7 +853,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec private def awaitUnsafe(waitTimeNanos: Long): Boolean = { if (value.isEmpty && waitTimeNanos > 0) { - val ms = NANOS.toMillis(waitTimeNanos) + val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val start = currentTimeInNanos try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } @@ -877,7 +877,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi else Long.MaxValue //If both are infinite, use Long.MaxValue if (awaitUnsafe(waitNanos)) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") + else throw new FutureTimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds") } def await = await(timeout.duration) @@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), NANOSECONDS), this) else func(DefaultPromise.this) } } } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) + val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), NANOSECONDS), this) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 } } } - dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) + dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) promise } } else this @@ -999,7 +999,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } @inline - private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? + private def currentTimeInNanos: Long = MILLISECONDS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs. @inline private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 119a4a43a6..40f4cee2dd 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -130,8 +130,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) - system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) + system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(initateGossip()) + system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(scrutinize()) } /** diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 78449edc1b..b574cd8888 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -80,7 +80,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name)) become(eating) - system.scheduler.scheduleOnce(self, Think, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Think) case Busy(chopstick) ⇒ become(thinking) @@ -109,7 +109,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think".format(name)) - system.scheduler.scheduleOnce(self, Eat, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Eat) } //All hakkers start in a non-eating state @@ -117,7 +117,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think".format(name)) become(thinking) - system.scheduler.scheduleOnce(self, Eat, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Eat) } } From 93d093e87ab2ebda56b88d4890b1f230c954e4f3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 2 Dec 2011 17:29:47 +0100 Subject: [PATCH 3/9] Commenting out the ForkJoin stuff until I've cleared some bits with Doug Lea --- .../akka/actor/dispatch/ActorModelSpec.scala | 36 ------- .../akka/dispatch/ThreadPoolBuilder.scala | 97 ++++++++++--------- 2 files changed, 51 insertions(+), 82 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index f07ed9dfa1..c601a42700 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -470,42 +470,6 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { def dispatcherType = "Balancing Dispatcher" - "A " + dispatcherType must { - "process messages in parallel" in { - implicit val dispatcher = newInterceptedDispatcher - val aStart, aStop, bParallel = new CountDownLatch(1) - val a, b = newTestActor(dispatcher) - - a ! Meet(aStart, aStop) - assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds") - - b ! CountDown(bParallel) - assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel") - - aStop.countDown() - - a.stop - b.stop - - while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination - - assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) - assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FJDispatcherModelSpec extends ActorModelSpec { - import ActorModelSpec._ - - def newInterceptedDispatcher = - (new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput, - system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, - new ForkJoinPoolConfig(), system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor).asInstanceOf[MessageDispatcherInterceptor] - - def dispatcherType = "FJDispatcher" - "A " + dispatcherType must { "process messages in parallel" in { implicit val dispatcher = newInterceptedDispatcher diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index c45cc74593..77663c780e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -187,52 +187,6 @@ class MonitorableThread(runnable: Runnable, name: String) } } -case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { - final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { - def createExecutorService: ExecutorService = { - new ForkJoinPool(targetParallelism) with ExecutorService { - setAsyncMode(true) - setMaintainsParallelism(true) - - override final def execute(r: Runnable) { - r match { - case fjmbox: FJMailbox ⇒ - //fjmbox.fjTask.reinitialize() - Thread.currentThread match { - case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ - fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected - case _ ⇒ super.execute[Unit](fjmbox.fjTask) - } - case _ ⇒ - super.execute(r) - } - } - - import java.util.{ Collection ⇒ JCollection } - - def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = - throw new UnsupportedOperationException("invokeAny. NOT!") - - def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = - throw new UnsupportedOperationException("invokeAny. NOT!") - - def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = - throw new UnsupportedOperationException("invokeAny. NOT!") - } - } - } -} - -trait FJMailbox { self: Mailbox ⇒ - final val fjTask = new ForkJoinTask[Unit] with Runnable { - private[this] var result: Unit = () - final def getRawResult() = result - final def setRawResult(v: Unit) { result = v } - final def exec() = { self.run(); true } - final def run() { invoke() } - } -} - /** * As the name says */ @@ -273,3 +227,54 @@ class SaneRejectedExecutionHandler extends RejectedExecutionHandler { else runnable.run() } } + +/** + * Commented out pending discussion with Doug Lea + * + * case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider { + * final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory { + * def createExecutorService: ExecutorService = { + * new ForkJoinPool(targetParallelism) with ExecutorService { + * setAsyncMode(true) + * setMaintainsParallelism(true) + * + * override final def execute(r: Runnable) { + * r match { + * case fjmbox: FJMailbox ⇒ + * //fjmbox.fjTask.reinitialize() + * Thread.currentThread match { + * case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ + * fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected + * case _ ⇒ super.execute[Unit](fjmbox.fjTask) + * } + * case _ ⇒ + * super.execute(r) + * } + * } + * + * import java.util.{ Collection ⇒ JCollection } + * + * def invokeAny[T](callables: JCollection[_ <: Callable[T]]) = + * throw new UnsupportedOperationException("invokeAny. NOT!") + * + * def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + * throw new UnsupportedOperationException("invokeAny. NOT!") + * + * def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = + * throw new UnsupportedOperationException("invokeAny. NOT!") + * } + * } + * } + * } + * + * trait FJMailbox { self: Mailbox ⇒ + * final val fjTask = new ForkJoinTask[Unit] with Runnable { + * private[this] var result: Unit = () + * final def getRawResult() = result + * final def setRawResult(v: Unit) { result = v } + * final def exec() = { self.run(); true } + * final def run() { invoke() } + * } + * } + * + */ From 95791ce4c5c14bc8ed6162fd21802989b77d461c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 2 Dec 2011 18:08:13 +0100 Subject: [PATCH 4/9] #1424 - RemoteSupport is now instantiated from the config, so now anyone can write their own Akka transport layer for remote actors --- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- akka-remote/src/main/resources/reference.conf | 2 +- .../src/main/scala/akka/remote/Remote.scala | 16 +++++++++++----- .../scala/akka/remote/RemoteConfigSpec.scala | 2 +- akka-spring/src/test/resources/akka-test.conf | 2 +- .../test/scala/TypedActorSpringFeatureTest.scala | 2 +- .../scala/UntypedActorSpringFeatureTest.scala | 2 +- 7 files changed, 17 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 274ac6e9dc..356a4461bd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -297,7 +297,7 @@ class DefaultClusterNode private[akka] ( :: Nil)).start() lazy val remoteService: RemoteSupport = { - val remote = new akka.cluster.netty.NettyRemoteSupport + val remote = new akka.remote.netty.NettyRemoteSupport remote.start(hostname, port) remote.register(RemoteClusterDaemon.Address, remoteDaemon) remote.addListener(RemoteFailureDetector.sender) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a80c4fa6a7..41eaf2e117 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -8,7 +8,7 @@ akka { remote { - transport = "akka.cluster.netty.NettyRemoteSupport" + transport = "akka.remote.netty.NettyRemoteSupport" use-compression = off diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 82daeaf820..e5854e03a3 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -71,13 +71,19 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { lazy val eventStream = new NetworkEventStream(system) lazy val server: RemoteSupport = { - val remote = new akka.remote.netty.NettyRemoteSupport(system) - remote.start() //TODO Any application loader here? + //new akka.remote.netty.NettyRemoteSupport(system) + ReflectiveAccess.createInstance[RemoteSupport](remoteExtension.RemoteTransport, Array[Class[_]](classOf[ActorSystem]), Array[AnyRef](system)) match { + case Left(problem) ⇒ + log.error(problem, "Could not load remote transport layer") + throw problem + case Right(remote) ⇒ + remote.start(None) //TODO Any application loader here? - system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) - system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) + system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) + system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - remote + remote + } } def start(): Unit = { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index d11d07ddd3..87a213e0eb 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -12,7 +12,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { import config._ //akka.remote - getString("akka.remote.transport") must equal("akka.cluster.netty.NettyRemoteSupport") + getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport") getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) diff --git a/akka-spring/src/test/resources/akka-test.conf b/akka-spring/src/test/resources/akka-test.conf index e6c019e24c..806783d217 100644 --- a/akka-spring/src/test/resources/akka-test.conf +++ b/akka-spring/src/test/resources/akka-test.conf @@ -128,7 +128,7 @@ akka { # secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie' secure-cookie = "" - layer = "akka.cluster.netty.NettyRemoteSupport" + layer = "akka.remote.netty.NettyRemoteSupport" server { hostname = "localhost" # The hostname or IP that clients should connect to diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index 9b9f428d3d..f2cc5a288d 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -15,7 +15,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext import org.springframework.core.io.{ ClassPathResource, Resource } import org.scalatest.{ BeforeAndAfterAll, FeatureSpec } import java.util.concurrent.CountDownLatch -import akka.cluster.netty.NettyRemoteSupport +import akka.remote.netty.NettyRemoteSupport import akka.actor._ import akka.actor.Actor._ diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index 6c7a0156e7..66ca68dba7 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -9,7 +9,7 @@ import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import org.springframework.context.support.ClassPathXmlApplicationContext -import akka.cluster.netty.NettyRemoteSupport +import akka.remote.netty.NettyRemoteSupport import org.scalatest.{ BeforeAndAfterAll, FeatureSpec } import java.util.concurrent.CountDownLatch From c8a1a963101215032fc20b103e5c2077a5d37e23 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 5 Dec 2011 10:41:36 +0100 Subject: [PATCH 5/9] Updated config documentation --- akka-docs/general/code/ConfigDocSpec.scala | 6 +- akka-docs/general/configuration.rst | 83 ++++++++----------- .../test/scala/akka/testkit/AkkaSpec.scala | 7 +- 3 files changed, 40 insertions(+), 56 deletions(-) diff --git a/akka-docs/general/code/ConfigDocSpec.scala b/akka-docs/general/code/ConfigDocSpec.scala index 4b3de65e65..3180fa3b8a 100644 --- a/akka-docs/general/code/ConfigDocSpec.scala +++ b/akka-docs/general/code/ConfigDocSpec.scala @@ -9,7 +9,7 @@ import com.typesafe.config.ConfigFactory //#imports -class ConfigDocSpec extends WordSpec { +class ConfigDocSpec extends WordSpec with MustMatchers { "programmatically configure ActorSystem" in { //#custom-config @@ -21,7 +21,9 @@ class ConfigDocSpec extends WordSpec { } } """) - val system = ActorSystem("MySystem", ConfigFactory.systemProperties.withFallback(customConf)) + // ConfigFactory.load sandwiches customConfig between default reference + // config and default overrides, and then resolves it. + val system = ActorSystem("MySystem", ConfigFactory.load(customConf)) //#custom-config system.stop() diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index 58dfbd5edd..0e96f8165e 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -5,8 +5,6 @@ Configuration .. contents:: :local: -.. _-Dakka.config: -.. _-Dakka.home: Specifying the configuration file --------------------------------- @@ -16,23 +14,36 @@ configuration files that you see below. You can specify your own configuration f property in the reference config. You only have to define the properties that differ from the default configuration. -FIXME: These default locations has changed +By default the ``ConfigFactory.load`` method is used, which will load all ``application.conf`` (and +``application.json`` and ``application.properties``) from the root of the classpath, if they exists. +It uses ``ConfigFactory.defaultOverrides``, i.e. system properties, before falling back to +application and reference configuration. -The location of the config file to use can be specified in various ways: +Note that *all* ``application.{conf,json,properties}`` classpath resources, from all directories and +jar files, are loaded and merged. Therefore it is a good practice to define separate sub-trees in the +configuration for each actor system, and grab the specific configuration when instantiating the ActorSystem. -* Define the ``-Dakka.config=...`` system property parameter with a file path to configuration file. +:: + + myapp1 { + akka.logLevel = WARNING + } + myapp2 { + akka.logLevel = ERROR + } -* Put an ``akka.conf`` file in the root of the classpath. +.. code-block:: scala -* Define the ``AKKA_HOME`` environment variable pointing to the root of the Akka - distribution. The config is taken from the ``AKKA_HOME/config/akka.conf``. You - can also point to the AKKA_HOME by specifying the ``-Dakka.home=...`` system - property parameter. + val app1 = ActorSystem("MyApp1", ConfigFactory.load.getConfig("myapp1")) + val app2 = ActorSystem("MyApp2", ConfigFactory.load.getConfig("myapp2")) -If several of these ways to specify the config file are used at the same time the precedence is the order as given above, -i.e. you can always redefine the location with the ``-Dakka.config=...`` system property. +If the system properties ``config.resource``, ``config.file``, or ``config.url`` are set, then the +classpath resource, file, or URL specified in those properties will be used rather than the default +``application.{conf,json,properties}`` classpath resources. Note that classpath resource names start +with ``/``. ``-Dconfig.resource=/dev.conf`` will load the ``dev.conf`` from the root of the classpath. -You may also specify the configuration programmatically when instantiating the ``ActorSystem``. +You may also specify and parse the configuration programmatically in other ways when instantiating +the ``ActorSystem``. .. includecode:: code/ConfigDocSpec.scala :include: imports,custom-config @@ -84,7 +95,7 @@ Each Akka module has a reference configuration file with the default values. .. literalinclude:: ../../akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/reference.conf :language: none -A custom ``akka.conf`` might look like this:: +A custom ``application.conf`` might look like this:: # In this file you can override any option defined in the reference files. # Copy in parts of the reference files and modify as you please. @@ -125,48 +136,29 @@ Config file format The configuration file syntax is described in the `HOCON `_ specification. Note that it supports three formats; conf, json, and properties. -.. _-Dakka.mode: - -Specifying files for different modes ------------------------------------- - -FIXME: mode doesn't exist, or will it? - -You can use different configuration files for different purposes by specifying a mode option, either as -``-Dakka.mode=...`` system property or as ``AKKA_MODE=...`` environment variable. For example using DEBUG log level -when in development mode. Run with ``-Dakka.mode=dev`` and place the following ``akka.dev.conf`` in the root of -the classpath. - -akka.dev.conf: - -:: - - akka { - loglevel = "DEBUG" - } - -The mode option works in the same way when using configuration files in ``AKKA_HOME/config/`` directory. - -The mode option is not used when specifying the configuration file with ``-Dakka.config=...`` system property. Including files --------------- -FIXME: The include syntax has changed +Sometimes it can be useful to include another configuration file, for example if you have one ``application.conf`` with all +environment independent settings and then override some settings for specific environments. -Sometimes it can be useful to include another configuration file, for example if you have one ``akka.conf`` with all -environment independent settings and then override some settings for specific modes. +Specifying system property with ``-Dconfig.resource=/dev.conf`` will load the ``dev.conf`` file, which includes the ``application.conf`` -akka.dev.conf: +dev.conf: :: - include "akka.conf" + include "application" akka { loglevel = "DEBUG" } +More advanced include and substitution mechanisms are explained in the `HOCON `_ +specification. + + .. _-Dakka.logConfigOnStart: Logging of Configuration @@ -175,10 +167,3 @@ Logging of Configuration If the system or config property ``akka.logConfigOnStart`` is set to ``on``, then the complete configuration at INFO level when the actor system is started. This is useful when you are uncertain of what configuration is used. - -Summary of System Properties ----------------------------- - -* :ref:`akka.home <-Dakka.home>` (``AKKA_HOME``): where Akka searches for configuration -* :ref:`akka.config <-Dakka.config>`: explicit configuration file location -* :ref:`akka.mode <-Dakka.mode>` (``AKKA_MODE``): modify configuration file name for multiple profiles diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index ce8dad2b4c..3e0d42be57 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -17,8 +17,7 @@ import com.typesafe.config.ConfigFactory object TimingTest extends Tag("timing") object AkkaSpec { - val testConf = { - val cfg = ConfigFactory.parseString(""" + val testConf: Config = ConfigFactory.parseString(""" akka { event-handlers = ["akka.testkit.TestEventListener"] loglevel = "WARNING" @@ -30,8 +29,6 @@ object AkkaSpec { } } """) - ConfigFactory.load(cfg) - } def mapToConfig(map: Map[String, Any]): Config = { import scala.collection.JavaConverters._ @@ -61,7 +58,7 @@ abstract class AkkaSpec(_system: ActorSystem = ActorSystem(getClass.getSimpleNam protected def atTermination() {} - def this(config: Config) = this(ActorSystem(getClass.getSimpleName, config.withFallback(AkkaSpec.testConf))) + def this(config: Config) = this(ActorSystem(getClass.getSimpleName, ConfigFactory.load(config.withFallback(AkkaSpec.testConf)))) def this(s: String) = this(ConfigFactory.parseString(s)) From ddf3a3667381b6233be572ef7f34468493c1bdfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 5 Dec 2011 11:14:05 +0100 Subject: [PATCH 6/9] Added JSON file for ls.implicit.ly --- src/main/ls/2.0.json | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 src/main/ls/2.0.json diff --git a/src/main/ls/2.0.json b/src/main/ls/2.0.json new file mode 100644 index 0000000000..067332ed73 --- /dev/null +++ b/src/main/ls/2.0.json @@ -0,0 +1,15 @@ + +{ + "organization":"com.typesafe.akka", + "name":"akka", + "version":"2.0-SNAPSHOT", + "description":"Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM.", + "site":"", + "tags":["actors","stm","concurrency","distributed","fault-tolerance","scala","java","futures","dataflow","remoting"], + "docs":"", + "licenses": [], + "resolvers": ["http://akka.io/repository/"], + "dependencies": [], + "scalas": ["2.9.1"], + "sbt": false +} \ No newline at end of file From d12a33235ffbbc2fbeb9c2d775ef59ca129ee4d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 5 Dec 2011 12:31:56 +0100 Subject: [PATCH 7/9] Fixed wrong help text in exception. Fixes #1431. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- akka-actor/src/main/scala/akka/actor/Actor.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a7ea4d7433..d39b0a2270 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -199,8 +199,10 @@ trait Actor { throw new ActorInitializationException( "\n\tYou cannot create an instance of " + getClass.getName + " explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + - "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") + "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + + "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem), or") + "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + + "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem)") if (contextStack.isEmpty) noContextError val c = contextStack.head From 5f91bf5c7d513a881dc287569b63cba9992909b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 5 Dec 2011 12:34:17 +0100 Subject: [PATCH 8/9] Added disabled GossipMembershipMultiJVMSpec. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../remote/GossipMembershipMultiJvmSpec.scala | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala new file mode 100644 index 0000000000..878b7840b0 --- /dev/null +++ b/akka-remote/src/multi-jvm/scala/akka/remote/GossipMembershipMultiJvmSpec.scala @@ -0,0 +1,134 @@ +// package akka.remote + +// import akka.actor.Actor +// import akka.remote._ +// import akka.routing._ +// import akka.routing.Routing.Broadcast + +// object GossipMembershipMultiJvmSpec { +// val NrOfNodes = 4 +// class SomeActor extends Actor with Serializable { +// def receive = { +// case "hit" ⇒ sender ! system.nodename +// case "end" ⇒ self.stop() +// } +// } + +// import com.typesafe.config.ConfigFactory +// val commonConfig = ConfigFactory.parseString(""" +// akka { +// loglevel = "WARNING" +// cluster { +// seed-nodes = ["localhost:9991"] +// } +// remote.server.hostname = "localhost" +// }""") + +// val node1Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9991" +// cluster.nodename = "node1" +// }""") withFallback commonConfig + +// val node2Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9992" +// cluster.nodename = "node2" +// }""") withFallback commonConfig + +// val node3Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9993" +// cluster.nodename = "node3" +// }""") withFallback commonConfig + +// val node4Config = ConfigFactory.parseString(""" +// akka { +// remote.server.port = "9994" +// cluster.nodename = "node4" +// }""") withFallback commonConfig +// } + +// class GossipMembershipMultiJvmNode1 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node1Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "A cluster" must { +// "allow new node to join and should reach convergence with new membership table" in { + +// barrier("setup") +// remote.start() + +// barrier("start") +// val actor = system.actorOf[SomeActor]("service-hello") +// actor.isInstanceOf[RoutedActorRef] must be(true) + +// val connectionCount = NrOfNodes - 1 +// val iterationCount = 10 + +// var replies = Map( +// "node1" -> 0, +// "node2" -> 0, +// "node3" -> 0) + +// for (i ← 0 until iterationCount) { +// for (k ← 0 until connectionCount) { +// val nodeName = (actor ? "hit").as[String].getOrElse(fail("No id returned by actor")) +// replies = replies + (nodeName -> (replies(nodeName) + 1)) +// } +// } + +// barrier("broadcast-end") +// actor ! Broadcast("end") + +// barrier("end") +// replies.values foreach { _ must be > (0) } + +// barrier("done") +// } +// } +// } + +// class GossipMembershipMultiJvmNode2 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node2Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "___" must { +// "___" in { +// barrier("setup") +// remote.start() +// barrier("start") +// barrier("broadcast-end") +// barrier("end") +// barrier("done") +// } +// } +// } + +// class GossipMembershipMultiJvmNode3 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node3Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "___" must { +// "___" in { +// barrier("setup") +// remote.start() +// barrier("start") +// barrier("broadcast-end") +// barrier("end") +// barrier("done") +// } +// } +// } + +// class GossipMembershipMultiJvmNode4 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node4Config) { +// import GossipMembershipMultiJvmSpec._ +// val nodes = NrOfNodes +// "___" must { +// "___" in { +// barrier("setup") +// remote.start() +// barrier("start") +// barrier("broadcast-end") +// barrier("end") +// barrier("done") +// } +// } +// } From 5530c4cbdb34d439a6851bdf5000275159a680d0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 5 Dec 2011 14:01:03 +0100 Subject: [PATCH 9/9] Unborking master --- akka-actor/src/main/scala/akka/actor/Actor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d39b0a2270..0770448c3b 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -200,7 +200,7 @@ trait Actor { "\n\tYou cannot create an instance of " + getClass.getName + " explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + - "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem), or") + "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem), or" + "\n\t\t'val actor = context.actorOf[MyActor]' (to create a supervised child actor from within an actor), or" + "\n\t\t'val actor = system.actorOf(new MyActor(..))' (to create a top level actor from the ActorSystem)")