From ada24c7cf5b1dda26eb1ec2775ae2f91c4218e4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Mon, 22 Nov 2010 17:58:21 +0100 Subject: [PATCH] Removed reflective coupling to akka cloud --- .../scala/akka/dispatch/Dispatchers.scala | 2 +- .../ExecutorBasedEventDrivenDispatcher.scala | 18 +++----- ...sedEventDrivenWorkStealingDispatcher.scala | 15 +++---- .../scala/akka/dispatch/HawtDispatcher.scala | 2 +- .../scala/akka/dispatch/MailboxHandling.scala | 11 +---- .../scala/akka/dispatch/MessageHandling.scala | 21 ++++++++- .../scala/akka/util/ReflectiveAccess.scala | 45 ------------------- config/akka-reference.conf | 18 +------- 8 files changed, 33 insertions(+), 99 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 4d33bf03ce..bb58c8600a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -5,9 +5,9 @@ package akka.dispatch import akka.actor.{Actor, ActorRef} +import akka.actor.newUuid import akka.config.Config._ import akka.util.{Duration, Logging} -import akka.actor.newUuid import net.lag.configgy.ConfigMap diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index ee5dd890b8..2673d0af33 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -5,12 +5,11 @@ package akka.dispatch import akka.actor.{ActorRef, IllegalActorStateException} -import akka.util.ReflectiveAccess.AkkaCloudModule +import akka.util.{ReflectiveAccess, Switch} import java.util.Queue -import akka.util.Switch import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} +import java.util.concurrent.{ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -119,17 +118,10 @@ class ExecutorBasedEventDrivenDispatcher( } /** - * Creates and returns a durable mailbox for the given actor. + * Creates and returns a durable mailbox for the given actor. */ - def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { - // FIXME make generic (work for TypedActor as well) - case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") - case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") - } + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = + createMailbox(mailboxType.mailboxImplClassname, actorRef) private[akka] def start = log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 69aa5d9365..42f3e0f4cf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -4,13 +4,15 @@ package akka.dispatch -import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque} import akka.actor.{Actor, ActorRef, IllegalActorStateException} import akka.util.Switch + import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList} import java.util.concurrent.atomic.AtomicReference +import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque} + /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the @@ -222,15 +224,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Creates and returns a durable mailbox for the given actor. */ - private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { - // FIXME make generic (work for TypedActor as well) - case FileBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("FileBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") - case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") - case BeanstalkBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("BeanstalkBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") - case RedisBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("RedisBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") - case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") - case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") - } + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = + createMailbox(mailboxType.mailboxImplClassname, actorRef) private[akka] override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) diff --git a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala index 1031ae4c9a..d35fc46617 100644 --- a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala @@ -5,6 +5,7 @@ package akka.dispatch import akka.actor.ActorRef +import akka.util.Switch import org.fusesource.hawtdispatch.DispatchQueue import org.fusesource.hawtdispatch.ScalaDispatch._ @@ -13,7 +14,6 @@ import org.fusesource.hawtdispatch.ListEventAggregator import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} import java.util.concurrent.CountDownLatch -import akka.util.Switch /** * Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher. diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 7e81d4a598..d6c13bfda4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -5,7 +5,6 @@ package akka.dispatch import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} -import akka.util.ReflectiveAccess.AkkaCloudModule import akka.AkkaException import java.util.{Queue, List} @@ -42,15 +41,7 @@ case class BoundedMailbox( if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") } -abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType { - if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null") -} -case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) -case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) -case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) -case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) -case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) -case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser) +case class DurableMailboxType(mailboxImplClassname: String) extends MailboxType class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 467bccd13e..b766a7a974 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -6,7 +6,8 @@ package akka.dispatch import java.util.concurrent._ import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} -import akka.util. {Switch, ReentrantGuard, Logging, HashCode} + +import akka.util.{Switch, ReentrantGuard, Logging, HashCode, ReflectiveAccess} import akka.actor._ /** @@ -59,11 +60,27 @@ object MessageDispatcher { */ trait MessageDispatcher extends MailboxFactory with Logging { import MessageDispatcher._ + protected val uuids = new ConcurrentSkipListSet[Uuid] protected val guard = new ReentrantGuard - private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard protected val active = new Switch(false) + private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard + + /** + * Creates and returns a mailbox for the given actor. + */ + def createMailbox(mailboxImplClassname: String, actorRef: ActorRef): MessageQueue = { + ReflectiveAccess.createInstance( + mailboxImplClassname, + Array(classOf[ActorRef]), + Array(actorRef).asInstanceOf[Array[AnyRef]], + ReflectiveAccess.loader) + .getOrElse(throw new IllegalActorStateException( + "Could not create mailbox [" + mailboxImplClassname + "] for actor [" + actorRef + "]")) + .asInstanceOf[MessageQueue] + } + /** * Attaches the specified actorRef to this dispatcher */ diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index b5a4c7edb7..f359ab9b98 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -22,11 +22,9 @@ object ReflectiveAccess extends Logging { lazy val isRemotingEnabled = RemoteClientModule.isEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled - lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled def ensureRemotingEnabled = RemoteClientModule.ensureEnabled def ensureTypedActorEnabled = TypedActorModule.ensureEnabled - def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled /** * Reflective access to the RemoteClient module. @@ -173,49 +171,6 @@ object ReflectiveAccess extends Logging { } } - object AkkaCloudModule { - - type Mailbox = { - def enqueue(message: MessageInvocation) - def dequeue: MessageInvocation - } - - type Serializer = { - def toBinary(obj: AnyRef): Array[Byte] - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef - } - - lazy val isEnabled = clusterObjectInstance.isDefined - - val clusterObjectInstance: Option[AnyRef] = - getObjectFor("akka.cloud.cluster.Cluster$") - - val serializerClass: Option[Class[_]] = - getClassFor("akka.serialization.Serializer") - - def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( - "Feature is only available in Akka Cloud") - - def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef) - - def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef) - - def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef) - - def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef) - - private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = { - ensureEnabled - createInstance( - mailboxClassname, - Array(classOf[ActorRef]), - Array(actorRef).asInstanceOf[Array[AnyRef]], - loader) - .getOrElse(throw new IllegalActorStateException("Could not create durable mailbox [" + mailboxClassname + "] for actor [" + actorRef + "]")) - .asInstanceOf[Mailbox] - } - } - val noParams = Array[Class[_]]() val noArgs = Array[AnyRef]() diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 04992fb83d..7e5d4af6d6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -8,7 +8,7 @@ akka { version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka. - enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "remote.ssl", "camel", "http"] + enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] time-unit = "seconds" # Time unit for all timeout properties throughout the config @@ -117,22 +117,6 @@ akka { compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 - ssl { - service = off # NOTE: This feature is not deemed production ready and is not possible to turn on yet - - # You can either use java command-line options or use the settings below - - #key-store-type = "pkcs12" # Same as -Djavax.net.ssl.keyStoreType=pkcs12 - #key-store = "yourcertificate.p12" # Same as -Djavax.net.ssl.keyStore=yourcertificate.p12 - #key-store-pass = "$PASS" # Same as -Djavax.net.ssl.keyStorePassword=$PASS - - #trust-store-type = "jks" # Same as -Djavax.net.ssl.trustStoreType=jks - #trust-store = "your.keystore" # Same as -Djavax.net.ssl.trustStore=your.keystore - #trust-store-pass = "$PASS" # Same as -Djavax.net.ssl.trustStorePassword=$PASS - - debug = off # This can be useful for debugging. If on, very verbose debug, same as -Djavax.net.debug=ssl - } - server { hostname = "localhost" # The hostname or IP that clients should connect to port = 2552 # The port clients should connect to. Default is 2552 (AKKA)