diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index c9750ff912..bc25ff5f3c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -17,7 +17,7 @@ class DeployerSpec extends WordSpec with MustMatchers { deployment must equal (Some(Deploy( "service-pi", RoundRobin, - "akka.serializer.Format$Default$", + "akka.serialization.Format$Default$", Clustered( Node("test-1"), Replicate(3), @@ -25,3 +25,12 @@ class DeployerSpec extends WordSpec with MustMatchers { } } } + +Deployer.deploy(Deploy( + "service-pi", + RoundRobin, + "akka.serialization.Format$Default$", + Clustered( + Node("test-1"), + Replicate(3), + Stateless)))) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b7b537ec8e..41661a6061 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -4,9 +4,11 @@ package akka.actor +import DeploymentConfig._ import akka.dispatch._ import akka.config.Config._ import akka.util.{ListenerManagement, ReflectiveAccess, Duration, Helpers} +import ReflectiveAccess._ import Helpers.{narrow, narrowSilently} import akka.remoteinterface.RemoteSupport import akka.japi.{Creator, Procedure} @@ -17,6 +19,7 @@ import akka.event.EventHandler import scala.reflect.BeanProperty import com.eaio.uuid.UUID + import java.lang.reflect.InvocationTargetException /** @@ -203,96 +206,12 @@ object Actor extends ListenerManagement { * */ def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = { - import DeploymentConfig._ - import ReflectiveAccess._ Address.validate(address) try { Deployer.deploymentFor(address) match { - case Deploy(_, router, _, Local) => - // FIXME handle 'router' in 'Local' actors - newLocalActorRef(clazz, address) - - case Deploy(_, router, formatClassName, Clustered(home, replication, state)) => - ClusterModule.ensureEnabled() - if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") - - val hostname = home match { - case Host(hostname) => hostname - case IP(address) => address - case Node(nodeName) => "localhost" // FIXME lookup hostname for node name - } - - val replicas = replication match { - case Replicate(replicas) => replicas - case AutoReplicate => -1 - case NoReplicas => 0 - } - - import ClusterModule.node - if (hostname == RemoteModule.remoteServerHostname) { // home node for clustered actor - - def formatErrorDueTo(reason: String) = { - throw new akka.config.ConfigurationException( - "Could not create Format[T] object [" + formatClassName + - "] for serialization of actor [" + address + - "] since " + reason) - } - - implicit val format: Format[T] = { - if (formatClassName == "N/A") formatErrorDueTo("no class name defined in configuration") - val f = ReflectiveAccess.getObjectFor(formatClassName) match { - case Right(actor) => actor - case Left(exception) => - val cause = exception match { - case i: InvocationTargetException => i.getTargetException - case _ => exception - } - formatErrorDueTo(" " + cause.toString) - } - if (f.isInstanceOf[Format[T]]) f.asInstanceOf[Format[T]] - else formatErrorDueTo("class must be of type [akka.serialization.Format[T]]") - } - - if (!node.isClustered(address)) node.store(address, clazz, replicas, false) - node.use(address) - } else { - //val router = - node.ref(address, null) - } - sys.error("Clustered deployment not yet supported") - - /* - val remoteAddress = Actor.remote.address - if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) { - // home node for actor - } else { - } - */ - /* - 2. Check Home(..) - a) If home is same as Actor.remote.address then: - - check if actor is stored in ZK, if not; node.store(..) - - checkout actor using node.use(..) - b) If not the same - - check out actor using node.ref(..) - - Misc stuff: - - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? - - ClusterNode API and Actor.remote API should be made private[akka] - - Rewrite ClusterSpec or remove it - - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor - - Should we allow configuring of session-scoped remote actors? How? - - - */ - - RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor) - - case invalid => throw new IllegalActorStateException( - "Could not create actor [" + clazz.getName + - "] with address [" + address + - "], not bound to a valid deployment scheme [" + invalid + "]") + case Deploy(_, router, _, Local) => newLocalActorRef(clazz, address) // FIXME handle 'router' in 'Local' actors + case deploy => newClusterActorRef[T](clazz, address, deploy) } } catch { case e: DeploymentException => @@ -340,6 +259,7 @@ object Actor extends ListenerManagement { * */ def actorOf[T <: Actor](factory: => T, address: String): ActorRef = { + // FIXME use deployment info Address.validate(address) new LocalActorRef(() => factory, address) } @@ -364,6 +284,7 @@ object Actor extends ListenerManagement { * JAVA API */ def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = { + // FIXME use deployment info Address.validate(address) new LocalActorRef(() => creator.create, address) } @@ -393,6 +314,25 @@ object Actor extends ListenerManagement { }).start() ! Spawn } + /** + * Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method as[T] + * to convert an Option[Any] to an Option[T]. + */ + implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption) + + /** + * Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method as[T] + * to convert an Option[Any] to an Option[T]. + * This means that the following code is equivalent: + * (actor !! "foo").as[Int] (Deprecated) + * and + * (actor !!! "foo").as[Int] (Recommended) + */ + implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({ + try { anyFuture.await } catch { case t: FutureTimeoutException => } + anyFuture.resultOrException + }) + private[akka] def newLocalActorRef(clazz: Class[_ <: Actor], address: String): ActorRef = { new LocalActorRef(() => { import ReflectiveAccess.{ createInstance, noParams, noArgs } @@ -413,24 +353,96 @@ object Actor extends ListenerManagement { }, address) } - /** - * Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method as[T] - * to convert an Option[Any] to an Option[T]. - */ - implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption) + private def newClusterActorRef[T <: Actor](clazz: Class[T], address: String, deploy: Deploy): ActorRef = { + deploy match { + case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) => + ClusterModule.ensureEnabled() - /** - * Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method as[T] - * to convert an Option[Any] to an Option[T]. - * This means that the following code is equivalent: - * (actor !! "foo").as[Int] (Deprecated) - * and - * (actor !!! "foo").as[Int] (Recommended) - */ - implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({ - try { anyFuture.await } catch { case t: FutureTimeoutException => } - anyFuture.resultOrException - }) + if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") + + val hostname = home match { + case Host(hostname) => hostname + case IP(address) => address + case Node(nodeName) => "localhost" // FIXME lookup hostname for node name + } + + val replicas = replication match { + case Replicate(replicas) => replicas + case AutoReplicate => -1 + case AutoReplicate() => -1 + case NoReplicas => 0 + case NoReplicas() => 0 + } + + import ClusterModule.node + node.start() // start cluster node + + if (hostname == RemoteModule.remoteServerHostname) { // home node for clustered actor + + def serializerErrorDueTo(reason: String) = + throw new akka.config.ConfigurationException( + "Could not create Serializer object [" + serializerClassName + + "] for serialization of actor [" + address + + "] since " + reason) + + val serializer: Serializer = { + if (serializerClassName == "N/A") serializerErrorDueTo("no class name defined in configuration") + val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match { + case Right(clazz) => clazz + case Left(exception) => + val cause = exception match { + case i: InvocationTargetException => i.getTargetException + case _ => exception + } + serializerErrorDueTo(cause.toString) + } + val f = clazz.newInstance.asInstanceOf[AnyRef] + if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer] + else serializerErrorDueTo("class must be of type [akka.serialization.Serializer") + } + + // FIXME use the serializer above instead of dummy Format, but then the ClusterNode AND ActorRef serialization needs to be rewritten + implicit val format: Format[T] = null + sys.error("FIXME use the serializer above instead of dummy Format, but then the ClusterNode AND ActorRef serialization needs to be rewritten") + + if (!node.isClustered(address)) node.store(address, clazz, replicas, false) + node.use(address) + } else { + val routerType = router match { + case Direct => RouterType.Direct + case Direct() => RouterType.Direct + case RoundRobin => RouterType.RoundRobin + case RoundRobin() => RouterType.RoundRobin + case Random => RouterType.Random + case Random() => RouterType.Random + case LeastCPU => RouterType.LeastCPU + case LeastCPU() => RouterType.LeastCPU + case LeastRAM => RouterType.LeastRAM + case LeastRAM() => RouterType.LeastRAM + case LeastMessages => RouterType.LeastMessages + case LeastMessages() => RouterType.LeastMessages + } + node.ref(address, routerType) + } + + /* + Misc stuff: + - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? + - ClusterNode API and Actor.remote API should be made private[akka] + - Rewrite ClusterSpec or remove it + - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor + - Should we allow configuring of session-scoped remote actors? How? + + + */ + + RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor) + case invalid => throw new IllegalActorStateException( + "Could not create actor [" + clazz.getName + + "] with address [" + address + + "], not bound to a valid deployment scheme [" + invalid + "]") + } + } } /** diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 7ff3659919..435f1f1601 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -333,8 +333,7 @@ object LocalDeployer { private val deployments = new ConcurrentHashMap[String, Deploy] private[akka] def init(deployments: List[Deploy]) { - EventHandler.info(this, "Initializing local deployer") - EventHandler.info(this, "Deploying locally [\n" + deployments.mkString("\n\t") + "\n]") + EventHandler.info(this, "Initializing local deployer\nDeploying actors locally [\n%s\n]" format deployments.mkString("\n\t")) deployments foreach (deploy(_)) // deploy } diff --git a/akka-actor/src/main/scala/akka/actor/Format.scala b/akka-actor/src/main/scala/akka/actor/Format.scala index f61ad3b0ae..660fded9f5 100644 --- a/akka-actor/src/main/scala/akka/actor/Format.scala +++ b/akka-actor/src/main/scala/akka/actor/Format.scala @@ -8,20 +8,6 @@ import akka.actor.Actor import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream} -/** - * Type class definition for Actor Serialization - */ -trait FromBinary[T <: Actor] { - def fromBinary(bytes: Array[Byte], act: T): T -} - -trait ToBinary[T <: Actor] { - def toBinary(t: T): Array[Byte] -} - -// client needs to implement Format[] for the respective actor -trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] - /** * @author Jonas Bonér */ @@ -33,6 +19,48 @@ trait Serializer extends scala.Serializable { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef } +trait FromBinary[T <: Actor] { + def fromBinary(bytes: Array[Byte], act: T): T +} + +trait ToBinary[T <: Actor] { + def toBinary(t: T): Array[Byte] +} + +/** + * Type class definition for Actor Serialization. + * Client needs to implement Format[] for the respective actor. + */ +trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] + +/** + * + */ +object Format { + + object Default extends Serializer { + import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream} + //import org.apache.commons.io.input.ClassLoaderObjectInputStream + + def toBinary(obj: AnyRef): Array[Byte] = { + val bos = new ByteArrayOutputStream + val out = new ObjectOutputStream(bos) + out.writeObject(obj) + out.close + bos.toByteArray + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { + val in = + //if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else + new ObjectInputStream(new ByteArrayInputStream(bytes)) + val obj = in.readObject + in.close + obj + } + } +} + /** * A default implementation for a stateless actor * @@ -60,7 +88,7 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable * object BinaryFormatMyJavaSerializableActor { * implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] { * val serializer = Serializers.Java - * } + * } * } * */ diff --git a/akka-actor/src/main/scala/akka/actor/Routing.scala b/akka-actor/src/main/scala/akka/actor/Routing.scala index 57e6dcf41e..7da3087273 100644 --- a/akka-actor/src/main/scala/akka/actor/Routing.scala +++ b/akka-actor/src/main/scala/akka/actor/Routing.scala @@ -16,6 +16,9 @@ object RouterType { object Direct extends RouterType object Random extends RouterType object RoundRobin extends RouterType + object LeastCPU extends RouterType + object LeastRAM extends RouterType + object LeastMessages extends RouterType } // FIXME move all routing in cluster here when we can \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/remoteinterface/NodeAddress.scala b/akka-actor/src/main/scala/akka/remoteinterface/NodeAddress.scala new file mode 100644 index 0000000000..44faa1552a --- /dev/null +++ b/akka-actor/src/main/scala/akka/remoteinterface/NodeAddress.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.cluster + +import akka.util.ReflectiveAccess.ClusterModule + +/** + * Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance. + * + * @author Jonas Bonér + */ +class NodeAddress( + val clusterName: String, + val nodeName: String, + val hostname: String, + val port: Int) { + if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string") + if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string") + if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string") + if (port < 1) throw new NullPointerException("Port can not be negative") + + override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port) + + override def hashCode = 0 + clusterName.## + nodeName.## + hostname.## + port.## + + override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other) +} + +object NodeAddress { + + def apply( + clusterName: String = ClusterModule.name, + nodeName: String = ClusterModule.nodeName, + hostname: String = ClusterModule.hostname, + port: Int = ClusterModule.remoteServerPort): NodeAddress = + new NodeAddress(clusterName, nodeName, hostname, port) + + def unapply(other: Any) = other match { + case address: NodeAddress => Some((address.clusterName, address.nodeName, address.hostname, address.port)) + case _ => None + } +} diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index ace294b743..8dba6359a3 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -37,6 +37,27 @@ object ReflectiveAccess { * @author Jonas Bonér */ object ClusterModule { + import java.net.InetAddress + import com.eaio.uuid.UUID + import akka.cluster.NodeAddress + import Config.{config, TIME_UNIT} + + val name = config.getString("akka.cluster.name", "default") + val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") + val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552) + val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt + val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt + val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt + val shouldCompressData = config.getBool("akka.cluster.use-compression", false) + + // FIXME allow setting hostname from command line or node local property file + val hostname = InetAddress.getLocalHost.getHostName + + // FIXME allow setting nodeName from command line or node local property file + val nodeName = new UUID().toString + + val nodeAddress = NodeAddress(name, nodeName, hostname, remoteServerPort) + lazy val isEnabled = clusterInstance.isDefined def ensureEnabled() { @@ -71,7 +92,7 @@ object ReflectiveAccess { lazy val node: ClusterNode = { ensureEnabled() - clusterInstance.get.newNode() + clusterInstance.get.newNode(nodeAddress, zooKeeperServers) } lazy val clusterDeployer: ClusterDeployer = { @@ -80,6 +101,9 @@ object ReflectiveAccess { } type ClusterNode = { + def start() + def shutdown() + def store[T <: Actor] (address: String, actorClass: Class[T], replicas: Int, serializeMailbox: Boolean) (implicit format: Format[T]) @@ -105,11 +129,7 @@ object ReflectiveAccess { } type Cluster = { - def newNode( - //nodeAddress: NodeAddress, - //zkServerAddresses: String, - //serializer: ZkSerializer - ): ClusterNode + def newNode(nodeAddress: NodeAddress, zkServerAddresses: String): ClusterNode } type Mailbox = { @@ -263,7 +283,7 @@ object ReflectiveAccess { Left(e) } - def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception,Class[T]] = try { + def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try { assert(fqn ne null) // First, use the specified CL diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 73fc43f379..ea21ef84a9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -92,21 +92,6 @@ trait ClusterNodeMBean { def getConfigElementKeys: Array[String] } -/** - * Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance. - * - * @author Jonas Bonér - */ -final case class NodeAddress( - clusterName: String, - nodeName: String, - hostname: String = Cluster.lookupLocalhostName, - port: Int = Cluster.remoteServerPort) { - if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string") - if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string") - override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port) -} - /** * Factory object for ClusterNode. Also holds global state such as configuration data etc. * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala index ebad829293..d86105f59f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala @@ -14,12 +14,13 @@ import akka.cluster.zookeeper.AkkaZkClient import org.apache.zookeeper.CreateMode import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener} +import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} + import scala.collection.JavaConversions.collectionAsScalaIterable import com.eaio.uuid.UUID import java.util.concurrent.CountDownLatch -import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} /** * @author Jonas Bonér @@ -46,9 +47,6 @@ object ClusterDeployer { zk } - // FIXME invert dependency; let Cluster have an instance of ClusterDeployer instead - lazy val cluster = Cluster(NodeAddress(clusterName, nodeName)) - private val clusterDeploymentLockListener = new LockListener { def lockAcquired() { EventHandler.debug(this, "Clustered deployment started") diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index c7f6760166..b4e5584ff3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -41,6 +41,10 @@ object Router { case RoundRobin => new ClusterActorRef( addresses, serviceId, timeout, actorType, replicationStrategy) with RoundRobin + + case LeastCPU => sys.error("Router LeastCPU not supported yet") + case LeastRAM => sys.error("Router LeastRAM not supported yet") + case LeastMessages => sys.error("Router LeastMessages not supported yet") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 5faf61630d..7568a6a580 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -8,7 +8,16 @@ import org.I0Itec.zkclient._ import akka.actor._ +object ClusterDeployerSpec { + class Pi extends Actor { + def receive = { + case "Hello" => self.reply("World") + } + } +} + class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { + import ClusterDeployerSpec._ val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" @@ -27,6 +36,12 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter oldDeployment must equal(newDeployment.get) } } +/* + "be able to create an actor deployed using ClusterDeployer" in { + val pi = Actor.actorOf[Pi]("service-pi") + pi must not equal(null) + } +*/ } override def beforeAll() { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 8b64cda636..7f5b46a290 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -45,7 +45,7 @@ akka { # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages" # or: fully qualified class name of the router class # default is "direct"; - format = "akka.serializer.Format$Default$" + format = "akka.serialization.Format$Default$" clustered { # makes the actor available in the cluster registry # default (if omitted) is local non-clustered actor home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor