Merging in master

This commit is contained in:
Viktor Klang 2011-03-26 19:40:36 +01:00
commit 7fc7e4d60c
43 changed files with 688 additions and 674 deletions

View file

@ -19,7 +19,8 @@ import java.net.{InetAddress, UnknownHostException}
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@serializable abstract class AkkaException(message: String) extends { @serializable abstract class AkkaException(message: String = "") extends RuntimeException(message) {
import AkkaException._
val exceptionName = getClass.getName val exceptionName = getClass.getName
val uuid = "%s_%s".format(AkkaException.hostname, newUuid) val uuid = "%s_%s".format(AkkaException.hostname, newUuid)
} with RuntimeException(message) { } with RuntimeException(message) {

View file

@ -76,163 +76,6 @@ class ActorKilledException private[akka](message: String) extends AkkaEx
class ActorInitializationException private[akka](message: String) extends AkkaException(message) class ActorInitializationException private[akka](message: String) extends AkkaException(message)
class ActorTimeoutException private[akka](message: String) extends AkkaException(message) class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
/**
* Error handler.
*
* Create, add and remove a listener:
* <pre>
* val errorHandlerEventListener = Actor.actorOf(new Actor {
* self.dispatcher = EventHandler.EventHandlerDispatcher
*
* def receive = {
* case EventHandler.Error(cause, instance, message) => ...
* case EventHandler.Warning(instance, message) => ...
* case EventHandler.Info(instance, message) => ...
* case EventHandler.Debug(instance, message) => ...
* }
* })
*
* EventHandler.addListener(errorHandlerEventListener)
* ...
* EventHandler.removeListener(errorHandlerEventListener)
* </pre>
*
* Log an error event:
* <pre>
* EventHandler.notify(EventHandler.Error(exception, this, message.toString))
* </pre>
* Or use the direct methods (better performance):
* <pre>
* EventHandler.error(exception, this, message.toString)
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EventHandler extends ListenerManagement {
import java.io.{StringWriter, PrintWriter}
import java.text.DateFormat
import java.util.Date
import akka.dispatch.Dispatchers
val ErrorLevel = 1
val WarningLevel = 2
val InfoLevel = 3
val DebugLevel = 4
sealed trait Event {
@transient val thread: Thread = Thread.currentThread
}
case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event
case class Warning(instance: AnyRef, message: String = "") extends Event
case class Info(instance: AnyRef, message: String = "") extends Event
case class Debug(instance: AnyRef, message: String = "") extends Event
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
val warning = "[WARN] [%s] [%s] [%s] %s".intern
val info = "[INFO] [%s] [%s] [%s] %s".intern
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
val generic = "[GENERIC] [%s] [%s]".intern
val ID = "event:handler".intern
val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
val level: Int = config.getString("akka.event-handler-level", "DEBUG") match {
case "ERROR" => ErrorLevel
case "WARNING" => WarningLevel
case "INFO" => InfoLevel
case "DEBUG" => DebugLevel
case unknown => throw new ConfigurationException(
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
}
def notify(event: => AnyRef) = notifyListeners(event)
def notify[T <: Event : ClassManifest](event: => T) {
if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event)
}
def error(cause: Throwable, instance: AnyRef, message: => String) = {
if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message))
}
def warning(instance: AnyRef, message: => String) = {
if (level >= WarningLevel) notifyListeners(Warning(instance, message))
}
def info(instance: AnyRef, message: => String) = {
if (level >= InfoLevel) notifyListeners(Info(instance, message))
}
def debug(instance: AnyRef, message: => String) = {
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
}
def formattedTimestamp = DateFormat.getInstance.format(new Date)
def stackTraceFor(e: Throwable) = {
val sw = new StringWriter
val pw = new PrintWriter(sw)
e.printStackTrace(pw)
sw.toString
}
private def levelFor(eventClass: Class[_ <: Event]) = {
if (eventClass.isInstanceOf[Error]) ErrorLevel
else if (eventClass.isInstanceOf[Warning]) WarningLevel
else if (eventClass.isInstanceOf[Info]) InfoLevel
else if (eventClass.isInstanceOf[Debug]) DebugLevel
else DebugLevel
}
class DefaultListener extends Actor {
self.id = ID
self.dispatcher = EventHandlerDispatcher
def receive = {
case event @ Error(cause, instance, message) =>
println(error.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message,
stackTraceFor(cause)))
case event @ Warning(instance, message) =>
println(warning.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
case event @ Info(instance, message) =>
println(info.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
case event @ Debug(instance, message) =>
println(debug.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
case event =>
println(generic.format(formattedTimestamp, event.toString))
}
}
config.getList("akka.event-handlers") foreach { listenerName =>
try {
ReflectiveAccess.getClassFor[Actor](listenerName) map {
clazz => addListener(Actor.actorOf(clazz).start)
}
} catch {
case e: Exception =>
throw new ConfigurationException(
"Event Handler specified in config can't be loaded [" + listenerName +
"] due to [" + e.toString + "]")
}
}
}
/** /**
* This message is thrown by default when an Actors behavior doesn't match a message * This message is thrown by default when an Actors behavior doesn't match a message
*/ */
@ -565,14 +408,17 @@ trait Actor {
/** /**
* Is the actor able to handle the message passed in as arguments? * Is the actor able to handle the message passed in as arguments?
*/ */
def isDefinedAt(message: Any): Boolean = message match { //Same logic as apply(msg) but without the unhandled catch-all def isDefinedAt(message: Any): Boolean = {
val behaviorStack = self.hotswap
message match { //Same logic as apply(msg) but without the unhandled catch-all
case l: AutoReceivedMessage => true case l: AutoReceivedMessage => true
case msg if self.hotswap.nonEmpty && case msg if behaviorStack.nonEmpty &&
self.hotswap.head.isDefinedAt(msg) => true behaviorStack.head.isDefinedAt(msg) => true
case msg if self.hotswap.isEmpty && case msg if behaviorStack.isEmpty &&
processingBehavior.isDefinedAt(msg) => true processingBehavior.isDefinedAt(msg) => true
case _ => false case _ => false
} }
}
/** /**
* Changes tha Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Changes tha Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
@ -596,14 +442,17 @@ trait Actor {
// ==== INTERNAL IMPLEMENTATION DETAILS ==== // ==== INTERNAL IMPLEMENTATION DETAILS ====
// ========================================= // =========================================
private[akka] final def apply(msg: Any) = msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException? private[akka] final def apply(msg: Any) = {
val behaviorStack = self.hotswap
msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException?
case l: AutoReceivedMessage => autoReceiveMessage(l) case l: AutoReceivedMessage => autoReceiveMessage(l)
case msg if self.hotswap.nonEmpty && case msg if behaviorStack.nonEmpty &&
self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
case msg if self.hotswap.isEmpty && case msg if behaviorStack.isEmpty &&
processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg) processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg)
case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior
} }
}
private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match {
case HotSwap(code, discardOld) => become(code(self), discardOld) case HotSwap(code, discardOld) => become(code(self), discardOld)
@ -615,8 +464,8 @@ trait Actor {
case Restart(reason) => throw reason case Restart(reason) => throw reason
case PoisonPill => case PoisonPill =>
val f = self.senderFuture val f = self.senderFuture
if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
self.stop self.stop
if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
} }
private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior

View file

@ -4,6 +4,7 @@
package akka.actor package akka.actor
import akka.event.EventHandler
import akka.dispatch._ import akka.dispatch._
import akka.config.Config._ import akka.config.Config._
import akka.config.Supervision._ import akka.config.Supervision._

View file

@ -19,6 +19,7 @@ import scala.collection.JavaConversions
import java.util.concurrent._ import java.util.concurrent._
import akka.event.EventHandler
import akka.AkkaException import akka.AkkaException
object Scheduler { object Scheduler {

View file

@ -5,10 +5,6 @@
package akka.config package akka.config
import akka.AkkaException import akka.AkkaException
import akka.actor.EventHandler
import java.net.InetSocketAddress
import java.lang.reflect.Method
class ConfigurationException(message: String) extends AkkaException(message) class ConfigurationException(message: String) extends AkkaException(message)
class ModuleNotAvailableException(message: String) extends AkkaException(message) class ModuleNotAvailableException(message: String) extends AkkaException(message)
@ -35,10 +31,8 @@ object Config {
envHome orElse systemHome envHome orElse systemHome
} }
val config = { val config: Configuration = try {
val confName = { val confName = {
val envConf = System.getenv("AKKA_MODE") match { val envConf = System.getenv("AKKA_MODE") match {
case null | "" => None case null | "" => None
case value => Some(value) case value => Some(value)
@ -52,7 +46,7 @@ object Config {
(envConf orElse systemConf).map("akka." + _ + ".conf").getOrElse("akka.conf") (envConf orElse systemConf).map("akka." + _ + ".conf").getOrElse("akka.conf")
} }
try { val newInstance =
if (System.getProperty("akka.config", "") != "") { if (System.getProperty("akka.config", "") != "") {
val configFile = System.getProperty("akka.config", "") val configFile = System.getProperty("akka.config", "")
println("Loading config from -Dakka.config=" + configFile) println("Loading config from -Dakka.config=" + configFile)
@ -75,18 +69,23 @@ object Config {
"\nUsing default values everywhere.") "\nUsing default values everywhere.")
Configuration.fromString("akka {}") // default empty config Configuration.fromString("akka {}") // default empty config
} }
val configVersion = newInstance.getString("akka.version", VERSION)
if (configVersion != VERSION)
throw new ConfigurationException(
"Akka JAR version [" + VERSION + "] is different than the provided config version [" + configVersion + "]")
newInstance
} catch { } catch {
case e => case e =>
EventHandler.error(e, this, e.getMessage) System.err.println("Couldn't parse config, fatal error.")
e.printStackTrace(System.err)
System.exit(-1)
throw e throw e
} }
}
val CONFIG_VERSION = config.getString("akka.version", VERSION) val CONFIG_VERSION = config.getString("akka.version", VERSION)
if (VERSION != CONFIG_VERSION) throw new ConfigurationException(
"Akka JAR version [" + VERSION + "] is different than the provided config version [" + CONFIG_VERSION + "]")
val TIME_UNIT = config.getString("akka.time-unit", "seconds") val TIME_UNIT = config.getString("akka.time-unit", "seconds")
val startTime = System.currentTimeMillis val startTime = System.currentTimeMillis

View file

@ -7,7 +7,8 @@ package akka.dataflow
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import akka.actor.{Actor, ActorRef, EventHandler} import akka.event.EventHandler
import akka.actor.{Actor, ActorRef}
import akka.actor.Actor._ import akka.actor.Actor._
import akka.dispatch.CompletableFuture import akka.dispatch.CompletableFuture
import akka.AkkaException import akka.AkkaException
@ -65,6 +66,7 @@ object DataFlow {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@deprecated("Superceeded by Future and CompletableFuture as of 1.1")
sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
import DataFlowVariable._ import DataFlowVariable._

View file

@ -4,7 +4,8 @@
package akka.dispatch package akka.dispatch
import akka.actor.{ActorRef, IllegalActorStateException, EventHandler} import akka.event.EventHandler
import akka.actor.{ActorRef, IllegalActorStateException}
import akka.util.{ReflectiveAccess, Switch} import akka.util.{ReflectiveAccess, Switch}
import java.util.Queue import java.util.Queue
@ -208,20 +209,18 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
else { //But otherwise, if we are throttled, we need to do some book-keeping else { //But otherwise, if we are throttled, we need to do some book-keeping
var processedMessages = 0 var processedMessages = 0
val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
else 0
do { do {
nextMessage.invoke nextMessage.invoke
nextMessage = nextMessage =
if (self.suspended.locked) { if (self.suspended.locked) {
null //If we are suspended, abort null // If we are suspended, abort
} } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
else { //If we aren't suspended, we need to make sure we're not overstepping our boundaries
processedMessages += 1 processedMessages += 1
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
null //We reached our boundaries, abort null //We reached our boundaries, abort
else else self.dequeue //Dequeue the next message
self.dequeue //Dequeue the next message
} }
} while (nextMessage ne null) } while (nextMessage ne null)
} }

View file

@ -5,7 +5,8 @@
package akka.dispatch package akka.dispatch
import akka.AkkaException import akka.AkkaException
import akka.actor.{Actor, EventHandler} import akka.event.EventHandler
import akka.actor.Actor
import akka.routing.Dispatcher import akka.routing.Dispatcher
import akka.japi.{ Procedure, Function => JFunc } import akka.japi.{ Procedure, Function => JFunc }
@ -19,50 +20,48 @@ class FutureTimeoutException(message: String) extends AkkaException(message)
object Futures { object Futures {
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T]): Future[T] = def future[T](body: Callable[T]): Future[T] =
Future(body.call) Future(body.call)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Long): Future[T] = def future[T](body: Callable[T], timeout: Long): Future[T] =
Future(body.call, timeout) Future(body.call, timeout)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher) Future(body.call)(dispatcher)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
Future(body.call, timeout)(dispatcher) Future(body.call, timeout)(dispatcher)
/**
* (Blocking!)
*/
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
* Returns the First Future that is completed (blocking!)
*/
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await
/** /**
* Returns a Future to the result of the first future in the list that is completed * Returns a Future to the result of the first future in the list that is completed
*/ */
def firstCompletedOf(futures: Iterable[Future[_]], timeout: Long = Long.MaxValue): Future[_] = { def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
val futureResult = new DefaultCompletableFuture[Any](timeout) val futureResult = new DefaultCompletableFuture[T](timeout)
val completeFirst: Future[_] => Unit = f => futureResult.completeWith(f.asInstanceOf[Future[Any]]) val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _)
for(f <- futures) f onComplete completeFirst for(f <- futures) f onComplete completeFirst
futureResult futureResult
} }
/** /**
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed * Java API
* Returns a Future to the result of the first future in the list that is completed
*/ */
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
in map { f => fun(f.await) } firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout)
/**
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
*/
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException
/** /**
* A non-blocking fold over the specified futures. * A non-blocking fold over the specified futures.
@ -104,6 +103,16 @@ object Futures {
} }
} }
/**
* Java API
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ )
/** /**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/ */
@ -128,6 +137,13 @@ object Futures {
} }
} }
/**
* Java API
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _)
import scala.collection.mutable.Builder import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom import scala.collection.generic.CanBuildFrom
@ -139,9 +155,42 @@ object Futures {
val fb = fn(a.asInstanceOf[A]) val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <-fb) yield (r += b) for (r <- fr; b <-fb) yield (r += b)
}.map(_.result) }.map(_.result)
//Deprecations
/**
* (Blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)")
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
* Returns the First Future that is completed (blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await")
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await
/**
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
*/
@deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }")
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
in map { f => fun(f.await) }
/**
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException")
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException
} }
object Future { object Future {
/**
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = { def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = {
val f = new DefaultCompletableFuture[T](timeout) val f = new DefaultCompletableFuture[T](timeout)
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body))
@ -150,6 +199,20 @@ object Future {
} }
sealed trait Future[+T] { sealed trait Future[+T] {
/**
* Returns the result of this future after waiting for it to complete,
* this method will throw any throwable that this Future was completed with
* and will throw a java.util.concurrent.TimeoutException if there is no result
* within the Futures timeout
*/
def apply(): T = this.await.resultOrException.get
/**
* Java API for apply()
*/
def get: T = apply()
/** /**
* Blocks the current thread until the Future has been completed or the * Blocks the current thread until the Future has been completed or the
* timeout has expired. In the case of the timeout expiring a * timeout has expired. In the case of the timeout expiring a
@ -206,7 +269,7 @@ sealed trait Future[+T] {
* *
* Equivalent to calling future.await.value. * Equivalent to calling future.await.value.
*/ */
def awaitResult: Option[Either[Throwable, T]] def awaitValue: Option[Either[Throwable, T]]
/** /**
* Returns the result of the Future if one is available within the specified * Returns the result of the Future if one is available within the specified
@ -215,7 +278,7 @@ sealed trait Future[+T] {
* returns None if no result, Some(Right(t)) if a result, or * returns None if no result, Some(Right(t)) if a result, or
* Some(Left(error)) if there was an exception * Some(Left(error)) if there was an exception
*/ */
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]
/** /**
* Returns the contained exception of this Future if it exists. * Returns the contained exception of this Future if it exists.
@ -390,13 +453,43 @@ sealed trait Future[+T] {
* Essentially this is the Promise (or write-side) of a Future (read-side) * Essentially this is the Promise (or write-side) of a Future (read-side)
*/ */
trait CompletableFuture[T] extends Future[T] { trait CompletableFuture[T] extends Future[T] {
/**
* Completes this Future with the specified result, if not already completed,
* returns this
*/
def complete(value: Either[Throwable, T]): CompletableFuture[T] def complete(value: Either[Throwable, T]): CompletableFuture[T]
/**
* Completes this Future with the specified result, if not already completed,
* returns this
*/
final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))
/**
* Completes this Future with the specified exception, if not already completed,
* returns this
*/
final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception))
/**
* Completes this Future with the specified other Future, when that Future is completed,
* unless this Future has already been completed
* returns this
*/
final def completeWith(other: Future[T]): CompletableFuture[T] = { final def completeWith(other: Future[T]): CompletableFuture[T] = {
other onComplete { f => complete(f.value.get) } other onComplete { f => complete(f.value.get) }
this this
} }
/**
* Alias for complete(Right(value))
*/
final def << (value: T): CompletableFuture[T] = complete(Right(value))
/**
* Alias for completeWith(other)
*/
final def << (other : Future[T]): CompletableFuture[T] = completeWith(other)
} }
/** /**
@ -431,7 +524,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
} }
} }
def awaitResult: Option[Either[Throwable, T]] = { def awaitValue: Option[Either[Throwable, T]] = {
_lock.lock _lock.lock
try { try {
awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
@ -441,7 +534,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
} }
} }
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = {
_lock.lock _lock.lock
try { try {
awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)))
@ -536,10 +629,10 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte
def complete(value: Either[Throwable, T]): CompletableFuture[T] = this def complete(value: Either[Throwable, T]): CompletableFuture[T] = this
def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this }
def awaitResult: Option[Either[Throwable, T]] = value def awaitValue: Option[Either[Throwable, T]] = value
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value
def await : Future[T] = this def await : Future[T] = this
def awaitBlocking : Future[T] = this def awaitBlocking : Future[T] = this
def isExpired: Boolean = false def isExpired: Boolean = true
def timeoutInNanos: Long = 0 def timeoutInNanos: Long = 0
} }

View file

@ -6,6 +6,7 @@ package akka.dispatch
import java.util.concurrent._ import java.util.concurrent._
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
import akka.event.EventHandler
import akka.config.Configuration import akka.config.Configuration
import akka.config.Config.TIME_UNIT import akka.config.Config.TIME_UNIT
import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess} import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess}

View file

@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration import akka.util.Duration
import akka.actor.EventHandler import akka.event.EventHandler
object ThreadPoolConfig { object ThreadPoolConfig {
type Bounds = Int type Bounds = Int

View file

@ -0,0 +1,180 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.event
import akka.actor._
import Actor._
import akka.dispatch._
import akka.config.Config._
import akka.config.ConfigurationException
import akka.util.{ListenerManagement, ReflectiveAccess}
import akka.AkkaException
/**
* Event handler.
* <p/>
* Create, add and remove a listener:
* <pre>
* val eventHandlerListener = Actor.actorOf(new Actor {
* self.dispatcher = EventHandler.EventHandlerDispatcher
*
* def receive = {
* case EventHandler.Error(cause, instance, message) => ...
* case EventHandler.Warning(instance, message) => ...
* case EventHandler.Info(instance, message) => ...
* case EventHandler.Debug(instance, message) => ...
* case genericEvent => ...
* }
* })
*
* EventHandler.addListener(eventHandlerListener)
* ...
* EventHandler.removeListener(eventHandlerListener)
* </pre>
* <p/>
* However best is probably to register the listener in the 'akka.conf'
* configuration file.
* <p/>
* Log an error event:
* <pre>
* EventHandler.notify(EventHandler.Error(exception, this, message.toString))
* </pre>
* Or use the direct methods (better performance):
* <pre>
* EventHandler.error(exception, this, message.toString)
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EventHandler extends ListenerManagement {
import java.io.{StringWriter, PrintWriter}
import java.text.DateFormat
import java.util.Date
import akka.dispatch.Dispatchers
val ErrorLevel = 1
val WarningLevel = 2
val InfoLevel = 3
val DebugLevel = 4
sealed trait Event {
@transient val thread: Thread = Thread.currentThread
}
case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event
case class Warning(instance: AnyRef, message: String = "") extends Event
case class Info(instance: AnyRef, message: String = "") extends Event
case class Debug(instance: AnyRef, message: String = "") extends Event
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
val warning = "[WARN] [%s] [%s] [%s] %s".intern
val info = "[INFO] [%s] [%s] [%s] %s".intern
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
val generic = "[GENERIC] [%s] [%s]".intern
val ID = "event:handler".intern
class EventHandlerException extends AkkaException
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
val level: Int = config.getString("akka.event-handler-level", "DEBUG") match {
case "ERROR" => ErrorLevel
case "WARNING" => WarningLevel
case "INFO" => InfoLevel
case "DEBUG" => DebugLevel
case unknown => throw new ConfigurationException(
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
}
def notify(event: => AnyRef) = notifyListeners(event)
def notify[T <: Event : ClassManifest](event: => T) {
if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event)
}
def error(cause: Throwable, instance: AnyRef, message: => String) = {
if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message))
}
def error(instance: AnyRef, message: => String) = {
if (level >= ErrorLevel) notifyListeners(Error(new EventHandlerException, instance, message))
}
def warning(instance: AnyRef, message: => String) = {
if (level >= WarningLevel) notifyListeners(Warning(instance, message))
}
def info(instance: AnyRef, message: => String) = {
if (level >= InfoLevel) notifyListeners(Info(instance, message))
}
def debug(instance: AnyRef, message: => String) = {
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
}
def formattedTimestamp = DateFormat.getInstance.format(new Date)
def stackTraceFor(e: Throwable) = {
val sw = new StringWriter
val pw = new PrintWriter(sw)
e.printStackTrace(pw)
sw.toString
}
private def levelFor(eventClass: Class[_ <: Event]) = {
if (eventClass.isInstanceOf[Error]) ErrorLevel
else if (eventClass.isInstanceOf[Warning]) WarningLevel
else if (eventClass.isInstanceOf[Info]) InfoLevel
else if (eventClass.isInstanceOf[Debug]) DebugLevel
else DebugLevel
}
class DefaultListener extends Actor {
self.id = ID
self.dispatcher = EventHandlerDispatcher
def receive = {
case event @ Error(cause, instance, message) =>
println(error.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message,
stackTraceFor(cause)))
case event @ Warning(instance, message) =>
println(warning.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
case event @ Info(instance, message) =>
println(info.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
case event @ Debug(instance, message) =>
println(debug.format(
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
case event =>
println(generic.format(formattedTimestamp, event.toString))
}
}
config.getList("akka.event-handlers") foreach { listenerName =>
try {
ReflectiveAccess.getClassFor[Actor](listenerName) map {
clazz => addListener(Actor.actorOf(clazz).start)
}
} catch {
case e: Exception =>
throw new ConfigurationException(
"Event Handler specified in config can't be loaded [" + listenerName +
"] due to [" + e.toString + "]")
}
}
}

View file

@ -7,6 +7,13 @@ trait Function[T,R] {
def apply(param: T): R def apply(param: T): R
} }
/**
* A Function interface. Used to create 2-arg first-class-functions is Java (sort of).
*/
trait Function2[T1, T2, R] {
def apply(arg1: T1, arg2: T2): R
}
/** A Procedure is like a Function, but it doesn't produce a return value /** A Procedure is like a Function, but it doesn't produce a return value
*/ */
trait Procedure[T] { trait Procedure[T] {

View file

@ -6,7 +6,7 @@ package akka.util
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
import java.util.concurrent.atomic. {AtomicBoolean} import java.util.concurrent.atomic. {AtomicBoolean}
import akka.actor.EventHandler import akka.event.EventHandler
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>

View file

@ -11,6 +11,7 @@ import akka.AkkaException
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.remoteinterface.RemoteSupport import akka.remoteinterface.RemoteSupport
import akka.actor._ import akka.actor._
import akka.event.EventHandler
/** /**
* Helper class for reflective access to different modules in order to allow optional loading of modules. * Helper class for reflective access to different modules in order to allow optional loading of modules.
@ -33,25 +34,34 @@ object ReflectiveAccess {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Remote { object Remote {
val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.netty.NettyRemoteSupport") val TRANSPORT = Config.config.getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport")
private[akka] val configDefaultAddress = private[akka] val configDefaultAddress =
new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"), new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"),
Config.config.getInt("akka.remote.server.port", 2552)) Config.config.getInt("akka.remote.server.port", 2552))
lazy val isEnabled = remoteSupportClass.isDefined lazy val isEnabled = remoteSupportClass.isDefined
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( def ensureEnabled = if (!isEnabled) {
val e = new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath") "Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
EventHandler.warning(this, e.toString)
throw e
}
val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map { protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] =
remoteClass => () => createInstance[RemoteSupport](remoteClass,Array[Class[_]](),Array[AnyRef]()). remoteSupportClass map { remoteClass =>
getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+ () => createInstance[RemoteSupport](
remoteClass.getName+ remoteClass,
", make sure that akka-remote.jar is on the classpath")) Array[Class[_]](),
Array[AnyRef]()
) getOrElse {
val e = new ModuleNotAvailableException(
"Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName))
EventHandler.warning(this, e.toString)
throw e
}
} }
} }
@ -125,6 +135,7 @@ object ReflectiveAccess {
Some(ctor.newInstance(args: _*).asInstanceOf[T]) Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler.warning(this, e.toString)
None None
} }
@ -143,6 +154,7 @@ object ReflectiveAccess {
} }
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler.warning(this, e.toString)
None None
} }
@ -155,33 +167,58 @@ object ReflectiveAccess {
case None => None case None => None
} }
} catch { } catch {
case ei: ExceptionInInitializerError => case e: ExceptionInInitializerError =>
throw ei EventHandler.warning(this, e.toString)
throw e
} }
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Option[Class[T]] = { def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Option[Class[T]] = {
assert(fqn ne null) assert(fqn ne null)
val first = try { Option(classloader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } //First, use the specified CL // First, use the specified CL
val first = try {
Option(classloader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException =>
EventHandler.warning(this, c.toString)
None
}
if (first.isDefined) if (first.isDefined) first
first else {
else { //Second option is to use the ContextClassLoader // Second option is to use the ContextClassLoader
val second = try { Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } val second = try {
if (second.isDefined) Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
second } catch {
case c: ClassNotFoundException =>
EventHandler.warning(this, c.toString)
None
}
if (second.isDefined) second
else { else {
val third = try { val third = try {
if(classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]]) //Don't try to use "loader" if we got the default "classloader" parameter // Don't try to use "loader" if we got the default "classloader" parameter
if (classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]])
else None else None
} catch { case c: ClassNotFoundException => None } } catch {
case c: ClassNotFoundException =>
if (third.isDefined) EventHandler.warning(this, c.toString)
third None
else
try { Option(Class.forName(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } //Last option is Class.forName
}
} }
if (third.isDefined) third
else {
// Last option is Class.forName
try {
Option(Class.forName(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException =>
EventHandler.warning(this, c.toString)
None
}
}
}
}
} }
} }

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka
/**
* Multiplying numbers used in test timeouts by a factor, set by system property.
* Useful for Jenkins builds (where the machine may need more time).
*/
object Testing {
val timeFactor: Double = {
val factor = System.getProperty("akka.test.timefactor", "1.0")
try {
factor.toDouble
} catch {
case e: java.lang.NumberFormatException => 1.0
}
}
def time(t: Int): Int = (timeFactor * t).toInt
def time(t: Long): Long = (timeFactor * t).toLong
def time(t: Float): Float = (timeFactor * t).toFloat
def time(t: Double): Double = timeFactor * t
}

View file

@ -8,6 +8,8 @@ import org.junit.Test
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import Actor._ import Actor._
import akka.Testing
object ActorFireForgetRequestReplySpec { object ActorFireForgetRequestReplySpec {
class ReplyActor extends Actor { class ReplyActor extends Actor {
@ -85,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite {
actor ! "Die" actor ! "Die"
try { state.finished.await(10L, TimeUnit.SECONDS) } try { state.finished.await(10L, TimeUnit.SECONDS) }
catch { case e: TimeoutException => fail("Never got the message") } catch { case e: TimeoutException => fail("Never got the message") }
Thread.sleep(100) Thread.sleep(Testing.time(500))
assert(actor.isShutdown) assert(actor.isShutdown)
} }
} }

View file

@ -77,8 +77,8 @@ class ReceiveTimeoutSpec extends JUnitSuite {
protected def receive = { protected def receive = {
case Tick => () case Tick => ()
case ReceiveTimeout => case ReceiveTimeout =>
timeoutLatch.open
count.incrementAndGet count.incrementAndGet
timeoutLatch.open
self.receiveTimeout = None self.receiveTimeout = None
} }
}).start }).start

View file

@ -260,11 +260,23 @@ class RestartStrategySpec extends JUnitSuite {
// now crash again... should not restart // now crash again... should not restart
slave ! Crash slave ! Crash
// may not be running
try {
slave ! Ping slave ! Ping
} catch {
case e: ActorInitializationException => ()
}
assert(countDownLatch.await(1, TimeUnit.SECONDS)) assert(countDownLatch.await(1, TimeUnit.SECONDS))
// may not be running
try {
slave ! Crash slave ! Crash
} catch {
case e: ActorInitializationException => ()
}
assert(stopLatch.tryAwait(1, TimeUnit.SECONDS)) assert(stopLatch.tryAwait(1, TimeUnit.SECONDS))
assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS)) assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS))

View file

@ -105,8 +105,8 @@ object ActorModelSpec {
} }
private[akka] abstract override def dispatch(invocation: MessageInvocation) { private[akka] abstract override def dispatch(invocation: MessageInvocation) {
super.dispatch(invocation)
getStats(invocation.receiver).msgsReceived.incrementAndGet() getStats(invocation.receiver).msgsReceived.incrementAndGet()
super.dispatch(invocation)
} }
private[akka] abstract override def start { private[akka] abstract override def start {

View file

@ -277,14 +277,17 @@ class FutureSpec extends JUnitSuite {
} }
@Test def resultWithinShouldNotThrowExceptions { @Test def resultWithinShouldNotThrowExceptions {
val latch = new StandardLatch
val actors = (1 to 10).toList map { _ => val actors = (1 to 10).toList map { _ =>
actorOf(new Actor { actorOf(new Actor {
def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add } def receive = { case (add: Int, wait: Boolean, latch: StandardLatch) => if (wait) latch.await; self reply_? add }
}).start }).start
} }
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx >= 5, latch)) }
val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS)
latch.open
val done = result collect { case Some(Right(x)) => x } val done = result collect { case Some(Right(x)) => x }
val undone = result collect { case None => None } val undone = result collect { case None => None }
val errors = result collect { case Some(Left(t)) => t } val errors = result collect { case Some(Left(t)) => t }
@ -324,4 +327,39 @@ class FutureSpec extends JUnitSuite {
// make sure all futures are completed in dispatcher // make sure all futures are completed in dispatcher
assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0) assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0)
} }
@Test def shouldBlockUntilResult {
val latch = new StandardLatch
val f = Future({ latch.await; 5})
val f2 = Future({ f() + 5 })
assert(f2.resultOrException === None)
latch.open
assert(f2() === 10)
val f3 = Future({ Thread.sleep(100); 5}, 10)
intercept[FutureTimeoutException] {
f3()
}
}
@Test def lesslessIsMore {
import akka.actor.Actor.spawn
val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue)
val begin, end = new StandardLatch
spawn {
begin.await
dataflowVar2 << dataflowVar
end.open
}
spawn {
dataflowVar << 5
}
begin.open
end.await
assert(dataflowVar2() === 5)
assert(dataflowVar.get === 5)
}
} }

View file

@ -1,42 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.http
import akka.serialization.Serializer
import java.io.OutputStream
import javax.ws.rs.core.{MultivaluedMap, MediaType}
import javax.ws.rs.ext.{MessageBodyWriter, Provider}
import javax.ws.rs.Produces
/**
* Writes Lists of JSON serializable objects.
*/
@Provider
@Produces(Array("application/json"))
class ListWriter extends MessageBodyWriter[List[_]] {
def isWriteable(aClass: Class[_],
aType: java.lang.reflect.Type,
annotations: Array[java.lang.annotation.Annotation],
mediaType: MediaType) =
classOf[List[_]].isAssignableFrom(aClass) || aClass == ::.getClass
def getSize(list: List[_],
aClass: Class[_],
aType: java.lang.reflect.Type,
annotations: Array[java.lang.annotation.Annotation],
mediaType: MediaType) =
-1L
def writeTo(list: List[_],
aClass: Class[_],
aType: java.lang.reflect.Type,
annotations: Array[java.lang.annotation.Annotation],
mediaType: MediaType,
stringObjectMultivaluedMap: MultivaluedMap[String, Object],
outputStream: OutputStream): Unit =
if (list.isEmpty) outputStream.write(" ".getBytes)
else outputStream.write(Serializer.ScalaJSON.toBinary(list))
}

