diff --git a/.gitignore b/.gitignore index c04e7c86d8..857686b6aa 100755 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,4 @@ Makefile akka.sublime-project akka.sublime-workspace .target +.multi-jvm diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index a8b6c24f8b..72105b455b 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -11,11 +11,6 @@ public class JavaAPI { private AkkaApplication app = new AkkaApplication(); - @Test void mustBeAbleToUseUntypedActor() { - final RemoteSupport remote = app.remote(); - assertNotNull(remote); - } - @Test void mustBeAbleToCreateActorRefFromClass() { ActorRef ref = app.createActor(JavaAPITestActor.class); assertNotNull(ref); diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 196f31d8a1..703e74c90b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -281,7 +281,6 @@ class ActorRefSpec extends AkkaSpec { } "must throw exception on deserialize if not present in local registry and remoting is not enabled" in { - app.reflective.RemoteModule.isEnabled must be === false val latch = new CountDownLatch(1) val a = createActor(new InnerActor { override def postStop { @@ -290,7 +289,7 @@ class ActorRefSpec extends AkkaSpec { } }) - val inetAddress = app.reflective.RemoteModule.configDefaultAddress + val inetAddress = app.defaultAddress val expectedSerializedRepresentation = SerializedActorRef( a.uuid, @@ -315,7 +314,7 @@ class ActorRefSpec extends AkkaSpec { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) (intercept[java.lang.IllegalStateException] { in.readObject - }).getMessage must be === "Trying to deserialize ActorRef [" + expectedSerializedRepresentation + "] but it's not found in the local registry and remoting is not enabled." + }).getMessage must be === "Could not deserialize ActorRef" } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 62b3c9620f..a19b4fbca0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -22,7 +22,7 @@ class DeployerSpec extends AkkaSpec { RoundRobin, NrOfInstances(3), BannagePeriodFailureDetector(10 seconds), - app.deployer.deploymentConfig.RemoteScope(List( + RemoteScope(List( RemoteAddress("wallace", 2552), RemoteAddress("gromit", 2552)))))) // ClusterScope( // List(Node("node1")), diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 7981088aae..9987a2dfcd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -38,19 +38,19 @@ class LocalActorRefProviderSpec extends AkkaSpec { val address = "new-actor" + i spawn { - a1 = provider.actorOf(Props(creator = () ⇒ new NewActor), address) + a1 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) latch.countDown() } spawn { - a2 = provider.actorOf(Props(creator = () ⇒ new NewActor), address) + a2 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) latch.countDown() } spawn { - a3 = provider.actorOf(Props(creator = () ⇒ new NewActor), address) + a3 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) latch.countDown() } spawn { - a4 = provider.actorOf(Props(creator = () ⇒ new NewActor), address) + a4 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) latch.countDown() } @@ -61,7 +61,7 @@ class LocalActorRefProviderSpec extends AkkaSpec { a3.isDefined must be(true) a4.isDefined must be(true) (a1 == a2) must be(true) - (a1 == a2) must be(true) + (a1 == a3) must be(true) (a1 == a4) must be(true) } } diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index b5dc68c65d..b064630107 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -19,6 +19,7 @@ import akka.serialization.Serialization import akka.event.EventHandler import akka.event.EventHandlerLogging import akka.event.Logging +import java.net.InetSocketAddress object AkkaApplication { @@ -82,6 +83,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor object AkkaConfig { import config._ val ConfigVersion = getString("akka.version", Version) + + val ProviderClass = getString("akka.actor.provider", "akka.actor.LocalActorRefProvider") + val DefaultTimeUnit = getString("akka.time-unit", "seconds") val ActorTimeout = Timeout(Duration(getInt("akka.actor.timeout", 5), DefaultTimeUnit)) val ActorTimeoutMillis = ActorTimeout.duration.toMillis @@ -127,6 +131,30 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor import AkkaConfig._ + if (ConfigVersion != Version) + throw new ConfigurationException("Akka JAR version [" + Version + + "] does not match the provided config version [" + ConfigVersion + "]") + + val startTime = System.currentTimeMillis + def uptime = (System.currentTimeMillis - startTime) / 1000 + + val nodename: String = System.getProperty("akka.cluster.nodename") match { + case null | "" ⇒ new UUID().toString + case value ⇒ value + } + + val hostname: String = System.getProperty("akka.remote.hostname") match { + case null | "" ⇒ InetAddress.getLocalHost.getHostName + case value ⇒ value + } + + val port: Int = System.getProperty("akka.remote.port") match { + case null | "" ⇒ AkkaConfig.RemoteServerPort + case value ⇒ value.toInt + } + + val defaultAddress = new InetSocketAddress(hostname, AkkaConfig.RemoteServerPort) + if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") @@ -134,46 +162,21 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) + implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher + val eventHandler = new EventHandler(this) val log: Logging = new EventHandlerLogging(eventHandler, this) - val startTime = System.currentTimeMillis - def uptime = (System.currentTimeMillis - startTime) / 1000 - - val nodename = System.getProperty("akka.cluster.nodename") match { - case null | "" ⇒ new UUID().toString - case value ⇒ value - } - - val hostname = System.getProperty("akka.remote.hostname") match { - case null | "" ⇒ InetAddress.getLocalHost.getHostName - case value ⇒ value - } - - implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher + val reflective = new ReflectiveAccess(this) // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor val deployer = new Deployer(this) // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor - val provider: ActorRefProvider = new LocalActorRefProvider(this, deployer) - - /** - * Handle to the ActorRegistry. - * TODO: delete me! - */ - // val registry = new ActorRegistry - - // TODO check memory consistency issues - val reflective = new ReflectiveAccess(this) - - // val routing = new Routing(this) - - val remote = reflective.RemoteModule.defaultRemoteSupport map (_.apply) getOrElse null + val provider: ActorRefProvider = reflective.createProvider val typedActor = new TypedActor(this) val serialization = new Serialization(this) - } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 61903e96d4..ad983c92c9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -243,7 +243,7 @@ class LocalActorRef private[akka] ( @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = { // TODO: this was used to really send LocalActorRef across the network, which is broken now - val inetaddr = app.reflective.RemoteModule.configDefaultAddress + val inetaddr = app.defaultAddress SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort) } } @@ -257,69 +257,6 @@ object RemoteActorSystemMessage { val Stop = "RemoteActorRef:stop".intern } -/** - * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. - * This reference is network-aware (remembers its origin) and immutable. - * - * @author Jonas Bonér - */ -private[akka] case class RemoteActorRef private[akka] ( - val remote: RemoteSupport, - val remoteAddress: InetSocketAddress, - val address: String, - loader: Option[ClassLoader]) - extends ActorRef with ScalaActorRef { - - @volatile - private var running: Boolean = true - - def isShutdown: Boolean = !running - - def postMessageToMailbox(message: Any, channel: UntypedChannel) { - val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None - remote.send[Any](message, chSender, None, remoteAddress, true, this, loader) - } - - def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { - - val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None - val chFuture = if (channel.isInstanceOf[Promise[_]]) Some(channel.asInstanceOf[Promise[Any]]) else None - val future = remote.send[Any](message, chSender, chFuture, remoteAddress, false, this, loader) - - if (future.isDefined) ActorPromise(future.get)(timeout) - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } - - def suspend(): Unit = unsupported - - def resume(): Unit = unsupported - - def stop() { //FIXME send the cause as well! - synchronized { - if (running) { - running = false - postMessageToMailbox(RemoteActorSystemMessage.Stop, None) - } - } - } - - @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = { - SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) - } - - def link(actorRef: ActorRef): ActorRef = unsupported - - def unlink(actorRef: ActorRef): ActorRef = unsupported - - protected[akka] def restart(cause: Throwable): Unit = unsupported - - private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") -} - /** * This trait represents the common (external) methods for all ActorRefs * Needed because implicit conversions aren't applied when instance imports are used @@ -379,10 +316,9 @@ trait ScalaActorRef extends ActorRefShared with ReplyChannel[Any] { ref: ActorRe /** * Memento pattern for serializing ActorRefs transparently */ -case class SerializedActorRef(uuid: Uuid, - address: String, - hostname: String, - port: Int) { + +case class SerializedActorRef(uuid: Uuid, address: String, hostname: String, port: Int) { + import akka.serialization.Serialization.app @throws(classOf[java.io.ObjectStreamException]) @@ -390,18 +326,9 @@ case class SerializedActorRef(uuid: Uuid, if (app.value eq null) throw new IllegalStateException( "Trying to deserialize a serialized ActorRef without an AkkaApplication in scope." + " Use akka.serialization.Serialization.app.withValue(akkaApplication) { ... }") - app.value.provider.actorFor(address) match { + app.value.provider.deserialize(this) match { case Some(actor) ⇒ actor - case None ⇒ - // TODO FIXME Add case for when hostname+port == remote.address.hostname+port, should return a DeadActorRef or something - // TODO FIXME the remote should only be in the remote actor ref provider - val remote = app.value.reflective.RemoteModule - if (remote.isEnabled) - RemoteActorRef(remote.defaultRemoteSupport.get(), new InetSocketAddress(hostname, port), address, None) - else - throw new IllegalStateException( - "Trying to deserialize ActorRef [" + this + - "] but it's not found in the local registry and remoting is not enabled.") + case None ⇒ throw new IllegalStateException("Could not deserialize ActorRef") } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ba317ac25e..4904c07d21 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -20,13 +20,17 @@ import akka.AkkaException */ trait ActorRefProvider { - def actorOf(props: Props, address: String): Option[ActorRef] + def actorOf(props: Props, address: String): ActorRef - def actorOf(props: RoutedProps, address: String): Option[ActorRef] + def actorOf(props: RoutedProps, address: String): ActorRef def actorFor(address: String): Option[ActorRef] + private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef + private[akka] def evict(address: String): Boolean + + private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] } /** @@ -45,10 +49,13 @@ trait ActorRefFactory { * the same address can race on the cluster, and then you never know which * implementation wins */ - def createActor(props: Props, address: String): ActorRef = provider.actorOf(props, address).get + def createActor(props: Props, address: String): ActorRef = provider.actorOf(props, address) def createActor[T <: Actor](implicit m: Manifest[T]): ActorRef = createActor(Props(m.erasure.asInstanceOf[Class[_ <: Actor]])) + def createActor[T <: Actor](address: String)(implicit m: Manifest[T]): ActorRef = + createActor(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]), address) + def createActor[T <: Actor](clazz: Class[T]): ActorRef = createActor(Props(clazz)) def createActor(factory: ⇒ Actor): ActorRef = createActor(Props(() ⇒ factory)) @@ -57,7 +64,7 @@ trait ActorRefFactory { def createActor(props: RoutedProps): ActorRef = createActor(props, new UUID().toString) - def createActor(props: RoutedProps, address: String): ActorRef = provider.actorOf(props, address).get + def createActor(props: RoutedProps, address: String): ActorRef = provider.actorOf(props, address) def findActor(address: String): Option[ActorRef] = provider.actorFor(address) @@ -65,25 +72,18 @@ trait ActorRefFactory { class ActorRefProviderException(message: String) extends AkkaException(message) -object ActorRefProvider { - sealed trait ProviderType - object LocalProvider extends ProviderType - object RemoteProvider extends ProviderType - object ClusterProvider extends ProviderType -} - /** * Local ActorRef provider. */ -class LocalActorRefProvider(val app: AkkaApplication, val deployer: Deployer) extends ActorRefProvider { +class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { - private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] + private val actors = new ConcurrentHashMap[String, Promise[ActorRef]] - def actorOf(props: Props, address: String): Option[ActorRef] = actorOf(props, address, false) + def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) def actorFor(address: String): Option[ActorRef] = actors.get(address) match { case null ⇒ None - case future ⇒ future.await.resultOrException.getOrElse(None) + case future ⇒ Some(future.get) } /** @@ -91,7 +91,7 @@ class LocalActorRefProvider(val app: AkkaApplication, val deployer: Deployer) ex */ private[akka] def evict(address: String): Boolean = actors.remove(address) ne null - private[akka] def actorOf(props: Props, address: String, systemService: Boolean): Option[ActorRef] = { + private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { Address.validate(address) val localProps = @@ -102,17 +102,17 @@ class LocalActorRefProvider(val app: AkkaApplication, val deployer: Deployer) ex val defaultTimeout = app.AkkaConfig.ActorTimeout - val newFuture = Promise[Option[ActorRef]](5000)(app.dispatcher) // FIXME is this proper timeout? + val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout? val oldFuture = actors.putIfAbsent(address, newFuture) if (oldFuture eq null) { // we won the race -- create the actor and resolve the future - val actor = try { - deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor + val actor: ActorRef = try { + app.deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor // create a local actor case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ - Some(new LocalActorRef(app, localProps, address, systemService)) // create a local actor + new LocalActorRef(app, localProps, address, systemService) // create a local actor // create a routed actor ref case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ @@ -134,7 +134,7 @@ class LocalActorRefProvider(val app: AkkaApplication, val deployer: Deployer) ex actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), address) - case _ ⇒ None // non-local actor - pass it on + case _ ⇒ throw new Exception("Don't know how to create this actor ref! Why?") } } catch { case e: Exception ⇒ @@ -146,14 +146,14 @@ class LocalActorRefProvider(val app: AkkaApplication, val deployer: Deployer) ex actor } else { // we lost the race -- wait for future to complete - oldFuture.await.resultOrException.getOrElse(None) + oldFuture.await.resultOrException.get } } /** * Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration. */ - def actorOf(props: RoutedProps, address: String): Option[ActorRef] = { + def actorOf(props: RoutedProps, address: String): ActorRef = { //FIXME clustering should be implemented by cluster actor ref provider //TODO Implement support for configuring by deployment ID etc //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor @@ -164,6 +164,8 @@ class LocalActorRefProvider(val app: AkkaApplication, val deployer: Deployer) ex // val localOnly = props.localOnly // if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) // else new RoutedActorRef(props, address) - Some(new RoutedActorRef(props, address)) + new RoutedActorRef(props, address) } + + private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 7c4f201317..1a7b814308 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -260,7 +260,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } } - Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.RemoteScope(remoteAddresses))) + Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(remoteAddresses))) case None ⇒ // check for 'cluster' config section diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 4fd6c57a5a..2e10680e0a 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -79,6 +79,8 @@ object DeploymentConfig { // For Scala API case object LocalScope extends Scope + case class RemoteScope(nodes: Iterable[RemoteAddress]) extends Scope + case class RemoteAddress(hostname: String, port: Int) // -------------------------------- @@ -254,8 +256,6 @@ class DeploymentConfig(val app: AkkaApplication) { preferredNodes: Iterable[Home] = Vector(Node(app.nodename)), replication: ReplicationScheme = Transient) extends Scope - case class RemoteScope(nodes: Iterable[RemoteAddress]) extends Scope - def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == app.nodename) def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match { diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 1b121ef797..5df0607365 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -187,10 +187,10 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept abstract class RemoteSupport(val app: AkkaApplication) extends ListenerManagement with RemoteServerModule with RemoteClientModule { - val eventHandler: ActorRef = { + lazy val eventHandler: ActorRef = { implicit object format extends StatelessActorFormat[RemoteEventHandler] val clazz = classOf[RemoteEventHandler] - val handler = new LocalActorRef(app, Props(clazz), clazz.getName, true) + val handler = app.provider.actorOf(Props(clazz), clazz.getName, true) // add the remote client and server listener that pipes the events to the event handler system addListener(handler) handler @@ -243,16 +243,16 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒ * Starts the server up */ def start(): RemoteServerModule = - start(app.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress, - app.reflective.RemoteModule.configDefaultAddress.getPort, + start(app.defaultAddress.getAddress.getHostAddress, + app.defaultAddress.getPort, None) /** * Starts the server up */ def start(loader: ClassLoader): RemoteServerModule = - start(app.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress, - app.reflective.RemoteModule.configDefaultAddress.getPort, + start(app.defaultAddress.getAddress.getHostAddress, + app.defaultAddress.getPort, Option(loader)) /** diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 104d2b59bd..a1e05cc819 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -123,6 +123,23 @@ class ReflectiveAccess(val app: AkkaApplication) { import ReflectiveAccess._ + def providerClass: Class[_] = { + getClassFor(app.AkkaConfig.ProviderClass) match { + case Left(e) ⇒ throw e + case Right(b) ⇒ b + } + } + + def createProvider: ActorRefProvider = { + val params: Array[Class[_]] = Array(classOf[AkkaApplication]) + val args: Array[AnyRef] = Array(app) + + createInstance[ActorRefProvider](providerClass, params, args) match { + case Right(p) ⇒ p + case Left(e) ⇒ throw e + } + } + /** * Reflective access to the Cluster module. * @@ -228,61 +245,4 @@ class ReflectiveAccess(val app: AkkaApplication) { def close() } } - - /** - * Reflective access to the RemoteClient module. - * - * @author Jonas Bonér - */ - object RemoteModule { - val TRANSPORT = app.AkkaConfig.RemoteTransport - - val configDefaultAddress = new InetSocketAddress(app.hostname, app.AkkaConfig.RemoteServerPort) - - lazy val isEnabled = remoteSupportClass.isDefined - - def ensureEnabled() = { - if (!isEnabled) { - val e = new ModuleNotAvailableException( - "Can't load the remote module, make sure it is enabled in the config ('akka.enabled-modules = [\"remote\"])' and that akka-remote.jar is on the classpath") - app.eventHandler.debug(this, e.toString) - throw e - } - } - - lazy val remoteInstance: Option[RemoteService] = getObjectFor("akka.remote.Remote$") match { - case Right(value) ⇒ Some(value) - case Left(exception) ⇒ - app.eventHandler.debug(this, exception.toString) - None - } - - lazy val remoteService: RemoteService = { - ensureEnabled() - remoteInstance.get - } - - val remoteSupportClass = getClassFor[RemoteSupport](TRANSPORT) match { - case Right(value) ⇒ Some(value) - case Left(exception) ⇒ - app.eventHandler.debug(this, exception.toString) - None - } - - protected[akka] val defaultRemoteSupport: Option[() ⇒ RemoteSupport] = - remoteSupportClass map { remoteClass ⇒ - () ⇒ createInstance[RemoteSupport]( - remoteClass, - Array[Class[_]](), - Array[AnyRef]()) match { - case Right(value) ⇒ value - case Left(exception) ⇒ - val e = new ModuleNotAvailableException( - "Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName), exception) - app.eventHandler.debug(this, e.toString) - throw e - } - } - } - } diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index 0f423db230..6004e7c5ad 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -8,11 +8,14 @@ import akka.actor.{ Actor, BootableActorLoaderService } import akka.util.{ ReflectiveAccess, Bootable } import akka.event.EventHandler +// TODO: remove me - remoting is enabled through the RemoteActorRefProvider + /** * This bundle/service is responsible for booting up and shutting down the remote actors facility. *

* It is used in Kernel. */ +/* trait BootableRemoteActorService extends Bootable { self: BootableActorLoaderService ⇒ @@ -42,3 +45,4 @@ trait BootableRemoteActorService extends Bootable { super.onUnload() } } +*/ diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index b7ee801c34..0a0058c525 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -33,11 +33,6 @@ object NetworkEventStream { trait Listener { def notify(event: RemoteLifeCycleEvent) } -} - -class NetworkEventStream(val app: AkkaApplication) { - - import NetworkEventStream._ /** * Channel actor with a registry of listeners. @@ -63,8 +58,13 @@ class NetworkEventStream(val app: AkkaApplication) { case _ ⇒ //ignore other } } +} - private[akka] val channel = new LocalActorRef(app, +class NetworkEventStream(val app: AkkaApplication) { + + import NetworkEventStream._ + + private[akka] val channel = app.provider.actorOf( Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), newUuid.toString, systemService = true) /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4eb5d1129f..363f036298 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -8,6 +8,7 @@ import akka.actor._ import akka.routing._ import akka.actor.Actor._ import akka.actor.Status._ +import akka.dispatch._ import akka.event.EventHandler import akka.util.duration._ import akka.config.ConfigurationException @@ -25,31 +26,36 @@ import akka.AkkaApplication * * @author Jonas Bonér */ -class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) extends ActorRefProvider { +class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise - implicit def _app = app + val local = new LocalActorRefProvider(app) + val remote = new Remote(app) - private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] + private val actors = new ConcurrentHashMap[String, Promise[ActorRef]] private val remoteDaemonConnectionManager = new RemoteConnectionManager( app, remote = remote, failureDetector = new BannagePeriodFailureDetector(60 seconds)) // FIXME make timeout configurable - def actorOf(props: Props, address: String): Option[ActorRef] = { + def defaultDispatcher = app.dispatcher + def defaultTimeout = app.AkkaConfig.ActorTimeout + + def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) + + def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { Address.validate(address) - val newFuture = Promise[Option[ActorRef]](5000) // FIXME is this proper timeout? + val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? val oldFuture = actors.putIfAbsent(address, newFuture) if (oldFuture eq null) { // we won the race -- create the actor and resolve the future - val deploymentConfig = app.deployer.deploymentConfig - val actor = try { + val actor: ActorRef = try { app.deployer.lookupDeploymentFor(address) match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, deploymentConfig.RemoteScope(remoteAddresses))) ⇒ + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector @@ -67,7 +73,10 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten if (isReplicaNode) { // we are on one of the replica node for this remote actor - Some(new LocalActorRef(app, props, address, false)) // create a local actor + val localProps = + if (props.dispatcher == Props.defaultDispatcher) props.copy(dispatcher = app.dispatcher) + else props + new LocalActorRef(app, localProps, address, false) } else { // we are on the single "reference" node uses the remote actors on the replica nodes @@ -94,7 +103,7 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten if (remoteAddresses.size < 1) throw new ConfigurationException( "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" .format(address, remoteAddresses.mkString(", "))) - () ⇒ new ScatterGatherFirstCompletedRouter + () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") @@ -105,19 +114,17 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten var connections = Map.empty[InetSocketAddress, ActorRef] remoteAddresses foreach { remoteAddress: DeploymentConfig.RemoteAddress ⇒ val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port) - connections += (inetSocketAddress -> RemoteActorRef(app.remote, inetSocketAddress, address, None)) + connections += (inetSocketAddress -> RemoteActorRef(remote.server, inetSocketAddress, address, None)) } val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) connections.keys foreach { useActorOnNode(_, address, props.creator) } - Some(app.createActor(RoutedProps( - routerFactory = routerFactory, - connectionManager = connectionManager))) + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) } - case deploy ⇒ None // non-remote actor + case deploy ⇒ local.actorOf(props, address, systemService) } } catch { case e: Exception ⇒ @@ -131,21 +138,21 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten actor } else { // we lost the race -- wait for future to complete - oldFuture.await.resultOrException.getOrElse(None) + oldFuture.await.resultOrException.get } } /** * Copied from LocalActorRefProvider... */ - def actorOf(props: RoutedProps, address: String): Option[ActorRef] = { + def actorOf(props: RoutedProps, address: String): ActorRef = { if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") - Some(new RoutedActorRef(props, address)) + new RoutedActorRef(props, address) } def actorFor(address: String): Option[ActorRef] = actors.get(address) match { case null ⇒ None - case future ⇒ future.await.resultOrException.getOrElse(None) + case future ⇒ Some(future.get) } /** @@ -153,6 +160,12 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten */ private[akka] def evict(address: String): Boolean = actors.remove(address) ne null + private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { + local.actorFor(actor.address) orElse { + Some(RemoteActorRef(remote.server, new InetSocketAddress(actor.hostname, actor.port), actor.address, None)) + } + } + /** * Using (checking out) actor on a specific node. */ @@ -213,3 +226,66 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten } } } + +/** + * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. + * This reference is network-aware (remembers its origin) and immutable. + * + * @author Jonas Bonér + */ +private[akka] case class RemoteActorRef private[akka] ( + val remote: RemoteSupport, + val remoteAddress: InetSocketAddress, + val address: String, + loader: Option[ClassLoader]) + extends ActorRef with ScalaActorRef { + + @volatile + private var running: Boolean = true + + def isShutdown: Boolean = !running + + def postMessageToMailbox(message: Any, channel: UntypedChannel) { + val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None + remote.send[Any](message, chSender, None, remoteAddress, true, this, loader) + } + + def postMessageToMailboxAndCreateFutureResultWithTimeout( + message: Any, + timeout: Timeout, + channel: UntypedChannel): Future[Any] = { + + val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None + val chFuture = if (channel.isInstanceOf[Promise[_]]) Some(channel.asInstanceOf[Promise[Any]]) else None + val future = remote.send[Any](message, chSender, chFuture, remoteAddress, false, this, loader) + + if (future.isDefined) ActorPromise(future.get)(timeout) + else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) + } + + def suspend(): Unit = unsupported + + def resume(): Unit = unsupported + + def stop() { //FIXME send the cause as well! + synchronized { + if (running) { + running = false + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + } + } + } + + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = { + SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) + } + + def link(actorRef: ActorRef): ActorRef = unsupported + + def unlink(actorRef: ActorRef): ActorRef = unsupported + + protected[akka] def restart(cause: Throwable): Unit = unsupported + + private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") +} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 202bfaa518..0057570b41 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -34,7 +34,7 @@ class Remote(val app: AkkaApplication) extends RemoteService { val remoteDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt val hostname = app.hostname - val port = app.AkkaConfig.RemoteServerPort + val port = app.port val remoteDaemonServiceName = "akka-remote-daemon".intern diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 073725643f..9c858d4701 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,7 +4,7 @@ package akka.remote.netty -import akka.actor.{ ActorRef, Uuid, newUuid, uuidFrom, IllegalActorStateException, RemoteActorRef, PoisonPill, RemoteActorSystemMessage, AutoReceivedMessage } +import akka.actor.{ ActorRef, Uuid, newUuid, uuidFrom, IllegalActorStateException, PoisonPill, RemoteActorSystemMessage, AutoReceivedMessage } import akka.dispatch.{ ActorPromise, DefaultPromise, Promise } import akka.remote._ import RemoteProtocol._ @@ -83,7 +83,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(app, this, address, loader, self.notifyListeners _) + val client = new ActiveRemoteClient(app, self, this, address, loader, self.notifyListeners _) client.connect() remoteClients += key -> client client @@ -139,6 +139,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { */ abstract class RemoteClient private[akka] ( val app: AkkaApplication, + val remoteSupport: RemoteSupport, val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { @@ -146,7 +147,7 @@ abstract class RemoteClient private[akka] ( remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort - val serialization = new RemoteActorSerialization(app) + val serialization = new RemoteActorSerialization(app, remoteSupport) protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] @@ -248,11 +249,12 @@ abstract class RemoteClient private[akka] ( */ class ActiveRemoteClient private[akka] ( _app: AkkaApplication, + remoteSupport: RemoteSupport, module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) - extends RemoteClient(_app, module, remoteAddress) { + extends RemoteClient(_app, remoteSupport, module, remoteAddress) { val settings = new RemoteClientSettings(app) import settings._ @@ -576,7 +578,7 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with app.eventHandler.debug(this, "Creating RemoteActorRef with address [%s] connected to [%s]" .format(actorAddress, remoteInetSocketAddress)) - RemoteActorRef(app.remote, remoteInetSocketAddress, actorAddress, loader) + RemoteActorRef(this, remoteInetSocketAddress, actorAddress, loader) } } @@ -585,7 +587,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod val settings = new RemoteServerSettings(app) import settings._ - val serialization = new RemoteActorSerialization(app) + val serialization = new RemoteActorSerialization(app, serverModule.remoteSupport) val name = "NettyRemoteServer@" + host + ":" + port val address = new InetSocketAddress(host, port) @@ -641,18 +643,19 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteSupport ⇒ def app: AkkaApplication + def remoteSupport = self private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { case Some(server) ⇒ server.address - case None ⇒ app.reflective.RemoteModule.configDefaultAddress + case None ⇒ app.defaultAddress } def name = currentServer.get match { case Some(server) ⇒ server.name case None ⇒ - val a = app.reflective.RemoteModule.configDefaultAddress + val a = app.defaultAddress "NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 55183ac76c..ae8a204eb6 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -9,7 +9,7 @@ import akka.actor.DeploymentConfig._ import akka.dispatch.Envelope import akka.util.{ ReflectiveAccess, Duration } import akka.event.EventHandler -import akka.remote.{ RemoteProtocol, RemoteClientSettings, MessageSerializer } +import akka.remote._ import RemoteProtocol._ import akka.AkkaApplication @@ -25,10 +25,10 @@ import com.eaio.uuid.UUID /** * Module for local actor serialization. */ -class ActorSerialization(val app: AkkaApplication) { +class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default - val remoteActorSerialization = new RemoteActorSerialization(app) + val remoteActorSerialization = new RemoteActorSerialization(app, remote) def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = fromBinaryToLocalActorRef(bytes, None, Some(homeAddress)) @@ -222,7 +222,7 @@ class ActorSerialization(val app: AkkaApplication) { } } -class RemoteActorSerialization(val app: AkkaApplication) { +class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { /** * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. @@ -243,7 +243,7 @@ class RemoteActorSerialization(val app: AkkaApplication) { app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) val ref = RemoteActorRef( - app.remote, + remote, JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress], protocol.getAddress, loader) @@ -261,10 +261,10 @@ class RemoteActorSerialization(val app: AkkaApplication) { case ar: RemoteActorRef ⇒ ar.remoteAddress case ar: LocalActorRef ⇒ - app.remote.registerByUuid(ar) - app.reflective.RemoteModule.configDefaultAddress + remote.registerByUuid(ar) + app.defaultAddress case _ ⇒ - app.reflective.RemoteModule.configDefaultAddress + app.defaultAddress } app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala new file mode 100644 index 0000000000..003f324217 --- /dev/null +++ b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.remote + +import akka.testkit._ + +abstract class AkkaRemoteSpec extends AkkaSpec with MultiJvmSync { + + /** + * Helper function for accessing the underlying remoting. + */ + def remote: Remote = { + app.provider match { + case r: RemoteActorRefProvider ⇒ r.remote + case _ ⇒ throw new Exception("Remoting is not enabled") + } + } + +} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ClusterTestNode.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ClusterTestNode.scala deleted file mode 100644 index 32abd54953..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ClusterTestNode.scala +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.remote - -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.BeforeAndAfterAll - -import akka.util.duration._ -import akka.util.Duration -import System.{ currentTimeMillis ⇒ now } - -import java.io.File - -trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll { - - override def beforeAll() = { - ClusterTestNode.waitForReady(getClass.getName) - } - - override def afterAll() = { - ClusterTestNode.exit(getClass.getName) - } -} - -object ClusterTestNode { - val TestMarker = "MultiJvm" - val HomeDir = "_akka_cluster" - val TestDir = "multi-jvm" - val Sleep = 100.millis - val Timeout = 1.minute - - def ready(className: String) = { - readyFile(className).createNewFile() - } - - def waitForReady(className: String) = { - if (!waitExists(readyFile(className))) { - cleanUp(className) - sys.error("Timeout waiting for cluster ready") - } - } - - def exit(className: String) = { - exitFile(className).createNewFile() - } - - def waitForExits(className: String, nodes: Int) = { - if (!waitCount(exitDir(className), nodes)) { - cleanUp(className) - sys.error("Timeout waiting for node exits") - } - } - - def cleanUp(className: String) = { - deleteRecursive(testDir(className)) - } - - def testName(name: String) = { - val i = name.indexOf(TestMarker) - if (i >= 0) name.substring(0, i) else name - } - - def nodeName(name: String) = { - val i = name.indexOf(TestMarker) - if (i >= 0) name.substring(i + TestMarker.length) else name - } - - def testDir(className: String) = { - val home = new File(HomeDir) - val tests = new File(home, TestDir) - val dir = new File(tests, testName(className)) - dir.mkdirs() - dir - } - - def readyFile(className: String) = { - new File(testDir(className), "ready") - } - - def exitDir(className: String) = { - val dir = new File(testDir(className), "exit") - dir.mkdirs() - dir - } - - def exitFile(className: String) = { - new File(exitDir(className), nodeName(className)) - } - - def waitExists(file: File) = waitFor(file.exists) - - def waitCount(file: File, n: Int) = waitFor(file.list.size >= n) - - def waitFor(test: ⇒ Boolean, sleep: Duration = Sleep, timeout: Duration = Timeout): Boolean = { - val start = now - val limit = start + timeout.toMillis - var passed = test - var expired = false - while (!passed && !expired) { - if (now > limit) expired = true - else { - Thread.sleep(sleep.toMillis) - passed = test - } - } - passed - } - - def deleteRecursive(file: File): Boolean = { - if (file.isDirectory) file.listFiles.foreach(deleteRecursive) - file.delete() - } -} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/MasterClusterTestNode.scala b/akka-remote/src/multi-jvm/scala/akka/remote/MasterClusterTestNode.scala deleted file mode 100644 index eea1bbf9a8..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/MasterClusterTestNode.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.remote - -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.BeforeAndAfterAll - -trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll { - def testNodes: Int - - override def beforeAll() = { - // LocalCluster.startLocalCluster() - onReady() - ClusterTestNode.ready(getClass.getName) - } - - def onReady() = {} - - override def afterAll() = { - ClusterTestNode.waitForExits(getClass.getName, testNodes - 1) - ClusterTestNode.cleanUp(getClass.getName) - onShutdown() - // LocalCluster.shutdownLocalCluster() - } - - def onShutdown() = {} -} - diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/QuietReporter.scala b/akka-remote/src/multi-jvm/scala/akka/remote/QuietReporter.scala index f69b7908bd..96ebb55f35 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/QuietReporter.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/QuietReporter.scala @@ -8,7 +8,7 @@ import org.scalatest.tools.StandardOutReporter import org.scalatest.events._ import java.lang.Boolean.getBoolean -class QuietReporter(inColor: Boolean) extends StandardOutReporter(false, inColor, false, false) { +class QuietReporter(inColor: Boolean) extends StandardOutReporter(false, inColor, false, true) { def this() = this(!getBoolean("akka.test.nocolor")) override def apply(event: Event): Unit = event match { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf index ee5b2ff42f..a7db5ca6e6 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode1.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "direct" akka.actor.deployment.service-hello.nr-of-instances = 1 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf index ee5b2ff42f..a7db5ca6e6 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmNode2.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "direct" akka.actor.deployment.service-hello.nr-of-instances = 1 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala index e35ee388de..9e14609a6b 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -2,9 +2,8 @@ package akka.remote.direct_routed import akka.remote._ import akka.routing._ - import akka.actor.Actor -import akka.config.Config +import akka.testkit._ object DirectRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 2 @@ -12,13 +11,13 @@ object DirectRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "identify" ⇒ { - reply(Config.nodename) + reply(app.nodename) } } } } -class DirectRoutedRemoteActorMultiJvmNode1 extends MultiJvmSync { +class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { import DirectRoutedRemoteActorMultiJvmSpec._ @@ -27,14 +26,16 @@ class DirectRoutedRemoteActorMultiJvmNode1 extends MultiJvmSync { "___" must { "___" in { barrier("setup") - Remote.start() + + remote.start() + barrier("start") barrier("done") } } } -class DirectRoutedRemoteActorMultiJvmNode2 extends MultiJvmSync { +class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { import DirectRoutedRemoteActorMultiJvmSpec._ @@ -43,10 +44,12 @@ class DirectRoutedRemoteActorMultiJvmNode2 extends MultiJvmSync { "A new remote actor configured with a Direct router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { barrier("setup") - Remote.start() + + remote.start() + barrier("start") - val actor = Actor.actorOf[SomeActor]("service-hello") + val actor = app.createActor[SomeActor]("service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) val result = (actor ? "identify").get diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf index e57dcbd806..8281319e9a 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf @@ -1,3 +1,3 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf index e57dcbd806..8281319e9a 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf @@ -1,3 +1,3 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala index 213ce6e482..0de114f4bb 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala @@ -1,9 +1,7 @@ package akka.remote.new_remote_actor -import akka.remote._ - import akka.actor.Actor -import akka.config.Config +import akka.remote._ object NewRemoteActorMultiJvmSpec { val NrOfNodes = 2 @@ -11,13 +9,13 @@ object NewRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "identify" ⇒ { - reply(Config.nodename) + reply(app.nodename) } } } } -class NewRemoteActorMultiJvmNode1 extends MultiJvmSync { +class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { import NewRemoteActorMultiJvmSpec._ @@ -27,7 +25,7 @@ class NewRemoteActorMultiJvmNode1 extends MultiJvmSync { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") @@ -36,7 +34,7 @@ class NewRemoteActorMultiJvmNode1 extends MultiJvmSync { } } -class NewRemoteActorMultiJvmNode2 extends MultiJvmSync { +class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { import NewRemoteActorMultiJvmSpec._ @@ -46,11 +44,11 @@ class NewRemoteActorMultiJvmNode2 extends MultiJvmSync { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { barrier("setup") - Remote.start() + remote.start() barrier("start") - val actor = Actor.actorOf[SomeActor]("service-hello") + val actor = app.createActor[SomeActor]("service-hello") val result = (actor ? "identify").get result must equal("node1") diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf index 545dd2825b..4a171ba96f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode1.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf index 545dd2825b..4a171ba96f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode2.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf index 545dd2825b..4a171ba96f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode3.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf index 545dd2825b..4a171ba96f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmNode4.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index dffcea7b99..c3688ae178 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -1,29 +1,27 @@ package akka.remote.random_routed +import akka.actor.Actor import akka.remote._ import akka.routing._ -import Routing.Broadcast - -import akka.actor.Actor -import akka.config.Config +import akka.routing.Routing.Broadcast object RandomRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ reply(Config.nodename) + case "hit" ⇒ reply(app.nodename) case "end" ⇒ self.stop() } } } -class RandomRoutedRemoteActorMultiJvmNode1 extends MultiJvmSync { +class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -32,13 +30,13 @@ class RandomRoutedRemoteActorMultiJvmNode1 extends MultiJvmSync { } } -class RandomRoutedRemoteActorMultiJvmNode2 extends MultiJvmSync { +class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -47,13 +45,13 @@ class RandomRoutedRemoteActorMultiJvmNode2 extends MultiJvmSync { } } -class RandomRoutedRemoteActorMultiJvmNode3 extends MultiJvmSync { +class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -62,17 +60,17 @@ class RandomRoutedRemoteActorMultiJvmNode3 extends MultiJvmSync { } } -class RandomRoutedRemoteActorMultiJvmNode4 extends MultiJvmSync { +class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a Random router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { barrier("setup") - Remote.start() + remote.start() barrier("start") - val actor = Actor.actorOf[SomeActor]("service-hello") + val actor = app.createActor[SomeActor]("service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) val connectionCount = NrOfNodes - 1 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf index 8c1cac697b..08c0dc70d2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode1.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf index 8c1cac697b..08c0dc70d2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode2.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf index 8c1cac697b..08c0dc70d2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode3.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf index 8c1cac697b..08c0dc70d2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmNode4.conf @@ -1,4 +1,4 @@ -akka.enabled-modules = ["remote"] +akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.nr-of-instances = 3 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 8af15a4949..1a6a17041d 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -1,29 +1,27 @@ package akka.remote.round_robin_routed +import akka.actor.Actor import akka.remote._ import akka.routing._ -import Routing.Broadcast - -import akka.actor.Actor -import akka.config.Config +import akka.routing.Routing.Broadcast object RoundRobinRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ reply(Config.nodename) + case "hit" ⇒ reply(app.nodename) case "end" ⇒ self.stop() } } } -class RoundRobinRoutedRemoteActorMultiJvmNode1 extends MultiJvmSync { +class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -32,13 +30,13 @@ class RoundRobinRoutedRemoteActorMultiJvmNode1 extends MultiJvmSync { } } -class RoundRobinRoutedRemoteActorMultiJvmNode2 extends MultiJvmSync { +class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -47,13 +45,13 @@ class RoundRobinRoutedRemoteActorMultiJvmNode2 extends MultiJvmSync { } } -class RoundRobinRoutedRemoteActorMultiJvmNode3 extends MultiJvmSync { +class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { "___" in { barrier("setup") - Remote.start() + remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -62,17 +60,17 @@ class RoundRobinRoutedRemoteActorMultiJvmNode3 extends MultiJvmSync { } } -class RoundRobinRoutedRemoteActorMultiJvmNode4 extends MultiJvmSync { +class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a RoundRobin router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { barrier("setup") - Remote.start() + remote.start() barrier("start") - val actor = Actor.actorOf[SomeActor]("service-hello") + val actor = app.createActor[SomeActor]("service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) val connectionCount = NrOfNodes - 1 diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala index 91b055422c..67df328a64 100644 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -3,6 +3,7 @@ package akka.serialization import org.scalatest.BeforeAndAfterAll import com.google.protobuf.Message import akka.actor._ +import akka.remote._ import akka.testkit.AkkaSpec import akka.serialization.SerializeSpec.Person @@ -10,7 +11,14 @@ case class MyMessage(id: Long, name: String, status: Boolean) class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { - val serialization = new ActorSerialization(app) + lazy val remote: Remote = { + app.provider match { + case r: RemoteActorRefProvider ⇒ r.remote + case _ ⇒ throw new Exception("Remoting is not enabled") + } + } + + lazy val serialization = new ActorSerialization(app, remote.server) "Serializable actor" must { "must be able to serialize and de-serialize a stateful actor with a given serializer" ignore {