Removed reflective coupling to akka cloud

This commit is contained in:
Jonas Bonér 2010-11-22 17:58:21 +01:00
parent fd2faeccba
commit 188d7e9c19
8 changed files with 33 additions and 99 deletions

View file

@ -5,9 +5,9 @@
package akka.dispatch package akka.dispatch
import akka.actor.{Actor, ActorRef} import akka.actor.{Actor, ActorRef}
import akka.actor.newUuid
import akka.config.Config._ import akka.config.Config._
import akka.util.{Duration, Logging} import akka.util.{Duration, Logging}
import akka.actor.newUuid
import net.lag.configgy.ConfigMap import net.lag.configgy.ConfigMap

View file

@ -5,12 +5,11 @@
package akka.dispatch package akka.dispatch
import akka.actor.{ActorRef, IllegalActorStateException} import akka.actor.{ActorRef, IllegalActorStateException}
import akka.util.ReflectiveAccess.AkkaCloudModule import akka.util.{ReflectiveAccess, Switch}
import java.util.Queue import java.util.Queue
import akka.util.Switch
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} import java.util.concurrent.{ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/** /**
* Default settings are: * Default settings are:
@ -119,17 +118,10 @@ class ExecutorBasedEventDrivenDispatcher(
} }
/** /**
* Creates and returns a durable mailbox for the given actor. * Creates and returns a durable mailbox for the given actor.
*/ */
def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
// FIXME make generic (work for TypedActor as well) createMailbox(mailboxType.mailboxImplClassname, actorRef)
case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported")
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
private[akka] def start = log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) private[akka] def start = log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)

View file

