Added check to ensure that messages are not null. Also cleaned up misc code

This commit is contained in:
Jonas Bonér 2011-03-30 15:12:58 +02:00
parent f88a7cd207
commit 3d529e8ca4
11 changed files with 55 additions and 173 deletions

View file

@ -75,6 +75,7 @@ class IllegalActorStateException private[akka](message: String) extends AkkaEx
class ActorKilledException private[akka](message: String) extends AkkaException(message)
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
class InvalidMessageException private[akka](message: String) extends AkkaException(message)
/**
* This message is thrown by default when an Actors behavior doesn't match a message
@ -443,8 +444,10 @@ trait Actor {
// =========================================
private[akka] final def apply(msg: Any) = {
if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
val behaviorStack = self.hotswap
msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
msg match {
case l: AutoReceivedMessage => autoReceiveMessage(l)
case msg if behaviorStack.nonEmpty &&
behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)

View file

@ -902,7 +902,6 @@ class LocalActorRef private[akka] (
failedActor match {
case p: Proxyable =>
//p.swapProxiedActor(freshActor) //TODO: broken
failedActor.preRestart(reason)
failedActor.postRestart(reason)
case _ =>

View file

@ -160,13 +160,12 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
// FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile var debugLifecycle = false
}
// FIXME fix the issues with using the monitoring in MonitorableThread
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -178,7 +177,6 @@ class MonitorableThread(runnable: Runnable, name: String)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
try {
MonitorableThread.alive.incrementAndGet
super.run

View file

@ -1,11 +1,15 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
class BoundedBlockingQueue[E <: AnyRef](
val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
backing match {
case null => throw new IllegalArgumentException("Backing Queue may not be null")

View file

@ -132,7 +132,6 @@ class FutureSpec extends JUnitSuite {
actor.stop
}
// FIXME: implement Futures.awaitEither, and uncomment these two tests
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start

View file

@ -1,91 +0,0 @@
package akka.dispatch
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import akka.actor.Actor
import Actor._
// FIXME use this test when we have removed the MessageInvoker classes
/*
class ThreadBasedDispatcherSpec extends JUnitSuite {
private var threadingIssueDetected: AtomicBoolean = null
val key1 = actorOf(new Actor { def receive = { case _ => {}} })
val key2 = actorOf(new Actor { def receive = { case _ => {}} })
val key3 = actorOf(new Actor { def receive = { case _ => {}} })
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
}
@Before
def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially(): Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder(): Unit = {
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, i, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
dispatcher.postStop
}
}
*/

View file

