From 3d529e8ca46649b5156b7be24cf84b3a4ce0d972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 30 Mar 2011 15:12:58 +0200 Subject: [PATCH] Added check to ensure that messages are not null. Also cleaned up misc code --- .../src/main/scala/akka/actor/Actor.scala | 13 ++- .../src/main/scala/akka/actor/ActorRef.scala | 1 - .../akka/dispatch/ThreadPoolBuilder.scala | 10 +- .../akka/util/BoundedBlockingQueue.scala | 12 ++- .../test/scala/akka/dispatch/FutureSpec.scala | 1 - .../dispatch/ThreadBasedDispatcherSpec.scala | 91 ------------------- akka-http/src/main/scala/akka/http/Mist.scala | 81 +++++------------ .../remote/netty/NettyRemoteSupport.scala | 12 ++- .../ScalaJSONSerializerSpec.scala | 3 +- .../main/scala/akka/actor/TypedActor.scala | 2 +- .../config/TypedActorGuiceConfigurator.scala | 2 - 11 files changed, 55 insertions(+), 173 deletions(-) delete mode 100644 akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 882331b177..e70b4a98ae 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -28,9 +28,9 @@ import akka.japi. {Creator, Procedure} /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) +case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage { - + /** * Java API */ @@ -75,6 +75,7 @@ class IllegalActorStateException private[akka](message: String) extends AkkaEx class ActorKilledException private[akka](message: String) extends AkkaException(message) class ActorInitializationException private[akka](message: String) extends AkkaException(message) class ActorTimeoutException private[akka](message: String) extends AkkaException(message) +class InvalidMessageException private[akka](message: String) extends AkkaException(message) /** * This message is thrown by default when an Actors behavior doesn't match a message @@ -90,7 +91,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception * @author Jonas Bonér */ object Actor extends ListenerManagement { - + /** * Add shutdown cleanups */ @@ -128,7 +129,7 @@ object Actor extends ListenerManagement { type Receive = PartialFunction[Any, Unit] private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - + /** * Creates an ActorRef out of the Actor with type T. *
@@ -443,8 +444,10 @@ trait Actor {
   // =========================================
 
   private[akka] final def apply(msg: Any) = {
+    if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
+      throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
     val behaviorStack = self.hotswap
-    msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
+    msg match {
       case l: AutoReceivedMessage           => autoReceiveMessage(l)
       case msg if behaviorStack.nonEmpty &&
         behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 81574dacff..673cb487a1 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -902,7 +902,6 @@ class LocalActorRef private[akka] (
 
       failedActor match {
         case p: Proxyable =>
-          //p.swapProxiedActor(freshActor) //TODO: broken
           failedActor.preRestart(reason)
           failedActor.postRestart(reason)
         case _ =>
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index 31d5dca0eb..83c30f23e0 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -160,12 +160,11 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
  */
 object MonitorableThread {
   val DEFAULT_NAME = "MonitorableThread"
-  val created = new AtomicInteger
-  val alive = new AtomicInteger
-  @volatile var debugLifecycle = false
-}
 
-// FIXME fix the issues with using the monitoring in MonitorableThread
+  // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
+  val created      = new AtomicInteger
+  val alive        = new AtomicInteger
+}
 
 /**
  * @author Jonas Bonér
@@ -178,7 +177,6 @@ class MonitorableThread(runnable: Runnable, name: String)
   })
 
   override def run = {
-    val debug = MonitorableThread.debugLifecycle
     try {
       MonitorableThread.alive.incrementAndGet
       super.run
diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
index 8c37845baf..ba4e508454 100644
--- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
+++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
@@ -1,11 +1,15 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB 
+ */
+
 package akka.util
 
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{ TimeUnit, BlockingQueue }
 import java.util.{ AbstractQueue, Queue, Collection, Iterator }
 
-class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
+class BoundedBlockingQueue[E <: AnyRef](
+  val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
 
   backing match {
     case null => throw new IllegalArgumentException("Backing Queue may not be null")
@@ -32,7 +36,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
       require(backing.offer(e))
       notEmpty.signal()
     } finally {
-        lock.unlock()
+      lock.unlock()
     }
   }
 
@@ -319,4 +323,4 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
       lock.unlock()
     }
   }
-}
\ No newline at end of file
+}
diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
index f99f5f5305..83dc4e294b 100644
--- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -132,7 +132,6 @@ class FutureSpec extends JUnitSuite {
     actor.stop
   }
 
-  // FIXME: implement Futures.awaitEither, and uncomment these two tests
   @Test def shouldFutureAwaitEitherLeft = {
     val actor1 = actorOf[TestActor].start
     val actor2 = actorOf[TestActor].start
diff --git a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala
deleted file mode 100644
index 603b17e336..0000000000
--- a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package akka.dispatch
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-import org.scalatest.junit.JUnitSuite
-import org.junit.{Test, Before}
-
-import akka.actor.Actor
-import Actor._
-
-// FIXME use this test when we have removed the MessageInvoker classes
-/*
-class ThreadBasedDispatcherSpec extends JUnitSuite {
-  private var threadingIssueDetected: AtomicBoolean = null
-  val key1 = actorOf(new Actor { def receive = { case _ => {}} })
-  val key2 = actorOf(new Actor { def receive = { case _ => {}} })
-  val key3 = actorOf(new Actor { def receive = { case _ => {}} })
-
-  class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
-    val guardLock: Lock = new ReentrantLock
-
-    def invoke(message: MessageInvocation) {
-      try {
-        if (threadingIssueDetected.get) return
-        if (guardLock.tryLock) {
-          handleLatch.countDown
-        } else {
-          threadingIssueDetected.set(true)
-        }
-      } catch {
-        case e: Exception => threadingIssueDetected.set(true)
-      } finally {
-        guardLock.unlock
-      }
-    }
-  }
-
-  @Before
-  def setUp = {
-    threadingIssueDetected = new AtomicBoolean(false)
-  }
-
-  @Test
-  def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
-    internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
-  }
-
-  @Test
-  def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
-    internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
-  }
-
-  private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially(): Unit = {
-    val guardLock = new ReentrantLock
-    val handleLatch = new CountDownLatch(100)
-    val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
-    dispatcher.start
-    for (i <- 0 until 100) {
-      dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None))
-    }
-    assert(handleLatch.await(5, TimeUnit.SECONDS))
-    assert(!threadingIssueDetected.get)
-  }
-
-  private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder(): Unit = {
-    val handleLatch = new CountDownLatch(100)
-    val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
-      var currentValue = -1;
-      def invoke(message: MessageInvocation) {
-        if (threadingIssueDetected.get) return
-        val messageValue = message.message.asInstanceOf[Int]
-        if (messageValue.intValue == currentValue + 1) {
-          currentValue = messageValue.intValue
-          handleLatch.countDown
-        } else threadingIssueDetected.set(true)
-      }
-    })
-    dispatcher.start
-    for (i <- 0 until 100) {
-      dispatcher.dispatch(new MessageInvocation(key1, i, None, None))
-    }
-    assert(handleLatch.await(5, TimeUnit.SECONDS))
-    assert(!threadingIssueDetected.get)
-    dispatcher.postStop
-  }
-}
-*/
diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala
index eb91b9737f..379cbfb36d 100644
--- a/akka-http/src/main/scala/akka/http/Mist.scala
+++ b/akka-http/src/main/scala/akka/http/Mist.scala
@@ -4,7 +4,7 @@
 
 package akka.http
 
-import akka.actor.{ActorRegistry, ActorRef, Actor}
+import akka.actor.{ActorRef, Actor}
 import akka.event.EventHandler
 
 import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
@@ -17,8 +17,8 @@ import javax.servlet.Filter
 object MistSettings {
   import akka.config.Config._
 
-  final val JettyServer = "jetty"
-  final val TimeoutAttribute = "timeout"
+  val JettyServer      = "jetty"
+  val TimeoutAttribute = "timeout"
 
   val    ConnectionClose = config.getBool("akka.http.connection-close", true)
   val   RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
@@ -64,7 +64,7 @@ import Types._
  *
  */
 trait Mist {
-  import javax.servlet.{ServletContext}
+  import javax.servlet.ServletContext
   import MistSettings._
 
   /**
@@ -84,28 +84,21 @@ trait Mist {
                         response: HttpServletResponse)
                        (builder: (() => tAsyncRequestContext) => RequestMethod) = {
     def suspend: tAsyncRequestContext = {
-      //
+
       // set to right now, which is effectively "already expired"
-      //
       response.setDateHeader("Expires", System.currentTimeMillis)
       response.setHeader("Cache-Control", "no-cache, must-revalidate")
 
-      //
       // no keep-alive?
-      //
       if (ConnectionClose) response.setHeader("Connection","close")
 
-      //
       // suspend the request
       // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
-      //
       request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
     }
 
-      //
       // shoot the message to the root endpoint for processing
       // IMPORTANT: the suspend method is invoked on the server thread not in the actor
-      //
     val method = builder(suspend _)
     if (method.go) _root ! method
   }
@@ -117,7 +110,6 @@ trait Mist {
   def initMist(context: ServletContext) {
     val server = context.getServerInfo
     val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
-
     _factory = if (major >= 3) {
       Some(Servlet30ContextMethodFactory)
     } else if (server.toLowerCase startsWith JettyServer) {
@@ -200,7 +192,7 @@ object Endpoint {
   /**
    * leverage the akka config to tweak the dispatcher for our endpoints
    */
-  final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
+  val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
 
   type Hook     = Function[String, Boolean]
   type Provider = Function[String, ActorRef]
@@ -236,25 +228,21 @@ trait Endpoint { this: Actor =>
    * Message handling common to all endpoints, must be chained
    */
   protected def handleHttpRequest: Receive = {
-    //
+
     // add the endpoint - the if the uri hook matches,
     // the message will be sent to the actor returned by the provider func
-    //
     case Attach(hook, provider) => _attach(hook, provider)
 
-    //
     // dispatch the suspended requests
-    //
     case req: RequestMethod => {
       val uri = req.request.getPathInfo
       val endpoints = _attachments.filter { _._1(uri) }
 
-      if (!endpoints.isEmpty)
-        endpoints.foreach { _._2(uri) ! req }
+      if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req }
       else {
         self.sender match {
           case Some(s) => s reply NoneAvailable(uri, req)
-          case None => _na(uri, req)
+          case None    => _na(uri, req)
         }
       }
     }
@@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint {
 
   final val Root = "/"
 
-  //
   // use the configurable dispatcher
-  //
   self.dispatcher = Endpoint.Dispatcher
 
-  //
   // adopt the configured id
-  //
   if (RootActorBuiltin) self.id = RootActorID
 
   override def preStart =
     _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
 
-  //TODO: Is this needed?
-  //override def postRestart =
-  //  _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
-
   def recv: Receive = {
     case NoneAvailable(uri, req) => _na(uri, req)
     case unknown => {}
@@ -317,10 +297,7 @@ trait RequestMethod {
   import java.io.IOException
   import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
 
-  //
   // required implementations
-  //
-
   val builder: () => tAsyncRequestContext
 
   /**
@@ -353,35 +330,31 @@ trait RequestMethod {
   def getHeaderOrElse(name: String, default: Function[Any, String]): String =
     request.getHeader(name) match {
       case null => default(null)
-            case s => s
-          }
+      case s    => s
+    }
 
   def getParameterOrElse(name: String, default: Function[Any, String]): String =
     request.getParameter(name) match {
       case null => default(null)
-      case s => s
+      case s    => s
     }
 
   def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
 
   def complete(status: Int, body: String, headers: Headers): Boolean =
-    rawComplete {
-      res => {
-        res.setStatus(status)
-        headers foreach {h => response.setHeader(h._1, h._2)}
-        res.getWriter.write(body)
-        res.getWriter.close
-        res.flushBuffer
-      }
+    rawComplete { res =>
+      res.setStatus(status)
+      headers foreach {h => response.setHeader(h._1, h._2)}
+      res.getWriter.write(body)
+      res.getWriter.close
+      res.flushBuffer
     }
 
   def rawComplete(completion: HttpServletResponse => Unit): Boolean =
     context match {
-      case Some(pipe) => {
+      case Some(pipe) =>
         try {
-          if (!suspended) {
-            false
-          }
+          if (!suspended) false
           else {
             completion(response)
             pipe.complete
@@ -392,34 +365,28 @@ trait RequestMethod {
             EventHandler.error(io, this, io.getMessage)
             false
         }
-    }
-
-    case None =>
-      false
+      case None => false
   }
 
   def complete(t: Throwable) {
     context match {
-      case Some(pipe) => {
+      case Some(pipe) =>
         try {
           if (suspended) {
             response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
             pipe.complete
           }
         } catch {
-          case io: IOException => 
+          case io: IOException =>
             EventHandler.error(io, this, io.getMessage)
         }
-      }
-
       case None => {}
     }
   }
 
-  /**
+  /*
    * Utility methods to send responses back
    */
-
   def OK(body: String): Boolean                      = complete(HttpServletResponse.SC_OK, body)
   def OK(body: String, headers:Headers): Boolean     = complete(HttpServletResponse.SC_OK, body, headers)
   def Created(body: String): Boolean                 = complete(HttpServletResponse.SC_CREATED, body)
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index 3d603508e8..dd7a22df52 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -214,7 +214,7 @@ abstract class RemoteClient private[akka] (
     isOneWay: Boolean,
     actorRef: ActorRef,
     typedActorInfo: Option[Tuple2[String, String]],
-    actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { //TODO: find better strategy to prevent race
+    actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
 
     send(createRemoteMessageProtocolBuilder(
         Some(actorRef),
@@ -1016,9 +1016,15 @@ class RemoteServerHandler(
 
     val typedActor = createTypedActor(actorInfo, channel)
     //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
-    val (ownerTypeHint, argClasses, args) = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
+    val (ownerTypeHint, argClasses, args) =
+      MessageSerializer
+          .deserialize(request.getMessage)
+          .asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
 
-    def resolveMethod(bottomType: Class[_], typeHint: String, methodName: String, methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
+    def resolveMethod(bottomType: Class[_],
+                      typeHint: String,
+                      methodName: String,
+                      methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
       var typeToResolve = bottomType
       var targetMethod: java.lang.reflect.Method = null
       var firstException: NoSuchMethodException = null
diff --git a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
index 02b29e6de1..cd8f71058e 100644
--- a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
+++ b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
@@ -8,7 +8,7 @@ import org.junit.runner.RunWith
 
 import akka.serialization.Serializer.ScalaJSON
 //TODO: FIXME WHY IS THIS COMMENTED OUT?
-/*
+
 object Protocols {
   import sjson.json.DefaultProtocol._
   case class Shop(store: String, item: String, price: Int)
@@ -51,4 +51,3 @@ class ScalaJSONSerializerSpec extends
     }
   }
 }
-*/
diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
index e32ed03c32..1cdf735e8e 100644
--- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -364,7 +364,7 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
   /**
     * Returns the home address and port for this actor.
     */
-  def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)//TODO: REVISIT: Sensible to return null?
+  def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)
 }
 
 object TypedActorConfiguration {
diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
index 315467f2ee..f2f2a7a1fc 100644
--- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
+++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
@@ -31,7 +31,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
   private var components: List[SuperviseTypedActor] = _
   private var supervised: List[Supervise] = Nil
   private var bindings: List[DependencyBinding] = Nil
-  private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
   private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
   private var modules = new java.util.ArrayList[Module]
   private var methodToUriRegistry = new HashMap[Method, String]
@@ -167,7 +166,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
 
   def reset = synchronized {
     modules = new java.util.ArrayList[Module]
-    configRegistry = new HashMap[Class[_], SuperviseTypedActor]
     typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
     methodToUriRegistry = new HashMap[Method, String]
     injector = null