diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 882331b177..e70b4a98ae 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -28,9 +28,9 @@ import akka.japi. {Creator, Procedure} /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) +case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage { - + /** * Java API */ @@ -75,6 +75,7 @@ class IllegalActorStateException private[akka](message: String) extends AkkaEx class ActorKilledException private[akka](message: String) extends AkkaException(message) class ActorInitializationException private[akka](message: String) extends AkkaException(message) class ActorTimeoutException private[akka](message: String) extends AkkaException(message) +class InvalidMessageException private[akka](message: String) extends AkkaException(message) /** * This message is thrown by default when an Actors behavior doesn't match a message @@ -90,7 +91,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception * @author Jonas Bonér */ object Actor extends ListenerManagement { - + /** * Add shutdown cleanups */ @@ -128,7 +129,7 @@ object Actor extends ListenerManagement { type Receive = PartialFunction[Any, Unit] private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - + /** * Creates an ActorRef out of the Actor with type T. *
@@ -443,8 +444,10 @@ trait Actor {
// =========================================
private[akka] final def apply(msg: Any) = {
+ if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
+ throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
val behaviorStack = self.hotswap
- msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
+ msg match {
case l: AutoReceivedMessage => autoReceiveMessage(l)
case msg if behaviorStack.nonEmpty &&
behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 81574dacff..673cb487a1 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -902,7 +902,6 @@ class LocalActorRef private[akka] (
failedActor match {
case p: Proxyable =>
- //p.swapProxiedActor(freshActor) //TODO: broken
failedActor.preRestart(reason)
failedActor.postRestart(reason)
case _ =>
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index 31d5dca0eb..83c30f23e0 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -160,12 +160,11 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
- val created = new AtomicInteger
- val alive = new AtomicInteger
- @volatile var debugLifecycle = false
-}
-// FIXME fix the issues with using the monitoring in MonitorableThread
+ // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
+ val created = new AtomicInteger
+ val alive = new AtomicInteger
+}
/**
* @author Jonas Bonér
@@ -178,7 +177,6 @@ class MonitorableThread(runnable: Runnable, name: String)
})
override def run = {
- val debug = MonitorableThread.debugLifecycle
try {
MonitorableThread.alive.incrementAndGet
super.run
diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
index 8c37845baf..ba4e508454 100644
--- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
+++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala
@@ -1,11 +1,15 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
package akka.util
import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
-class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
+class BoundedBlockingQueue[E <: AnyRef](
+ val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
backing match {
case null => throw new IllegalArgumentException("Backing Queue may not be null")
@@ -32,7 +36,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
require(backing.offer(e))
notEmpty.signal()
} finally {
- lock.unlock()
+ lock.unlock()
}
}
@@ -319,4 +323,4 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
lock.unlock()
}
}
-}
\ No newline at end of file
+}
diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
index f99f5f5305..83dc4e294b 100644
--- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -132,7 +132,6 @@ class FutureSpec extends JUnitSuite {
actor.stop
}
- // FIXME: implement Futures.awaitEither, and uncomment these two tests
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start
diff --git a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala
deleted file mode 100644
index 603b17e336..0000000000
--- a/akka-actor/src/test/scala/akka/dispatch/ThreadBasedDispatcherSpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package akka.dispatch
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-import org.scalatest.junit.JUnitSuite
-import org.junit.{Test, Before}
-
-import akka.actor.Actor
-import Actor._
-
-// FIXME use this test when we have removed the MessageInvoker classes
-/*
-class ThreadBasedDispatcherSpec extends JUnitSuite {
- private var threadingIssueDetected: AtomicBoolean = null
- val key1 = actorOf(new Actor { def receive = { case _ => {}} })
- val key2 = actorOf(new Actor { def receive = { case _ => {}} })
- val key3 = actorOf(new Actor { def receive = { case _ => {}} })
-
- class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
- val guardLock: Lock = new ReentrantLock
-
- def invoke(message: MessageInvocation) {
- try {
- if (threadingIssueDetected.get) return
- if (guardLock.tryLock) {
- handleLatch.countDown
- } else {
- threadingIssueDetected.set(true)
- }
- } catch {
- case e: Exception => threadingIssueDetected.set(true)
- } finally {
- guardLock.unlock
- }
- }
- }
-
- @Before
- def setUp = {
- threadingIssueDetected = new AtomicBoolean(false)
- }
-
- @Test
- def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
- internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
- }
-
- @Test
- def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
- internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
- }
-
- private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially(): Unit = {
- val guardLock = new ReentrantLock
- val handleLatch = new CountDownLatch(100)
- val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
- dispatcher.start
- for (i <- 0 until 100) {
- dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None))
- }
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- }
-
- private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder(): Unit = {
- val handleLatch = new CountDownLatch(100)
- val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.start
- for (i <- 0 until 100) {
- dispatcher.dispatch(new MessageInvocation(key1, i, None, None))
- }
- assert(handleLatch.await(5, TimeUnit.SECONDS))
- assert(!threadingIssueDetected.get)
- dispatcher.postStop
- }
-}
-*/
diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala
index eb91b9737f..379cbfb36d 100644
--- a/akka-http/src/main/scala/akka/http/Mist.scala
+++ b/akka-http/src/main/scala/akka/http/Mist.scala
@@ -4,7 +4,7 @@
package akka.http
-import akka.actor.{ActorRegistry, ActorRef, Actor}
+import akka.actor.{ActorRef, Actor}
import akka.event.EventHandler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
@@ -17,8 +17,8 @@ import javax.servlet.Filter
object MistSettings {
import akka.config.Config._
- final val JettyServer = "jetty"
- final val TimeoutAttribute = "timeout"
+ val JettyServer = "jetty"
+ val TimeoutAttribute = "timeout"
val ConnectionClose = config.getBool("akka.http.connection-close", true)
val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
@@ -64,7 +64,7 @@ import Types._
*
*/
trait Mist {
- import javax.servlet.{ServletContext}
+ import javax.servlet.ServletContext
import MistSettings._
/**
@@ -84,28 +84,21 @@ trait Mist {
response: HttpServletResponse)
(builder: (() => tAsyncRequestContext) => RequestMethod) = {
def suspend: tAsyncRequestContext = {
- //
+
// set to right now, which is effectively "already expired"
- //
response.setDateHeader("Expires", System.currentTimeMillis)
response.setHeader("Cache-Control", "no-cache, must-revalidate")
- //
// no keep-alive?
- //
if (ConnectionClose) response.setHeader("Connection","close")
- //
// suspend the request
// TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
- //
request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
}
- //
// shoot the message to the root endpoint for processing
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
- //
val method = builder(suspend _)
if (method.go) _root ! method
}
@@ -117,7 +110,6 @@ trait Mist {
def initMist(context: ServletContext) {
val server = context.getServerInfo
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
-
_factory = if (major >= 3) {
Some(Servlet30ContextMethodFactory)
} else if (server.toLowerCase startsWith JettyServer) {
@@ -200,7 +192,7 @@ object Endpoint {
/**
* leverage the akka config to tweak the dispatcher for our endpoints
*/
- final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
+ val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
@@ -236,25 +228,21 @@ trait Endpoint { this: Actor =>
* Message handling common to all endpoints, must be chained
*/
protected def handleHttpRequest: Receive = {
- //
+
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
- //
case Attach(hook, provider) => _attach(hook, provider)
- //
// dispatch the suspended requests
- //
case req: RequestMethod => {
val uri = req.request.getPathInfo
val endpoints = _attachments.filter { _._1(uri) }
- if (!endpoints.isEmpty)
- endpoints.foreach { _._2(uri) ! req }
+ if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req }
else {
self.sender match {
case Some(s) => s reply NoneAvailable(uri, req)
- case None => _na(uri, req)
+ case None => _na(uri, req)
}
}
}
@@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint {
final val Root = "/"
- //
// use the configurable dispatcher
- //
self.dispatcher = Endpoint.Dispatcher
- //
// adopt the configured id
- //
if (RootActorBuiltin) self.id = RootActorID
override def preStart =
_attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
- //TODO: Is this needed?
- //override def postRestart =
- // _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
-
def recv: Receive = {
case NoneAvailable(uri, req) => _na(uri, req)
case unknown => {}
@@ -317,10 +297,7 @@ trait RequestMethod {
import java.io.IOException
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
- //
// required implementations
- //
-
val builder: () => tAsyncRequestContext
/**
@@ -353,35 +330,31 @@ trait RequestMethod {
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
request.getHeader(name) match {
case null => default(null)
- case s => s
- }
+ case s => s
+ }
def getParameterOrElse(name: String, default: Function[Any, String]): String =
request.getParameter(name) match {
case null => default(null)
- case s => s
+ case s => s
}
def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
def complete(status: Int, body: String, headers: Headers): Boolean =
- rawComplete {
- res => {
- res.setStatus(status)
- headers foreach {h => response.setHeader(h._1, h._2)}
- res.getWriter.write(body)
- res.getWriter.close
- res.flushBuffer
- }
+ rawComplete { res =>
+ res.setStatus(status)
+ headers foreach {h => response.setHeader(h._1, h._2)}
+ res.getWriter.write(body)
+ res.getWriter.close
+ res.flushBuffer
}
def rawComplete(completion: HttpServletResponse => Unit): Boolean =
context match {
- case Some(pipe) => {
+ case Some(pipe) =>
try {
- if (!suspended) {
- false
- }
+ if (!suspended) false
else {
completion(response)
pipe.complete
@@ -392,34 +365,28 @@ trait RequestMethod {
EventHandler.error(io, this, io.getMessage)
false
}
- }
-
- case None =>
- false
+ case None => false
}
def complete(t: Throwable) {
context match {
- case Some(pipe) => {
+ case Some(pipe) =>
try {
if (suspended) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
pipe.complete
}
} catch {
- case io: IOException =>
+ case io: IOException =>
EventHandler.error(io, this, io.getMessage)
}
- }
-
case None => {}
}
}
- /**
+ /*
* Utility methods to send responses back
*/
-
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index 3d603508e8..dd7a22df52 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -214,7 +214,7 @@ abstract class RemoteClient private[akka] (
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
- actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { //TODO: find better strategy to prevent race
+ actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
@@ -1016,9 +1016,15 @@ class RemoteServerHandler(
val typedActor = createTypedActor(actorInfo, channel)
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
- val (ownerTypeHint, argClasses, args) = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
+ val (ownerTypeHint, argClasses, args) =
+ MessageSerializer
+ .deserialize(request.getMessage)
+ .asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
- def resolveMethod(bottomType: Class[_], typeHint: String, methodName: String, methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
+ def resolveMethod(bottomType: Class[_],
+ typeHint: String,
+ methodName: String,
+ methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
var typeToResolve = bottomType
var targetMethod: java.lang.reflect.Method = null
var firstException: NoSuchMethodException = null
diff --git a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
index 02b29e6de1..cd8f71058e 100644
--- a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
+++ b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
@@ -8,7 +8,7 @@ import org.junit.runner.RunWith
import akka.serialization.Serializer.ScalaJSON
//TODO: FIXME WHY IS THIS COMMENTED OUT?
-/*
+
object Protocols {
import sjson.json.DefaultProtocol._
case class Shop(store: String, item: String, price: Int)
@@ -51,4 +51,3 @@ class ScalaJSONSerializerSpec extends
}
}
}
-*/
diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
index e32ed03c32..1cdf735e8e 100644
--- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -364,7 +364,7 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
/**
* Returns the home address and port for this actor.
*/
- def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)//TODO: REVISIT: Sensible to return null?
+ def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)
}
object TypedActorConfiguration {
diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
index 315467f2ee..f2f2a7a1fc 100644
--- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
+++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
@@ -31,7 +31,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
private var components: List[SuperviseTypedActor] = _
private var supervised: List[Supervise] = Nil
private var bindings: List[DependencyBinding] = Nil
- private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
@@ -167,7 +166,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
def reset = synchronized {
modules = new java.util.ArrayList[Module]
- configRegistry = new HashMap[Class[_], SuperviseTypedActor]
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
methodToUriRegistry = new HashMap[Method, String]
injector = null