merged with master

This commit is contained in:
Michael Kober 2010-09-13 13:49:22 +02:00
commit aa906bf705
31 changed files with 245 additions and 690 deletions

View file

@ -1,31 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- For assistance related to logback-translator or configuration -->
<!-- files in general, please contact the logback user mailing list -->
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
<!-- -->
<!-- For professional support please see -->
<!-- http://www.qos.ch/shop/products/professionalSupport -->
<!-- -->
<!-- FOR AKKA INTERNAL USE ONLY -->
<configuration scan="false" debug="false">
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
</appender>
<appender name="R" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>./logs/akka.log</File>
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>./logs/akka.log.%d{yyyy-MM-dd-HH}</fileNamePattern>
</rollingPolicy>
</appender>
<logger name="se.scalablesolutions" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="stdout"/>
<appender-ref ref="R"/>
</root>
</configuration>

View file

@ -829,9 +829,8 @@ class LocalActorRef private[akka](
actor.shutdown
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
remoteAddress.foreach { address =>
RemoteClientModule.unregister(address, uuid)
}
if(remoteAddress.isDefined)
RemoteClientModule.unregister(remoteAddress.get, uuid)
RemoteServerModule.unregister(this)
}
nullOutActorRefReferencesFor(actorInstance.get)
@ -1153,17 +1152,13 @@ class LocalActorRef private[akka](
isInInitialization = true
val actor = actorFactory match {
case Left(Some(clazz)) =>
try {
val ctor = clazz.getConstructor()
ctor.setAccessible(true)
ctor.newInstance()
} catch {
case e: InstantiationException => throw new ActorInitializationException(
"Could not instantiate Actor due to:\n" + e +
import ReflectiveAccess.{createInstance,noParams,noArgs}
createInstance(clazz.asInstanceOf[Class[_]],noParams,noArgs).
getOrElse(throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")
}
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
case Right(Some(factory)) =>
factory()
case _ =>

View file

@ -157,16 +157,18 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
private val emptySet = new ConcurrentSkipListSet[V]
def put(key: K, value: V) {
//Returns whether it needs to be retried or not
def tryPut(set: JSet[V], v: V): Boolean = {
set.synchronized {
if (!set.isEmpty) {
if (set.isEmpty) true //IF the set is empty then it has been removed, so signal retry
else { //Else add the value to the set and signal that retry is not needed
set add v
false
} else true
}
}
}
@ -203,6 +205,14 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
set foreach fun
}
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
val set = container get key
if (set ne null)
set.iterator.find(f)
else
None
}
def foreach(fun: (K,V) => Unit) {
container.entrySet foreach {
(e) => e.getValue.foreach(fun(e.getKey,_))
@ -213,12 +223,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
val set = container get key
if (set ne null) {
set.synchronized {
set remove value
if (set.isEmpty)
container remove key
if (set.remove(value)) { //If we can remove the value
if (set.isEmpty) //and the set becomes empty
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
}
}
}
}
def clear = container.clear
def clear = { foreach(remove _) }
}

View file

@ -1,52 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
protected val queue = new ReactiveMessageQueue(name)
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false
selectorThread.interrupt
doShutdown
}
/**
* Subclass callback. Override if additional shutdown behavior is needed.
*/
protected def doShutdown = {}
}
class ReactiveMessageQueue(name: String) extends MessageQueue {
private[akka] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
@volatile private var interrupted = false
def append(handle: MessageInvocation) = queue.synchronized {
queue.offer(handle)
queue.notifyAll
}
def read(destination: List[MessageInvocation]) = queue.synchronized {
while (queue.isEmpty && !interrupted) queue.wait
if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove)
else interrupted = false
}
def interrupt = queue.synchronized {
interrupted = true
queue.notifyAll
}
}

View file

