Merge branch 'dispatcherimprovements' into workstealing.
Also applied the same improvements on the work stealing dispatcher.
This commit is contained in:
commit
5b844f548e
4 changed files with 35 additions and 22 deletions
|
|
@ -224,7 +224,6 @@ trait Actor extends TransactionManagement {
|
|||
*/
|
||||
private[akka] val _dispatcherLock:Lock = new ReentrantLock
|
||||
|
||||
|
||||
// ====================================
|
||||
// protected fields
|
||||
// ====================================
|
||||
|
|
|
|||
|
|
@ -62,19 +62,24 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
|
|||
def dispatch(invocation: MessageInvocation) = if (active) {
|
||||
executor.execute(new Runnable() {
|
||||
def run = {
|
||||
val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock
|
||||
if (lockedForDispatching) {
|
||||
try {
|
||||
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
|
||||
var messageInvocation = invocation.receiver._mailbox.poll
|
||||
while (messageInvocation != null) {
|
||||
messageInvocation.invoke
|
||||
messageInvocation = invocation.receiver._mailbox.poll
|
||||
var lockAcquiredOnce = false
|
||||
// this do-wile loop is required to prevent missing new messages between the end of the inner while
|
||||
// loop and releasing the lock
|
||||
do {
|
||||
if (invocation.receiver._dispatcherLock.tryLock) {
|
||||
lockAcquiredOnce = true
|
||||
try {
|
||||
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
|
||||
var messageInvocation = invocation.receiver._mailbox.poll
|
||||
while (messageInvocation != null) {
|
||||
messageInvocation.invoke
|
||||
messageInvocation = invocation.receiver._mailbox.poll
|
||||
}
|
||||
} finally {
|
||||
invocation.receiver._dispatcherLock.unlock
|
||||
}
|
||||
} finally {
|
||||
invocation.receiver._dispatcherLock.unlock
|
||||
}
|
||||
}
|
||||
} while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty))
|
||||
}
|
||||
})
|
||||
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
|
||||
|
|
@ -94,4 +99,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
|
|||
"Can't build a new thread pool for a dispatcher that is already up and running")
|
||||
|
||||
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,14 +60,21 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
|
|||
* @return true if the mailbox was processed, false otherwise
|
||||
*/
|
||||
private def tryProcessMailbox(receiver: Actor): Boolean = {
|
||||
if (receiver._dispatcherLock.tryLock) {
|
||||
try {
|
||||
processMailbox(receiver)
|
||||
} finally {
|
||||
receiver._dispatcherLock.unlock
|
||||
var lockAcquiredOnce = false
|
||||
// this do-wile loop is required to prevent missing new messages between the end of processing
|
||||
// the mailbox and releasing the lock
|
||||
do {
|
||||
if (receiver._dispatcherLock.tryLock) {
|
||||
lockAcquiredOnce = true
|
||||
try {
|
||||
processMailbox(receiver)
|
||||
} finally {
|
||||
receiver._dispatcherLock.unlock
|
||||
}
|
||||
}
|
||||
return true
|
||||
} else return false
|
||||
} while ((lockAcquiredOnce && !receiver._mailbox.isEmpty))
|
||||
|
||||
return lockAcquiredOnce
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -127,7 +134,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
|
|||
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
|
||||
val donated = receiver._mailbox.pollLast
|
||||
if (donated != null) {
|
||||
thief.forward(donated.message)(Some(donated.receiver))
|
||||
//TODO: forward seems to fail from time to time ?!
|
||||
//thief.forward(donated.message)(Some(donated.receiver))
|
||||
thief.send(donated.message)
|
||||
return Some(donated)
|
||||
} else return None
|
||||
}
|
||||
|
|
|
|||
|
|
@ -358,7 +358,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
def deployTask(info: ProjectInfo, toDir: Path) = task {
|
||||
val projectPath = info.projectPath.toString
|
||||
val moduleName = projectPath.substring(projectPath.lastIndexOf('/') + 1, projectPath.length)
|
||||
val moduleName = projectPath.substring(projectPath.lastIndexOf(System.getProperty("file.separator")) + 1, projectPath.length)
|
||||
// FIXME need to find out a way to grab these paths from the sbt system
|
||||
val JAR_FILE_NAME = moduleName + "_%s-%s.jar".format(defScalaVersion.value, version)
|
||||
val JAR_FILE_PATH = projectPath + "/target/scala_%s/".format(defScalaVersion.value) + JAR_FILE_NAME
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue