Merge remote-tracking branch 'origin/master' into ticket-622

This commit is contained in:
Derek Williams 2011-03-30 22:07:13 -06:00
commit bef05febbc
20 changed files with 219 additions and 271 deletions

View file

@ -28,9 +28,9 @@ import akka.japi. {Creator, Procedure}
/* Marker trait to show which Messages are automatically handled by Akka */
sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
extends AutoReceivedMessage with LifeCycleMessage {
/**
* Java API
*/
@ -75,6 +75,7 @@ class IllegalActorStateException private[akka](message: String) extends AkkaEx
class ActorKilledException private[akka](message: String) extends AkkaException(message)
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
class InvalidMessageException private[akka](message: String) extends AkkaException(message)
/**
* This message is thrown by default when an Actors behavior doesn't match a message
@ -90,7 +91,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor extends ListenerManagement {
/**
* Add shutdown cleanups
*/
@ -128,7 +129,7 @@ object Actor extends ListenerManagement {
type Receive = PartialFunction[Any, Unit]
private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
/**
* Creates an ActorRef out of the Actor with type T.
* <pre>
@ -443,8 +444,10 @@ trait Actor {
// =========================================
private[akka] final def apply(msg: Any) = {
if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
val behaviorStack = self.hotswap
msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
msg match {
case l: AutoReceivedMessage => autoReceiveMessage(l)
case msg if behaviorStack.nonEmpty &&
behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)

View file

@ -902,7 +902,6 @@ class LocalActorRef private[akka] (
failedActor match {
case p: Proxyable =>
//p.swapProxiedActor(freshActor) //TODO: broken
failedActor.preRestart(reason)
failedActor.postRestart(reason)
case _ =>

View file

@ -78,12 +78,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override private[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
if (mbox.dispatcherLock.locked && attemptDonationOf(invocation, mbox)) {
/*if (!mbox.isEmpty && attemptDonationOf(invocation, mbox)) {
//We were busy and we got to donate the message to some other lucky guy, we're done here
} else {
} else {*/
mbox enqueue invocation
registerForExecution(mbox)
}
//}
}
override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
@ -110,13 +110,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/**
* Returns true if the donation succeeded or false otherwise
*/
protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
/*protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
val actors = members // copy to prevent concurrent modifications having any impact
doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
case null => false
case recipient => donate(message, recipient)
}
}
}*/
/**
* Rewrites the message and adds that message to the recipients mailbox

View file

@ -57,7 +57,7 @@ object Futures {
}
/**
* Java API
* Java API.
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
@ -68,6 +68,10 @@ object Futures {
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
* Example:
* <pre>
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
if(futures.isEmpty) {
@ -115,6 +119,10 @@ object Futures {
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
* <pre>
* val result = Futures.reduce(futures)(_ + _).await.result
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = {
if (futures.isEmpty)
@ -138,7 +146,7 @@ object Futures {
}
/**
* Java API
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
@ -147,18 +155,25 @@ object Futures {
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
/**
* FIXME document Futures.sequence
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
/**
* FIXME document Futures.traverse
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <-fb) yield (r += b)
}.map(_.result)
//Deprecations
// =====================================
// Deprecations
// =====================================
/**
* (Blocking!)
*/
@ -299,6 +314,12 @@ sealed trait Future[+T] {
/**
* When the future is compeleted with a valid result, apply the provided
* PartialFunction to the result.
* <pre>
* val result = future receive {
* case Foo => "foo"
* case Bar => "bar"
* }.await.result
* </pre>
*/
final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f =>
val optr = f.result
@ -313,6 +334,14 @@ sealed trait Future[+T] {
* result of this Future if a match is found, or else return a MatchError.
* If this Future is completed with an exception then the new Future will
* also contain this exception.
* Example:
* <pre>
* val future1 = for {
* a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
* b <- actor !!! Req(a) collect { case Res(x: String) => x }
* c <- actor !!! Req(7) collect { case Res(x: String) => x }
* } yield b + "-" + c
* </pre>
*/
final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
@ -343,6 +372,14 @@ sealed trait Future[+T] {
* Creates a new Future by applying a function to the successful result of
* this Future. If this Future is completed with an exception then the new
* Future will also contain this exception.
* Example:
* <pre>
* val future1 = for {
* a: Int <- actor !!! "Hello" // returns 5
* b: String <- actor !!! a // returns "10"
* c: String <- actor !!! 7 // returns "14"
* } yield b + "-" + c
* </pre>
*/
final def map[A](f: T => A): Future[A] = {
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
@ -371,6 +408,14 @@ sealed trait Future[+T] {
* this Future, and returns the result of the function as the new Future.
* If this Future is completed with an exception then the new Future will
* also contain this exception.
* Example:
* <pre>
* val future1 = for {
* a: Int <- actor !!! "Hello" // returns 5
* b: String <- actor !!! a // returns "10"
* c: String <- actor !!! 7 // returns "14"
* } yield b + "-" + c
* </pre>
*/
final def flatMap[A](f: T => Future[A]): Future[A] = {
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
@ -425,7 +470,7 @@ sealed trait Future[+T] {
}
/**
* Returns the current result, throws the exception is one has been raised, else returns None
* Returns the current result, throws the exception is one has been raised, else returns None
*/
final def resultOrException: Option[T] = {
val v = value
@ -450,31 +495,31 @@ sealed trait Future[+T] {
}
/**
* Essentially this is the Promise (or write-side) of a Future (read-side)
* Essentially this is the Promise (or write-side) of a Future (read-side).
*/
trait CompletableFuture[T] extends Future[T] {
/**
* Completes this Future with the specified result, if not already completed,
* returns this
* Completes this Future with the specified result, if not already completed.
* @return this
*/
def complete(value: Either[Throwable, T]): CompletableFuture[T]
/**
* Completes this Future with the specified result, if not already completed,
* returns this
* Completes this Future with the specified result, if not already completed.
* @return this
*/
final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))
/**
* Completes this Future with the specified exception, if not already completed,
* returns this
* Completes this Future with the specified exception, if not already completed.
* @return this
*/
final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception))
/**
* Completes this Future with the specified other Future, when that Future is completed,
* unless this Future has already been completed
* returns this
* unless this Future has already been completed.
* @return this.
*/
final def completeWith(other: Future[T]): CompletableFuture[T] = {
other onComplete { f => complete(f.value.get) }
@ -482,18 +527,18 @@ trait CompletableFuture[T] extends Future[T] {
}
/**
* Alias for complete(Right(value))
* Alias for complete(Right(value)).
*/
final def << (value: T): CompletableFuture[T] = complete(Right(value))
/**
* Alias for completeWith(other)
* Alias for completeWith(other).
*/
final def << (other : Future[T]): CompletableFuture[T] = completeWith(other)
}
/**
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
* The default concrete Future implementation.
*/
class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {

View file

@ -160,12 +160,11 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile var debugLifecycle = false
}
// FIXME fix the issues with using the monitoring in MonitorableThread
// FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
val created = new AtomicInteger
val alive = new AtomicInteger
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -178,7 +177,6 @@ class MonitorableThread(runnable: Runnable, name: String)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
try {
MonitorableThread.alive.incrementAndGet
super.run

View file

@ -25,7 +25,7 @@ import akka.AkkaException
* case EventHandler.Warning(instance, message) => ...
* case EventHandler.Info(instance, message) => ...
* case EventHandler.Debug(instance, message) => ...
* case genericEvent => ...
* case genericEvent => ...
* }
* })
*
@ -35,7 +35,7 @@ import akka.AkkaException
* </pre>
* <p/>
* However best is probably to register the listener in the 'akka.conf'
* configuration file.
* configuration file.
* <p/>
* Log an error event:
* <pre>
@ -45,7 +45,7 @@ import akka.AkkaException
* <pre>
* EventHandler.error(exception, this, message.toString)
* </pre>
*
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EventHandler extends ListenerManagement {
@ -73,7 +73,7 @@ object EventHandler extends ListenerManagement {
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
val generic = "[GENERIC] [%s] [%s]".intern
val ID = "event:handler".intern
class EventHandlerException extends AkkaException
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
@ -129,7 +129,7 @@ object EventHandler extends ListenerManagement {
else if (eventClass.isInstanceOf[Debug]) DebugLevel
else DebugLevel
}
class DefaultListener extends Actor {
self.id = ID
self.dispatcher = EventHandlerDispatcher

View file

@ -1,11 +1,15 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
class BoundedBlockingQueue[E <: AnyRef](
val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
backing match {
case null => throw new IllegalArgumentException("Backing Queue may not be null")
@ -32,7 +36,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
require(backing.offer(e))
notEmpty.signal()
} finally {
lock.unlock()
lock.unlock()
}
}
@ -319,4 +323,4 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
lock.unlock()
}
}
}
}

View file

@ -14,6 +14,8 @@ import java.util.concurrent.TimeUnit
import akka.util.duration._
import akka.Testing
object FSMActorSpec {
@ -100,7 +102,7 @@ class FSMActorSpec extends JUnitSuite {
def unlockTheLock = {
// lock that locked after being open for 1 sec
val lock = Actor.actorOf(new Lock("33221", (1, TimeUnit.SECONDS))).start
val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start
val transitionTester = Actor.actorOf(new Actor { def receive = {
case Transition(_, _, _) => transitionCallBackLatch.open
@ -108,7 +110,7 @@ class FSMActorSpec extends JUnitSuite {
}}).start
lock ! SubscribeTransitionCallBack(transitionTester)
assert(initialStateLatch.tryAwait(1, TimeUnit.SECONDS))
assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
lock ! '3'
lock ! '3'
@ -116,14 +118,14 @@ class FSMActorSpec extends JUnitSuite {
lock ! '2'
lock ! '1'
assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
assert(transitionLatch.tryAwait(1, TimeUnit.SECONDS))
assert(transitionCallBackLatch.tryAwait(1, TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))
assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
lock ! "not_handled"
assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS))
assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
val answerLatch = new StandardLatch
object Hello
@ -136,9 +138,9 @@ class FSMActorSpec extends JUnitSuite {
}
}).start
tester ! Hello
assert(answerLatch.tryAwait(2, TimeUnit.SECONDS))
assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
tester ! Bye
assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS))
assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS))
}
}