@ -46,10 +46,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
val MAILBOX_BOUNDS = BoundedMailbox(
Dispatchers.MAILBOX_CAPACITY,
config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").
map(Duration(_,TimeUnit.MILLISECONDS))
val MAILBOX_CONFIG = MailboxConfig(
capacity = Dispatchers.MAILBOX_CAPACITY,
pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),
blockingDequeue = false
)
lazy val defaultGlobalDispatcher = {
@ -58,17 +58,13 @@ object Dispatchers extends Logging {
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_BOUNDS) {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) {
override def register(actor: ActorRef) = {
if (isShutdown) init
super.register(actor)
}
}
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
* <p/>
@ -99,7 +95,7 @@ object Dispatchers extends Logging {
* <p/>
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, BoundedMailbox(mailboxCapacity,Option(pushTimeOut)))
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, MailboxConfig(mailboxCapacity,Option(pushTimeOut),true))
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -122,6 +118,14 @@ object Dispatchers extends Logging {
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false))
/**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
* <p/>
@ -136,18 +140,6 @@ object Dispatchers extends Logging {
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxCapacity: Int) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxCapacity)
/**
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newReactorBasedThreadPoolEventDrivenDispatcher(name: String) = new ReactorBasedThreadPoolEventDrivenDispatcher(name)
/**
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a single thread.
*/
def newReactorBasedSingleThreadEventDrivenDispatcher(name: String) = new ReactorBasedSingleThreadEventDrivenDispatcher(name)
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
@ -160,9 +152,8 @@ object Dispatchers extends Logging {
*
* default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt
* keep-alive-ms = 60000 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@ -200,20 +191,16 @@ object Dispatchers extends Logging {
})
}
lazy val mailboxBounds: BoundedMailbox = {
lazy val mailboxBounds: MailboxConfig = {
val capacity = cfg.getInt("mailbox-capacity",Dispatchers.MAILBOX_CAPACITY)
val timeout = cfg.getInt("mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS))
BoundedMailbox(capacity,timeout)
MailboxConfig(capacity,timeout,false)
}
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),mailboxBounds,threadPoolConfig)
case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher

View file

@ -65,15 +65,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
class ExecutorBasedEventDrivenDispatcher(
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
mailboxBounds: BoundedMailbox = Dispatchers.MAILBOX_BOUNDS,
mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,BoundedMailbox(capacity,None))
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
mailboxCapacity = mailboxBounds.capacity
//FIXME remove this from ThreadPoolBuilder
mailboxCapacity = mailboxConfig.capacity
@volatile private var active: Boolean = false
@ -81,27 +81,18 @@ class ExecutorBasedEventDrivenDispatcher(
init
def dispatch(invocation: MessageInvocation) = {
getMailbox(invocation.receiver).add(invocation)
getMailbox(invocation.receiver) enqueue invocation
dispatch(invocation.receiver)
}
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]]
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0)
new ConcurrentLinkedQueue[MessageInvocation]
else if (mailboxBounds.pushTimeOut.isDefined) {
val timeout = mailboxBounds.pushTimeOut.get
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,timeout.length,timeout.unit)
}
else
new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
}
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false)
def dispatch(receiver: ActorRef): Unit = if (active) {
@ -140,12 +131,12 @@ class ExecutorBasedEventDrivenDispatcher(
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
val mailbox = getMailbox(receiver)
var messageInvocation = mailbox.poll
var messageInvocation = mailbox.dequeue
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue
else {
messageInvocation = null
return !mailbox.isEmpty

View file

@ -4,14 +4,14 @@
package se.scalablesolutions.akka.dispatch
import java.util.List
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import se.scalablesolutions.akka.AkkaException
import java.util.concurrent.{ConcurrentSkipListSet}
import se.scalablesolutions.akka.util.{Duration, HashCode, Logging}
import java.util.{Queue, List}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -63,16 +63,65 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageQueue {
def append(handle: MessageInvocation)
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def size: Int
def isEmpty: Boolean
}
/* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout
* (If capacity > 0)
*/
case class BoundedMailbox(capacity: Int, pushTimeOut: Option[Duration])
case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingDequeue: Boolean) {
/**
* Creates a MessageQueue (Mailbox) with the specified properties
* bounds = whether the mailbox should be bounded (< 0 means unbounded)
* pushTime = only used if bounded, indicates if and how long an enqueue should block
* blockDequeue = whether dequeues should block or not
*
* The bounds + pushTime generates a MessageQueueAppendFailedException if enqueue times out
*/
def newMailbox(bounds: Int = capacity,
pushTime: Option[Duration] = pushTimeOut,
blockDequeue: Boolean = blockingDequeue) : MessageQueue = {
if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue
new LinkedTransferQueue[MessageInvocation] with MessageQueue {
def enqueue(handle: MessageInvocation): Unit = this add handle
def dequeue(): MessageInvocation = {
if(blockDequeue) this.take()
else this.poll()
}
}
}
else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue
val time = pushTime.get
new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue {
def enqueue(handle: MessageInvocation) {
if (!this.offer(handle,time.length,time.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString)
}
def dequeue(): MessageInvocation = {
if (blockDequeue) this.take()
else this.poll()
}
}
}
else { //BOUNDED: Blocking enqueue and optionally blocking dequeue
new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue {
def enqueue(handle: MessageInvocation): Unit = this put handle
def dequeue(): MessageInvocation = {
if(blockDequeue) this.take()
else this.poll()
}
}
}
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher extends Logging {
protected val uuids = new ConcurrentSkipListSet[String]

View file

@ -9,14 +9,8 @@ import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.Iterator
import se.scalablesolutions.akka.util.Logger
class BoundedTransferQueue[E <: AnyRef](
val capacity: Int,
val pushTimeout: Long,
val pushTimeUnit: TimeUnit)
extends LinkedTransferQueue[E] {
class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
require(capacity > 0)
require(pushTimeout > 0)
require(pushTimeUnit ne null)
protected val guard = new Semaphore(capacity)
@ -50,7 +44,7 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def offer(e: E): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
if (guard.tryAcquire) {
val result = try {
super.offer(e)
} catch {
@ -63,9 +57,9 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
if (guard.tryAcquire(timeout,unit)) {
val result = try {
super.offer(e,timeout,unit)
super.offer(e)
} catch {
case e => guard.release; throw e
}
@ -76,7 +70,7 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def add(e: E): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
if (guard.tryAcquire) {
val result = try {
super.add(e)
} catch {
@ -89,17 +83,16 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def put(e :E): Unit = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
try {
super.put(e)
} catch {
case e => guard.release; throw e
}
guard.acquire
try {
super.put(e)
} catch {
case e => guard.release; throw e
}
}
override def tryTransfer(e: E): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
if (guard.tryAcquire) {
val result = try {
super.tryTransfer(e)
} catch {
@ -112,9 +105,9 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
if (guard.tryAcquire(timeout,unit)) {
val result = try {
super.tryTransfer(e,timeout,unit)
super.tryTransfer(e)
} catch {
case e => guard.release; throw e
}
@ -125,7 +118,7 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def transfer(e: E): Unit = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
if (guard.tryAcquire) {
try {
super.transfer(e)
} catch {

View file

@ -1,62 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, List}
import se.scalablesolutions.akka.actor.ActorRef
class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) {
def start = if (!active) {
log.debug("Starting up %s", toString)
active = true
val messageDemultiplexer = new Demultiplexer(queue)
selectorThread = new Thread(name) {
override def run = {
while (active) {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
val invoker = invocation.receiver
if (invoker ne null) invoker invoke invocation
iter.remove
}
}
}
}
selectorThread.start
}
def mailboxSize(a: ActorRef) = 0
def isShutdown = !active
override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]
def select = messageQueue.read(selectedQueue)
def acquireSelectedInvocations: List[MessageInvocation] = selectedQueue
def releaseSelectedInvocations = throw new UnsupportedOperationException("Demultiplexer can't release its queue")
def wakeUp = throw new UnsupportedOperationException("Demultiplexer can't be woken up")
}
}

View file

@ -1,176 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock
import java.util.{HashSet, HashMap, LinkedList, List}
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
* See also this article: [http://today.java.net/cs/user/print/a/350].
* <p/>
*
* Default settings are:
* <pre/>
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
* - NR_START_THREADS = 16
* - NR_MAX_THREADS = 128
* - KEEP_ALIVE_TIME = 60000L // one minute
* </pre>
* <p/>
*
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
* <p/>
*
* Scala API.
* <p/>
* Example usage:
* <pre/>
* val dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy)
* .buildThreadPool
* </pre>
* <p/>
*
* Java API.
* <p/>
* Example usage:
* <pre/>
* ReactorBasedThreadPoolEventDrivenDispatcher dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy())
* .buildThreadPool();
* </pre>
* <p/>
*
* But the preferred way of creating dispatchers is to use
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name)
with ThreadPoolBuilder {
def this(_name: String) = this(_name,_ => ())
private var fair = true
private val busyActors = new HashSet[AnyRef]
private val messageDemultiplexer = new Demultiplexer(queue)
// build default thread pool
init
def start = if (!active) {
log.debug("Starting up %s", toString)
active = true
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa
* [http://code.google.com/p/actorom/].
*/
selectorThread = new Thread(name) {
override def run = {
while (active) {
try {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
process(messageDemultiplexer.acquireSelectedInvocations)
} finally {
messageDemultiplexer.releaseSelectedInvocations
}
}
}
};
selectorThread.start
}
override protected def doShutdown = executor.shutdownNow
private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
var nrOfBusyMessages = 0
val totalNrOfActors = uuids.size
val totalNrOfBusyActors = busyActors.size
val invocations = selectedInvocations.iterator
while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
val invocation = invocations.next
if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
if (!busyActors.contains(invocation.receiver)) {
val invoker = invocation.receiver
if (invoker eq null) throw new IllegalActorStateException(
"Message invoker for invocation [" + invocation + "] is null")
resume(invocation.receiver)
invocations.remove
executor.execute(new Runnable() {
def run = {
invoker.invoke(invocation)
suspend(invocation.receiver)
messageDemultiplexer.wakeUp
}
})
} else nrOfBusyMessages += 1
}
}
private def resume(actor: AnyRef) = synchronized {
busyActors.add(actor)
}
private def suspend(actor: AnyRef) = synchronized {
busyActors.remove(actor)
}
private def passFairnessCheck(nrOfBusyMessages: Int) = {
if (fair) true
else nrOfBusyMessages < 100
}
def mailboxSize(a: ActorRef) = 0
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
override def toString = "ReactorBasedThreadPoolEventDrivenDispatcher[" + name + "]"
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedInvocationsLock = new ReentrantLock
def select = try {
selectedInvocationsLock.lock
messageQueue.read(selectedInvocations)
} finally {
selectedInvocationsLock.unlock
}
def acquireSelectedInvocations: List[MessageInvocation] = {
selectedInvocationsLock.lock
selectedInvocations
}
def releaseSelectedInvocations = selectedInvocationsLock.unlock
def wakeUp = messageQueue.interrupt
}
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}

View file

@ -9,7 +9,7 @@ import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
@ -17,9 +17,9 @@ import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxBounds: BoundedMailbox
val mailboxConfig: MailboxConfig
) extends MessageDispatcher {
def this(actor: ActorRef, capacity: Int) = this(actor,BoundedMailbox(capacity,None))
def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
@ -27,17 +27,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxBounds.capacity <= 0)
new LinkedTransferQueue[MessageInvocation] with ThreadMessageBlockingQueue
else if (mailboxBounds.pushTimeOut.isDefined) {
val timeout = mailboxBounds.pushTimeOut.get
new BoundedTransferQueue[MessageInvocation](mailboxBounds.capacity, timeout.length, timeout.unit) with ThreadMessageBlockingQueue
}
else
new LinkedBlockingQueue[MessageInvocation](mailboxBounds.capacity) with ThreadMessageBlockingQueue
}
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
throw new IllegalArgumentException("Cannot register to anyone but " + actor)
@ -45,11 +36,11 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
super.register(actorRef)
}
def mailbox = actor.mailbox.asInstanceOf[ThreadMessageBlockingQueue]
def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox append invocation
def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
@ -58,7 +49,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
override def run = {
while (active) {
try {
actor.invoke(mailbox.next)
actor.invoke(mailbox.dequeue)
} catch { case e: InterruptedException => active = false }
}
}
@ -76,16 +67,4 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
}
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
trait ThreadMessageBlockingQueue extends MessageQueue with BlockingQueue[MessageInvocation] {
final def next: MessageInvocation = take
def append(invocation: MessageInvocation): Unit = put(invocation)
}
trait ThreadMessageTransferQueue extends ThreadMessageBlockingQueue with TransferQueue[MessageInvocation] {
final override def append(invocation: MessageInvocation): Unit = {
if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
}
}
}

