Merge branch 'dispatcherimprovements' of git@github.com:jboner/akka into jans_dispatcher_changes

This commit is contained in:
Jonas Bonér 2010-03-16 22:59:16 +01:00
commit 2a43c2ad71
5 changed files with 25 additions and 20 deletions

View file

@ -43,8 +43,7 @@ trait BootableCometActorService extends Bootable with Logging {
adapter.setHandleStaticResources(true)
adapter.setServletInstance(new AkkaServlet)
adapter.setContextPath(uri.getPath)
//Using autodetection for now
//adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)

View file

@ -62,19 +62,24 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = {
val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock
if (lockedForDispatching) {
try {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
var lockAcquiredOnce = false
// this do-wile loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do {
if (invocation.receiver._dispatcherLock.tryLock) {
lockAcquiredOnce = true
try {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
}
} finally {
invocation.receiver._dispatcherLock.unlock
}
} finally {
invocation.receiver._dispatcherLock.unlock
}
}
} while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty))
}
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
@ -94,4 +99,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
"Can't build a new thread pool for a dispatcher that is already up and running")
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
}
}

View file

@ -78,7 +78,7 @@ class ThreadBasedDispatcherTest extends JUnitSuite {
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, new Integer(i), None, None, None))
dispatcher.dispatch(new MessageInvocation(key1, i, None, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)

View file

@ -87,10 +87,11 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
override def filter(request: ContainerRequest): ContainerRequest =
rolesAllowed match {
case Some(roles) => {
(authenticator !! (Authenticate(request, roles), 10000)).get.asInstanceOf[AnyRef] match {
case OK => request
case r if r.isInstanceOf[Response] =>
(authenticator.!![AnyRef](Authenticate(request, roles), 10000)) match {
case Some(OK) => request
case Some(r) if r.isInstanceOf[Response] =>
throw new WebApplicationException(r.asInstanceOf[Response])
case None => throw new WebApplicationException(408)
case x => {
log.error("Authenticator replied with unexpected result [%s]", x);
throw new WebApplicationException(Response.Status.INTERNAL_SERVER_ERROR)

View file

@ -42,7 +42,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------
// project versions
val JERSEY_VERSION = "1.1.5"
val ATMO_VERSION = "0.6-SNAPSHOT"
val ATMO_VERSION = "0.5.4"
val CASSANDRA_VERSION = "0.5.0"
// ------------------------------------------------------------
@ -359,7 +359,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
def deployTask(info: ProjectInfo, toDir: Path) = task {
val projectPath = info.projectPath.toString
val moduleName = projectPath.substring(projectPath.lastIndexOf('/') + 1, projectPath.length)
val moduleName = projectPath.substring(projectPath.lastIndexOf(System.getProperty("file.separator")) + 1, projectPath.length)
// FIXME need to find out a way to grab these paths from the sbt system
val JAR_FILE_NAME = moduleName + "_%s-%s.jar".format(defScalaVersion.value, version)
val JAR_FILE_PATH = projectPath + "/target/scala_%s/".format(defScalaVersion.value) + JAR_FILE_NAME