Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2011-03-19 12:22:17 +01:00
commit 233044f077
4 changed files with 65 additions and 45 deletions

View file

@ -120,7 +120,7 @@ object EventHandler extends ListenerManagement {
val DebugLevel = 4 val DebugLevel = 4
sealed trait Event { sealed trait Event {
val thread: Thread = Thread.currentThread @transient val thread: Thread = Thread.currentThread
} }
case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event
case class Warning(instance: AnyRef, message: String = "") extends Event case class Warning(instance: AnyRef, message: String = "") extends Event

View file

@ -72,7 +72,7 @@ object Futures {
*/ */
def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
if(futures.isEmpty) { if(futures.isEmpty) {
(new DefaultCompletableFuture[R](timeout)) completeWithResult zero new AlreadyCompletedFuture[R](Right(zero))
} else { } else {
val result = new DefaultCompletableFuture[R](timeout) val result = new DefaultCompletableFuture[R](timeout)
val results = new ConcurrentLinkedQueue[T]() val results = new ConcurrentLinkedQueue[T]()
@ -109,7 +109,7 @@ object Futures {
*/ */
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = { def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = {
if (futures.isEmpty) if (futures.isEmpty)
(new DefaultCompletableFuture[R](timeout)).completeWithException(new UnsupportedOperationException("empty reduce left")) new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left")))
else { else {
val result = new DefaultCompletableFuture[R](timeout) val result = new DefaultCompletableFuture[R](timeout)
val seedFound = new AtomicBoolean(false) val seedFound = new AtomicBoolean(false)
@ -319,7 +319,7 @@ sealed trait Future[+T] {
fa complete v.asInstanceOf[Either[Throwable, A]] fa complete v.asInstanceOf[Either[Throwable, A]]
else { else {
try { try {
f(v.right.get) onComplete (fa.completeWith(_)) fa.completeWith(f(v.right.get))
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler.error(e, this, e.getMessage) EventHandler.error(e, this, e.getMessage)
@ -386,6 +386,9 @@ sealed trait Future[+T] {
} }
/**
* Essentially this is the Promise (or write-side) of a Future (read-side)
*/
trait CompletableFuture[T] extends Future[T] { trait CompletableFuture[T] extends Future[T] {
def complete(value: Either[Throwable, T]): CompletableFuture[T] def complete(value: Either[Throwable, T]): CompletableFuture[T]
final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))
@ -478,15 +481,14 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
} }
def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
var notifyTheseListeners: List[Future[T] => Unit] = Nil
_lock.lock _lock.lock
try { val notifyTheseListeners = try {
if (_value.isEmpty) { if (_value.isEmpty) {
_value = Some(value) _value = Some(value)
notifyTheseListeners = _listeners val existingListeners = _listeners
_listeners = Nil _listeners = Nil
} existingListeners
} else Nil
} finally { } finally {
_signal.signalAll _signal.signalAll
_lock.unlock _lock.unlock
@ -499,12 +501,12 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
} }
def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
var notifyNow = false
_lock.lock _lock.lock
try { val notifyNow = try {
if (_value.isEmpty) _listeners ::= func if (_value.isEmpty) {
else notifyNow = true _listeners ::= func
false
} else true
} finally { } finally {
_lock.unlock _lock.unlock
} }
@ -515,8 +517,29 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
} }
private def notify(func: Future[T] => Unit) { private def notify(func: Future[T] => Unit) {
func(this) try {
func(this)
} catch {
case e: Exception => EventHandler notify EventHandler.Error(e, this)
}
} }
private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis)
} }
/**
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
*/
sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] {
val value = Some(suppliedValue)
def complete(value: Either[Throwable, T]): CompletableFuture[T] = this
def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this }
def awaitResult: Option[Either[Throwable, T]] = value
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value
def await : Future[T] = this
def awaitBlocking : Future[T] = this
def isExpired: Boolean = false
def timeoutInNanos: Long = 0
}

View file

@ -13,7 +13,12 @@ import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator import akka.japi.Creator
import akka.config.Config._ import akka.config.Config._
import akka.remoteinterface._ import akka.remoteinterface._
import akka.actor.{EventHandler, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.actor.{PoisonPill, EventHandler, Index,
ActorInitializationException, LocalActorRef, newUuid,
ActorRegistry, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException import akka.AkkaException
import akka.actor.Actor._ import akka.actor.Actor._
import akka.util._ import akka.util._
@ -194,7 +199,7 @@ abstract class RemoteClient private[akka] (
actorRef.id, actorRef.id,
actorRef.actorClassName, actorRef.actorClassName,
actorRef.timeout, actorRef.timeout,
Left(message), Right(message),
isOneWay, isOneWay,
senderOption, senderOption,
typedActorInfo, typedActorInfo,
@ -811,8 +816,10 @@ class RemoteServerHandler(
// stop all session actors // stop all session actors
for (map <- Option(sessionActors.remove(event.getChannel)); for (map <- Option(sessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) { actor <- asScalaIterable(map.values)) {
try { actor.stop } catch { case e: Exception => } try { actor ! PoisonPill } catch { case e: Exception => }
} }
//FIXME switch approach or use other thread to execute this
// stop all typed session actors // stop all typed session actors
for (map <- Option(typedSessionActors.remove(event.getChannel)); for (map <- Option(typedSessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) { actor <- asScalaIterable(map.values)) {
@ -887,22 +894,17 @@ class RemoteServerHandler(
message, message,
request.getActorInfo.getTimeout, request.getActorInfo.getTimeout,
None, None,
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout).
onComplete(f => { onComplete(_.value.get match {
val result = f.result case l: Left[Throwable, Any] => write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor))
val exception = f.exception case r: Right[Throwable, Any] =>
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
if (exception.isDefined) {
write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
}
else if (result.isDefined) {
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef), Some(actorRef),
Right(request.getUuid), Right(request.getUuid),
actorInfo.getId, actorInfo.getId,
actorInfo.getTarget, actorInfo.getTarget,
actorInfo.getTimeout, actorInfo.getTimeout,
Left(result.get), r,
true, true,
Some(actorRef), Some(actorRef),
None, None,
@ -913,7 +915,6 @@ class RemoteServerHandler(
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, RemoteEncoder.encode(messageBuilder.build)) write(channel, RemoteEncoder.encode(messageBuilder.build))
}
} }
) )
)) ))
@ -958,10 +959,10 @@ class RemoteServerHandler(
try { try {
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses) val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
else { else {
//Sends the response //Sends the response
def sendResponse(result: Either[Any,Throwable]): Unit = try { def sendResponse(result: Either[Throwable,Any]): Unit = try {
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None, None,
Right(request.getUuid), Right(request.getUuid),
@ -983,14 +984,10 @@ class RemoteServerHandler(
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
messageReceiver.invoke(typedActor, args: _*) match { messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread
case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed //If it's a future, we can lift on that to defer the send to when the future is completed
f.onComplete( future => { case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) )
val result: Either[Any,Throwable] = case other => sendResponse(Right(other))
if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get)
sendResponse(result)
})
case other => sendResponse(Left(other))
} }
} }
} catch { } catch {
@ -1153,7 +1150,7 @@ class RemoteServerHandler(
actorInfo.getId, actorInfo.getId,
actorInfo.getTarget, actorInfo.getTarget,
actorInfo.getTimeout, actorInfo.getTimeout,
Right(exception), Left(exception),
true, true,
None, None,
None, None,

View file

@ -138,7 +138,7 @@ object ActorSerialization {
actorRef.id, actorRef.id,
actorRef.actorClassName, actorRef.actorClassName,
actorRef.timeout, actorRef.timeout,
Left(m.message), Right(m.message),
false, false,
actorRef.getSender, actorRef.getSender,
None, None,
@ -279,7 +279,7 @@ object RemoteActorSerialization {
actorId: String, actorId: String,
actorClassName: String, actorClassName: String,
timeout: Long, timeout: Long,
message: Either[Any, Throwable], message: Either[Throwable, Any],
isOneWay: Boolean, isOneWay: Boolean,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]], typedActorInfo: Option[Tuple2[String, String]],
@ -319,9 +319,9 @@ object RemoteActorSerialization {
.setOneWay(isOneWay) .setOneWay(isOneWay)
message match { message match {
case Left(message) => case Right(message) =>
messageBuilder.setMessage(MessageSerializer.serialize(message)) messageBuilder.setMessage(MessageSerializer.serialize(message))
case Right(exception) => case Left(exception) =>
messageBuilder.setException(ExceptionProtocol.newBuilder messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName) .setClassname(exception.getClass.getName)
.setMessage(empty(exception.getMessage)) .setMessage(empty(exception.getMessage))