@ -4,7 +4,7 @@
package akka.http
import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.actor.{ActorRef, Actor}
import akka.event.EventHandler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
@ -17,8 +17,8 @@ import javax.servlet.Filter
object MistSettings {
import akka.config.Config._
final val JettyServer = "jetty"
final val TimeoutAttribute = "timeout"
val JettyServer = "jetty"
val TimeoutAttribute = "timeout"
val ConnectionClose = config.getBool("akka.http.connection-close", true)
val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
@ -64,7 +64,7 @@ import Types._
*
*/
trait Mist {
import javax.servlet.{ServletContext}
import javax.servlet.ServletContext
import MistSettings._
/**
@ -84,28 +84,21 @@ trait Mist {
response: HttpServletResponse)
(builder: (() => tAsyncRequestContext) => RequestMethod) = {
def suspend: tAsyncRequestContext = {
//
// set to right now, which is effectively "already expired"
//
response.setDateHeader("Expires", System.currentTimeMillis)
response.setHeader("Cache-Control", "no-cache, must-revalidate")
//
// no keep-alive?
//
if (ConnectionClose) response.setHeader("Connection","close")
//
// suspend the request
// TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
//
request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
}
//
// shoot the message to the root endpoint for processing
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
//
val method = builder(suspend _)
if (method.go) _root ! method
}
@ -117,7 +110,6 @@ trait Mist {
def initMist(context: ServletContext) {
val server = context.getServerInfo
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
_factory = if (major >= 3) {
Some(Servlet30ContextMethodFactory)
} else if (server.toLowerCase startsWith JettyServer) {
@ -200,7 +192,7 @@ object Endpoint {
/**
* leverage the akka config to tweak the dispatcher for our endpoints
*/
final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
@ -236,21 +228,17 @@ trait Endpoint { this: Actor =>
* Message handling common to all endpoints, must be chained
*/
protected def handleHttpRequest: Receive = {
//
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
//
case Attach(hook, provider) => _attach(hook, provider)
//
// dispatch the suspended requests
//
case req: RequestMethod => {
val uri = req.request.getPathInfo
val endpoints = _attachments.filter { _._1(uri) }
if (!endpoints.isEmpty)
endpoints.foreach { _._2(uri) ! req }
if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req }
else {
self.sender match {
case Some(s) => s reply NoneAvailable(uri, req)
@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint {
final val Root = "/"
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
//
// adopt the configured id
//
if (RootActorBuiltin) self.id = RootActorID
override def preStart =
_attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
//TODO: Is this needed?
//override def postRestart =
// _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
def recv: Receive = {
case NoneAvailable(uri, req) => _na(uri, req)
case unknown => {}
@ -317,10 +297,7 @@ trait RequestMethod {
import java.io.IOException
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
//
// required implementations
//
val builder: () => tAsyncRequestContext
/**
@ -365,23 +342,19 @@ trait RequestMethod {
def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
def complete(status: Int, body: String, headers: Headers): Boolean =
rawComplete {
res => {
rawComplete { res =>
res.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
res.getWriter.write(body)
res.getWriter.close
res.flushBuffer
}
}
def rawComplete(completion: HttpServletResponse => Unit): Boolean =
context match {
case Some(pipe) => {
case Some(pipe) =>
try {
if (!suspended) {
false
}
if (!suspended) false
else {
completion(response)
pipe.complete
@ -392,15 +365,12 @@ trait RequestMethod {
EventHandler.error(io, this, io.getMessage)
false
}
}
case None =>
false
case None => false
}
def complete(t: Throwable) {
context match {
case Some(pipe) => {
case Some(pipe) =>
try {
if (suspended) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
@ -410,16 +380,13 @@ trait RequestMethod {
case io: IOException =>
EventHandler.error(io, this, io.getMessage)
}
}
case None => {}
}
}
/**
/*
* Utility methods to send responses back
*/
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)

View file

@ -214,7 +214,7 @@ abstract class RemoteClient private[akka] (
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { //TODO: find better strategy to prevent race
actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
@ -1016,9 +1016,15 @@ class RemoteServerHandler(
val typedActor = createTypedActor(actorInfo, channel)
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
val (ownerTypeHint, argClasses, args) = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
val (ownerTypeHint, argClasses, args) =
MessageSerializer
.deserialize(request.getMessage)
.asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
def resolveMethod(bottomType: Class[_], typeHint: String, methodName: String, methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
def resolveMethod(bottomType: Class[_],
typeHint: String,
methodName: String,
methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
var typeToResolve = bottomType
var targetMethod: java.lang.reflect.Method = null
var firstException: NoSuchMethodException = null

View file

@ -8,7 +8,7 @@ import org.junit.runner.RunWith
import akka.serialization.Serializer.ScalaJSON
//TODO: FIXME WHY IS THIS COMMENTED OUT?
/*
object Protocols {
import sjson.json.DefaultProtocol._
case class Shop(store: String, item: String, price: Int)
@ -51,4 +51,3 @@ class ScalaJSONSerializerSpec extends
}
}
}
*/

View file

@ -364,7 +364,7 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
/**
* Returns the home address and port for this actor.
*/
def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)//TODO: REVISIT: Sensible to return null?
def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)
}
object TypedActorConfiguration {

View file

@ -31,7 +31,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
private var components: List[SuperviseTypedActor] = _
private var supervised: List[Supervise] = Nil
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
@ -167,7 +166,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
def reset = synchronized {
modules = new java.util.ArrayList[Module]
configRegistry = new HashMap[Class[_], SuperviseTypedActor]
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
methodToUriRegistry = new HashMap[Method, String]
injector = null