Merge branch 'oldmaster'

This commit is contained in:
Viktor Klang 2010-08-31 16:44:23 +02:00
commit 69822ae07a
5 changed files with 198 additions and 107 deletions

View file

@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.Config.config
import net.lag.configgy.ConfigMap
import se.scalablesolutions.akka.util.UUID
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
import java.util.concurrent.TimeUnit
/**
* Scala API. Dispatcher factory.
@ -44,8 +45,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
@ -75,6 +76,7 @@ object Dispatchers extends Logging {
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout
* <p/>
* E.g. each actor consumes its own thread.
*/
@ -82,11 +84,19 @@ object Dispatchers extends Logging {
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout
* <p/>
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* <p/>
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeout: Long, pushTimeUnit: TimeUnit) = new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeout, pushTimeUnit)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>

View file

@ -10,6 +10,7 @@ import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import se.scalablesolutions.akka.AkkaException
import java.util.concurrent.{ConcurrentSkipListSet}
/**
@ -56,6 +57,8 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -0,0 +1,148 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import concurrent.forkjoin.LinkedTransferQueue
import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.Iterator
import se.scalablesolutions.akka.util.Logger
class BoundedTransferQueue[E <: AnyRef](
val capacity: Int,
val pushTimeout: Long,
val pushTimeUnit: TimeUnit)
extends LinkedTransferQueue[E] {
require(capacity > 0)
require(pushTimeout > 0)
require(pushTimeUnit ne null)
protected val guard = new Semaphore(capacity)
override def take(): E = {
val e = super.take
if (e ne null) guard.release
e
}
override def poll(): E = {
val e = super.poll
if (e ne null) guard.release
e
}
override def poll(timeout: Long, unit: TimeUnit): E = {
val e = super.poll(timeout,unit)
if (e ne null) guard.release
e
}
override def remainingCapacity = guard.availablePermits
override def remove(o: AnyRef): Boolean = {
if (super.remove(o)) {
guard.release
true
} else {
false
}
}
override def offer(e: E): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
val result = try {
super.offer(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else
false
}
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
val result = try {
super.offer(e,timeout,unit)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else
false
}
override def add(e: E): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
val result = try {
super.add(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else
false
}
override def put(e :E): Unit = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
try {
super.put(e)
} catch {
case e => guard.release; throw e
}
}
}
override def tryTransfer(e: E): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
val result = try {
super.tryTransfer(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else
false
}
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
val result = try {
super.tryTransfer(e,timeout,unit)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else
false
}
override def transfer(e: E): Unit = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
try {
super.transfer(e)
} catch {
case e => guard.release; throw e
}
}
}
override def iterator: Iterator[E] = {
val it = super.iterator
new Iterator[E] {
def hasNext = it.hasNext
def next = it.next
def remove {
it.remove
guard.release //Assume remove worked if no exception was thrown
}
}
}
}

View file

@ -4,27 +4,47 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher {
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY,
val pushTimeout: Long = 10000,
val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS
) extends MessageDispatcher {
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
private val queue = new BlockingMessageQueue(name, mailboxCapacity)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
if (actor.mailbox eq null) {
actor.mailbox = if (mailboxCapacity > 0)
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue
else
new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue
}
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
throw new IllegalArgumentException("Cannot register to anyone but " + actor)
super.register(actorRef)
}
def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue]
def dispatch(invocation: MessageInvocation) = mailbox append invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
@ -33,7 +53,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
override def run = {
while (active) {
try {
actor.invoke(queue.take)
actor.invoke(mailbox.next)
} catch { case e: InterruptedException => active = false }
}
}
@ -53,12 +73,14 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue {
private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
else new LinkedBlockingQueue[MessageInvocation]
def append(invocation: MessageInvocation) = queue.put(invocation)
def take: MessageInvocation = queue.take
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
def interrupt = throw new UnsupportedOperationException
trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] =>
final def append(invocation: MessageInvocation): Unit = {
if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
}
}
final def next: MessageInvocation = self.take
}

View file

@ -1,92 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import concurrent.forkjoin.LinkedTransferQueue
import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.Iterator
import se.scalablesolutions.akka.util.Logger
class BoundedTransferQueue[E <: AnyRef](
val capacity: Int,
val pushTimeout: Long,
val pushTimeUnit: TimeUnit)
extends LinkedTransferQueue[E] {
require(capacity > 0)
require(pushTimeout > 0)
require(pushTimeUnit ne null)
protected val guard = new Semaphore(capacity)
//Enqueue an item within the push timeout (acquire Semaphore)
protected def enq(f: => Boolean): Boolean = {
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
val result = try {
f
} catch {
case e =>
guard.release //If something broke, release
throw e
}
if (!result) guard.release //Didn't add anything
result
} else
false
}
//Dequeue an item (release Semaphore)
protected def deq(e: E): E = {
if (e ne null) guard.release //Signal removal of item
e
}
override def take(): E = deq(super.take)
override def poll(): E = deq(super.poll)
override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit))
override def remainingCapacity = guard.availablePermits
override def remove(o: AnyRef): Boolean = {
if (super.remove(o)) {
guard.release
true
} else {
false
}
}
override def offer(e: E): Boolean =
enq(super.offer(e))
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean =
enq(super.offer(e,timeout,unit))
override def add(e: E): Boolean =
enq(super.add(e))
override def put(e :E): Unit =
enq({ super.put(e); true })
override def tryTransfer(e: E): Boolean =
enq(super.tryTransfer(e))
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean =
enq(super.tryTransfer(e,timeout,unit))
override def transfer(e: E): Unit =
enq({ super.transfer(e); true })
override def iterator: Iterator[E] = {
val it = super.iterator
new Iterator[E] {
def hasNext = it.hasNext
def next = it.next
def remove {
it.remove
guard.release //Assume remove worked if no exception was thrown
}
}
}
}