diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 30f8b8a4d3..ee8c35c602 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -15,6 +15,7 @@ import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension import akka.util.NonFatal +import akka.event.Logging.LogEventException //TODO: everything here for current compatibility - could be limited more @@ -365,7 +366,7 @@ private[akka] class ActorCell( } catch { case NonFatal(e) ⇒ try { - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"))) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -396,7 +397,7 @@ private[akka] class ActorCell( actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) } catch { case NonFatal(e) ⇒ try { - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"))) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -460,7 +461,7 @@ private[akka] class ActorCell( } } catch { case NonFatal(e) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message)) + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message))) //TODO FIXME How should problems here be handled??? throw e } @@ -483,7 +484,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e: InterruptedException ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage))) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) // make sure that InterruptedException does not leave this thread @@ -492,7 +493,7 @@ private[akka] class ActorCell( parent.tell(Failed(ex), self) throw e //Re-throw InterruptedExceptions as expected case NonFatal(e) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage))) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) actor.supervisorStrategy.handleSupervisorFailing(self, children) @@ -502,7 +503,7 @@ private[akka] class ActorCell( } } catch { case NonFatal(e) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage))) throw e } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index bc1494a6b2..f3a21681ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -16,6 +16,7 @@ import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension import akka.jsr166y.ForkJoinPool import akka.util.NonFatal +import akka.event.Logging.LogEventException final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -102,10 +103,13 @@ object ExecutionContext { */ def fromExecutor(e: Executor): ExecutionContext = new WrappedExecutor(e) - private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext + private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext { + override def reportFailure(t: Throwable): Unit = t.printStackTrace() + } private class WrappedExecutor(val executor: Executor) extends Executor with ExecutionContext { override final def execute(runnable: Runnable): Unit = executor.execute(runnable) + override def reportFailure(t: Throwable): Unit = t.printStackTrace() } } @@ -120,6 +124,13 @@ trait ExecutionContext { * Submits the runnable for execution */ def execute(runnable: Runnable): Unit + + /** + * Failed tasks should call reportFailure to let the ExecutionContext + * log the problem or whatever is appropriate for the implementation. + */ + def reportFailure(t: Throwable): Unit + } object MessageDispatcher { @@ -173,6 +184,11 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } } + def reportFailure(t: Throwable): Unit = t match { + case e: LogEventException ⇒ prerequisites.eventStream.publish(e.event) + case _ ⇒ prerequisites.eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage)) + } + @tailrec private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitantsUpdater.get(this) match { case 0 ⇒ diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 15a9251047..b540e94dbe 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -322,22 +322,13 @@ object Future { try { next.apply() } catch { - case NonFatal(e) ⇒ logError(e) + case NonFatal(e) ⇒ executor.reportFailure(e) } } } finally { _taskStack.remove() } }) } - /** - * Internal API, do not call - */ - private[akka] def logError(problem: Throwable)(implicit executor: ExecutionContext): Unit = { - executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, "Future", this.getClass, problem.getMessage)) - case other ⇒ problem.printStackTrace() - } - } } sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { @@ -779,7 +770,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case NonFatal(e) ⇒ Future.logError(e) } + try { func(result) } catch { case NonFatal(e) ⇒ executor.reportFailure(e) } } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index c8bbe5f9eb..df5cafa877 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -498,6 +498,13 @@ object Logging { */ class EventHandlerException extends AkkaException + /** + * Exception that wraps a LogEvent. + */ + class LogEventException(val event: LogEvent) extends NoStackTrace { + override def getMessage: String = event.toString + } + /** * Base type of LogEvents */ diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 130149b491..2b69f32c3b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -51,7 +51,7 @@ import RemoteSystemDaemonMessageType._ import com.eaio.uuid.UUID import com.google.protobuf.ByteString -import akka.dispatch.{Await, Dispatchers, Future, PinnedDispatcher} +import akka.dispatch.{ Await, Dispatchers, Future, PinnedDispatcher } // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down @@ -1158,7 +1158,7 @@ class DefaultClusterNode private[akka] ( throw cause } } catch { - case e: TimeoutException => + case e: TimeoutException ⇒ EventHandler.error(e, this, "Remote command to [%s] timed out".format(connection.address)) throw e case e: Exception ⇒ diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index ce9eb300f5..07cf10be9f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -543,8 +543,8 @@ object TransactionLog { private[akka] def await[T](future: Promise[T]): T = { future.await.value.get match { - case Right(result) => result - case Left(throwable) => handleError(throwable) + case Right(result) ⇒ result + case Left(throwable) ⇒ handleError(throwable) } }