Merge branch 'master' into wip-1141-config-patriknw

This commit is contained in:
Patrik Nordwall 2011-11-18 11:29:56 +01:00
commit c6b157dd3a
4 changed files with 13 additions and 16 deletions

View file

@ -242,7 +242,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
case Left(e) throw e case Left(e) throw e
case Right(b) b case Right(b) b
} }
val arguments = List( val arguments = Seq(
classOf[Settings] -> settings, classOf[Settings] -> settings,
classOf[ActorPath] -> rootPath, classOf[ActorPath] -> rootPath,
classOf[EventStream] -> eventStream, classOf[EventStream] -> eventStream,
@ -263,9 +263,6 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
def deathWatch: DeathWatch = provider.deathWatch def deathWatch: DeathWatch = provider.deathWatch
def nodename: String = provider.nodename def nodename: String = provider.nodename
terminationFuture.onComplete(_ scheduler.stop())
terminationFuture.onComplete(_ dispatcher.shutdown())
@volatile @volatile
private var _serialization: Serialization = _ private var _serialization: Serialization = _
def serialization = _serialization def serialization = _serialization
@ -275,8 +272,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
def /(actorName: String): ActorPath = guardian.path / actorName def /(actorName: String): ActorPath = guardian.path / actorName
def start(): this.type = { private lazy val _start: this.type = {
if (_serialization != null) throw new IllegalStateException("cannot initialize ActorSystemImpl twice!")
_serialization = new Serialization(this) _serialization = new Serialization(this)
_typedActor = new TypedActor(settings, _serialization) _typedActor = new TypedActor(settings, _serialization)
provider.init(this) provider.init(this)
@ -286,12 +282,16 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
this this
} }
def start() = _start
def registerOnTermination(code: Unit) { terminationFuture onComplete (_ code) } def registerOnTermination(code: Unit) { terminationFuture onComplete (_ code) }
def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ code.run) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ code.run) }
// TODO shutdown all that other stuff, whatever that may be // TODO shutdown all that other stuff, whatever that may be
def stop() { def stop() {
guardian.stop() guardian.stop()
terminationFuture onComplete (_ scheduler.stop())
terminationFuture onComplete (_ dispatcher.shutdown())
} }
} }

View file

@ -31,8 +31,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
val log = Logging(system, this) val log = Logging(system, this)
import system._ import system._
val AC = settings import settings._
import AC._
// TODO move to settings? // TODO move to settings?
val shouldCompressData = config.getBoolean("akka.remote.use-compression") val shouldCompressData = config.getBoolean("akka.remote.use-compression")

View file

@ -13,15 +13,14 @@ import akka.dispatch._
import akka.util.duration._ import akka.util.duration._
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.event.{ DeathWatch, Logging } import akka.event.{ DeathWatch, Logging }
import akka.serialization.{ Serialization, Serializer, Compression }
import akka.serialization.Compression.LZF import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import akka.event.EventStream import akka.event.EventStream
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
/** /**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -36,11 +35,8 @@ class RemoteActorRefProvider(
val scheduler: Scheduler) extends ActorRefProvider { val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(eventStream, this) val log = Logging(eventStream, this)
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler) val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler)
def deathWatch = local.deathWatch def deathWatch = local.deathWatch
def guardian = local.guardian def guardian = local.guardian
def systemGuardian = local.systemGuardian def systemGuardian = local.systemGuardian
@ -59,6 +55,7 @@ class RemoteActorRefProvider(
local.init(system) local.init(system)
remote = new Remote(system, nodename) remote = new Remote(system, nodename)
remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
terminationFuture.onComplete(_ remote.server.shutdown())
} }
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime

View file

@ -351,6 +351,7 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support * Provides the implementation of the Netty remote support
*/ */
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
val log = Logging(system, this)
val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit) val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit)
val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit) val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit)
@ -474,8 +475,8 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
remoteClients.clear() remoteClients.clear()
} finally { } finally {
clientsLock.writeLock().unlock() clientsLock.writeLock().unlock()
currentServer.getAndSet(None) foreach { _.shutdown() }
} }
currentServer.getAndSet(None) foreach { _.shutdown() }
} }
} }