diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala
index d3584ed93b..b841193506 100644
--- a/akka-actors/src/main/scala/actor/ActiveObject.scala
+++ b/akka-actors/src/main/scala/actor/ActiveObject.scala
@@ -6,7 +6,6 @@ package se.scalablesolutions.akka.actor
import java.net.InetSocketAddress
-import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.config.ScalaConfig._
@@ -18,6 +17,7 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.lang.reflect.{InvocationTargetException, Method}
+import se.scalablesolutions.akka.dispatch.{Dispatchers, MessageDispatcher, FutureResult}
object Annotations {
import se.scalablesolutions.akka.annotation._
@@ -30,11 +30,13 @@ object Annotations {
}
/**
+ * Factory class for creating Active Objects out of plain POJOs and/or POJOs with interfaces.
*
* @author Jonas Bonér
*/
object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
+ private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(false, None), None, timeout)
@@ -233,12 +235,11 @@ private[akka] sealed case class AspectInit(
*/
@Aspect("perInstance")
private[akka] sealed class ActiveObjectAspect {
-
- @volatile var isInitialized = false
- var target: Class[_] = _
- var actor: Dispatcher = _
- var remoteAddress: Option[InetSocketAddress] = _
- var timeout: Long = _
+ @volatile private var isInitialized = false
+ private var target: Class[_] = _
+ private var actor: Dispatcher = _
+ private var remoteAddress: Option[InetSocketAddress] = _
+ private var timeout: Long = _
@Around("execution(* *.*(..))")
def invoke(joinPoint: JoinPoint): AnyRef = {
@@ -312,9 +313,9 @@ private[akka] sealed class ActiveObjectAspect {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
val clazz = arg.getClass
- if (clazz.getName.contains("$$ProxiedByAW")) {
+ if (clazz.getName.contains(ActiveObject.AW_PROXY_PREFIX)) {
isEscaped = true
- "$$ProxiedByAW" + clazz.getSuperclass.getName
+ ActiveObject.AW_PROXY_PREFIX + clazz.getSuperclass.getName
} else arg
}
(escapedArgs, isEscaped)
@@ -375,10 +376,12 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
case Some(RestartCallbacks(pre, post)) =>
preRestart = Some(try {
targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*)
- } catch { case e => throw new IllegalStateException("Could not find pre restart method [" + pre + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") })
+ } catch { case e => throw new IllegalStateException(
+ "Could not find pre restart method [" + pre + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") })
postRestart = Some(try {
targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*)
- } catch { case e => throw new IllegalStateException("Could not find post restart method [" + post + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") })
+ } catch { case e => throw new IllegalStateException(
+ "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any annotation defined restart callbacks
@@ -386,9 +389,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
- throw new IllegalStateException("Method annotated with @prerestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition")
+ throw new IllegalStateException(
+ "Method annotated with @prerestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition")
if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0)
- throw new IllegalStateException("Method annotated with @postrestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition")
+ throw new IllegalStateException(
+ "Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition")
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
@@ -399,7 +404,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
//if (initTxState.isDefined) initTxState.get.setAccessible(true)
}
- def receive: PartialFunction[Any, Unit] = {
+ def receive = {
case Invocation(joinPoint, isOneWay, _) =>
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
@@ -449,7 +454,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
!arg.getClass.isAnnotationPresent(Annotations.immutable)) {
hasMutableArgument = true
}
- if (arg.getClass.getName.contains("$$ProxiedByAWSubclassing$$")) unserializable = true
+ if (arg.getClass.getName.contains(ActiveObject.AW_PROXY_PREFIX)) unserializable = true
}
if (!unserializable && hasMutableArgument) {
// FIXME: can we have another default deep cloner?
diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala
index a0e2c4c3be..17d158a467 100644
--- a/akka-actors/src/main/scala/actor/Actor.scala
+++ b/akka-actors/src/main/scala/actor/Actor.scala
@@ -5,8 +5,6 @@
package se.scalablesolutions.akka.actor
import java.net.InetSocketAddress
-import java.util.HashSet
-
import se.scalablesolutions.akka.Config._
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.ScalaConfig._
@@ -17,10 +15,13 @@ import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
+import se.scalablesolutions.akka.util.{HashCode, Logging}
+
import org.codehaus.aspectwerkz.proxy.Uuid
import org.multiverse.api.ThreadLocalTransaction._
-import se.scalablesolutions.akka.util.{HashCode, Logging}
+import java.util.{Queue, LinkedList, HashSet}
+import java.util.concurrent.ConcurrentLinkedQueue
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@@ -47,7 +48,7 @@ case class Restart(reason: AnyRef) extends LifeCycleMessage
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
-class ActorKilledException(val killed: Actor) extends RuntimeException("Actor [" + killed + "] was killed by a Kill message")
+class ActorKilledException private[akka] (val killed: Actor) extends RuntimeException("Actor [" + killed + "] was killed by a Kill message")
sealed abstract class DispatcherType
object DispatcherType {
@@ -224,8 +225,10 @@ trait Actor extends TransactionManagement {
// private fields
// ====================================
- @volatile private var _isRunning: Boolean = false
+ @volatile private var _isRunning = false
+ @volatile private var _isSuspended = true
@volatile private var _isShutDown: Boolean = false
+ private var _isEventBased: Boolean = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private var _config: Option[AnyRef] = None
private val _remoteFlagLock = new ReadWriteLock
@@ -233,6 +236,8 @@ trait Actor extends TransactionManagement {
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
+ private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
+
// ====================================
// protected fields
// ====================================
@@ -290,7 +295,11 @@ trait Actor extends TransactionManagement {
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
- protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
+ protected[akka] var messageDispatcher: MessageDispatcher = {
+ val dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
+ _isEventBased = dispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
+ dispatcher
+ }
/**
* User overridable callback/setting.
@@ -407,10 +416,10 @@ trait Actor extends TransactionManagement {
/**
* Starts up the actor and its message queue.
*/
- def start: Actor = synchronized {
+ def start: Actor = _mailbox.synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
- dispatcher.registerHandler(this, new ActorMessageInvoker(this))
+ messageDispatcher.register(this)
messageDispatcher.start
_isRunning = true
//if (isTransactional) this !! TransactionalInit
@@ -428,9 +437,9 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
- def stop = synchronized {
+ def stop = _mailbox.synchronized {
if (_isRunning) {
- messageDispatcher.unregisterHandler(this)
+ messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
_isRunning = false
_isShutDown = true
@@ -468,22 +477,20 @@ trait Actor extends TransactionManagement {
* actor.send(message)
*
*/
- def !(message: Any)(implicit sender: AnyRef) = {
+ def !(message: Any)(implicit sender: AnyRef) = if (_isRunning) {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
else None
- if (_isRunning) postMessageToMailbox(message, from)
- else throw new IllegalStateException(
- "Actor has not been started, you need to invoke 'actor.start' before using it")
- }
+ postMessageToMailbox(message, from)
+ } else throw new IllegalStateException(
+ "Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Same as the '!' method but does not take an implicit sender as second parameter.
*/
- def send(message: Any) = {
+ def send(message: Any) =
if (_isRunning) postMessageToMailbox(message, None)
else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
- }
/**
* Sends a message asynchronously and waits on a future for a reply message.
@@ -535,13 +542,12 @@ trait Actor extends TransactionManagement {
/**
* Forwards the message and passes the original sender actor as the sender.
*/
- def forward(message: Any)(implicit sender: AnyRef) = {
+ def forward(message: Any)(implicit sender: AnyRef) = if (_isRunning) {
val forwarder = if (sender != null && sender.isInstanceOf[Actor]) sender.asInstanceOf[Actor]
else throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor")
if (forwarder.getSender.isEmpty) throw new IllegalStateException("Can't forward message when initial sender is not an actor")
- if (_isRunning) postMessageToMailbox(message, forwarder.getSender)
- else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
- }
+ postMessageToMailbox(message, forwarder.getSender)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Use reply(..) to reply with a message to the original sender of the message currently
@@ -571,15 +577,20 @@ trait Actor extends TransactionManagement {
/**
* Get the dispatcher for this actor.
*/
- def dispatcher: MessageDispatcher = synchronized { messageDispatcher }
+ def dispatcher: MessageDispatcher =
+ if (_isRunning) messageDispatcher
+ else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
- def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
+ def dispatcher_=(dispatcher: MessageDispatcher): Unit = _mailbox.synchronized {
if (!_isRunning) {
+ messageDispatcher.unregister(this)
messageDispatcher = dispatcher
- messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
+ messageDispatcher.register(this)
+ _isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@@ -606,7 +617,7 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = synchronized {
+ def makeTransactionRequired = _mailbox.synchronized {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
@@ -734,8 +745,11 @@ trait Actor extends TransactionManagement {
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
+ private[akka] def _suspend = _isSuspended = true
+ private[akka] def _resume = _isSuspended = false
+
private[akka] def getSender = sender
-
+
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
val actor = actorClass.newInstance.asInstanceOf[T]
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
@@ -759,8 +773,17 @@ trait Actor extends TransactionManagement {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {
- val handle = new MessageInvocation(this, message, None, sender, currentTransaction.get)
- handle.send
+ val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
+ if (_isEventBased) {
+ _mailbox.synchronized {
+ _mailbox.add(invocation)
+ if (_isSuspended) {
+ _resume
+ invocation.send
+ }
+ }
+ }
+ else invocation.send
}
}
@@ -784,8 +807,16 @@ trait Actor extends TransactionManagement {
"Expected a future from remote call to actor " + toString)
} else {
val future = new DefaultCompletableFutureResult(timeout)
- val handle = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
- handle.send
+ val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
+ if (_isEventBased) {
+ _mailbox.synchronized {
+ _mailbox.add(invocation)
+ if (_isSuspended) {
+ _resume
+ invocation.send
+ }
+ }
+ } else invocation.send
future
}
}
@@ -793,7 +824,7 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
*/
- private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
+ private[akka] def invoke(messageHandle: MessageInvocation) = {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@@ -856,7 +887,7 @@ trait Actor extends TransactionManagement {
} else proceed
} catch {
case e =>
- Actor.log.error(e, "Exception when invoking actor [%s] with message [%s]", this, message)
+ Actor.log.error(e, "Exception when \ninvoking actor [%s] \nwith message [%s]", this, message)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
@@ -914,13 +945,13 @@ trait Actor extends TransactionManagement {
}
}
- private[Actor] def restart(reason: AnyRef) = synchronized {
+ private[Actor] def restart(reason: AnyRef) = _mailbox.synchronized {
preRestart(reason, _config)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason, _config)
}
- private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
+ private[akka] def registerSupervisorAsRemoteActor: Option[String] = _mailbox.synchronized {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
@@ -935,11 +966,6 @@ trait Actor extends TransactionManagement {
} else _linkedActors.get
}
- private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
- messageDispatcher = disp
- messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
- }
-
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&
diff --git a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 93a1e9c4e2..6edd5ed6d3 100644
--- a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -102,7 +102,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
- if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
+ if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(
@@ -119,7 +119,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
- if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
+ if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
diff --git a/akka-actors/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
index 28689fa48c..a7aa241180 100644
--- a/akka-actors/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
@@ -7,27 +7,27 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
+import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor}
+
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
- protected val queue = new ReactiveMessageQueue(name)
@volatile protected var active: Boolean = false
- protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
+ protected val queue = new ReactiveMessageQueue(name)
+ protected val messageInvokers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
- def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
- messageHandlers.put(key, handler)
+ override def register(actor: Actor) = synchronized {
+ messageInvokers.put(actor, new ActorMessageInvoker(actor))
+ super.register(actor)
}
- def unregisterHandler(key: AnyRef) = guard.synchronized {
- messageHandlers.remove(key)
+ override def unregister(actor: Actor) = synchronized {
+ messageInvokers.remove(actor)
+ super.register(actor)
}
- def canBeShutDown: Boolean = guard.synchronized {
- messageHandlers.isEmpty
- }
-
def shutdown = if (active) {
active = false
selectorThread.interrupt
diff --git a/akka-actors/src/main/scala/dispatch/Dispatchers.scala b/akka-actors/src/main/scala/dispatch/Dispatchers.scala
index 2e80673133..4c2674e40f 100644
--- a/akka-actors/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-actors/src/main/scala/dispatch/Dispatchers.scala
@@ -40,27 +40,31 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global")
- object globalForkJoinBasedEventDrivenDispatcher extends ForkJoinBasedEventDrivenDispatcher("global")
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
/**
- * Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
+ * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
+ *
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
+
+ /**
+ * Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
+ *
* Has a fluent builder interface for configuring its semantics.
*/
def newReactorBasedThreadPoolEventDrivenDispatcher(name: String) = new ReactorBasedThreadPoolEventDrivenDispatcher(name)
/**
- * Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
+ * Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a single thread.
*/
def newReactorBasedSingleThreadEventDrivenDispatcher(name: String) = new ReactorBasedSingleThreadEventDrivenDispatcher(name)
- def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
-
- def newForkJoinBasedEventDrivenDispatcher(name: String) = new ForkJoinBasedEventDrivenDispatcher(name)
-
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
+ *
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
diff --git a/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 85445a33c3..902b6ccd53 100644
--- a/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -4,8 +4,6 @@
package se.scalablesolutions.akka.dispatch
-import java.util.concurrent.Executors
-
/**
* Default settings are:
*
@@ -57,16 +55,24 @@ import java.util.concurrent.Executors
*/
class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder {
@volatile private var active: Boolean = false
-
+
val name = "event-driven:executor:dispatcher:" + _name
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
- //private val _executor = Executors.newFixedThreadPool(4)
-
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
- def run = invocation.invoke
+ def run = {
+ val mailbox = invocation.receiver._mailbox
+ mailbox.synchronized {
+ val messages = mailbox.iterator
+ while (messages.hasNext) {
+ messages.next.invoke
+ messages.remove
+ }
+ invocation.receiver._suspend
+ }
+ }
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
@@ -74,16 +80,11 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
active = true
}
- def canBeShutDown = true
-
def shutdown = if (active) {
executor.shutdownNow
active = false
}
- def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
- def unregisterHandler(key: AnyRef) = {}
-
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
diff --git a/akka-actors/src/main/scala/dispatch/ForkJoinBasedEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/ForkJoinBasedEventDrivenDispatcher.scala
index 5b639e802a..e0cab5d6f3 100644
--- a/akka-actors/src/main/scala/dispatch/ForkJoinBasedEventDrivenDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/ForkJoinBasedEventDrivenDispatcher.scala
@@ -15,9 +15,7 @@ class ForkJoinBasedEventDrivenDispatcher(val name: String) extends MessageDispat
// FIXME: add name "event-driven:fork-join:dispatcher" + name
def dispatch(invocation: MessageInvocation) = {
scheduler.execute(new Runnable() {
- def run = {
- invocation.invoke
- }
+ def run = invocation.invoke
})
}
@@ -25,12 +23,8 @@ class ForkJoinBasedEventDrivenDispatcher(val name: String) extends MessageDispat
active = true
}
- def canBeShutDown = true
-
def shutdown = if (active) {
+ scheduler.shutdown
active = false
}
-
- def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
- def unregisterHandler(key: AnyRef) = {}
}
\ No newline at end of file
diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala
index 2628db4380..339bed0fca 100644
--- a/akka-actors/src/main/scala/dispatch/Reactor.scala
+++ b/akka-actors/src/main/scala/dispatch/Reactor.scala
@@ -10,38 +10,13 @@ import se.scalablesolutions.akka.util.HashCode
import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.actor.Actor
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.ConcurrentHashMap
-trait MessageQueue {
- def append(handle: MessageInvocation)
- def prepend(handle: MessageInvocation)
-}
-
-trait MessageInvoker {
- def invoke(message: MessageInvocation)
-}
-
-trait MessageDispatcher {
- def dispatch(invocation: MessageInvocation)
- def registerHandler(key: AnyRef, handler: MessageInvoker)
- def unregisterHandler(key: AnyRef)
- def canBeShutDown: Boolean
- def start
- def shutdown
-}
-
-trait MessageDemultiplexer {
- def select
- def acquireSelectedInvocations: List[MessageInvocation]
- def releaseSelectedInvocations
- def wakeUp
-}
-
-class MessageInvocation(val receiver: Actor,
- val message: Any,
- val future: Option[CompletableFutureResult],
- val sender: Option[Actor],
- val tx: Option[Transaction]) {
+final class MessageInvocation(val receiver: Actor,
+ val message: Any,
+ val future: Option[CompletableFutureResult],
+ val sender: Option[Actor],
+ val tx: Option[Transaction]) {
if (receiver == null) throw new IllegalArgumentException("receiver is null")
if (message == null) throw new IllegalArgumentException("message is null")
@@ -62,8 +37,8 @@ class MessageInvocation(val receiver: Actor,
that.asInstanceOf[MessageInvocation].receiver == receiver &&
that.asInstanceOf[MessageInvocation].message == message
}
-
- override def toString(): String = synchronized {
+
+ override def toString(): String = synchronized {
"MessageInvocation[" +
"\n\tmessage = " + message +
"\n\treceiver = " + receiver +
@@ -73,3 +48,29 @@ class MessageInvocation(val receiver: Actor,
"\n]"
}
}
+
+trait MessageQueue {
+ def append(handle: MessageInvocation)
+ def prepend(handle: MessageInvocation)
+}
+
+trait MessageInvoker {
+ def invoke(message: MessageInvocation)
+}
+
+trait MessageDispatcher {
+ protected val references = new ConcurrentHashMap[String, Actor]
+ def dispatch(invocation: MessageInvocation)
+ def start
+ def shutdown
+ def register(actor: Actor) = references.put(actor.uuid, actor)
+ def unregister(actor: Actor) = references.remove(actor.uuid)
+ def canBeShutDown: Boolean = references.isEmpty
+}
+
+trait MessageDemultiplexer {
+ def select
+ def wakeUp
+ def acquireSelectedInvocations: List[MessageInvocation]
+ def releaseSelectedInvocations
+}
diff --git a/akka-actors/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
index fd67859710..6131b43858 100644
--- a/akka-actors/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
@@ -26,7 +26,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
- val invoker = messageHandlers.get(invocation.receiver)
+ val invoker = messageInvokers.get(invocation.receiver)
if (invoker != null) invoker.invoke(invocation)
iter.remove
}
diff --git a/akka-actors/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
index 5ace624e0f..e6bccb70bb 100644
--- a/akka-actors/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
@@ -12,7 +12,7 @@ import java.util.{HashSet, HashMap, LinkedList, List}
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
- *
+ *
* Default settings are:
*
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
@@ -21,11 +21,11 @@ import java.util.{HashSet, HashMap, LinkedList, List}
* - KEEP_ALIVE_TIME = 60000L // one minute
*
*
- *
- * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
+ *
+ * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
*
- *
+ *
* Scala API.
*
* Example usage:
@@ -40,7 +40,7 @@ import java.util.{HashSet, HashMap, LinkedList, List}
* .buildThreadPool
*
*
- *
+ *
* Java API.
*
* Example usage:
@@ -56,50 +56,37 @@ import java.util.{HashSet, HashMap, LinkedList, List}
*
*
*
- * But the preferred way of creating dispatchers is to use
+ * But the preferred way of creating dispatchers is to use
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
- *
+ *
* @author Jonas Bonér
*/
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("event-driven:reactor:thread-pool:dispatcher:" + _name)
with ThreadPoolBuilder {
- private val busyInvokers = new HashSet[AnyRef]
+ private var fair = true
+ private val busyActors = new HashSet[AnyRef]
+ private val messageDemultiplexer = new Demultiplexer(queue)
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
-
+
def start = if (!active) {
active = true
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
- val messageDemultiplexer = new Demultiplexer(queue)
selectorThread = new Thread(name) {
override def run = {
while (active) {
try {
try {
- guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
+ // guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
- val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
- val reservedInvocations = reserve(selectedInvocations)
- val it = reservedInvocations.entrySet.iterator
- while (it.hasNext) {
- val entry = it.next
- val invocation = entry.getKey
- val invoker = entry.getValue
- executor.execute(new Runnable() {
- def run = {
- invoker.invoke(invocation)
- free(invocation.receiver)
- messageDemultiplexer.wakeUp
- }
- })
- }
+ process(messageDemultiplexer.acquireSelectedInvocations)
} finally {
messageDemultiplexer.releaseSelectedInvocations
}
@@ -111,30 +98,46 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
override protected def doShutdown = executor.shutdownNow
- private def reserve(invocations: List[MessageInvocation]): HashMap[MessageInvocation, MessageInvoker] = guard.synchronized {
- val result = new HashMap[MessageInvocation, MessageInvoker]
- val iterator = invocations.iterator
- while (iterator.hasNext) {
- val invocation = iterator.next
+ private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
+ var nrOfBusyMessages = 0
+ val totalNrOfActors = messageInvokers.size
+ val totalNrOfBusyActors = busyActors.size
+ val invocations = selectedInvocations.iterator
+ while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
+ val invocation = invocations.next
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
- if (!busyInvokers.contains(invocation.receiver)) {
- val invoker = messageHandlers.get(invocation.receiver)
+ if (!busyActors.contains(invocation.receiver)) {
+ val invoker = messageInvokers.get(invocation.receiver)
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
- result.put(invocation, invoker)
- busyInvokers.add(invocation.receiver)
- iterator.remove
- }
+ resume(invocation.receiver)
+ invocations.remove
+ executor.execute(new Runnable() {
+ def run = {
+ invoker.invoke(invocation)
+ suspend(invocation.receiver)
+ messageDemultiplexer.wakeUp
+ }
+ })
+ } else nrOfBusyMessages += 1
}
- result
+ }
+
+ private def resume(actor: AnyRef) = synchronized {
+ busyActors.add(actor)
+ }
+
+ private def suspend(actor: AnyRef) = synchronized {
+ busyActors.remove(actor)
+ }
+
+ private def passFairnessCheck(nrOfBusyMessages: Int) = {
+ if (fair) true
+ else nrOfBusyMessages < 100
}
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
- private def free(invoker: AnyRef) = guard.synchronized {
- busyInvokers.remove(invoker)
- }
-
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedInvocationsLock = new ReentrantLock
diff --git a/akka-actors/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actors/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index 3bc18f90a8..fed90d6ba7 100644
--- a/akka-actors/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -39,15 +39,10 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
selectorThread.start
}
- def canBeShutDown = true
-
def shutdown = if (active) {
active = false
selectorThread.interrupt
}
-
- def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
- def unregisterHandler(key: AnyRef) = {}
}
class BlockingMessageQueue(name: String) extends MessageQueue {
diff --git a/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala
index 746c4d0aa5..4a3659d981 100644
--- a/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala
@@ -48,7 +48,7 @@ trait ThreadPoolBuilder {
}
/**
- * Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeeded.
+ * Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeded.
*
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
*/
diff --git a/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala b/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala
index 8e0d75f1bb..8c561749a1 100644
--- a/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala
+++ b/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala
@@ -3,11 +3,15 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
+import se.scalablesolutions.akka.dispatch.Dispatchers
+
object state {
var s = "NIL"
}
class ReplyActor extends Actor {
+ dispatcher = Dispatchers.newThreadBasedDispatcher(this)
+
def receive = {
case "Send" => reply("Reply")
case "SendImplicit" => sender.get ! "ReplyImplicit"
@@ -15,6 +19,8 @@ class ReplyActor extends Actor {
}
class SenderActor(replyActor: Actor) extends Actor {
+ dispatcher = Dispatchers.newThreadBasedDispatcher(this)
+
def receive = {
case "Init" => replyActor ! "Send"
case "Reply" => state.s = "Reply"
diff --git a/akka-actors/src/test/scala/AllTest.scala b/akka-actors/src/test/scala/AllTest.scala
index e5176cd666..37604e2e7a 100644
--- a/akka-actors/src/test/scala/AllTest.scala
+++ b/akka-actors/src/test/scala/AllTest.scala
@@ -5,7 +5,6 @@ import junit.framework.TestCase
import junit.framework.TestSuite
import se.scalablesolutions.akka.actor.{RemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest}
-import se.scalablesolutions.akka.dispatch.{ReactorBasedSingleThreadEventDrivenDispatcherTest, ReactorBasedThreadPoolEventDrivenDispatcherTest}
object AllTest extends TestCase {
def suite(): Test = {
diff --git a/akka-actors/src/test/scala/PerformanceTest.scala b/akka-actors/src/test/scala/PerformanceTest.scala
index b960f68832..47b060784d 100644
--- a/akka-actors/src/test/scala/PerformanceTest.scala
+++ b/akka-actors/src/test/scala/PerformanceTest.scala
@@ -2,136 +2,293 @@ package test
import org.scalatest.junit.JUnitSuite
import org.junit.Test
+import net.lag.logging.Logger
-import se.scalablesolutions.akka.actor.Actor
-
+/**
+ * The Computer Language Benchmarks Game
+ *
+ * URL: [http://shootout.alioth.debian.org/]
+ *
+ * Contributed by Julien Gaugaz.
+ *
+ * Inspired by the version contributed by Yura Taras and modified by Isaac Gouy.
+ */
class PerformanceTest extends JUnitSuite {
- abstract class Colour
- case object RED extends Colour
- case object YELLOW extends Colour
- case object BLUE extends Colour
- case object FADED extends Colour
-
- val colours = Array(BLUE, RED, YELLOW)
-
- case class Meet(from: Actor, colour: Colour)
- case class Change(colour: Colour)
- case class MeetingCount(count: int)
- case class ExitActor(actor: Actor, reason: String)
-
- var totalTime = -1
-
- class Mall(var nrMeets: int, numChameneos: int) extends Actor {
- var waitingChameneo: Option[Actor] = None
- var sumMeetings = 0
- var numFaded = 0
- var startTime: Long = 0L
-
- start
-
- def startChameneos(): Unit = {
- startTime = System.currentTimeMillis
- var i = 0
- while (i < numChameneos) {
- Chameneo(this, colours(i % 3), i).start
- i = i + 1
- }
- }
-
- def receive = {
- case MeetingCount(i) => {
- numFaded = numFaded + 1
- sumMeetings = sumMeetings + i
- if (numFaded == numChameneos) {
- totalTime = System.currentTimeMillis - startTime
- println("Total time Akka Actors: " + totalTime)
- exit
- }
- }
-
- case msg@Meet(a, c) => {
- if (nrMeets > 0) {
- waitingChameneo match {
- case Some(chameneo) =>
- nrMeets = nrMeets - 1
- chameneo ! msg
- waitingChameneo = None
- case None =>
- waitingChameneo = sender
- }
- } else {
- waitingChameneo match {
- case Some(chameneo) =>
- chameneo ! ExitActor(this, "normal")
- case None =>
- }
- sender.get ! ExitActor(this, "normal")
- }
- }
- }
- }
-
- case class Chameneo(var mall: Mall, var colour: Colour, cid: int) extends Actor {
- var meetings = 0
-
- override def start = {
- val r = super.start
- mall ! Meet(this, colour)
- r
- }
-
- override def receive: PartialFunction[Any, Unit] = {
- case Meet(from, otherColour) =>
- colour = complement(otherColour)
- meetings = meetings + 1
- from ! Change(colour)
- mall ! Meet(this, colour)
- case Change(newColour) =>
- colour = newColour
- meetings = meetings + 1
- mall ! Meet(this, colour)
- case ExitActor(_, _) =>
- colour = FADED
- sender.get ! MeetingCount(meetings)
- //exit
- }
-
- def complement(otherColour: Colour): Colour = {
- colour match {
- case RED => otherColour match {
- case RED => RED
- case YELLOW => BLUE
- case BLUE => YELLOW
- case FADED => FADED
- }
- case YELLOW => otherColour match {
- case RED => BLUE
- case YELLOW => YELLOW
- case BLUE => RED
- case FADED => FADED
- }
- case BLUE => otherColour match {
- case RED => YELLOW
- case YELLOW => RED
- case BLUE => BLUE
- case FADED => FADED
- }
- case FADED => FADED
- }
- }
-
- override def toString() = cid + "(" + colour + ")"
- }
-
- @Test def dummy {assert(true)}
@Test
- def stressTest {
- val N = 1000000
- val numChameneos = 4
- val mall = new Mall(N, numChameneos)
- mall.startChameneos
- Thread.sleep(1000 * 10)
- assert(totalTime < 5000)
+ def benchAkkaActorsVsScalaActors = {
+
+ def stressTestAkkaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {
+ import se.scalablesolutions.akka.actor.Actor
+
+ abstract class Colour
+ case object RED extends Colour
+ case object YELLOW extends Colour
+ case object BLUE extends Colour
+ case object FADED extends Colour
+
+ val colours = Array(BLUE, RED, YELLOW)
+
+ case class Meet(from: Actor, colour: Colour)
+ case class Change(colour: Colour)
+ case class MeetingCount(count: int)
+ case class ExitActor(actor: Actor, reason: String)
+
+ var totalTime = 0L
+
+ class Mall(var nrMeets: int, numChameneos: int) extends Actor {
+ var waitingChameneo: Option[Actor] = None
+ var sumMeetings = 0
+ var numFaded = 0
+ var startTime: Long = 0L
+
+ start
+
+ def startChameneos(): Unit = {
+ startTime = System.currentTimeMillis
+ var i = 0
+ while (i < numChameneos) {
+ Chameneo(this, colours(i % 3), i).start
+ i = i + 1
+ }
+ }
+
+ def receive = {
+ case MeetingCount(i) => {
+ numFaded = numFaded + 1
+ sumMeetings = sumMeetings + i
+ if (numFaded == numChameneos) {
+ totalTime = System.currentTimeMillis - startTime
+ println("time: " + totalTime)
+ exit
+ }
+ }
+
+ case msg@Meet(a, c) => {
+ if (nrMeets > 0) {
+ waitingChameneo match {
+ case Some(chameneo) =>
+ nrMeets = nrMeets - 1
+ chameneo ! msg
+ waitingChameneo = None
+ case None =>
+ waitingChameneo = sender
+ }
+ } else {
+ waitingChameneo match {
+ case Some(chameneo) =>
+ chameneo ! ExitActor(this, "normal")
+ case None =>
+ }
+ sender.get ! ExitActor(this, "normal")
+ }
+ }
+ }
+ }
+
+ case class Chameneo(var mall: Mall, var colour: Colour, cid: int) extends Actor {
+ var meetings = 0
+
+ override def start = {
+ val r = super.start
+ mall ! Meet(this, colour)
+ r
+ }
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case Meet(from, otherColour) =>
+ colour = complement(otherColour)
+ meetings = meetings + 1
+ from ! Change(colour)
+ mall ! Meet(this, colour)
+ case Change(newColour) =>
+ colour = newColour
+ meetings = meetings + 1
+ mall ! Meet(this, colour)
+ case ExitActor(_, _) =>
+ colour = FADED
+ sender.get ! MeetingCount(meetings)
+ exit
+ }
+
+ def complement(otherColour: Colour): Colour = {
+ colour match {
+ case RED => otherColour match {
+ case RED => RED
+ case YELLOW => BLUE
+ case BLUE => YELLOW
+ case FADED => FADED
+ }
+ case YELLOW => otherColour match {
+ case RED => BLUE
+ case YELLOW => YELLOW
+ case BLUE => RED
+ case FADED => FADED
+ }
+ case BLUE => otherColour match {
+ case RED => YELLOW
+ case YELLOW => RED
+ case BLUE => BLUE
+ case FADED => FADED
+ }
+ case FADED => FADED
+ }
+ }
+
+ override def toString() = cid + "(" + colour + ")"
+ }
+
+ val mall = new Mall(nrOfMessages, nrOfActors)
+ mall.startChameneos
+ Thread.sleep(sleepTime)
+ totalTime
+ }
+
+ def stressTestScalaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {
+ var totalTime = 0L
+
+ import scala.actors._
+ import scala.actors.Actor._
+
+ abstract class Colour
+ case object RED extends Colour
+ case object YELLOW extends Colour
+ case object BLUE extends Colour
+ case object FADED extends Colour
+
+ val colours = Array(BLUE, RED, YELLOW)
+
+ case class Meet(colour: Colour)
+ case class Change(colour: Colour)
+ case class MeetingCount(count: int)
+
+
+ class Mall(var n: int, numChameneos: int) extends Actor {
+ var waitingChameneo: Option[OutputChannel[Any]] = None
+ var startTime: Long = 0L
+
+ start()
+
+ def startChameneos(): Unit = {
+ startTime = System.currentTimeMillis
+ var i = 0
+ while (i < numChameneos) {
+ Chameneo(this, colours(i % 3), i).start()
+ i = i + 1
+ }
+ }
+
+ def act() {
+ var sumMeetings = 0
+ var numFaded = 0
+ loop {
+ react {
+
+ case MeetingCount(i) => {
+ numFaded = numFaded + 1
+ sumMeetings = sumMeetings + i
+ if (numFaded == numChameneos) {
+ totalTime = System.currentTimeMillis - startTime
+ exit()
+ }
+ }
+
+ case msg@Meet(c) => {
+ if (n > 0) {
+ waitingChameneo match {
+ case Some(chameneo) =>
+ n = n - 1
+ chameneo.forward(msg)
+ waitingChameneo = None
+ case None =>
+ waitingChameneo = Some(sender)
+ }
+ } else {
+ waitingChameneo match {
+ case Some(chameneo) =>
+ chameneo ! Exit(this, "normal")
+ case None =>
+ }
+ sender ! Exit(this, "normal")
+ }
+ }
+
+ }
+ }
+ }
+ }
+
+ case class Chameneo(var mall: Mall, var colour: Colour, id: int) extends Actor {
+ var meetings = 0
+
+ def act() {
+ loop {
+ mall ! Meet(colour)
+ react {
+ case Meet(otherColour) =>
+ colour = complement(otherColour)
+ meetings = meetings + 1
+ sender ! Change(colour)
+ case Change(newColour) =>
+ colour = newColour
+ meetings = meetings + 1
+ case Exit(_, _) =>
+ colour = FADED
+ sender ! MeetingCount(meetings)
+ exit()
+ }
+ }
+ }
+
+ def complement(otherColour: Colour): Colour = {
+ colour match {
+ case RED => otherColour match {
+ case RED => RED
+ case YELLOW => BLUE
+ case BLUE => YELLOW
+ case FADED => FADED
+ }
+ case YELLOW => otherColour match {
+ case RED => BLUE
+ case YELLOW => YELLOW
+ case BLUE => RED
+ case FADED => FADED
+ }
+ case BLUE => otherColour match {
+ case RED => YELLOW
+ case YELLOW => RED
+ case BLUE => BLUE
+ case FADED => FADED
+ }
+ case FADED => FADED
+ }
+ }
+
+ override def toString() = id + "(" + colour + ")"
+ }
+
+ val mall = new Mall(nrOfMessages, nrOfActors)
+ mall.startChameneos
+ Thread.sleep(sleepTime)
+ totalTime
+ }
+
+ Logger.INFO
+ println("===========================================")
+ println("== Benchmark Akka Actors vs Scala Actors ==")
+
+ var nrOfMessages = 2000000
+ var nrOfActors = 4
+ var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 20)
+ var scalaTime = stressTestScalaActors(nrOfMessages, nrOfActors, 1000 * 40)
+ var ratio: Double = scalaTime.toDouble / akkaTime.toDouble
+
+ println("\tNr of messages:\t" + nrOfMessages)
+ println("\tNr of actors:\t" + nrOfActors)
+ println("\tAkka Actors:\t" + akkaTime + "\t milliseconds")
+ println("\tScala Actors:\t" + scalaTime + "\t milliseconds")
+ println("\tAkka is " + ratio + " times faster\n")
+ println("===========================================")
+ assert(ratio >= 2.0)
}
}
diff --git a/akka-actors/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherTest.scala b/akka-actors/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherTest.scala
deleted file mode 100644
index a0fcd4f355..0000000000
--- a/akka-actors/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherTest.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package se.scalablesolutions.akka.dispatch
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-import org.junit.{Test, Before}
-import org.scalatest.junit.JUnitSuite
-
-import se.scalablesolutions.akka.actor.Actor
-
-class ReactorBasedSingleThreadEventDrivenDispatcherTest extends JUnitSuite {
- private var threadingIssueDetected: AtomicBoolean = null
-
- class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
- val guardLock: Lock = new ReentrantLock
-
- def invoke(message: MessageInvocation) {
- try {
- if (threadingIssueDetected.get) return
- if (guardLock.tryLock) {
- handleLatch.countDown
- } else {
- threadingIssueDetected.set(true)
- }
- } catch {
- case e: Exception => threadingIssueDetected.set(true)
- } finally {
- guardLock.unlock
- }
- }
- }
-
- @Before
- def setUp = {
- threadingIssueDetected = new AtomicBoolean(false)
- }
-
- @Test def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
- internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
- }
-
- @Test def shouldMessagesDispatchedToDifferentHandlersAreExecutedSequentially = {
- internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially
- }
-
- @Test def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
- internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
- }
-
- val key1 = new Actor { def receive = { case _ => {}} }
- val key2 = new Actor { def receive = { case _ => {}} }
- val key3 = new Actor { def receive = { case _ => {}} }
-
- private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
- val guardLock = new ReentrantLock
- val handleLatch = new CountDownLatch(100)
- val dispatcher = new ReactorBasedSingleThreadEventDrivenDispatcher("name")
- dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
- dispatcher.start
- for (i <- 0 until 100) {
- dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None))
- }
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- }
-
- private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially: Unit = {
- val handleLatch = new CountDownLatch(2)
- val dispatcher = new ReactorBasedSingleThreadEventDrivenDispatcher("name")
- dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
- dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
- dispatcher.start
- dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None))
- dispatcher.dispatch(new MessageInvocation(key2, new Object, None, None, None))
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- }
-
- private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
- val handleLatch = new CountDownLatch(200)
- val dispatcher = new ReactorBasedSingleThreadEventDrivenDispatcher("name")
- dispatcher.registerHandler(key1, new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.registerHandler(key2, new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.start
- for (i <- 0 until 100) {
- dispatcher.dispatch(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None))
- dispatcher.dispatch(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None))
- }
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- dispatcher.shutdown
- }
-}
diff --git a/akka-actors/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherTest.scala b/akka-actors/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherTest.scala
deleted file mode 100644
index ec4e37fa52..0000000000
--- a/akka-actors/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherTest.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-package se.scalablesolutions.akka.dispatch
-
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit}
-
-import org.scalatest.junit.JUnitSuite
-import org.junit.{Test, Before}
-
-import se.scalablesolutions.akka.actor.Actor
-
-class ReactorBasedThreadPoolEventDrivenDispatcherTest extends JUnitSuite {
- private var threadingIssueDetected: AtomicBoolean = null
- val key1 = new Actor { def receive = { case _ => {}} }
- val key2 = new Actor { def receive = { case _ => {}} }
- val key3 = new Actor { def receive = { case _ => {}} }
-
- @Before
- def setUp = {
- threadingIssueDetected = new AtomicBoolean(false)
- }
-
- @Test
- def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
- internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
- }
-
- @Test
- def shouldMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
- internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
- }
-
- @Test
- def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
- internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
- }
-
- private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
- val guardLock = new ReentrantLock
- val handleLatch = new CountDownLatch(10)
- val dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher("name")
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageInvoker {
- def invoke(message: MessageInvocation) {
- try {
- if (threadingIssueDetected.get) return
- if (guardLock.tryLock) {
- Thread.sleep(100)
- handleLatch.countDown
- } else {
- threadingIssueDetected.set(true)
- return
- }
- } catch {
- case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
- } finally {
- guardLock.unlock
- }
- }
- })
- dispatcher.start
- for (i <- 0 until 10) {
- dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None))
- }
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- }
-
- private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
- val guardLock1 = new ReentrantLock
- val guardLock2 = new ReentrantLock
- val handlersBarrier = new CyclicBarrier(3)
- val dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher("name")
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageInvoker {
- def invoke(message: MessageInvocation) = synchronized {
- try {
- if (guardLock1.tryLock) {
- handlersBarrier.await(1, TimeUnit.SECONDS)
- } else {
- threadingIssueDetected.set(true);
- }
- }
- catch {case e: Exception => threadingIssueDetected.set(true)}
- }
- })
- dispatcher.registerHandler(key2, new MessageInvoker {
- def invoke(message: MessageInvocation) = synchronized {
- try {
- if (guardLock2.tryLock) {
- handlersBarrier.await(1, TimeUnit.SECONDS)
- } else {
- threadingIssueDetected.set(true);
- }
- }
- catch {case e: Exception => threadingIssueDetected.set(true)}
- }
- })
- dispatcher.start
- dispatcher.dispatch(new MessageInvocation(key1, "Sending Message 1", None, None, None))
- dispatcher.dispatch(new MessageInvocation(key1, "Sending Message 1.1", None, None, None))
- dispatcher.dispatch(new MessageInvocation(key2, "Sending Message 2", None, None, None))
- dispatcher.dispatch(new MessageInvocation(key2, "Sending Message 2.2", None, None, None))
-
- handlersBarrier.await(5, TimeUnit.SECONDS)
- assert(!threadingIssueDetected.get)
- }
-
- private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
- val handleLatch = new CountDownLatch(200)
- val dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher("name")
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.registerHandler(key2, new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.start
- for (i <- 0 until 100) {
- dispatcher.dispatch(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None))
- dispatcher.dispatch(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None))
- }
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- }
-}
diff --git a/akka-actors/src/test/scala/SupervisorTest.scala b/akka-actors/src/test/scala/SupervisorTest.scala
index 699b260301..478d430317 100644
--- a/akka-actors/src/test/scala/SupervisorTest.scala
+++ b/akka-actors/src/test/scala/SupervisorTest.scala
@@ -5,6 +5,8 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.{OneWay, Die, Ping}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -15,7 +17,6 @@ import org.junit.Test
class SupervisorTest extends JUnitSuite {
import Actor.Sender.Self
-
var messageLog: String = ""
var oneWayLog: String = ""
@@ -446,13 +447,6 @@ class SupervisorTest extends JUnitSuite {
// Creat some supervisors with different configurations
def getSingleActorAllForOneSupervisor: Supervisor = {
-
- // Create an abstract SupervisorContainer that works for all implementations
- // of the different Actors (Services).
- //
- // Then create a concrete container in which we mix in support for the specific
- // implementation of the Actors we want to use.
-
pingpong1 = new PingPong1Actor
val factory = SupervisorFactory(
@@ -593,29 +587,4 @@ class SupervisorTest extends JUnitSuite {
messageLog += reason.asInstanceOf[Exception].getMessage
}
}
-
- // =============================================
-/*
- class TestAllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends AllForOneStrategy(maxNrOfRetries, withinTimeRange) {
- override def postRestart(serverContainer: ActorContainer) = {
- messageLog += "allforone"
- }
- }
-
- class TestOneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends OneForOneStrategy(maxNrOfRetries, withinTimeRange) {
- override def postRestart(serverContainer: ActorContainer) = {
- messageLog += "oneforone"
- }
- }
-
- abstract class TestSupervisorFactory extends SupervisorFactory {
- override def create(strategy: RestartStrategy): Supervisor = strategy match {
- case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
- scheme match {
- case AllForOne => new Supervisor(new TestAllForOneStrategy(maxNrOfRetries, timeRange))
- case OneForOne => new Supervisor(new TestOneForOneStrategy(maxNrOfRetries, timeRange))
- }
- }
- }
- */
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index 40f2995bfc..5c14294b99 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -12,8 +12,7 @@ import junit.framework.TestCase;
import se.scalablesolutions.akka.Config;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
-
-import java.util.concurrent.ThreadPoolExecutor;
+import se.scalablesolutions.akka.dispatch.*;
public class ActiveObjectGuiceConfiguratorTest extends TestCase {
static String messageLog = "";
@@ -22,14 +21,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
protected void setUp() {
Config.config();
- se.scalablesolutions.akka.dispatch.ReactorBasedThreadPoolDispatcher dispatcher = new se.scalablesolutions.akka.dispatch.ReactorBasedThreadPoolDispatcher("name");
- dispatcher
- .withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(16)
- .setMaxPoolSize(128)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
- .buildThreadPool();
+ MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test");
conf.addExternalGuiceModule(new AbstractModule() {
protected void configure() {
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 825d8f39fa..ae400d9382 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -37,10 +37,14 @@ public class InMemNestedStateTest extends TestCase {
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
+ Thread.sleep(100);
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
+ Thread.sleep(100);
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
+ Thread.sleep(100);
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
+ Thread.sleep(100);
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -66,10 +70,15 @@ public class InMemNestedStateTest extends TestCase {
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
stateful.setVectorState("init"); // set init state
+ Thread.sleep(100);
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
+ Thread.sleep(100);
nested.setVectorState("init"); // set init state
+ Thread.sleep(100);
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
+ Thread.sleep(100);
assertEquals("new state", stateful.getVectorState());
+ Thread.sleep(100);
assertEquals("new state", nested.getVectorState());
}
@@ -96,9 +105,13 @@ public class InMemNestedStateTest extends TestCase {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
stateful.setRefState("init"); // set init state
+ Thread.sleep(100);
nested.setRefState("init"); // set init state
+ Thread.sleep(100);
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
+ Thread.sleep(100);
assertEquals("new state", stateful.getRefState());
+ Thread.sleep(100);
assertEquals("new state", nested.getRefState());
}
diff --git a/akka.iml b/akka.iml
index c418d66936..80942e1c1a 100644
--- a/akka.iml
+++ b/akka.iml
@@ -1,5 +1,5 @@
-
+