View file

@ -5,7 +5,7 @@
package akka.http package akka.http
import akka.actor.{ActorRegistry, ActorRef, Actor} import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.actor.EventHandler import akka.event.EventHandler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import javax.servlet.http.HttpServlet import javax.servlet.http.HttpServlet

View file

@ -7,7 +7,7 @@ package akka.http
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent}; import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
import Types._ import Types._
import akka.actor.EventHandler import akka.event.EventHandler
/** /**
* @author Garrick Evans * @author Garrick Evans

View file

@ -23,7 +23,7 @@
package akka.security package akka.security
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
import akka.actor.EventHandler import akka.event.EventHandler
import akka.actor.Actor._ import akka.actor.Actor._
import akka.config.Config import akka.config.Config

View file

@ -1,33 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.servlet
import akka.remote.BootableRemoteActorService
import akka.actor.BootableActorLoaderService
import akka.config.Config
import akka.util.{ Bootable, AkkaLoader }
import javax.servlet.{ServletContextListener, ServletContextEvent}
/**
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
*
*<web-app>
* ...
* <listener>
* <listener-class>akka.servlet.Initializer</listener-class>
* </listener>
* ...
*</web-app>
*/
class Initializer extends ServletContextListener {
lazy val loader = new AkkaLoader
def contextDestroyed(e: ServletContextEvent): Unit =
loader.shutdown
def contextInitialized(e: ServletContextEvent): Unit =
loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService)
}

View file

@ -44,4 +44,24 @@ object RemoteServerSettings {
} }
val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) val BACKLOG = config.getInt("akka.remote.server.backlog", 4096)
val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT)
val EXECUTION_POOL_SIZE = {
val sz = config.getInt("akka.remote.server.execution-pool-size",16)
if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
sz
}
val MAX_CHANNEL_MEMORY_SIZE = {
val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0)
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0")
sz
}
val MAX_TOTAL_MEMORY_SIZE = {
val sz = config.getInt("akka.remote.server.max-total-memory-size", 0)
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0")
sz
}
} }

View file

@ -13,13 +13,14 @@ import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator import akka.japi.Creator
import akka.config.Config._ import akka.config.Config._
import akka.remoteinterface._ import akka.remoteinterface._
import akka.actor.{PoisonPill, EventHandler, Index, import akka.actor.{PoisonPill, Index,
ActorInitializationException, LocalActorRef, newUuid, ActorInitializationException, LocalActorRef, newUuid,
ActorRegistry, Actor, RemoteActorRef, ActorRegistry, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException, TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid, RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType} Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException import akka.AkkaException
import akka.event.EventHandler
import akka.actor.Actor._ import akka.actor.Actor._
import akka.util._ import akka.util._
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
@ -33,6 +34,7 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler import org.jboss.netty.handler.ssl.SslHandler
@ -80,8 +82,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private[akka] def withClientFor[T]( private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = {
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
val key = Address(address) val key = Address(address)
lock.readLock.lock lock.readLock.lock
try { try {
@ -216,15 +216,13 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) { if (isRunning) {
if (request.getOneWay) { if (request.getOneWay) {
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { val future = currentChannel.write(RemoteEncoder.encode(request))
def operationComplete(future: ChannelFuture) { future.awaitUninterruptibly()
if (future.isCancelled) { if (!future.isCancelled && !future.isSuccess) {
//We don't care about that right now
} else if (!future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
} }
}
})
None None
} else { } else {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
@ -237,7 +235,9 @@ abstract class RemoteClient private[akka] (
futures.remove(futureUuid) //Clean this up futures.remove(futureUuid) //Clean this up
//We don't care about that right now //We don't care about that right now
} else if (!future.isSuccess) { } else if (!future.isSuccess) {
futures.remove(futureUuid) //Clean this up val f = futures.remove(futureUuid) //Clean this up
if (f ne null)
f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
} }
} }
@ -753,9 +753,17 @@ class RemoteServerPipelineFactory(
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil, Nil) case _ => (Nil, Nil)
} }
val execution = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit
)
)
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil
new StaticChannelPipeline(stages: _*) new StaticChannelPipeline(stages: _*)
} }
} }
@ -856,8 +864,6 @@ class RemoteServerHandler(
} }
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
//FIXME we should definitely spawn off this in a thread pool or something,
// potentially using Actor.spawn or something similar
request.getActorInfo.getActorType match { request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel) case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel)

View file

@ -36,6 +36,10 @@ class ConfigSpec extends WordSpec with MustMatchers {
getBool("akka.remote.ssl.debug") must equal(None) getBool("akka.remote.ssl.debug") must equal(None)
getBool("akka.remote.ssl.service") must equal(None) getBool("akka.remote.ssl.service") must equal(None)
getInt("akka.remote.zlib-compression-level") must equal(Some(6)) getInt("akka.remote.zlib-compression-level") must equal(Some(6))
getInt("akka.remote.server.execution-pool-size") must equal(Some(16))
getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60))
getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0))
getInt("akka.remote.server.max-total-memory-size") must equal(Some(0))
} }
} }
} }

View file

@ -10,6 +10,8 @@ import akka.actor._
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import akka.config. {RemoteAddress, Config, TypedActorConfigurator} import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
import akka.Testing
object RemoteTypedActorLog { object RemoteTypedActorLog {
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
val oneWayLog = new LinkedBlockingQueue[String] val oneWayLog = new LinkedBlockingQueue[String]
@ -37,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest {
classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOne],
classOf[RemoteTypedActorOneImpl], classOf[RemoteTypedActorOneImpl],
Permanent, Permanent,
10000, Testing.time(10000),
RemoteAddress(host,port)), RemoteAddress(host,port)),
new SuperviseTypedActor( new SuperviseTypedActor(
classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwo],
classOf[RemoteTypedActorTwoImpl], classOf[RemoteTypedActorTwoImpl],
Permanent, Permanent,
10000, Testing.time(10000),
RemoteAddress(host,port)) RemoteAddress(host,port))
).toArray).supervise ).toArray).supervise
} }

View file

@ -243,7 +243,7 @@ class MyStatelessActor extends Actor {
class MyStatelessActorWithMessagesInMailbox extends Actor { class MyStatelessActorWithMessagesInMailbox extends Actor {
def receive = { def receive = {
case "hello" => case "hello" =>
println("# messages in mailbox " + self.mailboxSize) //println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500) Thread.sleep(500)
case "hello-reply" => self.reply("world") case "hello-reply" => self.reply("world")
} }
@ -263,7 +263,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor {
class MyActorWithSerializableMessages extends Actor { class MyActorWithSerializableMessages extends Actor {
def receive = { def receive = {
case MyMessage(s, t) => case MyMessage(s, t) =>
println("# messages in mailbox " + self.mailboxSize) //println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500) Thread.sleep(500)
case "hello-reply" => self.reply("world") case "hello-reply" => self.reply("world")
} }

View file

@ -117,7 +117,7 @@ class MyStatefulActor extends Actor {
def receive = { def receive = {
case "hi" => case "hi" =>
println("# messages in mailbox " + self.mailboxSize) //println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500) Thread.sleep(500)
case "hello" => case "hello" =>
count = count + 1 count = count + 1

View file

@ -67,7 +67,7 @@ object World {
lazy val ants = setup lazy val ants = setup
lazy val evaporator = actorOf[Evaporator].start lazy val evaporator = actorOf[Evaporator].start
private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false) private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot")
def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).opt) } def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).opt) }
@ -138,7 +138,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor {
val locRef = Ref(initLoc) val locRef = Ref(initLoc)
val name = "ant-from-" + initLoc._1 + "-" + initLoc._2 val name = "ant-from-" + initLoc._1 + "-" + initLoc._2
implicit val txFactory = TransactionFactory(familyName = name, hooks = false) implicit val txFactory = TransactionFactory(familyName = name)
val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1)) val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1))
val foraging = (p: Place) => p.pher + p.food val foraging = (p: Place) => p.pher + p.food
@ -210,7 +210,7 @@ class Evaporator extends WorldActor {
import Config._ import Config._
import World._ import World._
implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false) implicit val txFactory = TransactionFactory(familyName = "evaporator")
val evaporate = (pher: Float) => pher * EvapRate val evaporate = (pher: Float) => pher * EvapRate
def act = for (x <- 0 until Dim; y <- 0 until Dim) { def act = for (x <- 0 until Dim; y <- 0 until Dim) {

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.event.slf4j
import org.slf4j.{Logger => SLFLogger, LoggerFactory => SLFLoggerFactory}
import akka.event.EventHandler
import akka.actor._
import Actor._
/**
* Base trait for all classes that wants to be able use the SLF4J logging infrastructure.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Logging {
@transient lazy val log = Logger(this.getClass.getName)
}
object Logger {
def apply(logger: String) : SLFLogger = SLFLoggerFactory getLogger logger
def apply(clazz: Class[_]): SLFLogger = apply(clazz.getName)
def root : SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME)
}
/**
* SLF4J Event Handler.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Slf4jEventHandler extends Actor with Logging {
import EventHandler._
self.id = ID
self.dispatcher = EventHandlerDispatcher
def receive = {
case Error(cause, instance, message) =>
log.error("\n\t[{}]\n\t[{}]\n\t[{}]",
Array[AnyRef](instance.getClass.getName, message, stackTraceFor(cause)))
case Warning(instance, message) =>
log.warn("\n\t[{}]\n\t[{}]", instance.getClass.getName, message)
case Info(instance, message) =>
log.info("\n\t[{}]\n\t[{}]", instance.getClass.getName, message)
case Debug(instance, message) =>
log.debug("\n\t[{}]\n\t[{}]", instance.getClass.getName, message)
case event => log.debug("\n\t[{}]", event.toString)
}
}

View file

@ -8,6 +8,13 @@ import akka.actor.{newUuid, Uuid}
import org.multiverse.transactional.refs.BasicRef import org.multiverse.transactional.refs.BasicRef
/**
* Common trait for all the transactional objects.
*/
@serializable trait Transactional {
val uuid: String
}
/** /**
* Transactional managed reference. See the companion class for more information. * Transactional managed reference. See the companion class for more information.
*/ */

View file

@ -48,10 +48,7 @@ trait Stm {
def atomic[T](factory: TransactionFactory)(body: => T): T = { def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() { factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = { def call(mtx: MultiverseTransaction): T = body
factory.addHooks
body
}
}) })
} }
} }

View file

