From 1dfd454887ac43acc89116d50939b4d35695f694 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Mar 2011 14:45:31 +0100 Subject: [PATCH 01/24] Switching AlreadyCompletedFuture to always be expired, good for GC eligibility etc --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2bdce31f6f..f30a06d4ef 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -540,6 +540,6 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value def await : Future[T] = this def awaitBlocking : Future[T] = this - def isExpired: Boolean = false + def isExpired: Boolean = true def timeoutInNanos: Long = 0 } From 566d55a8bb26d3de78afb4b47b0e714f2ab2714f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Mar 2011 17:20:35 +0100 Subject: [PATCH 02/24] Renaming resultWithin to valueWithin, awaitResult to awaitValue to aling the naming, and then deprecating the blocking methods in Futures --- .../src/main/scala/akka/dispatch/Future.scala | 80 ++++++++++++------- .../test/scala/akka/dispatch/FutureSpec.scala | 2 +- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f30a06d4ef..0b833f47b0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -19,51 +19,42 @@ class FutureTimeoutException(message: String) extends AkkaException(message) object Futures { + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T]): Future[T] = Future(body.call) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], timeout: Long): Future[T] = Future(body.call, timeout) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = Future(body.call, timeout)(dispatcher) - /** - * (Blocking!) - */ - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await - /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf(futures: Iterable[Future[_]], timeout: Long = Long.MaxValue): Future[_] = { - val futureResult = new DefaultCompletableFuture[Any](timeout) + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { + val futureResult = new DefaultCompletableFuture[T](timeout) - val completeFirst: Future[_] => Unit = f => futureResult.completeWith(f.asInstanceOf[Future[Any]]) + val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _) for(f <- futures) f onComplete completeFirst futureResult } - /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed - */ - def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException - /** * A non-blocking fold over the specified futures. * The fold is performed on the thread where the last future is completed, @@ -139,6 +130,35 @@ object Futures { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) + + //Deprecations + + + /** + * (Blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)") + def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) + + /** + * Returns the First Future that is completed (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await") + def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await + + + /** + * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }") + def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = + in map { f => fun(f.await) } + + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException") + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException } object Future { @@ -206,7 +226,7 @@ sealed trait Future[+T] { * * Equivalent to calling future.await.value. */ - def awaitResult: Option[Either[Throwable, T]] + def awaitValue: Option[Either[Throwable, T]] /** * Returns the result of the Future if one is available within the specified @@ -215,7 +235,7 @@ sealed trait Future[+T] { * returns None if no result, Some(Right(t)) if a result, or * Some(Left(error)) if there was an exception */ - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] /** * Returns the contained exception of this Future if it exists. @@ -431,7 +451,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def awaitResult: Option[Either[Throwable, T]] = { + def awaitValue: Option[Either[Throwable, T]] = { _lock.lock try { awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) @@ -441,7 +461,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { _lock.lock try { awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) @@ -536,8 +556,8 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte def complete(value: Either[Throwable, T]): CompletableFuture[T] = this def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } - def awaitResult: Option[Either[Throwable, T]] = value - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value + def awaitValue: Option[Either[Throwable, T]] = value + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value def await : Future[T] = this def awaitBlocking : Future[T] = this def isExpired: Boolean = true diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index dfd94b40b5..9e47ecdbe2 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -284,7 +284,7 @@ class FutureSpec extends JUnitSuite { } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } - val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) + val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS) val done = result collect { case Some(Right(x)) => x } val undone = result collect { case None => None } val errors = result collect { case Some(Left(t)) => t } From 3bd5cc9cd3250ab0912d45346587245a23098802 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 22 Mar 2011 16:30:49 +0100 Subject: [PATCH 03/24] Added SLF4 module with Logging trait and Event Handler --- .../ExecutorBasedEventDrivenDispatcher.scala | 14 ++--- .../src/main/scala/akka/logging/SLF4J.scala | 56 +++++++++++++++++++ project/build/AkkaProject.scala | 47 ++++++++++------ 3 files changed, 92 insertions(+), 25 deletions(-) create mode 100644 akka-slf4j/src/main/scala/akka/logging/SLF4J.scala diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index c15a26e00d..f23b759c40 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -87,7 +87,7 @@ class ExecutorBasedEventDrivenDispatcher( def this(_name: String) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - val name = "akka:event-driven:dispatcher:" + _name + val name = "akka:event-driven:dispatcher:" + _name private[akka] val threadFactory = new MonitorableThreadFactory(name) private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) @@ -208,20 +208,18 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => else { //But otherwise, if we are throttled, we need to do some book-keeping var processedMessages = 0 val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 - val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 + val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) + else 0 do { nextMessage.invoke - nextMessage = if (self.suspended.locked) { - null //If we are suspended, abort - } - else { //If we aren't suspended, we need to make sure we're not overstepping our boundaries + null // If we are suspended, abort + } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort - else - self.dequeue //Dequeue the next message + else self.dequeue //Dequeue the next message } } while (nextMessage ne null) } diff --git a/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala b/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala new file mode 100644 index 0000000000..98579c545e --- /dev/null +++ b/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.logging.slf4j + +import org.slf4j.{Logger => SLFLogger, LoggerFactory => SLFLoggerFactory} + +import akka.actor._ +import Actor._ + +/** + * Base trait for all classes that wants to be able use the SLF4J logging infrastructure. + * + * @author Jonas Bonér + */ +trait Logging { + @transient lazy val log = Logger(this.getClass.getName) +} + +object Logger { + def apply(logger: String) : SLFLogger = SLFLoggerFactory getLogger logger + def apply(clazz: Class[_]): SLFLogger = apply(clazz.getName) + def root : SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME) +} + +/** + * SLF4J Event Handler. + * + * @author Jonas Bonér + */ +class Slf4jEventHandler extends Actor with Logging { + import EventHandler._ + + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + case event @ Error(cause, instance, message) => + log.error("\n\t[{}]\n\t[{}]\n\t[{}]", + Array[AnyRef](instance.getClass.getName, message, stackTraceFor(cause))) + + case event @ Warning(instance, message) => + log.warn("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) + + case event @ Info(instance, message) => + log.info("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) + + case event @ Debug(instance, message) => + log.debug("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) + + case event => log.debug("\n\t[{}]", event.toString) + } +} + + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e094bb1bf0..64f54407fb 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -70,16 +70,16 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- object Repositories { - lazy val LocalMavenRepo = MavenRepository("Local Maven Repo", (Path.userHome / ".m2" / "repository").asURL.toString) - lazy val AkkaRepo = MavenRepository("Akka Repository", "http://akka.io/repository") - lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") - lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") - lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") - lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") - lazy val GlassfishRepo = MavenRepository("Glassfish Repo", "http://download.java.net/maven/glassfish") - lazy val ScalaToolsRelRepo = MavenRepository("Scala Tools Releases Repo", "http://scala-tools.org/repo-releases") - lazy val DatabinderRepo = MavenRepository("Databinder Repo", "http://databinder.net/repo") + lazy val LocalMavenRepo = MavenRepository("Local Maven Repo", (Path.userHome / ".m2" / "repository").asURL.toString) + lazy val AkkaRepo = MavenRepository("Akka Repository", "http://akka.io/repository") + lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") + lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") + lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") + lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") + lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") + lazy val GlassfishRepo = MavenRepository("Glassfish Repo", "http://download.java.net/maven/glassfish") + lazy val ScalaToolsRelRepo = MavenRepository("Scala Tools Releases Repo", "http://scala-tools.org/repo-releases") + lazy val DatabinderRepo = MavenRepository("Databinder Repo", "http://databinder.net/repo") lazy val ScalaToolsSnapshotRepo = MavenRepository("Scala-Tools Snapshot Repo", "http://scala-tools.org/repo-snapshots") } @@ -118,6 +118,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val SCALATEST_VERSION = "1.3" lazy val JETTY_VERSION = "7.1.6.v20100715" lazy val JAVAX_SERVLET_VERSION = "3.0" + lazy val SLF4J_VERSION = "1.6.0" // ------------------------------------------------------------------------------------------------------------------- // Dependencies @@ -160,6 +161,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 + lazy val slf4j = "org.slf4j" % "slf4j-api" % "1.6.0" + lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.24" + // Test lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2 @@ -177,13 +181,14 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) - lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_remote) lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) lazy val akka_sbt_plugin = project("akka-sbt-plugin", "akka-sbt-plugin", new AkkaSbtPluginProject(_)) + lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) + lazy val akka_slf4j = project("akka-slf4j", "akka-slf4j", new AkkaSlf4jProject(_), akka_actor) // ------------------------------------------------------------------------------------------------------------------- // Miscellaneous @@ -296,12 +301,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile) } - // ------------------------------------------------------------------------------------------------------------------- - // akka-testkit subproject - // ------------------------------------------------------------------------------------------------------------------- - - class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) - // ------------------------------------------------------------------------------------------------------------------- // akka-stm subproject // ------------------------------------------------------------------------------------------------------------------- @@ -420,6 +419,20 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } } + // ------------------------------------------------------------------------------------------------------------------- + // akka-testkit subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) + + // ------------------------------------------------------------------------------------------------------------------- + // akka-slf4j subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaSlf4jProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val sjson = Dependencies.slf4j + } + // ------------------------------------------------------------------------------------------------------------------- // Helpers // ------------------------------------------------------------------------------------------------------------------- From 3197596bd6850a9707e9d32a62289ee39e868676 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Mar 2011 22:12:16 +0100 Subject: [PATCH 04/24] Adding Java API for reduce, fold, apply and firstCompletedOf, adding << and apply() to CompletableFuture + a lot of docs --- .../src/main/scala/akka/dispatch/Future.scala | 75 +++++++++++++++++++ .../src/main/scala/akka/japi/JavaAPI.scala | 7 ++ .../test/scala/akka/dispatch/FutureSpec.scala | 30 ++++++++ 3 files changed, 112 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 0b833f47b0..370bad6d1c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -55,6 +55,13 @@ object Futures { futureResult } + /** + * Java API + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = + firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout) + /** * A non-blocking fold over the specified futures. * The fold is performed on the thread where the last future is completed, @@ -95,6 +102,16 @@ object Futures { } } + /** + * Java API + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + */ + def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = + fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ ) + /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ @@ -119,6 +136,13 @@ object Futures { } } + /** + * Java API + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + */ + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = + reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _) + import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -162,6 +186,10 @@ object Futures { } object Future { + /** + * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body + * The execution is performed by the specified Dispatcher. + */ def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = { val f = new DefaultCompletableFuture[T](timeout) dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) @@ -170,6 +198,23 @@ object Future { } sealed trait Future[+T] { + + /** + * Returns the result of this future after waiting for it to complete, + * this method will throw any throwable that this Future was completed with + * and will throw a java.util.concurrent.TimeoutException if there is no result + * within the Futures timeout + */ + def apply(): T = this.await.resultOrException match { + case None => throw new java.util.concurrent.TimeoutException("Future timed out!") + case s: Some[T] => s.get + } + + /** + * Java API for apply() + */ + def get: T = apply() + /** * Blocks the current thread until the Future has been completed or the * timeout has expired. In the case of the timeout expiring a @@ -410,13 +455,43 @@ sealed trait Future[+T] { * Essentially this is the Promise (or write-side) of a Future (read-side) */ trait CompletableFuture[T] extends Future[T] { + /** + * Completes this Future with the specified result, if not already completed, + * returns this + */ def complete(value: Either[Throwable, T]): CompletableFuture[T] + + /** + * Completes this Future with the specified result, if not already completed, + * returns this + */ final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + + /** + * Completes this Future with the specified exception, if not already completed, + * returns this + */ final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) + + /** + * Completes this Future with the specified other Future, when that Future is completed, + * unless this Future has already been completed + * returns this + */ final def completeWith(other: Future[T]): CompletableFuture[T] = { other onComplete { f => complete(f.value.get) } this } + + /** + * Alias for complete(Right(value)) + */ + final def << (value: T): CompletableFuture[T] = complete(Right(value)) + + /** + * Alias for completeWith(other) + */ + final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) } /** diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 4454ed117a..20cd33b311 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -7,6 +7,13 @@ trait Function[T,R] { def apply(param: T): R } +/** + * A Function interface. Used to create 2-arg first-class-functions is Java (sort of). + */ +trait Function2[T1, T2, R] { + def apply(arg1: T1, arg2: T2): R +} + /** A Procedure is like a Function, but it doesn't produce a return value */ trait Procedure[T] { diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index 9e47ecdbe2..2cdd9b340d 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -324,4 +324,34 @@ class FutureSpec extends JUnitSuite { // make sure all futures are completed in dispatcher assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0) } + + @Test def shouldBlockUntilResult { + val latch = new StandardLatch + + val f = Future({ latch.await; 5}) + val f2 = Future({ f() + 5 }) + + assert(f2.resultOrException === None) + latch.open + assert(f2() === 10) + } + + @Test def lesslessIsMore { + import akka.actor.Actor.spawn + val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue) + val begin, end = new StandardLatch + spawn { + begin.await + dataflowVar2 << dataflowVar + end.open + } + + spawn { + dataflowVar << 5 + } + begin.open + end.await + assert(dataflowVar2() === 5) + assert(dataflowVar.get === 5) + } } From f74151a67faea38801228f201aa562a394fecbb2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Mar 2011 22:20:32 +0100 Subject: [PATCH 05/24] Switched to FutureTimeoutException for Future.apply/Future.get --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 5 +---- akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala | 5 +++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 370bad6d1c..6ca346dbdf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -205,10 +205,7 @@ sealed trait Future[+T] { * and will throw a java.util.concurrent.TimeoutException if there is no result * within the Futures timeout */ - def apply(): T = this.await.resultOrException match { - case None => throw new java.util.concurrent.TimeoutException("Future timed out!") - case s: Some[T] => s.get - } + def apply(): T = this.await.resultOrException.get /** * Java API for apply() diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index 2cdd9b340d..ec46140859 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -334,6 +334,11 @@ class FutureSpec extends JUnitSuite { assert(f2.resultOrException === None) latch.open assert(f2() === 10) + + val f3 = Future({ Thread.sleep(100); 5}, 10) + intercept[FutureTimeoutException] { + f3() + } } @Test def lesslessIsMore { From c87de864bf493b6c8fd11ddfaf9b202b29f95e3b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 00:14:12 +0100 Subject: [PATCH 06/24] Deprecating the current impl of DataFlowVariable --- akka-actor/src/main/scala/akka/dataflow/DataFlow.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index fec6f04d45..2e80da30dc 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -65,6 +65,7 @@ object DataFlow { /** * @author Jonas Bonér */ + @deprecated("Superceeded by Future and CompletableFuture as of 1.1") sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { import DataFlowVariable._ From a955e99def83771856c7b2c347802713578e64d5 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 23 Mar 2011 18:33:44 +1300 Subject: [PATCH 07/24] Remove akka-specific transaction and hooks --- .../src/main/scala/Ants.scala | 6 +- akka-stm/src/main/scala/akka/stm/Ref.scala | 7 + akka-stm/src/main/scala/akka/stm/Stm.scala | 5 +- .../src/main/scala/akka/stm/Transaction.scala | 252 ------------------ .../scala/akka/stm/TransactionFactory.scala | 18 +- .../akka/stm/TransactionFactoryBuilder.scala | 8 +- .../scala/akka/transactor/Coordinated.scala | 1 - .../src/test/scala/config/ConfigSpec.scala | 2 - config/akka-reference.conf | 3 - 9 files changed, 18 insertions(+), 284 deletions(-) delete mode 100644 akka-stm/src/main/scala/akka/stm/Transaction.scala diff --git a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala index 79b14d2e78..1eb38d6f0f 100644 --- a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala +++ b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala @@ -67,7 +67,7 @@ object World { lazy val ants = setup lazy val evaporator = actorOf[Evaporator].start - private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false) + private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot") def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).opt) } @@ -138,7 +138,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor { val locRef = Ref(initLoc) val name = "ant-from-" + initLoc._1 + "-" + initLoc._2 - implicit val txFactory = TransactionFactory(familyName = name, hooks = false) + implicit val txFactory = TransactionFactory(familyName = name) val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1)) val foraging = (p: Place) => p.pher + p.food @@ -210,7 +210,7 @@ class Evaporator extends WorldActor { import Config._ import World._ - implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false) + implicit val txFactory = TransactionFactory(familyName = "evaporator") val evaporate = (pher: Float) => pher * EvapRate def act = for (x <- 0 until Dim; y <- 0 until Dim) { diff --git a/akka-stm/src/main/scala/akka/stm/Ref.scala b/akka-stm/src/main/scala/akka/stm/Ref.scala index d6ef09bccd..74b1bf5a9e 100644 --- a/akka-stm/src/main/scala/akka/stm/Ref.scala +++ b/akka-stm/src/main/scala/akka/stm/Ref.scala @@ -8,6 +8,13 @@ import akka.actor.{newUuid, Uuid} import org.multiverse.transactional.refs.BasicRef +/** + * Common trait for all the transactional objects. + */ +@serializable trait Transactional { + val uuid: String +} + /** * Transactional managed reference. See the companion class for more information. */ diff --git a/akka-stm/src/main/scala/akka/stm/Stm.scala b/akka-stm/src/main/scala/akka/stm/Stm.scala index be511f0e2c..6e949b1ded 100644 --- a/akka-stm/src/main/scala/akka/stm/Stm.scala +++ b/akka-stm/src/main/scala/akka/stm/Stm.scala @@ -48,10 +48,7 @@ trait Stm { def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - factory.addHooks - body - } + def call(mtx: MultiverseTransaction): T = body }) } } diff --git a/akka-stm/src/main/scala/akka/stm/Transaction.scala b/akka-stm/src/main/scala/akka/stm/Transaction.scala deleted file mode 100644 index b2f0caaf07..0000000000 --- a/akka-stm/src/main/scala/akka/stm/Transaction.scala +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.stm - -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable.HashMap - -import akka.util.ReflectiveAccess -import akka.config.Config._ -import akka.config.ModuleNotAvailableException -import akka.AkkaException - -import org.multiverse.api.{Transaction => MultiverseTransaction} -import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} -import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.api.{PropagationLevel => MultiversePropagationLevel} -import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} - -class NoTransactionInScopeException extends AkkaException("No transaction in scope") -class TransactionRetryException(message: String) extends AkkaException(message) -class StmConfigurationException(message: String) extends AkkaException(message) - - -/** - * Internal helper methods for managing Akka-specific transaction. - */ -object TransactionManagement extends TransactionManagement { - private[akka] val transaction = new ThreadLocal[Option[Transaction]]() { - override protected def initialValue: Option[Transaction] = None - } - - private[akka] def getTransaction: Transaction = { - val option = transaction.get - if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope") - option.get - } -} - -/** - * Internal helper methods for managing Akka-specific transaction. - */ -trait TransactionManagement { - private[akka] def setTransaction(tx: Option[Transaction]) = - if (tx.isDefined) TransactionManagement.transaction.set(tx) - - private[akka] def clearTransaction = { - TransactionManagement.transaction.set(None) - setThreadLocalTransaction(null) - } - - private[akka] def getTransactionInScope = TransactionManagement.getTransaction - - private[akka] def isTransactionInScope = { - val option = TransactionManagement.transaction.get - (option ne null) && option.isDefined - } -} - -object Transaction { - val idFactory = new AtomicLong(-1L) - - /** - * Attach an Akka-specific Transaction to the current Multiverse transaction. - * Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks - */ - private[akka] def attach = { - val mtx = getRequiredThreadLocalTransaction - val tx = new Transaction - tx.begin - tx.transaction = Some(mtx) - TransactionManagement.transaction.set(Some(tx)) - mtx.registerLifecycleListener(new TransactionLifecycleListener() { - def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event match { - case TransactionLifecycleEvent.PostCommit => tx.commitJta - case TransactionLifecycleEvent.PreCommit => tx.commitPersistentState - case TransactionLifecycleEvent.PostAbort => tx.abort - case _ => {} - } - }) - } -} - -/** - * The Akka-specific Transaction class. - * For integration with persistence modules and JTA support. - */ -@serializable class Transaction { - val JTA_AWARE = config.getBool("akka.stm.jta-aware", false) - val STATE_RETRIES = config.getInt("akka.storage.max-retries",10) - - val id = Transaction.idFactory.incrementAndGet - @volatile private[this] var status: TransactionStatus = TransactionStatus.New - private[akka] var transaction: Option[MultiverseTransaction] = None - private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] - private[akka] val depth = new AtomicInteger(0) - - val jta: Option[ReflectiveJtaModule.TransactionContainer] = - if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer) - else None - - // --- public methods --------- - - def begin = synchronized { - jta.foreach { _.beginWithStmSynchronization(this) } - } - - def commitPersistentState = synchronized { - retry(STATE_RETRIES){ - persistentStateMap.valuesIterator.foreach(_.commit) - persistentStateMap.clear - } - status = TransactionStatus.Completed - } - - def commitJta = synchronized { - jta.foreach(_.commit) - } - - def abort = synchronized { - jta.foreach(_.rollback) - persistentStateMap.valuesIterator.foreach(_.abort) - persistentStateMap.clear - } - - def retry(tries:Int)(block: => Unit):Unit={ - if(tries==0){ - throw new TransactionRetryException("Exhausted Retries while committing persistent state") - } - try{ - block - } catch{ - case e:Exception=>{ - retry(tries-1){block} - } - } - } - - def isNew = synchronized { status == TransactionStatus.New } - - def isActive = synchronized { status == TransactionStatus.Active } - - def isCompleted = synchronized { status == TransactionStatus.Completed } - - def isAborted = synchronized { status == TransactionStatus.Aborted } - - // --- internal methods --------- - - //private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE - - private[akka] def status_? = status - - private[akka] def increment = depth.incrementAndGet - - private[akka] def decrement = depth.decrementAndGet - - private[akka] def isTopLevel = depth.get == 0 - //when calling this method, make sure to prefix the uuid with the type so you - //have no possibility of kicking a diffferent type with the same uuid out of a transction - private[akka] def register(uuid: String, storage: Committable with Abortable) = { - if(persistentStateMap.getOrElseUpdate(uuid, {storage}) ne storage){ - throw new IllegalStateException("attempted to register an instance of persistent data structure for id [%s] when there is already a different instance registered".format(uuid)) - } - } - - private def ensureIsActive = if (status != TransactionStatus.Active) - throw new StmConfigurationException( - "Expected ACTIVE transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrAborted = - if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) - throw new StmConfigurationException( - "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrNew = - if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) - throw new StmConfigurationException( - "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) - - override def equals(that: Any): Boolean = synchronized { - that.isInstanceOf[Transaction] && - that.asInstanceOf[Transaction].id == this.id - } - - override def hashCode: Int = synchronized { id.toInt } - - override def toString = synchronized { "Transaction[" + id + ", " + status + "]" } -} - -@serializable sealed abstract class TransactionStatus - -object TransactionStatus { - case object New extends TransactionStatus - case object Active extends TransactionStatus - case object Aborted extends TransactionStatus - case object Completed extends TransactionStatus -} - -/** - * Common trait for all the transactional objects: - * Ref, TransactionalMap, TransactionalVector, - * PersistentRef, PersistentMap, PersistentVector, PersistentQueue, PersistentSortedSet - */ -@serializable trait Transactional { - val uuid: String -} - -/** - * Used for integration with the persistence modules. - */ -trait Committable { - def commit(): Unit -} - -/** - * Used for integration with the persistence modules. - */ -trait Abortable { - def abort(): Unit -} - -/** - * Used internally for reflective access to the JTA module. - * Allows JTA integration to work when akka-jta.jar is on the classpath. - */ -object ReflectiveJtaModule { - type TransactionContainerObject = { - def apply(): TransactionContainer - } - - type TransactionContainer = { - def beginWithStmSynchronization(transaction: Transaction): Unit - def commit: Unit - def rollback: Unit - } - - lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined - - def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException( - "Can't load the JTA module, make sure that akka-jta.jar is on the classpath") - - val transactionContainerObjectInstance: Option[TransactionContainerObject] = - ReflectiveAccess.getObjectFor("akka.jta.TransactionContainer$") - - def createTransactionContainer: TransactionContainer = { - ensureJtaEnabled - transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer] - } -} diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala index 0bb0caa494..d04e017a6b 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala @@ -32,7 +32,6 @@ object TransactionConfig { val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true) val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires")) val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none")) - val HOOKS = config.getBool("akka.stm.hooks", true) val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT) @@ -65,7 +64,6 @@ object TransactionConfig { * @param quickRelease Whether locks should be released as quickly as possible (before whole commit). * @param propagation For controlling how nested transactions behave. * @param traceLevel Transaction trace level. - * @param hooks Whether hooks for persistence modules and JTA should be added to the transaction. */ def apply(familyName: String = FAMILY_NAME, readonly: JBoolean = READONLY, @@ -78,10 +76,9 @@ object TransactionConfig { speculative: Boolean = SPECULATIVE, quickRelease: Boolean = QUICK_RELEASE, propagation: MPropagation = PROPAGATION, - traceLevel: MTraceLevel = TRACE_LEVEL, - hooks: Boolean = HOOKS) = { + traceLevel: MTraceLevel = TRACE_LEVEL) = { new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) } } @@ -100,7 +97,6 @@ object TransactionConfig { *

quickRelease - Whether locks should be released as quickly as possible (before whole commit). *

propagation - For controlling how nested transactions behave. *

traceLevel - Transaction trace level. - *

hooks - Whether hooks for persistence modules and JTA should be added to the transaction. */ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME, val readonly: JBoolean = TransactionConfig.READONLY, @@ -113,8 +109,7 @@ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY val speculative: Boolean = TransactionConfig.SPECULATIVE, val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, val propagation: MPropagation = TransactionConfig.PROPAGATION, - val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, - val hooks: Boolean = TransactionConfig.HOOKS) + val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) object DefaultTransactionConfig extends TransactionConfig @@ -137,11 +132,10 @@ object TransactionFactory { speculative: Boolean = TransactionConfig.SPECULATIVE, quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, propagation: MPropagation = TransactionConfig.PROPAGATION, - traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, - hooks: Boolean = TransactionConfig.HOOKS) = { + traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) new TransactionFactory(config) } } @@ -199,8 +193,6 @@ class TransactionFactory( } val boilerplate = new TransactionBoilerplate(factory) - - def addHooks = if (config.hooks) Transaction.attach } /** diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala index 0765652c6a..b71f68c375 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala @@ -27,7 +27,6 @@ class TransactionConfigBuilder { var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var propagation: MPropagation = TransactionConfig.PROPAGATION var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -41,11 +40,10 @@ class TransactionConfigBuilder { def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } - def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) } /** @@ -64,7 +62,6 @@ class TransactionFactoryBuilder { var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var propagation: MPropagation = TransactionConfig.PROPAGATION var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -78,12 +75,11 @@ class TransactionFactoryBuilder { def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } - def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) new TransactionFactory(config) } } diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala index 2de7747607..8ce08cf624 100644 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala @@ -129,7 +129,6 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { - factory.addHooks val result = body val timeout = factory.config.timeout barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) diff --git a/akka-stm/src/test/scala/config/ConfigSpec.scala b/akka-stm/src/test/scala/config/ConfigSpec.scala index 69f76eb89b..4108a99d63 100644 --- a/akka-stm/src/test/scala/config/ConfigSpec.scala +++ b/akka-stm/src/test/scala/config/ConfigSpec.scala @@ -20,9 +20,7 @@ class ConfigSpec extends WordSpec with MustMatchers { getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.fair") must equal(Some(true)) - getBool("akka.stm.hooks") must equal(Some(true)) getBool("akka.stm.interruptible") must equal(Some(false)) - getBool("akka.stm.jta-aware") must equal(Some(false)) getInt("akka.stm.max-retries") must equal(Some(1000)) getString("akka.stm.propagation") must equal(Some("requires")) getBool("akka.stm.quick-release") must equal(Some(true)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 458f937e5e..3282a3e2c7 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -71,9 +71,6 @@ akka { quick-release = true propagation = "requires" trace-level = "none" - hooks = true - jta-aware = off # Option 'on' means that if there JTA Transaction Manager available then the STM will - # begin (or join), commit or rollback the JTA transaction. Default is 'off'. } jta { From ed1a113c21237a8c39a571e930fb68875f927c11 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 11:31:19 +0100 Subject: [PATCH 08/24] Adding OrderedMemoryAwareThreadPoolExecutor with an ExecutionHandler to the NettyRemoteServer --- .../main/scala/akka/remote/RemoteShared.scala | 20 +++++++++++++++++++ .../remote/netty/NettyRemoteSupport.scala | 15 ++++++++++---- .../src/test/scala/config/ConfigSpec.scala | 4 ++++ config/akka-reference.conf | 4 ++++ 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala index ee4e5cf809..9fa9d1b5c0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -44,4 +44,24 @@ object RemoteServerSettings { } val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) + + val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT) + + val EXECUTION_POOL_SIZE = { + val sz = config.getInt("akka.remote.server.execution-pool-size",16) + if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") + sz + } + + val MAX_CHANNEL_MEMORY_SIZE = { + val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0) + if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0") + sz + } + + val MAX_TOTAL_MEMORY_SIZE = { + val sz = config.getInt("akka.remote.server.max-total-memory-size", 0) + if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0") + sz + } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 893b22b059..3fab1b20c1 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -33,6 +33,7 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } +import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler @@ -753,9 +754,17 @@ class RemoteServerPipelineFactory( case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) case _ => (Nil, Nil) } - + val execution = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor( + EXECUTION_POOL_SIZE, + MAX_CHANNEL_MEMORY_SIZE, + MAX_TOTAL_MEMORY_SIZE, + EXECUTION_POOL_KEEPALIVE.length, + EXECUTION_POOL_KEEPALIVE.unit + ) + ) val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) - val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil + val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } @@ -856,8 +865,6 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { - //FIXME we should definitely spawn off this in a thread pool or something, - // potentially using Actor.spawn or something similar request.getActorInfo.getActorType match { case SCALA_ACTOR => dispatchToActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel) diff --git a/akka-remote/src/test/scala/config/ConfigSpec.scala b/akka-remote/src/test/scala/config/ConfigSpec.scala index 7ba5193f6f..e37edcfc34 100644 --- a/akka-remote/src/test/scala/config/ConfigSpec.scala +++ b/akka-remote/src/test/scala/config/ConfigSpec.scala @@ -36,6 +36,10 @@ class ConfigSpec extends WordSpec with MustMatchers { getBool("akka.remote.ssl.debug") must equal(None) getBool("akka.remote.ssl.service") must equal(None) getInt("akka.remote.zlib-compression-level") must equal(Some(6)) + getInt("akka.remote.server.execution-pool-size") must equal(Some(16)) + getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60)) + getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0)) + getInt("akka.remote.server.max-total-memory-size") must equal(Some(0)) } } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 458f937e5e..b27c383de6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -142,6 +142,10 @@ akka { require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. backlog = 4096 # Sets the size of the connection backlog + execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling + execution-pool-size = 16# Size of the core pool of the remote execution unit + max-channel-memory-size = 0 # Maximum channel size, 0 for off + max-total-memory-size = 0 # Maximum total size of all channels, 0 for off } client { From bc0b6c653ba7c7867725bc666233d47525a3e007 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 13:10:47 +0100 Subject: [PATCH 09/24] Removing printlns --- .../scala/serialization/SerializableTypeClassActorSpec.scala | 4 ++-- akka-remote/src/test/scala/serialization/Ticket435Spec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 7d5524bcd2..2eec948698 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -243,7 +243,7 @@ class MyStatelessActor extends Actor { class MyStatelessActorWithMessagesInMailbox extends Actor { def receive = { case "hello" => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") } @@ -263,7 +263,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { class MyActorWithSerializableMessages extends Actor { def receive = { case MyMessage(s, t) => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") } diff --git a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala index f22c876808..a6193d9914 100644 --- a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala +++ b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala @@ -117,7 +117,7 @@ class MyStatefulActor extends Actor { def receive = { case "hi" => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello" => count = count + 1 From 7169803c12453fddd0504fa7623a380db89809cd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 15:00:29 +0100 Subject: [PATCH 10/24] Adding synchronous writes to NettyRemoteSupport --- .../remote/netty/NettyRemoteSupport.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 3fab1b20c1..5baddfed89 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -81,8 +81,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { - loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY - val key = Address(address) lock.readLock.lock try { @@ -217,15 +215,13 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //We don't care about that right now - } else if (!future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) + val future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + throw future.getCause + } + None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -238,7 +234,9 @@ abstract class RemoteClient private[akka] ( futures.remove(futureUuid) //Clean this up //We don't care about that right now } else if (!future.isSuccess) { - futures.remove(futureUuid) //Clean this up + val f = futures.remove(futureUuid) //Clean this up + if (f ne null) + f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } } From 711e62fc3fb1617c979eabf12e9d05efaa38a240 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 23 Mar 2011 15:12:09 +0100 Subject: [PATCH 11/24] Moved EventHandler to 'akka.event' plus added 'error' method without exception param --- .../src/main/scala/akka/AkkaException.scala | 2 +- .../src/main/scala/akka/actor/Actor.scala | 157 --------------- .../src/main/scala/akka/actor/ActorRef.scala | 1 + .../src/main/scala/akka/actor/Scheduler.scala | 1 + .../src/main/scala/akka/config/Config.scala | 2 +- .../main/scala/akka/dataflow/DataFlow.scala | 3 +- .../ExecutorBasedEventDrivenDispatcher.scala | 3 +- .../src/main/scala/akka/dispatch/Future.scala | 3 +- .../scala/akka/dispatch/MessageHandling.scala | 9 +- .../akka/dispatch/ThreadPoolBuilder.scala | 2 +- .../main/scala/akka/event/EventHandler.scala | 180 ++++++++++++++++++ .../src/main/scala/akka/util/LockUtil.scala | 2 +- akka-http/src/main/scala/akka/http/Mist.scala | 2 +- .../scala/akka/http/Servlet30Context.scala | 2 +- .../main/scala/akka/security/Security.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 3 +- .../src/main/scala/akka/logging/SLF4J.scala | 1 + .../testkit/CallingThreadDispatcher.scala | 3 +- config/akka-reference.conf | 2 +- 19 files changed, 206 insertions(+), 174 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/event/EventHandler.scala diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 5c8948c6a4..ff4fe8fb7d 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -19,7 +19,7 @@ import java.net.{InetAddress, UnknownHostException} * * @author Jonas Bonér */ -@serializable abstract class AkkaException(message: String) extends RuntimeException(message) { +@serializable abstract class AkkaException(message: String = "") extends RuntimeException(message) { import AkkaException._ val exceptionName = getClass.getName diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d9ee3ab41c..ce850d2c22 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -76,163 +76,6 @@ class ActorKilledException private[akka](message: String) extends AkkaEx class ActorInitializationException private[akka](message: String) extends AkkaException(message) class ActorTimeoutException private[akka](message: String) extends AkkaException(message) -/** - * Error handler. - * - * Create, add and remove a listener: - *

- * val errorHandlerEventListener = Actor.actorOf(new Actor {
- *   self.dispatcher = EventHandler.EventHandlerDispatcher
- *
- *   def receive = {
- *     case EventHandler.Error(cause, instance, message) => ...
- *     case EventHandler.Warning(instance, message) => ...
- *     case EventHandler.Info(instance, message) => ...
- *     case EventHandler.Debug(instance, message) => ...
- *   }
- * })
- *
- * EventHandler.addListener(errorHandlerEventListener)
- * ...
- * EventHandler.removeListener(errorHandlerEventListener)
- * 
- * - * Log an error event: - *
- * EventHandler.notify(EventHandler.Error(exception, this, message.toString))
- * 
- * Or use the direct methods (better performance): - *
- * EventHandler.error(exception, this, message.toString)
- * 
- * - * @author Jonas Bonér - */ -object EventHandler extends ListenerManagement { - import java.io.{StringWriter, PrintWriter} - import java.text.DateFormat - import java.util.Date - import akka.dispatch.Dispatchers - - val ErrorLevel = 1 - val WarningLevel = 2 - val InfoLevel = 3 - val DebugLevel = 4 - - sealed trait Event { - @transient val thread: Thread = Thread.currentThread - } - case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event - case class Warning(instance: AnyRef, message: String = "") extends Event - case class Info(instance: AnyRef, message: String = "") extends Event - case class Debug(instance: AnyRef, message: String = "") extends Event - - val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern - val warning = "[WARN] [%s] [%s] [%s] %s".intern - val info = "[INFO] [%s] [%s] [%s] %s".intern - val debug = "[DEBUG] [%s] [%s] [%s] %s".intern - val generic = "[GENERIC] [%s] [%s]".intern - val ID = "event:handler".intern - - val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build - - val level: Int = config.getString("akka.event-handler-level", "DEBUG") match { - case "ERROR" => ErrorLevel - case "WARNING" => WarningLevel - case "INFO" => InfoLevel - case "DEBUG" => DebugLevel - case unknown => throw new ConfigurationException( - "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") - } - - def notify(event: => AnyRef) = notifyListeners(event) - - def notify[T <: Event : ClassManifest](event: => T) { - if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event) - } - - def error(cause: Throwable, instance: AnyRef, message: => String) = { - if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message)) - } - - def warning(instance: AnyRef, message: => String) = { - if (level >= WarningLevel) notifyListeners(Warning(instance, message)) - } - - def info(instance: AnyRef, message: => String) = { - if (level >= InfoLevel) notifyListeners(Info(instance, message)) - } - - def debug(instance: AnyRef, message: => String) = { - if (level >= DebugLevel) notifyListeners(Debug(instance, message)) - } - - def formattedTimestamp = DateFormat.getInstance.format(new Date) - - def stackTraceFor(e: Throwable) = { - val sw = new StringWriter - val pw = new PrintWriter(sw) - e.printStackTrace(pw) - sw.toString - } - - private def levelFor(eventClass: Class[_ <: Event]) = { - if (eventClass.isInstanceOf[Error]) ErrorLevel - else if (eventClass.isInstanceOf[Warning]) WarningLevel - else if (eventClass.isInstanceOf[Info]) InfoLevel - else if (eventClass.isInstanceOf[Debug]) DebugLevel - else DebugLevel - } - - class DefaultListener extends Actor { - self.id = ID - self.dispatcher = EventHandlerDispatcher - - def receive = { - case event @ Error(cause, instance, message) => - println(error.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message, - stackTraceFor(cause))) - case event @ Warning(instance, message) => - println(warning.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message)) - case event @ Info(instance, message) => - println(info.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message)) - case event @ Debug(instance, message) => - println(debug.format( - formattedTimestamp, - event.thread.getName, - instance.getClass.getSimpleName, - message)) - case event => - println(generic.format(formattedTimestamp, event.toString)) - } - } - - config.getList("akka.event-handlers") foreach { listenerName => - try { - ReflectiveAccess.getClassFor[Actor](listenerName) map { - clazz => addListener(Actor.actorOf(clazz).start) - } - } catch { - case e: Exception => - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + listenerName + - "] due to [" + e.toString + "]") - } - } -} - /** * This message is thrown by default when an Actors behavior doesn't match a message */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 32c03acf05..df29edd650 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -4,6 +4,7 @@ package akka.actor +import akka.event.EventHandler import akka.dispatch._ import akka.config.Config._ import akka.config.Supervision._ diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 01f4282874..cbda9d0af9 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -19,6 +19,7 @@ import scala.collection.JavaConversions import java.util.concurrent._ +import akka.event.EventHandler import akka.AkkaException object Scheduler { diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index 1d9185e98d..2d25e8383e 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -5,7 +5,7 @@ package akka.config import akka.AkkaException -import akka.actor.EventHandler +import akka.event.EventHandler import java.net.InetSocketAddress import java.lang.reflect.Method diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index 2e80da30dc..72fbbaaeb2 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -7,7 +7,8 @@ package akka.dataflow import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} -import akka.actor.{Actor, ActorRef, EventHandler} +import akka.event.EventHandler +import akka.actor.{Actor, ActorRef} import akka.actor.Actor._ import akka.dispatch.CompletableFuture import akka.AkkaException diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index f23b759c40..28c07c6af6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -4,7 +4,8 @@ package akka.dispatch -import akka.actor.{ActorRef, IllegalActorStateException, EventHandler} +import akka.event.EventHandler +import akka.actor.{ActorRef, IllegalActorStateException} import akka.util.{ReflectiveAccess, Switch} import java.util.Queue diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 6ca346dbdf..ba0b7b83ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -5,7 +5,8 @@ package akka.dispatch import akka.AkkaException -import akka.actor.{Actor, EventHandler} +import akka.event.EventHandler +import akka.actor.Actor import akka.routing.Dispatcher import akka.japi.{ Procedure, Function => JFunc } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index d12ad7463f..b319d8ac87 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -6,6 +6,7 @@ package akka.dispatch import java.util.concurrent._ import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} +import akka.event.EventHandler import akka.config.Configuration import akka.config.Config.TIME_UNIT import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess} @@ -43,7 +44,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () = object MessageDispatcher { val UNSCHEDULED = 0 - val SCHEDULED = 1 + val SCHEDULED = 1 val RESCHEDULED = 2 implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher @@ -55,10 +56,10 @@ object MessageDispatcher { trait MessageDispatcher { import MessageDispatcher._ - protected val uuids = new ConcurrentSkipListSet[Uuid] + protected val uuids = new ConcurrentSkipListSet[Uuid] protected val futures = new ConcurrentSkipListSet[Uuid] - protected val guard = new ReentrantGuard - protected val active = new Switch(false) + protected val guard = new ReentrantGuard + protected val active = new Switch(false) private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 7e15ed69c3..31d5dca0eb 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration -import akka.actor.EventHandler +import akka.event.EventHandler object ThreadPoolConfig { type Bounds = Int diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala new file mode 100644 index 0000000000..934bd43281 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -0,0 +1,180 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.event + +import akka.actor._ +import Actor._ +import akka.dispatch._ +import akka.config.Config._ +import akka.config.ConfigurationException +import akka.util.{ListenerManagement, ReflectiveAccess} +import akka.AkkaException + +/** + * Event handler. + *

+ * Create, add and remove a listener: + *

+ * val eventHandlerListener = Actor.actorOf(new Actor {
+ *   self.dispatcher = EventHandler.EventHandlerDispatcher
+ *
+ *   def receive = {
+ *     case EventHandler.Error(cause, instance, message) => ...
+ *     case EventHandler.Warning(instance, message)      => ...
+ *     case EventHandler.Info(instance, message)         => ...
+ *     case EventHandler.Debug(instance, message)        => ...
+ *     case genericEvent                                 => ... 
+ *   }
+ * })
+ *
+ * EventHandler.addListener(eventHandlerListener)
+ * ...
+ * EventHandler.removeListener(eventHandlerListener)
+ * 
+ *

+ * However best is probably to register the listener in the 'akka.conf' + * configuration file. + *

+ * Log an error event: + *

+ * EventHandler.notify(EventHandler.Error(exception, this, message.toString))
+ * 
+ * Or use the direct methods (better performance): + *
+ * EventHandler.error(exception, this, message.toString)
+ * 
+ * + * @author Jonas Bonér + */ +object EventHandler extends ListenerManagement { + import java.io.{StringWriter, PrintWriter} + import java.text.DateFormat + import java.util.Date + import akka.dispatch.Dispatchers + + val ErrorLevel = 1 + val WarningLevel = 2 + val InfoLevel = 3 + val DebugLevel = 4 + + sealed trait Event { + @transient val thread: Thread = Thread.currentThread + } + case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event + case class Warning(instance: AnyRef, message: String = "") extends Event + case class Info(instance: AnyRef, message: String = "") extends Event + case class Debug(instance: AnyRef, message: String = "") extends Event + + val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val warning = "[WARN] [%s] [%s] [%s] %s".intern + val info = "[INFO] [%s] [%s] [%s] %s".intern + val debug = "[DEBUG] [%s] [%s] [%s] %s".intern + val generic = "[GENERIC] [%s] [%s]".intern + val ID = "event:handler".intern + + class EventHandlerException extends AkkaException + + val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build + + val level: Int = config.getString("akka.event-handler-level", "DEBUG") match { + case "ERROR" => ErrorLevel + case "WARNING" => WarningLevel + case "INFO" => InfoLevel + case "DEBUG" => DebugLevel + case unknown => throw new ConfigurationException( + "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") + } + + def notify(event: => AnyRef) = notifyListeners(event) + + def notify[T <: Event : ClassManifest](event: => T) { + if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event) + } + + def error(cause: Throwable, instance: AnyRef, message: => String) = { + if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message)) + } + + def error(instance: AnyRef, message: => String) = { + if (level >= ErrorLevel) notifyListeners(Error(new EventHandlerException, instance, message)) + } + + def warning(instance: AnyRef, message: => String) = { + if (level >= WarningLevel) notifyListeners(Warning(instance, message)) + } + + def info(instance: AnyRef, message: => String) = { + if (level >= InfoLevel) notifyListeners(Info(instance, message)) + } + + def debug(instance: AnyRef, message: => String) = { + if (level >= DebugLevel) notifyListeners(Debug(instance, message)) + } + + def formattedTimestamp = DateFormat.getInstance.format(new Date) + + def stackTraceFor(e: Throwable) = { + val sw = new StringWriter + val pw = new PrintWriter(sw) + e.printStackTrace(pw) + sw.toString + } + + private def levelFor(eventClass: Class[_ <: Event]) = { + if (eventClass.isInstanceOf[Error]) ErrorLevel + else if (eventClass.isInstanceOf[Warning]) WarningLevel + else if (eventClass.isInstanceOf[Info]) InfoLevel + else if (eventClass.isInstanceOf[Debug]) DebugLevel + else DebugLevel + } + + class DefaultListener extends Actor { + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + case event @ Error(cause, instance, message) => + println(error.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message, + stackTraceFor(cause))) + case event @ Warning(instance, message) => + println(warning.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event @ Info(instance, message) => + println(info.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event @ Debug(instance, message) => + println(debug.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event => + println(generic.format(formattedTimestamp, event.toString)) + } + } + + config.getList("akka.event-handlers") foreach { listenerName => + try { + ReflectiveAccess.getClassFor[Actor](listenerName) map { + clazz => addListener(Actor.actorOf(clazz).start) + } + } catch { + case e: Exception => + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + listenerName + + "] due to [" + e.toString + "]") + } + } +} diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 055fdab3b0..bf72714c33 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -6,7 +6,7 @@ package akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} import java.util.concurrent.atomic. {AtomicBoolean} -import akka.actor.EventHandler +import akka.event.EventHandler /** * @author Jonas Bonér diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index 6a7adbe2cf..eb91b9737f 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -5,7 +5,7 @@ package akka.http import akka.actor.{ActorRegistry, ActorRef, Actor} -import akka.actor.EventHandler +import akka.event.EventHandler import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.HttpServlet diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala index 6ce3d1041c..19a29f46cc 100644 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ b/akka-http/src/main/scala/akka/http/Servlet30Context.scala @@ -7,7 +7,7 @@ package akka.http import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent}; import Types._ -import akka.actor.EventHandler +import akka.event.EventHandler /** * @author Garrick Evans diff --git a/akka-http/src/main/scala/akka/security/Security.scala b/akka-http/src/main/scala/akka/security/Security.scala index 9f16d54886..dce249de46 100644 --- a/akka-http/src/main/scala/akka/security/Security.scala +++ b/akka-http/src/main/scala/akka/security/Security.scala @@ -23,7 +23,7 @@ package akka.security import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} -import akka.actor.EventHandler +import akka.event.EventHandler import akka.actor.Actor._ import akka.config.Config diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 893b22b059..a1348da0ba 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -13,13 +13,14 @@ import akka.serialization.RemoteActorSerialization._ import akka.japi.Creator import akka.config.Config._ import akka.remoteinterface._ -import akka.actor.{PoisonPill, EventHandler, Index, +import akka.actor.{PoisonPill, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.AkkaException +import akka.event.EventHandler import akka.actor.Actor._ import akka.util._ import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} diff --git a/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala b/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala index 98579c545e..b263753656 100644 --- a/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala @@ -6,6 +6,7 @@ package akka.logging.slf4j import org.slf4j.{Logger => SLFLogger, LoggerFactory => SLFLoggerFactory} +import akka.event.EventHandler import akka.actor._ import Actor._ diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 131c18b279..ce198be6bf 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -1,6 +1,7 @@ package akka.testkit -import akka.actor.{ActorRef, EventHandler} +import akka.event.EventHandler +import akka.actor.ActorRef import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation} import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 3282a3e2c7..de8857fce4 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -12,7 +12,7 @@ akka { time-unit = "seconds" # Time unit for all timeout properties throughout the config - event-handlers = ["akka.actor.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) + event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up From 7ce67ef41da890c112cc596554a7205de2cad019 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 15:44:20 +0100 Subject: [PATCH 12/24] Moving Initializer to akka-kernel, add manually for other uses, removing ListWriter, changing akka-http to depend on akka-actor instead of akka-remote, closing ticket #716 --- .../src/main/scala/akka/http/ListWriter.scala | 42 ------------------- .../main/scala/akka/servlet/Initializer.scala | 33 --------------- project/build/AkkaProject.scala | 3 +- 3 files changed, 2 insertions(+), 76 deletions(-) delete mode 100644 akka-http/src/main/scala/akka/http/ListWriter.scala delete mode 100644 akka-http/src/main/scala/akka/servlet/Initializer.scala diff --git a/akka-http/src/main/scala/akka/http/ListWriter.scala b/akka-http/src/main/scala/akka/http/ListWriter.scala deleted file mode 100644 index 3f1123d4d8..0000000000 --- a/akka-http/src/main/scala/akka/http/ListWriter.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ -package akka.http - -import akka.serialization.Serializer - -import java.io.OutputStream -import javax.ws.rs.core.{MultivaluedMap, MediaType} -import javax.ws.rs.ext.{MessageBodyWriter, Provider} -import javax.ws.rs.Produces - -/** - * Writes Lists of JSON serializable objects. - */ -@Provider -@Produces(Array("application/json")) -class ListWriter extends MessageBodyWriter[List[_]] { - - def isWriteable(aClass: Class[_], - aType: java.lang.reflect.Type, - annotations: Array[java.lang.annotation.Annotation], - mediaType: MediaType) = - classOf[List[_]].isAssignableFrom(aClass) || aClass == ::.getClass - - def getSize(list: List[_], - aClass: Class[_], - aType: java.lang.reflect.Type, - annotations: Array[java.lang.annotation.Annotation], - mediaType: MediaType) = - -1L - - def writeTo(list: List[_], - aClass: Class[_], - aType: java.lang.reflect.Type, - annotations: Array[java.lang.annotation.Annotation], - mediaType: MediaType, - stringObjectMultivaluedMap: MultivaluedMap[String, Object], - outputStream: OutputStream): Unit = - if (list.isEmpty) outputStream.write(" ".getBytes) - else outputStream.write(Serializer.ScalaJSON.toBinary(list)) -} diff --git a/akka-http/src/main/scala/akka/servlet/Initializer.scala b/akka-http/src/main/scala/akka/servlet/Initializer.scala deleted file mode 100644 index a259a7fd34..0000000000 --- a/akka-http/src/main/scala/akka/servlet/Initializer.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.servlet - -import akka.remote.BootableRemoteActorService -import akka.actor.BootableActorLoaderService -import akka.config.Config -import akka.util.{ Bootable, AkkaLoader } - -import javax.servlet.{ServletContextListener, ServletContextEvent} - - /** - * This class can be added to web.xml mappings as a listener to start and postStop Akka. - * - * - * ... - * - * akka.servlet.Initializer - * - * ... - * - */ -class Initializer extends ServletContextListener { - lazy val loader = new AkkaLoader - - def contextDestroyed(e: ServletContextEvent): Unit = - loader.shutdown - - def contextInitialized(e: ServletContextEvent): Unit = - loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService) - } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 3fc9d7a5e4..5a04b66627 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -184,7 +184,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) - lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_remote) + lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_actor) lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) lazy val akka_sbt_plugin = project("akka-sbt-plugin", "akka-sbt-plugin", new AkkaSbtPluginProject(_)) lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) @@ -359,6 +359,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val jetty = Dependencies.jetty val jersey = Dependencies.jersey_server val jsr311 = Dependencies.jsr311 + val commons_codec = Dependencies.commons_codec // testing val junit = Dependencies.junit From 865192ee7d9d13278a5138c723e293f042e4bd5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 23 Mar 2011 22:57:19 +0100 Subject: [PATCH 13/24] added error reporting to the ReflectiveAccess object --- .../scala/akka/util/ReflectiveAccess.scala | 89 +++++++++++++------ 1 file changed, 63 insertions(+), 26 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 7b9590746f..41d1106818 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -11,6 +11,7 @@ import akka.AkkaException import java.net.InetSocketAddress import akka.remoteinterface.RemoteSupport import akka.actor._ +import akka.event.EventHandler /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -33,25 +34,34 @@ object ReflectiveAccess { * @author Jonas Bonér */ object Remote { - val TRANSPORT = Config.config.getString("akka.remote.layer","akka.remote.netty.NettyRemoteSupport") + val TRANSPORT = Config.config.getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport") private[akka] val configDefaultAddress = new InetSocketAddress(Config.config.getString("akka.remote.server.hostname", "localhost"), Config.config.getInt("akka.remote.server.port", 2552)) - lazy val isEnabled = remoteSupportClass.isDefined - def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( - "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") - + def ensureEnabled = if (!isEnabled) { + val e = new ModuleNotAvailableException( + "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") + EventHandler.warning(this, e.toString) + throw e + } val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) - protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map { - remoteClass => () => createInstance[RemoteSupport](remoteClass,Array[Class[_]](),Array[AnyRef]()). - getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+ - remoteClass.getName+ - ", make sure that akka-remote.jar is on the classpath")) + protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = + remoteSupportClass map { remoteClass => + () => createInstance[RemoteSupport]( + remoteClass, + Array[Class[_]](), + Array[AnyRef]() + ) getOrElse { + val e = new ModuleNotAvailableException( + "Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName)) + EventHandler.warning(this, e.toString) + throw e + } } } @@ -125,6 +135,7 @@ object ReflectiveAccess { Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { case e: Exception => + EventHandler.warning(this, e.toString) None } @@ -143,6 +154,7 @@ object ReflectiveAccess { } } catch { case e: Exception => + EventHandler.warning(this, e.toString) None } @@ -155,33 +167,58 @@ object ReflectiveAccess { case None => None } } catch { - case ei: ExceptionInInitializerError => - throw ei + case e: ExceptionInInitializerError => + EventHandler.warning(this, e.toString) + throw e } def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Option[Class[T]] = { assert(fqn ne null) - val first = try { Option(classloader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } //First, use the specified CL + // First, use the specified CL + val first = try { + Option(classloader.loadClass(fqn).asInstanceOf[Class[T]]) + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } - if (first.isDefined) - first - else { //Second option is to use the ContextClassLoader - val second = try { Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } - if (second.isDefined) - second + if (first.isDefined) first + else { + // Second option is to use the ContextClassLoader + val second = try { + Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } + + if (second.isDefined) second else { val third = try { - if(classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]]) //Don't try to use "loader" if we got the default "classloader" parameter + // Don't try to use "loader" if we got the default "classloader" parameter + if (classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]]) else None - } catch { case c: ClassNotFoundException => None } + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } - if (third.isDefined) - third - else - try { Option(Class.forName(fqn).asInstanceOf[Class[T]]) } catch { case c: ClassNotFoundException => None } //Last option is Class.forName + if (third.isDefined) third + else { + // Last option is Class.forName + try { + Option(Class.forName(fqn).asInstanceOf[Class[T]]) + } catch { + case c: ClassNotFoundException => + EventHandler.warning(this, c.toString) + None + } + } } } - } } From 5f86e8027cda64bb3c324ba5b5bd4ebdc13d2b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 10:30:41 +0100 Subject: [PATCH 14/24] moved slf4j to 'akka.event.slf4j' --- .../scala/akka/{logging => event/slf4j}/SLF4J.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename akka-slf4j/src/main/scala/akka/{logging => event/slf4j}/SLF4J.scala (86%) diff --git a/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala similarity index 86% rename from akka-slf4j/src/main/scala/akka/logging/SLF4J.scala rename to akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index b263753656..fa1d4e713e 100644 --- a/akka-slf4j/src/main/scala/akka/logging/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.logging.slf4j +package akka.event.slf4j import org.slf4j.{Logger => SLFLogger, LoggerFactory => SLFLoggerFactory} @@ -37,17 +37,17 @@ class Slf4jEventHandler extends Actor with Logging { self.dispatcher = EventHandlerDispatcher def receive = { - case event @ Error(cause, instance, message) => + case Error(cause, instance, message) => log.error("\n\t[{}]\n\t[{}]\n\t[{}]", Array[AnyRef](instance.getClass.getName, message, stackTraceFor(cause))) - case event @ Warning(instance, message) => + case Warning(instance, message) => log.warn("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) - case event @ Info(instance, message) => + case Info(instance, message) => log.info("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) - case event @ Debug(instance, message) => + case Debug(instance, message) => log.debug("\n\t[{}]\n\t[{}]", instance.getClass.getName, message) case event => log.debug("\n\t[{}]", event.toString) From 479e47bfa3b9565fef3d30b4bd6f458adae8ec73 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 24 Mar 2011 21:13:42 +0100 Subject: [PATCH 15/24] Fixing order-of-initialization-bug --- akka-actor/src/main/scala/akka/event/EventHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 934bd43281..d4fc55b0a9 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -76,7 +76,7 @@ object EventHandler extends ListenerManagement { class EventHandlerException extends AkkaException - val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build + lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build val level: Int = config.getString("akka.event-handler-level", "DEBUG") match { case "ERROR" => ErrorLevel From 898d5558264f32d16c37dde01768d7e929be546e Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 25 Mar 2011 17:06:33 +1300 Subject: [PATCH 16/24] Fix race with PoisonPill --- akka-actor/src/main/scala/akka/actor/Actor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ce850d2c22..b65688f4b5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -458,8 +458,8 @@ trait Actor { case Restart(reason) => throw reason case PoisonPill => val f = self.senderFuture - if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) self.stop + if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) } private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior From b436ff995e41db20c19dae1531f799ab0219659e Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 25 Mar 2011 17:08:42 +1300 Subject: [PATCH 17/24] Catch possible actor init exceptions --- .../actor/supervisor/RestartStrategySpec.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala index a930b86784..741cd7a49e 100644 --- a/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala @@ -260,11 +260,23 @@ class RestartStrategySpec extends JUnitSuite { // now crash again... should not restart slave ! Crash - slave ! Ping + + // may not be running + try { + slave ! Ping + } catch { + case e: ActorInitializationException => () + } assert(countDownLatch.await(1, TimeUnit.SECONDS)) - slave ! Crash + // may not be running + try { + slave ! Crash + } catch { + case e: ActorInitializationException => () + } + assert(stopLatch.tryAwait(1, TimeUnit.SECONDS)) assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS)) From 1547c9929054be21d6bb587a8b5c15f4749a2fe0 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 25 Mar 2011 17:10:07 +1300 Subject: [PATCH 18/24] Fix race in ActorModelSpec --- akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index c7bdd61241..55c2e001af 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -105,8 +105,8 @@ object ActorModelSpec { } private[akka] abstract override def dispatch(invocation: MessageInvocation) { - super.dispatch(invocation) getStats(invocation.receiver).msgsReceived.incrementAndGet() + super.dispatch(invocation) } private[akka] abstract override def start { From 40666ca357e024cd230397b72a9e188e35b70829 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 25 Mar 2011 17:21:14 +1300 Subject: [PATCH 19/24] Introduce testing time factor (for Jenkins builds) --- akka-actor/src/test/scala/akka/Testing.scala | 25 +++++++++++++++++++ .../scala/remote/RemoteTypedActorSpec.scala | 6 +++-- 2 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 akka-actor/src/test/scala/akka/Testing.scala diff --git a/akka-actor/src/test/scala/akka/Testing.scala b/akka-actor/src/test/scala/akka/Testing.scala new file mode 100644 index 0000000000..afc0c4a05a --- /dev/null +++ b/akka-actor/src/test/scala/akka/Testing.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka + +/** + * Multiplying numbers used in test timeouts by a factor, set by system property. + * Useful for Jenkins builds (where the machine may need more time). + */ +object Testing { + val timeFactor: Double = { + val factor = System.getProperty("akka.test.timefactor", "1.0") + try { + factor.toDouble + } catch { + case e: java.lang.NumberFormatException => 1.0 + } + } + + def time(t: Int): Int = (timeFactor * t).toInt + def time(t: Long): Long = (timeFactor * t).toLong + def time(t: Float): Float = (timeFactor * t).toFloat + def time(t: Double): Double = timeFactor * t +} diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index b72c49c204..c91565eec7 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -10,6 +10,8 @@ import akka.actor._ import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.config. {RemoteAddress, Config, TypedActorConfigurator} +import akka.Testing + object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] val oneWayLog = new LinkedBlockingQueue[String] @@ -37,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest { classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], Permanent, - 10000, + Testing.time(10000), RemoteAddress(host,port)), new SuperviseTypedActor( classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwoImpl], Permanent, - 10000, + Testing.time(10000), RemoteAddress(host,port)) ).toArray).supervise } From ede34ad99cefe3d9b4f448dfed1d761c3f25189b Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 25 Mar 2011 18:36:12 +1300 Subject: [PATCH 20/24] Replace sleep with latch in valueWithin test --- akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index ec46140859..f99f5f5305 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -277,14 +277,17 @@ class FutureSpec extends JUnitSuite { } @Test def resultWithinShouldNotThrowExceptions { + val latch = new StandardLatch + val actors = (1 to 10).toList map { _ => actorOf(new Actor { - def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add } + def receive = { case (add: Int, wait: Boolean, latch: StandardLatch) => if (wait) latch.await; self reply_? add } }).start } - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx >= 5, latch)) } val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS) + latch.open val done = result collect { case Some(Right(x)) => x } val undone = result collect { case None => None } val errors = result collect { case Some(Left(t)) => t } From a7e397f5fd48308e00873572455ce9c42776a33d Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 25 Mar 2011 19:09:40 +1300 Subject: [PATCH 21/24] Add a couple more test timing adjustments --- .../akka/actor/actor/ActorFireForgetRequestReplySpec.scala | 4 +++- .../src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index 0f9debe5b0..1eef7f068c 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -8,6 +8,8 @@ import org.junit.Test import akka.dispatch.Dispatchers import Actor._ +import akka.Testing + object ActorFireForgetRequestReplySpec { class ReplyActor extends Actor { @@ -85,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite { actor ! "Die" try { state.finished.await(10L, TimeUnit.SECONDS) } catch { case e: TimeoutException => fail("Never got the message") } - Thread.sleep(100) + Thread.sleep(Testing.time(500)) assert(actor.isShutdown) } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala index 7fea2a78d3..9e5fba863e 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala @@ -77,8 +77,8 @@ class ReceiveTimeoutSpec extends JUnitSuite { protected def receive = { case Tick => () case ReceiveTimeout => - timeoutLatch.open count.incrementAndGet + timeoutLatch.open self.receiveTimeout = None } }).start From a579d6b2bbe20060ee674bb2b642c3e995a3c5ec Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 25 Mar 2011 11:32:09 +0100 Subject: [PATCH 22/24] Closing ticket #721, shutting down the VM if theres a broken config supplied --- .../src/main/scala/akka/config/Config.scala | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index 2d25e8383e..7d50e59cd7 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -5,10 +5,6 @@ package akka.config import akka.AkkaException -import akka.event.EventHandler - -import java.net.InetSocketAddress -import java.lang.reflect.Method class ConfigurationException(message: String) extends AkkaException(message) class ModuleNotAvailableException(message: String) extends AkkaException(message) @@ -35,10 +31,8 @@ object Config { envHome orElse systemHome } - val config = { - + val config: Configuration = try { val confName = { - val envConf = System.getenv("AKKA_MODE") match { case null | "" => None case value => Some(value) @@ -52,7 +46,7 @@ object Config { (envConf orElse systemConf).map("akka." + _ + ".conf").getOrElse("akka.conf") } - try { + val newInstance = if (System.getProperty("akka.config", "") != "") { val configFile = System.getProperty("akka.config", "") println("Loading config from -Dakka.config=" + configFile) @@ -75,18 +69,23 @@ object Config { "\nUsing default values everywhere.") Configuration.fromString("akka {}") // default empty config } - } catch { - case e => - EventHandler.error(e, this, e.getMessage) - throw e - } + + val configVersion = newInstance.getString("akka.version", VERSION) + if (configVersion != VERSION) + throw new ConfigurationException( + "Akka JAR version [" + VERSION + "] is different than the provided config version [" + configVersion + "]") + + newInstance + } catch { + case e => + System.err.println("Couldn't parse config, fatal error.") + e.printStackTrace(System.err) + System.exit(-1) + throw e } val CONFIG_VERSION = config.getString("akka.version", VERSION) - if (VERSION != CONFIG_VERSION) throw new ConfigurationException( - "Akka JAR version [" + VERSION + "] is different than the provided config version [" + CONFIG_VERSION + "]") - val TIME_UNIT = config.getString("akka.time-unit", "seconds") val startTime = System.currentTimeMillis From 1ac56ffa573b7e43fcaa3a24b11689ce7c578168 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 25 Mar 2011 22:50:56 +0100 Subject: [PATCH 23/24] Removing race in isDefinedAt and in apply, closing ticket #722 --- .../src/main/scala/akka/actor/Actor.scala | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b65688f4b5..882331b177 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -408,13 +408,16 @@ trait Actor { /** * Is the actor able to handle the message passed in as arguments? */ - def isDefinedAt(message: Any): Boolean = message match { //Same logic as apply(msg) but without the unhandled catch-all - case l: AutoReceivedMessage => true - case msg if self.hotswap.nonEmpty && - self.hotswap.head.isDefinedAt(msg) => true - case msg if self.hotswap.isEmpty && - processingBehavior.isDefinedAt(msg) => true - case _ => false + def isDefinedAt(message: Any): Boolean = { + val behaviorStack = self.hotswap + message match { //Same logic as apply(msg) but without the unhandled catch-all + case l: AutoReceivedMessage => true + case msg if behaviorStack.nonEmpty && + behaviorStack.head.isDefinedAt(msg) => true + case msg if behaviorStack.isEmpty && + processingBehavior.isDefinedAt(msg) => true + case _ => false + } } /** @@ -439,13 +442,16 @@ trait Actor { // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= - private[akka] final def apply(msg: Any) = msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException? - case l: AutoReceivedMessage => autoReceiveMessage(l) - case msg if self.hotswap.nonEmpty && - self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) - case msg if self.hotswap.isEmpty && - processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg) - case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior + private[akka] final def apply(msg: Any) = { + val behaviorStack = self.hotswap + msg match { //FIXME Add check for currentMessage eq null throw new BadUSerException? + case l: AutoReceivedMessage => autoReceiveMessage(l) + case msg if behaviorStack.nonEmpty && + behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg) + case msg if behaviorStack.isEmpty && + processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg) + case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior + } } private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { From d9e61758244dbad41836b9fcd8375403eb92ebeb Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Sat, 26 Mar 2011 03:35:29 +0530 Subject: [PATCH 24/24] changed version of sjson to 0.10 --- project/build/AkkaProject.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5a04b66627..127994aa30 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -158,8 +158,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD - lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 - lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 + lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.10" % "compile" //ApacheV2 + lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.10" % "test" //ApacheV2 lazy val slf4j = "org.slf4j" % "slf4j-api" % "1.6.0" lazy val logback = "ch.qos.logback" % "logback-classic" % "0.9.24"