diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index fdc5601e4f..33ff34599b 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -19,7 +19,8 @@ import java.net.{InetAddress, UnknownHostException} * * @author Jonas Bonér */ -@serializable abstract class AkkaException(message: String) extends { +@serializable abstract class AkkaException(message: String = "") extends RuntimeException(message) { + import AkkaException._ val exceptionName = getClass.getName val uuid = "%s_%s".format(AkkaException.hostname, newUuid) } with RuntimeException(message) { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d9ee3ab41c..882331b177 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -76,163 +76,6 @@ class ActorKilledException private[akka](message: String) extends AkkaEx class ActorInitializationException private[akka](message: String) extends AkkaException(message) class ActorTimeoutException private[akka](message: String) extends AkkaException(message) -/** - * Error handler. - * - * Create, add and remove a listener: - *
- * val errorHandlerEventListener = Actor.actorOf(new Actor {
- *   self.dispatcher = EventHandler.EventHandlerDispatcher
- *
- *   def receive = {
- *     case EventHandler.Error(cause, instance, message) => ...
- *     case EventHandler.Warning(instance, message) => ...
- *     case EventHandler.Info(instance, message) => ...
- *     case EventHandler.Debug(instance, message) => ...
- *   }
- * })
- *
- * EventHandler.addListener(errorHandlerEventListener)
- * ...
- * EventHandler.removeListener(errorHandlerEventListener)
- * 
- * - * Log an error event: - *
- * EventHandler.notify(EventHandler.Error(exception, this, message.toString))
- * 
- * Or use the direct methods (better performance): - *
- * EventHandler.error(exception, this, message.toString)
- * 
- * - * @author Jonas Bonér - */ -object EventHandler extends ListenerManagement { - import java.io.{StringWriter, PrintWriter} - import java.text.DateFormat - import java.util.Date - import akka.dispatch.Dispatchers - - val ErrorLevel = 1 - val WarningLevel = 2 - val InfoLevel = 3 - val DebugLevel = 4 - - sealed trait Event { - @transient val thread: Thread = Thread.currentThread - } - case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event - case class Warning(instance: AnyRef, message: String = "") extends Event - case class Info(instance: AnyRef, message: String = "") extends Event - case class Debug(instance: AnyRef, message: String = "") extends Event - - val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern - val warning = "[WARN] [%s] [%s] [%s] %s".intern - val info = "[INFO] [%s] [%s] [%s] %s".intern - val debug = "[DEBUG] [%s] [%s] [%s] %s".intern - val generic = "[GENERIC] [%s] [%s]".intern - val ID = "event:handler".intern - - val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build - - val level: Int = config.getString("akka.event-handler-level", "DEBUG") match { - case "ERROR" => ErrorLevel - case "WARNING" => WarningLevel - case "INFO" => InfoLevel - case "DEBUG" => DebugLevel - case unknown => throw new ConfigurationException( - "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") - } - - def notify(event: => AnyRef) = notifyListeners(event) - - def notify[T <: Event : ClassManifest](event: => T) { - if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event) - } - - def error(cause: Throwable, instance: AnyRef, message: => String) = { - if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message)) - } - - def warning(instance: AnyRef, message: => String) = { - if (level >= WarningLevel) notifyListeners(Warning(instance, message)) - } - - def info(instance: AnyRef, message: => String) = { - if (level >= InfoLevel) notifyListeners(Info(instance, message)) - } - - def debug(instance: AnyRef, message: => String) = { - if (level >= DebugLevel) notifyListeners(Debug(instance, message)) - } - - def formattedTimestamp = DateFormat.getInstance.format(new Date) - - def stackTraceFor(e: Throwable) = { - val sw = new StringWriter - val pw = new PrintWriter(sw) - e.printStackTrace(pw) - sw.toString - } - - private def levelFor(eventClass: Class[_ <: Event]) = { - if (eventClass.isInstanceOf[Error]) ErrorLevel - else if (eventClass.isInstanceOf[Warning]) WarningLevel - else if (eventClass.isInstanceOf[Info]) InfoLevel - else if (eventClass.isInstanceOf[Debug]) DebugLevel - else DebugLevel - } - - class DefaultListener extends Actor { - self.id = ID - self.dispatcher = EventHandlerDispatcher - - def receive = { - case event @ Error(cause, instance, message) => - println(error.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message, - stackTraceFor(cause))) - case event @ Warning(instance, message) => - println(warning.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message)) - case event @ Info(instance, message) => - println(info.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message)) - case event @ Debug(instance, message) => - println(debug.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message)) - case event => - println(generic.format(formattedTimestamp, event.toString)) - } - } - - config.getList("akka.event-handlers") foreach { listenerName => - try { - ReflectiveAccess.getClassFor[Actor](listenerName) map { - clazz => addListener(Actor.actorOf(clazz).start) - } - } catch { - case e: Exception => - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + listenerName + - "] due to [" + e.toString + "]") - } - } -} - /** * This message is thrown by default when an Actors behavior doesn't match a message */ @@ -565,13 +408,16 @@ trait Actor { /** * Is the actor able to handle the message passed in as arguments? */ - def isDefinedAt(message: Any): Boolean = message match { //Same logic as apply(msg) but without the unhandled catch-all - case l: AutoReceivedMessage => true - case msg if self.hotswap.nonEmpty && - self.hotswap.head.isDefinedAt(msg) => true - case msg if self.hotswap.isEmpty && - processingBehavior.isDefinedAt(msg) => true - case _ => false + def isDefinedAt(message: Any): Boolean = { + val behaviorStack = self.hotswap + message match { //Same logic as apply(msg) but without the unhandled catch-all + case l: AutoReceivedMessage => true + case msg if behaviorStack.nonEmpty && + behaviorStack.head.isDefinedAt(msg) => true + case msg if behaviorStack.isEmpty && + processingBehavior.isDefinedAt(msg) => true + case _ => false + } } /** @@ -596,13 +442,16 @@ trait Actor { // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= - private[akka] final def apply(msg: Any) = msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException? - case l: AutoReceivedMessage => autoReceiveMessage(l) - case msg if self.hotswap.nonEmpty && - self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) - case msg if self.hotswap.isEmpty && - processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg) - case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior + private[akka] final def apply(msg: Any) = { + val behaviorStack = self.hotswap + msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException? + case l: AutoReceivedMessage => autoReceiveMessage(l) + case msg if behaviorStack.nonEmpty && + behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg) + case msg if behaviorStack.isEmpty && + processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg) + case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior + } } private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { @@ -615,8 +464,8 @@ trait Actor { case Restart(reason) => throw reason case PoisonPill => val f = self.senderFuture - if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) self.stop + if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) } private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 32c03acf05..df29edd650 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -4,6 +4,7 @@ package akka.actor +import akka.event.EventHandler import akka.dispatch._ import akka.config.Config._ import akka.config.Supervision._ diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 01f4282874..cbda9d0af9 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -19,6 +19,7 @@ import scala.collection.JavaConversions import java.util.concurrent._ +import akka.event.EventHandler import akka.AkkaException object Scheduler { diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index 1d9185e98d..7d50e59cd7 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -5,10 +5,6 @@ package akka.config import akka.AkkaException -import akka.actor.EventHandler - -import java.net.InetSocketAddress -import java.lang.reflect.Method class ConfigurationException(message: String) extends AkkaException(message) class ModuleNotAvailableException(message: String) extends AkkaException(message) @@ -35,10 +31,8 @@ object Config { envHome orElse systemHome } - val config = { - + val config: Configuration = try { val confName = { - val envConf = System.getenv("AKKA_MODE") match { case null | "" => None case value => Some(value) @@ -52,7 +46,7 @@ object Config { (envConf orElse systemConf).map("akka." + _ + ".conf").getOrElse("akka.conf") } - try { + val newInstance = if (System.getProperty("akka.config", "") != "") { val configFile = System.getProperty("akka.config", "") println("Loading config from -Dakka.config=" + configFile) @@ -75,18 +69,23 @@ object Config { "\nUsing default values everywhere.") Configuration.fromString("akka {}") // default empty config } - } catch { - case e => - EventHandler.error(e, this, e.getMessage) - throw e - } + + val configVersion = newInstance.getString("akka.version", VERSION) + if (configVersion != VERSION) + throw new ConfigurationException( + "Akka JAR version [" + VERSION + "] is different than the provided config version [" + configVersion + "]") + + newInstance + } catch { + case e => + System.err.println("Couldn't parse config, fatal error.") + e.printStackTrace(System.err) + System.exit(-1) + throw e } val CONFIG_VERSION = config.getString("akka.version", VERSION) - if (VERSION != CONFIG_VERSION) throw new ConfigurationException( - "Akka JAR version [" + VERSION + "] is different than the provided config version [" + CONFIG_VERSION + "]") - val TIME_UNIT = config.getString("akka.time-unit", "seconds") val startTime = System.currentTimeMillis diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index fec6f04d45..72fbbaaeb2 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -7,7 +7,8 @@ package akka.dataflow import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} -import akka.actor.{Actor, ActorRef, EventHandler} +import akka.event.EventHandler +import akka.actor.{Actor, ActorRef} import akka.actor.Actor._ import akka.dispatch.CompletableFuture import akka.AkkaException @@ -65,6 +66,7 @@ object DataFlow { /** * @author Jonas Bonér */ + @deprecated("Superceeded by Future and CompletableFuture as of 1.1") sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { import DataFlowVariable._ diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index c15a26e00d..28c07c6af6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -4,7 +4,8 @@ package akka.dispatch -import akka.actor.{ActorRef, IllegalActorStateException, EventHandler} +import akka.event.EventHandler +import akka.actor.{ActorRef, IllegalActorStateException} import akka.util.{ReflectiveAccess, Switch} import java.util.Queue @@ -87,7 +88,7 @@ class ExecutorBasedEventDrivenDispatcher( def this(_name: String) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - val name = "akka:event-driven:dispatcher:" + _name + val name = "akka:event-driven:dispatcher:" + _name private[akka] val threadFactory = new MonitorableThreadFactory(name) private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) @@ -208,20 +209,18 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => else { //But otherwise, if we are throttled, we need to do some book-keeping var processedMessages = 0 val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 - val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 + val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) + else 0 do { nextMessage.invoke - nextMessage = if (self.suspended.locked) { - null //If we are suspended, abort - } - else { //If we aren't suspended, we need to make sure we're not overstepping our boundaries + null // If we are suspended, abort + } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort - else - self.dequeue //Dequeue the next message + else self.dequeue //Dequeue the next message } } while (nextMessage ne null) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2bdce31f6f..ba0b7b83ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -5,7 +5,8 @@ package akka.dispatch import akka.AkkaException -import akka.actor.{Actor, EventHandler} +import akka.event.EventHandler +import akka.actor.Actor import akka.routing.Dispatcher import akka.japi.{ Procedure, Function => JFunc } @@ -19,50 +20,48 @@ class FutureTimeoutException(message: String) extends AkkaException(message) object Futures { + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T]): Future[T] = Future(body.call) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], timeout: Long): Future[T] = Future(body.call, timeout) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = Future(body.call, timeout)(dispatcher) - /** - * (Blocking!) - */ - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await - /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf(futures: Iterable[Future[_]], timeout: Long = Long.MaxValue): Future[_] = { - val futureResult = new DefaultCompletableFuture[Any](timeout) + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { + val futureResult = new DefaultCompletableFuture[T](timeout) - val completeFirst: Future[_] => Unit = f => futureResult.completeWith(f.asInstanceOf[Future[Any]]) + val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _) for(f <- futures) f onComplete completeFirst futureResult } /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + * Java API + * Returns a Future to the result of the first future in the list that is completed */ - def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException + def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = + firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout) /** * A non-blocking fold over the specified futures. @@ -104,6 +103,16 @@ object Futures { } } + /** + * Java API + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + */ + def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = + fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ ) + /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ @@ -128,6 +137,13 @@ object Futures { } } + /** + * Java API + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + */ + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = + reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _) + import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -139,9 +155,42 @@ object Futures { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) + + //Deprecations + + + /** + * (Blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)") + def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) + + /** + * Returns the First Future that is completed (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await") + def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await + + + /** + * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }") + def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = + in map { f => fun(f.await) } + + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException") + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException } object Future { + /** + * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body + * The execution is performed by the specified Dispatcher. + */ def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = { val f = new DefaultCompletableFuture[T](timeout) dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) @@ -150,6 +199,20 @@ object Future { } sealed trait Future[+T] { + + /** + * Returns the result of this future after waiting for it to complete, + * this method will throw any throwable that this Future was completed with + * and will throw a java.util.concurrent.TimeoutException if there is no result + * within the Futures timeout + */ + def apply(): T = this.await.resultOrException.get + + /** + * Java API for apply() + */ + def get: T = apply() + /** * Blocks the current thread until the Future has been completed or the * timeout has expired. In the case of the timeout expiring a @@ -206,7 +269,7 @@ sealed trait Future[+T] { * * Equivalent to calling future.await.value. */ - def awaitResult: Option[Either[Throwable, T]] + def awaitValue: Option[Either[Throwable, T]] /** * Returns the result of the Future if one is available within the specified @@ -215,7 +278,7 @@ sealed trait Future[+T] { * returns None if no result, Some(Right(t)) if a result, or * Some(Left(error)) if there was an exception */ - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] /** * Returns the contained exception of this Future if it exists. @@ -390,13 +453,43 @@ sealed trait Future[+T] { * Essentially this is the Promise (or write-side) of a Future (read-side) */ trait CompletableFuture[T] extends Future[T] { + /** + * Completes this Future with the specified result, if not already completed, + * returns this + */ def complete(value: Either[Throwable, T]): CompletableFuture[T] + + /** + * Completes this Future with the specified result, if not already completed, + * returns this + */ final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + + /** + * Completes this Future with the specified exception, if not already completed, + * returns this + */ final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) + + /** + * Completes this Future with the specified other Future, when that Future is completed, + * unless this Future has already been completed + * returns this + */ final def completeWith(other: Future[T]): CompletableFuture[T] = { other onComplete { f => complete(f.value.get) } this } + + /** + * Alias for complete(Right(value)) + */ + final def << (value: T): CompletableFuture[T] = complete(Right(value)) + + /** + * Alias for completeWith(other) + */ + final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) } /** @@ -431,7 +524,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def awaitResult: Option[Either[Throwable, T]] = { + def awaitValue: Option[Either[Throwable, T]] = { _lock.lock try { awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) @@ -441,7 +534,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { _lock.lock try { awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) @@ -536,10 +629,10 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte def complete(value: Either[Throwable, T]): CompletableFuture[T] = this def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } - def awaitResult: Option[Either[Throwable, T]] = value - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value + def awaitValue: Option[Either[Throwable, T]] = value + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value def await : Future[T] = this def awaitBlocking : Future[T] = this - def isExpired: Boolean = false + def isExpired: Boolean = true def timeoutInNanos: Long = 0 } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index d12ad7463f..b319d8ac87 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -6,6 +6,7 @@ package akka.dispatch import java.util.concurrent._ import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} +import akka.event.EventHandler import akka.config.Configuration import akka.config.Config.TIME_UNIT import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess} @@ -43,7 +44,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () = object MessageDispatcher { val UNSCHEDULED = 0 - val SCHEDULED = 1 + val SCHEDULED = 1 val RESCHEDULED = 2 implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher @@ -55,10 +56,10 @@ object MessageDispatcher { trait MessageDispatcher { import MessageDispatcher._ - protected val uuids = new ConcurrentSkipListSet[Uuid] + protected val uuids = new ConcurrentSkipListSet[Uuid] protected val futures = new ConcurrentSkipListSet[Uuid] - protected val guard = new ReentrantGuard - protected val active = new Switch(false) + protected val guard = new ReentrantGuard + protected val active = new Switch(false) private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 7e15ed69c3..31d5dca0eb 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration -import akka.actor.EventHandler +import akka.event.EventHandler object ThreadPoolConfig { type Bounds = Int diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala new file mode 100644 index 0000000000..d4fc55b0a9 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -0,0 +1,180 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.event + +import akka.actor._ +import Actor._ +import akka.dispatch._ +import akka.config.Config._ +import akka.config.ConfigurationException +import akka.util.{ListenerManagement, ReflectiveAccess} +import akka.AkkaException + +/** + * Event handler. + *

+ * Create, add and remove a listener: + *

+ * val eventHandlerListener = Actor.actorOf(new Actor {
+ *   self.dispatcher = EventHandler.EventHandlerDispatcher
+ *
+ *   def receive = {
+ *     case EventHandler.Error(cause, instance, message) => ...
+ *     case EventHandler.Warning(instance, message)      => ...
+ *     case EventHandler.Info(instance, message)         => ...
+ *     case EventHandler.Debug(instance, message)        => ...
+ *     case genericEvent                                 => ... 
+ *   }
+ * })
+ *
+ * EventHandler.addListener(eventHandlerListener)
+ * ...
+ * EventHandler.removeListener(eventHandlerListener)
+ * 
+ *

+ * However best is probably to register the listener in the 'akka.conf' + * configuration file. + *

+ * Log an error event: + *

+ * EventHandler.notify(EventHandler.Error(exception, this, message.toString))
+ * 
+ * Or use the direct methods (better performance): + *
+ * EventHandler.error(exception, this, message.toString)
+ * 
+ * + * @author Jonas Bonér + */ +object EventHandler extends ListenerManagement { + import java.io.{StringWriter, PrintWriter} + import java.text.DateFormat + import java.util.Date + import akka.dispatch.Dispatchers + + val ErrorLevel = 1 + val WarningLevel = 2 + val InfoLevel = 3 + val DebugLevel = 4 + + sealed trait Event { + @transient val thread: Thread = Thread.currentThread + } + case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event + case class Warning(instance: AnyRef, message: String = "") extends Event + case class Info(instance: AnyRef, message: String = "") extends Event + case class Debug(instance: AnyRef, message: String = "") extends Event + + val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val warning = "[WARN] [%s] [%s] [%s] %s".intern + val info = "[INFO] [%s] [%s] [%s] %s".intern + val debug = "[DEBUG] [%s] [%s] [%s] %s".intern + val generic = "[GENERIC] [%s] [%s]".intern + val ID = "event:handler".intern + + class EventHandlerException extends AkkaException + + lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build + + val level: Int = config.getString("akka.event-handler-level", "DEBUG") match { + case "ERROR" => ErrorLevel + case "WARNING" => WarningLevel + case "INFO" => InfoLevel + case "DEBUG" => DebugLevel + case unknown => throw new ConfigurationException( + "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") + } + + def notify(event: => AnyRef) = notifyListeners(event) + + def notify[T <: Event : ClassManifest](event: => T) { + if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event) + } + + def error(cause: Throwable, instance: AnyRef, message: => String) = { + if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message)) + } + + def error(instance: AnyRef, message: => String) = { + if (level >= ErrorLevel) notifyListeners(Error(new EventHandlerException, instance, message)) + } + + def warning(instance: AnyRef, message: => String) = { + if (level >= WarningLevel) notifyListeners(Warning(instance, message)) + } + + def info(instance: AnyRef, message: => String) = { + if (level >= InfoLevel) notifyListeners(Info(instance, message)) + } + + def debug(instance: AnyRef, message: => String) = { + if (level >= DebugLevel) notifyListeners(Debug(instance, message)) + } + + def formattedTimestamp = DateFormat.getInstance.format(new Date) + + def stackTraceFor(e: Throwable) = { + val sw = new StringWriter + val pw = new PrintWriter(sw) + e.printStackTrace(pw) + sw.toString + } + + private def levelFor(eventClass: Class[_ <: Event]) = { + if (eventClass.isInstanceOf[Error]) ErrorLevel + else if (eventClass.isInstanceOf[Warning]) WarningLevel + else if (eventClass.isInstanceOf[Info]) InfoLevel + else if (eventClass.isInstanceOf[Debug]) DebugLevel + else DebugLevel + } + + class DefaultListener extends Actor { + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + case event @ Error(cause, instance, message) => + println(error.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message, + stackTraceFor(cause))) + case event @ Warning(instance, message) => + println(warning.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event @ Info(instance, message) => + println(info.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event @ Debug(instance, message) => + println(debug.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event => + println(generic.format(formattedTimestamp, event.toString)) + } + } + + config.getList("akka.event-handlers") foreach { listenerName => + try { + ReflectiveAccess.getClassFor[Actor](listenerName) map { + clazz => addListener(Actor.actorOf(clazz).start) + } + } catch { + case e: Exception => + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + listenerName + + "] due to [" + e.toString + "]") + } + } +} diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 4454ed117a..20cd33b311 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -7,6 +7,13 @@ trait Function[T,R] { def apply(param: T): R } +/** + * A Function interface. Used to create 2-arg first-class-functions is Java (sort of). + */ +trait Function2[T1, T2, R] { + def apply(arg1: T1, arg2: T2): R +} + /** A Procedure is like a Function, but it doesn't produce a return value */ trait Procedure[T] { diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 055fdab3b0..bf72714c33 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -6,7 +6,7 @@ package akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} import java.util.concurrent.atomic. {AtomicBoolean} -import akka.actor.EventHandler +import akka.event.EventHandler /** * @author Jonas Bonér diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 7b9590746f..41d1106818 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -11,6 +11,7 @@ import akka.AkkaException import java.net.InetSocketAddress import akka.remoteinterface.RemoteSupport import akka.actor._ +import akka.event.EventHandler /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -33,25 +34,34 @@ object ReflectiveAccess { * @author Jonas Bonér */ object Remote { - val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.netty.NettyRemoteSupport") + val TRANSPORT = Config.config.getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport") private[akka] val configDefaultAddress = new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"), Config.config.getInt("akka.remote.server.port", 2552)) - lazy val isEnabled = remoteSupportClass.isDefined - def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( - "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") - + def ensureEnabled = if (!isEnabled) { + val e = new ModuleNotAvailableException( + "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") + EventHandler.warning(this, e.toString) + throw e + } val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) - protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map { - remoteClass => () => createInstance[RemoteSupport](remoteClass,Array[Class[_]](),Array[AnyRef]()). - getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+ - remoteClass.getName+ - ", make sure that akka-remote.jar is on the classpath")) + protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = + remoteSupportClass map { remoteClass => + () => createInstance[RemoteSupport]( + remoteClass, + Array[Class[_]](), + Array[AnyRef]() + ) getOrElse { + val e = new ModuleNotAvailableException( + "Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName)) + EventHandler.warning(this, e.toString) + throw e + } } } @@ -125,6 +135,7 @@ object ReflectiveAccess { Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { case e: Exception => + EventHandler.warning(this, e.toString) None } @@ -143,6 +154,7 @@ object ReflectiveAccess { } } catch { case e: Exception => + EventHandler.warning(this, e.toString) None } @@ -155,33 +167,58 @@ object ReflectiveAccess { case None => None } } catch { - case ei: ExceptionInInitializerError => - throw ei + case e: ExceptionInInitializerError => + EventHandler.warning(this, e.toString) + throw e } def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Option[Class[T]] = { assert(fqn ne null) - val first = try { Option(classloader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } //First, use the specified CL + // First, use the specified CL + val first = try { + Option(classloader.loadClass(fqn).asInstanceOf[Class[T]]) + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } - if (first.isDefined) - first - else { //Second option is to use the ContextClassLoader - val second = try { Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } - if (second.isDefined) - second + if (first.isDefined) first + else { + // Second option is to use the ContextClassLoader + val second = try { + Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } + + if (second.isDefined) second else { val third = try { - if(classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]]) //Don't try to use "loader" if we got the default "classloader" parameter + // Don't try to use "loader" if we got the default "classloader" parameter + if (classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]]) else None - } catch { case c: ClassNotFoundException => None } + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } - if (third.isDefined) - third - else - try { Option(Class.forName(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } //Last option is Class.forName + if (third.isDefined) third + else { + // Last option is Class.forName + try { + Option(Class.forName(fqn).asInstanceOf[Class[T]]) + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } + } } } - } } diff --git a/akka-actor/src/test/scala/akka/Testing.scala b/akka-actor/src/test/scala/akka/Testing.scala new file mode 100644 index 0000000000..afc0c4a05a --- /dev/null +++ b/akka-actor/src/test/scala/akka/Testing.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka + +/** + * Multiplying numbers used in test timeouts by a factor, set by system property. + * Useful for Jenkins builds (where the machine may need more time). + */ +object Testing { + val timeFactor: Double = { + val factor = System.getProperty("akka.test.timefactor", "1.0") + try { + factor.toDouble + } catch { + case e: java.lang.NumberFormatException => 1.0 + } + } + + def time(t: Int): Int = (timeFactor * t).toInt + def time(t: Long): Long = (timeFactor * t).toLong + def time(t: Float): Float = (timeFactor * t).toFloat + def time(t: Double): Double = timeFactor * t +} diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index 0f9debe5b0..1eef7f068c 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -8,6 +8,8 @@ import org.junit.Test import akka.dispatch.Dispatchers import Actor._ +import akka.Testing + object ActorFireForgetRequestReplySpec { class ReplyActor extends Actor { @@ -85,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite { actor ! "Die" try { state.finished.await(10L, TimeUnit.SECONDS) } catch { case e: TimeoutException => fail("Never got the message") } - Thread.sleep(100) + Thread.sleep(Testing.time(500)) assert(actor.isShutdown) } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala index 7fea2a78d3..9e5fba863e 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala @@ -77,8 +77,8 @@ class ReceiveTimeoutSpec extends JUnitSuite { protected def receive = { case Tick => () case ReceiveTimeout => - timeoutLatch.open count.incrementAndGet + timeoutLatch.open self.receiveTimeout = None } }).start diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala index a930b86784..741cd7a49e 100644 --- a/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala @@ -260,11 +260,23 @@ class RestartStrategySpec extends JUnitSuite { // now crash again... should not restart slave ! Crash - slave ! Ping + + // may not be running + try { + slave ! Ping + } catch { + case e: ActorInitializationException => () + } assert(countDownLatch.await(1, TimeUnit.SECONDS)) - slave ! Crash + // may not be running + try { + slave ! Crash + } catch { + case e: ActorInitializationException => () + } + assert(stopLatch.tryAwait(1, TimeUnit.SECONDS)) assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS)) diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index c7bdd61241..55c2e001af 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -105,8 +105,8 @@ object ActorModelSpec { } private[akka] abstract override def dispatch(invocation: MessageInvocation) { - super.dispatch(invocation) getStats(invocation.receiver).msgsReceived.incrementAndGet() + super.dispatch(invocation) } private[akka] abstract override def start { diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index dfd94b40b5..f99f5f5305 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -277,14 +277,17 @@ class FutureSpec extends JUnitSuite { } @Test def resultWithinShouldNotThrowExceptions { + val latch = new StandardLatch + val actors = (1 to 10).toList map { _ => actorOf(new Actor { - def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add } + def receive = { case (add: Int, wait: Boolean, latch: StandardLatch) => if (wait) latch.await; self reply_? add } }).start } - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } - val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx >= 5, latch)) } + val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS) + latch.open val done = result collect { case Some(Right(x)) => x } val undone = result collect { case None => None } val errors = result collect { case Some(Left(t)) => t } @@ -324,4 +327,39 @@ class FutureSpec extends JUnitSuite { // make sure all futures are completed in dispatcher assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0) } + + @Test def shouldBlockUntilResult { + val latch = new StandardLatch + + val f = Future({ latch.await; 5}) + val f2 = Future({ f() + 5 }) + + assert(f2.resultOrException === None) + latch.open + assert(f2() === 10) + + val f3 = Future({ Thread.sleep(100); 5}, 10) + intercept[FutureTimeoutException] { + f3() + } + } + + @Test def lesslessIsMore { + import akka.actor.Actor.spawn + val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue) + val begin, end = new StandardLatch + spawn { + begin.await + dataflowVar2 << dataflowVar + end.open + } + + spawn { + dataflowVar << 5 + } + begin.open + end.await + assert(dataflowVar2() === 5) + assert(dataflowVar.get === 5) + } } diff --git a/akka-http/src/main/scala/akka/http/ListWriter.scala b/akka-http/src/main/scala/akka/http/ListWriter.scala deleted file mode 100644 index 3f1123d4d8..0000000000 --- a/akka-http/src/main/scala/akka/http/ListWriter.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ -package akka.http - -import akka.serialization.Serializer - -import java.io.OutputStream -import javax.ws.rs.core.{MultivaluedMap, MediaType} -import javax.ws.rs.ext.{MessageBodyWriter, Provider} -import javax.ws.rs.Produces - -/** - * Writes Lists of JSON serializable objects. - */ -@Provider -@Produces(Array("application/json")) -class ListWriter extends MessageBodyWriter[List[_]] { - - def isWriteable(aClass: Class[_], - aType: java.lang.reflect.Type, - annotations: Array[java.lang.annotation.Annotation], - mediaType: MediaType) = - classOf[List[_]].isAssignableFrom(aClass) || aClass == ::.getClass - - def getSize(list: List[_], - aClass: Class[_], - aType: java.lang.reflect.Type, - annotations: Array[java.lang.annotation.Annotation], - mediaType: MediaType) = - -1L - - def writeTo(list: List[_], - aClass: Class[_], - aType: java.lang.reflect.Type, - annotations: Array[java.lang.annotation.Annotation], - mediaType: MediaType, - stringObjectMultivaluedMap: MultivaluedMap[String, Object], - outputStream: OutputStream): Unit = - if (list.isEmpty) outputStream.write(" ".getBytes) - else outputStream.write(Serializer.ScalaJSON.toBinary(list)) -} diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index 6a7adbe2cf..eb91b9737f 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -5,7 +5,7 @@ package akka.http import akka.actor.{ActorRegistry, ActorRef, Actor} -import akka.actor.EventHandler +import akka.event.EventHandler import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.HttpServlet diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala index 6ce3d1041c..19a29f46cc 100644 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ b/akka-http/src/main/scala/akka/http/Servlet30Context.scala @@ -7,7 +7,7 @@ package akka.http import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent}; import Types._ -import akka.actor.EventHandler +import akka.event.EventHandler /** * @author Garrick Evans diff --git a/akka-http/src/main/scala/akka/security/Security.scala b/akka-http/src/main/scala/akka/security/Security.scala index 9f16d54886..dce249de46 100644 --- a/akka-http/src/main/scala/akka/security/Security.scala +++ b/akka-http/src/main/scala/akka/security/Security.scala @@ -23,7 +23,7 @@ package akka.security import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} -import akka.actor.EventHandler +import akka.event.EventHandler import akka.actor.Actor._ import akka.config.Config diff --git a/akka-http/src/main/scala/akka/servlet/Initializer.scala b/akka-http/src/main/scala/akka/servlet/Initializer.scala deleted file mode 100644 index a259a7fd34..0000000000 --- a/akka-http/src/main/scala/akka/servlet/Initializer.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.servlet - -import akka.remote.BootableRemoteActorService -import akka.actor.BootableActorLoaderService -import akka.config.Config -import akka.util.{ Bootable, AkkaLoader } - -import javax.servlet.{ServletContextListener, ServletContextEvent} - - /** - * This class can be added to web.xml mappings as a listener to start and postStop Akka. - * - * - * ... - * - * akka.servlet.Initializer - * - * ... - * - */ -class Initializer extends ServletContextListener { - lazy val loader = new AkkaLoader - - def contextDestroyed(e: ServletContextEvent): Unit = - loader.shutdown - - def contextInitialized(e: ServletContextEvent): Unit = - loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService) - } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala index ee4e5cf809..9fa9d1b5c0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -44,4 +44,24 @@ object RemoteServerSettings { } val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) + + val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT) + + val EXECUTION_POOL_SIZE = { + val sz = config.getInt("akka.remote.server.execution-pool-size",16) + if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") + sz + } + + val MAX_CHANNEL_MEMORY_SIZE = { + val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0) + if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0") + sz + } + + val MAX_TOTAL_MEMORY_SIZE = { + val sz = config.getInt("akka.remote.server.max-total-memory-size", 0) + if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0") + sz + } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 893b22b059..b515b6706f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -13,13 +13,14 @@ import akka.serialization.RemoteActorSerialization._ import akka.japi.Creator import akka.config.Config._ import akka.remoteinterface._ -import akka.actor.{PoisonPill, EventHandler, Index, +import akka.actor.{PoisonPill, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.AkkaException +import akka.event.EventHandler import akka.actor.Actor._ import akka.util._ import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} @@ -33,6 +34,7 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } +import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler @@ -80,8 +82,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { - loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY - val key = Address(address) lock.readLock.lock try { @@ -216,15 +216,13 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //We don't care about that right now - } else if (!future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) + val future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + throw future.getCause + } + None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -237,7 +235,9 @@ abstract class RemoteClient private[akka] ( futures.remove(futureUuid) //Clean this up //We don't care about that right now } else if (!future.isSuccess) { - futures.remove(futureUuid) //Clean this up + val f = futures.remove(futureUuid) //Clean this up + if (f ne null) + f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } } @@ -753,9 +753,17 @@ class RemoteServerPipelineFactory( case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) case _ => (Nil, Nil) } - + val execution = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor( + EXECUTION_POOL_SIZE, + MAX_CHANNEL_MEMORY_SIZE, + MAX_TOTAL_MEMORY_SIZE, + EXECUTION_POOL_KEEPALIVE.length, + EXECUTION_POOL_KEEPALIVE.unit + ) + ) val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) - val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil + val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } @@ -856,8 +864,6 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { - //FIXME we should definitely spawn off this in a thread pool or something, - // potentially using Actor.spawn or something similar request.getActorInfo.getActorType match { case SCALA_ACTOR => dispatchToActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel) diff --git a/akka-remote/src/test/scala/config/ConfigSpec.scala b/akka-remote/src/test/scala/config/ConfigSpec.scala index 7ba5193f6f..e37edcfc34 100644 --- a/akka-remote/src/test/scala/config/ConfigSpec.scala +++ b/akka-remote/src/test/scala/config/ConfigSpec.scala @@ -36,6 +36,10 @@ class ConfigSpec extends WordSpec with MustMatchers { getBool("akka.remote.ssl.debug") must equal(None) getBool("akka.remote.ssl.service") must equal(None) getInt("akka.remote.zlib-compression-level") must equal(Some(6)) + getInt("akka.remote.server.execution-pool-size") must equal(Some(16)) + getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60)) + getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0)) + getInt("akka.remote.server.max-total-memory-size") must equal(Some(0)) } } } diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index b72c49c204..c91565eec7 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -10,6 +10,8 @@ import akka.actor._ import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.config. {RemoteAddress, Config, TypedActorConfigurator} +import akka.Testing + object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] val oneWayLog = new LinkedBlockingQueue[String] @@ -37,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest { classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], Permanent, - 10000, + Testing.time(10000), RemoteAddress(host,port)), new SuperviseTypedActor( classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwoImpl], Permanent, - 10000, + Testing.time(10000), RemoteAddress(host,port)) ).toArray).supervise } diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 7d5524bcd2..2eec948698 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -243,7 +243,7 @@ class MyStatelessActor extends Actor { class MyStatelessActorWithMessagesInMailbox extends Actor { def receive = { case "hello" => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") } @@ -263,7 +263,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { class MyActorWithSerializableMessages extends Actor { def receive = { case MyMessage(s, t) => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") } diff --git a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala index f22c876808..a6193d9914 100644 --- a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala +++ b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala @@ -117,7 +117,7 @@ class MyStatefulActor extends Actor { def receive = { case "hi" => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello" => count = count + 1 diff --git a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala index 79b14d2e78..1eb38d6f0f 100644 --- a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala +++ b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala @@ -67,7 +67,7 @@ object World { lazy val ants = setup lazy val evaporator = actorOf[Evaporator].start - private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false) + private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot") def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).opt) } @@ -138,7 +138,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor { val locRef = Ref(initLoc) val name = "ant-from-" + initLoc._1 + "-" + initLoc._2 - implicit val txFactory = TransactionFactory(familyName = name, hooks = false) + implicit val txFactory = TransactionFactory(familyName = name) val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1)) val foraging = (p: Place) => p.pher + p.food @@ -210,7 +210,7 @@ class Evaporator extends WorldActor { import Config._ import World._ - implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false) + implicit val txFactory = TransactionFactory(familyName = "evaporator") val evaporate = (pher: Float) => pher * EvapRate def act = for (x <- 0 until Dim; y <- 0 until Dim) { diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala new file mode 100644 index 0000000000..fa1d4e713e --- /dev/null +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.event.slf4j + +import org.slf4j.{Logger => SLFLogger, LoggerFactory => SLFLoggerFactory} + +import akka.event.EventHandler +import akka.actor._ +import Actor._ + +/** + * Base trait for all classes that wants to be able use the SLF4J logging infrastructure. + * + * @author Jonas Bonér + */ +trait Logging { + @transient lazy val log = Logger(this.getClass.getName) +} + +object Logger { + def apply(logger: String) : SLFLogger = SLFLoggerFactory getLogger logger + def apply(clazz: Class[_]): SLFLogger = apply(clazz.getName) + def root : SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME) +} + +/** + * SLF4J Event Handler. + * + * @author Jonas Bonér + */ +class Slf4jEventHandler extends Actor with Logging { + import EventHandler._ + + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + case Error(cause, instance, message) => + log.error("\n\t[{}]\n\t[{}]\n\t[{}]", + Array[AnyRef](instance.getClass.getName, message, stackTraceFor(cause))) + + case Warning(instance, message) => + log.warn("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) + + case Info(instance, message) => + log.info("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) + + case Debug(instance, message) => + log.debug("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) + + case event => log.debug("\n\t[{}]", event.toString) + } +} + + diff --git a/akka-stm/src/main/scala/akka/stm/Ref.scala b/akka-stm/src/main/scala/akka/stm/Ref.scala index d6ef09bccd..74b1bf5a9e 100644 --- a/akka-stm/src/main/scala/akka/stm/Ref.scala +++ b/akka-stm/src/main/scala/akka/stm/Ref.scala @@ -8,6 +8,13 @@ import akka.actor.{newUuid, Uuid} import org.multiverse.transactional.refs.BasicRef +/** + * Common trait for all the transactional objects. + */ +@serializable trait Transactional { + val uuid: String +} + /** * Transactional managed reference. See the companion class for more information. */ diff --git a/akka-stm/src/main/scala/akka/stm/Stm.scala b/akka-stm/src/main/scala/akka/stm/Stm.scala index be511f0e2c..6e949b1ded 100644 --- a/akka-stm/src/main/scala/akka/stm/Stm.scala +++ b/akka-stm/src/main/scala/akka/stm/Stm.scala @@ -48,10 +48,7 @@ trait Stm { def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - factory.addHooks - body - } + def call(mtx: MultiverseTransaction): T = body }) } } diff --git a/akka-stm/src/main/scala/akka/stm/Transaction.scala b/akka-stm/src/main/scala/akka/stm/Transaction.scala deleted file mode 100644 index b2f0caaf07..0000000000 --- a/akka-stm/src/main/scala/akka/stm/Transaction.scala +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.stm - -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable.HashMap - -import akka.util.ReflectiveAccess -import akka.config.Config._ -import akka.config.ModuleNotAvailableException -import akka.AkkaException - -import org.multiverse.api.{Transaction => MultiverseTransaction} -import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} -import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.api.{PropagationLevel => MultiversePropagationLevel} -import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} - -class NoTransactionInScopeException extends AkkaException("No transaction in scope") -class TransactionRetryException(message: String) extends AkkaException(message) -class StmConfigurationException(message: String) extends AkkaException(message) - - -/** - * Internal helper methods for managing Akka-specific transaction. - */ -object TransactionManagement extends TransactionManagement { - private[akka] val transaction = new ThreadLocal[Option[Transaction]]() { - override protected def initialValue: Option[Transaction] = None - } - - private[akka] def getTransaction: Transaction = { - val option = transaction.get - if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope") - option.get - } -} - -/** - * Internal helper methods for managing Akka-specific transaction. - */ -trait TransactionManagement { - private[akka] def setTransaction(tx: Option[Transaction]) = - if (tx.isDefined) TransactionManagement.transaction.set(tx) - - private[akka] def clearTransaction = { - TransactionManagement.transaction.set(None) - setThreadLocalTransaction(null) - } - - private[akka] def getTransactionInScope = TransactionManagement.getTransaction - - private[akka] def isTransactionInScope = { - val option = TransactionManagement.transaction.get - (option ne null) && option.isDefined - } -} - -object Transaction { - val idFactory = new AtomicLong(-1L) - - /** - * Attach an Akka-specific Transaction to the current Multiverse transaction. - * Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks - */ - private[akka] def attach = { - val mtx = getRequiredThreadLocalTransaction - val tx = new Transaction - tx.begin - tx.transaction = Some(mtx) - TransactionManagement.transaction.set(Some(tx)) - mtx.registerLifecycleListener(new TransactionLifecycleListener() { - def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event match { - case TransactionLifecycleEvent.PostCommit => tx.commitJta - case TransactionLifecycleEvent.PreCommit => tx.commitPersistentState - case TransactionLifecycleEvent.PostAbort => tx.abort - case _ => {} - } - }) - } -} - -/** - * The Akka-specific Transaction class. - * For integration with persistence modules and JTA support. - */ -@serializable class Transaction { - val JTA_AWARE = config.getBool("akka.stm.jta-aware", false) - val STATE_RETRIES = config.getInt("akka.storage.max-retries",10) - - val id = Transaction.idFactory.incrementAndGet - @volatile private[this] var status: TransactionStatus = TransactionStatus.New - private[akka] var transaction: Option[MultiverseTransaction] = None - private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] - private[akka] val depth = new AtomicInteger(0) - - val jta: Option[ReflectiveJtaModule.TransactionContainer] = - if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer) - else None - - // --- public methods --------- - - def begin = synchronized { - jta.foreach { _.beginWithStmSynchronization(this) } - } - - def commitPersistentState = synchronized { - retry(STATE_RETRIES){ - persistentStateMap.valuesIterator.foreach(_.commit) - persistentStateMap.clear - } - status = TransactionStatus.Completed - } - - def commitJta = synchronized { - jta.foreach(_.commit) - } - - def abort = synchronized { - jta.foreach(_.rollback) - persistentStateMap.valuesIterator.foreach(_.abort) - persistentStateMap.clear - } - - def retry(tries:Int)(block: => Unit):Unit={ - if(tries==0){ - throw new TransactionRetryException("Exhausted Retries while committing persistent state") - } - try{ - block - } catch{ - case e:Exception=>{ - retry(tries-1){block} - } - } - } - - def isNew = synchronized { status == TransactionStatus.New } - - def isActive = synchronized { status == TransactionStatus.Active } - - def isCompleted = synchronized { status == TransactionStatus.Completed } - - def isAborted = synchronized { status == TransactionStatus.Aborted } - - // --- internal methods --------- - - //private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE - - private[akka] def status_? = status - - private[akka] def increment = depth.incrementAndGet - - private[akka] def decrement = depth.decrementAndGet - - private[akka] def isTopLevel = depth.get == 0 - //when calling this method, make sure to prefix the uuid with the type so you - //have no possibility of kicking a diffferent type with the same uuid out of a transction - private[akka] def register(uuid: String, storage: Committable with Abortable) = { - if(persistentStateMap.getOrElseUpdate(uuid, {storage}) ne storage){ - throw new IllegalStateException("attempted to register an instance of persistent data structure for id [%s] when there is already a different instance registered".format(uuid)) - } - } - - private def ensureIsActive = if (status != TransactionStatus.Active) - throw new StmConfigurationException( - "Expected ACTIVE transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrAborted = - if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) - throw new StmConfigurationException( - "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrNew = - if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) - throw new StmConfigurationException( - "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) - - override def equals(that: Any): Boolean = synchronized { - that.isInstanceOf[Transaction] && - that.asInstanceOf[Transaction].id == this.id - } - - override def hashCode: Int = synchronized { id.toInt } - - override def toString = synchronized { "Transaction[" + id + ", " + status + "]" } -} - -@serializable sealed abstract class TransactionStatus - -object TransactionStatus { - case object New extends TransactionStatus - case object Active extends TransactionStatus - case object Aborted extends TransactionStatus - case object Completed extends TransactionStatus -} - -/** - * Common trait for all the transactional objects: - * Ref, TransactionalMap, TransactionalVector, - * PersistentRef, PersistentMap, PersistentVector, PersistentQueue, PersistentSortedSet - */ -@serializable trait Transactional { - val uuid: String -} - -/** - * Used for integration with the persistence modules. - */ -trait Committable { - def commit(): Unit -} - -/** - * Used for integration with the persistence modules. - */ -trait Abortable { - def abort(): Unit -} - -/** - * Used internally for reflective access to the JTA module. - * Allows JTA integration to work when akka-jta.jar is on the classpath. - */ -object ReflectiveJtaModule { - type TransactionContainerObject = { - def apply(): TransactionContainer - } - - type TransactionContainer = { - def beginWithStmSynchronization(transaction: Transaction): Unit - def commit: Unit - def rollback: Unit - } - - lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined - - def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException( - "Can't load the JTA module, make sure that akka-jta.jar is on the classpath") - - val transactionContainerObjectInstance: Option[TransactionContainerObject] = - ReflectiveAccess.getObjectFor("akka.jta.TransactionContainer$") - - def createTransactionContainer: TransactionContainer = { - ensureJtaEnabled - transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer] - } -} diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala index 0bb0caa494..d04e017a6b 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala @@ -32,7 +32,6 @@ object TransactionConfig { val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true) val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires")) val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none")) - val HOOKS = config.getBool("akka.stm.hooks", true) val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT) @@ -65,7 +64,6 @@ object TransactionConfig { * @param quickRelease Whether locks should be released as quickly as possible (before whole commit). * @param propagation For controlling how nested transactions behave. * @param traceLevel Transaction trace level. - * @param hooks Whether hooks for persistence modules and JTA should be added to the transaction. */ def apply(familyName: String = FAMILY_NAME, readonly: JBoolean = READONLY, @@ -78,10 +76,9 @@ object TransactionConfig { speculative: Boolean = SPECULATIVE, quickRelease: Boolean = QUICK_RELEASE, propagation: MPropagation = PROPAGATION, - traceLevel: MTraceLevel = TRACE_LEVEL, - hooks: Boolean = HOOKS) = { + traceLevel: MTraceLevel = TRACE_LEVEL) = { new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) } } @@ -100,7 +97,6 @@ object TransactionConfig { *

quickRelease - Whether locks should be released as quickly as possible (before whole commit). *

propagation - For controlling how nested transactions behave. *

traceLevel - Transaction trace level. - *

hooks - Whether hooks for persistence modules and JTA should be added to the transaction. */ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME, val readonly: JBoolean = TransactionConfig.READONLY, @@ -113,8 +109,7 @@ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY val speculative: Boolean = TransactionConfig.SPECULATIVE, val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, val propagation: MPropagation = TransactionConfig.PROPAGATION, - val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, - val hooks: Boolean = TransactionConfig.HOOKS) + val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) object DefaultTransactionConfig extends TransactionConfig @@ -137,11 +132,10 @@ object TransactionFactory { speculative: Boolean = TransactionConfig.SPECULATIVE, quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, propagation: MPropagation = TransactionConfig.PROPAGATION, - traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, - hooks: Boolean = TransactionConfig.HOOKS) = { + traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) new TransactionFactory(config) } } @@ -199,8 +193,6 @@ class TransactionFactory( } val boilerplate = new TransactionBoilerplate(factory) - - def addHooks = if (config.hooks) Transaction.attach } /** diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala index 0765652c6a..b71f68c375 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala @@ -27,7 +27,6 @@ class TransactionConfigBuilder { var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var propagation: MPropagation = TransactionConfig.PROPAGATION var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -41,11 +40,10 @@ class TransactionConfigBuilder { def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } - def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) } /** @@ -64,7 +62,6 @@ class TransactionFactoryBuilder { var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var propagation: MPropagation = TransactionConfig.PROPAGATION var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -78,12 +75,11 @@ class TransactionFactoryBuilder { def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } - def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) new TransactionFactory(config) } } diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala index 2de7747607..8ce08cf624 100644 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala @@ -129,7 +129,6 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { - factory.addHooks val result = body val timeout = factory.config.timeout barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) diff --git a/akka-stm/src/test/scala/config/ConfigSpec.scala b/akka-stm/src/test/scala/config/ConfigSpec.scala index 69f76eb89b..4108a99d63 100644 --- a/akka-stm/src/test/scala/config/ConfigSpec.scala +++ b/akka-stm/src/test/scala/config/ConfigSpec.scala @@ -20,9 +20,7 @@ class ConfigSpec extends WordSpec with MustMatchers { getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.fair") must equal(Some(true)) - getBool("akka.stm.hooks") must equal(Some(true)) getBool("akka.stm.interruptible") must equal(Some(false)) - getBool("akka.stm.jta-aware") must equal(Some(false)) getInt("akka.stm.max-retries") must equal(Some(1000)) getString("akka.stm.propagation") must equal(Some("requires")) getBool("akka.stm.quick-release") must equal(Some(true)) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 131c18b279..ce198be6bf 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -1,6 +1,7 @@ package akka.testkit -import akka.actor.{ActorRef, EventHandler} +import akka.event.EventHandler +import akka.actor.ActorRef import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation} import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 458f937e5e..342f8e6316 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -12,7 +12,7 @@ akka { time-unit = "seconds" # Time unit for all timeout properties throughout the config - event-handlers = ["akka.actor.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) + event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up @@ -71,9 +71,6 @@ akka { quick-release = true propagation = "requires" trace-level = "none" - hooks = true - jta-aware = off # Option 'on' means that if there JTA Transaction Manager available then the STM will - # begin (or join), commit or rollback the JTA transaction. Default is 'off'. } jta { @@ -142,6 +139,10 @@ akka { require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. backlog = 4096 # Sets the size of the connection backlog + execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling + execution-pool-size = 16# Size of the core pool of the remote execution unit + max-channel-memory-size = 0 # Maximum channel size, 0 for off + max-total-memory-size = 0 # Maximum total size of all channels, 0 for off } client { diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e42fa407e8..8d30289365 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -70,16 +70,16 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- object Repositories { - lazy val LocalMavenRepo = MavenRepository("Local Maven Repo", (Path.userHome / ".m2" / "repository").asURL.toString) - lazy val AkkaRepo = MavenRepository("Akka Repository", "http://akka.io/repository") - lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") - lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") - lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") - lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") - lazy val GlassfishRepo = MavenRepository("Glassfish Repo", "http://download.java.net/maven/glassfish") - lazy val ScalaToolsRelRepo = MavenRepository("Scala Tools Releases Repo", "http://scala-tools.org/repo-releases") - lazy val DatabinderRepo = MavenRepository("Databinder Repo", "http://databinder.net/repo") + lazy val LocalMavenRepo = MavenRepository("Local Maven Repo", (Path.userHome / ".m2" / "repository").asURL.toString) + lazy val AkkaRepo = MavenRepository("Akka Repository", "http://akka.io/repository") + lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") + lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") + lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") + lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") + lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") + lazy val GlassfishRepo = MavenRepository("Glassfish Repo", "http://download.java.net/maven/glassfish") + lazy val ScalaToolsRelRepo = MavenRepository("Scala Tools Releases Repo", "http://scala-tools.org/repo-releases") + lazy val DatabinderRepo = MavenRepository("Databinder Repo", "http://databinder.net/repo") lazy val ScalaToolsSnapshotRepo = MavenRepository("Scala-Tools Snapshot Repo", "http://scala-tools.org/repo-snapshots") } @@ -118,6 +118,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val SCALATEST_VERSION = "1.4-SNAPSHOT" lazy val JETTY_VERSION = "7.2.2.v20101205" lazy val JAVAX_SERVLET_VERSION = "3.0" + lazy val SLF4J_VERSION = "1.6.0" // ------------------------------------------------------------------------------------------------------------------- // Dependencies @@ -157,8 +158,11 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD - lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 - lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 + lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.10" % "compile" //ApacheV2 + lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.10" % "test" //ApacheV2 + + lazy val slf4j = "org.slf4j" % "slf4j-api" % "1.6.0" + lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.24" // Test @@ -177,13 +181,14 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) - lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) - lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_remote) + lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor) lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) lazy val akka_sbt_plugin = project("akka-sbt-plugin", "akka-sbt-plugin", new AkkaSbtPluginProject(_)) + lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) + lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor) // ------------------------------------------------------------------------------------------------------------------- // Miscellaneous @@ -296,12 +301,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile) } - // ------------------------------------------------------------------------------------------------------------------- - // akka-testkit subproject - // ------------------------------------------------------------------------------------------------------------------- - - class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) - // ------------------------------------------------------------------------------------------------------------------- // akka-stm subproject // ------------------------------------------------------------------------------------------------------------------- @@ -360,6 +359,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val jetty = Dependencies.jetty val jersey = Dependencies.jersey_server val jsr311 = Dependencies.jsr311 + val commons_codec = Dependencies.commons_codec // testing val junit = Dependencies.junit @@ -420,6 +420,20 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } } + // ------------------------------------------------------------------------------------------------------------------- + // akka-testkit subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) + + // ------------------------------------------------------------------------------------------------------------------- + // akka-slf4j subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaSlf4jProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val sjson = Dependencies.slf4j + } + // ------------------------------------------------------------------------------------------------------------------- // Helpers // -------------------------------------------------------------------------------------------------------------------