Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-05-17 13:22:25 +02:00
commit 61723ff54e
12 changed files with 255 additions and 148 deletions

View file

@ -17,7 +17,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
deployment must equal (Some(Deploy(
"service-pi",
RoundRobin,
"akka.serializer.Format$Default$",
"akka.serialization.Format$Default$",
Clustered(
Node("test-1"),
Replicate(3),

View file

@ -4,9 +4,11 @@
package akka.actor
import DeploymentConfig._
import akka.dispatch._
import akka.config.Config._
import akka.util.{ListenerManagement, ReflectiveAccess, Duration, Helpers}
import ReflectiveAccess._
import Helpers.{narrow, narrowSilently}
import akka.remoteinterface.RemoteSupport
import akka.japi.{Creator, Procedure}
@ -17,6 +19,7 @@ import akka.event.EventHandler
import scala.reflect.BeanProperty
import com.eaio.uuid.UUID
import java.lang.reflect.InvocationTargetException
/**
@ -203,96 +206,12 @@ object Actor extends ListenerManagement {
* </pre>
*/
def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = {
import DeploymentConfig._
import ReflectiveAccess._
Address.validate(address)
try {
Deployer.deploymentFor(address) match {
case Deploy(_, router, _, Local) =>
// FIXME handle 'router' in 'Local' actors
newLocalActorRef(clazz, address)
case Deploy(_, router, formatClassName, Clustered(home, replication, state)) =>
ClusterModule.ensureEnabled()
if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
val hostname = home match {
case Host(hostname) => hostname
case IP(address) => address
case Node(nodeName) => "localhost" // FIXME lookup hostname for node name
}
val replicas = replication match {
case Replicate(replicas) => replicas
case AutoReplicate => -1
case NoReplicas => 0
}
import ClusterModule.node
if (hostname == RemoteModule.remoteServerHostname) { // home node for clustered actor
def formatErrorDueTo(reason: String) = {
throw new akka.config.ConfigurationException(
"Could not create Format[T] object [" + formatClassName +
"] for serialization of actor [" + address +
"] since " + reason)
}
implicit val format: Format[T] = {
if (formatClassName == "N/A") formatErrorDueTo("no class name defined in configuration")
val f = ReflectiveAccess.getObjectFor(formatClassName) match {
case Right(actor) => actor
case Left(exception) =>
val cause = exception match {
case i: InvocationTargetException => i.getTargetException
case _ => exception
}
formatErrorDueTo(" " + cause.toString)
}
if (f.isInstanceOf[Format[T]]) f.asInstanceOf[Format[T]]
else formatErrorDueTo("class must be of type [akka.serialization.Format[T]]")
}
if (!node.isClustered(address)) node.store(address, clazz, replicas, false)
node.use(address)
} else {
//val router =
node.ref(address, null)
}
sys.error("Clustered deployment not yet supported")
/*
val remoteAddress = Actor.remote.address
if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) {
// home node for actor
} else {
}
*/
/*
2. Check Home(..)
a) If home is same as Actor.remote.address then:
- check if actor is stored in ZK, if not; node.store(..)
- checkout actor using node.use(..)
b) If not the same
- check out actor using node.ref(..)
Misc stuff:
- How to define a single ClusterNode to use? Where should it be booted up? How should it be configured?
- ClusterNode API and Actor.remote API should be made private[akka]
- Rewrite ClusterSpec or remove it
- Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor
- Should we allow configuring of session-scoped remote actors? How?
*/
RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor)
case invalid => throw new IllegalActorStateException(
"Could not create actor [" + clazz.getName +
"] with address [" + address +
"], not bound to a valid deployment scheme [" + invalid + "]")
case Deploy(_, router, _, Local) => newLocalActorRef(clazz, address) // FIXME handle 'router' in 'Local' actors
case deploy => newClusterActorRef[T](clazz, address, deploy)
}
} catch {
case e: DeploymentException =>
@ -340,6 +259,7 @@ object Actor extends ListenerManagement {
* </pre>
*/
def actorOf[T <: Actor](factory: => T, address: String): ActorRef = {
// FIXME use deployment info
Address.validate(address)
new LocalActorRef(() => factory, address)
}
@ -364,6 +284,7 @@ object Actor extends ListenerManagement {
* JAVA API
*/
def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = {
// FIXME use deployment info
Address.validate(address)
new LocalActorRef(() => creator.create, address)
}
@ -393,6 +314,25 @@ object Actor extends ListenerManagement {
}).start() ! Spawn
}
/**
* Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
* to convert an Option[Any] to an Option[T].
*/
implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption)
/**
* Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
* to convert an Option[Any] to an Option[T].
* This means that the following code is equivalent:
* (actor !! "foo").as[Int] (Deprecated)
* and
* (actor !!! "foo").as[Int] (Recommended)
*/
implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
try { anyFuture.await } catch { case t: FutureTimeoutException => }
anyFuture.resultOrException
})
private[akka] def newLocalActorRef(clazz: Class[_ <: Actor], address: String): ActorRef = {
new LocalActorRef(() => {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
@ -413,24 +353,96 @@ object Actor extends ListenerManagement {
}, address)
}
/**
* Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
* to convert an Option[Any] to an Option[T].
*/
implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption)
private def newClusterActorRef[T <: Actor](clazz: Class[T], address: String, deploy: Deploy): ActorRef = {
deploy match {
case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) =>
ClusterModule.ensureEnabled()
/**
* Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
* to convert an Option[Any] to an Option[T].
* This means that the following code is equivalent:
* (actor !! "foo").as[Int] (Deprecated)
* and
* (actor !!! "foo").as[Int] (Recommended)
*/
implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
try { anyFuture.await } catch { case t: FutureTimeoutException => }
anyFuture.resultOrException
})
if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
val hostname = home match {
case Host(hostname) => hostname
case IP(address) => address
case Node(nodeName) => "localhost" // FIXME lookup hostname for node name
}
val replicas = replication match {
case Replicate(replicas) => replicas
case AutoReplicate => -1
case AutoReplicate() => -1
case NoReplicas => 0
case NoReplicas() => 0
}
import ClusterModule.node
node.start() // start cluster node
if (hostname == RemoteModule.remoteServerHostname) { // home node for clustered actor
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
"Could not create Serializer object [" + serializerClassName +
"] for serialization of actor [" + address +
"] since " + reason)
val serializer: Serializer = {
if (serializerClassName == "N/A") serializerErrorDueTo("no class name defined in configuration")
val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match {
case Right(clazz) => clazz
case Left(exception) =>
val cause = exception match {
case i: InvocationTargetException => i.getTargetException
case _ => exception
}
serializerErrorDueTo(cause.toString)
}
val f = clazz.newInstance.asInstanceOf[AnyRef]
if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer]
else serializerErrorDueTo("class must be of type [akka.serialization.Serializer")
}
// FIXME use the serializer above instead of dummy Format, but then the ClusterNode AND ActorRef serialization needs to be rewritten
implicit val format: Format[T] = null
sys.error("FIXME use the serializer above instead of dummy Format, but then the ClusterNode AND ActorRef serialization needs to be rewritten")
if (!node.isClustered(address)) node.store(address, clazz, replicas, false)
node.use(address)
} else {
val routerType = router match {
case Direct => RouterType.Direct
case Direct() => RouterType.Direct
case RoundRobin => RouterType.RoundRobin
case RoundRobin() => RouterType.RoundRobin
case Random => RouterType.Random
case Random() => RouterType.Random
case LeastCPU => RouterType.LeastCPU
case LeastCPU() => RouterType.LeastCPU
case LeastRAM => RouterType.LeastRAM
case LeastRAM() => RouterType.LeastRAM
case LeastMessages => RouterType.LeastMessages
case LeastMessages() => RouterType.LeastMessages
}
node.ref(address, routerType)
}
/*
Misc stuff:
- How to define a single ClusterNode to use? Where should it be booted up? How should it be configured?
- ClusterNode API and Actor.remote API should be made private[akka]
- Rewrite ClusterSpec or remove it
- Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor
- Should we allow configuring of session-scoped remote actors? How?
*/
RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor)
case invalid => throw new IllegalActorStateException(
"Could not create actor [" + clazz.getName +
"] with address [" + address +
"], not bound to a valid deployment scheme [" + invalid + "]")
}
}
}
/**

View file

@ -333,8 +333,7 @@ object LocalDeployer {
private val deployments = new ConcurrentHashMap[String, Deploy]
private[akka] def init(deployments: List[Deploy]) {
EventHandler.info(this, "Initializing local deployer")
EventHandler.info(this, "Deploying locally [\n" + deployments.mkString("\n\t") + "\n]")
EventHandler.info(this, "Initializing local deployer\nDeploying actors locally [\n%s\n]" format deployments.mkString("\n\t"))
deployments foreach (deploy(_)) // deploy
}

View file

@ -8,20 +8,6 @@ import akka.actor.Actor
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
/**
* Type class definition for Actor Serialization
*/
trait FromBinary[T <: Actor] {
def fromBinary(bytes: Array[Byte], act: T): T
}
trait ToBinary[T <: Actor] {
def toBinary(t: T): Array[Byte]
}
// client needs to implement Format[] for the respective actor
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -33,6 +19,48 @@ trait Serializer extends scala.Serializable {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
trait FromBinary[T <: Actor] {
def fromBinary(bytes: Array[Byte], act: T): T
}
trait ToBinary[T <: Actor] {
def toBinary(t: T): Array[Byte]
}
/**
* Type class definition for Actor Serialization.
* Client needs to implement Format[] for the respective actor.
*/
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
/**
*
*/
object Format {
object Default extends Serializer {
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
//import org.apache.commons.io.input.ClassLoaderObjectInputStream
def toBinary(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(obj)
out.close
bos.toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in =
//if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else
new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
in.close
obj
}
}
}
/**
* A default implementation for a stateless actor
*
@ -60,7 +88,7 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* val serializer = Serializers.Java
* }
* }
* }
* </pre>
*/

View file

@ -16,6 +16,9 @@ object RouterType {
object Direct extends RouterType
object Random extends RouterType
object RoundRobin extends RouterType
object LeastCPU extends RouterType
object LeastRAM extends RouterType
object LeastMessages extends RouterType
}
// FIXME move all routing in cluster here when we can

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster
import akka.util.ReflectiveAccess.ClusterModule
/**
* Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class NodeAddress(
val clusterName: String,
val nodeName: String,
val hostname: String,
val port: Int) {
if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string")
if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string")
if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string")
if (port < 1) throw new NullPointerException("Port can not be negative")
override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port)
override def hashCode = 0 + clusterName.## + nodeName.## + hostname.## + port.##
override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other)
}
object NodeAddress {
def apply(
clusterName: String = ClusterModule.name,
nodeName: String = ClusterModule.nodeName,
hostname: String = ClusterModule.hostname,
port: Int = ClusterModule.remoteServerPort): NodeAddress =
new NodeAddress(clusterName, nodeName, hostname, port)
def unapply(other: Any) = other match {
case address: NodeAddress => Some((address.clusterName, address.nodeName, address.hostname, address.port))
case _ => None
}
}

View file

@ -37,6 +37,27 @@ object ReflectiveAccess {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ClusterModule {
import java.net.InetAddress
import com.eaio.uuid.UUID
import akka.cluster.NodeAddress
import Config.{config, TIME_UNIT}
val name = config.getString("akka.cluster.name", "default")
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
// FIXME allow setting hostname from command line or node local property file
val hostname = InetAddress.getLocalHost.getHostName
// FIXME allow setting nodeName from command line or node local property file
val nodeName = new UUID().toString
val nodeAddress = NodeAddress(name, nodeName, hostname, remoteServerPort)
lazy val isEnabled = clusterInstance.isDefined
def ensureEnabled() {
@ -71,7 +92,7 @@ object ReflectiveAccess {
lazy val node: ClusterNode = {
ensureEnabled()
clusterInstance.get.newNode()
clusterInstance.get.newNode(nodeAddress, zooKeeperServers)
}
lazy val clusterDeployer: ClusterDeployer = {
@ -80,6 +101,9 @@ object ReflectiveAccess {
}
type ClusterNode = {
def start()
def shutdown()
def store[T <: Actor]
(address: String, actorClass: Class[T], replicas: Int, serializeMailbox: Boolean)
(implicit format: Format[T])
@ -105,11 +129,7 @@ object ReflectiveAccess {
}
type Cluster = {
def newNode(
//nodeAddress: NodeAddress,
//zkServerAddresses: String,
//serializer: ZkSerializer
): ClusterNode
def newNode(nodeAddress: NodeAddress, zkServerAddresses: String): ClusterNode
}
type Mailbox = {
@ -263,7 +283,7 @@ object ReflectiveAccess {
Left(e)
}
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception,Class[T]] = try {
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try {
assert(fqn ne null)
// First, use the specified CL

View file

@ -92,21 +92,6 @@ trait ClusterNodeMBean {
def getConfigElementKeys: Array[String]
}
/**
* Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class NodeAddress(
clusterName: String,
nodeName: String,
hostname: String = Cluster.lookupLocalhostName,
port: Int = Cluster.remoteServerPort) {
if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string")
if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string")
override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port)
}
/**
* Factory object for ClusterNode. Also holds global state such as configuration data etc.
*

View file

@ -14,12 +14,13 @@ import akka.cluster.zookeeper.AkkaZkClient
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener}
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import scala.collection.JavaConversions.collectionAsScalaIterable
import com.eaio.uuid.UUID
import java.util.concurrent.CountDownLatch
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -46,9 +47,6 @@ object ClusterDeployer {
zk
}
// FIXME invert dependency; let Cluster have an instance of ClusterDeployer instead
lazy val cluster = Cluster(NodeAddress(clusterName, nodeName))
private val clusterDeploymentLockListener = new LockListener {
def lockAcquired() {
EventHandler.debug(this, "Clustered deployment started")

View file

@ -41,6 +41,10 @@ object Router {
case RoundRobin => new ClusterActorRef(
addresses, serviceId, timeout,
actorType, replicationStrategy) with RoundRobin
case LeastCPU => sys.error("Router LeastCPU not supported yet")
case LeastRAM => sys.error("Router LeastRAM not supported yet")
case LeastMessages => sys.error("Router LeastMessages not supported yet")
}
}

View file

@ -8,7 +8,16 @@ import org.I0Itec.zkclient._
import akka.actor._
object ClusterDeployerSpec {
class Pi extends Actor {
def receive = {
case "Hello" => self.reply("World")
}
}
}
class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
import ClusterDeployerSpec._
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"
@ -27,6 +36,12 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
oldDeployment must equal(newDeployment.get)
}
}
/*
"be able to create an actor deployed using ClusterDeployer" in {
val pi = Actor.actorOf[Pi]("service-pi")
pi must not equal(null)
}
*/
}
override def beforeAll() {

View file

@ -45,7 +45,7 @@ akka {
# available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages"
# or: fully qualified class name of the router class
# default is "direct";
format = "akka.serializer.Format$Default$"
format = "akka.serialization.Format$Default$"
clustered { # makes the actor available in the cluster registry
# default (if omitted) is local non-clustered actor
home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor