Scala style fixes, added parens for side effecting shutdown methods

This commit is contained in:
Patrik Nordwall 2011-04-29 10:20:16 +02:00
parent cf49478183
commit 6576cd51e9
20 changed files with 91 additions and 82 deletions

View file

@ -381,7 +381,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
inits.get must be (3) inits.get must be (3)
supervisor.shutdown supervisor.shutdown()
} }
} }
} }

View file

@ -14,7 +14,7 @@ import org.scalatest.matchers.MustMatchers
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
import Ticket669Spec._ import Ticket669Spec._
override def afterAll = Actor.registry.shutdownAll() override def afterAll() { Actor.registry.shutdownAll() }
"A supervised actor with lifecycle PERMANENT" should { "A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in { "be able to reply on failure during preRestart" in {

View file

@ -36,7 +36,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
latch.await(10,TimeUnit.SECONDS) should equal (true) latch.await(10,TimeUnit.SECONDS) should equal (true)
result.get should equal (42) result.get should equal (42)
List(x,y,z).foreach(_.shutdown) List(x,y,z).foreach(_.shutdown())
} }
it("should be able to sum a sequence of ints") { it("should be able to sum a sequence of ints") {
@ -67,7 +67,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
latch.await(10,TimeUnit.SECONDS) should equal (true) latch.await(10,TimeUnit.SECONDS) should equal (true)
result.get should equal (sum(0,ints(0,1000))) result.get should equal (sum(0,ints(0,1000)))
List(x,y,z).foreach(_.shutdown) List(x,y,z).foreach(_.shutdown())
} }
/* /*
it("should be able to join streams") { it("should be able to join streams") {
@ -158,7 +158,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
val setV = thread { val setV = thread {
v << y v << y
} }
List(x,y,z,v) foreach (_.shutdown) List(x,y,z,v) foreach (_.shutdown())
latch.await(2,TimeUnit.SECONDS) should equal (true) latch.await(2,TimeUnit.SECONDS) should equal (true)
}*/ }*/
} }

View file

@ -111,13 +111,13 @@ object ActorModelSpec {
super.dispatch(invocation) super.dispatch(invocation)
} }
private[akka] abstract override def start { private[akka] abstract override def start() {
super.start super.start()
starts.incrementAndGet() starts.incrementAndGet()
} }
private[akka] abstract override def shutdown { private[akka] abstract override def shutdown() {
super.shutdown super.shutdown()
stops.incrementAndGet() stops.incrementAndGet()
} }
} }

View file

@ -105,15 +105,19 @@ object Scheduler {
} }
} }
def shutdown: Unit = synchronized { def shutdown() {
service.shutdown synchronized {
service.shutdown()
}
} }
def restart: Unit = synchronized { def restart() {
shutdown synchronized {
shutdown()
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
} }
} }
}
private object SchedulerThreadFactory extends ThreadFactory { private object SchedulerThreadFactory extends ThreadFactory {
private var count = 0 private var count = 0

View file

@ -160,6 +160,6 @@ object DataFlow {
} }
} }
def shutdown = in ! Exit def shutdown() { in ! Exit }
} }
} }

View file

