diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 515676642a..191a442e7a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -56,6 +56,8 @@ trait ActorRefFactory { def createActor[T <: Actor](clazz: Class[T]): ActorRef = createActor(Props(clazz)) def createActor(factory: ⇒ Actor): ActorRef = createActor(Props(() ⇒ factory)) + + def createActor(creator: UntypedActorFactory): ActorRef = createActor(Props(() => creator.create())) def findActor(address: String): Option[ActorRef] = provider.findActorRef(address) diff --git a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala index e5622948ec..7896ff9692 100644 --- a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala +++ b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala @@ -15,9 +15,14 @@ import akka.AkkaApplication */ trait BootableActorLoaderService extends Bootable { - protected def createApplicationClassLoader(application: AkkaApplication): Option[ClassLoader] = Some({ - if (application.AkkaConfig.HOME.isDefined) { - val DEPLOY = application.AkkaConfig.HOME.get + "/deploy" + def app: AkkaApplication + + val BOOT_CLASSES = app.AkkaConfig.BOOT_CLASSES + lazy val applicationLoader = createApplicationClassLoader() + + protected def createApplicationClassLoader(): Option[ClassLoader] = Some({ + if (app.AkkaConfig.HOME.isDefined) { + val DEPLOY = app.AkkaConfig.HOME.get + "/deploy" val DEPLOY_DIR = new File(DEPLOY) if (!DEPLOY_DIR.exists) { System.exit(-1) @@ -41,11 +46,8 @@ trait BootableActorLoaderService extends Bootable { } else Thread.currentThread.getContextClassLoader }) - abstract override def onLoad(application: AkkaApplication) = { - super.onLoad(application) - - val BOOT_CLASSES = application.AkkaConfig.BOOT_CLASSES - val applicationLoader = createApplicationClassLoader(application) + abstract override def onLoad() = { + super.onLoad() applicationLoader foreach Thread.currentThread.setContextClassLoader @@ -54,13 +56,13 @@ trait BootableActorLoaderService extends Bootable { } } - abstract override def onUnload(application: AkkaApplication) = { - super.onUnload(application) - application.registry.local.shutdownAll + abstract override def onUnload() = { + super.onUnload() + app.registry.local.shutdownAll } } /** * Java API for the default JAX-RS/Mist Initializer */ -class DefaultBootableActorLoaderService extends BootableActorLoaderService +class DefaultBootableActorLoaderService(val app: AkkaApplication) extends BootableActorLoaderService diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 84c836d192..1161cad2e0 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -185,12 +185,12 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) } -abstract class RemoteSupport(val application: AkkaApplication) extends ListenerManagement with RemoteServerModule with RemoteClientModule { +abstract class RemoteSupport(val app: AkkaApplication) extends ListenerManagement with RemoteServerModule with RemoteClientModule { val eventHandler: ActorRef = { implicit object format extends StatelessActorFormat[RemoteEventHandler] val clazz = classOf[RemoteEventHandler] - val handler = new LocalActorRef(application, Props(clazz), clazz.getName, true) + val handler = new LocalActorRef(app, Props(clazz), clazz.getName, true) // add the remote client and server listener that pipes the events to the event handler system addListener(handler) handler @@ -243,16 +243,16 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒ * Starts the server up */ def start(): RemoteServerModule = - start(application.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress, - application.reflective.RemoteModule.configDefaultAddress.getPort, + start(app.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress, + app.reflective.RemoteModule.configDefaultAddress.getPort, None) /** * Starts the server up */ def start(loader: ClassLoader): RemoteServerModule = - start(application.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress, - application.reflective.RemoteModule.configDefaultAddress.getPort, + start(app.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress, + app.reflective.RemoteModule.configDefaultAddress.getPort, Option(loader)) /** @@ -333,10 +333,10 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒ def actorFor(address: String, hostname: String, port: Int): ActorRef = - actorFor(address, application.AkkaConfig.TimeoutMillis, hostname, port, None) + actorFor(address, app.AkkaConfig.TimeoutMillis, hostname, port, None) def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(address, application.AkkaConfig.TimeoutMillis, hostname, port, Some(loader)) + actorFor(address, app.AkkaConfig.TimeoutMillis, hostname, port, Some(loader)) def actorFor(address: String, timeout: Long, hostname: String, port: Int): ActorRef = actorFor(address, timeout, hostname, port, None) diff --git a/akka-actor/src/main/scala/akka/util/AkkaLoader.scala b/akka-actor/src/main/scala/akka/util/AkkaLoader.scala index 74704c95f4..943c9d7c59 100644 --- a/akka-actor/src/main/scala/akka/util/AkkaLoader.scala +++ b/akka-actor/src/main/scala/akka/util/AkkaLoader.scala @@ -22,7 +22,7 @@ class AkkaLoader(application: AkkaApplication) { def boot(withBanner: Boolean, b: Bootable): Unit = hasBooted switchOn { if (withBanner) printBanner() println("Starting Akka...") - b.onLoad(application) + b.onLoad() Thread.currentThread.setContextClassLoader(getClass.getClassLoader) _bundles = Some(b) println("Akka started successfully") @@ -34,7 +34,7 @@ class AkkaLoader(application: AkkaApplication) { def shutdown() { hasBooted switchOff { println("Shutting down Akka...") - _bundles.foreach(_.onUnload(application)) + _bundles.foreach(_.onUnload()) _bundles = None println("Akka succesfully shut down") } diff --git a/akka-actor/src/main/scala/akka/util/Bootable.scala b/akka-actor/src/main/scala/akka/util/Bootable.scala index 636fdf0356..2acadc80cc 100644 --- a/akka-actor/src/main/scala/akka/util/Bootable.scala +++ b/akka-actor/src/main/scala/akka/util/Bootable.scala @@ -6,6 +6,6 @@ package akka.util import akka.AkkaApplication trait Bootable { - def onLoad(application: AkkaApplication) {} - def onUnload(application: AkkaApplication) {} + def onLoad() {} + def onUnload() {} } diff --git a/akka-http/src/test/scala/config/ConfigSpec.scala b/akka-http/src/test/scala/config/ConfigSpec.scala index caa8610e4a..5423dd8aa7 100644 --- a/akka-http/src/test/scala/config/ConfigSpec.scala +++ b/akka-http/src/test/scala/config/ConfigSpec.scala @@ -14,7 +14,7 @@ class ConfigSpec extends AkkaSpec { "The default configuration file (i.e. akka-reference.conf)" should { "contain all configuration properties for akka-http that are used in code with their correct defaults" in { - import application.config._ + import app.config._ getBool("akka.http.connection-close") must equal(Some(true)) getString("akka.http.expired-header-name") must equal(Some("Async-Timeout")) getString("akka.http.hostname") must equal(Some("localhost")) diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index f214e12f52..8b3b05303e 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -15,15 +15,17 @@ import akka.event.EventHandler */ trait BootableRemoteActorService extends Bootable { self: BootableActorLoaderService ⇒ + + def settings: RemoteServerSettings protected lazy val remoteServerThread = new Thread(new Runnable() { - def run = Actor.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port + def run = app.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port }, "Akka RemoteModule Service") def startRemoteService() { remoteServerThread.start() } abstract override def onLoad() { - if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) { + if (app.reflective.ClusterModule.isEnabled && settings.isRemotingEnabled) { EventHandler.info(this, "Initializing Remote Actors Service...") startRemoteService() EventHandler.info(this, "Remote Actors Service initialized") @@ -34,7 +36,7 @@ trait BootableRemoteActorService extends Bootable { abstract override def onUnload() { EventHandler.info(this, "Shutting down Remote Actors Service") - Actor.remote.shutdown() + app.remote.shutdown() if (remoteServerThread.isAlive) remoteServerThread.join(1000) EventHandler.info(this, "Remote Actors Service has been shut down") super.onUnload() diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 2f8750264e..333cb6f3dc 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -6,20 +6,20 @@ package akka.remote import akka.remote.RemoteProtocol._ import akka.serialization.Serialization - import com.google.protobuf.ByteString +import akka.AkkaApplication object MessageSerializer { - def deserialize(messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { + def deserialize(app: AkkaApplication, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { val clazz = loadManifest(classLoader, messageProtocol) - Serialization.deserialize(messageProtocol.getMessage.toByteArray, + app.serialization.deserialize(messageProtocol.getMessage.toByteArray, clazz, classLoader).fold(x ⇒ throw x, o ⇒ o) } - def serialize(message: AnyRef): MessageProtocol = { + def serialize(app: AkkaApplication, message: AnyRef): MessageProtocol = { val builder = MessageProtocol.newBuilder - val bytes = Serialization.serialize(message).fold(x ⇒ throw x, b ⇒ b) + val bytes = app.serialization.serialize(message).fold(x ⇒ throw x, b ⇒ b) builder.setMessage(ByteString.copyFrom(bytes)) builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) builder.build diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 18e09112bd..0ecfa4adab 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -5,12 +5,11 @@ package akka.remote import akka.dispatch.PinnedDispatcher - import scala.collection.mutable - import java.net.InetSocketAddress import akka.actor.{ LocalActorRef, Actor, ActorRef, Props, newUuid } import akka.actor.Actor._ +import akka.AkkaApplication /** * Stream of all kinds of network events, remote failure and connection events, cluster failure and connection events etc. @@ -34,7 +33,12 @@ object NetworkEventStream { trait Listener { def notify(event: RemoteLifeCycleEvent) } +} +class NetworkEventStream(val app: AkkaApplication) { + + import NetworkEventStream._ + /** * Channel actor with a registry of listeners. */ @@ -60,8 +64,8 @@ object NetworkEventStream { } } - private[akka] val channel = new LocalActorRef( - Props[Channel].copy(dispatcher = new PinnedDispatcher()), newUuid.toString, systemService = true) + private[akka] val channel = new LocalActorRef(app, + Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), newUuid.toString, systemService = true) /** * Registers a network event stream listener (asyncronously). diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index c51fbabc91..9e8a62df41 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -17,24 +17,25 @@ import RemoteProtocol._ import RemoteDaemonMessageType._ import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } import Compression.LZF - import java.net.InetSocketAddress - import com.google.protobuf.ByteString +import akka.AkkaApplication /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. * * @author Jonas Bonér */ -class RemoteActorRefProvider extends ActorRefProvider { +class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) extends ActorRefProvider { import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise + + implicit def _app = app private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] - private val failureDetector = new BannagePeriodFailureDetector(timeToBan = 60 seconds) // FIXME make timeToBan configurable + private val failureDetector = new BannagePeriodFailureDetector(remote, timeToBan = 60 seconds) // FIXME make timeToBan configurable def actorOf(props: Props, address: String): Option[ActorRef] = { Address.validate(address) @@ -44,11 +45,11 @@ class RemoteActorRefProvider extends ActorRefProvider { if (oldFuture eq null) { // we won the race -- create the actor and resolve the future val actor = try { - Deployer.lookupDeploymentFor(address) match { - case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(remoteAddresses))) ⇒ + app.deployer.lookupDeploymentFor(address) match { + case Some(Deploy(_, _, router, nrOfInstances, _, app.deployment.RemoteScope(remoteAddresses))) ⇒ - val thisHostname = Remote.address.getHostName - val thisPort = Remote.address.getPort + val thisHostname = remote.address.getHostName + val thisPort = remote.address.getPort def isReplicaNode: Boolean = remoteAddresses exists { remoteAddress ⇒ remoteAddress.hostname == thisHostname && remoteAddress.port == thisPort @@ -56,7 +57,7 @@ class RemoteActorRefProvider extends ActorRefProvider { if (isReplicaNode) { // we are on one of the replica node for this remote actor - Some(new LocalActorRef(props, address, false)) // create a local actor + Some(new LocalActorRef(app, props, address, false)) // create a local actor } else { // we are on the single "reference" node uses the remote actors on the replica nodes @@ -89,12 +90,12 @@ class RemoteActorRefProvider extends ActorRefProvider { def provisionActorToNode(remoteAddress: RemoteAddress): RemoteActorRef = { val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port) useActorOnNode(inetSocketAddress, address, props.creator) - RemoteActorRef(inetSocketAddress, address, Actor.TIMEOUT, None) + RemoteActorRef(app, app.remote, inetSocketAddress, address, None) } val connections: Iterable[ActorRef] = remoteAddresses map { provisionActorToNode(_) } - Some(Routing.actorOf(RoutedProps( + Some(app.routing.actorOf(RoutedProps( routerFactory = routerFactory, connections = connections))) } @@ -107,7 +108,7 @@ class RemoteActorRefProvider extends ActorRefProvider { throw e } - actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later + actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later newFuture completeWithResult actor actor @@ -131,10 +132,10 @@ class RemoteActorRefProvider extends ActorRefProvider { EventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress)) val actorFactoryBytes = - Serialization.serialize(actorFactory) match { + app.serialization.serialize(actorFactory) match { case Left(error) ⇒ throw error case Right(bytes) ⇒ - if (Remote.shouldCompressData) LZF.compress(bytes) + if (remote.shouldCompressData) LZF.compress(bytes) else bytes } @@ -145,8 +146,8 @@ class RemoteActorRefProvider extends ActorRefProvider { .build() val connectionFactory = - () ⇒ Remote.server.actorFor( - Remote.remoteDaemonServiceName, remoteAddress.getHostName, remoteAddress.getPort) + () ⇒ remote.server.actorFor( + remote.remoteDaemonServiceName, remoteAddress.getHostName, remoteAddress.getPort) // try to get the connection for the remote address, if not already there then create it val connection = failureDetector.putIfAbsent(remoteAddress, connectionFactory) @@ -161,7 +162,7 @@ class RemoteActorRefProvider extends ActorRefProvider { if (withACK) { try { - (connection ? (command, Remote.remoteDaemonAckTimeout)).as[Status] match { + (connection ? (command, remote.remoteDaemonAckTimeout)).as[Status] match { case Some(Success(receiver)) ⇒ EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(receiver)) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala index 993af29d99..57876fc0b9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala @@ -5,10 +5,14 @@ package akka.remote import akka.util.Duration -import akka.config.Config._ import akka.config.ConfigurationException +import akka.AkkaApplication -object RemoteClientSettings { +class RemoteClientSettings(val app: AkkaApplication) { + + import app.config + import app.AkkaConfig.TIME_UNIT + val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match { case "" ⇒ None case cookie ⇒ Some(cookie) @@ -21,7 +25,11 @@ object RemoteClientSettings { val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) } -object RemoteServerSettings { +class RemoteServerSettings(val app: AkkaApplication) { + + import app.config + import app.AkkaConfig.TIME_UNIT + val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "cluster") val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 437a3c288a..bd21eb3905 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -4,19 +4,16 @@ package akka.remote +import akka.AkkaApplication import akka.actor._ -import akka.actor.Actor._ import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } -import akka.config.Config -import akka.config.Config._ import akka.actor.Status._ import akka.util._ import akka.util.duration._ import akka.util.Helpers._ import akka.actor.DeploymentConfig._ import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } -import akka.serialization.ActorSerialization._ import Compression.LZF import RemoteProtocol._ import RemoteDaemonMessageType._ @@ -28,42 +25,50 @@ import com.eaio.uuid.UUID /** * @author Jonas Bonér */ -object Remote extends RemoteService { +class Remote(val app: AkkaApplication) extends RemoteService { + + import app.config + import app.AkkaConfig.TIME_UNIT + val shouldCompressData = config.getBool("akka.remote.use-compression", false) val remoteDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt - val hostname = Config.hostname - val port = Config.remoteServerPort + val hostname = app.hostname + val port = app.AkkaConfig.REMOTE_SERVER_PORT val remoteDaemonServiceName = "akka-remote-daemon".intern // FIXME configure computeGridDispatcher to what? - val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build + val computeGridDispatcher = app.dispatcherFactory.newDispatcher("akka:compute-grid").build - private[remote] lazy val remoteDaemonSupervisor = Supervisor( - OneForOneStrategy(List(classOf[Exception]), None, None)) // is infinite restart what we want? + private[remote] lazy val remoteDaemonSupervisor = app.createActor(Props( + OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want? private[remote] lazy val remoteDaemon = new LocalActorRef( - props = Props(new RemoteDaemon).withDispatcher(new PinnedDispatcher()).withSupervisor(remoteDaemonSupervisor), - address = Remote.remoteDaemonServiceName, + app, + props = Props(new RemoteDaemon(this)).withDispatcher(app.dispatcherFactory.newPinnedDispatcher("Remote")).withSupervisor(remoteDaemonSupervisor), + address = remoteDaemonServiceName, systemService = true) - private[remote] lazy val remoteClientLifeCycleHandler = actorOf(Props(new Actor { + private[remote] lazy val remoteClientLifeCycleHandler = app.createActor(Props(new Actor { def receive = { case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule() case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule() case _ ⇒ //ignore other } }), "akka.cluster.RemoteClientLifeCycleListener") + + lazy val eventStream = new NetworkEventStream(app) lazy val server: RemoteSupport = { - val remote = new akka.remote.netty.NettyRemoteSupport + val remote = new akka.remote.netty.NettyRemoteSupport(app) remote.start(hostname, port) - remote.register(Remote.remoteDaemonServiceName, remoteDaemon) - remote.addListener(NetworkEventStream.channel) + remote.register(remoteDaemonServiceName, remoteDaemon) + remote.addListener(eventStream.channel) remote.addListener(remoteClientLifeCycleHandler) - Actor.provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) + // TODO actually register this provider in application in remote mode + //app.provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) remote } @@ -90,18 +95,18 @@ object Remote extends RemoteService { * * @author Jonas Bonér */ -class RemoteDaemon extends Actor { - - import Remote._ - +class RemoteDaemon(val remote: Remote) extends Actor { + + import remote._ + override def preRestart(reason: Throwable, msg: Option[Any]) { EventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason)) } - def receive: Receive = { + def receive: Actor.Receive = { case message: RemoteDaemonMessageProtocol ⇒ EventHandler.debug(this, - "Received command [\n%s] to RemoteDaemon on [%s]".format(message, address)) + "Received command [\n%s] to RemoteDaemon on [%s]".format(message, app.nodename)) message.getMessageType match { case USE ⇒ handleUse(message) @@ -130,15 +135,15 @@ class RemoteDaemon extends Actor { else message.getPayload.toByteArray val actorFactory = - Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { + app.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor] } val actorAddress = message.getActorAddress - val newActorRef = actorOf(Props(creator = actorFactory), actorAddress) + val newActorRef = app.createActor(Props(creator = actorFactory), actorAddress) - Remote.server.register(actorAddress, newActorRef) + remote.server.register(actorAddress, newActorRef) } else { EventHandler.error(this, "Actor 'address' is not defined, ignoring remote daemon command [%s]".format(message)) @@ -168,7 +173,7 @@ class RemoteDaemon extends Actor { } def handle_fun0_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) { - new LocalActorRef( + new LocalActorRef(app, Props( context ⇒ { case f: Function0[_] ⇒ try { f() } finally { context.self.stop() } @@ -176,7 +181,7 @@ class RemoteDaemon extends Actor { } def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) { - new LocalActorRef( + new LocalActorRef(app, Props( context ⇒ { case f: Function0[_] ⇒ try { reply(f()) } finally { context.self.stop() } @@ -184,7 +189,7 @@ class RemoteDaemon extends Actor { } def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) { - new LocalActorRef( + new LocalActorRef(app, Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() } @@ -192,7 +197,7 @@ class RemoteDaemon extends Actor { } def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) { - new LocalActorRef( + new LocalActorRef(app, Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { reply(fun.asInstanceOf[Any ⇒ Any](param)) } finally { context.self.stop() } @@ -205,7 +210,7 @@ class RemoteDaemon extends Actor { } private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { - Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { + app.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[T] } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala index 02601be601..42347d9fc6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala @@ -10,11 +10,9 @@ import akka.routing._ import akka.dispatch.PinnedDispatcher import akka.event.EventHandler import akka.util.{ ListenerManagement, Duration } - import scala.collection.immutable.Map import scala.collection.mutable import scala.annotation.tailrec - import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import System.{ currentTimeMillis ⇒ newTimestamp } @@ -24,7 +22,7 @@ import System.{ currentTimeMillis ⇒ newTimestamp } * * @author Jonas Bonér */ -abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) +abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector with NetworkEventStream.Listener { @@ -41,7 +39,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre protected val state: AtomicReference[State] = new AtomicReference[State](newState()) // register all initial connections - e.g listen to events from them - initialConnections.keys foreach (NetworkEventStream.register(this, _)) + initialConnections.keys foreach (remote.eventStream.register(this, _)) /** * State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field. @@ -135,7 +133,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre remove(faultyConnection) // recur } else { EventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) - NetworkEventStream.unregister(this, faultyAddress) // unregister the connections - e.g stop listen to events from it + remote.eventStream.unregister(this, faultyAddress) // unregister the connections - e.g stop listen to events from it } } } @@ -163,23 +161,23 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre } else { // we succeeded EventHandler.debug(this, "Adding connection [%s]".format(address)) - NetworkEventStream.register(this, address) // register the connection - e.g listen to events from it + remote.eventStream.register(this, address) // register the connection - e.g listen to events from it newConnection // return new connection actor } } } private[remote] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { - RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) + RemoteActorRef(remote.app, remote.server, inetSocketAddress, actorAddress, None) } } /** * Simple failure detector that removes the failing connection permanently on first error. */ -class RemoveConnectionOnFirstFailureRemoteFailureDetector( +class RemoveConnectionOnFirstFailureRemoteFailureDetector(_remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(initialConnections) { + extends RemoteFailureDetectorBase(_remote, initialConnections) { protected def newState() = State(Long.MinValue, initialConnections) @@ -215,10 +213,10 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector( * * @author Jonas Bonér */ -class BannagePeriodFailureDetector( +class BannagePeriodFailureDetector(_remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef], timeToBan: Duration) - extends RemoteFailureDetectorBase(initialConnections) { + extends RemoteFailureDetectorBase(_remote, initialConnections) { // FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 888291e9cc..d7af799459 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -6,16 +6,10 @@ package akka.remote.netty import akka.actor.{ ActorRef, Uuid, newUuid, uuidFrom, IllegalActorStateException, RemoteActorRef, PoisonPill, RemoteActorSystemMessage, AutoReceivedMessage } import akka.dispatch.{ ActorPromise, DefaultPromise, Promise } -import akka.serialization.RemoteActorSerialization -import akka.serialization.RemoteActorSerialization._ import akka.remote._ import RemoteProtocol._ -import akka.actor.Actor._ -import akka.config.Config -import akka.config.Config._ import akka.util._ import akka.event.EventHandler - import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory @@ -27,14 +21,14 @@ import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } - import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ - import java.net.InetSocketAddress import java.util.concurrent._ import java.util.concurrent.atomic._ import akka.AkkaException +import akka.AkkaApplication +import akka.serialization.RemoteActorSerialization class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); @@ -55,21 +49,23 @@ object RemoteEncoder { } trait NettyRemoteClientModule extends RemoteClientModule { - self: ListenerManagement ⇒ + self: RemoteSupport ⇒ + private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val remoteActors = new Index[RemoteAddress, Uuid] private val lock = new ReadWriteGuard + def app: AkkaApplication + protected[akka] def send[T](message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]], - remoteAddress: InetSocketAddress, - timeout: Long, - isOneWay: Boolean, - actorRef: ActorRef, - loader: Option[ClassLoader]): Option[Promise[T]] = + senderOption: Option[ActorRef], + senderFuture: Option[Promise[T]], + remoteAddress: InetSocketAddress, + isOneWay: Boolean, + actorRef: ActorRef, + loader: Option[ClassLoader]): Option[Promise[T]] = withClientFor(remoteAddress, loader) { client ⇒ - client.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef) + client.send[T](message, senderOption, senderFuture, remoteAddress, isOneWay, actorRef) } private[akka] def withClientFor[T]( @@ -89,7 +85,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) + val client = new ActiveRemoteClient(app, this, address, loader, self.notifyListeners _) client.connect() remoteClients += key -> client client @@ -144,9 +140,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { * reuses an already established connection. */ abstract class RemoteClient private[akka] ( + val app: AkkaApplication, val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { + import app.config + implicit def _app = app + val serialization = new RemoteActorSerialization(app) + val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", false) val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1) @@ -157,7 +158,7 @@ abstract class RemoteClient private[akka] ( protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] - else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) + new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) } private[remote] val runSwitch = new Switch() @@ -180,7 +181,7 @@ abstract class RemoteClient private[akka] ( val iter = pendingRequests.iterator while (iter.hasNext) { val (_, _, message) = iter.next - messages = messages :+ MessageSerializer.deserialize(message.getMessage) + messages = messages :+ MessageSerializer.deserialize(app, message.getMessage) } messages.toArray } @@ -193,11 +194,10 @@ abstract class RemoteClient private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, - timeout: Long, isOneWay: Boolean, actorRef: ActorRef): Option[Promise[T]] = { - val messageProtocol = createRemoteMessageProtocolBuilder( - Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build + val messageProtocol = serialization.createRemoteMessageProtocolBuilder( + Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.TimeoutMillis, Right(message), isOneWay, senderOption).build send(messageProtocol, senderFuture) } @@ -318,13 +318,15 @@ abstract class RemoteClient private[akka] ( * @author Jonas Bonér */ class ActiveRemoteClient private[akka] ( + _app: AkkaApplication, module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) - extends RemoteClient(module, remoteAddress) { + extends RemoteClient(_app, module, remoteAddress) { - import RemoteClientSettings._ + val settings = new RemoteClientSettings(app) + import settings._ //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile @@ -381,7 +383,7 @@ class ActiveRemoteClient private[akka] ( timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(app, settings, name, futures, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -412,7 +414,7 @@ class ActiveRemoteClient private[akka] ( } } } - }, RemoteClientSettings.REAP_FUTURES_DELAY.length, RemoteClientSettings.REAP_FUTURES_DELAY.unit) + }, REAP_FUTURES_DELAY.length, REAP_FUTURES_DELAY.unit) notifyListeners(RemoteClientStarted(module, remoteAddress)) true } @@ -465,20 +467,24 @@ class ActiveRemoteClient private[akka] ( * @author Jonas Bonér */ class ActiveRemoteClientPipelineFactory( + app: AkkaApplication, + val settings: RemoteClientSettings, name: String, futures: ConcurrentMap[Uuid, Promise[_]], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, client: ActiveRemoteClient) extends ChannelPipelineFactory { + + import settings._ def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit) - val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) + val timeout = new ReadTimeoutHandler(timer, READ_TIMEOUT.length, READ_TIMEOUT.unit) + val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(app, settings, name, futures, bootstrap, remoteAddress, timer, client) new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient) } @@ -489,6 +495,8 @@ class ActiveRemoteClientPipelineFactory( */ @ChannelHandler.Sharable class ActiveRemoteClientHandler( + val app: AkkaApplication, + val settings: RemoteClientSettings, val name: String, val futures: ConcurrentMap[Uuid, Promise[_]], val bootstrap: ClientBootstrap, @@ -497,13 +505,15 @@ class ActiveRemoteClientHandler( val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { + implicit def _app = app + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { event.getMessage match { case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ val rcp = arp.getInstruction rcp.getCommandType match { - case CommandType.SHUTDOWN ⇒ spawn { + case CommandType.SHUTDOWN ⇒ akka.dispatch.Future { client.module.shutdownClientConnection(remoteAddress) } } @@ -521,7 +531,7 @@ class ActiveRemoteClientHandler( case future ⇒ if (reply.hasMessage) { - val message = MessageSerializer.deserialize(reply.getMessage) + val message = MessageSerializer.deserialize(app, reply.getMessage) future.completeWithResult(message) } else { future.completeWithException(parseException(reply, client.loader)) @@ -547,8 +557,8 @@ class ActiveRemoteClientHandler( client.connect(reconnectIfAlreadyConnected = true) } } - }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) - } else spawn { + }, settings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) + } else akka.dispatch.Future { client.module.shutdownClientConnection(remoteAddress) // spawn in another thread } } @@ -578,7 +588,7 @@ class ActiveRemoteClientHandler( cause match { case e: ReadTimeoutException ⇒ - spawn { + akka.dispatch.Future { client.module.shutdownClientConnection(remoteAddress) // spawn in another thread } case e: Exception ⇒ @@ -610,7 +620,7 @@ class ActiveRemoteClientHandler( /** * Provides the implementation of the Netty remote support */ -class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { +class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with NettyRemoteServerModule with NettyRemoteClientModule { // Needed for remote testing and switching on/off under run val optimizeLocal = new AtomicBoolean(true) @@ -639,13 +649,16 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with EventHandler.debug(this, "Creating RemoteActorRef with address [%s] connected to [%s]" .format(actorAddress, remoteInetSocketAddress)) - RemoteActorRef(remoteInetSocketAddress, actorAddress, timeout, loader) + RemoteActorRef(app, app.remote, remoteInetSocketAddress, actorAddress, loader) } } -class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { - - import RemoteServerSettings._ +class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { + + val settings = new RemoteServerSettings(app) + import settings._ + + val serialization = new RemoteActorSerialization(app) val name = "NettyRemoteServer@" + host + ":" + port val address = new InetSocketAddress(host, port) @@ -664,13 +677,13 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, serverModule) + val pipelineFactory = new RemoteServerPipelineFactory(settings, serialization, name, openChannels, executor, loader, serverModule) bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", RemoteServerSettings.BACKLOG) + bootstrap.setOption("backlog", BACKLOG) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT.toMillis) + bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT.toMillis) openChannels.add(bootstrap.bind(address)) serverModule.notifyListeners(RemoteServerStarted(serverModule)) @@ -680,8 +693,8 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, try { val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) - if (RemoteClientSettings.SECURE_COOKIE.nonEmpty) - b.setCookie(RemoteClientSettings.SECURE_COOKIE.get) + if (SECURE_COOKIE.nonEmpty) + b.setCookie(SECURE_COOKIE.get) b.build } openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly @@ -698,19 +711,21 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, } trait NettyRemoteServerModule extends RemoteServerModule { - self: RemoteModule ⇒ + self: RemoteSupport ⇒ + + def app: AkkaApplication private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { case Some(server) ⇒ server.address - case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress + case None ⇒ app.reflective.RemoteModule.configDefaultAddress } def name = currentServer.get match { case Some(server) ⇒ server.name case None ⇒ - val a = ReflectiveAccess.RemoteModule.configDefaultAddress + val a = app.reflective.RemoteModule.configDefaultAddress "NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort } @@ -723,7 +738,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { _isRunning switchOn { EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) - currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) + currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader))) } } catch { case e: Exception ⇒ @@ -826,13 +841,15 @@ trait NettyRemoteServerModule extends RemoteServerModule { * @author Jonas Bonér */ class RemoteServerPipelineFactory( + val settings: RemoteServerSettings, + val serialization: RemoteActorSerialization, val name: String, val openChannels: ChannelGroup, val executor: ExecutionHandler, val loader: Option[ClassLoader], val server: NettyRemoteServerModule) extends ChannelPipelineFactory { - import RemoteServerSettings._ + import settings._ def getPipeline: ChannelPipeline = { val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4) @@ -841,7 +858,7 @@ class RemoteServerPipelineFactory( val protobufEnc = new ProtobufEncoder val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil - val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) + val remoteServer = new RemoteServerHandler(settings, serialization, name, openChannels, loader, server) val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } @@ -878,12 +895,16 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si */ @ChannelHandler.Sharable class RemoteServerHandler( + val settings: RemoteServerSettings, + val serialization: RemoteActorSerialization, val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { - import RemoteServerSettings._ + import settings._ + + implicit def app = server.app // applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY @@ -1000,9 +1021,9 @@ class RemoteServerHandler( return } - val message = MessageSerializer.deserialize(request.getMessage) + val message = MessageSerializer.deserialize(app, request.getMessage) val sender = - if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) + if (request.hasSender) Some(serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None message match { @@ -1023,7 +1044,7 @@ class RemoteServerHandler( onComplete(_.value.get match { case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) case r: Right[_, _] ⇒ - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + val messageBuilder = serialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), actorInfo.getAddress, @@ -1059,7 +1080,7 @@ class RemoteServerHandler( EventHandler.debug(this, "Looking up a remotely available actor for address [%s] on node [%s]" - .format(address, Config.nodename)) + .format(address, app.nodename)) val byAddress = server.actors.get(address) // try actor-by-address if (byAddress eq null) { @@ -1102,7 +1123,7 @@ class RemoteServerHandler( private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = { val actorInfo = request.getActorInfo - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + val messageBuilder = serialization.createRemoteMessageProtocolBuilder( None, Right(request.getUuid), actorInfo.getAddress, diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 402ba173ec..eb29c13484 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -11,6 +11,7 @@ import akka.util.{ ReflectiveAccess, Duration } import akka.event.EventHandler import akka.remote.{ RemoteProtocol, RemoteClientSettings, MessageSerializer } import RemoteProtocol._ +import akka.AkkaApplication import scala.collection.immutable.Stack @@ -24,8 +25,10 @@ import com.eaio.uuid.UUID /** * Module for local actor serialization. */ -object ActorSerialization { +class ActorSerialization(val app: AkkaApplication) { implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default + + val remoteActorSerialization = new RemoteActorSerialization(app) def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = fromBinaryToLocalActorRef(bytes, None, Some(homeAddress)) @@ -67,7 +70,7 @@ object ActorSerialization { val builder = SerializedActorRefProtocol.newBuilder .setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build) .setAddress(actorRef.address) - .setTimeout(actorRef.timeout) + .setTimeout(app.AkkaConfig.TimeoutMillis) replicationScheme match { case _: Transient | Transient ⇒ @@ -97,11 +100,11 @@ object ActorSerialization { while (it.hasNext) l += it.next.asInstanceOf[Envelope] l map { m ⇒ - RemoteActorSerialization.createRemoteMessageProtocolBuilder( + remoteActorSerialization.createRemoteMessageProtocolBuilder( Option(m.receiver.ref), Left(actorRef.uuid), actorRef.address, - actorRef.timeout, + app.AkkaConfig.TimeoutMillis, Right(m.message), false, m.channel match { @@ -116,7 +119,7 @@ object ActorSerialization { l.underlying.receiveTimeout.foreach(builder.setReceiveTimeout(_)) val actorInstance = l.underlyingActorInstance - Serialization.serialize(actorInstance.asInstanceOf[T]) match { + app.serialization.serialize(actorInstance.asInstanceOf[T]) match { case Right(bytes) ⇒ builder.setActorInstance(ByteString.copyFrom(bytes)) case Left(exception) ⇒ throw new Exception("Error serializing : " + actorInstance.getClass.getName) } @@ -167,7 +170,7 @@ object ActorSerialization { val storedHotswap = try { - Serialization.deserialize( + app.serialization.deserialize( protocol.getHotswapStack.toByteArray, classOf[Stack[PartialFunction[Any, Unit]]], loader) match { @@ -179,14 +182,14 @@ object ActorSerialization { } val storedSupervisor = - if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) + if (protocol.hasSupervisor) Some(remoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) else None val classLoader = loader.getOrElse(this.getClass.getClassLoader) val bytes = protocol.getActorInstance.toByteArray val actorClass = classLoader.loadClass(protocol.getActorClassname) val factory = () ⇒ { - Serialization.deserialize(bytes, actorClass, loader) match { + app.serialization.deserialize(bytes, actorClass, loader) match { case Right(r) ⇒ r.asInstanceOf[Actor] case Left(ex) ⇒ throw new Exception("Cannot de-serialize : " + actorClass) } @@ -198,7 +201,7 @@ object ActorSerialization { } val props = Props(creator = factory, - timeout = if (protocol.hasTimeout) protocol.getTimeout else Timeout.default, + timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.TIMEOUT, supervisor = storedSupervisor //TODO what dispatcher should it use? //TODO what faultHandler should it use? // @@ -206,20 +209,20 @@ object ActorSerialization { val receiveTimeout = if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None //TODO FIXME, I'm expensive and slow - val ar = new LocalActorRef(actorUuid, protocol.getAddress, props, receiveTimeout, storedHotswap) + val ar = new LocalActorRef(app, props, protocol.getAddress, false, actorUuid, receiveTimeout, storedHotswap) //Deserialize messages { val iterator = protocol.getMessagesList.iterator() while (iterator.hasNext()) - ar ! MessageSerializer.deserialize(iterator.next().getMessage, Some(classLoader)) //TODO This is broken, why aren't we preserving the sender? + ar ! MessageSerializer.deserialize(app, iterator.next().getMessage, Some(classLoader)) //TODO This is broken, why aren't we preserving the sender? } ar } } -object RemoteActorSerialization { +class RemoteActorSerialization(val app: AkkaApplication) { /** * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. @@ -240,9 +243,9 @@ object RemoteActorSerialization { EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) val ref = RemoteActorRef( + app, app.remote, JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress], protocol.getAddress, - protocol.getTimeout, loader) EventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) @@ -258,10 +261,10 @@ object RemoteActorSerialization { case ar: RemoteActorRef ⇒ ar.remoteAddress case ar: LocalActorRef ⇒ - Actor.remote.registerByUuid(ar) - ReflectiveAccess.RemoteModule.configDefaultAddress + app.remote.registerByUuid(ar) + app.reflective.RemoteModule.configDefaultAddress case _ ⇒ - ReflectiveAccess.RemoteModule.configDefaultAddress + app.reflective.RemoteModule.configDefaultAddress } EventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) @@ -269,7 +272,7 @@ object RemoteActorSerialization { RemoteActorRefProtocol.newBuilder .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) .setAddress(actor.address) - .setTimeout(actor.timeout) + .setTimeout(app.AkkaConfig.TimeoutMillis) .build } @@ -303,7 +306,7 @@ object RemoteActorSerialization { message match { case Right(message) ⇒ - messageBuilder.setMessage(MessageSerializer.serialize(message.asInstanceOf[AnyRef])) + messageBuilder.setMessage(MessageSerializer.serialize(app, message.asInstanceOf[AnyRef])) case Left(exception) ⇒ messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index d81e830116..b81c9c24f3 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -4,19 +4,17 @@ package akka.remote -import org.scalatest.{ Spec, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } -import org.scalatest.matchers.MustMatchers -import org.scalatest.junit.JUnitRunner - -import org.junit.runner.RunWith +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.remote.netty.NettyRemoteSupport import akka.actor.{ Actor, ActorRegistry } +import akka.testkit.AkkaSpec +import akka.dispatch.Future import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.atomic.AtomicBoolean -trait NetworkFailureSpec { self: WordSpec ⇒ +trait NetworkFailureSpec { self: AkkaSpec ⇒ import Actor._ import akka.util.Duration @@ -25,7 +23,7 @@ trait NetworkFailureSpec { self: WordSpec ⇒ val PortRang = "1024-65535" def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = { - spawn { + Future { try { enableTcpReset() println("===>>> Reply with [TCP RST] for [" + duration + "]") @@ -40,7 +38,7 @@ trait NetworkFailureSpec { self: WordSpec ⇒ } def throttleNetworkFor(duration: Duration, dead: AtomicBoolean) = { - spawn { + Future { try { enableNetworkThrottling() println("===>>> Throttling network with [" + BytesPerSecond + ", " + DelayMillis + "] for [" + duration + "]") @@ -55,7 +53,7 @@ trait NetworkFailureSpec { self: WordSpec ⇒ } def dropNetworkFor(duration: Duration, dead: AtomicBoolean) = { - spawn { + Future { try { enableNetworkDrop() println("===>>> Blocking network [TCP DENY] for [" + duration + "]") diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala index 80ce01141c..d8013801b2 100644 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -1,61 +1,55 @@ package akka.serialization -import org.scalatest.WordSpec -import org.scalatest.matchers.ShouldMatchers import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - import com.google.protobuf.Message - -import akka.serialization.ActorSerialization._ import akka.actor._ -import Actor._ -import SerializeSpec._ +import akka.testkit.AkkaSpec +import akka.serialization.SerializeSpec.Person case class MyMessage(id: Long, name: String, status: Boolean) -@RunWith(classOf[JUnitRunner]) -class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfterAll { +class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { + + val serialization = new ActorSerialization(app) - "Serializable actor" should { - "should be able to serialize and de-serialize a stateful actor with a given serializer" ignore { + "Serializable actor" must { + "must be able to serialize and de-serialize a stateful actor with a given serializer" ignore { - val actor1 = new LocalActorRef(Props[MyJavaSerializableActor], newUuid.toString, systemService = true) + val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], newUuid.toString, systemService = true) - (actor1 ? "hello").get should equal("world 1") - (actor1 ? "hello").get should equal("world 2") + (actor1 ? "hello").get must equal("world 1") + (actor1 ? "hello").get must equal("world 2") - val bytes = toBinary(actor1) - val actor2 = fromBinary(bytes).asInstanceOf[LocalActorRef] - (actor2 ? "hello").get should equal("world 3") + val bytes = serialization.toBinary(actor1) + val actor2 = serialization.fromBinary(bytes).asInstanceOf[LocalActorRef] + (actor2 ? "hello").get must equal("world 3") - actor2.underlying.receiveTimeout should equal(Some(1000)) + actor2.underlying.receiveTimeout must equal(Some(1000)) actor1.stop() actor2.stop() } - "should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox" ignore { + "must be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox" ignore { - val actor1 = new LocalActorRef(Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true) + val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true) for (i ← 1 to 10) actor1 ! "hello" - actor1.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0) - val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] + actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) + val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor2.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0) - (actor2 ? "hello-reply").get should equal("world") + actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) + (actor2 ? "hello-reply").get must equal("world") - val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] + val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor3.underlying.dispatcher.mailboxSize(actor1.underlying) should equal(0) - (actor3 ? "hello-reply").get should equal("world") + actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0) + (actor3 ? "hello-reply").get must equal("world") } - "should be able to serialize and deserialize a PersonActorWithMessagesInMailbox" ignore { + "must be able to serialize and deserialize a PersonActorWithMessagesInMailbox" ignore { val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050")) - val actor1 = new LocalActorRef(Props[PersonActorWithMessagesInMailbox], newUuid.toString, systemService = true) + val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], newUuid.toString, systemService = true) (actor1 ! p1) (actor1 ! p1) (actor1 ! p1) @@ -66,53 +60,55 @@ class ActorSerializeSpec extends WordSpec with ShouldMatchers with BeforeAndAfte (actor1 ! p1) (actor1 ! p1) (actor1 ! p1) - actor1.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0) - val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] + actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) + val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor2.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0) - (actor2 ? "hello-reply").get should equal("hello") + actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) + (actor2 ? "hello-reply").get must equal("hello") - val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] + val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor3.underlying.dispatcher.mailboxSize(actor1.underlying) should equal(0) - (actor3 ? "hello-reply").get should equal("hello") + actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0) + (actor3 ? "hello-reply").get must equal("hello") } } - "serialize protobuf" should { - "should serialize" ignore { + "serialize protobuf" must { + "must serialize" ignore { val msg = MyMessage(123, "debasish ghosh", true) - import akka.serialization.Serialization._ - val b = serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match { + + val ser = new Serialization(app) + + val b = ser.serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match { case Left(exception) ⇒ fail(exception) case Right(bytes) ⇒ bytes } - val in = deserialize(b, classOf[ProtobufProtocol.MyMessage], None) match { + val in = ser.deserialize(b, classOf[ProtobufProtocol.MyMessage], None) match { case Left(exception) ⇒ fail(exception) case Right(i) ⇒ i } val m = in.asInstanceOf[ProtobufProtocol.MyMessage] - MyMessage(m.getId, m.getName, m.getStatus) should equal(msg) + MyMessage(m.getId, m.getName, m.getStatus) must equal(msg) } } "serialize actor that accepts protobuf message" ignore { - "should serialize" ignore { + "must serialize" ignore { - val actor1 = new LocalActorRef(Props[MyActorWithProtobufMessagesInMailbox], newUuid.toString, systemService = true) + val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], newUuid.toString, systemService = true) val msg = MyMessage(123, "debasish ghosh", true) val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build for (i ← 1 to 10) actor1 ! b - actor1.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0) - val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] + actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) + val actor2 = serialization.fromBinary(serialization.toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor2.underlying.dispatcher.mailboxSize(actor1.underlying) should be > (0) - (actor2 ? "hello-reply").get should equal("world") + actor2.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) + (actor2 ? "hello-reply").get must equal("world") - val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] + val actor3 = serialization.fromBinary(serialization.toBinary(actor1, false)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor3.underlying.dispatcher.mailboxSize(actor1.underlying) should equal(0) - (actor3 ? "hello-reply").get should equal("world") + actor3.underlying.dispatcher.mailboxSize(actor1.underlying) must equal(0) + (actor3 ? "hello-reply").get must equal("world") } } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index c655650dad..0dcf33e401 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -1,5 +1,6 @@ package sample.fsm.buncher +import akka.actor.ActorRefFactory import scala.reflect.ClassManifest import akka.util.Duration import akka.actor.{ FSM, Actor, ActorRef } @@ -81,10 +82,6 @@ object Buncher { val Stop = GenericBuncher.Stop // make special message objects visible for Buncher clients val Flush = GenericBuncher.Flush - - def apply[A: Manifest](singleTimeout: Duration, - multiTimeout: Duration) = - Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout)) } class Buncher[A: Manifest](singleTimeout: Duration, multiTimeout: Duration) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index f552702c86..4bd6baafb2 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -4,8 +4,8 @@ package sample.fsm.dining.become //http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/ import akka.actor.{ Scheduler, ActorRef, Actor } -import akka.actor.Actor._ import java.util.concurrent.TimeUnit +import akka.AkkaApplication /* * First we define our messages, they basically speak for themselves @@ -123,13 +123,14 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { * Alright, here's our test-harness */ object DiningHakkers { + val app = AkkaApplication() def run { //Create 5 chopsticks - val chopsticks = for (i ← 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)) + val chopsticks = for (i ← 1 to 5) yield app.createActor(new Chopstick("Chopstick " + i)) //Create 5 awesome hakkers and assign them their left and right chopstick val hakkers = for { (name, i) ← List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex - } yield actorOf(new Hakker(name, chopsticks(i), chopsticks((i + 1) % 5))) + } yield app.createActor(new Hakker(name, chopsticks(i), chopsticks((i + 1) % 5))) //Signal all hakkers that they should start thinking, and watch the show hakkers.foreach(_ ! Think) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index b59966c5ae..43dfac43b8 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -2,9 +2,9 @@ package sample.fsm.dining.fsm import akka.actor.{ ActorRef, Actor, FSM, UntypedChannel, NullChannel } import akka.actor.FSM._ -import akka.actor.Actor._ import akka.util.Duration import akka.util.duration._ +import akka.AkkaApplication /* * Some messages for the chopstick @@ -163,14 +163,16 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit * Alright, here's our test-harness */ object DiningHakkersOnFsm { + + val app = AkkaApplication() def run = { // Create 5 chopsticks - val chopsticks = for (i ← 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)) + val chopsticks = for (i ← 1 to 5) yield app.createActor(new Chopstick("Chopstick " + i)) // Create 5 awesome fsm hakkers and assign them their left and right chopstick val hakkers = for { (name, i) ← List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex - } yield actorOf(new FSMHakker(name, chopsticks(i), chopsticks((i + 1) % 5))) + } yield app.createActor(new FSMHakker(name, chopsticks(i), chopsticks((i + 1) % 5))) hakkers.foreach(_ ! Think) } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index b05e1e800f..52fa0d7cda 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -4,7 +4,6 @@ package akka.tutorial.first.java; -import static akka.actor.Actors.actorOf; import static akka.actor.Actors.poisonPill; import static java.util.Arrays.asList; @@ -20,7 +19,11 @@ import scala.collection.JavaConversions; import java.util.LinkedList; import java.util.concurrent.CountDownLatch; +import akka.AkkaApplication; + public class Pi { + + private static final AkkaApplication app = new AkkaApplication(); public static void main(String[] args) throws Exception { Pi pi = new Pi(); @@ -105,11 +108,11 @@ public class Pi { LinkedList workers = new LinkedList(); for (int i = 0; i < nrOfWorkers; i++) { - ActorRef worker = actorOf(Worker.class, "worker"); + ActorRef worker = app.createActor(Worker.class); workers.add(worker); } - router = Routing.actorOf(RoutedProps.apply().withRoundRobinRouter().withConnections(workers), "pi"); + router = app.routing().actorOf(RoutedProps.apply().withRoundRobinRouter().withConnections(workers), "pi"); } // message handler @@ -163,11 +166,11 @@ public class Pi { final CountDownLatch latch = new CountDownLatch(1); // create the master - ActorRef master = actorOf(new UntypedActorFactory() { + ActorRef master = app.createActor(new UntypedActorFactory() { public UntypedActor create() { return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); } - }, "master"); + }); // start the calculation master.tell(new Calculate()); diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index eb5db541c9..8207498f35 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -9,8 +9,11 @@ import Actor._ import java.util.concurrent.CountDownLatch import akka.routing.Routing.Broadcast import akka.routing.{ RoutedProps, Routing } +import akka.AkkaApplication object Pi extends App { + + val app = AkkaApplication() calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) @@ -55,10 +58,10 @@ object Pi extends App { var start: Long = _ // create the workers - val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) + val workers = Vector.fill(nrOfWorkers)(app.createActor[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") + val router = app.routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") // message handler def receive = { @@ -101,7 +104,7 @@ object Pi extends App { val latch = new CountDownLatch(1) // create the master - val master = actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) + val master = app.createActor(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)) // start the calculation master ! Calculate