Switching dispatching strategy to 1 runnable per mailbox and removing use of TransferQueue

This commit is contained in:
Viktor Klang 2010-09-12 21:00:50 +02:00
parent 8b6895c328
commit 5a39c3b7b1
4 changed files with 33 additions and 227 deletions

View file

@ -199,10 +199,7 @@ trait ActorRef extends
/**
* This is a reference to the message currently being processed by the actor
*/
protected[akka] var _currentMessage: Option[MessageInvocation] = None
protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg }
protected[akka] def currentMessage = guard.withGuard { _currentMessage }
@volatile protected[akka] var currentMessage: MessageInvocation = null
/**
* Comparison only takes uuid into account.
@ -1010,7 +1007,7 @@ class LocalActorRef private[akka](
if (isShutdown)
Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
else {
currentMessage = Option(messageHandle)
currentMessage = messageHandle
try {
dispatch(messageHandle)
} catch {
@ -1018,7 +1015,7 @@ class LocalActorRef private[akka](
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
} finally {
currentMessage = None //TODO: Don't reset this, we might want to resend the message
currentMessage = null //TODO: Don't reset this, we might want to resend the message
}
}
}
@ -1182,7 +1179,7 @@ class LocalActorRef private[akka](
}
private def dispatch[T](messageHandle: MessageInvocation) = {
Actor.log.trace("Invoking actor with message:\n" + messageHandle)
Actor.log.trace("Invoking actor with message: %s\n",messageHandle)
val message = messageHandle.message //serializeMessage(messageHandle.message)
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
@ -1529,10 +1526,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* Is defined if the message was sent from another Actor, else None.
*/
def sender: Option[ActorRef] = {
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender }
val msg = currentMessage
if (msg.isEmpty) None
else msg.get.sender
if (msg eq null) None
else msg.sender
}
/**
@ -1540,10 +1536,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def senderFuture(): Option[CompletableFuture[Any]] = {
// Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture }
val msg = currentMessage
if (msg.isEmpty) None
else msg.get.senderFuture
if (msg eq null) None
else msg.senderFuture
}

View file

@ -85,30 +85,15 @@ class ExecutorBasedEventDrivenDispatcher(
*/
trait ExecutableMailbox extends Runnable { self: MessageQueue =>
final def run = {
var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do {
finishedBeforeMailboxEmpty = false //Reset this every run
if (dispatcherLock.tryLock()) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
lockAcquiredOnce = true
finishedBeforeMailboxEmpty = try {
processMailbox()
} catch {
case e =>
dispatcherLock.unlock()
if (!self.isEmpty)
registerForExecution(self)
throw e
}
dispatcherLock.unlock()
if (finishedBeforeMailboxEmpty)
registerForExecution(self)
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty))
val reschedule = try {
processMailbox()
} finally {
dispatcherLock.unlock()
}
if (reschedule || !self.isEmpty)
registerForExecution(self)
}
/**
@ -144,6 +129,20 @@ class ExecutorBasedEventDrivenDispatcher(
registerForExecution(mbox)
}
protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
if (mailbox.dispatcherLock.tryLock()) {
try {
executor execute mailbox
} catch {
case e: RejectedExecutionException =>
mailbox.dispatcherLock.unlock()
throw e
}
}
} else {
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
}
/**
* @return the mailbox associated with the actor
*/
@ -158,11 +157,6 @@ class ExecutorBasedEventDrivenDispatcher(
new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox
}
protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
executor execute mailbox
} else {
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
}
def start = if (!active) {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)

View file

@ -1,182 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import concurrent.forkjoin.LinkedTransferQueue
import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.Iterator
import se.scalablesolutions.akka.util.Logger
class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
val bounded = (capacity > 0)
protected lazy val guard = new Semaphore(capacity)
override def take(): E = {
if (!bounded) {
super.take
} else {
val e = super.take
if (e ne null) guard.release
e
}
}
override def poll(): E = {
if (!bounded) {
super.poll
} else {
val e = super.poll
if (e ne null) guard.release
e
}
}
override def poll(timeout: Long, unit: TimeUnit): E = {
if (!bounded) {
super.poll(timeout,unit)
} else {
val e = super.poll(timeout,unit)
if (e ne null) guard.release
e
}
}
override def remainingCapacity: Int = {
if (!bounded) super.remainingCapacity
else guard.availablePermits
}
override def remove(o: AnyRef): Boolean = {
if (!bounded) {
super.remove(o)
} else {
if (super.remove(o)) {
guard.release
true
} else false
}
}
override def offer(e: E): Boolean = {
if (!bounded) {
super.offer(e)
} else {
if (guard.tryAcquire) {
val result = try {
super.offer(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else false
}
}
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
if (!bounded) {
super.offer(e,timeout,unit)
} else {
if (guard.tryAcquire(timeout,unit)) {
val result = try {
super.offer(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else false
}
}
override def add(e: E): Boolean = {
if (!bounded) {
super.add(e)
} else {
if (guard.tryAcquire) {
val result = try {
super.add(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else false
}
}
override def put(e :E): Unit = {
if (!bounded) {
super.put(e)
} else {
guard.acquire
try {
super.put(e)
} catch {
case e => guard.release; throw e
}
}
}
override def tryTransfer(e: E): Boolean = {
if (!bounded) {
super.tryTransfer(e)
} else {
if (guard.tryAcquire) {
val result = try {
super.tryTransfer(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else false
}
}
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
if (!bounded) {
super.tryTransfer(e,timeout,unit)
} else {
if (guard.tryAcquire(timeout,unit)) {
val result = try {
super.tryTransfer(e)
} catch {
case e => guard.release; throw e
}
if (!result) guard.release
result
} else false
}
}
override def transfer(e: E): Unit = {
if (!bounded) {
super.transfer(e)
} else {
if (guard.tryAcquire) {
try {
super.transfer(e)
} catch {
case e => guard.release; throw e
}
}
}
}
override def iterator: Iterator[E] = {
val it = super.iterator
new Iterator[E] {
def hasNext = it.hasNext
def next = it.next
def remove {
it.remove
if (bounded)
guard.release //Assume remove worked if no exception was thrown
}
}
}
}

View file

@ -11,7 +11,6 @@ import ThreadPoolExecutor.CallerRunsPolicy
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.util.{Logger, Logging}
import concurrent.forkjoin.LinkedTransferQueue
trait ThreadPoolBuilder extends Logging {
val name: String
@ -70,7 +69,7 @@ trait ThreadPoolBuilder extends Logging {
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new LinkedTransferQueue[Runnable]
blockingQueue = new LinkedBlockingQueue[Runnable]
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
boundedExecutorBound = bound
this
@ -79,7 +78,7 @@ trait ThreadPoolBuilder extends Logging {
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new LinkedTransferQueue[Runnable]
blockingQueue = new LinkedBlockingQueue[Runnable]
threadPoolBuilder = new ThreadPoolExecutor(
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
@ -88,7 +87,7 @@ trait ThreadPoolBuilder extends Logging {
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new BoundableTransferQueue[Runnable](capacity)
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
threadPoolBuilder = new ThreadPoolExecutor(
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this