Rewrote new executor based event-driven dispatcher to use actor-specific mailboxes

This commit is contained in:
Jonas Bonér 2009-12-13 12:29:18 +01:00
parent e35e9581bc
commit 416bad068a
21 changed files with 511 additions and 622 deletions

View file

@ -6,7 +6,6 @@ package se.scalablesolutions.akka.actor
import java.net.InetSocketAddress
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.config.ScalaConfig._
@ -18,6 +17,7 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.lang.reflect.{InvocationTargetException, Method}
import se.scalablesolutions.akka.dispatch.{Dispatchers, MessageDispatcher, FutureResult}
object Annotations {
import se.scalablesolutions.akka.annotation._
@ -30,11 +30,13 @@ object Annotations {
}
/**
* Factory class for creating Active Objects out of plain POJOs and/or POJOs with interfaces.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(false, None), None, timeout)
@ -233,12 +235,11 @@ private[akka] sealed case class AspectInit(
*/
@Aspect("perInstance")
private[akka] sealed class ActiveObjectAspect {
@volatile var isInitialized = false
var target: Class[_] = _
var actor: Dispatcher = _
var remoteAddress: Option[InetSocketAddress] = _
var timeout: Long = _
@volatile private var isInitialized = false
private var target: Class[_] = _
private var actor: Dispatcher = _
private var remoteAddress: Option[InetSocketAddress] = _
private var timeout: Long = _
@Around("execution(* *.*(..))")
def invoke(joinPoint: JoinPoint): AnyRef = {
@ -312,9 +313,9 @@ private[akka] sealed class ActiveObjectAspect {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
val clazz = arg.getClass
if (clazz.getName.contains("$$ProxiedByAW")) {
if (clazz.getName.contains(ActiveObject.AW_PROXY_PREFIX)) {
isEscaped = true
"$$ProxiedByAW" + clazz.getSuperclass.getName
ActiveObject.AW_PROXY_PREFIX + clazz.getSuperclass.getName
} else arg
}
(escapedArgs, isEscaped)
@ -375,10 +376,12 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
case Some(RestartCallbacks(pre, post)) =>
preRestart = Some(try {
targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*)
} catch { case e => throw new IllegalStateException("Could not find pre restart method [" + pre + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") })
} catch { case e => throw new IllegalStateException(
"Could not find pre restart method [" + pre + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") })
postRestart = Some(try {
targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*)
} catch { case e => throw new IllegalStateException("Could not find post restart method [" + post + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") })
} catch { case e => throw new IllegalStateException(
"Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any annotation defined restart callbacks
@ -386,9 +389,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
throw new IllegalStateException("Method annotated with @prerestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition")
throw new IllegalStateException(
"Method annotated with @prerestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition")
if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0)
throw new IllegalStateException("Method annotated with @postrestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition")
throw new IllegalStateException(
"Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition")
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
@ -399,7 +404,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
//if (initTxState.isDefined) initTxState.get.setAccessible(true)
}
def receive: PartialFunction[Any, Unit] = {
def receive = {
case Invocation(joinPoint, isOneWay, _) =>
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
@ -449,7 +454,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
!arg.getClass.isAnnotationPresent(Annotations.immutable)) {
hasMutableArgument = true
}
if (arg.getClass.getName.contains("$$ProxiedByAWSubclassing$$")) unserializable = true
if (arg.getClass.getName.contains(ActiveObject.AW_PROXY_PREFIX)) unserializable = true
}
if (!unserializable && hasMutableArgument) {
// FIXME: can we have another default deep cloner?

View file

@ -5,8 +5,6 @@
package se.scalablesolutions.akka.actor
import java.net.InetSocketAddress
import java.util.HashSet
import se.scalablesolutions.akka.Config._
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.ScalaConfig._
@ -17,10 +15,13 @@ import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
import se.scalablesolutions.akka.util.{HashCode, Logging}
import org.codehaus.aspectwerkz.proxy.Uuid
import org.multiverse.api.ThreadLocalTransaction._
import se.scalablesolutions.akka.util.{HashCode, Logging}
import java.util.{Queue, LinkedList, HashSet}
import java.util.concurrent.ConcurrentLinkedQueue
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@ -47,7 +48,7 @@ case class Restart(reason: AnyRef) extends LifeCycleMessage
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
class ActorKilledException(val killed: Actor) extends RuntimeException("Actor [" + killed + "] was killed by a Kill message")
class ActorKilledException private[akka] (val killed: Actor) extends RuntimeException("Actor [" + killed + "] was killed by a Kill message")
sealed abstract class DispatcherType
object DispatcherType {
@ -224,8 +225,10 @@ trait Actor extends TransactionManagement {
// private fields
// ====================================
@volatile private var _isRunning: Boolean = false
@volatile private var _isRunning = false
@volatile private var _isSuspended = true
@volatile private var _isShutDown: Boolean = false
private var _isEventBased: Boolean = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private var _config: Option[AnyRef] = None
private val _remoteFlagLock = new ReadWriteLock
@ -233,6 +236,8 @@ trait Actor extends TransactionManagement {
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
// ====================================
// protected fields
// ====================================
@ -290,7 +295,11 @@ trait Actor extends TransactionManagement {
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
protected[akka] var messageDispatcher: MessageDispatcher = {
val dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
_isEventBased = dispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
dispatcher
}
/**
* User overridable callback/setting.
@ -407,10 +416,10 @@ trait Actor extends TransactionManagement {
/**
* Starts up the actor and its message queue.
*/
def start: Actor = synchronized {
def start: Actor = _mailbox.synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
messageDispatcher.register(this)
messageDispatcher.start
_isRunning = true
//if (isTransactional) this !! TransactionalInit
@ -428,9 +437,9 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = synchronized {
def stop = _mailbox.synchronized {
if (_isRunning) {
messageDispatcher.unregisterHandler(this)
messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
_isRunning = false
_isShutDown = true
@ -468,22 +477,20 @@ trait Actor extends TransactionManagement {
* actor.send(message)
* </pre>
*/
def !(message: Any)(implicit sender: AnyRef) = {
def !(message: Any)(implicit sender: AnyRef) = if (_isRunning) {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
else None
if (_isRunning) postMessageToMailbox(message, from)
else throw new IllegalStateException(
postMessageToMailbox(message, from)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Same as the '!' method but does not take an implicit sender as second parameter.
*/
def send(message: Any) = {
def send(message: Any) =
if (_isRunning) postMessageToMailbox(message, None)
else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Sends a message asynchronously and waits on a future for a reply message.
@ -535,13 +542,12 @@ trait Actor extends TransactionManagement {
/**
* Forwards the message and passes the original sender actor as the sender.
*/
def forward(message: Any)(implicit sender: AnyRef) = {
def forward(message: Any)(implicit sender: AnyRef) = if (_isRunning) {
val forwarder = if (sender != null && sender.isInstanceOf[Actor]) sender.asInstanceOf[Actor]
else throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor")
if (forwarder.getSender.isEmpty) throw new IllegalStateException("Can't forward message when initial sender is not an actor")
if (_isRunning) postMessageToMailbox(message, forwarder.getSender)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
postMessageToMailbox(message, forwarder.getSender)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
@ -571,15 +577,20 @@ trait Actor extends TransactionManagement {
/**
* Get the dispatcher for this actor.
*/
def dispatcher: MessageDispatcher = synchronized { messageDispatcher }
def dispatcher: MessageDispatcher =
if (_isRunning) messageDispatcher
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
def dispatcher_=(dispatcher: MessageDispatcher): Unit = _mailbox.synchronized {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = dispatcher
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
messageDispatcher.register(this)
_isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@ -606,7 +617,7 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = synchronized {
def makeTransactionRequired = _mailbox.synchronized {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
@ -734,6 +745,9 @@ trait Actor extends TransactionManagement {
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] def _suspend = _isSuspended = true
private[akka] def _resume = _isSuspended = false
private[akka] def getSender = sender
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
@ -759,8 +773,17 @@ trait Actor extends TransactionManagement {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {
val handle = new MessageInvocation(this, message, None, sender, currentTransaction.get)
handle.send
val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
if (_isEventBased) {
_mailbox.synchronized {
_mailbox.add(invocation)
if (_isSuspended) {
_resume
invocation.send
}
}
}
else invocation.send
}
}
@ -784,8 +807,16 @@ trait Actor extends TransactionManagement {
"Expected a future from remote call to actor " + toString)
} else {
val future = new DefaultCompletableFutureResult(timeout)
val handle = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
handle.send
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
_mailbox.synchronized {
_mailbox.add(invocation)
if (_isSuspended) {
_resume
invocation.send
}
}
} else invocation.send
future
}
}
@ -793,7 +824,7 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
*/
private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
private[akka] def invoke(messageHandle: MessageInvocation) = {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@ -856,7 +887,7 @@ trait Actor extends TransactionManagement {
} else proceed
} catch {
case e =>
Actor.log.error(e, "Exception when invoking actor [%s] with message [%s]", this, message)
Actor.log.error(e, "Exception when \ninvoking actor [%s] \nwith message [%s]", this, message)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
@ -914,13 +945,13 @@ trait Actor extends TransactionManagement {
}
}
private[Actor] def restart(reason: AnyRef) = synchronized {
private[Actor] def restart(reason: AnyRef) = _mailbox.synchronized {
preRestart(reason, _config)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason, _config)
}
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
private[akka] def registerSupervisorAsRemoteActor: Option[String] = _mailbox.synchronized {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
@ -935,11 +966,6 @@ trait Actor extends TransactionManagement {
} else _linkedActors.get
}
private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
messageDispatcher = disp
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&

View file

@ -102,7 +102,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(
@ -119,7 +119,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))

View file

@ -7,25 +7,25 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
protected val queue = new ReactiveMessageQueue(name)
@volatile protected var active: Boolean = false
protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
protected val queue = new ReactiveMessageQueue(name)
protected val messageInvokers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
messageHandlers.put(key, handler)
override def register(actor: Actor) = synchronized {
messageInvokers.put(actor, new ActorMessageInvoker(actor))
super.register(actor)
}
def unregisterHandler(key: AnyRef) = guard.synchronized {
messageHandlers.remove(key)
}
def canBeShutDown: Boolean = guard.synchronized {
messageHandlers.isEmpty
override def unregister(actor: Actor) = synchronized {
messageInvokers.remove(actor)
super.register(actor)
}
def shutdown = if (active) {

View file

@ -40,27 +40,31 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global")
object globalForkJoinBasedEventDrivenDispatcher extends ForkJoinBasedEventDrivenDispatcher("global")
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
/**
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newReactorBasedThreadPoolEventDrivenDispatcher(name: String) = new ReactorBasedThreadPoolEventDrivenDispatcher(name)
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a single thread.
*/
def newReactorBasedSingleThreadEventDrivenDispatcher(name: String) = new ReactorBasedSingleThreadEventDrivenDispatcher(name)
def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
def newForkJoinBasedEventDrivenDispatcher(name: String) = new ForkJoinBasedEventDrivenDispatcher(name)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* <p/>
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)

View file

@ -4,8 +4,6 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.Executors
/**
* Default settings are:
* <pre/>
@ -62,11 +60,19 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
//private val _executor = Executors.newFixedThreadPool(4)
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = invocation.invoke
def run = {
val mailbox = invocation.receiver._mailbox
mailbox.synchronized {
val messages = mailbox.iterator
while (messages.hasNext) {
messages.next.invoke
messages.remove
}
invocation.receiver._suspend
}
}
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
@ -74,16 +80,11 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
active = true
}
def canBeShutDown = true
def shutdown = if (active) {
executor.shutdownNow
active = false
}
def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
def unregisterHandler(key: AnyRef) = {}
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -15,9 +15,7 @@ class ForkJoinBasedEventDrivenDispatcher(val name: String) extends MessageDispat
// FIXME: add name "event-driven:fork-join:dispatcher" + name
def dispatch(invocation: MessageInvocation) = {
scheduler.execute(new Runnable() {
def run = {
invocation.invoke
}
def run = invocation.invoke
})
}
@ -25,12 +23,8 @@ class ForkJoinBasedEventDrivenDispatcher(val name: String) extends MessageDispat
active = true
}
def canBeShutDown = true
def shutdown = if (active) {
scheduler.shutdown
active = false
}
def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
def unregisterHandler(key: AnyRef) = {}
}

View file

@ -10,34 +10,9 @@ import se.scalablesolutions.akka.util.HashCode
import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.actor.Actor
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ConcurrentHashMap
trait MessageQueue {
def append(handle: MessageInvocation)
def prepend(handle: MessageInvocation)
}
trait MessageInvoker {
def invoke(message: MessageInvocation)
}
trait MessageDispatcher {
def dispatch(invocation: MessageInvocation)
def registerHandler(key: AnyRef, handler: MessageInvoker)
def unregisterHandler(key: AnyRef)
def canBeShutDown: Boolean
def start
def shutdown
}
trait MessageDemultiplexer {
def select
def acquireSelectedInvocations: List[MessageInvocation]
def releaseSelectedInvocations
def wakeUp
}
class MessageInvocation(val receiver: Actor,
final class MessageInvocation(val receiver: Actor,
val message: Any,
val future: Option[CompletableFutureResult],
val sender: Option[Actor],
@ -73,3 +48,29 @@ class MessageInvocation(val receiver: Actor,
"\n]"
}
}
trait MessageQueue {
def append(handle: MessageInvocation)
def prepend(handle: MessageInvocation)
}
trait MessageInvoker {
def invoke(message: MessageInvocation)
}
trait MessageDispatcher {
protected val references = new ConcurrentHashMap[String, Actor]
def dispatch(invocation: MessageInvocation)
def start
def shutdown
def register(actor: Actor) = references.put(actor.uuid, actor)
def unregister(actor: Actor) = references.remove(actor.uuid)
def canBeShutDown: Boolean = references.isEmpty
}
trait MessageDemultiplexer {
def select
def wakeUp
def acquireSelectedInvocations: List[MessageInvocation]
def releaseSelectedInvocations
}

View file

@ -26,7 +26,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
val invoker = messageHandlers.get(invocation.receiver)
val invoker = messageInvokers.get(invocation.receiver)
if (invoker != null) invoker.invoke(invocation)
iter.remove
}

View file

@ -65,7 +65,9 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("event-driven:reactor:thread-pool:dispatcher:" + _name)
with ThreadPoolBuilder {
private val busyInvokers = new HashSet[AnyRef]
private var fair = true
private val busyActors = new HashSet[AnyRef]
private val messageDemultiplexer = new Demultiplexer(queue)
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
@ -76,30 +78,15 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
val messageDemultiplexer = new Demultiplexer(queue)
selectorThread = new Thread(name) {
override def run = {
while (active) {
try {
try {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
// guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
val reservedInvocations = reserve(selectedInvocations)
val it = reservedInvocations.entrySet.iterator
while (it.hasNext) {
val entry = it.next
val invocation = entry.getKey
val invoker = entry.getValue
executor.execute(new Runnable() {
def run = {
invoker.invoke(invocation)
free(invocation.receiver)
messageDemultiplexer.wakeUp
}
})
}
process(messageDemultiplexer.acquireSelectedInvocations)
} finally {
messageDemultiplexer.releaseSelectedInvocations
}
@ -111,30 +98,46 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
override protected def doShutdown = executor.shutdownNow
private def reserve(invocations: List[MessageInvocation]): HashMap[MessageInvocation, MessageInvoker] = guard.synchronized {
val result = new HashMap[MessageInvocation, MessageInvoker]
val iterator = invocations.iterator
while (iterator.hasNext) {
val invocation = iterator.next
private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
var nrOfBusyMessages = 0
val totalNrOfActors = messageInvokers.size
val totalNrOfBusyActors = busyActors.size
val invocations = selectedInvocations.iterator
while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
val invocation = invocations.next
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (!busyInvokers.contains(invocation.receiver)) {
val invoker = messageHandlers.get(invocation.receiver)
if (!busyActors.contains(invocation.receiver)) {
val invoker = messageInvokers.get(invocation.receiver)
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
result.put(invocation, invoker)
busyInvokers.add(invocation.receiver)
iterator.remove
resume(invocation.receiver)
invocations.remove
executor.execute(new Runnable() {
def run = {
invoker.invoke(invocation)
suspend(invocation.receiver)
messageDemultiplexer.wakeUp
}
})
} else nrOfBusyMessages += 1
}
}
result
private def resume(actor: AnyRef) = synchronized {
busyActors.add(actor)
}
private def suspend(actor: AnyRef) = synchronized {
busyActors.remove(actor)
}
private def passFairnessCheck(nrOfBusyMessages: Int) = {
if (fair) true
else nrOfBusyMessages < 100
}
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
private def free(invoker: AnyRef) = guard.synchronized {
busyInvokers.remove(invoker)
}
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedInvocationsLock = new ReentrantLock

View file

@ -39,15 +39,10 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
selectorThread.start
}
def canBeShutDown = true
def shutdown = if (active) {
active = false
selectorThread.interrupt
}
def registerHandler(key: AnyRef, handler: MessageInvoker) = {}
def unregisterHandler(key: AnyRef) = {}
}
class BlockingMessageQueue(name: String) extends MessageQueue {

View file

@ -48,7 +48,7 @@ trait ThreadPoolBuilder {
}
/**
* Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeeded.
* Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeded.
* <p/>
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
*/

View file

@ -3,11 +3,15 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
object state {
var s = "NIL"
}
class ReplyActor extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case "Send" => reply("Reply")
case "SendImplicit" => sender.get ! "ReplyImplicit"
@ -15,6 +19,8 @@ class ReplyActor extends Actor {
}
class SenderActor(replyActor: Actor) extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case "Init" => replyActor ! "Send"
case "Reply" => state.s = "Reply"

View file

@ -5,7 +5,6 @@ import junit.framework.TestCase
import junit.framework.TestSuite
import se.scalablesolutions.akka.actor.{RemoteActorTest, InMemoryActorTest, ThreadBasedActorTest, SupervisorTest, RemoteSupervisorTest, SchedulerTest}
import se.scalablesolutions.akka.dispatch.{ReactorBasedSingleThreadEventDrivenDispatcherTest, ReactorBasedThreadPoolEventDrivenDispatcherTest}
object AllTest extends TestCase {
def suite(): Test = {

View file

@ -2,10 +2,25 @@ package test
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import net.lag.logging.Logger
import se.scalablesolutions.akka.actor.Actor
/**
* The Computer Language Benchmarks Game
* <p/>
* URL: [http://shootout.alioth.debian.org/]
* <p/>
* Contributed by Julien Gaugaz.
* <p/>
* Inspired by the version contributed by Yura Taras and modified by Isaac Gouy.
*/
class PerformanceTest extends JUnitSuite {
@Test
def benchAkkaActorsVsScalaActors = {
def stressTestAkkaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {
import se.scalablesolutions.akka.actor.Actor
abstract class Colour
case object RED extends Colour
case object YELLOW extends Colour
@ -19,7 +34,7 @@ class PerformanceTest extends JUnitSuite {
case class MeetingCount(count: int)
case class ExitActor(actor: Actor, reason: String)
var totalTime = -1
var totalTime = 0L
class Mall(var nrMeets: int, numChameneos: int) extends Actor {
var waitingChameneo: Option[Actor] = None
@ -44,7 +59,7 @@ class PerformanceTest extends JUnitSuite {
sumMeetings = sumMeetings + i
if (numFaded == numChameneos) {
totalTime = System.currentTimeMillis - startTime
println("Total time Akka Actors: " + totalTime)
println("time: " + totalTime)
exit
}
}
@ -93,7 +108,7 @@ class PerformanceTest extends JUnitSuite {
case ExitActor(_, _) =>
colour = FADED
sender.get ! MeetingCount(meetings)
//exit
exit
}
def complement(otherColour: Colour): Colour = {
@ -123,15 +138,157 @@ class PerformanceTest extends JUnitSuite {
override def toString() = cid + "(" + colour + ")"
}
@Test def dummy {assert(true)}
@Test
def stressTest {
val N = 1000000
val numChameneos = 4
val mall = new Mall(N, numChameneos)
val mall = new Mall(nrOfMessages, nrOfActors)
mall.startChameneos
Thread.sleep(1000 * 10)
assert(totalTime < 5000)
Thread.sleep(sleepTime)
totalTime
}
def stressTestScalaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {
var totalTime = 0L
import scala.actors._
import scala.actors.Actor._
abstract class Colour
case object RED extends Colour
case object YELLOW extends Colour
case object BLUE extends Colour
case object FADED extends Colour
val colours = Array(BLUE, RED, YELLOW)
case class Meet(colour: Colour)
case class Change(colour: Colour)
case class MeetingCount(count: int)
class Mall(var n: int, numChameneos: int) extends Actor {
var waitingChameneo: Option[OutputChannel[Any]] = None
var startTime: Long = 0L
start()
def startChameneos(): Unit = {
startTime = System.currentTimeMillis
var i = 0
while (i < numChameneos) {
Chameneo(this, colours(i % 3), i).start()
i = i + 1
}
}
def act() {
var sumMeetings = 0
var numFaded = 0
loop {
react {
case MeetingCount(i) => {
numFaded = numFaded + 1
sumMeetings = sumMeetings + i
if (numFaded == numChameneos) {
totalTime = System.currentTimeMillis - startTime
exit()
}
}
case msg@Meet(c) => {
if (n > 0) {
waitingChameneo match {
case Some(chameneo) =>
n = n - 1
chameneo.forward(msg)
waitingChameneo = None
case None =>
waitingChameneo = Some(sender)
}
} else {
waitingChameneo match {
case Some(chameneo) =>
chameneo ! Exit(this, "normal")
case None =>
}
sender ! Exit(this, "normal")
}
}
}
}
}
}
case class Chameneo(var mall: Mall, var colour: Colour, id: int) extends Actor {
var meetings = 0
def act() {
loop {
mall ! Meet(colour)
react {
case Meet(otherColour) =>
colour = complement(otherColour)
meetings = meetings + 1
sender ! Change(colour)
case Change(newColour) =>
colour = newColour
meetings = meetings + 1
case Exit(_, _) =>
colour = FADED
sender ! MeetingCount(meetings)
exit()
}
}
}
def complement(otherColour: Colour): Colour = {
colour match {
case RED => otherColour match {
case RED => RED
case YELLOW => BLUE
case BLUE => YELLOW
case FADED => FADED
}
case YELLOW => otherColour match {
case RED => BLUE
case YELLOW => YELLOW
case BLUE => RED
case FADED => FADED
}
case BLUE => otherColour match {
case RED => YELLOW
case YELLOW => RED
case BLUE => BLUE
case FADED => FADED
}
case FADED => FADED
}
}
override def toString() = id + "(" + colour + ")"
}
val mall = new Mall(nrOfMessages, nrOfActors)
mall.startChameneos
Thread.sleep(sleepTime)
totalTime
}
Logger.INFO
println("===========================================")
println("== Benchmark Akka Actors vs Scala Actors ==")
var nrOfMessages = 2000000
var nrOfActors = 4
var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 20)
var scalaTime = stressTestScalaActors(nrOfMessages, nrOfActors, 1000 * 40)
var ratio: Double = scalaTime.toDouble / akkaTime.toDouble
println("\tNr of messages:\t" + nrOfMessages)
println("\tNr of actors:\t" + nrOfActors)
println("\tAkka Actors:\t" + akkaTime + "\t milliseconds")
println("\tScala Actors:\t" + scalaTime + "\t milliseconds")
println("\tAkka is " + ratio + " times faster\n")
println("===========================================")
assert(ratio >= 2.0)
}
}

View file

@ -1,116 +0,0 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import org.junit.{Test, Before}
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
class ReactorBasedSingleThreadEventDrivenDispatcherTest extends JUnitSuite {
private var threadingIssueDetected: AtomicBoolean = null
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
}
@Before
def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test def shouldMessagesDispatchedToDifferentHandlersAreExecutedSequentially = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially
}
@Test def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
val key1 = new Actor { def receive = { case _ => {}} }
val key2 = new Actor { def receive = { case _ => {}} }
val key3 = new Actor { def receive = { case _ => {}} }
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val dispatcher = new ReactorBasedSingleThreadEventDrivenDispatcher("name")
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially: Unit = {
val handleLatch = new CountDownLatch(2)
val dispatcher = new ReactorBasedSingleThreadEventDrivenDispatcher("name")
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start
dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None))
dispatcher.dispatch(new MessageInvocation(key2, new Object, None, None, None))
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val dispatcher = new ReactorBasedSingleThreadEventDrivenDispatcher("name")
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None))
dispatcher.dispatch(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
dispatcher.shutdown
}
}

View file

@ -1,160 +0,0 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import se.scalablesolutions.akka.actor.Actor
class ReactorBasedThreadPoolEventDrivenDispatcherTest extends JUnitSuite {
private var threadingIssueDetected: AtomicBoolean = null
val key1 = new Actor { def receive = { case _ => {}} }
val key2 = new Actor { def receive = { case _ => {}} }
val key3 = new Actor { def receive = { case _ => {}} }
@Before
def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def shouldMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
return
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
}
}
})
dispatcher.start
for (i <- 0 until 10) {
dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val guardLock1 = new ReentrantLock
val guardLock2 = new ReentrantLock
val handlersBarrier = new CyclicBarrier(3)
val dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock1.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}
}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock2.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}
}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.dispatch(new MessageInvocation(key1, "Sending Message 1", None, None, None))
dispatcher.dispatch(new MessageInvocation(key1, "Sending Message 1.1", None, None, None))
dispatcher.dispatch(new MessageInvocation(key2, "Sending Message 2", None, None, None))
dispatcher.dispatch(new MessageInvocation(key2, "Sending Message 2.2", None, None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
assert(!threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None))
dispatcher.dispatch(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
}
}

View file

@ -5,6 +5,8 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.{OneWay, Die, Ping}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@ -15,7 +17,6 @@ import org.junit.Test
class SupervisorTest extends JUnitSuite {
import Actor.Sender.Self
var messageLog: String = ""
var oneWayLog: String = ""
@ -446,13 +447,6 @@ class SupervisorTest extends JUnitSuite {
// Creat some supervisors with different configurations
def getSingleActorAllForOneSupervisor: Supervisor = {
// Create an abstract SupervisorContainer that works for all implementations
// of the different Actors (Services).
//
// Then create a concrete container in which we mix in support for the specific
// implementation of the Actors we want to use.
pingpong1 = new PingPong1Actor
val factory = SupervisorFactory(
@ -593,29 +587,4 @@ class SupervisorTest extends JUnitSuite {
messageLog += reason.asInstanceOf[Exception].getMessage
}
}
// =============================================
/*
class TestAllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends AllForOneStrategy(maxNrOfRetries, withinTimeRange) {
override def postRestart(serverContainer: ActorContainer) = {
messageLog += "allforone"
}
}
class TestOneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends OneForOneStrategy(maxNrOfRetries, withinTimeRange) {
override def postRestart(serverContainer: ActorContainer) = {
messageLog += "oneforone"
}
}
abstract class TestSupervisorFactory extends SupervisorFactory {
override def create(strategy: RestartStrategy): Supervisor = strategy match {
case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
scheme match {
case AllForOne => new Supervisor(new TestAllForOneStrategy(maxNrOfRetries, timeRange))
case OneForOne => new Supervisor(new TestOneForOneStrategy(maxNrOfRetries, timeRange))
}
}
}
*/
}

View file

@ -12,8 +12,7 @@ import junit.framework.TestCase;
import se.scalablesolutions.akka.Config;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import java.util.concurrent.ThreadPoolExecutor;
import se.scalablesolutions.akka.dispatch.*;
public class ActiveObjectGuiceConfiguratorTest extends TestCase {
static String messageLog = "";
@ -22,14 +21,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
protected void setUp() {
Config.config();
se.scalablesolutions.akka.dispatch.ReactorBasedThreadPoolDispatcher dispatcher = new se.scalablesolutions.akka.dispatch.ReactorBasedThreadPoolDispatcher("name");
dispatcher
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
.buildThreadPool();
MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test");
conf.addExternalGuiceModule(new AbstractModule() {
protected void configure() {

View file

@ -37,10 +37,14 @@ public class InMemNestedStateTest extends TestCase {
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
Thread.sleep(100);
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
Thread.sleep(100);
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
Thread.sleep(100);
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
Thread.sleep(100);
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@ -66,10 +70,15 @@ public class InMemNestedStateTest extends TestCase {
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
stateful.setVectorState("init"); // set init state
Thread.sleep(100);
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
Thread.sleep(100);
nested.setVectorState("init"); // set init state
Thread.sleep(100);
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
Thread.sleep(100);
assertEquals("new state", stateful.getVectorState());
Thread.sleep(100);
assertEquals("new state", nested.getVectorState());
}
@ -96,9 +105,13 @@ public class InMemNestedStateTest extends TestCase {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
stateful.setRefState("init"); // set init state
Thread.sleep(100);
nested.setRefState("init"); // set init state
Thread.sleep(100);
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
Thread.sleep(100);
assertEquals("new state", stateful.getRefState());
Thread.sleep(100);
assertEquals("new state", nested.getRefState());
}

View file

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" relativePaths="false" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />