Added reportFailure for logging in ExecutionContext. See #1310

This commit is contained in:
Patrik Nordwall 2012-02-03 10:37:31 +01:00
parent 5033647176
commit 44b5ff056a
6 changed files with 37 additions and 22 deletions

View file

@ -15,6 +15,7 @@ import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.event.Logging.LogEventException
//TODO: everything here for current compatibility - could be limited more
@ -365,7 +366,7 @@ private[akka] class ActorCell(
} catch {
case NonFatal(e)
try {
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor")))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@ -396,7 +397,7 @@ private[akka] class ActorCell(
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
} catch {
case NonFatal(e) try {
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor")))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@ -460,7 +461,7 @@ private[akka] class ActorCell(
}
} catch {
case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message))
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message)))
//TODO FIXME How should problems here be handled???
throw e
}
@ -483,7 +484,7 @@ private[akka] class ActorCell(
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage)))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
@ -492,7 +493,7 @@ private[akka] class ActorCell(
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage)))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
actor.supervisorStrategy.handleSupervisorFailing(self, children)
@ -502,7 +503,7 @@ private[akka] class ActorCell(
}
} catch {
case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage)))
throw e
}
}

View file

@ -16,6 +16,7 @@ import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension
import akka.jsr166y.ForkJoinPool
import akka.util.NonFatal
import akka.event.Logging.LogEventException
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) {
@ -102,10 +103,13 @@ object ExecutionContext {
*/
def fromExecutor(e: Executor): ExecutionContext = new WrappedExecutor(e)
private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext
private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext {
override def reportFailure(t: Throwable): Unit = t.printStackTrace()
}
private class WrappedExecutor(val executor: Executor) extends Executor with ExecutionContext {
override final def execute(runnable: Runnable): Unit = executor.execute(runnable)
override def reportFailure(t: Throwable): Unit = t.printStackTrace()
}
}
@ -120,6 +124,13 @@ trait ExecutionContext {
* Submits the runnable for execution
*/
def execute(runnable: Runnable): Unit
/**
* Failed tasks should call reportFailure to let the ExecutionContext
* log the problem or whatever is appropriate for the implementation.
*/
def reportFailure(t: Throwable): Unit
}
object MessageDispatcher {
@ -173,6 +184,11 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
}
def reportFailure(t: Throwable): Unit = t match {
case e: LogEventException prerequisites.eventStream.publish(e.event)
case _ prerequisites.eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
}
@tailrec
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitantsUpdater.get(this) match {
case 0

View file

@ -322,22 +322,13 @@ object Future {
try {
next.apply()
} catch {
case NonFatal(e) logError(e)
case NonFatal(e) executor.reportFailure(e)
}
}
} finally { _taskStack.remove() }
})
}
/**
* Internal API, do not call
*/
private[akka] def logError(problem: Throwable)(implicit executor: ExecutionContext): Unit = {
executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, "Future", this.getClass, problem.getMessage))
case other problem.printStackTrace()
}
}
}
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
@ -779,7 +770,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
}
private final def notifyCompleted(func: Either[Throwable, T] Unit, result: Either[Throwable, T]) {
try { func(result) } catch { case NonFatal(e) Future.logError(e) }
try { func(result) } catch { case NonFatal(e) executor.reportFailure(e) }
}
}

View file

@ -498,6 +498,13 @@ object Logging {
*/
class EventHandlerException extends AkkaException
/**
* Exception that wraps a LogEvent.
*/
class LogEventException(val event: LogEvent) extends NoStackTrace {
override def getMessage: String = event.toString
}
/**
* Base type of LogEvents
*/

View file

@ -51,7 +51,7 @@ import RemoteSystemDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
import akka.dispatch.{Await, Dispatchers, Future, PinnedDispatcher}
import akka.dispatch.{ Await, Dispatchers, Future, PinnedDispatcher }
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
@ -1158,7 +1158,7 @@ class DefaultClusterNode private[akka] (
throw cause
}
} catch {
case e: TimeoutException =>
case e: TimeoutException
EventHandler.error(e, this, "Remote command to [%s] timed out".format(connection.address))
throw e
case e: Exception

View file

@ -543,8 +543,8 @@ object TransactionLog {
private[akka] def await[T](future: Promise[T]): T = {
future.await.value.get match {
case Right(result) => result
case Left(throwable) => handleError(throwable)
case Right(result) result
case Left(throwable) handleError(throwable)
}
}