diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 882331b177..e70b4a98ae 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -28,9 +28,9 @@ import akka.japi. {Creator, Procedure} /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) +case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage { - + /** * Java API */ @@ -75,6 +75,7 @@ class IllegalActorStateException private[akka](message: String) extends AkkaEx class ActorKilledException private[akka](message: String) extends AkkaException(message) class ActorInitializationException private[akka](message: String) extends AkkaException(message) class ActorTimeoutException private[akka](message: String) extends AkkaException(message) +class InvalidMessageException private[akka](message: String) extends AkkaException(message) /** * This message is thrown by default when an Actors behavior doesn't match a message @@ -90,7 +91,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception * @author Jonas Bonér */ object Actor extends ListenerManagement { - + /** * Add shutdown cleanups */ @@ -128,7 +129,7 @@ object Actor extends ListenerManagement { type Receive = PartialFunction[Any, Unit] private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - + /** * Creates an ActorRef out of the Actor with type T. *
@@ -443,8 +444,10 @@ trait Actor {
   // =========================================
 
   private[akka] final def apply(msg: Any) = {
+    if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
+      throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
     val behaviorStack = self.hotswap
-    msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
+    msg match {
       case l: AutoReceivedMessage           => autoReceiveMessage(l)
       case msg if behaviorStack.nonEmpty &&
         behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 81574dacff..673cb487a1 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -902,7 +902,6 @@ class LocalActorRef private[akka] (
 
       failedActor match {
         case p: Proxyable =>
-          //p.swapProxiedActor(freshActor) //TODO: broken
           failedActor.preRestart(reason)
           failedActor.postRestart(reason)
         case _ =>
diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index 451cdf8b80..f2f63a3ff4 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -78,12 +78,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
 
   override private[akka] def dispatch(invocation: MessageInvocation) = {
     val mbox = getMailbox(invocation.receiver)
-    if (mbox.dispatcherLock.locked && attemptDonationOf(invocation, mbox)) {
+    /*if (!mbox.isEmpty && attemptDonationOf(invocation, mbox)) {
       //We were busy and we got to donate the message to some other lucky guy, we're done here
-    } else {
+    } else {*/
       mbox enqueue invocation
       registerForExecution(mbox)
-    }
+    //}
   }
 
   override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
@@ -110,13 +110,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
   /**
    * Returns true if the donation succeeded or false otherwise
    */
-  protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
+  /*protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
     val actors = members // copy to prevent concurrent modifications having any impact
     doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
       case null => false
       case recipient => donate(message, recipient)
     }
-  }
+  }*/
 
   /**
    * Rewrites the message and adds that message to the recipients mailbox
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index ba0b7b83ba..8d7fcd7390 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -57,7 +57,7 @@ object Futures {
   }
 
   /**
-   * Java API
+   * Java API.
    * Returns a Future to the result of the first future in the list that is completed
    */
   def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
@@ -68,6 +68,10 @@ object Futures {
    * The fold is performed on the thread where the last future is completed,
    * the result will be the first failure of any of the futures, or any failure in the actual fold,
    * or the result of the fold.
+   * Example:
+   * 
+   *   val result = Futures.fold(0)(futures)(_ + _).await.result
+   * 
*/ def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { if(futures.isEmpty) { @@ -115,6 +119,10 @@ object Futures { /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Example: + *
+   *   val result = Futures.reduce(futures)(_ + _).await.result
+   * 
*/ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = { if (futures.isEmpty) @@ -138,7 +146,7 @@ object Futures { } /** - * Java API + * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = @@ -147,18 +155,25 @@ object Futures { import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom + /** + * FIXME document Futures.sequence + */ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + /** + * FIXME document Futures.traverse + */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) - //Deprecations - - + // ===================================== + // Deprecations + // ===================================== + /** * (Blocking!) */ @@ -299,6 +314,12 @@ sealed trait Future[+T] { /** * When the future is compeleted with a valid result, apply the provided * PartialFunction to the result. + *
+   *   val result = future receive {
+   *     case Foo => "foo"
+   *     case Bar => "bar"
+   *   }.await.result
+   * 
*/ final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f => val optr = f.result @@ -313,6 +334,14 @@ sealed trait Future[+T] { * result of this Future if a match is found, or else return a MatchError. * If this Future is completed with an exception then the new Future will * also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a <- actor !!! Req("Hello") collect { case Res(x: Int)    => x }
+   *   b <- actor !!! Req(a)       collect { case Res(x: String) => x }
+   *   c <- actor !!! Req(7)       collect { case Res(x: String) => x }
+   * } yield b + "-" + c
+   * 
*/ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) @@ -343,6 +372,14 @@ sealed trait Future[+T] { * Creates a new Future by applying a function to the successful result of * this Future. If this Future is completed with an exception then the new * Future will also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor !!! "Hello" // returns 5
+   *   b: String <- actor !!! a       // returns "10"
+   *   c: String <- actor !!! 7       // returns "14"
+   * } yield b + "-" + c
+   * 
*/ final def map[A](f: T => A): Future[A] = { val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) @@ -371,6 +408,14 @@ sealed trait Future[+T] { * this Future, and returns the result of the function as the new Future. * If this Future is completed with an exception then the new Future will * also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor !!! "Hello" // returns 5
+   *   b: String <- actor !!! a       // returns "10"
+   *   c: String <- actor !!! 7       // returns "14"
+   * } yield b + "-" + c
+   * 
*/ final def flatMap[A](f: T => Future[A]): Future[A] = { val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) @@ -425,7 +470,7 @@ sealed trait Future[+T] { } /** - * Returns the current result, throws the exception is one has been raised, else returns None + * Returns the current result, throws the exception is one has been raised, else returns None */ final def resultOrException: Option[T] = { val v = value @@ -450,31 +495,31 @@ sealed trait Future[+T] { } /** - * Essentially this is the Promise (or write-side) of a Future (read-side) + * Essentially this is the Promise (or write-side) of a Future (read-side). */ trait CompletableFuture[T] extends Future[T] { /** - * Completes this Future with the specified result, if not already completed, - * returns this + * Completes this Future with the specified result, if not already completed. + * @return this */ def complete(value: Either[Throwable, T]): CompletableFuture[T] /** - * Completes this Future with the specified result, if not already completed, - * returns this + * Completes this Future with the specified result, if not already completed. + * @return this */ final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) /** - * Completes this Future with the specified exception, if not already completed, - * returns this + * Completes this Future with the specified exception, if not already completed. + * @return this */ final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) /** * Completes this Future with the specified other Future, when that Future is completed, - * unless this Future has already been completed - * returns this + * unless this Future has already been completed. + * @return this. */ final def completeWith(other: Future[T]): CompletableFuture[T] = { other onComplete { f => complete(f.value.get) } @@ -482,18 +527,18 @@ trait CompletableFuture[T] extends Future[T] { } /** - * Alias for complete(Right(value)) + * Alias for complete(Right(value)). */ final def << (value: T): CompletableFuture[T] = complete(Right(value)) /** - * Alias for completeWith(other) + * Alias for completeWith(other). */ final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) } /** - * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. + * The default concrete Future implementation. */ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 31d5dca0eb..83c30f23e0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -160,12 +160,11 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory { */ object MonitorableThread { val DEFAULT_NAME = "MonitorableThread" - val created = new AtomicInteger - val alive = new AtomicInteger - @volatile var debugLifecycle = false -} -// FIXME fix the issues with using the monitoring in MonitorableThread + // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring + val created = new AtomicInteger + val alive = new AtomicInteger +} /** * @author Jonas Bonér @@ -178,7 +177,6 @@ class MonitorableThread(runnable: Runnable, name: String) }) override def run = { - val debug = MonitorableThread.debugLifecycle try { MonitorableThread.alive.incrementAndGet super.run diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index d4fc55b0a9..5b8245d1d4 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -25,7 +25,7 @@ import akka.AkkaException * case EventHandler.Warning(instance, message) => ... * case EventHandler.Info(instance, message) => ... * case EventHandler.Debug(instance, message) => ... - * case genericEvent => ... + * case genericEvent => ... * } * }) * @@ -35,7 +35,7 @@ import akka.AkkaException *
*

* However best is probably to register the listener in the 'akka.conf' - * configuration file. + * configuration file. *

* Log an error event: *

@@ -45,7 +45,7 @@ import akka.AkkaException
  * 
  * EventHandler.error(exception, this, message.toString)
  * 
- * + * * @author Jonas Bonér */ object EventHandler extends ListenerManagement { @@ -73,7 +73,7 @@ object EventHandler extends ListenerManagement { val debug = "[DEBUG] [%s] [%s] [%s] %s".intern val generic = "[GENERIC] [%s] [%s]".intern val ID = "event:handler".intern - + class EventHandlerException extends AkkaException lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build @@ -129,7 +129,7 @@ object EventHandler extends ListenerManagement { else if (eventClass.isInstanceOf[Debug]) DebugLevel else DebugLevel } - + class DefaultListener extends Actor { self.id = ID self.dispatcher = EventHandlerDispatcher diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index 8c37845baf..ba4e508454 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -1,11 +1,15 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + package akka.util import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ TimeUnit, BlockingQueue } import java.util.{ AbstractQueue, Queue, Collection, Iterator } -class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { +class BoundedBlockingQueue[E <: AnyRef]( + val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { backing match { case null => throw new IllegalArgumentException("Backing Queue may not be null") @@ -32,7 +36,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin require(backing.offer(e)) notEmpty.signal() } finally { - lock.unlock() + lock.unlock() } } @@ -319,4 +323,4 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin lock.unlock() } } -} \ No newline at end of file +} diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala index 5213557048..cf910925c8 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -14,6 +14,8 @@ import java.util.concurrent.TimeUnit import akka.util.duration._ +import akka.Testing + object FSMActorSpec { @@ -100,7 +102,7 @@ class FSMActorSpec extends JUnitSuite { def unlockTheLock = { // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", (1, TimeUnit.SECONDS))).start + val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start val transitionTester = Actor.actorOf(new Actor { def receive = { case Transition(_, _, _) => transitionCallBackLatch.open @@ -108,7 +110,7 @@ class FSMActorSpec extends JUnitSuite { }}).start lock ! SubscribeTransitionCallBack(transitionTester) - assert(initialStateLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) lock ! '3' lock ! '3' @@ -116,14 +118,14 @@ class FSMActorSpec extends JUnitSuite { lock ! '2' lock ! '1' - assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) - assert(transitionLatch.tryAwait(1, TimeUnit.SECONDS)) - assert(transitionCallBackLatch.tryAwait(1, TimeUnit.SECONDS)) - assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) lock ! "not_handled" - assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) val answerLatch = new StandardLatch object Hello @@ -136,9 +138,9 @@ class FSMActorSpec extends JUnitSuite { } }).start tester ! Hello - assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) tester ! Bye - assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) } } diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index 55c2e001af..6b154b42a9 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor import akka.util.{Duration, Switch} +import akka.Testing object ActorModelSpec { @@ -224,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite { a.start a ! CountDown(start) - assertCountDown(start,3000, "Should process first message within 3 seconds") + assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1) a ! Wait(1000) a ! CountDown(oneAtATime) // in case of serialization violation, restart would happen instead of count down - assertCountDown(oneAtATime,1500,"Processed message when allowed") + assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) a.stop @@ -245,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite { def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } } for (i <- 1 to 10) { start } - assertCountDown(counter, 3000, "Should process 200 messages") + assertCountDown(counter, Testing.time(3000), "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) a.stop @@ -263,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite { val aStart,aStop,bParallel = new CountDownLatch(1) a ! Meet(aStart,aStop) - assertCountDown(aStart,3000, "Should process first message within 3 seconds") + assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds") b ! CountDown(bParallel) - assertCountDown(bParallel, 3000, "Should process other actors in parallel") + assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel") aStop.countDown() a.stop @@ -281,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite { val done = new CountDownLatch(1) a ! Restart a ! CountDown(done) - assertCountDown(done, 3000, "Should be suspended+resumed and done with next message within 3 seconds") + assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds") a.stop assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2, msgsProcessed = 2, suspensions = 1, resumes = 1) @@ -297,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite { assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) dispatcher.resume(a) - assertCountDown(done, 3000, "Should resume processing of messages when resumed") + assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) @@ -314,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite { (1 to num) foreach { _ => newTestActor.start ! cachedMessage } - assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns") + assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns") } for(run <- 1 to 3) { flood(10000) diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index f99f5f5305..83dc4e294b 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -132,7 +132,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - // FIXME: implement Futures.awaitEither, and uncomment these two tests @Test def shouldFutureAwaitEitherLeft = { val actor1 = actorOf[TestActor].start val actor2 = actorOf[TestActor].start diff --git a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala deleted file mode 100644 index 603b17e336..0000000000 --- a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala +++ /dev/null @@ -1,91 +0,0 @@ -package akka.dispatch - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.Lock -import java.util.concurrent.locks.ReentrantLock - -import org.scalatest.junit.JUnitSuite -import org.junit.{Test, Before} - -import akka.actor.Actor -import Actor._ - -// FIXME use this test when we have removed the MessageInvoker classes -/* -class ThreadBasedDispatcherSpec extends JUnitSuite { - private var threadingIssueDetected: AtomicBoolean = null - val key1 = actorOf(new Actor { def receive = { case _ => {}} }) - val key2 = actorOf(new Actor { def receive = { case _ => {}} }) - val key3 = actorOf(new Actor { def receive = { case _ => {}} }) - - class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker { - val guardLock: Lock = new ReentrantLock - - def invoke(message: MessageInvocation) { - try { - if (threadingIssueDetected.get) return - if (guardLock.tryLock) { - handleLatch.countDown - } else { - threadingIssueDetected.set(true) - } - } catch { - case e: Exception => threadingIssueDetected.set(true) - } finally { - guardLock.unlock - } - } - } - - @Before - def setUp = { - threadingIssueDetected = new AtomicBoolean(false) - } - - @Test - def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = { - internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially - } - - @Test - def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = { - internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder - } - - private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially(): Unit = { - val guardLock = new ReentrantLock - val handleLatch = new CountDownLatch(100) - val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch)) - dispatcher.start - for (i <- 0 until 100) { - dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None)) - } - assert(handleLatch.await(5, TimeUnit.SECONDS)) - assert(!threadingIssueDetected.get) - } - - private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder(): Unit = { - val handleLatch = new CountDownLatch(100) - val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker { - var currentValue = -1; - def invoke(message: MessageInvocation) { - if (threadingIssueDetected.get) return - val messageValue = message.message.asInstanceOf[Int] - if (messageValue.intValue == currentValue + 1) { - currentValue = messageValue.intValue - handleLatch.countDown - } else threadingIssueDetected.set(true) - } - }) - dispatcher.start - for (i <- 0 until 100) { - dispatcher.dispatch(new MessageInvocation(key1, i, None, None)) - } - assert(handleLatch.await(5, TimeUnit.SECONDS)) - assert(!threadingIssueDetected.get) - dispatcher.postStop - } -} -*/ diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index eb91b9737f..379cbfb36d 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -4,7 +4,7 @@ package akka.http -import akka.actor.{ActorRegistry, ActorRef, Actor} +import akka.actor.{ActorRef, Actor} import akka.event.EventHandler import javax.servlet.http.{HttpServletResponse, HttpServletRequest} @@ -17,8 +17,8 @@ import javax.servlet.Filter object MistSettings { import akka.config.Config._ - final val JettyServer = "jetty" - final val TimeoutAttribute = "timeout" + val JettyServer = "jetty" + val TimeoutAttribute = "timeout" val ConnectionClose = config.getBool("akka.http.connection-close", true) val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true) @@ -64,7 +64,7 @@ import Types._ * */ trait Mist { - import javax.servlet.{ServletContext} + import javax.servlet.ServletContext import MistSettings._ /** @@ -84,28 +84,21 @@ trait Mist { response: HttpServletResponse) (builder: (() => tAsyncRequestContext) => RequestMethod) = { def suspend: tAsyncRequestContext = { - // + // set to right now, which is effectively "already expired" - // response.setDateHeader("Expires", System.currentTimeMillis) response.setHeader("Cache-Control", "no-cache, must-revalidate") - // // no keep-alive? - // if (ConnectionClose) response.setHeader("Connection","close") - // // suspend the request // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs - // request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext] } - // // shoot the message to the root endpoint for processing // IMPORTANT: the suspend method is invoked on the server thread not in the actor - // val method = builder(suspend _) if (method.go) _root ! method } @@ -117,7 +110,6 @@ trait Mist { def initMist(context: ServletContext) { val server = context.getServerInfo val (major, minor) = (context.getMajorVersion, context.getMinorVersion) - _factory = if (major >= 3) { Some(Servlet30ContextMethodFactory) } else if (server.toLowerCase startsWith JettyServer) { @@ -200,7 +192,7 @@ object Endpoint { /** * leverage the akka config to tweak the dispatcher for our endpoints */ - final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher") + val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher") type Hook = Function[String, Boolean] type Provider = Function[String, ActorRef] @@ -236,25 +228,21 @@ trait Endpoint { this: Actor => * Message handling common to all endpoints, must be chained */ protected def handleHttpRequest: Receive = { - // + // add the endpoint - the if the uri hook matches, // the message will be sent to the actor returned by the provider func - // case Attach(hook, provider) => _attach(hook, provider) - // // dispatch the suspended requests - // case req: RequestMethod => { val uri = req.request.getPathInfo val endpoints = _attachments.filter { _._1(uri) } - if (!endpoints.isEmpty) - endpoints.foreach { _._2(uri) ! req } + if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req } else { self.sender match { case Some(s) => s reply NoneAvailable(uri, req) - case None => _na(uri, req) + case None => _na(uri, req) } } } @@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint { final val Root = "/" - // // use the configurable dispatcher - // self.dispatcher = Endpoint.Dispatcher - // // adopt the configured id - // if (RootActorBuiltin) self.id = RootActorID override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments - //TODO: Is this needed? - //override def postRestart = - // _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments - def recv: Receive = { case NoneAvailable(uri, req) => _na(uri, req) case unknown => {} @@ -317,10 +297,7 @@ trait RequestMethod { import java.io.IOException import javax.servlet.http.{HttpServletResponse, HttpServletRequest} - // // required implementations - // - val builder: () => tAsyncRequestContext /** @@ -353,35 +330,31 @@ trait RequestMethod { def getHeaderOrElse(name: String, default: Function[Any, String]): String = request.getHeader(name) match { case null => default(null) - case s => s - } + case s => s + } def getParameterOrElse(name: String, default: Function[Any, String]): String = request.getParameter(name) match { case null => default(null) - case s => s + case s => s } def complete(status: Int, body: String): Boolean = complete(status, body, Headers()) def complete(status: Int, body: String, headers: Headers): Boolean = - rawComplete { - res => { - res.setStatus(status) - headers foreach {h => response.setHeader(h._1, h._2)} - res.getWriter.write(body) - res.getWriter.close - res.flushBuffer - } + rawComplete { res => + res.setStatus(status) + headers foreach {h => response.setHeader(h._1, h._2)} + res.getWriter.write(body) + res.getWriter.close + res.flushBuffer } def rawComplete(completion: HttpServletResponse => Unit): Boolean = context match { - case Some(pipe) => { + case Some(pipe) => try { - if (!suspended) { - false - } + if (!suspended) false else { completion(response) pipe.complete @@ -392,34 +365,28 @@ trait RequestMethod { EventHandler.error(io, this, io.getMessage) false } - } - - case None => - false + case None => false } def complete(t: Throwable) { context match { - case Some(pipe) => { + case Some(pipe) => try { if (suspended) { response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume") pipe.complete } } catch { - case io: IOException => + case io: IOException => EventHandler.error(io, this, io.getMessage) } - } - case None => {} } } - /** + /* * Utility methods to send responses back */ - def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers) def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 210b818784..dd7a22df52 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -39,6 +39,9 @@ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import java.util.concurrent._ +import akka.AkkaException + +class RemoteClientMessageBufferException(message: String) extends AkkaException(message) object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -159,7 +162,8 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true) + val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true) + val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1) val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + @@ -167,7 +171,10 @@ abstract class RemoteClient private[akka] ( protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + protected val pendingRequests = { + if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) + } private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) @@ -207,7 +214,7 @@ abstract class RemoteClient private[akka] ( isOneWay: Boolean, actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], - actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { //TODO: find better strategy to prevent race + actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race send(createRemoteMessageProtocolBuilder( Some(actorRef), @@ -243,7 +250,10 @@ abstract class RemoteClient private[akka] ( case e: Throwable => // add the request to the tx log after a failing send notifyListeners(RemoteClientError(e, module, remoteAddress)) - if (useTransactionLog) pendingRequests.add((true, null, request)) + if (useTransactionLog) { + if (!pendingRequests.offer((true, null, request))) + throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") + } else throw e } None @@ -256,7 +266,8 @@ abstract class RemoteClient private[akka] ( def handleRequestReplyError(future: ChannelFuture) = { notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) if (useTransactionLog) { - pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send + if (!pendingRequests.offer((false, futureUuid, request))) // Add the request to the tx log after a failing send + throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") } else { val f = futures.remove(futureUuid) // Clean up future if (f ne null) f.completeWithException(future.getCause) @@ -1005,9 +1016,15 @@ class RemoteServerHandler( val typedActor = createTypedActor(actorInfo, channel) //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo? - val (ownerTypeHint, argClasses, args) = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]] + val (ownerTypeHint, argClasses, args) = + MessageSerializer + .deserialize(request.getMessage) + .asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]] - def resolveMethod(bottomType: Class[_], typeHint: String, methodName: String, methodSignature: Array[Class[_]]): java.lang.reflect.Method = { + def resolveMethod(bottomType: Class[_], + typeHint: String, + methodName: String, + methodSignature: Array[Class[_]]): java.lang.reflect.Method = { var typeToResolve = bottomType var targetMethod: java.lang.reflect.Method = null var firstException: NoSuchMethodException = null diff --git a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala index 02b29e6de1..cd8f71058e 100644 --- a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala +++ b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala @@ -8,7 +8,7 @@ import org.junit.runner.RunWith import akka.serialization.Serializer.ScalaJSON //TODO: FIXME WHY IS THIS COMMENTED OUT? -/* + object Protocols { import sjson.json.DefaultProtocol._ case class Shop(store: String, item: String, price: Int) @@ -51,4 +51,3 @@ class ScalaJSONSerializerSpec extends } } } -*/ diff --git a/akka-stm/src/test/scala/config/ConfigSpec.scala b/akka-stm/src/test/scala/config/ConfigSpec.scala index 4108a99d63..8636254ced 100644 --- a/akka-stm/src/test/scala/config/ConfigSpec.scala +++ b/akka-stm/src/test/scala/config/ConfigSpec.scala @@ -16,8 +16,6 @@ class ConfigSpec extends WordSpec with MustMatchers { "contain all configuration properties for akka-stm that are used in code with their correct defaults" in { import Config.config._ - getInt("akka.storage.max-retries") must equal(Some(10)) - getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.fair") must equal(Some(true)) getBool("akka.stm.interruptible") must equal(Some(false)) diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 3fcf0789bc..1cdf735e8e 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -17,10 +17,10 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicBoolean -import scala.reflect.BeanProperty import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} +import scala.reflect.BeanProperty + /** * TypedActor is a type-safe actor made out of a POJO with interface. * Void methods are turned into fire-forget messages. @@ -36,7 +36,7 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} * class TestActorImpl extends TypedActor implements TestActor { * * public void hit(int count) { - * Pong pong = (Pong) getContext().getSender(); + * Pong pong = (Pong) context().sender(); * pong.hit(count++); * } * @@ -124,15 +124,15 @@ abstract class TypedActor extends Actor with Proxyable { * This class does not contain static information but is updated by the runtime system * at runtime. *

- * You can get a hold of the context using either the 'getContext()' or 'context' - * methods from the 'TypedActor' base class. + * You can get a hold of the context using the 'context()' + * method from the 'TypedActor' base class. *

* * Here is an example of usage (in Java): *

    * class PingImpl extends TypedActor implements Ping {
    *   public void hit(int count) {
-   *     Pong pong = (Pong) getContext().getSender();
+   *     Pong pong = (Pong) context().sender();
    *     pong.hit(count++);
    *   }
    * }
@@ -148,7 +148,12 @@ abstract class TypedActor extends Actor with Proxyable {
    * }
    * 
*/ - @BeanProperty val context: TypedActorContext = new TypedActorContext(self) + val context: TypedActorContext = new TypedActorContext(self) + + /** + * @deprecated 'getContext()' is deprecated use 'context()' + */ + def getContext: TypedActorContext = context /** * This method is used to resolve the Future for TypedActor methods that are defined to return a @@ -180,15 +185,16 @@ abstract class TypedActor extends Actor with Proxyable { case joinPoint: JoinPoint => SenderContextInfo.senderActorRef.value = self SenderContextInfo.senderProxy.value = proxy - if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed else self.reply(joinPoint.proceed) + case coordinated @ Coordinated(joinPoint: JoinPoint) => SenderContextInfo.senderActorRef.value = self SenderContextInfo.senderProxy.value = proxy if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) coordinated atomic { joinPoint.proceed } + case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) case unexpected => throw new IllegalActorStateException( @@ -255,7 +261,7 @@ abstract class TypedActor extends Actor with Proxyable { *
  * class PingImpl extends TypedActor implements Ping {
  *   public void hit(int count) {
- *     Pong pong = (Pong) getContext().getSender();
+ *     Pong pong = (Pong) context().sender();
  *     pong.hit(count++);
  *   }
  * }
@@ -277,7 +283,8 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   private[akka] var _sender: AnyRef = _
 
   /**
-5  * Returns the uuid for the actor.
+   * Returns the uuid for the actor.
+   * @deprecated use 'uuid()'
    */
   def getUuid() = actorRef.uuid
 
@@ -287,31 +294,39 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   def uuid = actorRef.uuid
 
   def timeout = actorRef.timeout
+
+  /**
+   * @deprecated use 'timeout()'
+   */
   def getTimout = timeout
   def setTimout(timeout: Long) = actorRef.timeout = timeout
 
   def id =  actorRef.id
+
+  /**
+   * @deprecated use 'id()'
+   */
   def getId = id
   def setId(id: String) = actorRef.id = id
 
   def receiveTimeout = actorRef.receiveTimeout
+
+  /**
+   * @deprecated use 'receiveTimeout()'
+   */
   def getReceiveTimeout = receiveTimeout
   def setReceiveTimeout(timeout: Long) = actorRef.setReceiveTimeout(timeout)
 
-  /**
-   * Is the actor running?
-   */
+  def mailboxSize = actorRef.mailboxSize
+
+  def dispatcher = actorRef.getDispatcher
+
+  def lifeCycle = actorRef.getLifeCycle
+
   def isRunning: Boolean = actorRef.isRunning
-
-  /**
-   * Is the actor shut down?
-   */
   def isShutdown: Boolean = actorRef.isShutdown
-
-  /**
-   * Is the actor ever started?
-   */
   def isUnstarted: Boolean = actorRef.isUnstarted
+  def isBeingRestarted: Boolean = actorRef.isBeingRestarted
 
   /**
    * Returns the current sender reference.
@@ -349,7 +364,7 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   /**
     * Returns the home address and port for this actor.
     */
-  def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)//TODO: REVISIT: Sensible to return null?
+  def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)
 }
 
 object TypedActorConfiguration {
@@ -449,7 +464,7 @@ object TypedActor {
    * @param intfClass interface the typed actor implements
    * @param targetClass implementation class of the typed actor
    */
-  def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = 
+  def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T =
     newInstance(intfClass, targetClass, TypedActorConfiguration())
 
   /**
diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
index 315467f2ee..f2f2a7a1fc 100644
--- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
+++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
@@ -31,7 +31,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
   private var components: List[SuperviseTypedActor] = _
   private var supervised: List[Supervise] = Nil
   private var bindings: List[DependencyBinding] = Nil
-  private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
   private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
   private var modules = new java.util.ArrayList[Module]
   private var methodToUriRegistry = new HashMap[Method, String]
@@ -167,7 +166,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
 
   def reset = synchronized {
     modules = new java.util.ArrayList[Module]
-    configRegistry = new HashMap[Class[_], SuperviseTypedActor]
     typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
     methodToUriRegistry = new HashMap[Method, String]
     injector = null
diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
index 0946aa26c0..0e27557607 100644
--- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
+++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
@@ -12,6 +12,8 @@ import akka.config.Supervision._
 import java.util.concurrent.CountDownLatch
 import akka.config.TypedActorConfigurator
 
+import akka.Testing
+
 /**
  * @author Martin Krasser
  */
@@ -95,7 +97,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
     }
 
     it("should be stopped when supervision cannot handle the problem in") {
-      val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
+      val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
       val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
       try {
         val first = conf.getInstance(classOf[TypedActorFailer])
@@ -121,7 +123,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
     }
 
     it("should be restarted when supervision handles the problem in") {
-     val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
+     val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
      val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise
      try {
        val first = conf.getInstance(classOf[TypedActorFailer])
@@ -146,4 +148,4 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
      }
    }
  }
-}
\ No newline at end of file
+}
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 164fae1edc..1c3676ad31 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -146,7 +146,11 @@ akka {
     }
 
     client {
-      retry-message-send-on-failure = on
+      buffering {
+        retry-message-send-on-failure = on
+        capacity = -1                      # If negative (or zero) then an unbounded mailbox is used (default)
+                                           # If positive then a bounded mailbox is used and the capacity is set using the property
+      }
       reconnect-delay = 5
       read-timeout = 10
       message-frame-size = 1048576
@@ -154,8 +158,4 @@ akka {
       reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
     }
   }
-
-  storage {
-    max-retries = 10
-  }
 }
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index d937405d0d..9ffa80723f 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -19,7 +19,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
   val scalaCompileSettings =
     Seq("-deprecation",
         "-Xmigration",
-        //"-optimise",
+        "-optimise",
         "-encoding", "utf8")
 
   val javaCompileSettings = Seq("-Xlint:unchecked")
@@ -126,7 +126,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
     // Compile
     lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
 
-    lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //LGPL 2.1
+    lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
 
     lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
 
@@ -452,8 +452,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
   }
 
   def akkaArtifacts = descendents(info.projectPath / "dist", "*-" + version + ".jar")
-  lazy val integrationTestsEnabled = systemOptional[Boolean]("integration.tests",false)
-  lazy val stressTestsEnabled = systemOptional[Boolean]("stress.tests",false)
 
   // ------------------------------------------------------------
   class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) 
@@ -472,21 +470,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
     override def packageToPublishActions = super.packageToPublishActions ++ Seq(this.packageDocs, this.packageSrc)
     override def pomPostProcess(node: scala.xml.Node): scala.xml.Node = mcPom(AkkaParentProject.this.moduleConfigurations)(super.pomPostProcess(node))
 
-    /**
-     * Used for testOptions, possibility to enable the running of integration and or stresstests
-     *
-     * To enable set true and disable set false
-     * set integration.tests true
-     * set stress.tests true
-     */
-    def createTestFilter(defaultTests: (String) => Boolean) = { TestFilter({
-        case s: String if defaultTests(s) => true
-        case s: String if integrationTestsEnabled.value => s.endsWith("TestIntegration")
-        case s: String if stressTestsEnabled.value      => s.endsWith("TestStress")
-        case _ => false
-      }) :: Nil
+    lazy val excludeTestsProperty = systemOptional[String]("akka.test.exclude", "")
+
+    def excludeTests = {
+      val exclude = excludeTestsProperty.value
+      if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq
     }
 
+    override def testOptions = super.testOptions ++ excludeTests.map(exclude => TestFilter(test => !test.contains(exclude)))
+
     lazy val publishRelease = {
       val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)
       publishTask(publishIvyModule, releaseConfiguration) dependsOn (deliver, publishLocal, makePom)