View file

@ -66,7 +66,7 @@ object ReflectiveAccess {
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] =
createInstance("se.scalablesolutions.akka.remote.RemoteClient$")
getObject("se.scalablesolutions.akka.remote.RemoteClient$")
def register(address: InetSocketAddress, uuid: String) = {
ensureRemotingEnabled
@ -124,10 +124,10 @@ object ReflectiveAccess {
}
val remoteServerObjectInstance: Option[RemoteServerObject] =
createInstance("se.scalablesolutions.akka.remote.RemoteServer$")
getObject("se.scalablesolutions.akka.remote.RemoteServer$")
val remoteNodeObjectInstance: Option[RemoteNodeObject] =
createInstance("se.scalablesolutions.akka.remote.RemoteNode$")
getObject("se.scalablesolutions.akka.remote.RemoteNode$")
def registerActor(address: InetSocketAddress, uuid: String, actorRef: ActorRef) = {
ensureRemotingEnabled
@ -163,7 +163,7 @@ object ReflectiveAccess {
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
val typedActorObjectInstance: Option[TypedActorObject] =
createInstance("se.scalablesolutions.akka.actor.TypedActor$")
getObject("se.scalablesolutions.akka.actor.TypedActor$")
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
ensureTypedActorEnabled
@ -192,7 +192,7 @@ object ReflectiveAccess {
"Can't load the typed actor module, make sure that akka-jta.jar is on the classpath")
val transactionContainerObjectInstance: Option[TransactionContainerObject] =
createInstance("se.scalablesolutions.akka.actor.TransactionContainer$")
getObject("se.scalablesolutions.akka.actor.TransactionContainer$")
def createTransactionContainer: TransactionContainer = {
ensureJtaEnabled
@ -200,16 +200,37 @@ object ReflectiveAccess {
}
}
protected def createInstance[T](fqn: String,
ctorSpec: Array[Class[_]] = noParams,
ctorArgs: Array[AnyRef] = noArgs): Option[T] = try {
val clazz = loader.loadClass(fqn)
val ctor = clazz.getDeclaredConstructor(ctorSpec: _*)
val noParams = Array[Class[_]]()
val noArgs = Array[AnyRef]()
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],
args: Array[AnyRef]): Option[T] = try {
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Some(ctor.newInstance(ctorArgs: _*).asInstanceOf[T])
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: Exception =>
Logger("createInstance").error(e, "Couldn't load [%s(%s) => %s(%s)]",fqn,ctorSpec.mkString(", "),fqn,ctorArgs.mkString(", "))
None
case e: Exception => None
}
def createInstance[T](fqn: String,
params: Array[Class[_]],
args: Array[AnyRef],
classloader: ClassLoader = loader): Option[T] = try {
val clazz = classloader.loadClass(fqn)
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: Exception => None
}
def getObject[T](fqn: String, classloader: ClassLoader = loader): Option[T] = try {//Obtains a reference to $MODULE$
val clazz = classloader.loadClass(fqn)
val instance = clazz.getDeclaredField("MODULE$")
instance.setAccessible(true)
Option(instance.get(null).asInstanceOf[T])
} catch {
case e: Exception => None
}
}

View file

@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- For assistance related to logback-translator or configuration -->
<!-- files in general, please contact the logback user mailing list -->
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
<!-- -->
<!-- For professional support please see -->
<!-- http://www.qos.ch/shop/products/professionalSupport -->
<!-- -->
<configuration scan="false" debug="false">
<!-- FOR AKKA INTERNAL USE ONLY -->
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
</appender>
<logger name="se.scalablesolutions" level="DEBUG"/>
<root level="DEBUG">
<appender-ref ref="stdout"/>
</root>
</configuration>

View file

@ -28,13 +28,9 @@ object DispatchersSpec {
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map(
"ReactorBasedSingleThreadEventDriven" -> ofType[ReactorBasedSingleThreadEventDrivenDispatcher],
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
"ReactorBasedThreadPoolEventDriven" -> ofType[ReactorBasedThreadPoolEventDrivenDispatcher],
"Hawt" -> ofType[HawtDispatcher],
"GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher),
"GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher),
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
"GlobalHawt" -> instance(globalHawtDispatcher)
)

View file

@ -0,0 +1,53 @@
package se.scalablesolutions.akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.actor.Actor
import Actor._
import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.util.Duration
import se.scalablesolutions.akka.dispatch.{MessageQueueAppendFailedException, MessageInvocation, MailboxConfig, Dispatchers}
import java.util.concurrent.atomic.{AtomicReference}
object MailboxConfigSpec {
}
class MailboxConfigSpec extends JUnitSuite {
import MailboxConfigSpec._
private val unit = TimeUnit.MILLISECONDS
@Test def shouldCreateUnboundedQueue = {
val m = MailboxConfig(-1,None,false)
assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE)
}
@Test def shouldCreateBoundedQueue = {
val m = MailboxConfig(1,None,false)
assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1)
}
@Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = {
val m = MailboxConfig(1,Some(Duration(1,unit)),false)
val testActor = actorOf( new Actor { def receive = { case _ => }} )
val mbox = m.newMailbox()
(1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor,i,None,None,None)) }
}
@Test def shouldBeAbleToDequeueUnblocking = {
val m = MailboxConfig(1,Some(Duration(1,unit)),false)
val mbox = m.newMailbox()
val latch = new CountDownLatch(1)
val t = new Thread { override def run = {
mbox.dequeue
latch.countDown
}}
t.start
val result = latch.await(5000,unit)
if (!result)
t.interrupt
assert(result === true)
}
}