@ -1,252 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.HashMap
import akka.util.ReflectiveAccess
import akka.config.Config._
import akka.config.ModuleNotAvailableException
import akka.AkkaException
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.api.{PropagationLevel => MultiversePropagationLevel}
import org.multiverse.api.{TraceLevel => MultiverseTraceLevel}
class NoTransactionInScopeException extends AkkaException("No transaction in scope")
class TransactionRetryException(message: String) extends AkkaException(message)
class StmConfigurationException(message: String) extends AkkaException(message)
/**
* Internal helper methods for managing Akka-specific transaction.
*/
object TransactionManagement extends TransactionManagement {
private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
override protected def initialValue: Option[Transaction] = None
}
private[akka] def getTransaction: Transaction = {
val option = transaction.get
if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope")
option.get
}
}
/**
* Internal helper methods for managing Akka-specific transaction.
*/
trait TransactionManagement {
private[akka] def setTransaction(tx: Option[Transaction]) =
if (tx.isDefined) TransactionManagement.transaction.set(tx)
private[akka] def clearTransaction = {
TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}
private[akka] def getTransactionInScope = TransactionManagement.getTransaction
private[akka] def isTransactionInScope = {
val option = TransactionManagement.transaction.get
(option ne null) && option.isDefined
}
}
object Transaction {
val idFactory = new AtomicLong(-1L)
/**
* Attach an Akka-specific Transaction to the current Multiverse transaction.
* Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks
*/
private[akka] def attach = {
val mtx = getRequiredThreadLocalTransaction
val tx = new Transaction
tx.begin
tx.transaction = Some(mtx)
TransactionManagement.transaction.set(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event match {
case TransactionLifecycleEvent.PostCommit => tx.commitJta
case TransactionLifecycleEvent.PreCommit => tx.commitPersistentState
case TransactionLifecycleEvent.PostAbort => tx.abort
case _ => {}
}
})
}
}
/**
* The Akka-specific Transaction class.
* For integration with persistence modules and JTA support.
*/
@serializable class Transaction {
val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
val STATE_RETRIES = config.getInt("akka.storage.max-retries",10)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable with Abortable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[ReflectiveJtaModule.TransactionContainer] =
if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer)
else None
// --- public methods ---------
def begin = synchronized {
jta.foreach { _.beginWithStmSynchronization(this) }
}
def commitPersistentState = synchronized {
retry(STATE_RETRIES){
persistentStateMap.valuesIterator.foreach(_.commit)
persistentStateMap.clear
}
status = TransactionStatus.Completed
}
def commitJta = synchronized {
jta.foreach(_.commit)
}
def abort = synchronized {
jta.foreach(_.rollback)
persistentStateMap.valuesIterator.foreach(_.abort)
persistentStateMap.clear
}
def retry(tries:Int)(block: => Unit):Unit={
if(tries==0){
throw new TransactionRetryException("Exhausted Retries while committing persistent state")
}
try{
block
} catch{
case e:Exception=>{
retry(tries-1){block}
}
}
}
def isNew = synchronized { status == TransactionStatus.New }
def isActive = synchronized { status == TransactionStatus.Active }
def isCompleted = synchronized { status == TransactionStatus.Completed }
def isAborted = synchronized { status == TransactionStatus.Aborted }
// --- internal methods ---------
//private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
private[akka] def status_? = status
private[akka] def increment = depth.incrementAndGet
private[akka] def decrement = depth.decrementAndGet
private[akka] def isTopLevel = depth.get == 0
//when calling this method, make sure to prefix the uuid with the type so you
//have no possibility of kicking a diffferent type with the same uuid out of a transction
private[akka] def register(uuid: String, storage: Committable with Abortable) = {
if(persistentStateMap.getOrElseUpdate(uuid, {storage}) ne storage){
throw new IllegalStateException("attempted to register an instance of persistent data structure for id [%s] when there is already a different instance registered".format(uuid))
}
}
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new StmConfigurationException(
"Expected ACTIVE transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
throw new StmConfigurationException(
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
throw new StmConfigurationException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
override def equals(that: Any): Boolean = synchronized {
that.isInstanceOf[Transaction] &&
that.asInstanceOf[Transaction].id == this.id
}
override def hashCode: Int = synchronized { id.toInt }
override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
}
@serializable sealed abstract class TransactionStatus
object TransactionStatus {
case object New extends TransactionStatus
case object Active extends TransactionStatus
case object Aborted extends TransactionStatus
case object Completed extends TransactionStatus
}
/**
* Common trait for all the transactional objects:
* Ref, TransactionalMap, TransactionalVector,
* PersistentRef, PersistentMap, PersistentVector, PersistentQueue, PersistentSortedSet
*/
@serializable trait Transactional {
val uuid: String
}
/**
* Used for integration with the persistence modules.
*/
trait Committable {
def commit(): Unit
}
/**
* Used for integration with the persistence modules.
*/
trait Abortable {
def abort(): Unit
}
/**
* Used internally for reflective access to the JTA module.
* Allows JTA integration to work when akka-jta.jar is on the classpath.
*/
object ReflectiveJtaModule {
type TransactionContainerObject = {
def apply(): TransactionContainer
}
type TransactionContainer = {
def beginWithStmSynchronization(transaction: Transaction): Unit
def commit: Unit
def rollback: Unit
}
lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined
def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException(
"Can't load the JTA module, make sure that akka-jta.jar is on the classpath")
val transactionContainerObjectInstance: Option[TransactionContainerObject] =
ReflectiveAccess.getObjectFor("akka.jta.TransactionContainer$")
def createTransactionContainer: TransactionContainer = {
ensureJtaEnabled
transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer]
}
}

View file

@ -32,7 +32,6 @@ object TransactionConfig {
val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true) val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true)
val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires")) val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires"))
val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none")) val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none"))
val HOOKS = config.getBool("akka.stm.hooks", true)
val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT) val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT)
@ -65,7 +64,6 @@ object TransactionConfig {
* @param quickRelease Whether locks should be released as quickly as possible (before whole commit). * @param quickRelease Whether locks should be released as quickly as possible (before whole commit).
* @param propagation For controlling how nested transactions behave. * @param propagation For controlling how nested transactions behave.
* @param traceLevel Transaction trace level. * @param traceLevel Transaction trace level.
* @param hooks Whether hooks for persistence modules and JTA should be added to the transaction.
*/ */
def apply(familyName: String = FAMILY_NAME, def apply(familyName: String = FAMILY_NAME,
readonly: JBoolean = READONLY, readonly: JBoolean = READONLY,
@ -78,10 +76,9 @@ object TransactionConfig {
speculative: Boolean = SPECULATIVE, speculative: Boolean = SPECULATIVE,
quickRelease: Boolean = QUICK_RELEASE, quickRelease: Boolean = QUICK_RELEASE,
propagation: MPropagation = PROPAGATION, propagation: MPropagation = PROPAGATION,
traceLevel: MTraceLevel = TRACE_LEVEL, traceLevel: MTraceLevel = TRACE_LEVEL) = {
hooks: Boolean = HOOKS) = {
new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks) interruptible, speculative, quickRelease, propagation, traceLevel)
} }
} }
@ -100,7 +97,6 @@ object TransactionConfig {
* <p>quickRelease - Whether locks should be released as quickly as possible (before whole commit). * <p>quickRelease - Whether locks should be released as quickly as possible (before whole commit).
* <p>propagation - For controlling how nested transactions behave. * <p>propagation - For controlling how nested transactions behave.
* <p>traceLevel - Transaction trace level. * <p>traceLevel - Transaction trace level.
* <p>hooks - Whether hooks for persistence modules and JTA should be added to the transaction.
*/ */
class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME, class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME,
val readonly: JBoolean = TransactionConfig.READONLY, val readonly: JBoolean = TransactionConfig.READONLY,
@ -113,8 +109,7 @@ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY
val speculative: Boolean = TransactionConfig.SPECULATIVE, val speculative: Boolean = TransactionConfig.SPECULATIVE,
val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
val propagation: MPropagation = TransactionConfig.PROPAGATION, val propagation: MPropagation = TransactionConfig.PROPAGATION,
val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL)
val hooks: Boolean = TransactionConfig.HOOKS)
object DefaultTransactionConfig extends TransactionConfig object DefaultTransactionConfig extends TransactionConfig
@ -137,11 +132,10 @@ object TransactionFactory {
speculative: Boolean = TransactionConfig.SPECULATIVE, speculative: Boolean = TransactionConfig.SPECULATIVE,
quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
propagation: MPropagation = TransactionConfig.PROPAGATION, propagation: MPropagation = TransactionConfig.PROPAGATION,
traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) = {
hooks: Boolean = TransactionConfig.HOOKS) = {
val config = new TransactionConfig( val config = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks) interruptible, speculative, quickRelease, propagation, traceLevel)
new TransactionFactory(config) new TransactionFactory(config)
} }
} }
@ -199,8 +193,6 @@ class TransactionFactory(
} }
val boilerplate = new TransactionBoilerplate(factory) val boilerplate = new TransactionBoilerplate(factory)
def addHooks = if (config.hooks) Transaction.attach
} }
/** /**

View file

@ -27,7 +27,6 @@ class TransactionConfigBuilder {
var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
var propagation: MPropagation = TransactionConfig.PROPAGATION var propagation: MPropagation = TransactionConfig.PROPAGATION
var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL
var hooks: Boolean = TransactionConfig.HOOKS
def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setFamilyName(familyName: String) = { this.familyName = familyName; this }
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
@ -41,11 +40,10 @@ class TransactionConfigBuilder {
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this }
def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this }
def setHooks(hooks: Boolean) = { this.hooks = hooks; this }
def build() = new TransactionConfig( def build() = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks) interruptible, speculative, quickRelease, propagation, traceLevel)
} }
/** /**
@ -64,7 +62,6 @@ class TransactionFactoryBuilder {
var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
var propagation: MPropagation = TransactionConfig.PROPAGATION var propagation: MPropagation = TransactionConfig.PROPAGATION
var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL
var hooks: Boolean = TransactionConfig.HOOKS
def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setFamilyName(familyName: String) = { this.familyName = familyName; this }
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
@ -78,12 +75,11 @@ class TransactionFactoryBuilder {
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this }
def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this }
def setHooks(hooks: Boolean) = { this.hooks = hooks; this }
def build() = { def build() = {
val config = new TransactionConfig( val config = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks) interruptible, speculative, quickRelease, propagation, traceLevel)
new TransactionFactory(config) new TransactionFactory(config)
} }
} }

View file

@ -129,7 +129,6 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
def atomic[T](factory: TransactionFactory)(body: => T): T = { def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() { factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = { def call(mtx: MultiverseTransaction): T = {
factory.addHooks
val result = body val result = body
val timeout = factory.config.timeout val timeout = factory.config.timeout
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)

View file

@ -20,9 +20,7 @@ class ConfigSpec extends WordSpec with MustMatchers {
getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.blocking-allowed") must equal(Some(false))
getBool("akka.stm.fair") must equal(Some(true)) getBool("akka.stm.fair") must equal(Some(true))
getBool("akka.stm.hooks") must equal(Some(true))
getBool("akka.stm.interruptible") must equal(Some(false)) getBool("akka.stm.interruptible") must equal(Some(false))
getBool("akka.stm.jta-aware") must equal(Some(false))
getInt("akka.stm.max-retries") must equal(Some(1000)) getInt("akka.stm.max-retries") must equal(Some(1000))
getString("akka.stm.propagation") must equal(Some("requires")) getString("akka.stm.propagation") must equal(Some("requires"))
getBool("akka.stm.quick-release") must equal(Some(true)) getBool("akka.stm.quick-release") must equal(Some(true))

View file

@ -1,6 +1,7 @@
package akka.testkit package akka.testkit
import akka.actor.{ActorRef, EventHandler} import akka.event.EventHandler
import akka.actor.ActorRef
import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation} import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation}
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList import java.util.LinkedList

View file

@ -12,7 +12,7 @@ akka {
time-unit = "seconds" # Time unit for all timeout properties throughout the config time-unit = "seconds" # Time unit for all timeout properties throughout the config
event-handlers = ["akka.actor.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
@ -71,9 +71,6 @@ akka {
quick-release = true quick-release = true
propagation = "requires" propagation = "requires"
trace-level = "none" trace-level = "none"
hooks = true
jta-aware = off # Option 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
} }
jta { jta {
@ -142,6 +139,10 @@ akka {
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog backlog = 4096 # Sets the size of the connection backlog
execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling
execution-pool-size = 16# Size of the core pool of the remote execution unit
max-channel-memory-size = 0 # Maximum channel size, 0 for off
max-total-memory-size = 0 # Maximum total size of all channels, 0 for off
} }
client { client {

View file

@ -118,6 +118,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val SCALATEST_VERSION = "1.4-SNAPSHOT" lazy val SCALATEST_VERSION = "1.4-SNAPSHOT"
lazy val JETTY_VERSION = "7.2.2.v20101205" lazy val JETTY_VERSION = "7.2.2.v20101205"
lazy val JAVAX_SERVLET_VERSION = "3.0" lazy val JAVAX_SERVLET_VERSION = "3.0"
lazy val SLF4J_VERSION = "1.6.0"
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
// Dependencies // Dependencies
@ -157,8 +158,11 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD
lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.10" % "compile" //ApacheV2
lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.10" % "test" //ApacheV2
lazy val slf4j = "org.slf4j" % "slf4j-api" % "1.6.0"
lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.24"
// Test // Test
@ -177,13 +181,14 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_))
lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor)
lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor)
lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm)
lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor)
lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_remote) lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor)
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
lazy val akka_sbt_plugin = project("akka-sbt-plugin", "akka-sbt-plugin", new AkkaSbtPluginProject(_)) lazy val akka_sbt_plugin = project("akka-sbt-plugin", "akka-sbt-plugin", new AkkaSbtPluginProject(_))
lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor)
lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor)
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
// Miscellaneous // Miscellaneous
@ -296,12 +301,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile) override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile)
} }
// -------------------------------------------------------------------------------------------------------------------
// akka-testkit subproject
// -------------------------------------------------------------------------------------------------------------------
class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath)
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
// akka-stm subproject // akka-stm subproject
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
@ -360,6 +359,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val jetty = Dependencies.jetty val jetty = Dependencies.jetty
val jersey = Dependencies.jersey_server val jersey = Dependencies.jersey_server
val jsr311 = Dependencies.jsr311 val jsr311 = Dependencies.jsr311
val commons_codec = Dependencies.commons_codec
// testing // testing
val junit = Dependencies.junit val junit = Dependencies.junit
@ -420,6 +420,20 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
} }
} }
// -------------------------------------------------------------------------------------------------------------------
// akka-testkit subproject
// -------------------------------------------------------------------------------------------------------------------
class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath)
// -------------------------------------------------------------------------------------------------------------------
// akka-slf4j subproject
// -------------------------------------------------------------------------------------------------------------------
class AkkaSlf4jProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val sjson = Dependencies.slf4j
}
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
// Helpers // Helpers
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------