From d903498ba9380399b02c99dc8cccf7151d64d67e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 Mar 2011 17:26:53 +0100 Subject: [PATCH] Making thread transient in Event and adding WTF comment --- .../src/main/scala/akka/actor/Actor.scala | 4 +-- .../src/main/scala/akka/dispatch/Future.scala | 25 ++++++++++++++++--- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 19e168006e..1bbb88aca6 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -117,7 +117,7 @@ object EventHandler extends ListenerManagement { val DebugLevel = 4 sealed trait Event { - val thread: Thread = Thread.currentThread + @transient val thread: Thread = Thread.currentThread } case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event case class Warning(instance: AnyRef, message: String = "") extends Event @@ -145,7 +145,7 @@ object EventHandler extends ListenerManagement { def notify(event: => AnyRef) = notifyListeners(event) def notify[T <: Event : ClassManifest](event: => T) { - if (classManifest[T].erasure.asInstanceOf[Class[_ <: Event]] == level) notifyListeners(event) + if (classManifest[T].erasure.asInstanceOf[Class[_ <: Event]] == level) notifyListeners(event) //WTF? } def error(cause: Throwable, instance: AnyRef, message: => String) = { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3d37ec2ef8..9d20c1b615 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -72,7 +72,7 @@ object Futures { */ def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { if(futures.isEmpty) { - (new DefaultCompletableFuture[R](timeout)) completeWithResult zero + new AlreadyCompletedFuture[R](Right(zero)) } else { val result = new DefaultCompletableFuture[R](timeout) val results = new ConcurrentLinkedQueue[T]() @@ -109,7 +109,7 @@ object Futures { */ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = { if (futures.isEmpty) - (new DefaultCompletableFuture[R](timeout)).completeWithException(new UnsupportedOperationException("empty reduce left")) + new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) else { val result = new DefaultCompletableFuture[R](timeout) val seedFound = new AtomicBoolean(false) @@ -319,7 +319,7 @@ sealed trait Future[+T] { fa complete v.asInstanceOf[Either[Throwable, A]] else { try { - f(v.right.get) onComplete (fa.completeWith(_)) + fa.completeWith(f(v.right.get)) } catch { case e: Exception => EventHandler notify EventHandler.Error(e, this) @@ -515,8 +515,25 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } private def notify(func: Future[T] => Unit) { - func(this) + try { + func(this) + } catch { + case e: Exception => EventHandler notify EventHandler.Error(e, this) + } } private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) } + +sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { + val value = Some(suppliedValue) + + def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } + def awaitResult: Option[Either[Throwable, T]] = value + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value + def await : Future[T] = this + def awaitBlocking : Future[T] = this + def isExpired: Boolean = false + def timeoutInNanos: Long = 0 +}