View file

@ -1,71 +0,0 @@
package se.scalablesolutions.akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.actor.Actor
import Actor._
object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(self.uuid)
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
object OneWayTestActor {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid)
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}
}
}
class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite {
import ReactorBasedSingleThreadEventDrivenDispatcherActorSpec._
private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = {
val actor = actorOf[OneWayTestActor].start
val result = actor ! "OneWay"
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
val result = (actor !! ("Hello", 10000)).as[String].get
assert("World" === result)
actor.stop
}
@Test def shouldSendReplyAsync = {
val actor = actorOf[TestActor].start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test def shouldSendReceiveException = {
val actor = actorOf[TestActor].start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
}

View file

@ -1,66 +0,0 @@
package se.scalablesolutions.akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.actor.Actor
import Actor._
object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
}
class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
import ReactorBasedThreadPoolEventDrivenDispatcherActorSpec._
private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay {
val oneWay = new CountDownLatch(1)
val actor = actorOf(new Actor {
self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
def receive = {
case "OneWay" => oneWay.countDown
}
}).start
val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
val result = (actor !! ("Hello", 10000)).as[String].get
assert("World" === result)
actor.stop
}
@Test def shouldSendReplyAsync = {
val actor = actorOf[TestActor].start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test def shouldSendReceiveException = {
val actor = actorOf[TestActor].start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
}

View file

@ -26,7 +26,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
}).start
val actor2 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher("test")
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
override def postRestart(cause: Throwable) {countDownLatch.countDown}
protected def receive = {

View file

@ -9,10 +9,13 @@ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.Dispatchers
class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
name = classOf[AkkaBroadcaster].getName
object AkkaBroadcaster {
val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
}
class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
import AkkaBroadcaster._
name = classOf[AkkaBroadcaster].getName
//FIXME should be supervised
val caster = actorOf(new Actor {

View file

@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- For assistance related to logback-translator or configuration -->
<!-- files in general, please contact the logback user mailing list -->
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
<!-- -->
<!-- For professional support please see -->
<!-- http://www.qos.ch/shop/products/professionalSupport -->
<!-- -->
<configuration scan="false" debug="false">
<!-- FOR AKKA INTERNAL USE ONLY -->
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
</appender>
<logger name="se.scalablesolutions" level="DEBUG"/>
<root level="DEBUG">
<appender-ref ref="stdout"/>
</root>
</configuration>

View file

@ -134,6 +134,20 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
actor.stop
}
@Test
def reflectiveAccessShouldNotCreateNewRemoteServerObject {
val server1 = new RemoteServer()
server1.start("localhost", 9990)
var found = RemoteServer.serverFor("localhost", 9990)
assert(found.isDefined, "sever not found")
val a = actor { case _ => }
found = RemoteServer.serverFor("localhost", 9990)
assert(found.isDefined, "sever not found after creating an actor")
}
@Test
def shouldNotRecreateRegisteredActor {

View file

@ -39,8 +39,6 @@
<xsd:restriction base="xsd:token">
<xsd:enumeration value="executor-based-event-driven"/>
<xsd:enumeration value="executor-based-event-driven-work-stealing"/>
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
<xsd:enumeration value="thread-based"/>
<xsd:enumeration value="hawt"/>
</xsd:restriction>

View file

@ -39,8 +39,6 @@
<xsd:restriction base="xsd:token">
<xsd:enumeration value="executor-based-event-driven"/>
<xsd:enumeration value="executor-based-event-driven-work-stealing"/>
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
<xsd:enumeration value="thread-based"/>
<xsd:enumeration value="hawt"/>
</xsd:restriction>

View file

@ -54,7 +54,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
@BeanProperty var property: PropertyEntries = _
@BeanProperty var applicationContext: ApplicationContext = _
// Holds info about if deps has been set or not. Depends on
// Holds info about if deps have been set or not. Depends on
// if interface is specified or not. We must set deps on
// target instance if interface is specified
var hasSetDependecies = false

View file

@ -101,8 +101,6 @@ object AkkaSpringConfigurationTags {
// dispatcher types
val EXECUTOR_BASED_EVENT_DRIVEN = "executor-based-event-driven"
val EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING = "executor-based-event-driven-work-stealing"
val REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN = "reactor-based-thread-pool-event-driven"
val REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN = "reactor-based-single-thread-event-driven"
val THREAD_BASED = "thread-based"
val HAWT = "hawt"

View file

@ -26,8 +26,6 @@ object DispatcherFactoryBean {
var dispatcher = properties.dispatcherType match {
case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name)
case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name)
case REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN => Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(properties.name)
case REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN => Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(properties.name)
case THREAD_BASED => if (!actorRef.isDefined) {
throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
} else {

View file

@ -66,13 +66,6 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
<akka:thread-pool queue="synchronous-queue" fairness="true" />
</akka:dispatcher>
<!-- reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue -->
<akka:dispatcher id="reactor-based-thread-pool-event-driven-dispatcher" type="reactor-based-thread-pool-event-driven" name="myDispatcher">
<akka:thread-pool queue="synchronous-queue" fairness="true" />
</akka:dispatcher>
<akka:dispatcher id="reactor-based-single-thread-event-driven-dispatcher" type="reactor-based-single-thread-event-driven" name="myDispatcher" />
<!-- executor-based-event-driven-work-stealing-dispatcher -->
<akka:dispatcher id="executor-based-event-driven-work-stealing-dispatcher" type="executor-based-event-driven-work-stealing" name="workStealingDispatcher" />

View file

@ -34,7 +34,7 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
assert(bean.isRemote)
}
it("should create an typed actor with dispatcher if dispatcher is set") {
it("should create a typed actor with dispatcher if dispatcher is set") {
val props = new DispatcherProperties()
props.dispatcherType = "executor-based-event-driven"
bean.setDispatcher(props);
@ -60,12 +60,12 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
bean.setProperty(entries)
assert(classOf[PojoInf].isAssignableFrom(bean.getObjectType))
// Check that we have injected the depencency correctly
// Check that we have injected the dependency correctly
val target = bean.createInstance.asInstanceOf[PojoInf]
assert(target.getStringFromVal === entry.value)
}
it("should create an application context and verify dependency injection for tryped") {
it("should create an application context and verify dependency injection for typed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val ta = ctx.getBean("typedActor").asInstanceOf[PojoInf];
assert(ta.isInitInvoked)

View file

@ -57,7 +57,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
it("should be able to parse the dispatcher with a thread pool configuration") {
val xml = <akka:dispatcher id="dispatcher"
type="reactor-based-thread-pool-event-driven"
type="executor-based-event-driven"
name="myDispatcher">
<akka:thread-pool queue="linked-blocking-queue"
capacity="50"
@ -67,7 +67,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
</akka:dispatcher>
val props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcherType == "reactor-based-thread-pool-event-driven")
assert(props.dispatcherType == "executor-based-event-driven")
assert(props.name == "myDispatcher")
assert(props.threadPool.corePoolSize == 2)
assert(props.threadPool.maxPoolSize == 10)
@ -86,16 +86,6 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
}
it("should throw IllegalArgumentException when configuring a single thread dispatcher with a thread pool") {
val xml = <akka:dispatcher id="reactor-based-single-thread-event-driven-dispatcher"
type="reactor-based-single-thread-event-driven"
name="myDispatcher">
<akka:thread-pool queue="synchronous-queue" fairness="true"/>
</akka:dispatcher>
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
}
it("should throw IllegalArgumentException when configuring a thread based dispatcher without TypedActor or UntypedActor") {
val xml = <akka:dispatcher id="dispatcher" type="thread-based" name="myDispatcher"/>
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]

View file

@ -96,19 +96,6 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]])
}
scenario("get a reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("reactor-based-thread-pool-event-driven-dispatcher").asInstanceOf[ReactorBasedThreadPoolEventDrivenDispatcher]
val executor = getThreadPoolExecutorAndAssert(dispatcher)
assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]])
}
scenario("get a reactor-based-single-thread-event-driven-dispatcher with synchronous-queue from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("reactor-based-single-thread-event-driven-dispatcher").asInstanceOf[ReactorBasedSingleThreadEventDrivenDispatcher]
assert(dispatcher != null)
}
scenario("get a executor-based-event-driven-work-stealing-dispatcher from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("executor-based-event-driven-work-stealing-dispatcher").asInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]

View file

@ -59,6 +59,7 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with
feature("parse Spring application context") {
<<<<<<< HEAD
scenario("get a untyped actor") {
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor")
myactor.sendOneWay("Hello")