diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d5f9b5ed54..d940aa2c20 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -560,9 +560,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { - def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ runnable.run()) - } + def run(timeout: org.jboss.netty.akka.util.Timeout) { dispatcher.execute(runnable) } } private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = @@ -575,7 +573,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createSingleTask(f: ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ f) + dispatcher.execute(new Runnable { def run = f }) } } @@ -598,7 +596,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ f) + dispatcher.execute(new Runnable { def run = f }) try timeout.getTimer.newTimeout(this, delay) catch { case _: IllegalStateException ⇒ // stop recurring if timer is stopped } @@ -609,7 +607,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ runnable.run()) + dispatcher.execute(runnable) try timeout.getTimer.newTimeout(this, delay) catch { case _: IllegalStateException ⇒ // stop recurring if timer is stopped } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 9ec0314b61..cbdfa912e1 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -390,6 +390,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi private[akka] class TypedActorInvocationHandler(val extension: TypedActorExtension, val actorVar: AtomVar[ActorRef], val timeout: Timeout) extends InvocationHandler { def actor = actorVar.get + @throws(classOf[Throwable]) def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 24b9da4447..52f35fd952 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -74,10 +74,10 @@ case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to sup case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch -final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { +final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable { def run() { try { - function() + runnable.run() } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) @@ -87,15 +87,48 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, } } +object ExecutionContext { + implicit def defaultExecutionContext(implicit system: ActorSystem): ExecutionContext = system.dispatcher + + /** + * Creates an ExecutionContext from the given ExecutorService + */ + def fromExecutorService(e: ExecutorService): ExecutionContext = new WrappedExecutorService(e) + + /** + * Creates an ExecutionContext from the given Executor + */ + def fromExecutor(e: Executor): ExecutionContext = new WrappedExecutor(e) + + private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext + + private class WrappedExecutor(val executor: Executor) extends Executor with ExecutionContext { + override final def execute(runnable: Runnable): Unit = executor.execute(runnable) + } +} + +/** + * An ExecutionContext is essentially the same thing as a java.util.concurrent.Executor + * This interface/trait exists to decouple the concept of execution from Actors & MessageDispatchers + * It is also needed to provide a fallback implicit default instance (in the companion object). + */ +trait ExecutionContext { + + /** + * Submits the runnable for execution + */ + def execute(runnable: Runnable): Unit +} + object MessageDispatcher { val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher val SCHEDULED = 1 val RESCHEDULED = 2 - implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher + implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher } -abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable { +abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable with Executor with ExecutionContext { import MessageDispatcher._ import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater } @@ -131,8 +164,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext ifSensibleToDoSoThenScheduleShutdown() } - protected[akka] final def dispatchTask(block: () ⇒ Unit) { - val invocation = TaskInvocation(eventStream, block, taskCleanup) + final def execute(runnable: Runnable) { + val invocation = TaskInvocation(eventStream, runnable, taskCleanup) inhabitantsUpdater.incrementAndGet(this) try { executeTask(invocation) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 39260c667b..fea97fbaf3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,8 +21,9 @@ import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } -import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable } import akka.dispatch.Await.CanAwait +import java.util.concurrent._ +import akka.actor.ActorSystem object Await { sealed trait CanAwait @@ -55,37 +56,37 @@ object Futures { /** * Java API, equivalent to Future.apply */ - def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) + def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor) /** * Java API, equivalent to Promise.apply */ - def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher) + def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()(executor) /** * Java API, creates an already completed Promise with the specified exception */ - def failed[T](exception: Throwable, dispatcher: MessageDispatcher): Promise[T] = Promise.failed(exception)(dispatcher) + def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)(executor) /** * Java API, Creates an already completed Promise with the specified result */ - def successful[T](result: T, dispatcher: MessageDispatcher): Promise[T] = Promise.successful(result)(dispatcher) + def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)(executor) /** * Java API. * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = { - Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_)) + def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = { + Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(executor).map(JOption.fromScalaOption(_)) } /** * Java API. * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] = - Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(dispatcher) + def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] = + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(executor) /** * Java API @@ -94,23 +95,23 @@ object Futures { * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] = - Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher) + def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] = + Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(executor) /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] = - Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher) + def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], executor: ExecutionContext): Future[R] = + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(executor) /** * Java API. * Simple version of Future.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = { - implicit val d = dispatcher + def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = { + implicit val d = executor scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒ for (r ← fr; a ← fa) yield { r add a @@ -124,8 +125,8 @@ object Futures { * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ - def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = { - implicit val d = dispatcher + def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], executor: ExecutionContext): Future[JIterable[B]] = { + implicit val d = executor scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒ val fb = fn(a) for (r ← fr; b ← fb) yield { r add b; r } @@ -139,18 +140,19 @@ object Future { * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body * The execution is performed by the specified Dispatcher. */ - def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher): Future[T] = { + def apply[T](body: ⇒ T)(implicit executor: ExecutionContext): Future[T] = { val promise = Promise[T]() - dispatcher dispatchTask { () ⇒ - promise complete { - try { - Right(body) - } catch { - // FIXME catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ Left(e) + executor.execute(new Runnable { + def run = + promise complete { + try { + Right(body) + } catch { + // FIXME catching all and continue isn't good for OOME, ticket #1418 + case e ⇒ Left(e) + } } - } - } + }) promise } @@ -161,13 +163,13 @@ object Future { * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] = + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = { + def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val futureResult = Promise[T]() val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult complete _ @@ -179,7 +181,7 @@ object Future { /** * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = { + def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { if (futures.isEmpty) Promise.successful[Option[T]](None) else { val result = Promise[Option[T]]() @@ -210,7 +212,7 @@ object Future { * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) * */ - def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { + def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) Promise.successful(zero) else sequence(futures).map(_.foldLeft(zero)(foldFun)) } @@ -222,7 +224,7 @@ object Future { * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds) * */ - def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = { + def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ T)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")) else sequence(futures).map(_ reduce op) } @@ -234,7 +236,7 @@ object Future { * val myFutureList = Future.traverse(myList)(x ⇒ Future(myFunc(x))) * */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = + def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) @@ -256,7 +258,7 @@ object Future { * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ - def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = { + def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { val future = Promise[A] dispatchTask({ () ⇒ (reify(body) foreachFull (future success, future failure): Future[Any]) onFailure { @@ -290,7 +292,7 @@ object Future { * } * */ - def blocking(implicit dispatcher: MessageDispatcher): Unit = + def blocking(implicit executor: ExecutionContext): Unit = _taskStack.get match { case Some(taskStack) if taskStack.nonEmpty ⇒ val tasks = taskStack.elems @@ -308,32 +310,38 @@ object Future { /** * Internal API, do not call */ - private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit = + private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit = _taskStack.get match { case Some(taskStack) if !force ⇒ taskStack push task - case _ ⇒ - dispatcher dispatchTask { () ⇒ - try { - val taskStack = Stack[() ⇒ Unit](task) - _taskStack set Some(taskStack) - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case e ⇒ - // FIXME catching all and continue isn't good for OOME, ticket #1418 - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage)) + case _ ⇒ executor.execute( + new Runnable { + def run = + try { + val taskStack = Stack[() ⇒ Unit](task) + _taskStack set Some(taskStack) + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e ⇒ + // FIXME catching all and continue isn't good for OOME, ticket #1418 + executor match { + case m: MessageDispatcher ⇒ + m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage)) + case other ⇒ + e.printStackTrace() + } + } } - } - } finally { _taskStack set None } - } + } finally { _taskStack set None } + }) } } sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { - implicit def dispatcher: MessageDispatcher + implicit def executor: ExecutionContext protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X]) @@ -458,7 +466,14 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { val future = Promise[A]() onComplete { case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] - case Right(res) ⇒ future complete (try { Right(f(res)) } catch { case e: Exception ⇒ Left(e) }) + case Right(res) ⇒ + future complete (try { + Right(f(res)) + } catch { + case e ⇒ + logError("Future.map", e) + Left(e) + }) } future } @@ -500,16 +515,29 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { onComplete { case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]] - case Right(r) ⇒ try { p completeWith f(r) } catch { case e: Exception ⇒ p complete Left(e) } + case Right(r) ⇒ + try { + p completeWith f(r) + } catch { + case e ⇒ + p complete Left(e) + logError("Future.flatMap", e) + } } p } + /** + * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions + */ final def foreach(f: T ⇒ Unit): Unit = onComplete { case Right(r) ⇒ f(r) case _ ⇒ } + /** + * Used by for-comprehensions + */ final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { @@ -519,16 +547,32 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } + /** + * Returns a new Future that will hold the successful result of this Future if it matches + * the given predicate, if it doesn't match, the resulting Future will be a failed Future + * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future + */ final def filter(pred: T ⇒ Boolean): Future[T] = { val p = Promise[T]() onComplete { case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]] case r @ Right(res) ⇒ p complete (try { if (pred(res)) r else Left(new MatchError(res)) - } catch { case e: Exception ⇒ Left(e) }) + } catch { + case e ⇒ + logError("Future.filter", e) + Left(e) + }) } p } + + protected def logError(msg: String, problem: Throwable): Unit = { + executor match { + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) + case other ⇒ problem.printStackTrace() + } + } } object Promise { @@ -537,17 +581,17 @@ object Promise { * * Scala API */ - def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() + def apply[A]()(implicit executor: ExecutionContext): Promise[A] = new DefaultPromise[A]() /** * Creates an already completed Promise with the specified exception */ - def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception)) + def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Left(exception)) /** * Creates an already completed Promise with the specified result */ - def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result)) + def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result)) } /** @@ -600,7 +644,13 @@ trait Promise[T] extends Future[T] { val fr = Promise[Any]() val thisPromise = this thisPromise completeWith other onComplete { v ⇒ - try { fr completeWith cont(thisPromise) } catch { case e: Exception ⇒ fr failure e } + try { + fr completeWith cont(thisPromise) + } catch { + case e ⇒ + logError("Promise.completeWith", e) + fr failure e + } } fr } @@ -609,7 +659,13 @@ trait Promise[T] extends Future[T] { val fr = Promise[Any]() val f = stream.dequeue(this) f.onComplete { _ ⇒ - try { fr completeWith cont(f) } catch { case e: Exception ⇒ fr failure e } + try { + fr completeWith cont(f) + } catch { + case e ⇒ + logError("Promise.completeWith", e) + fr failure e + } } fr } @@ -639,7 +695,7 @@ private[dispatch] object DefaultPromise { /** * The default concrete Future implementation. */ -class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { +class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self ⇒ import DefaultPromise.{ FState, Success, Failure, Pending } @@ -729,7 +785,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } + try { func(result) } catch { case e ⇒ logError("Future onComplete-callback raised an exception", e) } } } @@ -737,7 +793,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { +final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { val value = Some(resolve(suppliedValue)) def tryComplete(value: Either[Throwable, T]): Boolean = true diff --git a/akka-docs/general/addressing.rst b/akka-docs/general/addressing.rst index a9485fd1b2..cc87da61f3 100644 --- a/akka-docs/general/addressing.rst +++ b/akka-docs/general/addressing.rst @@ -175,7 +175,7 @@ Looking up Actors by Concrete Path In addition, actor references may be looked up using the :meth:`ActorSystem.actorFor` method, which returns an (unverified) local, remote or clustered actor reference. Sending messages to such a reference or -attempting to observe its livelyhood will traverse the actor hierarchy of the +attempting to observe its liveness will traverse the actor hierarchy of the actor system from top to bottom by passing messages from parent to child until either the target is reached or failure is certain, i.e. a name in the path does not exist (in practice this process will be optimized using caches, but it diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index fa42c0c402..1b6a755ede 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -70,8 +70,6 @@ class FutureDocSpec extends AkkaSpec { import akka.dispatch.Future import akka.util.duration._ - implicit def dispatcher = system.dispatcher - val future = Future { "Hello" + "World" } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 98a7c75329..3399f68639 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -37,8 +37,16 @@ akka { } remote { + + # Which implementation of akka.remote.RemoteSupport to use + # default is a TCP-based remote transport based on Netty transport = "akka.remote.netty.NettyRemoteSupport" + # In case of increased latency / overflow how long + # should we wait (blocking the sender) until we deem the send to be cancelled? + # 0 means "never backoff", any positive number will indicate time to block at most. + backoff-timeout = 0ms + use-compression = off # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 67f5b6f8c9..c8ce919944 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -23,6 +23,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) + val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) // TODO cluster config will go into akka-cluster/reference.conf when we enable that module val ClusterName = getString("akka.cluster.name") diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 3832d0b8fa..24ae131a29 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -18,14 +18,14 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import scala.collection.mutable.HashMap import java.net.InetSocketAddress -import java.util.concurrent._ import java.util.concurrent.atomic._ import akka.AkkaException import akka.event.Logging -import locks.ReentrantReadWriteLock import org.jboss.netty.channel._ import akka.actor.ActorSystemImpl import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } +import java.util.concurrent._ +import locks.ReentrantReadWriteLock class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) @@ -73,16 +73,21 @@ abstract class RemoteClient private[akka] ( */ private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { try { - currentChannel.write(request).addListener( + val channel = currentChannel + val f = channel.write(request) + f.addListener( new ChannelFutureListener { def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //Not interesting at the moment - } else if (!future.isSuccess) { + if (future.isCancelled || !future.isSuccess) { remoteSupport.notifyListeners(RemoteClientWriteFailed(request, future.getCause, remoteSupport, remoteAddress)) } } }) + // Check if we should back off + if (!channel.isWritable) { + val backoff = remoteSupport.remote.remoteSettings.BackoffTimeout + if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off + } } catch { case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala index 3eeab35bd0..014afd0c54 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala @@ -43,7 +43,7 @@ object MultiJvmSync { val barrier = if (AkkaRemoteSpec.testNodes eq null) new FileBasedBarrier(name, count, testName, nodeName, timeout) else - new ZkClient.ZkBarrier(name + "_" + nodeName, count, testName) + new ZkClient.ZkBarrier(name + "_" + nodeName, count, "/" + testName) barrier.await() } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala index 225be41d76..d6ef72ae22 100755 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala @@ -6,12 +6,14 @@ package akka.remote import com.typesafe.config.Config import org.apache.zookeeper._ import ZooDefs.Ids +import collection.JavaConversions._ +import java.net.InetAddress object ZkClient extends Watcher { // Don't forget to close! lazy val zk: ZooKeeper = { val remoteNodes = AkkaRemoteSpec.testNodes split ',' - + // ZkServers are configured to listen on a specific port. val connectString = remoteNodes map (_+":2181") mkString "," new ZooKeeper(connectString, 3000, this) @@ -22,29 +24,56 @@ object ZkClient extends Watcher { } class ZkBarrier(name: String, count: Int, root: String) extends Barrier { - if (zk.exists(root, false) eq null) { - zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + @annotation.tailrec + private def waitForServer() { + // SI-1672 + val r = try { + zk.exists("/", false); true + } catch { + case _: KeeperException.ConnectionLossException => + println("Server is not ready, sleeping...") + Thread.sleep(10000) + false + } + if (!r) waitForServer() + } + waitForServer() + + try { + zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + } catch { + case _: KeeperException.NodeExistsException => } def enter() { zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + while (true) { ZkClient.this.synchronized { - if (zk.getChildren(root, true).size < count) { + val children = zk.getChildren(root, true) + println("Enter, children: " + children.mkString(",")) + if (children.size < count) { + println("waiting") ZkClient.this.wait() - } + } else + return } } } - def leave() { + final def leave() { zk.delete(root + "/" + name, -1) + while (true) { ZkClient.this.synchronized { - if (!zk.getChildren(root, true).isEmpty) { + val children = zk.getChildren(root, true) + println("Leave, children: " + children.mkString(",")) + if (!children.isEmpty) { + println("waiting") ZkClient.this.wait() - } + } else + return } } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 1132d1a733..cd8c3c8eb5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -15,6 +15,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport") getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) + getMilliseconds("akka.remote.backoff-timeout") must equal(0) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) //akka.remote.server