@ -4,13 +4,15 @@
package akka.dispatch package akka.dispatch
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
import akka.actor.{Actor, ActorRef, IllegalActorStateException} import akka.actor.{Actor, ActorRef, IllegalActorStateException}
import akka.util.Switch import akka.util.Switch
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList} import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
/** /**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
* that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
@ -222,15 +224,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/** /**
* Creates and returns a durable mailbox for the given actor. * Creates and returns a durable mailbox for the given actor.
*/ */
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
// FIXME make generic (work for TypedActor as well) createMailbox(mailboxType.mailboxImplClassname, actorRef)
case FileBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("FileBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
case BeanstalkBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("BeanstalkBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
case RedisBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("RedisBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
}
private[akka] override def register(actorRef: ActorRef) = { private[akka] override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef) verifyActorsAreOfSameType(actorRef)

View file

@ -5,6 +5,7 @@
package akka.dispatch package akka.dispatch
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.util.Switch
import org.fusesource.hawtdispatch.DispatchQueue import org.fusesource.hawtdispatch.DispatchQueue
import org.fusesource.hawtdispatch.ScalaDispatch._ import org.fusesource.hawtdispatch.ScalaDispatch._
@ -13,7 +14,6 @@ import org.fusesource.hawtdispatch.ListEventAggregator
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import akka.util.Switch
/** /**
* Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher. * Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher.

View file

@ -5,7 +5,6 @@
package akka.dispatch package akka.dispatch
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.util.ReflectiveAccess.AkkaCloudModule
import akka.AkkaException import akka.AkkaException
import java.util.{Queue, List} import java.util.{Queue, List}
@ -42,15 +41,7 @@ case class BoundedMailbox(
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
} }
abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType { case class DurableMailboxType(mailboxImplClassname: String) extends MailboxType
if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null")
}
case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
class DefaultUnboundedMessageQueue(blockDequeue: Boolean) class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {

View file

@ -6,7 +6,8 @@ package akka.dispatch
import java.util.concurrent._ import java.util.concurrent._
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
import akka.util. {Switch, ReentrantGuard, Logging, HashCode}
import akka.util.{Switch, ReentrantGuard, Logging, HashCode, ReflectiveAccess}
import akka.actor._ import akka.actor._
/** /**
@ -59,11 +60,27 @@ object MessageDispatcher {
*/ */
trait MessageDispatcher extends MailboxFactory with Logging { trait MessageDispatcher extends MailboxFactory with Logging {
import MessageDispatcher._ import MessageDispatcher._
protected val uuids = new ConcurrentSkipListSet[Uuid] protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val guard = new ReentrantGuard protected val guard = new ReentrantGuard
private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
protected val active = new Switch(false) protected val active = new Switch(false)
private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
/**
* Creates and returns a mailbox for the given actor.
*/
def createMailbox(mailboxImplClassname: String, actorRef: ActorRef): MessageQueue = {
ReflectiveAccess.createInstance(
mailboxImplClassname,
Array(classOf[ActorRef]),
Array(actorRef).asInstanceOf[Array[AnyRef]],
ReflectiveAccess.loader)
.getOrElse(throw new IllegalActorStateException(
"Could not create mailbox [" + mailboxImplClassname + "] for actor [" + actorRef + "]"))
.asInstanceOf[MessageQueue]
}
/** /**
* Attaches the specified actorRef to this dispatcher * Attaches the specified actorRef to this dispatcher
*/ */

View file

@ -22,11 +22,9 @@ object ReflectiveAccess extends Logging {
lazy val isRemotingEnabled = RemoteClientModule.isEnabled lazy val isRemotingEnabled = RemoteClientModule.isEnabled
lazy val isTypedActorEnabled = TypedActorModule.isEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled
lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureEnabled def ensureRemotingEnabled = RemoteClientModule.ensureEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureEnabled def ensureTypedActorEnabled = TypedActorModule.ensureEnabled
def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled
/** /**
* Reflective access to the RemoteClient module. * Reflective access to the RemoteClient module.
@ -173,49 +171,6 @@ object ReflectiveAccess extends Logging {
} }
} }
object AkkaCloudModule {
type Mailbox = {
def enqueue(message: MessageInvocation)
def dequeue: MessageInvocation
}
type Serializer = {
def toBinary(obj: AnyRef): Array[Byte]
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
lazy val isEnabled = clusterObjectInstance.isDefined
val clusterObjectInstance: Option[AnyRef] =
getObjectFor("akka.cloud.cluster.Cluster$")
val serializerClass: Option[Class[_]] =
getClassFor("akka.serialization.Serializer")
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Feature is only available in Akka Cloud")
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef)
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef)
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef)
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef)
private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = {
ensureEnabled
createInstance(
mailboxClassname,
Array(classOf[ActorRef]),
Array(actorRef).asInstanceOf[Array[AnyRef]],
loader)
.getOrElse(throw new IllegalActorStateException("Could not create durable mailbox [" + mailboxClassname + "] for actor [" + actorRef + "]"))
.asInstanceOf[Mailbox]
}
}
val noParams = Array[Class[_]]() val noParams = Array[Class[_]]()
val noArgs = Array[AnyRef]() val noArgs = Array[AnyRef]()

View file

@ -8,7 +8,7 @@
akka { akka {
version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka. version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "remote.ssl", "camel", "http"] enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
time-unit = "seconds" # Time unit for all timeout properties throughout the config time-unit = "seconds" # Time unit for all timeout properties throughout the config
@ -117,22 +117,6 @@ akka {
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
ssl {
service = off # NOTE: This feature is not deemed production ready and is not possible to turn on yet
# You can either use java command-line options or use the settings below
#key-store-type = "pkcs12" # Same as -Djavax.net.ssl.keyStoreType=pkcs12
#key-store = "yourcertificate.p12" # Same as -Djavax.net.ssl.keyStore=yourcertificate.p12
#key-store-pass = "$PASS" # Same as -Djavax.net.ssl.keyStorePassword=$PASS
#trust-store-type = "jks" # Same as -Djavax.net.ssl.trustStoreType=jks
#trust-store = "your.keystore" # Same as -Djavax.net.ssl.trustStore=your.keystore
#trust-store-pass = "$PASS" # Same as -Djavax.net.ssl.trustStorePassword=$PASS
debug = off # This can be useful for debugging. If on, very verbose debug, same as -Djavax.net.debug=ssl
}
server { server {
hostname = "localhost" # The hostname or IP that clients should connect to hostname = "localhost" # The hostname or IP that clients should connect to
port = 2552 # The port clients should connect to. Default is 2552 (AKKA) port = 2552 # The port clients should connect to. Default is 2552 (AKKA)