diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 81d4c40b74..7777051ac1 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -199,10 +199,7 @@ trait ActorRef extends /** * This is a reference to the message currently being processed by the actor */ - protected[akka] var _currentMessage: Option[MessageInvocation] = None - - protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } - protected[akka] def currentMessage = guard.withGuard { _currentMessage } + @volatile protected[akka] var currentMessage: MessageInvocation = null /** * Comparison only takes uuid into account. @@ -1010,7 +1007,7 @@ class LocalActorRef private[akka]( if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { - currentMessage = Option(messageHandle) + currentMessage = messageHandle try { dispatch(messageHandle) } catch { @@ -1018,7 +1015,7 @@ class LocalActorRef private[akka]( Actor.log.error(e, "Could not invoke actor [%s]", this) throw e } finally { - currentMessage = None //TODO: Don't reset this, we might want to resend the message + currentMessage = null //TODO: Don't reset this, we might want to resend the message } } } @@ -1182,7 +1179,7 @@ class LocalActorRef private[akka]( } private def dispatch[T](messageHandle: MessageInvocation) = { - Actor.log.trace("Invoking actor with message:\n" + messageHandle) + Actor.log.trace("Invoking actor with message: %s\n",messageHandle) val message = messageHandle.message //serializeMessage(messageHandle.message) var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = @@ -1529,10 +1526,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent from another Actor, else None. */ def sender: Option[ActorRef] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.sender + if (msg eq null) None + else msg.sender } /** @@ -1540,10 +1536,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ def senderFuture(): Option[CompletableFuture[Any]] = { - // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } val msg = currentMessage - if (msg.isEmpty) None - else msg.get.senderFuture + if (msg eq null) None + else msg.senderFuture } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 949d701a21..bd8416c95f 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -85,30 +85,15 @@ class ExecutorBasedEventDrivenDispatcher( */ trait ExecutableMailbox extends Runnable { self: MessageQueue => final def run = { - var lockAcquiredOnce = false - var finishedBeforeMailboxEmpty = false - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock - do { - finishedBeforeMailboxEmpty = false //Reset this every run - if (dispatcherLock.tryLock()) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - lockAcquiredOnce = true - finishedBeforeMailboxEmpty = try { - processMailbox() - } catch { - case e => - dispatcherLock.unlock() - if (!self.isEmpty) - registerForExecution(self) - throw e - } - dispatcherLock.unlock() - if (finishedBeforeMailboxEmpty) - registerForExecution(self) - } - } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty)) + val reschedule = try { + processMailbox() + } finally { + dispatcherLock.unlock() + } + + if (reschedule || !self.isEmpty) + registerForExecution(self) } /** @@ -144,6 +129,20 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } + protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { + if (mailbox.dispatcherLock.tryLock()) { + try { + executor execute mailbox + } catch { + case e: RejectedExecutionException => + mailbox.dispatcherLock.unlock() + throw e + } + } + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) + } + /** * @return the mailbox associated with the actor */ @@ -158,11 +157,6 @@ class ExecutorBasedEventDrivenDispatcher( new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox } - protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { - executor execute mailbox - } else { - log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) - } def start = if (!active) { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) @@ -186,6 +180,7 @@ class ExecutorBasedEventDrivenDispatcher( // FIXME: should we have an unbounded queue and not bounded as default ???? private[akka] def init = { withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + //withNewThreadPoolWithLinkedBlockingQueueWithCapacity(16) config(this) buildThreadPool } diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala deleted file mode 100644 index 8c75d6a42b..0000000000 --- a/akka-actor/src/main/scala/dispatch/Queues.scala +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import concurrent.forkjoin.LinkedTransferQueue -import java.util.concurrent.{TimeUnit, Semaphore} -import java.util.Iterator -import se.scalablesolutions.akka.util.Logger - -class BoundableTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] { - val bounded = (capacity > 0) - - protected lazy val guard = new Semaphore(capacity) - - override def take(): E = { - if (!bounded) { - super.take - } else { - val e = super.take - if (e ne null) guard.release - e - } - } - - override def poll(): E = { - if (!bounded) { - super.poll - } else { - val e = super.poll - if (e ne null) guard.release - e - } - } - - override def poll(timeout: Long, unit: TimeUnit): E = { - if (!bounded) { - super.poll(timeout,unit) - } else { - val e = super.poll(timeout,unit) - if (e ne null) guard.release - e - } - } - - override def remainingCapacity: Int = { - if (!bounded) super.remainingCapacity - else guard.availablePermits - } - - override def remove(o: AnyRef): Boolean = { - if (!bounded) { - super.remove(o) - } else { - if (super.remove(o)) { - guard.release - true - } else false - } - } - - override def offer(e: E): Boolean = { - if (!bounded) { - super.offer(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (!bounded) { - super.offer(e,timeout,unit) - } else { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.offer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def add(e: E): Boolean = { - if (!bounded) { - super.add(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.add(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def put(e :E): Unit = { - if (!bounded) { - super.put(e) - } else { - guard.acquire - try { - super.put(e) - } catch { - case e => guard.release; throw e - } - } - } - - override def tryTransfer(e: E): Boolean = { - if (!bounded) { - super.tryTransfer(e) - } else { - if (guard.tryAcquire) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = { - if (!bounded) { - super.tryTransfer(e,timeout,unit) - } else { - if (guard.tryAcquire(timeout,unit)) { - val result = try { - super.tryTransfer(e) - } catch { - case e => guard.release; throw e - } - if (!result) guard.release - result - } else false - } - } - - override def transfer(e: E): Unit = { - if (!bounded) { - super.transfer(e) - } else { - if (guard.tryAcquire) { - try { - super.transfer(e) - } catch { - case e => guard.release; throw e - } - } - } - } - - override def iterator: Iterator[E] = { - val it = super.iterator - new Iterator[E] { - def hasNext = it.hasNext - def next = it.next - def remove { - it.remove - if (bounded) - guard.release //Assume remove worked if no exception was thrown - } - } - } -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index eb573cde70..5ad1b89aca 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -11,7 +11,6 @@ import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException import se.scalablesolutions.akka.util.{Logger, Logging} -import concurrent.forkjoin.LinkedTransferQueue trait ThreadPoolBuilder extends Logging { val name: String @@ -70,7 +69,7 @@ trait ThreadPoolBuilder extends Logging { def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedTransferQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) boundedExecutorBound = bound this @@ -79,7 +78,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedTransferQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this @@ -88,7 +87,7 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new BoundableTransferQueue[Runnable](capacity) + blockingQueue = new LinkedBlockingQueue[Runnable](capacity) threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this