diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala
index f9ffc219cf..f8aaba1653 100644
--- a/akka-core/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -44,8 +44,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D
* @author Jonas Bonér
*/
object Dispatchers extends Logging {
- val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
- val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
+ val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
+ val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala
index 92926bb253..0f8a4881a8 100644
--- a/akka-core/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala
@@ -12,6 +12,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationExce
import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier
+import se.scalablesolutions.akka.AkkaException
/**
* @author Jonas Bonér
@@ -57,6 +58,8 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
+class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
+
/**
* @author Jonas Bonér
*/
diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index 684a38a97c..30c0884f26 100644
--- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -4,23 +4,28 @@
package se.scalablesolutions.akka.dispatch
-import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
+import concurrent.forkjoin.LinkedTransferQueue
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author Jonas Bonér
*/
-class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher {
+class ThreadBasedDispatcher(private val actor: ActorRef,
+ val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY,
+ val pushTimeout: Long = 1000,
+ val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS
+ ) extends MessageDispatcher {
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
- private val queue = new BlockingMessageQueue(name, mailboxCapacity)
+ private val queue = new BlockingMessageTransferQueue(name, mailboxCapacity,pushTimeout,pushTimeoutUnit)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@@ -53,12 +58,20 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
-// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
-class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue {
- private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
- else new LinkedBlockingQueue[MessageInvocation]
- def append(invocation: MessageInvocation) = queue.put(invocation)
- def take: MessageInvocation = queue.take
- def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
- def interrupt = throw new UnsupportedOperationException
+// FIXME: configure the LinkedTransferQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
+class BlockingMessageTransferQueue(name: String,
+ mailboxCapacity: Int,
+ pushTimeout: Long,
+ pushTimeoutUnit: TimeUnit) extends MessageQueue {
+ private val queue = if (mailboxCapacity > 0) new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit)
+ else new LinkedTransferQueue[MessageInvocation]
+
+ final def append(invocation: MessageInvocation): Unit = {
+ if(!queue.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
+ if(!queue.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
+ throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
+ }
+ }
+
+ final def take: MessageInvocation = queue.take
}