diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 4e60ffcc96..2ee4d8a2f7 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor import akka.util.{Duration, Switch} +import org.multiverse.api.latches.StandardLatch object ActorModelSpec { @@ -216,6 +217,21 @@ abstract class ActorModelSpec extends JUnitSuite { msgsProcessed = 0, restarts = 0 ) + + val futures = for(i <- 1 to 10) yield Future { i } + await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5) + assertDispatcher(dispatcher)(starts = 2, stops = 2) + + val a2 = newTestActor + a2.start + val futures2 = for(i <- 1 to 10) yield Future { i } + + await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5) + assertDispatcher(dispatcher)(starts = 3, stops = 2) + + a2.stop + await(dispatcher.stops.get == 3)(withinMs = dispatcher.timeoutMs * 5) + assertDispatcher(dispatcher)(starts = 3, stops = 3) } @Test def dispatcherShouldProcessMessagesOneAtATime { diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index e63a72f366..d9017edc29 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -88,24 +88,36 @@ trait MessageDispatcher { private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = { futures.getAndIncrement() - val future = new DefaultCompletableFuture[T](timeout) - if (active.isOff) { active.switchOn { start } } - executeFuture(FutureInvocation[T](future, block, futureCleanup)) - future + try { + val future = new DefaultCompletableFuture[T](timeout) + + if (active.isOff) + guard withGuard { if (active.isOff) active.switchOn { start } } + + executeFuture(FutureInvocation[T](future, block, futureCleanup)) + future + } catch { + case e => + futures.decrementAndGet + throw e + } } - private val futureCleanup: () => Unit = { () => - if (futures.decrementAndGet() == 0) guard withGuard { if (uuids.isEmpty) { - shutdownSchedule match { - case UNSCHEDULED => - shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) - case SCHEDULED => - shutdownSchedule = RESCHEDULED - case RESCHEDULED => //Already marked for reschedule + private val futureCleanup: () => Unit = + () => if (futures.decrementAndGet() == 0) { + guard withGuard { + if (futures.get == 0 && uuids.isEmpty) { + shutdownSchedule match { + case UNSCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + shutdownSchedule = RESCHEDULED + case RESCHEDULED => //Already marked for reschedule + } + } } - }} - } + } private[akka] def register(actorRef: ActorRef) { if (actorRef.mailbox eq null) diff --git a/akka-http/src/main/scala/akka/security/Security.scala b/akka-http/src/main/scala/akka/security/Security.scala index dce249de46..7789164fd3 100644 --- a/akka-http/src/main/scala/akka/security/Security.scala +++ b/akka-http/src/main/scala/akka/security/Security.scala @@ -182,7 +182,7 @@ trait AuthenticationActor[C <: Credentials] extends Actor { * Responsible for the execution flow of authentication * * Credentials are extracted and verified from the request, - * and a se3curity context is created for the ContainerRequest + * and a security context is created for the ContainerRequest * this should ensure good integration with current Jersey security */ protected val authenticate: Receive = { diff --git a/akka-http/src/test/scala/config/ConfigSpec.scala b/akka-http/src/test/scala/config/ConfigSpec.scala index 3adea2fc43..2b21f3cc34 100644 --- a/akka-http/src/test/scala/config/ConfigSpec.scala +++ b/akka-http/src/test/scala/config/ConfigSpec.scala @@ -19,7 +19,7 @@ class ConfigSpec extends WordSpec with MustMatchers { getString("akka.http.authenticator") must equal(Some("N/A")) getBool("akka.http.connection-close") must equal(Some(true)) getString("akka.http.expired-header-name") must equal(Some("Async-Timeout")) - getList("akka.http.filters") must equal(List("se.scalablesolutions.akka.security.AkkaSecurityFilterFactory")) + getList("akka.http.filters") must equal(List("akka.security.AkkaSecurityFilterFactory")) getList("akka.http.resource-packages") must equal(Nil) getString("akka.http.hostname") must equal(Some("localhost")) getString("akka.http.expired-header-value") must equal(Some("expired")) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index df2c2c3e0d..9a647c6ad5 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -85,7 +85,7 @@ akka { port = 9998 #If you are using akka.http.AkkaRestServlet - filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use + filters = ["akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use # resource-packages = ["sample.rest.scala", # "sample.rest.java", # "sample.security"] # List with all resource packages for your Jersey services @@ -123,7 +123,7 @@ akka { remote { - # secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie' + # secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using 'Crypt.generateSecureCookie' secure-cookie = "" compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression