From 3d4463fb782ed12072428edd86e40e0ce88912e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 29 Mar 2011 06:46:54 -0700 Subject: [PATCH 01/12] AspectWerkz license changed to Apache 2 --- project/build/AkkaProject.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 3a57215144..0c3466f259 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -126,7 +126,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // Compile lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain - lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //LGPL 2.1 + lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2 lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 From dcd49bd052373b352fe20024da8483de23d5ac2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 29 Mar 2011 17:07:41 +0200 Subject: [PATCH 02/12] Added configuration to define capacity to the remote client buffer messages on failure to send --- .../main/scala/akka/event/EventHandler.scala | 10 +++++----- .../remote/netty/NettyRemoteSupport.scala | 19 +++++++++++++++---- .../src/test/scala/config/ConfigSpec.scala | 2 -- config/akka-reference.conf | 10 +++++----- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index d4fc55b0a9..5b8245d1d4 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -25,7 +25,7 @@ import akka.AkkaException * case EventHandler.Warning(instance, message) => ... * case EventHandler.Info(instance, message) => ... * case EventHandler.Debug(instance, message) => ... - * case genericEvent => ... + * case genericEvent => ... * } * }) * @@ -35,7 +35,7 @@ import akka.AkkaException * *

* However best is probably to register the listener in the 'akka.conf' - * configuration file. + * configuration file. *

* Log an error event: *

@@ -45,7 +45,7 @@ import akka.AkkaException
  * 
  * EventHandler.error(exception, this, message.toString)
  * 
- * + * * @author Jonas Bonér */ object EventHandler extends ListenerManagement { @@ -73,7 +73,7 @@ object EventHandler extends ListenerManagement { val debug = "[DEBUG] [%s] [%s] [%s] %s".intern val generic = "[GENERIC] [%s] [%s]".intern val ID = "event:handler".intern - + class EventHandlerException extends AkkaException lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build @@ -129,7 +129,7 @@ object EventHandler extends ListenerManagement { else if (eventClass.isInstanceOf[Debug]) DebugLevel else DebugLevel } - + class DefaultListener extends Actor { self.id = ID self.dispatcher = EventHandlerDispatcher diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 210b818784..3d603508e8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -39,6 +39,9 @@ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import java.util.concurrent._ +import akka.AkkaException + +class RemoteClientMessageBufferException(message: String) extends AkkaException(message) object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -159,7 +162,8 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true) + val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true) + val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1) val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + @@ -167,7 +171,10 @@ abstract class RemoteClient private[akka] ( protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + protected val pendingRequests = { + if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) + } private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) @@ -243,7 +250,10 @@ abstract class RemoteClient private[akka] ( case e: Throwable => // add the request to the tx log after a failing send notifyListeners(RemoteClientError(e, module, remoteAddress)) - if (useTransactionLog) pendingRequests.add((true, null, request)) + if (useTransactionLog) { + if (!pendingRequests.offer((true, null, request))) + throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") + } else throw e } None @@ -256,7 +266,8 @@ abstract class RemoteClient private[akka] ( def handleRequestReplyError(future: ChannelFuture) = { notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) if (useTransactionLog) { - pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send + if (!pendingRequests.offer((false, futureUuid, request))) // Add the request to the tx log after a failing send + throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") } else { val f = futures.remove(futureUuid) // Clean up future if (f ne null) f.completeWithException(future.getCause) diff --git a/akka-stm/src/test/scala/config/ConfigSpec.scala b/akka-stm/src/test/scala/config/ConfigSpec.scala index 4108a99d63..8636254ced 100644 --- a/akka-stm/src/test/scala/config/ConfigSpec.scala +++ b/akka-stm/src/test/scala/config/ConfigSpec.scala @@ -16,8 +16,6 @@ class ConfigSpec extends WordSpec with MustMatchers { "contain all configuration properties for akka-stm that are used in code with their correct defaults" in { import Config.config._ - getInt("akka.storage.max-retries") must equal(Some(10)) - getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.fair") must equal(Some(true)) getBool("akka.stm.interruptible") must equal(Some(false)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 164fae1edc..1c3676ad31 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -146,7 +146,11 @@ akka { } client { - retry-message-send-on-failure = on + buffering { + retry-message-send-on-failure = on + capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) + # If positive then a bounded mailbox is used and the capacity is set using the property + } reconnect-delay = 5 read-timeout = 10 message-frame-size = 1048576 @@ -154,8 +158,4 @@ akka { reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for } } - - storage { - max-retries = 10 - } } From 8bc017f8b63975e3c242bd97eefdbd816aaf2d92 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 29 Mar 2011 22:50:23 +0200 Subject: [PATCH 03/12] Temporarily disabling send-time-work-redistribution until I can devise a good way of avoiding a worst-case-stack-overflow --- ...xecutorBasedEventDrivenWorkStealingDispatcher.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 451cdf8b80..f2f63a3ff4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -78,12 +78,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override private[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) - if (mbox.dispatcherLock.locked && attemptDonationOf(invocation, mbox)) { + /*if (!mbox.isEmpty && attemptDonationOf(invocation, mbox)) { //We were busy and we got to donate the message to some other lucky guy, we're done here - } else { + } else {*/ mbox enqueue invocation registerForExecution(mbox) - } + //} } override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { @@ -110,13 +110,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Returns true if the donation succeeded or false otherwise */ - protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = { + /*protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = { val actors = members // copy to prevent concurrent modifications having any impact doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match { case null => false case recipient => donate(message, recipient) } - } + }*/ /** * Rewrites the message and adds that message to the recipients mailbox From 4ee194fc3af80449872c744635eb99e3df5d98e8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 29 Mar 2011 22:50:42 +0200 Subject: [PATCH 04/12] Adding -optimise to the compile options --- project/build/AkkaProject.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 0c3466f259..621c945653 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -19,7 +19,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val scalaCompileSettings = Seq("-deprecation", "-Xmigration", - //"-optimise", + "-optimise", "-encoding", "utf8") val javaCompileSettings = Seq("-Xlint:unchecked") From 4f0c22cfa31b03284241c03a070ca7bb9fe1dde0 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 30 Mar 2011 10:48:07 +1300 Subject: [PATCH 05/12] Add some testing times to FSM tests (for Jenkins) --- .../scala/akka/actor/actor/FSMActorSpec.scala | 20 ++++++++++--------- .../akka/actor/actor/FSMTimingSpec.scala | 3 ++- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala index 5213557048..cf910925c8 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -14,6 +14,8 @@ import java.util.concurrent.TimeUnit import akka.util.duration._ +import akka.Testing + object FSMActorSpec { @@ -100,7 +102,7 @@ class FSMActorSpec extends JUnitSuite { def unlockTheLock = { // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", (1, TimeUnit.SECONDS))).start + val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start val transitionTester = Actor.actorOf(new Actor { def receive = { case Transition(_, _, _) => transitionCallBackLatch.open @@ -108,7 +110,7 @@ class FSMActorSpec extends JUnitSuite { }}).start lock ! SubscribeTransitionCallBack(transitionTester) - assert(initialStateLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) lock ! '3' lock ! '3' @@ -116,14 +118,14 @@ class FSMActorSpec extends JUnitSuite { lock ! '2' lock ! '1' - assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) - assert(transitionLatch.tryAwait(1, TimeUnit.SECONDS)) - assert(transitionCallBackLatch.tryAwait(1, TimeUnit.SECONDS)) - assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) lock ! "not_handled" - assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) val answerLatch = new StandardLatch object Hello @@ -136,9 +138,9 @@ class FSMActorSpec extends JUnitSuite { } }).start tester ! Hello - assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) tester ! Bye - assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS)) + assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index a59785ab7a..07491c967f 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -2,6 +2,7 @@ package akka.actor import akka.testkit.TestKit import akka.util.duration._ +import akka.Testing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers @@ -16,7 +17,7 @@ class FSMTimingSpec val fsm = Actor.actorOf(new StateMachine(testActor)).start fsm ! SubscribeTransitionCallBack(testActor) - expectMsg(200 millis, CurrentState(fsm, Initial)) + expectMsg(Testing.time(200).millis, CurrentState(fsm, Initial)) ignoreMsg { case Transition(_, Initial, _) => true From fa8809a772a6c378b47d51db763be987cfd0ce1a Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 30 Mar 2011 12:32:12 +1300 Subject: [PATCH 06/12] Multiply test timing in FSMTimingSpec --- .../scala/akka/actor/actor/FSMTimingSpec.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index 07491c967f..0b67272244 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -26,7 +26,7 @@ class FSMTimingSpec "A Finite State Machine" must { "receive StateTimeout" in { - within (50 millis, 150 millis) { + within (Testing.time(50).millis, Testing.time(150).millis) { fsm ! TestStateTimeout expectMsg(Transition(fsm, TestStateTimeout, Initial)) expectNoMsg @@ -34,7 +34,7 @@ class FSMTimingSpec } "receive single-shot timer" in { - within (50 millis, 150 millis) { + within (Testing.time(50).millis, Testing.time(150).millis) { fsm ! TestSingleTimer expectMsg(Tick) expectMsg(Transition(fsm, TestSingleTimer, Initial)) @@ -48,7 +48,7 @@ class FSMTimingSpec case Tick => Tick } seq must have length (5) - within(250 millis) { + within(Testing.time(250) millis) { expectMsg(Transition(fsm, TestRepeatedTimer, Initial)) expectNoMsg } @@ -56,21 +56,21 @@ class FSMTimingSpec "notify unhandled messages" in { fsm ! TestUnhandled - within(100 millis) { + within(Testing.time(100) millis) { fsm ! Tick expectNoMsg } - within(100 millis) { + within(Testing.time(100) millis) { fsm ! SetHandler fsm ! Tick expectMsg(Unhandled(Tick)) expectNoMsg } - within(100 millis) { + within(Testing.time(100) millis) { fsm ! Unhandled("test") expectNoMsg } - within(100 millis) { + within(Testing.time(100) millis) { fsm ! Cancel expectMsg(Transition(fsm, TestUnhandled, Initial)) } From de2566e01ead6a3deca49b00a680fe1ec35844d5 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 30 Mar 2011 13:41:39 +1300 Subject: [PATCH 07/12] Multiply test timing in ActorModelSpec --- .../scala/akka/dispatch/ActorModelSpec.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index 55c2e001af..6b154b42a9 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor import akka.util.{Duration, Switch} +import akka.Testing object ActorModelSpec { @@ -224,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite { a.start a ! CountDown(start) - assertCountDown(start,3000, "Should process first message within 3 seconds") + assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1) a ! Wait(1000) a ! CountDown(oneAtATime) // in case of serialization violation, restart would happen instead of count down - assertCountDown(oneAtATime,1500,"Processed message when allowed") + assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) a.stop @@ -245,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite { def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } } for (i <- 1 to 10) { start } - assertCountDown(counter, 3000, "Should process 200 messages") + assertCountDown(counter, Testing.time(3000), "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) a.stop @@ -263,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite { val aStart,aStop,bParallel = new CountDownLatch(1) a ! Meet(aStart,aStop) - assertCountDown(aStart,3000, "Should process first message within 3 seconds") + assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds") b ! CountDown(bParallel) - assertCountDown(bParallel, 3000, "Should process other actors in parallel") + assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel") aStop.countDown() a.stop @@ -281,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite { val done = new CountDownLatch(1) a ! Restart a ! CountDown(done) - assertCountDown(done, 3000, "Should be suspended+resumed and done with next message within 3 seconds") + assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds") a.stop assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2, msgsProcessed = 2, suspensions = 1, resumes = 1) @@ -297,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite { assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) dispatcher.resume(a) - assertCountDown(done, 3000, "Should resume processing of messages when resumed") + assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) @@ -314,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite { (1 to num) foreach { _ => newTestActor.start ! cachedMessage } - assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns") + assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns") } for(run <- 1 to 3) { flood(10000) From fef54b0884d1256d54f749bd14293cca23b5e8e2 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 30 Mar 2011 14:38:14 +1300 Subject: [PATCH 08/12] More test timing adjustments --- .../scala/actor/typed-actor/TypedActorLifecycleSpec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 0946aa26c0..0e27557607 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -12,6 +12,8 @@ import akka.config.Supervision._ import java.util.concurrent.CountDownLatch import akka.config.TypedActorConfigurator +import akka.Testing + /** * @author Martin Krasser */ @@ -95,7 +97,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft } it("should be stopped when supervision cannot handle the problem in") { - val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000) + val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000)) val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise try { val first = conf.getInstance(classOf[TypedActorFailer]) @@ -121,7 +123,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft } it("should be restarted when supervision handles the problem in") { - val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000) + val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000)) val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise try { val first = conf.getInstance(classOf[TypedActorFailer]) @@ -146,4 +148,4 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft } } } -} \ No newline at end of file +} From f88a7cd207b28b5b71a2e8fd8e1b534dbb6a8413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 30 Mar 2011 09:16:09 +0200 Subject: [PATCH 09/12] added some methods to the TypedActor context and deprecated all methods starting with 'get*' --- .../main/scala/akka/actor/TypedActor.scala | 59 ++++++++++++------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 3fcf0789bc..e32ed03c32 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -17,10 +17,10 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicBoolean -import scala.reflect.BeanProperty import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} +import scala.reflect.BeanProperty + /** * TypedActor is a type-safe actor made out of a POJO with interface. * Void methods are turned into fire-forget messages. @@ -36,7 +36,7 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} * class TestActorImpl extends TypedActor implements TestActor { * * public void hit(int count) { - * Pong pong = (Pong) getContext().getSender(); + * Pong pong = (Pong) context().sender(); * pong.hit(count++); * } * @@ -124,15 +124,15 @@ abstract class TypedActor extends Actor with Proxyable { * This class does not contain static information but is updated by the runtime system * at runtime. *

- * You can get a hold of the context using either the 'getContext()' or 'context' - * methods from the 'TypedActor' base class. + * You can get a hold of the context using the 'context()' + * method from the 'TypedActor' base class. *

* * Here is an example of usage (in Java): *

    * class PingImpl extends TypedActor implements Ping {
    *   public void hit(int count) {
-   *     Pong pong = (Pong) getContext().getSender();
+   *     Pong pong = (Pong) context().sender();
    *     pong.hit(count++);
    *   }
    * }
@@ -148,7 +148,12 @@ abstract class TypedActor extends Actor with Proxyable {
    * }
    * 
*/ - @BeanProperty val context: TypedActorContext = new TypedActorContext(self) + val context: TypedActorContext = new TypedActorContext(self) + + /** + * @deprecated 'getContext()' is deprecated use 'context()' + */ + def getContext: TypedActorContext = context /** * This method is used to resolve the Future for TypedActor methods that are defined to return a @@ -180,15 +185,16 @@ abstract class TypedActor extends Actor with Proxyable { case joinPoint: JoinPoint => SenderContextInfo.senderActorRef.value = self SenderContextInfo.senderProxy.value = proxy - if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed else self.reply(joinPoint.proceed) + case coordinated @ Coordinated(joinPoint: JoinPoint) => SenderContextInfo.senderActorRef.value = self SenderContextInfo.senderProxy.value = proxy if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) coordinated atomic { joinPoint.proceed } + case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) case unexpected => throw new IllegalActorStateException( @@ -255,7 +261,7 @@ abstract class TypedActor extends Actor with Proxyable { *
  * class PingImpl extends TypedActor implements Ping {
  *   public void hit(int count) {
- *     Pong pong = (Pong) getContext().getSender();
+ *     Pong pong = (Pong) context().sender();
  *     pong.hit(count++);
  *   }
  * }
@@ -277,7 +283,8 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   private[akka] var _sender: AnyRef = _
 
   /**
-5  * Returns the uuid for the actor.
+   * Returns the uuid for the actor.
+   * @deprecated use 'uuid()'
    */
   def getUuid() = actorRef.uuid
 
@@ -287,31 +294,39 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   def uuid = actorRef.uuid
 
   def timeout = actorRef.timeout
+
+  /**
+   * @deprecated use 'timeout()'
+   */
   def getTimout = timeout
   def setTimout(timeout: Long) = actorRef.timeout = timeout
 
   def id =  actorRef.id
+
+  /**
+   * @deprecated use 'id()'
+   */
   def getId = id
   def setId(id: String) = actorRef.id = id
 
   def receiveTimeout = actorRef.receiveTimeout
+
+  /**
+   * @deprecated use 'receiveTimeout()'
+   */
   def getReceiveTimeout = receiveTimeout
   def setReceiveTimeout(timeout: Long) = actorRef.setReceiveTimeout(timeout)
 
-  /**
-   * Is the actor running?
-   */
+  def mailboxSize = actorRef.mailboxSize
+
+  def dispatcher = actorRef.getDispatcher
+
+  def lifeCycle = actorRef.getLifeCycle
+
   def isRunning: Boolean = actorRef.isRunning
-
-  /**
-   * Is the actor shut down?
-   */
   def isShutdown: Boolean = actorRef.isShutdown
-
-  /**
-   * Is the actor ever started?
-   */
   def isUnstarted: Boolean = actorRef.isUnstarted
+  def isBeingRestarted: Boolean = actorRef.isBeingRestarted
 
   /**
    * Returns the current sender reference.
@@ -449,7 +464,7 @@ object TypedActor {
    * @param intfClass interface the typed actor implements
    * @param targetClass implementation class of the typed actor
    */
-  def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = 
+  def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T =
     newInstance(intfClass, targetClass, TypedActorConfiguration())
 
   /**

From 3d529e8ca46649b5156b7be24cf84b3a4ce0d972 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Wed, 30 Mar 2011 15:12:58 +0200
Subject: [PATCH 10/12] Added check to ensure that messages are not null. Also
 cleaned up misc code

---
 .../src/main/scala/akka/actor/Actor.scala     | 13 ++-
 .../src/main/scala/akka/actor/ActorRef.scala  |  1 -
 .../akka/dispatch/ThreadPoolBuilder.scala     | 10 +-
 .../akka/util/BoundedBlockingQueue.scala      | 12 ++-
 .../test/scala/akka/dispatch/FutureSpec.scala |  1 -
 .../dispatch/ThreadBasedDispatcherSpec.scala  | 91 -------------------
 akka-http/src/main/scala/akka/http/Mist.scala | 81 +++++------------
 .../remote/netty/NettyRemoteSupport.scala     | 12 ++-
 .../ScalaJSONSerializerSpec.scala             |  3 +-
 .../main/scala/akka/actor/TypedActor.scala    |  2 +-
 .../config/TypedActorGuiceConfigurator.scala  |  2 -
 11 files changed, 55 insertions(+), 173 deletions(-)
 delete mode 100644 akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala

diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 882331b177..e70b4a98ae 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -28,9 +28,9 @@ import akka.japi. {Creator, Procedure}
 /* Marker trait to show which Messages are automatically handled by Akka */
 sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
 
-case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) 
+case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
   extends AutoReceivedMessage with LifeCycleMessage {
-  
+
   /**
    * Java API
    */
@@ -75,6 +75,7 @@ class IllegalActorStateException   private[akka](message: String) extends AkkaEx
 class ActorKilledException         private[akka](message: String) extends AkkaException(message)
 class ActorInitializationException private[akka](message: String) extends AkkaException(message)
 class ActorTimeoutException        private[akka](message: String) extends AkkaException(message)
+class InvalidMessageException      private[akka](message: String) extends AkkaException(message)
 
 /**
  * This message is thrown by default when an Actors behavior doesn't match a message
@@ -90,7 +91,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception
  * @author Jonas Bonér
  */
 object Actor extends ListenerManagement {
-  
+
   /**
    * Add shutdown cleanups
    */
@@ -128,7 +129,7 @@ object Actor extends ListenerManagement {
   type Receive = PartialFunction[Any, Unit]
 
   private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
-  
+
    /**
    *  Creates an ActorRef out of the Actor with type T.
    * 
@@ -443,8 +444,10 @@ trait Actor {
   // =========================================
 
   private[akka] final def apply(msg: Any) = {
+    if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
+      throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
     val behaviorStack = self.hotswap
-    msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
+    msg match {
       case l: AutoReceivedMessage           => autoReceiveMessage(l)
       case msg if behaviorStack.nonEmpty &&
         behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 81574dacff..673cb487a1 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -902,7 +902,6 @@ class LocalActorRef private[akka] (
 
       failedActor match {
         case p: Proxyable =>
-          //p.swapProxiedActor(freshActor) //TODO: broken
           failedActor.preRestart(reason)
           failedActor.postRestart(reason)
         case _ =>
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index 31d5dca0eb..83c30f23e0 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -160,12 +160,11 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
  */
 object MonitorableThread {
   val DEFAULT_NAME = "MonitorableThread"
-  val created = new AtomicInteger
-  val alive = new AtomicInteger
-  @volatile var debugLifecycle = false
-}
 
-// FIXME fix the issues with using the monitoring in MonitorableThread
+  // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
+  val created      = new AtomicInteger
+  val alive        = new AtomicInteger
+}
 
 /**
  * @author Jonas Bonér
@@ -178,7 +177,6 @@ class MonitorableThread(runnable: Runnable, name: String)
   })
 
   override def run = {
-    val debug = MonitorableThread.debugLifecycle
     try {
       MonitorableThread.alive.incrementAndGet
       super.run
diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
index 8c37845baf..ba4e508454 100644
--- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
+++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
@@ -1,11 +1,15 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB 
+ */
+
 package akka.util
 
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{ TimeUnit, BlockingQueue }
 import java.util.{ AbstractQueue, Queue, Collection, Iterator }
 
-class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
+class BoundedBlockingQueue[E <: AnyRef](
+  val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
 
   backing match {
     case null => throw new IllegalArgumentException("Backing Queue may not be null")
@@ -32,7 +36,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
       require(backing.offer(e))
       notEmpty.signal()
     } finally {
-        lock.unlock()
+      lock.unlock()
     }
   }
 
@@ -319,4 +323,4 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
       lock.unlock()
     }
   }
