From d903498ba9380399b02c99dc8cccf7151d64d67e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 Mar 2011 17:26:53 +0100 Subject: [PATCH 1/3] Making thread transient in Event and adding WTF comment --- .../src/main/scala/akka/actor/Actor.scala | 4 +-- .../src/main/scala/akka/dispatch/Future.scala | 25 ++++++++++++++++--- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 19e168006e..1bbb88aca6 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -117,7 +117,7 @@ object EventHandler extends ListenerManagement { val DebugLevel = 4 sealed trait Event { - val thread: Thread = Thread.currentThread + @transient val thread: Thread = Thread.currentThread } case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event case class Warning(instance: AnyRef, message: String = "") extends Event @@ -145,7 +145,7 @@ object EventHandler extends ListenerManagement { def notify(event: => AnyRef) = notifyListeners(event) def notify[T <: Event : ClassManifest](event: => T) { - if (classManifest[T].erasure.asInstanceOf[Class[_ <: Event]] == level) notifyListeners(event) + if (classManifest[T].erasure.asInstanceOf[Class[_ <: Event]] == level) notifyListeners(event) //WTF? } def error(cause: Throwable, instance: AnyRef, message: => String) = { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3d37ec2ef8..9d20c1b615 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -72,7 +72,7 @@ object Futures { */ def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { if(futures.isEmpty) { - (new DefaultCompletableFuture[R](timeout)) completeWithResult zero + new AlreadyCompletedFuture[R](Right(zero)) } else { val result = new DefaultCompletableFuture[R](timeout) val results = new ConcurrentLinkedQueue[T]() @@ -109,7 +109,7 @@ object Futures { */ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = { if (futures.isEmpty) - (new DefaultCompletableFuture[R](timeout)).completeWithException(new UnsupportedOperationException("empty reduce left")) + new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) else { val result = new DefaultCompletableFuture[R](timeout) val seedFound = new AtomicBoolean(false) @@ -319,7 +319,7 @@ sealed trait Future[+T] { fa complete v.asInstanceOf[Either[Throwable, A]] else { try { - f(v.right.get) onComplete (fa.completeWith(_)) + fa.completeWith(f(v.right.get)) } catch { case e: Exception => EventHandler notify EventHandler.Error(e, this) @@ -515,8 +515,25 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } private def notify(func: Future[T] => Unit) { - func(this) + try { + func(this) + } catch { + case e: Exception => EventHandler notify EventHandler.Error(e, this) + } } private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) } + +sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { + val value = Some(suppliedValue) + + def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } + def awaitResult: Option[Either[Throwable, T]] = value + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value + def await : Future[T] = this + def awaitBlocking : Future[T] = this + def isExpired: Boolean = false + def timeoutInNanos: Long = 0 +} From 44c7ed6850beef7b329a32b855188fc4654b3547 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 Mar 2011 17:37:25 +0100 Subject: [PATCH 2/3] Removing 2 vars from Future, and adding some ScalaDoc --- .../src/main/scala/akka/dispatch/Future.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 9d20c1b615..944e5b4847 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -386,6 +386,9 @@ sealed trait Future[+T] { } +/** + * Essentially this is the Promise (or write-side) of a Future (read-side) + */ trait CompletableFuture[T] extends Future[T] { def complete(value: Either[Throwable, T]): CompletableFuture[T] final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) @@ -478,15 +481,14 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { - var notifyTheseListeners: List[Future[T] => Unit] = Nil - _lock.lock - try { + val notifyTheseListeners = try { if (_value.isEmpty) { _value = Some(value) - notifyTheseListeners = _listeners + val existingListeners = _listeners _listeners = Nil - } + existingListeners + } else Nil } finally { _signal.signalAll _lock.unlock @@ -499,12 +501,12 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { - var notifyNow = false - _lock.lock - try { - if (_value.isEmpty) _listeners ::= func - else notifyNow = true + val notifyNow = try { + if (_value.isEmpty) { + _listeners ::= func + false + } else true } finally { _lock.unlock } @@ -525,6 +527,10 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) } +/** + * An already completed Future is seeded with it's result at creation, is useful for when you are participating in + * a Future-composition but you already have a value to contribute. + */ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { val value = Some(suppliedValue) From d738674eeb5a91540b120b39415974c9a0aa8ea7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 Mar 2011 18:03:10 +0100 Subject: [PATCH 3/3] Switching to PoisonPill to shut down Per-Session actors, and restructuring some Future-code to avoid wasteful object creation --- .../remote/netty/NettyRemoteSupport.scala | 49 +++++++++---------- .../serialization/SerializationProtocol.scala | 8 +-- 2 files changed, 27 insertions(+), 30 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8556f8e7ac..dadf020236 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -13,7 +13,12 @@ import akka.serialization.RemoteActorSerialization._ import akka.japi.Creator import akka.config.Config._ import akka.remoteinterface._ -import akka.actor.{EventHandler, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} +import akka.actor.{PoisonPill, EventHandler, Index, + ActorInitializationException, LocalActorRef, newUuid, + ActorRegistry, Actor, RemoteActorRef, + TypedActor, ActorRef, IllegalActorStateException, + RemoteActorSystemMessage, uuidFrom, Uuid, + Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.AkkaException import akka.actor.Actor._ import akka.util._ @@ -193,7 +198,7 @@ abstract class RemoteClient private[akka] ( actorRef.id, actorRef.actorClassName, actorRef.timeout, - Left(message), + Right(message), isOneWay, senderOption, typedActorInfo, @@ -810,8 +815,10 @@ class RemoteServerHandler( // stop all session actors for (map <- Option(sessionActors.remove(event.getChannel)); actor <- asScalaIterable(map.values)) { - try { actor.stop } catch { case e: Exception => } + try { actor ! PoisonPill } catch { case e: Exception => } } + + //FIXME switch approach or use other thread to execute this // stop all typed session actors for (map <- Option(typedSessionActors.remove(event.getChannel)); actor <- asScalaIterable(map.values)) { @@ -886,22 +893,17 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). - onComplete(f => { - val result = f.result - val exception = f.exception - - if (exception.isDefined) { - write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) - } - else if (result.isDefined) { - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout). + onComplete(_.value.get match { + case l: Left[Throwable, Any] => write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor)) + case r: Right[Throwable, Any] => + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), actorInfo.getId, actorInfo.getTarget, actorInfo.getTimeout, - Left(result.get), + r, true, Some(actorRef), None, @@ -912,7 +914,6 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) write(channel, RemoteEncoder.encode(messageBuilder.build)) - } } ) )) @@ -957,10 +958,10 @@ class RemoteServerHandler( try { val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses) - if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) + if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread else { //Sends the response - def sendResponse(result: Either[Any,Throwable]): Unit = try { + def sendResponse(result: Either[Throwable,Any]): Unit = try { val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( None, Right(request.getUuid), @@ -982,14 +983,10 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(e, server)) } - messageReceiver.invoke(typedActor, args: _*) match { - case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed - f.onComplete( future => { - val result: Either[Any,Throwable] = - if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) - sendResponse(result) - }) - case other => sendResponse(Left(other)) + messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread + //If it's a future, we can lift on that to defer the send to when the future is completed + case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) ) + case other => sendResponse(Right(other)) } } } catch { @@ -1152,7 +1149,7 @@ class RemoteServerHandler( actorInfo.getId, actorInfo.getTarget, actorInfo.getTimeout, - Right(exception), + Left(exception), true, None, None, diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 12ee2228f5..7ad0c1e443 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -138,7 +138,7 @@ object ActorSerialization { actorRef.id, actorRef.actorClassName, actorRef.timeout, - Left(m.message), + Right(m.message), false, actorRef.getSender, None, @@ -279,7 +279,7 @@ object RemoteActorSerialization { actorId: String, actorClassName: String, timeout: Long, - message: Either[Any, Throwable], + message: Either[Throwable, Any], isOneWay: Boolean, senderOption: Option[ActorRef], typedActorInfo: Option[Tuple2[String, String]], @@ -319,9 +319,9 @@ object RemoteActorSerialization { .setOneWay(isOneWay) message match { - case Left(message) => + case Right(message) => messageBuilder.setMessage(MessageSerializer.serialize(message)) - case Right(exception) => + case Left(exception) => messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) .setMessage(empty(exception.getMessage))