- Changed implementation of Actor.actorOf to work in the the new world of cluster.ref, cluster.use and cluster.store.

- Changed semantics of replica config. Default replicas is now 0. Replica 1 means one copy of the actor is instantiated on another node.
- Actor.remote.actorFor/Actor.remote.register is now separated and orthogonal from cluster implementation.
- cluster.ref now creates and instantiates its replicas automatically, e.g. it can be created first and will then set up what it needs.
- Added logging everywhere, better warning messages etc.
- Each node now fetches the whole deployment configuration from the cluster on boot.
- Added some config options to cluster

Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-05-25 16:18:35 +02:00
parent 71beab820c
commit e6fa55b3a8
15 changed files with 328 additions and 217 deletions

View file

@ -93,6 +93,15 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception
override def fillInStackTrace() = this //Don't waste cycles generating stack trace
}
/**
* Classes for passing status back to the sender.
*/
object Status {
sealed trait Status extends Serializable
case object Success extends Status
case class Failure(cause: Throwable) extends Status
}
/**
* Actor factory module with factory methods for creating various kinds of Actors.
*
@ -221,18 +230,7 @@ object Actor extends ListenerManagement {
* </pre>
*/
def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = {
Address.validate(address)
val actorRefFactory = () newLocalActorRef(clazz, address)
try {
Deployer.deploymentFor(address) match {
case Deploy(_, router, _, Local) actorRefFactory() // FIXME handle 'router' in 'Local' actors
case deploy newClusterActorRef[T](actorRefFactory, address, deploy)
}
} catch {
case e: DeploymentException
EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
actorRefFactory() // if deployment fails, fall back to local actors
}
createActor(address, () newLocalActorRef(clazz, address))
}
/**
@ -274,18 +272,7 @@ object Actor extends ListenerManagement {
* </pre>
*/
def actorOf[T <: Actor](creator: T, address: String): ActorRef = {
Address.validate(address)
val actorRefFactory = () new LocalActorRef(() creator, address)
try {
Deployer.deploymentFor(address) match {
case Deploy(_, router, _, Local) actorRefFactory() // FIXME handle 'router' in 'Local' actors
case deploy newClusterActorRef[T](actorRefFactory, address, deploy)
}
} catch {
case e: DeploymentException
EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
actorRefFactory() // if deployment fails, fall back to local actors
}
createActor(address, () new LocalActorRef(() creator, address))
}
/**
@ -308,18 +295,7 @@ object Actor extends ListenerManagement {
* JAVA API
*/
def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = {
Address.validate(address)
val actorRefFactory = () new LocalActorRef(() creator.create, address)
try {
Deployer.deploymentFor(address) match {
case Deploy(_, router, _, Local) actorRefFactory() // FIXME handle 'router' in 'Local' actors
case deploy newClusterActorRef[T](actorRefFactory, address, deploy)
}
} catch {
case e: DeploymentException
EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
actorRefFactory() // if deployment fails, fall back to local actors
}
createActor(address, () new LocalActorRef(() creator.create, address))
}
/**
@ -366,6 +342,24 @@ object Actor extends ListenerManagement {
anyFuture.resultOrException
})
private[akka] def createActor(address: String, actorFactory: () ActorRef): ActorRef = {
Address.validate(address)
registry.actorFor(address) match { // check if the actor for the address is already in the registry
case Some(actorRef) actorRef // it is -> return it
case None // it is not -> create it
try {
Deployer.deploymentFor(address) match {
case Deploy(_, router, _, Local) actorFactory() // create a local actor
case deploy newClusterActorRef(actorFactory, address, deploy)
}
} catch {
case e: DeploymentException
EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
actorFactory() // if deployment fails, fall back to local actors
}
}
}
private[akka] def newLocalActorRef(clazz: Class[_ <: Actor], address: String): ActorRef = {
new LocalActorRef(() {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
@ -386,7 +380,7 @@ object Actor extends ListenerManagement {
}, address)
}
private def newClusterActorRef[T <: Actor](factory: () ActorRef, address: String, deploy: Deploy): ActorRef = {
private def newClusterActorRef(factory: () ActorRef, address: String, deploy: Deploy): ActorRef = {
deploy match {
case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State))
@ -396,43 +390,45 @@ object Actor extends ListenerManagement {
val isHomeNode = DeploymentConfig.isHomeNode(home)
val replicas = DeploymentConfig.replicaValueFor(replication)
if (isHomeNode) { // home node for clustered actor
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
"Could not create Serializer object [" + serializerClassName +
"] for serialization of actor [" + address +
"] since " + reason)
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
"Could not create Serializer object [" + serializerClassName +
"] for serialization of actor [" + address +
"] since " + reason)
val serializer: Serializer = {
if ((serializerClassName eq null) ||
(serializerClassName == "") ||
(serializerClassName == Format.defaultSerializerName)) {
Format.Default
} else {
val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match {
case Right(clazz) clazz
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
serializerErrorDueTo(cause.toString)
}
val f = clazz.newInstance.asInstanceOf[AnyRef]
if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer]
else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]")
val serializer: Serializer = {
if ((serializerClassName eq null) ||
(serializerClassName == "") ||
(serializerClassName == Format.defaultSerializerName)) {
Format.Default
} else {
val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match {
case Right(clazz) clazz
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
serializerErrorDueTo(cause.toString)
}
val f = clazz.newInstance.asInstanceOf[AnyRef]
if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer]
else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]")
}
}
if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
if (isHomeNode) { // home node for clustered actor
// home node, check out as LocalActorRef
cluster
.use(address, serializer)
.getOrElse(throw new ConfigurationException("Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
.getOrElse(throw new ConfigurationException(
"Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
} else {
if (!cluster.isClustered(address)) {
cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
}
// Thread.sleep(5000)
// remote node (not home node), check out as ClusterActorRef
cluster.ref(address, DeploymentConfig.routerTypeFor(router))
}

View file

@ -159,6 +159,10 @@ object Deployer {
deployer
}
def start() {
instance.toString
}
def shutdown() {
instance.shutdown()
}
@ -325,9 +329,9 @@ object Deployer {
// --------------------------------
// akka.actor.deployment.<address>.clustered.replicas
// --------------------------------
val replicas = clusteredConfig.getAny("replicas", "1") match {
val replicas = clusteredConfig.getAny("replicas", "0") match {
case "auto" AutoReplicate
case "1" NoReplicas
case "0" NoReplicas
case nrOfReplicas: String
try {
Replicate(nrOfReplicas.toInt)
@ -335,7 +339,7 @@ object Deployer {
case e: NumberFormatException
throw new ConfigurationException(
"Config option [" + addressPath +
".clustered.replicas] needs to be either [\"auto\"] or [1-N] - was [" +
".clustered.replicas] needs to be either [\"auto\"] or [0-N] - was [" +
nrOfReplicas + "]")
}
}

View file

@ -29,6 +29,20 @@ trait RemoteModule {
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map?
private[akka] def actorsFactories: ConcurrentHashMap[String, () ActorRef] // FIXME what to do wit actorsFactories map?
private[akka] def findActorByAddress(address: String): ActorRef = actors.get(address)
private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid)
private[akka] def findActorFactory(address: String): () ActorRef = actorsFactories.get(address)
private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = {
var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length))
else findActorByAddress(address)
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}
/*
private[akka] def findActorByAddress(address: String): ActorRef = {
val cachedActorRef = actors.get(address)
if (cachedActorRef ne null) cachedActorRef
@ -71,6 +85,7 @@ trait RemoteModule {
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}
*/
}
/**

View file

@ -28,6 +28,7 @@ import akka.util._
import Helpers._
import akka.actor._
import Actor._
import Status._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
@ -124,6 +125,8 @@ object Cluster {
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
val enableJMX = config.getBool("akka.enable-jmx", true)
val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
val excludeRefNodeInReplicaSet = config.getBool("akka.cluster.exclude-ref-node-in-replica-set", true)
@volatile
private var properties = Map.empty[String, String]
@ -518,7 +521,7 @@ class DefaultClusterNode private[akka] (
val uuid = actorRef.uuid
EventHandler.debug(this,
"Clustering actor [%s] with UUID [%s]".format(actorRef.address, uuid))
"Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))
val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox)(format))
else toBinary(actorRef)(format)
@ -565,12 +568,30 @@ class DefaultClusterNode private[akka] (
ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid)))
}
import RemoteClusterDaemon._
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach { connection
connection ! command
(connection !! (command, remoteDaemonAckTimeout)) match {
case Some(Success)
EventHandler.debug(this,
"Replica for [%s] successfully created on [%s]"
.format(actorRef.address, connection))
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
throw cause
case None
val error = new ClusterException(
"Operation to instantiate replicas throughout the cluster timed out, cause of error unknow")
EventHandler.error(error, this, error.toString)
throw error
}
}
this
@ -596,7 +617,7 @@ class DefaultClusterNode private[akka] (
def remove(address: String): ClusterNode = {
isConnected ifOn {
EventHandler.debug(this,
"Removing actor(s) with ADDRESS [%s] from cluster".format(address))
"Removing actor(s) with address [%s] from cluster".format(address))
uuidsForActorAddress(address) foreach (uuid remove(uuid))
}
this
@ -641,7 +662,8 @@ class DefaultClusterNode private[akka] (
actorUuidsForActorAddress(actorAddress) map { uuid
EventHandler.debug(this,
"Checking out actor with UUID [%s] to be used on node [%s]".format(uuid, nodeAddress.nodeName))
"Checking out actor with UUID [%s] to be used on node [%s] as local actor"
.format(uuid, nodeAddress.nodeName))
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))
@ -750,34 +772,14 @@ class DefaultClusterNode private[akka] (
* Creates an ActorRef with a Router to a set of clustered actors.
*/
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Creating cluster actor ref with router [%s] for actors [%s]".format(router, addresses.mkString(", ")))
"Checking out cluster actor ref with address [%s] and router [%s] connected to [\n\t%s]"
.format(actorAddress, router, addresses.mkString("\n\t")))
def registerClusterActorRefForAddress(actorRef: ClusterActorRef, addresses: Array[(UUID, InetSocketAddress)]) {
addresses foreach {
case (_, address) clusterActorRefs.put(address, actorRef)
}
}
// FIXME remove?
def refByUuid(uuid: UUID): ActorRef = {
val actor = Router newRouter (router, addresses, uuidToString(uuid), Actor.TIMEOUT)
registerClusterActorRefForAddress(actor, addresses)
actor
}
def refByAddress(actorAddress: String): ActorRef = {
//FIXME: unused uuids
val uuids = uuidsForActorAddress(actorAddress)
val actor = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
registerClusterActorRefForAddress(actor, addresses)
actor
}
refByAddress(actorAddress).start()
val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach { case (_, address) clusterActorRefs.put(address, actorRef) }
actorRef.start()
} else throw new ClusterException("Not connected to cluster")
@ -1103,7 +1105,8 @@ class DefaultClusterNode private[akka] (
new InetSocketAddress(hostname, port)
}
private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = uuidsForActorAddress(actorAddress) filter (_ ne null)
private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] =
uuidsForActorAddress(actorAddress) filter (_ ne null)
/**
* Returns a random set with replica connections of size 'replicationFactor'.
@ -1113,7 +1116,7 @@ class DefaultClusterNode private[akka] (
var replicas = HashSet.empty[ActorRef]
if (replicationFactor < 1) return replicas
connectToAllReplicas()
connectToAllMembershipNodesInCluster()
val numberOfReplicas = replicaConnections.size
val replicaConnectionsAsArray = replicaConnections.toList map {
@ -1138,10 +1141,14 @@ class DefaultClusterNode private[akka] (
/**
* Connect to all available replicas unless already connected).
*/
private def connectToAllReplicas() {
private def connectToAllMembershipNodesInCluster() {
val runOnThisNode = false // (node: String) !excludeRefNodeInReplicaSet && node != Config.nodename
membershipNodes foreach { node
if (!replicaConnections.contains(node)) {
// if (runOnThisNode(node) && !replicaConnections.contains(node)) { // only connect to each replica once
if (!replicaConnections.contains(node)) { // only connect to each replica once
val address = addressForNode(node)
EventHandler.debug(this,
"Connecting to replica with nodename [%s] and address [%s]".format(node, address))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
replicaConnections.put(node, (address, clusterDaemon))
}
@ -1453,6 +1460,8 @@ object RemoteClusterDaemon {
val functionServerDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:function:server").build
}
// FIXME supervise RemoteClusterDaemon
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -1466,28 +1475,42 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
def receive: Receive = {
case message: RemoteDaemonMessageProtocol
EventHandler.debug(this, "Received command to RemoteClusterDaemon [%s]".format(message))
message.getMessageType match {
case USE
if (message.hasActorUuid) {
val uuid = uuidProtocolToUuid(message.getActorUuid)
val address = cluster.actorAddressForUuid(uuid)
implicit val format: Serializer = cluster formatForActor address
val actors = cluster use address
} else if (message.hasActorAddress) {
val address = message.getActorAddress
implicit val format: Serializer = cluster formatForActor address
val actors = cluster use address
} else EventHandler.warning(this,
"None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]".format(message))
try {
if (message.hasActorUuid) {
val uuid = uuidProtocolToUuid(message.getActorUuid)
val address = cluster.actorAddressForUuid(uuid)
implicit val format: Serializer = cluster formatForActor address
val actors = cluster use address
} else if (message.hasActorAddress) {
val address = message.getActorAddress
implicit val format: Serializer = cluster formatForActor address
val actors = cluster use address
} else {
EventHandler.warning(this,
"None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]"
.format(message))
}
self.reply(Success)
} catch {
case error
self.reply(Failure(error))
throw error
}
case RELEASE
if (message.hasActorUuid) {
cluster release cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid))
} else if (message.hasActorAddress) {
cluster release message.getActorAddress
} else EventHandler.warning(this,
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message))
} else {
EventHandler.warning(this,
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]"
.format(message))
}
case START cluster.start()

View file

@ -26,7 +26,9 @@ class ClusterActorRef private[akka] (
extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef
this: ClusterActorRef with Router.Router
EventHandler.debug(this, "Creating a ClusterActorRef for actor with address [%s]".format(actorAddress))
EventHandler.debug(this,
"Creating a ClusterActorRef for actor with address [%s] with connections [\n\t%s]"
.format(actorAddress, inetSocketAddresses.mkString("\n\t")))
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
(Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) {

View file

@ -70,33 +70,6 @@ object ClusterDeployer {
private val systemDeployments: List[Deploy] = Nil
private[akka] def init(deployments: List[Deploy]) {
isConnected switchOn {
baseNodes foreach { path
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
} catch {
case e
val error = new DeploymentException(e.toString)
EventHandler.error(error, this)
throw error
}
}
val allDeployments = deployments ::: systemDeployments
EventHandler.info(this, "Initializing cluster deployer")
if (deploymentLock.lock()) {
// try to be the one doing the clustered deployment
EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]")
allDeployments foreach (deploy(_)) // deploy
deploymentLock.unlock() // signal deployment complete
} else {
deploymentCompleted.await() // wait until deployment is completed
}
}
}
def shutdown() {
isConnected switchOff {
// undeploy all
@ -117,19 +90,95 @@ object ClusterDeployer {
}
}
def lookupDeploymentFor(address: String): Option[Deploy] = ensureRunning {
LocalDeployer.lookupDeploymentFor(address) match { // try local cache
case Some(deployment) // in local cache
deployment
case None // not in cache, check cluster
val deployment =
try {
Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy])
} catch {
case e: ZkNoNodeException None
case e: Exception
EventHandler.warning(this, e.toString)
None
}
deployment foreach (LocalDeployer.deploy(_)) // cache it in local cache
deployment
}
}
def fetchDeploymentsFromCluster: List[Deploy] = ensureRunning {
val addresses =
try {
zkClient.getChildren(deploymentPath).toList
} catch {
case e: ZkNoNodeException List[String]()
}
val deployments = addresses map { address
zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy]
}
EventHandler.info(this, "Fetched clustered deployments [\n\t%s\n]" format deployments.mkString("\n\t"))
deployments
}
private[akka] def init(deployments: List[Deploy]) {
println("===============================================================")
println("------------ INIT 1")
isConnected switchOn {
EventHandler.info(this, "Initializing cluster deployer")
baseNodes foreach { path
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
} catch {
case e
val error = new DeploymentException(e.toString)
EventHandler.error(error, this)
throw error
}
}
println("------------ INIT 2")
val allDeployments = deployments ::: systemDeployments
// FIXME need to wrap in if (!deploymentDone) { .. }
if (deploymentLock.lock()) {
println("------------ INIT 3")
// try to be the one doing the clustered deployment
EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]")
println("------------ INIT 4")
allDeployments foreach (deploy(_)) // deploy
println("------------ INIT 5")
// FIXME need to set deployment done flag
deploymentLock.unlock() // signal deployment complete
} else {
println("------------ INIT WAITING")
deploymentCompleted.await() // wait until deployment is completed by other "master" node
}
println("------------ INIT 6")
// fetch clustered deployments and deploy them locally
fetchDeploymentsFromCluster foreach (LocalDeployer.deploy(_))
}
}
private[akka] def deploy(deployment: Deploy) {
ensureRunning {
LocalDeployer.deploy(deployment)
deployment match {
case Deploy(_, _, _, Local) // local deployment
LocalDeployer.deploy(deployment)
case Deploy(_, _, _, Local) {} // local deployment, do nothing here
case _ // cluster deployment
val path = deploymentAddressPath.format(deployment.address)
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
zkClient.writeData(path, deployment)
// FIXME trigger cluster-wide deploy action
} catch {
case e: NullPointerException
handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
@ -140,17 +189,6 @@ object ClusterDeployer {
}
}
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = ensureRunning {
try {
Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy])
} catch {
case e: ZkNoNodeException None
case e: Exception
EventHandler.warning(this, e.toString)
None
}
}
private def ensureRunning[T](body: T): T = {
if (isConnected.isOn) body
else throw new IllegalStateException("ClusterDeployer is not running")

View file

@ -8,6 +8,7 @@ import Cluster._
import akka.actor._
import Actor._
import akka.dispatch.Future
import akka.event.EventHandler
import akka.routing.{ RouterType, RoutingException }
import RouterType._
@ -52,22 +53,32 @@ object Router {
trait BasicRouter extends Router {
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match {
case Some(actor) actor.!(message)(sender)
case _ throw new RoutingException("No node connections for router")
case _ throwNoConnectionsError()
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
case Some(actor) actor.!!!(message, timeout)(sender)
case _ throw new RoutingException("No node connections for router")
case _ throwNoConnectionsError()
}
protected def next: Option[ActorRef]
private def throwNoConnectionsError() = {
val error = new RoutingException("No replica connections for router")
EventHandler.error(error, this, error.toString)
throw error
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Direct extends BasicRouter {
lazy val next: Option[ActorRef] = connections.values.headOption
lazy val next: Option[ActorRef] = {
val connection = connections.values.headOption
if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
connection
}
}
/**
@ -77,8 +88,10 @@ object Router {
private val random = new java.util.Random(System.currentTimeMillis)
def next: Option[ActorRef] =
if (connections.isEmpty) None
else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next)
if (connections.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next)
}
/**
@ -100,8 +113,13 @@ object Router {
case xs xs
}
if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption
else findNext
if (newItems.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else {
if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption
else findNext
}
}
findNext

View file

@ -39,6 +39,16 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
oldDeployment must equal(newDeployment.get)
}
}
"be able to fetch deployments from ZooKeeper" in {
val deployments1 = Deployer.deploymentsInConfig
deployments1 must not equal (Nil)
ClusterDeployer.init(deployments1)
val deployments2 = ClusterDeployer.fetchDeploymentsFromCluster
deployments2.size must equal(1)
deployments2.first must equal(deployments1.first)
}
}
override def beforeAll() {

View file

@ -1,5 +1,5 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 2
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.stateless = on

View file

@ -1,5 +1,5 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 2
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.stateless = on

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster.store_actor
package akka.cluster.routing.roundrobin_1_replica
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -13,20 +13,19 @@ import akka.actor._
import Actor._
import akka.config.Config
object StoreActorMultiJvmSpec {
object RoundRobin1ReplicaMultiJvmSpec {
val NrOfNodes = 2
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello"
println("GOT HELLO on NODE: " + Config.nodename)
self.reply("World from node [" + Config.nodename + "]")
}
}
}
class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import StoreActorMultiJvmSpec._
class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin1ReplicaMultiJvmSpec._
"A cluster" must {
@ -40,13 +39,6 @@ class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA
Cluster.barrier("start-node2", NrOfNodes) {}
Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {
val hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[LocalActorRef] must be(true)
}
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) {}
@ -64,8 +56,8 @@ class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA
}
}
class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers {
import StoreActorMultiJvmSpec._
class RoundRobin1ReplicaMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin1ReplicaMultiJvmSpec._
"A cluster" must {
@ -79,8 +71,6 @@ class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers {
Cluster.node.start()
}
Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {}
var hello: ActorRef = null
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
hello = Actor.actorOf[HelloWorld]("service-hello")

View file

@ -209,7 +209,7 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
if (isRunning) {
EventHandler.debug(this, "Sending remote message [%s]".format(request))
EventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, request))
if (request.getOneWay) {
try {
@ -550,16 +550,28 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(actorAddress: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
val inetSocketAddress = this.address
protected[akka] def actorFor(
actorAddress: String,
timeout: Long,
host: String,
port: Int,
loader: Option[ClassLoader]): ActorRef = {
val homeInetSocketAddress = this.address
if (optimizeLocalScoped_?) {
if ((host == inetSocketAddress.getAddress.getHostAddress || host == inetSocketAddress.getHostName) && port == inetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals?
if ((host == homeInetSocketAddress.getAddress.getHostAddress ||
host == homeInetSocketAddress.getHostName) &&
port == homeInetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(actorAddress, actorAddress)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
}
RemoteActorRef(inetSocketAddress, actorAddress, timeout, loader)
val remoteInetSocketAddress = new InetSocketAddress(host, port)
EventHandler.debug(this,
"Creating RemoteActorRef with address [%s] connected to [%s]"
.format(actorAddress, remoteInetSocketAddress))
RemoteActorRef(remoteInetSocketAddress, actorAddress, timeout, loader)
}
}
@ -832,7 +844,7 @@ class RemoteServerHandler(
// stop all session actors
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)gddd
actor collectionAsScalaIterable(map.values)
) {
try { actor ! PoisonPill } catch { case e: Exception }
}
@ -923,11 +935,27 @@ class RemoteServerHandler(
}
}
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null null
case map map get id
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
* If actor already created then just return it from the registry.
*
* Does not start the actor.
*/
private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
EventHandler.debug(this,
"Creating an remotely available actor for address [%s] on node [%s]"
.format(address, Config.nodename))
val actorRef = Actor.createActor(address, () createSessionActor(actorInfo, channel))
if (actorRef eq null) throw new IllegalActorStateException(
"Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]")
actorRef
}
/**
* gets the actor from the session, or creates one if there is a factory for it
@ -950,29 +978,12 @@ class RemoteServerHandler(
}
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
* If actor already created then just return it from the registry.
*
* Does not start the actor.
*/
private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
EventHandler.debug(this, "Creating an remotely available actor for address [%s] on node [%s]".format(address, Config.nodename))
val actorRef = server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null createSessionActor(actorInfo, channel) // FIXME now session scoped actors are disabled, how to introduce them?
case actorRef actorRef
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null null
case map map get id
}
if (actorRef eq null) throw new IllegalActorStateException("Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]")
actorRef
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
val actorInfo = request.getActorInfo
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(

View file

@ -52,9 +52,9 @@ akka {
# available: "host:<hostname>", "ip:<ip address>" and "node:<node name>"
# default is "host:localhost"
replicas = 3 # number of actor replicas in the cluster
# available: integer above 0 (1-N) or the string "auto" for auto-scaling
# available: positivoe integer (0-N) or the string "auto" for auto-scaling
# if "auto" is used then 'home' has no meaning
# default is '1';
# default is '0', meaning no replicas;
stateless = on # is the actor stateless or stateful
# if turned 'on': actor is defined as stateless and can be load-balanced accordingly
# if turned 'off' (or omitted): actor is defined as stateful which means replicatable through transaction log
@ -132,6 +132,10 @@ akka {
session-timeout = 60
connection-timeout = 60
use-compression = off
remote-daemon-ack-timeout = 30
exclude-ref-node-in-replica-set = on # should a replica be instantiated on the same node as the
# cluster reference to the actor
# default: on
replication {
digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)