diff --git a/akka-actor/src/main/resources/logback.xml b/akka-actor/src/main/resources/logback.xml
deleted file mode 100644
index 4635396601..0000000000
--- a/akka-actor/src/main/resources/logback.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
- [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n
-
-
-
- ./logs/akka.log
-
- [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n
-
-
- ./logs/akka.log.%d{yyyy-MM-dd-HH}
-
-
-
-
-
-
-
-
diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala
index 8267a93a54..21c407255f 100644
--- a/akka-actor/src/main/scala/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/actor/ActorRef.scala
@@ -829,9 +829,8 @@ class LocalActorRef private[akka](
actor.shutdown
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
- remoteAddress.foreach { address =>
- RemoteClientModule.unregister(address, uuid)
- }
+ if(remoteAddress.isDefined)
+ RemoteClientModule.unregister(remoteAddress.get, uuid)
RemoteServerModule.unregister(this)
}
nullOutActorRefReferencesFor(actorInstance.get)
@@ -1153,17 +1152,13 @@ class LocalActorRef private[akka](
isInInitialization = true
val actor = actorFactory match {
case Left(Some(clazz)) =>
- try {
- val ctor = clazz.getConstructor()
- ctor.setAccessible(true)
- ctor.newInstance()
- } catch {
- case e: InstantiationException => throw new ActorInitializationException(
- "Could not instantiate Actor due to:\n" + e +
+ import ReflectiveAccess.{createInstance,noParams,noArgs}
+ createInstance(clazz.asInstanceOf[Class[_]],noParams,noArgs).
+ getOrElse(throw new ActorInitializationException(
+ "Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
- "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")
- }
+ "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
case Right(Some(factory)) =>
factory()
case _ =>
diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala
index 90ecdc30d5..f3a479e6fd 100644
--- a/akka-actor/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala
@@ -157,16 +157,18 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
+ private val emptySet = new ConcurrentSkipListSet[V]
def put(key: K, value: V) {
//Returns whether it needs to be retried or not
def tryPut(set: JSet[V], v: V): Boolean = {
set.synchronized {
- if (!set.isEmpty) {
+ if (set.isEmpty) true //IF the set is empty then it has been removed, so signal retry
+ else { //Else add the value to the set and signal that retry is not needed
set add v
false
- } else true
+ }
}
}
@@ -203,6 +205,14 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
set foreach fun
}
+ def findValue(key: K)(f: (V) => Boolean): Option[V] = {
+ val set = container get key
+ if (set ne null)
+ set.iterator.find(f)
+ else
+ None
+ }
+
def foreach(fun: (K,V) => Unit) {
container.entrySet foreach {
(e) => e.getValue.foreach(fun(e.getKey,_))
@@ -213,12 +223,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
val set = container get key
if (set ne null) {
set.synchronized {
- set remove value
- if (set.isEmpty)
- container remove key
+ if (set.remove(value)) { //If we can remove the value
+ if (set.isEmpty) //and the set becomes empty
+ container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
+ }
}
}
}
- def clear = container.clear
+ def clear = { foreach(remove _) }
}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
deleted file mode 100644
index 24c566b48c..0000000000
--- a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.dispatch
-
-import java.util.{LinkedList, Queue, List}
-import java.util.HashMap
-
-import se.scalablesolutions.akka.actor.{Actor, ActorRef}
-
-abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
- @volatile protected var active: Boolean = false
- protected val queue = new ReactiveMessageQueue(name)
- protected var selectorThread: Thread = _
- protected val guard = new Object
-
- def dispatch(invocation: MessageInvocation) = queue.append(invocation)
-
- def shutdown = if (active) {
- log.debug("Shutting down %s", toString)
- active = false
- selectorThread.interrupt
- doShutdown
- }
-
- /**
- * Subclass callback. Override if additional shutdown behavior is needed.
- */
- protected def doShutdown = {}
-}
-
-class ReactiveMessageQueue(name: String) extends MessageQueue {
- private[akka] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
- @volatile private var interrupted = false
-
- def append(handle: MessageInvocation) = queue.synchronized {
- queue.offer(handle)
- queue.notifyAll
- }
-
- def read(destination: List[MessageInvocation]) = queue.synchronized {
- while (queue.isEmpty && !interrupted) queue.wait
- if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove)
- else interrupted = false
- }
-
- def interrupt = queue.synchronized {
- interrupted = true
- queue.notifyAll
- }
-}
diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala
index b4ef1f1f44..9a7e44a197 100644
--- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala
@@ -46,10 +46,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
- val MAILBOX_BOUNDS = BoundedMailbox(
- Dispatchers.MAILBOX_CAPACITY,
- config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").
- map(Duration(_,TimeUnit.MILLISECONDS))
+ val MAILBOX_CONFIG = MailboxConfig(
+ capacity = Dispatchers.MAILBOX_CAPACITY,
+ pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),
+ blockingDequeue = false
)
lazy val defaultGlobalDispatcher = {
@@ -58,17 +58,13 @@ object Dispatchers extends Logging {
object globalHawtDispatcher extends HawtDispatcher
- object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_BOUNDS) {
+ object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) {
override def register(actor: ActorRef) = {
if (isShutdown) init
super.register(actor)
}
}
- object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
-
- object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
-
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
*
@@ -99,7 +95,7 @@ object Dispatchers extends Logging {
*
* E.g. each actor consumes its own thread.
*/
- def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, BoundedMailbox(mailboxCapacity,Option(pushTimeOut)))
+ def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, MailboxConfig(mailboxCapacity,Option(pushTimeOut),true))
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@@ -122,6 +118,14 @@ object Dispatchers extends Logging {
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity)
+ /**
+ * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
+ *
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false))
+
+
/**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
*
@@ -136,18 +140,6 @@ object Dispatchers extends Logging {
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxCapacity: Int) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxCapacity)
- /**
- * Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
- *
- * Has a fluent builder interface for configuring its semantics.
- */
- def newReactorBasedThreadPoolEventDrivenDispatcher(name: String) = new ReactorBasedThreadPoolEventDrivenDispatcher(name)
-
- /**
- * Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a single thread.
- */
- def newReactorBasedSingleThreadEventDrivenDispatcher(name: String) = new ReactorBasedSingleThreadEventDrivenDispatcher(name)
-
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
@@ -160,9 +152,8 @@ object Dispatchers extends Logging {
*
* default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
- * # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
- * # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
- * # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
+ * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
+ * # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt
* keep-alive-ms = 60000 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@@ -200,20 +191,16 @@ object Dispatchers extends Logging {
})
}
- lazy val mailboxBounds: BoundedMailbox = {
+ lazy val mailboxBounds: MailboxConfig = {
val capacity = cfg.getInt("mailbox-capacity",Dispatchers.MAILBOX_CAPACITY)
val timeout = cfg.getInt("mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS))
- BoundedMailbox(capacity,timeout)
+ MailboxConfig(capacity,timeout,false)
}
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
- case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),mailboxBounds,threadPoolConfig)
- case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
- case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
- case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 3ed81ff740..dbbfe3442a 100644
--- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -65,15 +65,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
class ExecutorBasedEventDrivenDispatcher(
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
- mailboxBounds: BoundedMailbox = Dispatchers.MAILBOX_BOUNDS,
+ mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
- def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,BoundedMailbox(capacity,None))
+ def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
-
- mailboxCapacity = mailboxBounds.capacity
+ //FIXME remove this from ThreadPoolBuilder
+ mailboxCapacity = mailboxConfig.capacity
@volatile private var active: Boolean = false
@@ -81,27 +81,18 @@ class ExecutorBasedEventDrivenDispatcher(
init
def dispatch(invocation: MessageInvocation) = {
- getMailbox(invocation.receiver).add(invocation)
+ getMailbox(invocation.receiver) enqueue invocation
dispatch(invocation.receiver)
}
/**
* @return the mailbox associated with the actor
*/
- private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]]
+ private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
- override def createMailbox(actorRef: ActorRef): AnyRef = {
- if (mailboxCapacity <= 0)
- new ConcurrentLinkedQueue[MessageInvocation]
- else if (mailboxBounds.pushTimeOut.isDefined) {
- val timeout = mailboxBounds.pushTimeOut.get
- new BoundedTransferQueue[MessageInvocation](mailboxCapacity,timeout.length,timeout.unit)
- }
- else
- new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
- }
+ override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false)
def dispatch(receiver: ActorRef): Unit = if (active) {
@@ -140,12 +131,12 @@ class ExecutorBasedEventDrivenDispatcher(
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
val mailbox = getMailbox(receiver)
- var messageInvocation = mailbox.poll
+ var messageInvocation = mailbox.dequeue
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
- if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll
+ if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue
else {
messageInvocation = null
return !mailbox.isEmpty
diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
index c14ff00c3c..383c58905a 100644
--- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
@@ -4,14 +4,14 @@
package se.scalablesolutions.akka.dispatch
-import java.util.List
-
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import se.scalablesolutions.akka.AkkaException
-import java.util.concurrent.{ConcurrentSkipListSet}
import se.scalablesolutions.akka.util.{Duration, HashCode, Logging}
+import java.util.{Queue, List}
+import java.util.concurrent._
+import concurrent.forkjoin.LinkedTransferQueue
/**
* @author Jonas Bonér
@@ -63,16 +63,65 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
* @author Jonas Bonér
*/
trait MessageQueue {
- def append(handle: MessageInvocation)
+ def enqueue(handle: MessageInvocation)
+ def dequeue(): MessageInvocation
+ def size: Int
+ def isEmpty: Boolean
}
/* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout
* (If capacity > 0)
*/
-case class BoundedMailbox(capacity: Int, pushTimeOut: Option[Duration])
+case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingDequeue: Boolean) {
+
+ /**
+ * Creates a MessageQueue (Mailbox) with the specified properties
+ * bounds = whether the mailbox should be bounded (< 0 means unbounded)
+ * pushTime = only used if bounded, indicates if and how long an enqueue should block
+ * blockDequeue = whether dequeues should block or not
+ *
+ * The bounds + pushTime generates a MessageQueueAppendFailedException if enqueue times out
+ */
+ def newMailbox(bounds: Int = capacity,
+ pushTime: Option[Duration] = pushTimeOut,
+ blockDequeue: Boolean = blockingDequeue) : MessageQueue = {
+ if (bounds <= 0) { //UNBOUNDED: Will never block enqueue and optionally blocking dequeue
+ new LinkedTransferQueue[MessageInvocation] with MessageQueue {
+ def enqueue(handle: MessageInvocation): Unit = this add handle
+ def dequeue(): MessageInvocation = {
+ if(blockDequeue) this.take()
+ else this.poll()
+ }
+ }
+ }
+ else if (pushTime.isDefined) { //BOUNDED: Timeouted enqueue with MessageQueueAppendFailedException and optionally blocking dequeue
+ val time = pushTime.get
+ new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue {
+ def enqueue(handle: MessageInvocation) {
+ if (!this.offer(handle,time.length,time.unit))
+ throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString)
+ }
+
+ def dequeue(): MessageInvocation = {
+ if (blockDequeue) this.take()
+ else this.poll()
+ }
+ }
+ }
+ else { //BOUNDED: Blocking enqueue and optionally blocking dequeue
+ new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue {
+ def enqueue(handle: MessageInvocation): Unit = this put handle
+ def dequeue(): MessageInvocation = {
+ if(blockDequeue) this.take()
+ else this.poll()
+ }
+ }
+ }
+ }
+}
/**
- * @author Jonas Bonér
+ * @author Jonas Bonér
*/
trait MessageDispatcher extends Logging {
protected val uuids = new ConcurrentSkipListSet[String]
diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala
index 303b6499d7..2ba88f25c3 100644
--- a/akka-actor/src/main/scala/dispatch/Queues.scala
+++ b/akka-actor/src/main/scala/dispatch/Queues.scala
@@ -9,14 +9,8 @@ import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.Iterator
import se.scalablesolutions.akka.util.Logger
-class BoundedTransferQueue[E <: AnyRef](
- val capacity: Int,
- val pushTimeout: Long,
- val pushTimeUnit: TimeUnit)
- extends LinkedTransferQueue[E] {
+class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
require(capacity > 0)
- require(pushTimeout > 0)
- require(pushTimeUnit ne null)
protected val guard = new Semaphore(capacity)
@@ -50,7 +44,7 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def offer(e: E): Boolean = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ if (guard.tryAcquire) {
val result = try {
super.offer(e)
} catch {
@@ -63,9 +57,9 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ if (guard.tryAcquire(timeout,unit)) {
val result = try {
- super.offer(e,timeout,unit)
+ super.offer(e)
} catch {
case e => guard.release; throw e
}
@@ -76,7 +70,7 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def add(e: E): Boolean = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ if (guard.tryAcquire) {
val result = try {
super.add(e)
} catch {
@@ -89,17 +83,16 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def put(e :E): Unit = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
- try {
- super.put(e)
- } catch {
- case e => guard.release; throw e
- }
+ guard.acquire
+ try {
+ super.put(e)
+ } catch {
+ case e => guard.release; throw e
}
}
override def tryTransfer(e: E): Boolean = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ if (guard.tryAcquire) {
val result = try {
super.tryTransfer(e)
} catch {
@@ -112,9 +105,9 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ if (guard.tryAcquire(timeout,unit)) {
val result = try {
- super.tryTransfer(e,timeout,unit)
+ super.tryTransfer(e)
} catch {
case e => guard.release; throw e
}
@@ -125,7 +118,7 @@ class BoundedTransferQueue[E <: AnyRef](
}
override def transfer(e: E): Unit = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ if (guard.tryAcquire) {
try {
super.transfer(e)
} catch {
diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
deleted file mode 100644
index c698b22c15..0000000000
--- a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-/**
- * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
- * See also this article: [http://today.java.net/cs/user/print/a/350].
- *
- * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
- */
-package se.scalablesolutions.akka.dispatch
-
-import java.util.{LinkedList, List}
-import se.scalablesolutions.akka.actor.ActorRef
-
-class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
- extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) {
-
- def start = if (!active) {
- log.debug("Starting up %s", toString)
- active = true
- val messageDemultiplexer = new Demultiplexer(queue)
- selectorThread = new Thread(name) {
- override def run = {
- while (active) {
- try {
- messageDemultiplexer.select
- } catch { case e: InterruptedException => active = false }
- val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
- val iter = selectedInvocations.iterator
- while (iter.hasNext) {
- val invocation = iter.next
- val invoker = invocation.receiver
- if (invoker ne null) invoker invoke invocation
- iter.remove
- }
- }
- }
- }
- selectorThread.start
- }
-
- def mailboxSize(a: ActorRef) = 0
-
- def isShutdown = !active
-
- override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"
-
- class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
-
- private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]
-
- def select = messageQueue.read(selectedQueue)
-
- def acquireSelectedInvocations: List[MessageInvocation] = selectedQueue
-
- def releaseSelectedInvocations = throw new UnsupportedOperationException("Demultiplexer can't release its queue")
-
- def wakeUp = throw new UnsupportedOperationException("Demultiplexer can't be woken up")
- }
-}
-
diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
deleted file mode 100644
index 684f737c07..0000000000
--- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.dispatch
-
-import java.util.concurrent.locks.ReentrantLock
-
-import java.util.{HashSet, HashMap, LinkedList, List}
-import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
-
-/**
- * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
- * See also this article: [http://today.java.net/cs/user/print/a/350].
- *
- *
- * Default settings are:
- *
- * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
- * - NR_START_THREADS = 16
- * - NR_MAX_THREADS = 128
- * - KEEP_ALIVE_TIME = 60000L // one minute
- *
- *
- *
- * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
- * There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
- *
- *
- * Scala API.
- *
- * Example usage:
- *
- * val dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name")
- * dispatcher
- * .withNewThreadPoolWithBoundedBlockingQueue(100)
- * .setCorePoolSize(16)
- * .setMaxPoolSize(128)
- * .setKeepAliveTimeInMillis(60000)
- * .setRejectionPolicy(new CallerRunsPolicy)
- * .buildThreadPool
- *
- *
- *
- * Java API.
- *
- * Example usage:
- *
- * ReactorBasedThreadPoolEventDrivenDispatcher dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name");
- * dispatcher
- * .withNewThreadPoolWithBoundedBlockingQueue(100)
- * .setCorePoolSize(16)
- * .setMaxPoolSize(128)
- * .setKeepAliveTimeInMillis(60000)
- * .setRejectionPolicy(new CallerRunsPolicy())
- * .buildThreadPool();
- *
- *
- *
- * But the preferred way of creating dispatchers is to use
- * the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
- *
- * @author Jonas Bonér
- */
-class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit)
- extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name)
- with ThreadPoolBuilder {
-
- def this(_name: String) = this(_name,_ => ())
-
- private var fair = true
- private val busyActors = new HashSet[AnyRef]
- private val messageDemultiplexer = new Demultiplexer(queue)
-
- // build default thread pool
- init
-
- def start = if (!active) {
- log.debug("Starting up %s", toString)
- active = true
-
- /**
- * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa
- * [http://code.google.com/p/actorom/].
- */
- selectorThread = new Thread(name) {
- override def run = {
- while (active) {
- try {
- try {
- messageDemultiplexer.select
- } catch { case e: InterruptedException => active = false }
- process(messageDemultiplexer.acquireSelectedInvocations)
- } finally {
- messageDemultiplexer.releaseSelectedInvocations
- }
- }
- }
- };
- selectorThread.start
- }
-
- override protected def doShutdown = executor.shutdownNow
-
- private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
- var nrOfBusyMessages = 0
- val totalNrOfActors = uuids.size
- val totalNrOfBusyActors = busyActors.size
- val invocations = selectedInvocations.iterator
- while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
- val invocation = invocations.next
- if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
- if (!busyActors.contains(invocation.receiver)) {
- val invoker = invocation.receiver
- if (invoker eq null) throw new IllegalActorStateException(
- "Message invoker for invocation [" + invocation + "] is null")
- resume(invocation.receiver)
- invocations.remove
- executor.execute(new Runnable() {
- def run = {
- invoker.invoke(invocation)
- suspend(invocation.receiver)
- messageDemultiplexer.wakeUp
- }
- })
- } else nrOfBusyMessages += 1
- }
- }
-
- private def resume(actor: AnyRef) = synchronized {
- busyActors.add(actor)
- }
-
- private def suspend(actor: AnyRef) = synchronized {
- busyActors.remove(actor)
- }
-
- private def passFairnessCheck(nrOfBusyMessages: Int) = {
- if (fair) true
- else nrOfBusyMessages < 100
- }
-
- def mailboxSize(a: ActorRef) = 0
-
- def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
- "Can't build a new thread pool for a dispatcher that is already up and running")
-
- override def toString = "ReactorBasedThreadPoolEventDrivenDispatcher[" + name + "]"
-
- class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
- private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
- private val selectedInvocationsLock = new ReentrantLock
-
- def select = try {
- selectedInvocationsLock.lock
- messageQueue.read(selectedInvocations)
- } finally {
- selectedInvocationsLock.unlock
- }
-
- def acquireSelectedInvocations: List[MessageInvocation] = {
- selectedInvocationsLock.lock
- selectedInvocations
- }
-
- def releaseSelectedInvocations = selectedInvocationsLock.unlock
-
- def wakeUp = messageQueue.interrupt
- }
-
- private[akka] def init = {
- withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
- config(this)
- buildThreadPool
- }
-}
diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index 599bac87fd..eda5a86a9e 100644
--- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -9,7 +9,7 @@ import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
-import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue}
+import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
@@ -17,9 +17,9 @@ import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue}
* @author Jonas Bonér
*/
class ThreadBasedDispatcher(private val actor: ActorRef,
- val mailboxBounds: BoundedMailbox
+ val mailboxConfig: MailboxConfig
) extends MessageDispatcher {
- def this(actor: ActorRef, capacity: Int) = this(actor,BoundedMailbox(capacity,None))
+ def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
@@ -27,17 +27,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
- override def createMailbox(actorRef: ActorRef): AnyRef = {
- if (mailboxBounds.capacity <= 0)
- new LinkedTransferQueue[MessageInvocation] with ThreadMessageBlockingQueue
- else if (mailboxBounds.pushTimeOut.isDefined) {
- val timeout = mailboxBounds.pushTimeOut.get
- new BoundedTransferQueue[MessageInvocation](mailboxBounds.capacity, timeout.length, timeout.unit) with ThreadMessageBlockingQueue
- }
- else
- new LinkedBlockingQueue[MessageInvocation](mailboxBounds.capacity) with ThreadMessageBlockingQueue
- }
-
+ override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
+
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
throw new IllegalArgumentException("Cannot register to anyone but " + actor)
@@ -45,11 +36,11 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
super.register(actorRef)
}
- def mailbox = actor.mailbox.asInstanceOf[ThreadMessageBlockingQueue]
+ def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
- def dispatch(invocation: MessageInvocation) = mailbox append invocation
+ def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
@@ -58,7 +49,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
override def run = {
while (active) {
try {
- actor.invoke(mailbox.next)
+ actor.invoke(mailbox.dequeue)
} catch { case e: InterruptedException => active = false }
}
}
@@ -76,16 +67,4 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
}
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
-}
-
-trait ThreadMessageBlockingQueue extends MessageQueue with BlockingQueue[MessageInvocation] {
- final def next: MessageInvocation = take
- def append(invocation: MessageInvocation): Unit = put(invocation)
-}
-
-trait ThreadMessageTransferQueue extends ThreadMessageBlockingQueue with TransferQueue[MessageInvocation] {
- final override def append(invocation: MessageInvocation): Unit = {
- if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
- throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
- }
-}
+}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala
index 878403d026..8bfb7f857e 100644
--- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala
@@ -66,7 +66,7 @@ object ReflectiveAccess {
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] =
- createInstance("se.scalablesolutions.akka.remote.RemoteClient$")
+ getObject("se.scalablesolutions.akka.remote.RemoteClient$")
def register(address: InetSocketAddress, uuid: String) = {
ensureRemotingEnabled
@@ -124,10 +124,10 @@ object ReflectiveAccess {
}
val remoteServerObjectInstance: Option[RemoteServerObject] =
- createInstance("se.scalablesolutions.akka.remote.RemoteServer$")
+ getObject("se.scalablesolutions.akka.remote.RemoteServer$")
val remoteNodeObjectInstance: Option[RemoteNodeObject] =
- createInstance("se.scalablesolutions.akka.remote.RemoteNode$")
+ getObject("se.scalablesolutions.akka.remote.RemoteNode$")
def registerActor(address: InetSocketAddress, uuid: String, actorRef: ActorRef) = {
ensureRemotingEnabled
@@ -163,7 +163,7 @@ object ReflectiveAccess {
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
val typedActorObjectInstance: Option[TypedActorObject] =
- createInstance("se.scalablesolutions.akka.actor.TypedActor$")
+ getObject("se.scalablesolutions.akka.actor.TypedActor$")
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
ensureTypedActorEnabled
@@ -192,7 +192,7 @@ object ReflectiveAccess {
"Can't load the typed actor module, make sure that akka-jta.jar is on the classpath")
val transactionContainerObjectInstance: Option[TransactionContainerObject] =
- createInstance("se.scalablesolutions.akka.actor.TransactionContainer$")
+ getObject("se.scalablesolutions.akka.actor.TransactionContainer$")
def createTransactionContainer: TransactionContainer = {
ensureJtaEnabled
@@ -200,16 +200,37 @@ object ReflectiveAccess {
}
}
- protected def createInstance[T](fqn: String,
- ctorSpec: Array[Class[_]] = noParams,
- ctorArgs: Array[AnyRef] = noArgs): Option[T] = try {
- val clazz = loader.loadClass(fqn)
- val ctor = clazz.getDeclaredConstructor(ctorSpec: _*)
+ val noParams = Array[Class[_]]()
+ val noArgs = Array[AnyRef]()
+
+ def createInstance[T](clazz: Class[_],
+ params: Array[Class[_]],
+ args: Array[AnyRef]): Option[T] = try {
+ val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
- Some(ctor.newInstance(ctorArgs: _*).asInstanceOf[T])
+ Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
- case e: Exception =>
- Logger("createInstance").error(e, "Couldn't load [%s(%s) => %s(%s)]",fqn,ctorSpec.mkString(", "),fqn,ctorArgs.mkString(", "))
- None
+ case e: Exception => None
+ }
+
+ def createInstance[T](fqn: String,
+ params: Array[Class[_]],
+ args: Array[AnyRef],
+ classloader: ClassLoader = loader): Option[T] = try {
+ val clazz = classloader.loadClass(fqn)
+ val ctor = clazz.getDeclaredConstructor(params: _*)
+ ctor.setAccessible(true)
+ Some(ctor.newInstance(args: _*).asInstanceOf[T])
+ } catch {
+ case e: Exception => None
+ }
+
+ def getObject[T](fqn: String, classloader: ClassLoader = loader): Option[T] = try {//Obtains a reference to $MODULE$
+ val clazz = classloader.loadClass(fqn)
+ val instance = clazz.getDeclaredField("MODULE$")
+ instance.setAccessible(true)
+ Option(instance.get(null).asInstanceOf[T])
+ } catch {
+ case e: Exception => None
}
}
diff --git a/akka-actor/src/test/resources/logback-test.xml b/akka-actor/src/test/resources/logback-test.xml
deleted file mode 100644
index 78eae40ec4..0000000000
--- a/akka-actor/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
- [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n
-
-
-
-
-
-
-
diff --git a/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala b/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala
index bb548b9251..81fd933cda 100644
--- a/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala
+++ b/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala
@@ -28,13 +28,9 @@ object DispatchersSpec {
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map(
- "ReactorBasedSingleThreadEventDriven" -> ofType[ReactorBasedSingleThreadEventDrivenDispatcher],
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
- "ReactorBasedThreadPoolEventDriven" -> ofType[ReactorBasedThreadPoolEventDrivenDispatcher],
"Hawt" -> ofType[HawtDispatcher],
- "GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher),
- "GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher),
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
"GlobalHawt" -> instance(globalHawtDispatcher)
)
diff --git a/akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala b/akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala
new file mode 100644
index 0000000000..27afdbbce6
--- /dev/null
+++ b/akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala
@@ -0,0 +1,53 @@
+package se.scalablesolutions.akka.actor.dispatch
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+import se.scalablesolutions.akka.actor.Actor
+import Actor._
+import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
+import se.scalablesolutions.akka.util.Duration
+import se.scalablesolutions.akka.dispatch.{MessageQueueAppendFailedException, MessageInvocation, MailboxConfig, Dispatchers}
+import java.util.concurrent.atomic.{AtomicReference}
+
+object MailboxConfigSpec {
+
+}
+
+class MailboxConfigSpec extends JUnitSuite {
+ import MailboxConfigSpec._
+
+ private val unit = TimeUnit.MILLISECONDS
+
+ @Test def shouldCreateUnboundedQueue = {
+ val m = MailboxConfig(-1,None,false)
+ assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE)
+ }
+
+ @Test def shouldCreateBoundedQueue = {
+ val m = MailboxConfig(1,None,false)
+ assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1)
+ }
+
+ @Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = {
+ val m = MailboxConfig(1,Some(Duration(1,unit)),false)
+ val testActor = actorOf( new Actor { def receive = { case _ => }} )
+ val mbox = m.newMailbox()
+ (1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor,i,None,None,None)) }
+ }
+
+
+ @Test def shouldBeAbleToDequeueUnblocking = {
+ val m = MailboxConfig(1,Some(Duration(1,unit)),false)
+ val mbox = m.newMailbox()
+ val latch = new CountDownLatch(1)
+ val t = new Thread { override def run = {
+ mbox.dequeue
+ latch.countDown
+ }}
+ t.start
+ val result = latch.await(5000,unit)
+ if (!result)
+ t.interrupt
+ assert(result === true)
+ }
+}
diff --git a/akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala
deleted file mode 100644
index de9b912bf5..0000000000
--- a/akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-package se.scalablesolutions.akka.actor.dispatch
-
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
-
-import se.scalablesolutions.akka.dispatch.Dispatchers
-import se.scalablesolutions.akka.actor.Actor
-import Actor._
-
-object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
- class TestActor extends Actor {
- self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(self.uuid)
-
- def receive = {
- case "Hello" =>
- self.reply("World")
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
- }
-
- object OneWayTestActor {
- val oneWay = new CountDownLatch(1)
- }
- class OneWayTestActor extends Actor {
- self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid)
- def receive = {
- case "OneWay" => OneWayTestActor.oneWay.countDown
- }
- }
-}
-
-class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite {
- import ReactorBasedSingleThreadEventDrivenDispatcherActorSpec._
-
- private val unit = TimeUnit.MILLISECONDS
-
- @Test def shouldSendOneWay = {
- val actor = actorOf[OneWayTestActor].start
- val result = actor ! "OneWay"
- assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
- actor.stop
- }
-
- @Test def shouldSendReplySync = {
- val actor = actorOf[TestActor].start
- val result = (actor !! ("Hello", 10000)).as[String].get
- assert("World" === result)
- actor.stop
- }
-
- @Test def shouldSendReplyAsync = {
- val actor = actorOf[TestActor].start
- val result = actor !! "Hello"
- assert("World" === result.get.asInstanceOf[String])
- actor.stop
- }
-
- @Test def shouldSendReceiveException = {
- val actor = actorOf[TestActor].start
- try {
- actor !! "Failure"
- fail("Should have thrown an exception")
- } catch {
- case e =>
- assert("Expected exception; to test fault-tolerance" === e.getMessage())
- }
- actor.stop
- }
-}
diff --git a/akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala
deleted file mode 100644
index 4001df8f56..0000000000
--- a/akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-package se.scalablesolutions.akka.actor.dispatch
-
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import org.scalatest.junit.JUnitSuite
-import org.junit.Test
-
-import se.scalablesolutions.akka.dispatch.Dispatchers
-import se.scalablesolutions.akka.actor.Actor
-import Actor._
-
-object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec {
- class TestActor extends Actor {
- self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
- def receive = {
- case "Hello" =>
- self.reply("World")
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
- }
-}
-
-class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
- import ReactorBasedThreadPoolEventDrivenDispatcherActorSpec._
-
- private val unit = TimeUnit.MILLISECONDS
-
- @Test def shouldSendOneWay {
- val oneWay = new CountDownLatch(1)
- val actor = actorOf(new Actor {
- self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
- def receive = {
- case "OneWay" => oneWay.countDown
- }
- }).start
- val result = actor ! "OneWay"
- assert(oneWay.await(1, TimeUnit.SECONDS))
- actor.stop
- }
-
- @Test def shouldSendReplySync = {
- val actor = actorOf[TestActor].start
- val result = (actor !! ("Hello", 10000)).as[String].get
- assert("World" === result)
- actor.stop
- }
-
- @Test def shouldSendReplyAsync = {
- val actor = actorOf[TestActor].start
- val result = actor !! "Hello"
- assert("World" === result.get.asInstanceOf[String])
- actor.stop
- }
-
- @Test def shouldSendReceiveException = {
- val actor = actorOf[TestActor].start
- try {
- actor !! "Failure"
- fail("Should have thrown an exception")
- } catch {
- case e =>
- assert("Expected exception; to test fault-tolerance" === e.getMessage())
- }
- actor.stop
- }
-}
diff --git a/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
index 3d048684cd..26fdb6e1ef 100644
--- a/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
+++ b/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
@@ -26,7 +26,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
}).start
val actor2 = Actor.actorOf(new Actor {
- self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher("test")
+ self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
override def postRestart(cause: Throwable) {countDownLatch.countDown}
protected def receive = {
diff --git a/akka-http/src/main/scala/AkkaBroadcaster.scala b/akka-http/src/main/scala/AkkaBroadcaster.scala
index 8f724ff445..ca5abc6f1d 100644
--- a/akka-http/src/main/scala/AkkaBroadcaster.scala
+++ b/akka-http/src/main/scala/AkkaBroadcaster.scala
@@ -9,10 +9,13 @@ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.Dispatchers
-class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
- name = classOf[AkkaBroadcaster].getName
-
+object AkkaBroadcaster {
val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
+}
+
+class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
+ import AkkaBroadcaster._
+ name = classOf[AkkaBroadcaster].getName
//FIXME should be supervised
val caster = actorOf(new Actor {
diff --git a/akka-remote/src/test/resources/logback-test.xml b/akka-remote/src/test/resources/logback-test.xml
deleted file mode 100644
index 78eae40ec4..0000000000
--- a/akka-remote/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
- [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n
-
-
-
-
-
-
-
diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
index 2fe8bad905..7a2676d040 100644
--- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala
@@ -134,6 +134,20 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
actor.stop
}
+ @Test
+ def reflectiveAccessShouldNotCreateNewRemoteServerObject {
+ val server1 = new RemoteServer()
+ server1.start("localhost", 9990)
+
+ var found = RemoteServer.serverFor("localhost", 9990)
+ assert(found.isDefined, "sever not found")
+
+ val a = actor { case _ => }
+
+ found = RemoteServer.serverFor("localhost", 9990)
+ assert(found.isDefined, "sever not found after creating an actor")
+ }
+
@Test
def shouldNotRecreateRegisteredActor {
diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd
index 2a42ec0900..e66090fe16 100644
--- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd
+++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd
@@ -39,8 +39,6 @@
-
-
diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd
index 80b37c41f5..84a382a78e 100644
--- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd
+++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd
@@ -39,8 +39,6 @@
-
-
diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala
index c47efcdb78..caa344825a 100644
--- a/akka-spring/src/main/scala/ActorFactoryBean.scala
+++ b/akka-spring/src/main/scala/ActorFactoryBean.scala
@@ -54,7 +54,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
@BeanProperty var property: PropertyEntries = _
@BeanProperty var applicationContext: ApplicationContext = _
- // Holds info about if deps has been set or not. Depends on
+ // Holds info about if deps have been set or not. Depends on
// if interface is specified or not. We must set deps on
// target instance if interface is specified
var hasSetDependecies = false
diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
index 2d9807a806..0e4de3576f 100644
--- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
+++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
@@ -101,8 +101,6 @@ object AkkaSpringConfigurationTags {
// dispatcher types
val EXECUTOR_BASED_EVENT_DRIVEN = "executor-based-event-driven"
val EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING = "executor-based-event-driven-work-stealing"
- val REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN = "reactor-based-thread-pool-event-driven"
- val REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN = "reactor-based-single-thread-event-driven"
val THREAD_BASED = "thread-based"
val HAWT = "hawt"
diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala
index 5986b5a697..4d13fa6814 100644
--- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala
+++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala
@@ -26,8 +26,6 @@ object DispatcherFactoryBean {
var dispatcher = properties.dispatcherType match {
case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name)
case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name)
- case REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN => Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(properties.name)
- case REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN => Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(properties.name)
case THREAD_BASED => if (!actorRef.isDefined) {
throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
} else {
diff --git a/akka-spring/src/test/resources/dispatcher-config.xml b/akka-spring/src/test/resources/dispatcher-config.xml
index 37c33516e0..728917c6c8 100644
--- a/akka-spring/src/test/resources/dispatcher-config.xml
+++ b/akka-spring/src/test/resources/dispatcher-config.xml
@@ -66,13 +66,6 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
-
-
-
-
-
-
-
diff --git a/akka-spring/src/test/scala/ActorFactoryBeanTest.scala b/akka-spring/src/test/scala/ActorFactoryBeanTest.scala
index f765cc3307..13c6203929 100644
--- a/akka-spring/src/test/scala/ActorFactoryBeanTest.scala
+++ b/akka-spring/src/test/scala/ActorFactoryBeanTest.scala
@@ -34,7 +34,7 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
assert(bean.isRemote)
}
- it("should create an typed actor with dispatcher if dispatcher is set") {
+ it("should create a typed actor with dispatcher if dispatcher is set") {
val props = new DispatcherProperties()
props.dispatcherType = "executor-based-event-driven"
bean.setDispatcher(props);
@@ -60,12 +60,12 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
bean.setProperty(entries)
assert(classOf[PojoInf].isAssignableFrom(bean.getObjectType))
- // Check that we have injected the depencency correctly
+ // Check that we have injected the dependency correctly
val target = bean.createInstance.asInstanceOf[PojoInf]
assert(target.getStringFromVal === entry.value)
}
- it("should create an application context and verify dependency injection for tryped") {
+ it("should create an application context and verify dependency injection for typed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val ta = ctx.getBean("typedActor").asInstanceOf[PojoInf];
assert(ta.isInitInvoked)
diff --git a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala
index 83c179e29a..9dfb5bce94 100644
--- a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala
+++ b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala
@@ -57,7 +57,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
it("should be able to parse the dispatcher with a thread pool configuration") {
val xml =
val props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
- assert(props.dispatcherType == "reactor-based-thread-pool-event-driven")
+ assert(props.dispatcherType == "executor-based-event-driven")
assert(props.name == "myDispatcher")
assert(props.threadPool.corePoolSize == 2)
assert(props.threadPool.maxPoolSize == 10)
@@ -86,16 +86,6 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
}
- it("should throw IllegalArgumentException when configuring a single thread dispatcher with a thread pool") {
- val xml =
-
-
- evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
- }
-
-
it("should throw IllegalArgumentException when configuring a thread based dispatcher without TypedActor or UntypedActor") {
val xml =
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala
index ff10cabb31..db62acde3f 100644
--- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala
+++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala
@@ -96,19 +96,6 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]])
}
- scenario("get a reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue from context") {
- val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
- val dispatcher = context.getBean("reactor-based-thread-pool-event-driven-dispatcher").asInstanceOf[ReactorBasedThreadPoolEventDrivenDispatcher]
- val executor = getThreadPoolExecutorAndAssert(dispatcher)
- assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]])
- }
-
- scenario("get a reactor-based-single-thread-event-driven-dispatcher with synchronous-queue from context") {
- val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
- val dispatcher = context.getBean("reactor-based-single-thread-event-driven-dispatcher").asInstanceOf[ReactorBasedSingleThreadEventDrivenDispatcher]
- assert(dispatcher != null)
- }
-
scenario("get a executor-based-event-driven-work-stealing-dispatcher from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("executor-based-event-driven-work-stealing-dispatcher").asInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]
diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
index 0397d30bf0..67ab6398a6 100644
--- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
+++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
@@ -59,6 +59,7 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with
feature("parse Spring application context") {
+<<<<<<< HEAD
scenario("get a untyped actor") {
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor")
myactor.sendOneWay("Hello")