@ -168,7 +168,7 @@ trait MessageDispatcher {
shutdownSchedule = SCHEDULED shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED => case SCHEDULED =>
if (uuids.isEmpty() && futures.get == 0) { if (uuids.isEmpty && futures.get == 0) {
active switchOff { active switchOff {
shutdown // shut down in the dispatcher's references is zero shutdown // shut down in the dispatcher's references is zero
} }
@ -205,12 +205,12 @@ trait MessageDispatcher {
/** /**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/ */
private[akka] def start: Unit private[akka] def start(): Unit
/** /**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
*/ */
private[akka] def shutdown: Unit private[akka] def shutdown(): Unit
/** /**
* Returns the size of the mailbox for the specified actor * Returns the size of the mailbox for the specified actor

View file

@ -221,9 +221,9 @@ trait ExecutorServiceDelegate extends ExecutorService {
def execute(command: Runnable) = executor.execute(command) def execute(command: Runnable) = executor.execute(command)
def shutdown = executor.shutdown def shutdown() { executor.shutdown() }
def shutdownNow = executor.shutdownNow def shutdownNow() = executor.shutdownNow()
def isShutdown = executor.isShutdown def isShutdown = executor.isShutdown

View file

@ -102,9 +102,9 @@ object EventHandler extends ListenerManagement {
/** /**
* Shuts down all event handler listeners including the event handle dispatcher. * Shuts down all event handler listeners including the event handle dispatcher.
*/ */
def shutdown() = { def shutdown() {
foreachListener(_.stop) foreachListener(_.stop())
EventHandlerDispatcher.shutdown EventHandlerDispatcher.shutdown()
} }
def notify(event: Any) { def notify(event: Any) {

View file

@ -143,11 +143,11 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
handler handler
} }
def shutdown { def shutdown() {
eventHandler.stop() eventHandler.stop()
removeListener(eventHandler) removeListener(eventHandler)
this.shutdownClientModule this.shutdownClientModule()
this.shutdownServerModule this.shutdownServerModule()
clear clear
} }

View file

@ -21,7 +21,7 @@ class AkkaLoader {
* Boot initializes the specified bundles * Boot initializes the specified bundles
*/ */
def boot(withBanner: Boolean, b : Bootable): Unit = hasBooted switchOn { def boot(withBanner: Boolean, b : Bootable): Unit = hasBooted switchOn {
if (withBanner) printBanner if (withBanner) printBanner()
println("Starting Akka...") println("Starting Akka...")
b.onLoad b.onLoad
Thread.currentThread.setContextClassLoader(getClass.getClassLoader) Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
@ -32,15 +32,17 @@ class AkkaLoader {
/* /*
* Shutdown, well, shuts down the bundles used in boot * Shutdown, well, shuts down the bundles used in boot
*/ */
def shutdown: Unit = hasBooted switchOff { def shutdown() {
hasBooted switchOff {
println("Shutting down Akka...") println("Shutting down Akka...")
_bundles.foreach(_.onUnload) _bundles.foreach(_.onUnload)
_bundles = None _bundles = None
Actor.shutdownHook.run Actor.shutdownHook.run
println("Akka succesfully shut down") println("Akka succesfully shut down")
} }
}
private def printBanner = { private def printBanner() {
println("==================================================") println("==================================================")
println(" t") println(" t")
println(" t t t") println(" t t t")

View file

@ -5,6 +5,6 @@
package akka.util package akka.util
trait Bootable { trait Bootable {
def onLoad {} def onLoad() {}
def onUnload {} def onUnload() {}
} }

View file

@ -20,18 +20,18 @@ trait BootableRemoteActorService extends Bootable {
def run = Actor.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port def run = Actor.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port
}, "Akka Remote Service") }, "Akka Remote Service")
def startRemoteService = remoteServerThread.start() def startRemoteService() { remoteServerThread.start() }
abstract override def onLoad = { abstract override def onLoad() {
if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) { if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) {
startRemoteService startRemoteService()
} }
super.onLoad super.onLoad()
} }
abstract override def onUnload = { abstract override def onUnload() {
Actor.remote.shutdown Actor.remote.shutdown()
if (remoteServerThread.isAlive) remoteServerThread.join(1000) if (remoteServerThread.isAlive) remoteServerThread.join(1000)
super.onUnload super.onUnload()
} }
} }

View file

@ -107,7 +107,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match { remoteClients.remove(Address(address)) match {
case s: Some[RemoteClient] => s.get.shutdown case s: Some[RemoteClient] => s.get.shutdown()
case None => false case None => false
} }
} }
@ -132,15 +132,15 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
/** /**
* Clean-up all open connections. * Clean-up all open connections.
*/ */
def shutdownClientModule = { def shutdownClientModule() {
shutdownRemoteClients shutdownRemoteClients()
//TODO: Should we empty our remoteActors too? //TODO: Should we empty our remoteActors too?
//remoteActors.clear //remoteActors.clear
} }
def shutdownRemoteClients = lock withWriteGuard { def shutdownRemoteClients() = lock withWriteGuard {
remoteClients.foreach({ case (addr, client) => client.shutdown }) remoteClients.foreach({ case (addr, client) => client.shutdown() })
remoteClients.clear remoteClients.clear()
} }
def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = { def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = {
@ -187,7 +187,7 @@ abstract class RemoteClient private[akka] (
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
def shutdown: Boolean def shutdown(): Boolean
/** /**
* Returns an array with the current pending messages not yet delivered. * Returns an array with the current pending messages not yet delivered.
@ -403,16 +403,16 @@ class ActiveRemoteClient private[akka] (
} }
//Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients //Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
def shutdown = runSwitch switchOff { def shutdown() = runSwitch switchOff {
notifyListeners(RemoteClientShutdown(module, remoteAddress)) notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop() timer.stop()
timer = null timer = null
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
openChannels = null openChannels = null
bootstrap.releaseExternalResources bootstrap.releaseExternalResources()
bootstrap = null bootstrap = null
connection = null connection = null
pendingRequests.clear pendingRequests.clear()
} }
private[akka] def isWithinReconnectionTimeWindow: Boolean = { private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -629,7 +629,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
openChannels.add(bootstrap.bind(address)) openChannels.add(bootstrap.bind(address))
serverModule.notifyListeners(RemoteServerStarted(serverModule)) serverModule.notifyListeners(RemoteServerStarted(serverModule))
def shutdown { def shutdown() {
try { try {
val shutdownSignal = { val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder val b = RemoteControlProtocol.newBuilder
@ -641,7 +641,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
openChannels.disconnect openChannels.disconnect
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources bootstrap.releaseExternalResources()
serverModule.notifyListeners(RemoteServerShutdown(serverModule)) serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch { } catch {
case e: Exception => case e: Exception =>
@ -684,11 +684,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
this this
} }
def shutdownServerModule = guard withGuard { def shutdownServerModule() = guard withGuard {
_isRunning switchOff { _isRunning switchOff {
currentServer.getAndSet(None) foreach { currentServer.getAndSet(None) foreach {
instance => instance =>
instance.shutdown instance.shutdown()
} }
} }
} }

View file

@ -40,20 +40,20 @@ class AkkaRemoteTest extends
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls
} }
override def afterAll { override def afterAll() {
if (!OptimizeLocal) if (!OptimizeLocal)
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests
} }
override def beforeEach { override def beforeEach() {
remote.start(host,port) remote.start(host,port)
super.beforeEach super.beforeEach
} }
override def afterEach() { override def afterEach() {
remote.shutdown remote.shutdown()
Actor.registry.shutdownAll() Actor.registry.shutdownAll()
super.afterEach super.afterEach()
} }
/* Utilities */ /* Utilities */

View file

@ -48,7 +48,7 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest {
val result1 = session1 !! GetUser() val result1 = session1 !! GetUser()
result1.as[String] must equal (Some("session[1]")) result1.as[String] must equal (Some("session[1]"))
remote.shutdownClientModule remote.shutdownClientModule()
val session2 = remote.actorFor("untyped-session-actor-service", 5000L, host, port) val session2 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
@ -66,7 +66,7 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest {
default1.as[String] must equal (Some("anonymous")) default1.as[String] must equal (Some("anonymous"))
instantiatedSessionActors must have size (1) instantiatedSessionActors must have size (1)
remote.shutdownClientModule remote.shutdownClientModule()
Thread.sleep(1000) Thread.sleep(1000)
instantiatedSessionActors must have size (0) instantiatedSessionActors must have size (0)
} }

View file

@ -18,8 +18,8 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends AkkaRemoteTest {
} }
// make sure the servers shutdown cleanly after the test has finished // make sure the servers shutdown cleanly after the test has finished
override def afterEach = { override def afterEach() {
super.afterEach super.afterEach()
clearMessageLogs clearMessageLogs
} }
@ -32,7 +32,7 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends AkkaRemoteTest {
session1.login("session[1]") session1.login("session[1]")
session1.getUser() must equal ("session[1]") session1.getUser() must equal ("session[1]")
remote.shutdownClientModule remote.shutdownClientModule()
val session2 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port) val session2 = remote.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, host, port)
@ -46,7 +46,7 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends AkkaRemoteTest {
session1.getUser() must equal ("anonymous") session1.getUser() must equal ("anonymous")
RemoteTypedSessionActorImpl.getInstances() must have size (1) RemoteTypedSessionActorImpl.getInstances() must have size (1)
remote.shutdownClientModule remote.shutdownClientModule()
Thread.sleep(1000) Thread.sleep(1000)
RemoteTypedSessionActorImpl.getInstances() must have size (0) RemoteTypedSessionActorImpl.getInstances() must have size (0)
@ -57,7 +57,7 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends AkkaRemoteTest {
session1.doSomethingFunny() session1.doSomethingFunny()
remote.shutdownClientModule remote.shutdownClientModule()
Thread.sleep(1000) Thread.sleep(1000)
RemoteTypedSessionActorImpl.getInstances() must have size (0) RemoteTypedSessionActorImpl.getInstances() must have size (0)
} }

View file

@ -6,7 +6,7 @@
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import akka.actor.{SupervisorFactory, Actor, ActorRef} import akka.actor.{Actor, ActorRef}
import akka.stm._ import akka.stm._
import akka.config.Supervision.{OneForOneStrategy,Permanent} import akka.config.Supervision.{OneForOneStrategy,Permanent}
import Actor._ import Actor._
@ -108,7 +108,9 @@
self.reply(ChatLog(messageList)) self.reply(ChatLog(messageList))
} }
override def postRestart(reason: Throwable) = chatLog = TransactionalVector() override def postRestart(reason: Throwable) {
chatLog = TransactionalVector()
}
} }
/** /**
@ -135,9 +137,10 @@
sessions -= username sessions -= username
} }
protected def shutdownSessions = protected def shutdownSessions() {
sessions.foreach { case (_, session) => session.stop() } sessions.foreach { case (_, session) => session.stop() }
} }
}
/** /**
* Implements chat management, e.g. chat message dispatch. * Implements chat management, e.g. chat message dispatch.
@ -184,11 +187,11 @@
// abstract methods to be defined somewhere else // abstract methods to be defined somewhere else
protected def chatManagement: Receive protected def chatManagement: Receive
protected def sessionManagement: Receive protected def sessionManagement: Receive
protected def shutdownSessions(): Unit protected def shutdownSessions()
override def postStop() = { override def postStop() {
EventHandler.info(this, "Chat server is shutting down...") EventHandler.info(this, "Chat server is shutting down...")
shutdownSessions shutdownSessions()
self.unlink(storage) self.unlink(storage)
storage.stop() storage.stop()
} }
@ -206,7 +209,7 @@
SessionManagement with SessionManagement with
ChatManagement with ChatManagement with
MemoryChatStorageFactory { MemoryChatStorageFactory {
override def preStart() = { override def preStart() {
remote.start("localhost", 2552); remote.start("localhost", 2552);
remote.register("chat:service", self) //Register the actor with the specified service id remote.register("chat:service", self) //Register the actor with the specified service id
} }
@ -217,9 +220,9 @@
*/ */
object ServerRunner { object ServerRunner {
def main(args: Array[String]): Unit = ServerRunner.run def main(args: Array[String]) { ServerRunner.run() }
def run = { def run() {
actorOf[ChatService].start() actorOf[ChatService].start()
} }
} }
@ -229,9 +232,9 @@
*/ */
object ClientRunner { object ClientRunner {
def main(args: Array[String]): Unit = ClientRunner.run def main(args: Array[String]) { ClientRunner.run() }
def run = { def run() {
val client1 = new ChatClient("jonas") val client1 = new ChatClient("jonas")
client1.login client1.login

View file

@ -108,9 +108,9 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa
private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
private[akka] override def start {} private[akka] override def start() {}
private[akka] override def shutdown {} private[akka] override def shutdown() {}
private[akka] override def timeoutMs = 100L private[akka] override def timeoutMs = 100L

View file

@ -173,7 +173,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
} }
def stop = synchronized { def stop = synchronized {
if (supervisor.isDefined) supervisor.get.shutdown if (supervisor.isDefined) supervisor.get.shutdown()
} }
} }