Fixing tickets #816, #814, #817 and Dereks fixes on #812

This commit is contained in:
Viktor Klang 2011-04-28 16:01:11 +02:00
parent c61f1a42dc
commit 4bedb4813d
5 changed files with 47 additions and 19 deletions

View file

@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{Duration, Switch}
import org.multiverse.api.latches.StandardLatch
object ActorModelSpec {
@ -216,6 +217,21 @@ abstract class ActorModelSpec extends JUnitSuite {
msgsProcessed = 0,
restarts = 0
)
val futures = for(i <- 1 to 10) yield Future { i }
await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 2, stops = 2)
val a2 = newTestActor
a2.start
val futures2 = for(i <- 1 to 10) yield Future { i }
await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 3, stops = 2)
a2.stop
await(dispatcher.stops.get == 3)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 3, stops = 3)
}
@Test def dispatcherShouldProcessMessagesOneAtATime {

View file

@ -88,24 +88,36 @@ trait MessageDispatcher {
private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = {
futures.getAndIncrement()
val future = new DefaultCompletableFuture[T](timeout)
if (active.isOff) { active.switchOn { start } }
executeFuture(FutureInvocation[T](future, block, futureCleanup))
future
try {
val future = new DefaultCompletableFuture[T](timeout)
if (active.isOff)
guard withGuard { if (active.isOff) active.switchOn { start } }
executeFuture(FutureInvocation[T](future, block, futureCleanup))
future
} catch {
case e =>
futures.decrementAndGet
throw e
}
}
private val futureCleanup: () => Unit = { () =>
if (futures.decrementAndGet() == 0) guard withGuard { if (uuids.isEmpty) {
shutdownSchedule match {
case UNSCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
shutdownSchedule = RESCHEDULED
case RESCHEDULED => //Already marked for reschedule
private val futureCleanup: () => Unit =
() => if (futures.decrementAndGet() == 0) {
guard withGuard {
if (futures.get == 0 && uuids.isEmpty) {
shutdownSchedule match {
case UNSCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
shutdownSchedule = RESCHEDULED
case RESCHEDULED => //Already marked for reschedule
}
}
}
}}
}
}
private[akka] def register(actorRef: ActorRef) {
if (actorRef.mailbox eq null)

View file

@ -182,7 +182,7 @@ trait AuthenticationActor[C <: Credentials] extends Actor {
* Responsible for the execution flow of authentication
*
* Credentials are extracted and verified from the request,
* and a se3curity context is created for the ContainerRequest
* and a security context is created for the ContainerRequest
* this should ensure good integration with current Jersey security
*/
protected val authenticate: Receive = {

View file

@ -19,7 +19,7 @@ class ConfigSpec extends WordSpec with MustMatchers {
getString("akka.http.authenticator") must equal(Some("N/A"))
getBool("akka.http.connection-close") must equal(Some(true))
getString("akka.http.expired-header-name") must equal(Some("Async-Timeout"))
getList("akka.http.filters") must equal(List("se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"))
getList("akka.http.filters") must equal(List("akka.security.AkkaSecurityFilterFactory"))
getList("akka.http.resource-packages") must equal(Nil)
getString("akka.http.hostname") must equal(Some("localhost"))
getString("akka.http.expired-header-value") must equal(Some("expired"))

View file

@ -85,7 +85,7 @@ akka {
port = 9998
#If you are using akka.http.AkkaRestServlet
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
filters = ["akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
# resource-packages = ["sample.rest.scala",
# "sample.rest.java",
# "sample.security"] # List with all resource packages for your Jersey services
@ -123,7 +123,7 @@ akka {
remote {
# secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
# secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
secure-cookie = ""
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression