From d63c511401ce1f36c889afc057b793b8e1a729de Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 Nov 2011 11:16:23 +0100 Subject: [PATCH] #1351 - Making sure that the remoting is shut down when the ActorSystem is shut down --- .../src/main/scala/akka/actor/ActorSystem.scala | 12 ++++++------ akka-remote/src/main/scala/akka/remote/Remote.scala | 3 +-- .../scala/akka/remote/RemoteActorRefProvider.scala | 11 ++++------- .../scala/akka/remote/netty/NettyRemoteSupport.scala | 3 ++- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index aaa307c228..6be2325385 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -221,7 +221,7 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst case Left(e) ⇒ throw e case Right(b) ⇒ b } - val arguments = List( + val arguments = Seq( classOf[Settings] -> settings, classOf[ActorPath] -> rootPath, classOf[EventStream] -> eventStream, @@ -242,9 +242,6 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst def deathWatch: DeathWatch = provider.deathWatch def nodename: String = provider.nodename - terminationFuture.onComplete(_ ⇒ scheduler.stop()) - terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) - @volatile private var _serialization: Serialization = _ def serialization = _serialization @@ -254,8 +251,7 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst def /(actorName: String): ActorPath = guardian.path / actorName - def start(): this.type = { - if (_serialization != null) throw new IllegalStateException("cannot initialize ActorSystemImpl twice!") + private lazy val _start: this.type = { _serialization = new Serialization(this) _typedActor = new TypedActor(settings, _serialization) provider.init(this) @@ -265,12 +261,16 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst this } + def start() = _start + def registerOnTermination(code: ⇒ Unit) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } // TODO shutdown all that other stuff, whatever that may be def stop() { guardian.stop() + terminationFuture onComplete (_ ⇒ scheduler.stop()) + terminationFuture onComplete (_ ⇒ dispatcher.shutdown()) } } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index a140f1f252..cf3b93b311 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -31,8 +31,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { val log = Logging(system, this) import system._ - val AC = settings - import AC._ + import settings._ // TODO move to settings? val shouldCompressData = config.getBool("akka.remote.use-compression", false) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index e8bdddd5c2..facbf6cba1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -13,15 +13,14 @@ import akka.dispatch._ import akka.util.duration._ import akka.config.ConfigurationException import akka.event.{ DeathWatch, Logging } -import akka.serialization.{ Serialization, Serializer, Compression } import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ -import java.net.InetSocketAddress -import java.util.concurrent.ConcurrentHashMap import com.google.protobuf.ByteString import java.util.concurrent.atomic.AtomicBoolean import akka.event.EventStream +import java.util.concurrent.ConcurrentHashMap +import akka.dispatch.Promise /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -36,11 +35,8 @@ class RemoteActorRefProvider( val scheduler: Scheduler) extends ActorRefProvider { val log = Logging(eventStream, this) - - import java.util.concurrent.ConcurrentHashMap - import akka.dispatch.Promise - val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler) + def deathWatch = local.deathWatch def guardian = local.guardian def systemGuardian = local.systemGuardian @@ -59,6 +55,7 @@ class RemoteActorRefProvider( local.init(system) remote = new Remote(system, nodename) remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) + terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 77fa2fded3..f6110f62d4 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -351,6 +351,7 @@ class ActiveRemoteClientHandler( * Provides the implementation of the Netty remote support */ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { + val log = Logging(system, this) val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit) val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit) @@ -474,8 +475,8 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi remoteClients.clear() } finally { clientsLock.writeLock().unlock() + currentServer.getAndSet(None) foreach { _.shutdown() } } - currentServer.getAndSet(None) foreach { _.shutdown() } } }