This commit is contained in:
Peter Veentjer 2011-07-26 08:16:26 +03:00
parent 6a91ec0baa
commit 96cc0a00b4
13 changed files with 293 additions and 47 deletions

View file

@ -82,7 +82,11 @@ case class MaximumNumberOfRestartsWithinTimeRangeReached(
// Exceptions for Actors
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
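The functional change in this hunk is the auxiliary constructor added to ActorKilledException, which simply forwards to the primary constructor with a null cause. That form is handy wherever Scala default arguments are not available, for example when the exception is constructed from Java or reflectively. A minimal, self-contained sketch of the same pattern, using hypothetical class and package names rather than the actual akka types:

package com.example

// Hypothetical sketch of the constructor pattern above, not the akka class itself.
class MyKilledException private[example] (message: String, cause: Throwable = null)
  extends RuntimeException(message, cause) {
  // Auxiliary constructor: callers that cannot supply Scala default arguments
  // (e.g. Java code or reflection) can pass just the message.
  def this(msg: String) = this(msg, null)
}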

View file

@ -133,8 +133,10 @@ object DeploymentConfig {
case Host("localhost") Config.nodename
case IP("0.0.0.0") Config.nodename
case IP("127.0.0.1") Config.nodename
case Host(hostname) throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
case IP(address) throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
case Host(hostname) throw new UnsupportedOperationException(
"Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
case IP(address) throw new UnsupportedOperationException(
"Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
}
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home nodeNameFor(home) == Config.nodename)
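The replacement error messages above still point callers at the node-name form of preferred-nodes. For reference, that is the same form used by the test configuration files added later in this commit; an illustrative deployment block with a placeholder service name looks like:

akka.actor.deployment.my-service.router = "round-robin"
akka.actor.deployment.my-service.clustered.preferred-nodes = ["node:node1", "node:node2"]
akka.actor.deployment.my-service.clustered.replication-factor = 2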

View file

@ -933,7 +933,8 @@ class DefaultClusterNode private[akka](
*/
def release(actorAddress: String) {
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no
// longer available. Then what to do? Should we even remove this method?
if (isConnected.isOn) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))

View file

@ -3,22 +3,19 @@
*/
package akka.cluster
import Cluster._
import akka.actor._
import Actor._
import akka.dispatch._
import akka.util._
import ReflectiveAccess._
import ClusterModule._
import akka.event.EventHandler
import akka.dispatch.Future
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.{ Map ⇒ JMap }
import java.util.{Map ⇒ JMap}
import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
/**
* ActorRef representing one or more instances of a clustered, load-balanced and sometimes replicated actor
@ -26,11 +23,11 @@ import com.eaio.uuid.UUID
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ClusterActorRef private[akka] (
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
val address: String,
_timeout: Long)
extends ActorRef with ScalaActorRef { this: Router.Router ⇒
class ClusterActorRef private[akka](inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
val address: String,
_timeout: Long)
extends ActorRef with ScalaActorRef {
this: Router.Router ⇒
timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
@ -45,29 +42,71 @@ class ClusterActorRef private[akka] (
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
case ref: ActorRef ⇒ Some(ref)
case _ ⇒ None
case _ ⇒ None
}
route(message)(sender)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef ⇒ Some(ref)
case _ ⇒ None
case _ ⇒ None
}
route[Any](message, timeout)(sender)
}
private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) {
inetSocketAddressToActorRefMap set (inetSocketAddressToActorRefMap.get map {
case (`fromInetSocketAddress`, actorRef) ⇒
actorRef.stop()
(toInetSocketAddress, createRemoteActorRef(actorRef.address, toInetSocketAddress))
case other ⇒ other
})
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
@tailrec
def doFailover(from: InetSocketAddress, to: InetSocketAddress): Unit = {
val oldValue = inetSocketAddressToActorRefMap.get
val newValue = oldValue map {
case (`from`, actorRef) ⇒
actorRef.stop()
(to, createRemoteActorRef(actorRef.address, to))
case other ⇒ other
}
if (!inetSocketAddressToActorRefMap.compareAndSet(oldValue, newValue))
doFailover(from, to)
}
doFailover(from, to)
}
/**
* Removes the given address (and the corresponding ActorRef) from this ClusterActorRef.
*
* The call can safely be made even when the address is missing.
*
* The call is thread-safe.
*/
@tailrec
private def remove(address: InetSocketAddress): Unit = {
val oldValue = inetSocketAddressToActorRefMap.get()
val newValue = oldValue - address
if (!inetSocketAddressToActorRefMap.compareAndSet(oldValue, newValue))
remove(address)
}
def signalDeadActor(ref: ActorRef): Unit = {
//since the number of remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity
//of the following removal.
val it = connections.keySet.iterator
while (it.hasNext) {
val address = it.next()
val foundRef: ActorRef = connections.get(address).get
if (foundRef == ref) {
remove(address)
return
}
}
}
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
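Both the new failOver and remove shown above update the shared inetSocketAddressToActorRefMap through a compare-and-set retry loop on an AtomicReference, so a concurrent update by another thread is never silently overwritten. A self-contained sketch of that pattern, with hypothetical names and a plain Map[String, Int] instead of the akka types:

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object CasMapUpdate {
  private val state = new AtomicReference[Map[String, Int]](Map.empty)

  // Read the current map, compute the new one, and retry if another thread
  // swapped the reference in between; this is the same shape as failOver/remove.
  @tailrec
  def remove(key: String): Unit = {
    val oldValue = state.get()
    val newValue = oldValue - key
    if (!state.compareAndSet(oldValue, newValue)) remove(key)
  }
}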

View file

@ -3,13 +3,10 @@
*/
package akka.cluster
import Cluster._
import akka.actor._
import Actor._
import akka.dispatch.Future
import akka.event.EventHandler
import akka.routing.{ RouterType, RoutingException }
import akka.routing.{RouterType, RoutingException}
import RouterType._
import com.eaio.uuid.UUID
@ -24,16 +21,16 @@ import java.util.concurrent.atomic.AtomicReference
*/
object Router {
def newRouter(
routerType: RouterType,
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
actorAddress: String,
timeout: Long): ClusterActorRef = {
routerType: RouterType,
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
actorAddress: String,
timeout: Long): ClusterActorRef = {
routerType match {
case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Direct
case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Random
case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with RoundRobin
case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Direct
case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Random
case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with RoundRobin
case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
}
}
@ -42,22 +39,49 @@ object Router {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Router {
def connections: Map[InetSocketAddress, ActorRef]
def signalDeadActor(ref: ActorRef): Unit
def route(message: Any)(implicit sender: Option[ActorRef]): Unit
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
}
/**
* An abstract Router implementation that already provides the basic infrastructure, so that a concrete
* Router only needs to implement the next method.
*
* This is also where failover will be handled in the future, when an ActorRef fails and a different
* one needs to be selected.
*/
trait BasicRouter extends Router {
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match {
case Some(actor) ⇒ actor.!(message)(sender)
case _ ⇒ throwNoConnectionsError()
case Some(actor) ⇒ {
try {
actor.!(message)(sender)
} catch {
case e: Exception =>
signalDeadActor(actor)
throw e
}
}
case _ ⇒ throwNoConnectionsError()
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
case Some(actor) ⇒ actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
case _ ⇒ throwNoConnectionsError()
case Some(actor) ⇒ {
try {
actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
} catch {
case e: Throwable =>
signalDeadActor(actor)
throw e
}
}
case _ ⇒ throwNoConnectionsError()
}
protected def next: Option[ActorRef]
@ -73,6 +97,7 @@ object Router {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Direct extends BasicRouter {
lazy val next: Option[ActorRef] = {
val connection = connections.values.headOption
if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
@ -90,7 +115,9 @@ object Router {
if (connections.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
} else {
Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
}
}
/**
@ -109,7 +136,7 @@ object Router {
val currentItems = current.get
val newItems = currentItems match {
case Nil ⇒ items
case xs ⇒ xs
case xs ⇒ xs
}
if (newItems.isEmpty) {
@ -124,4 +151,5 @@ object Router {
findNext
}
}
}
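As the BasicRouter scaladoc above notes, a concrete router only has to supply next; the routing plumbing stays in the base trait. A simplified, self-contained sketch of that split, using a generic Connection type in place of ActorRef and hypothetical names rather than the actual akka traits:

import java.util.concurrent.atomic.AtomicLong

// Simplified stand-in for the Router/BasicRouter split above; Connection plays the role of ActorRef.
trait SimpleRouter[Connection] {
  def connections: Vector[Connection]
  protected def next: Option[Connection]

  // Routing only needs 'next'; the selection strategy lives in the concrete class.
  def route(message: Any)(send: (Connection, Any) => Unit): Unit = next match {
    case Some(c) => send(c, message)
    case None => throw new IllegalStateException("router has no connections")
  }
}

// Round-robin selection: an atomic counter picks the next connection on every call.
class RoundRobinRouter[Connection](val connections: Vector[Connection]) extends SimpleRouter[Connection] {
  private val counter = new AtomicLong(0)
  protected def next: Option[Connection] =
    if (connections.isEmpty) None
    else Some(connections((counter.getAndIncrement() % connections.size).toInt))
}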

View file

@ -36,6 +36,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
}
trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
override def beforeAll() = {
ClusterTestNode.waitForReady(getClass.getName)
}

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2
akka.cluster.client.buffering.retry-message-send-on-failure = false

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
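The .opts file above supplies the per-JVM system properties for this multi-jvm test node. As a purely illustrative fragment (property names are taken from the .opts file, the default values are placeholders, and the real akka Config loading is more involved), such properties can be read in Scala like this:

object NodeProps {
  // Illustrative only: reading the per-node system properties set in the .opts files.
  val nodename: String = sys.props.getOrElse("akka.cluster.nodename", "local")
  val port: Int = sys.props.getOrElse("akka.cluster.port", "0").toInt
}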

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2
akka.cluster.client.buffering.retry-message-send-on-failure = false

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2
akka.cluster.client.buffering.retry-message-send-on-failure = false

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993

View file

@ -0,0 +1,150 @@
/*
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.reflogic
import akka.cluster._
import akka.cluster.Cluster._
import akka.actor.Actor
import akka.routing.RoutingException
import java.nio.channels.{ClosedChannelException, NotYetConnectedException}
object ClusterActorRefCleanupMultiJvmSpec {
val NrOfNodes = 3
class TestActor extends Actor with Serializable {
println("--------------------------------------")
println("TestActor created")
println("--------------------------------------")
def receive = {
case "Die" =>
println("Killing JVM: " + Cluster.node.nodeAddress)
System.exit(0)
case _ =>
println("Hello")
}
}
}
class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
import ClusterActorRefCleanupMultiJvmSpec._
val testNodes = NrOfNodes
"ClusterActorRef" must {
"cleanup itself" in {
node.start
barrier("awaitStarted", NrOfNodes).await()
val ref = Actor.actorOf[ClusterActorRefCleanupMultiJvmSpec.TestActor]("service-test")
ref.isInstanceOf[ClusterActorRef] must be(true)
val clusteredRef = ref.asInstanceOf[ClusterActorRef]
//verify that all remote actors are there.
clusteredRef.connections.size must be(2)
//let one of the actors die.
clusteredRef ! "Die"
//just some waiting to make sure that the node has died.
Thread.sleep(5000)
//send some requests; this should trigger the cleanup
try {
clusteredRef ! "hello"
clusteredRef ! "hello"
} catch {
case e: NotYetConnectedException =>
}
//since the call to the node failed, the node must have been removed from the list.
clusteredRef.connections.size must be(1)
//send a message to the remaining node.
clusteredRef ! "hello"
//now kill another node
clusteredRef ! "Die"
//just some waiting to make sure that the node has died.
Thread.sleep(5000)
//trigger the cleanup.
try {
clusteredRef ! "hello"
} catch {
case e: ClosedChannelException =>
case e: NotYetConnectedException =>
case e: RoutingException =>
}
//now there must not be any remaining connections after the death of the last actor.
clusteredRef.connections.size must be(0)
//and let's make sure we now get the correct exception if we try to use the ref.
try {
clusteredRef ! "Hello"
assert(false)
} catch {
case e: RoutingException =>
}
node.shutdown()
}
}
}
class ClusterActorRefCleanupMultiJvmNode2 extends ClusterTestNode {
import ClusterActorRefCleanupMultiJvmSpec._
val testNodes = NrOfNodes
//we are only using the nodes for their capacity, not for testing on this node itself.
"___" must {
"___" in {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
ClusterTestNode.exit(classOf[ClusterActorRefCleanupMultiJvmNode2].getName)
}
})
node.start()
barrier("awaitStarted", NrOfNodes).await()
barrier("finished", NrOfNodes).await()
node.shutdown()
}
}
}
class ClusterActorRefCleanupMultiJvmNode3 extends ClusterTestNode {
import ClusterActorRefCleanupMultiJvmSpec._
val testNodes = NrOfNodes
//we are only using the nodes for their capacity, not for testing on this node itself.
"___" must {
"___" in {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
ClusterTestNode.exit(classOf[ClusterActorRefCleanupMultiJvmNode3].getName)
}
})
node.start()
barrier("awaitStarted", NrOfNodes).await()
barrier("finished", NrOfNodes).await()
node.shutdown()
}
}
}