-}
\ No newline at end of file
+}
diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
index f99f5f5305..83dc4e294b 100644
--- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -132,7 +132,6 @@ class FutureSpec extends JUnitSuite {
     actor.stop
   }
 
-  // FIXME: implement Futures.awaitEither, and uncomment these two tests
   @Test def shouldFutureAwaitEitherLeft = {
     val actor1 = actorOf[TestActor].start
     val actor2 = actorOf[TestActor].start
diff --git a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala
deleted file mode 100644
index 603b17e336..0000000000
--- a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package akka.dispatch
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-import org.scalatest.junit.JUnitSuite
-import org.junit.{Test, Before}
-
-import akka.actor.Actor
-import Actor._
-
-// FIXME use this test when we have removed the MessageInvoker classes
-/*
-class ThreadBasedDispatcherSpec extends JUnitSuite {
-  private var threadingIssueDetected: AtomicBoolean = null
-  val key1 = actorOf(new Actor { def receive = { case _ => {}} })
-  val key2 = actorOf(new Actor { def receive = { case _ => {}} })
-  val key3 = actorOf(new Actor { def receive = { case _ => {}} })
-
-  class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
-    val guardLock: Lock = new ReentrantLock
-
-    def invoke(message: MessageInvocation) {
-      try {
-        if (threadingIssueDetected.get) return
-        if (guardLock.tryLock) {
-          handleLatch.countDown
-        } else {
-          threadingIssueDetected.set(true)
-        }
-      } catch {
-        case e: Exception => threadingIssueDetected.set(true)
-      } finally {
-        guardLock.unlock
-      }
-    }
-  }
-
-  @Before
-  def setUp = {
-    threadingIssueDetected = new AtomicBoolean(false)
-  }
-
-  @Test
-  def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
-    internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
-  }
-
-  @Test
-  def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
-    internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
-  }
-
-  private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially(): Unit = {
-    val guardLock = new ReentrantLock
-    val handleLatch = new CountDownLatch(100)
-    val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
-    dispatcher.start
-    for (i <- 0 until 100) {
-      dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None))
-    }
-    assert(handleLatch.await(5, TimeUnit.SECONDS))
-    assert(!threadingIssueDetected.get)
-  }
-
-  private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder(): Unit = {
-    val handleLatch = new CountDownLatch(100)
-    val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
-      var currentValue = -1;
-      def invoke(message: MessageInvocation) {
-        if (threadingIssueDetected.get) return
-        val messageValue = message.message.asInstanceOf[Int]
-        if (messageValue.intValue == currentValue + 1) {
-          currentValue = messageValue.intValue
-          handleLatch.countDown
-        } else threadingIssueDetected.set(true)
-      }
-    })
-    dispatcher.start
-    for (i <- 0 until 100) {
-      dispatcher.dispatch(new MessageInvocation(key1, i, None, None))
-    }
-    assert(handleLatch.await(5, TimeUnit.SECONDS))
-    assert(!threadingIssueDetected.get)
-    dispatcher.postStop
-  }
-}
-*/
diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala
index eb91b9737f..379cbfb36d 100644
--- a/akka-http/src/main/scala/akka/http/Mist.scala
+++ b/akka-http/src/main/scala/akka/http/Mist.scala
@@ -4,7 +4,7 @@
 
 package akka.http
 
-import akka.actor.{ActorRegistry, ActorRef, Actor}
+import akka.actor.{ActorRef, Actor}
 import akka.event.EventHandler
 
 import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
@@ -17,8 +17,8 @@ import javax.servlet.Filter
 object MistSettings {
   import akka.config.Config._
 
-  final val JettyServer = "jetty"
-  final val TimeoutAttribute = "timeout"
+  val JettyServer      = "jetty"
+  val TimeoutAttribute = "timeout"
 
   val    ConnectionClose = config.getBool("akka.http.connection-close", true)
   val   RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
@@ -64,7 +64,7 @@ import Types._
  *
  */
 trait Mist {
-  import javax.servlet.{ServletContext}
+  import javax.servlet.ServletContext
   import MistSettings._
 
   /**
@@ -84,28 +84,21 @@ trait Mist {
                         response: HttpServletResponse)
                        (builder: (() => tAsyncRequestContext) => RequestMethod) = {
     def suspend: tAsyncRequestContext = {
-      //
+
       // set to right now, which is effectively "already expired"
-      //
       response.setDateHeader("Expires", System.currentTimeMillis)
       response.setHeader("Cache-Control", "no-cache, must-revalidate")
 
-      //
       // no keep-alive?
-      //
       if (ConnectionClose) response.setHeader("Connection","close")
 
-      //
       // suspend the request
       // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
-      //
       request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
     }
 
-      //
       // shoot the message to the root endpoint for processing
       // IMPORTANT: the suspend method is invoked on the server thread not in the actor
-      //
     val method = builder(suspend _)
     if (method.go) _root ! method
   }
@@ -117,7 +110,6 @@ trait Mist {
   def initMist(context: ServletContext) {
     val server = context.getServerInfo
     val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
-
     _factory = if (major >= 3) {
       Some(Servlet30ContextMethodFactory)
     } else if (server.toLowerCase startsWith JettyServer) {
@@ -200,7 +192,7 @@ object Endpoint {
   /**
    * leverage the akka config to tweak the dispatcher for our endpoints
    */
-  final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
+  val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
 
   type Hook     = Function[String, Boolean]
   type Provider = Function[String, ActorRef]
@@ -236,25 +228,21 @@ trait Endpoint { this: Actor =>
    * Message handling common to all endpoints, must be chained
    */
   protected def handleHttpRequest: Receive = {
-    //
+
     // add the endpoint - the if the uri hook matches,
     // the message will be sent to the actor returned by the provider func
-    //
     case Attach(hook, provider) => _attach(hook, provider)
 
-    //
     // dispatch the suspended requests
-    //
     case req: RequestMethod => {
       val uri = req.request.getPathInfo
       val endpoints = _attachments.filter { _._1(uri) }
 
-      if (!endpoints.isEmpty)
-        endpoints.foreach { _._2(uri) ! req }
+      if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req }
       else {
         self.sender match {
           case Some(s) => s reply NoneAvailable(uri, req)
-          case None => _na(uri, req)
+          case None    => _na(uri, req)
         }
       }
     }
@@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint {
 
   final val Root = "/"
 
-  //
   // use the configurable dispatcher
-  //
   self.dispatcher = Endpoint.Dispatcher
 
-  //
   // adopt the configured id
-  //
   if (RootActorBuiltin) self.id = RootActorID
 
   override def preStart =
     _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
 
-  //TODO: Is this needed?
-  //override def postRestart =
-  //  _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
-
   def recv: Receive = {
     case NoneAvailable(uri, req) => _na(uri, req)
     case unknown => {}
@@ -317,10 +297,7 @@ trait RequestMethod {
   import java.io.IOException
   import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
 
-  //
   // required implementations
-  //
-
   val builder: () => tAsyncRequestContext
 
   /**
@@ -353,35 +330,31 @@ trait RequestMethod {
   def getHeaderOrElse(name: String, default: Function[Any, String]): String =
     request.getHeader(name) match {
       case null => default(null)
-            case s => s
-          }
+      case s    => s
+    }
 
   def getParameterOrElse(name: String, default: Function[Any, String]): String =
     request.getParameter(name) match {
       case null => default(null)
-      case s => s
+      case s    => s
     }
 
   def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
 
   def complete(status: Int, body: String, headers: Headers): Boolean =
-    rawComplete {
-      res => {
-        res.setStatus(status)
-        headers foreach {h => response.setHeader(h._1, h._2)}
-        res.getWriter.write(body)
-        res.getWriter.close
-        res.flushBuffer
-      }
+    rawComplete { res =>
+      res.setStatus(status)
+      headers foreach {h => response.setHeader(h._1, h._2)}
+      res.getWriter.write(body)
+      res.getWriter.close
+      res.flushBuffer
     }
 
   def rawComplete(completion: HttpServletResponse => Unit): Boolean =
     context match {
-      case Some(pipe) => {
+      case Some(pipe) =>
         try {
-          if (!suspended) {
-            false
-          }
+          if (!suspended) false
           else {
             completion(response)
             pipe.complete
@@ -392,34 +365,28 @@ trait RequestMethod {
             EventHandler.error(io, this, io.getMessage)
             false
         }
-    }
-
-    case None =>
-      false
+      case None => false
   }
 
   def complete(t: Throwable) {
     context match {
-      case Some(pipe) => {
+      case Some(pipe) =>
         try {
           if (suspended) {
             response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
             pipe.complete
           }
         } catch {
-          case io: IOException => 
+          case io: IOException =>
             EventHandler.error(io, this, io.getMessage)
         }
-      }
-
       case None => {}
     }
   }
 
-  /**
+  /*
    * Utility methods to send responses back
    */
-
   def OK(body: String): Boolean                      = complete(HttpServletResponse.SC_OK, body)
   def OK(body: String, headers:Headers): Boolean     = complete(HttpServletResponse.SC_OK, body, headers)
   def Created(body: String): Boolean                 = complete(HttpServletResponse.SC_CREATED, body)
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index 3d603508e8..dd7a22df52 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -214,7 +214,7 @@ abstract class RemoteClient private[akka] (
     isOneWay: Boolean,
     actorRef: ActorRef,
     typedActorInfo: Option[Tuple2[String, String]],
-    actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { //TODO: find better strategy to prevent race
+    actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
 
     send(createRemoteMessageProtocolBuilder(
         Some(actorRef),
@@ -1016,9 +1016,15 @@ class RemoteServerHandler(
 
     val typedActor = createTypedActor(actorInfo, channel)
     //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
-    val (ownerTypeHint, argClasses, args) = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
+    val (ownerTypeHint, argClasses, args) =
+      MessageSerializer
+          .deserialize(request.getMessage)
+          .asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
 
-    def resolveMethod(bottomType: Class[_], typeHint: String, methodName: String, methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
+    def resolveMethod(bottomType: Class[_],
+                      typeHint: String,
+                      methodName: String,
+                      methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
       var typeToResolve = bottomType
       var targetMethod: java.lang.reflect.Method = null
       var firstException: NoSuchMethodException = null
diff --git a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
index 02b29e6de1..cd8f71058e 100644
--- a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
+++ b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
@@ -8,7 +8,7 @@ import org.junit.runner.RunWith
 
 import akka.serialization.Serializer.ScalaJSON
 //TODO: FIXME WHY IS THIS COMMENTED OUT?
-/*
+
 object Protocols {
   import sjson.json.DefaultProtocol._
   case class Shop(store: String, item: String, price: Int)
@@ -51,4 +51,3 @@ class ScalaJSONSerializerSpec extends
     }
   }
 }
-*/
diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
index e32ed03c32..1cdf735e8e 100644
--- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -364,7 +364,7 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   /**
     * Returns the home address and port for this actor.
     */
-  def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)//TODO: REVISIT: Sensible to return null?
+  def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)
 }
 
 object TypedActorConfiguration {
diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
index 315467f2ee..f2f2a7a1fc 100644
--- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
+++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
@@ -31,7 +31,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
   private var components: List[SuperviseTypedActor] = _
   private var supervised: List[Supervise] = Nil
   private var bindings: List[DependencyBinding] = Nil
-  private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
   private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
   private var modules = new java.util.ArrayList[Module]
   private var methodToUriRegistry = new HashMap[Method, String]
@@ -167,7 +166,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
 
   def reset = synchronized {
     modules = new java.util.ArrayList[Module]
-    configRegistry = new HashMap[Class[_], SuperviseTypedActor]
     typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
     methodToUriRegistry = new HashMap[Method, String]
     injector = null

From 2868b3348a3ba7c58ff781e145ff3c3b4ba91532 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Wed, 30 Mar 2011 21:19:26 +0200
Subject: [PATCH 11/12] improved scaladoc in Future

---
 .../src/main/scala/akka/dispatch/Future.scala | 81 ++++++++++++++-----
 1 file changed, 63 insertions(+), 18 deletions(-)

diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index ba0b7b83ba..8d7fcd7390 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -57,7 +57,7 @@ object Futures {
   }
 
   /**
-   * Java API
+   * Java API.
    * Returns a Future to the result of the first future in the list that is completed
    */
   def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
@@ -68,6 +68,10 @@ object Futures {
    * The fold is performed on the thread where the last future is completed,
    * the result will be the first failure of any of the futures, or any failure in the actual fold,
    * or the result of the fold.
+   * Example:
+   * 
+   *   val result = Futures.fold(0)(futures)(_ + _).await.result
+   * 
*/ def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { if(futures.isEmpty) { @@ -115,6 +119,10 @@ object Futures { /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Example: + *
+   *   val result = Futures.reduce(futures)(_ + _).await.result
+   * 
*/ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = { if (futures.isEmpty) @@ -138,7 +146,7 @@ object Futures { } /** - * Java API + * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = @@ -147,18 +155,25 @@ object Futures { import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom + /** + * FIXME document Futures.sequence + */ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + /** + * FIXME document Futures.traverse + */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) - //Deprecations - - + // ===================================== + // Deprecations + // ===================================== + /** * (Blocking!) */ @@ -299,6 +314,12 @@ sealed trait Future[+T] { /** * When the future is compeleted with a valid result, apply the provided * PartialFunction to the result. + *
+   *   val result = future receive {
+   *     case Foo => "foo"
+   *     case Bar => "bar"
+   *   }.await.result
+   * 
*/ final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f => val optr = f.result @@ -313,6 +334,14 @@ sealed trait Future[+T] { * result of this Future if a match is found, or else return a MatchError. * If this Future is completed with an exception then the new Future will * also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a <- actor !!! Req("Hello") collect { case Res(x: Int)    => x }
+   *   b <- actor !!! Req(a)       collect { case Res(x: String) => x }
+   *   c <- actor !!! Req(7)       collect { case Res(x: String) => x }
+   * } yield b + "-" + c
+   * 
*/ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) @@ -343,6 +372,14 @@ sealed trait Future[+T] { * Creates a new Future by applying a function to the successful result of * this Future. If this Future is completed with an exception then the new * Future will also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor !!! "Hello" // returns 5
+   *   b: String <- actor !!! a       // returns "10"
+   *   c: String <- actor !!! 7       // returns "14"
+   * } yield b + "-" + c
+   * 
*/ final def map[A](f: T => A): Future[A] = { val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) @@ -371,6 +408,14 @@ sealed trait Future[+T] { * this Future, and returns the result of the function as the new Future. * If this Future is completed with an exception then the new Future will * also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor !!! "Hello" // returns 5
+   *   b: String <- actor !!! a       // returns "10"
+   *   c: String <- actor !!! 7       // returns "14"
+   * } yield b + "-" + c
+   * 
*/ final def flatMap[A](f: T => Future[A]): Future[A] = { val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) @@ -425,7 +470,7 @@ sealed trait Future[+T] { } /** - * Returns the current result, throws the exception is one has been raised, else returns None + * Returns the current result, throws the exception is one has been raised, else returns None */ final def resultOrException: Option[T] = { val v = value @@ -450,31 +495,31 @@ sealed trait Future[+T] { } /** - * Essentially this is the Promise (or write-side) of a Future (read-side) + * Essentially this is the Promise (or write-side) of a Future (read-side). */ trait CompletableFuture[T] extends Future[T] { /** - * Completes this Future with the specified result, if not already completed, - * returns this + * Completes this Future with the specified result, if not already completed. + * @return this */ def complete(value: Either[Throwable, T]): CompletableFuture[T] /** - * Completes this Future with the specified result, if not already completed, - * returns this + * Completes this Future with the specified result, if not already completed. + * @return this */ final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) /** - * Completes this Future with the specified exception, if not already completed, - * returns this + * Completes this Future with the specified exception, if not already completed. + * @return this */ final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) /** * Completes this Future with the specified other Future, when that Future is completed, - * unless this Future has already been completed - * returns this + * unless this Future has already been completed. + * @return this. */ final def completeWith(other: Future[T]): CompletableFuture[T] = { other onComplete { f => complete(f.value.get) } @@ -482,18 +527,18 @@ trait CompletableFuture[T] extends Future[T] { } /** - * Alias for complete(Right(value)) + * Alias for complete(Right(value)). */ final def << (value: T): CompletableFuture[T] = complete(Right(value)) /** - * Alias for completeWith(other) + * Alias for completeWith(other). */ final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) } /** - * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. + * The default concrete Future implementation. */ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { From 804812b63515ffc4572342f9b6ae8970a2a7a08c Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 31 Mar 2011 11:36:49 +1300 Subject: [PATCH 12/12] Add general mechanism for excluding tests --- .../akka/actor/actor/FSMTimingSpec.scala | 17 +++++++------- project/build/AkkaProject.scala | 22 ++++++------------- 2 files changed, 15 insertions(+), 24 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index 0b67272244..a59785ab7a 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -2,7 +2,6 @@ package akka.actor import akka.testkit.TestKit import akka.util.duration._ -import akka.Testing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers @@ -17,7 +16,7 @@ class FSMTimingSpec val fsm = Actor.actorOf(new StateMachine(testActor)).start fsm ! SubscribeTransitionCallBack(testActor) - expectMsg(Testing.time(200).millis, CurrentState(fsm, Initial)) + expectMsg(200 millis, CurrentState(fsm, Initial)) ignoreMsg { case Transition(_, Initial, _) => true @@ -26,7 +25,7 @@ class FSMTimingSpec "A Finite State Machine" must { "receive StateTimeout" in { - within (Testing.time(50).millis, Testing.time(150).millis) { + within (50 millis, 150 millis) { fsm ! TestStateTimeout expectMsg(Transition(fsm, TestStateTimeout, Initial)) expectNoMsg @@ -34,7 +33,7 @@ class FSMTimingSpec } "receive single-shot timer" in { - within (Testing.time(50).millis, Testing.time(150).millis) { + within (50 millis, 150 millis) { fsm ! TestSingleTimer expectMsg(Tick) expectMsg(Transition(fsm, TestSingleTimer, Initial)) @@ -48,7 +47,7 @@ class FSMTimingSpec case Tick => Tick } seq must have length (5) - within(Testing.time(250) millis) { + within(250 millis) { expectMsg(Transition(fsm, TestRepeatedTimer, Initial)) expectNoMsg } @@ -56,21 +55,21 @@ class FSMTimingSpec "notify unhandled messages" in { fsm ! TestUnhandled - within(Testing.time(100) millis) { + within(100 millis) { fsm ! Tick expectNoMsg } - within(Testing.time(100) millis) { + within(100 millis) { fsm ! SetHandler fsm ! Tick expectMsg(Unhandled(Tick)) expectNoMsg } - within(Testing.time(100) millis) { + within(100 millis) { fsm ! Unhandled("test") expectNoMsg } - within(Testing.time(100) millis) { + within(100 millis) { fsm ! Cancel expectMsg(Transition(fsm, TestUnhandled, Initial)) } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 621c945653..004494c8b8 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -465,8 +465,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } def akkaArtifacts = descendents(info.projectPath / "dist", "*-" + version + ".jar") - lazy val integrationTestsEnabled = systemOptional[Boolean]("integration.tests",false) - lazy val stressTestsEnabled = systemOptional[Boolean]("stress.tests",false) // ------------------------------------------------------------ class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) @@ -485,21 +483,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def packageToPublishActions = super.packageToPublishActions ++ Seq(this.packageDocs, this.packageSrc) override def pomPostProcess(node: scala.xml.Node): scala.xml.Node = mcPom(AkkaParentProject.this.moduleConfigurations)(super.pomPostProcess(node)) - /** - * Used for testOptions, possibility to enable the running of integration and or stresstests - * - * To enable set true and disable set false - * set integration.tests true - * set stress.tests true - */ - def createTestFilter(defaultTests: (String) => Boolean) = { TestFilter({ - case s: String if defaultTests(s) => true - case s: String if integrationTestsEnabled.value => s.endsWith("TestIntegration") - case s: String if stressTestsEnabled.value => s.endsWith("TestStress") - case _ => false - }) :: Nil + lazy val excludeTestsProperty = systemOptional[String]("akka.test.exclude", "") + + def excludeTests = { + val exclude = excludeTestsProperty.value + if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq } + override def testOptions = super.testOptions ++ excludeTests.map(exclude => TestFilter(test => !test.contains(exclude))) + lazy val publishRelease = { val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false) publishTask(publishIvyModule, releaseConfiguration) dependsOn (deliver, publishLocal, makePom)