View file

@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{Duration, Switch}
import akka.Testing
object ActorModelSpec {
@ -224,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite {
a.start
a ! CountDown(start)
assertCountDown(start,3000, "Should process first message within 3 seconds")
assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
a ! Wait(1000)
a ! CountDown(oneAtATime)
// in case of serialization violation, restart would happen instead of count down
assertCountDown(oneAtATime,1500,"Processed message when allowed")
assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
a.stop
@ -245,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite {
def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
for (i <- 1 to 10) { start }
assertCountDown(counter, 3000, "Should process 200 messages")
assertCountDown(counter, Testing.time(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop
@ -263,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite {
val aStart,aStop,bParallel = new CountDownLatch(1)
a ! Meet(aStart,aStop)
assertCountDown(aStart,3000, "Should process first message within 3 seconds")
assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, 3000, "Should process other actors in parallel")
assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel")
aStop.countDown()
a.stop
@ -281,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite {
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, 3000, "Should be suspended+resumed and done with next message within 3 seconds")
assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
@ -297,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite {
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
dispatcher.resume(a)
assertCountDown(done, 3000, "Should resume processing of messages when resumed")
assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
@ -314,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite {
(1 to num) foreach {
_ => newTestActor.start ! cachedMessage
}
assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns")
assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns")
}
for(run <- 1 to 3) {
flood(10000)

View file

@ -132,7 +132,6 @@ class FutureSpec extends JUnitSuite {
actor.stop
}
// FIXME: implement Futures.awaitEither, and uncomment these two tests
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start

View file

@ -1,91 +0,0 @@
package akka.dispatch
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import akka.actor.Actor
import Actor._
// FIXME use this test when we have removed the MessageInvoker classes
/*
class ThreadBasedDispatcherSpec extends JUnitSuite {
private var threadingIssueDetected: AtomicBoolean = null
val key1 = actorOf(new Actor { def receive = { case _ => {}} })
val key2 = actorOf(new Actor { def receive = { case _ => {}} })
val key3 = actorOf(new Actor { def receive = { case _ => {}} })
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
}
@Before
def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def shouldMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def shouldMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially(): Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder(): Unit = {
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.dispatch(new MessageInvocation(key1, i, None, None))
}
assert(handleLatch.await(5, TimeUnit.SECONDS))
assert(!threadingIssueDetected.get)
dispatcher.postStop
}
}
*/

View file

@ -4,7 +4,7 @@
package akka.http
import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.actor.{ActorRef, Actor}
import akka.event.EventHandler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
@ -17,8 +17,8 @@ import javax.servlet.Filter
object MistSettings {
import akka.config.Config._
final val JettyServer = "jetty"
final val TimeoutAttribute = "timeout"
val JettyServer = "jetty"
val TimeoutAttribute = "timeout"
val ConnectionClose = config.getBool("akka.http.connection-close", true)
val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
@ -64,7 +64,7 @@ import Types._
*
*/
trait Mist {
import javax.servlet.{ServletContext}
import javax.servlet.ServletContext
import MistSettings._
/**
@ -84,28 +84,21 @@ trait Mist {
response: HttpServletResponse)
(builder: (() => tAsyncRequestContext) => RequestMethod) = {
def suspend: tAsyncRequestContext = {
//
// set to right now, which is effectively "already expired"
//
response.setDateHeader("Expires", System.currentTimeMillis)
response.setHeader("Cache-Control", "no-cache, must-revalidate")
//
// no keep-alive?
//
if (ConnectionClose) response.setHeader("Connection","close")
//
// suspend the request
// TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
//
request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
}
//
// shoot the message to the root endpoint for processing
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
//
val method = builder(suspend _)
if (method.go) _root ! method
}
@ -117,7 +110,6 @@ trait Mist {
def initMist(context: ServletContext) {
val server = context.getServerInfo
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
_factory = if (major >= 3) {
Some(Servlet30ContextMethodFactory)
} else if (server.toLowerCase startsWith JettyServer) {
@ -200,7 +192,7 @@ object Endpoint {
/**
* leverage the akka config to tweak the dispatcher for our endpoints
*/
final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
@ -236,25 +228,21 @@ trait Endpoint { this: Actor =>
* Message handling common to all endpoints, must be chained
*/
protected def handleHttpRequest: Receive = {
//
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
//
case Attach(hook, provider) => _attach(hook, provider)
//
// dispatch the suspended requests
//
case req: RequestMethod => {
val uri = req.request.getPathInfo
val endpoints = _attachments.filter { _._1(uri) }
if (!endpoints.isEmpty)
endpoints.foreach { _._2(uri) ! req }
if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req }
else {
self.sender match {
case Some(s) => s reply NoneAvailable(uri, req)
case None => _na(uri, req)
case None => _na(uri, req)
}
}
}
@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint {
final val Root = "/"
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
//
// adopt the configured id
//
if (RootActorBuiltin) self.id = RootActorID
override def preStart =
_attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
//TODO: Is this needed?
//override def postRestart =
// _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
def recv: Receive = {
case NoneAvailable(uri, req) => _na(uri, req)
case unknown => {}
@ -317,10 +297,7 @@ trait RequestMethod {
import java.io.IOException
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
//
// required implementations
//
val builder: () => tAsyncRequestContext
/**
@ -353,35 +330,31 @@ trait RequestMethod {
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
request.getHeader(name) match {
case null => default(null)
case s => s
}
case s => s
}
def getParameterOrElse(name: String, default: Function[Any, String]): String =
request.getParameter(name) match {
case null => default(null)
case s => s
case s => s
}
def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
def complete(status: Int, body: String, headers: Headers): Boolean =
rawComplete {
res => {
res.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
res.getWriter.write(body)
res.getWriter.close
res.flushBuffer
}
rawComplete { res =>
res.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
res.getWriter.write(body)
res.getWriter.close
res.flushBuffer
}
def rawComplete(completion: HttpServletResponse => Unit): Boolean =
context match {
case Some(pipe) => {
case Some(pipe) =>
try {
if (!suspended) {
false
}
if (!suspended) false
else {
completion(response)
pipe.complete
@ -392,34 +365,28 @@ trait RequestMethod {
EventHandler.error(io, this, io.getMessage)
false
}
}
case None =>
false
case None => false
}
def complete(t: Throwable) {
context match {
case Some(pipe) => {
case Some(pipe) =>
try {
if (suspended) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
pipe.complete
}
} catch {
case io: IOException =>
case io: IOException =>
EventHandler.error(io, this, io.getMessage)
}
}
case None => {}
}
}
/**
/*
* Utility methods to send responses back
*/
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)

View file

@ -39,6 +39,9 @@ import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent._
import akka.AkkaException
class RemoteClientMessageBufferException(message: String) extends AkkaException(message)
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -159,7 +162,8 @@ abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true)
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true)
val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1)
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
@ -167,7 +171,10 @@ abstract class RemoteClient private[akka] (
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
protected val pendingRequests = {
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
}
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
@ -207,7 +214,7 @@ abstract class RemoteClient private[akka] (
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { //TODO: find better strategy to prevent race
actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
@ -243,7 +250,10 @@ abstract class RemoteClient private[akka] (
case e: Throwable =>
// add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress))
if (useTransactionLog) pendingRequests.add((true, null, request))
if (useTransactionLog) {
if (!pendingRequests.offer((true, null, request)))
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
}
else throw e
}
None
@ -256,7 +266,8 @@ abstract class RemoteClient private[akka] (
def handleRequestReplyError(future: ChannelFuture) = {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
if (useTransactionLog) {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
if (!pendingRequests.offer((false, futureUuid, request))) // Add the request to the tx log after a failing send
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
} else {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
@ -1005,9 +1016,15 @@ class RemoteServerHandler(
val typedActor = createTypedActor(actorInfo, channel)
//FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo?
val (ownerTypeHint, argClasses, args) = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
val (ownerTypeHint, argClasses, args) =
MessageSerializer
.deserialize(request.getMessage)
.asInstanceOf[Tuple3[String,Array[Class[_]],Array[AnyRef]]]
def resolveMethod(bottomType: Class[_], typeHint: String, methodName: String, methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
def resolveMethod(bottomType: Class[_],
typeHint: String,
methodName: String,
methodSignature: Array[Class[_]]): java.lang.reflect.Method = {
var typeToResolve = bottomType
var targetMethod: java.lang.reflect.Method = null
var firstException: NoSuchMethodException = null

View file

@ -8,7 +8,7 @@ import org.junit.runner.RunWith
import akka.serialization.Serializer.ScalaJSON
//TODO: FIXME WHY IS THIS COMMENTED OUT?
/*
object Protocols {
import sjson.json.DefaultProtocol._
case class Shop(store: String, item: String, price: Int)
@ -51,4 +51,3 @@ class ScalaJSONSerializerSpec extends
}
}
}
*/

View file

@ -16,8 +16,6 @@ class ConfigSpec extends WordSpec with MustMatchers {
"contain all configuration properties for akka-stm that are used in code with their correct defaults" in {
import Config.config._
getInt("akka.storage.max-retries") must equal(Some(10))
getBool("akka.stm.blocking-allowed") must equal(Some(false))
getBool("akka.stm.fair") must equal(Some(true))
getBool("akka.stm.interruptible") must equal(Some(false))

View file

@ -17,10 +17,10 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.BeanProperty
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
import scala.reflect.BeanProperty
/**
* TypedActor is a type-safe actor made out of a POJO with interface.
* Void methods are turned into fire-forget messages.
@ -36,7 +36,7 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
* class TestActorImpl extends TypedActor implements TestActor {
*
* public void hit(int count) {
* Pong pong = (Pong) getContext().getSender();
* Pong pong = (Pong) context().sender();
* pong.hit(count++);
* }
*
@ -124,15 +124,15 @@ abstract class TypedActor extends Actor with Proxyable {
* This class does not contain static information but is updated by the runtime system
* at runtime.
* <p/>
* You can get a hold of the context using either the 'getContext()' or 'context'
* methods from the 'TypedActor' base class.
* You can get a hold of the context using the 'context()'
* method from the 'TypedActor' base class.
* <p/>
*
* Here is an example of usage (in Java):
* <pre>
* class PingImpl extends TypedActor implements Ping {
* public void hit(int count) {
* Pong pong = (Pong) getContext().getSender();
* Pong pong = (Pong) context().sender();
* pong.hit(count++);
* }
* }
@ -148,7 +148,12 @@ abstract class TypedActor extends Actor with Proxyable {
* }
* </pre>
*/
@BeanProperty val context: TypedActorContext = new TypedActorContext(self)
val context: TypedActorContext = new TypedActorContext(self)
/**
* @deprecated 'getContext()' is deprecated use 'context()'
*/
def getContext: TypedActorContext = context
/**
* This method is used to resolve the Future for TypedActor methods that are defined to return a
@ -180,15 +185,16 @@ abstract class TypedActor extends Actor with Proxyable {
case joinPoint: JoinPoint =>
SenderContextInfo.senderActorRef.value = self
SenderContextInfo.senderProxy.value = proxy
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed
else self.reply(joinPoint.proceed)
case coordinated @ Coordinated(joinPoint: JoinPoint) =>
SenderContextInfo.senderActorRef.value = self
SenderContextInfo.senderProxy.value = proxy
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
coordinated atomic { joinPoint.proceed }
case Link(proxy) => self.link(proxy)
case Unlink(proxy) => self.unlink(proxy)
case unexpected => throw new IllegalActorStateException(
@ -255,7 +261,7 @@ abstract class TypedActor extends Actor with Proxyable {
* <pre>
* class PingImpl extends TypedActor implements Ping {
* public void hit(int count) {
* Pong pong = (Pong) getContext().getSender();
* Pong pong = (Pong) context().sender();
* pong.hit(count++);
* }
* }
@ -277,7 +283,8 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
private[akka] var _sender: AnyRef = _
/**
5 * Returns the uuid for the actor.
* Returns the uuid for the actor.
* @deprecated use 'uuid()'
*/
def getUuid() = actorRef.uuid
@ -287,31 +294,39 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
def uuid = actorRef.uuid
def timeout = actorRef.timeout
/**
* @deprecated use 'timeout()'
*/
def getTimout = timeout
def setTimout(timeout: Long) = actorRef.timeout = timeout
def id = actorRef.id
/**
* @deprecated use 'id()'
*/
def getId = id
def setId(id: String) = actorRef.id = id
def receiveTimeout = actorRef.receiveTimeout
/**
* @deprecated use 'receiveTimeout()'
*/
def getReceiveTimeout = receiveTimeout
def setReceiveTimeout(timeout: Long) = actorRef.setReceiveTimeout(timeout)
/**
* Is the actor running?
*/
def mailboxSize = actorRef.mailboxSize
def dispatcher = actorRef.getDispatcher
def lifeCycle = actorRef.getLifeCycle
def isRunning: Boolean = actorRef.isRunning
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = actorRef.isShutdown
/**
* Is the actor ever started?
*/
def isUnstarted: Boolean = actorRef.isUnstarted
def isBeingRestarted: Boolean = actorRef.isBeingRestarted
/**
* Returns the current sender reference.
@ -349,7 +364,7 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) {
/**
* Returns the home address and port for this actor.
*/
def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)//TODO: REVISIT: Sensible to return null?
def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null)
}
object TypedActorConfiguration {
@ -449,7 +464,7 @@ object TypedActor {
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
*/
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T =
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T =
newInstance(intfClass, targetClass, TypedActorConfiguration())
/**

View file

@ -31,7 +31,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
private var components: List[SuperviseTypedActor] = _
private var supervised: List[Supervise] = Nil
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed?
private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
@ -167,7 +166,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
def reset = synchronized {
modules = new java.util.ArrayList[Module]
configRegistry = new HashMap[Class[_], SuperviseTypedActor]
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
methodToUriRegistry = new HashMap[Method, String]
injector = null

View file

@ -12,6 +12,8 @@ import akka.config.Supervision._
import java.util.concurrent.CountDownLatch
import akka.config.TypedActorConfigurator
import akka.Testing
/**
* @author Martin Krasser
*/
@ -95,7 +97,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
it("should be stopped when supervision cannot handle the problem in") {
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
@ -121,7 +123,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
it("should be restarted when supervision handles the problem in") {
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000))
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
@ -146,4 +148,4 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
}
}
}
}
}

View file

@ -146,7 +146,11 @@ akka {
}
client {
retry-message-send-on-failure = on
buffering {
retry-message-send-on-failure = on
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
}
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576
@ -154,8 +158,4 @@ akka {
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
}
}
storage {
max-retries = 10
}
}

View file

@ -19,7 +19,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val scalaCompileSettings =
Seq("-deprecation",
"-Xmigration",
//"-optimise",
"-optimise",
"-encoding", "utf8")
val javaCompileSettings = Seq("-Xlint:unchecked")
@ -126,7 +126,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// Compile
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //LGPL 2.1
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
@ -452,8 +452,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
}
def akkaArtifacts = descendents(info.projectPath / "dist", "*-" + version + ".jar")
lazy val integrationTestsEnabled = systemOptional[Boolean]("integration.tests",false)
lazy val stressTestsEnabled = systemOptional[Boolean]("stress.tests",false)
// ------------------------------------------------------------
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info)
@ -472,21 +470,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
override def packageToPublishActions = super.packageToPublishActions ++ Seq(this.packageDocs, this.packageSrc)
override def pomPostProcess(node: scala.xml.Node): scala.xml.Node = mcPom(AkkaParentProject.this.moduleConfigurations)(super.pomPostProcess(node))
/**
* Used for testOptions, possibility to enable the running of integration and or stresstests
*
* To enable set true and disable set false
* set integration.tests true
* set stress.tests true
*/
def createTestFilter(defaultTests: (String) => Boolean) = { TestFilter({
case s: String if defaultTests(s) => true
case s: String if integrationTestsEnabled.value => s.endsWith("TestIntegration")
case s: String if stressTestsEnabled.value => s.endsWith("TestStress")
case _ => false
}) :: Nil
lazy val excludeTestsProperty = systemOptional[String]("akka.test.exclude", "")
def excludeTests = {
val exclude = excludeTestsProperty.value
if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq
}
override def testOptions = super.testOptions ++ excludeTests.map(exclude => TestFilter(test => !test.contains(exclude)))
lazy val publishRelease = {
val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)
publishTask(publishIvyModule, releaseConfiguration) dependsOn (deliver, publishLocal, makePom)