Added the ErrorHandler notifications to all try-catch blocks

This commit is contained in:
Jonas Bonér 2011-03-02 00:14:45 +01:00
parent 2127e8077e
commit c4e2d73668
17 changed files with 162 additions and 56 deletions

View file

@ -73,19 +73,40 @@ class ActorKilledException private[akka](message: String) extends AkkaEx
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
sealed trait ErrorHandlerEventLevel {
def asString: String
}
object HighErrorHandlerEventLevel extends ErrorHandlerEventLevel {
def asString = "high"
}
object MediumErrorHandlerEventLevel extends ErrorHandlerEventLevel {
def asString = "medium"
}
object LowErrorHandlerEventLevel extends ErrorHandlerEventLevel {
def asString = "low"
}
case class ErrorHandlerEvent(
@BeanProperty val cause: Throwable,
@BeanProperty val instance: AnyRef,
@BeanProperty val message: String = "") {
@BeanProperty val message: String = "",
@BeanProperty val level: ErrorHandlerEventLevel = MediumErrorHandlerEventLevel) {
@BeanProperty val thread: Thread = Thread.currentThread
}
// FIXME add flume listener
// document writing custom
/**
* Error handler.
*
* Create, add and remove a listener:
* <pre>
* val errorHandlerEventListener = new Actor {
* self.dispatcher = ErrorHandler.ErrorHandlerDispatcher
*
* def receive = {
* case ErrorHandlerEvent(cause: Throwable, message: String) =>
* case ErrorHandlerEvent(cause, message, level) =>
* }
* }
*
@ -94,35 +115,50 @@ case class ErrorHandlerEvent(
* ErrorHandler.removeListener(errorHandlerEventListener)
* </pre>
*
* Log an error event:
* <pre>
* ErrorHandler notifyListeners ErrorHandlerEvent(reason, this, message.toString)
* </pre>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ErrorHandler extends ListenerManagement {
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
class DefaultListener extends Actor {
import java.io.{StringWriter, PrintWriter}
import java.net.{InetAddress, UnknownHostException}
import java.text.DateFormat
import java.util.Date
import akka.dispatch.Dispatchers
def receive = {
case event @ ErrorHandlerEvent(cause, instance, message) =>
val stackTrace = {
val error = "[error:%s] [%s] [%s] [%s] %s\n%s".intern
val ID = "default:error:handler"
val ErrorHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
def formattedTimestamp = DateFormat.getInstance.format(new Date)
def stackTraceFor(e: Throwable) = {
val sw = new StringWriter
val pw = new PrintWriter(sw)
cause.printStackTrace(pw)
e.printStackTrace(pw)
sw.toString
}
val time = java.text.DateFormat.getInstance.format(new java.util.Date)
class DefaultListener extends Actor {
self.id = ID
self.dispatcher = ErrorHandlerDispatcher
def receive = {
case event @ ErrorHandlerEvent(cause, instance, message, level) =>
val log = error.format(
time,
level.asString,
formattedTimestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message,
stackTrace)
stackTraceFor(cause))
println(log)
case _ => {}
}
}
if (config.getBool("akka.default-error-handler", true))
addListener(Actor.actorOf[DefaultListener].start) // FIXME configurable in config (on/off)
}

View file

@ -819,6 +819,7 @@ class LocalActorRef private[akka] (
}
} catch {
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, messageHandle.message.toString)
throw e
} finally {
currentMessage = null //TODO: Don't reset this, we might want to resend the message

View file

@ -37,7 +37,9 @@ object Scheduler {
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, receiver + " @ " + message)
throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
@ -56,7 +58,9 @@ object Scheduler {
try {
service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception => throw SchedulerException("Failed to schedule a Runnable", e)
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
throw SchedulerException("Failed to schedule a Runnable", e)
}
}
@ -69,7 +73,9 @@ object Scheduler {
new Runnable { def run = receiver ! message },
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception => throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, receiver + " @ " + message)
throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
}
}
@ -88,7 +94,9 @@ object Scheduler {
try {
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception => throw SchedulerException("Failed to scheduleOnce a Runnable", e)
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
throw SchedulerException("Failed to scheduleOnce a Runnable", e)
}
}

View file

@ -5,6 +5,7 @@
package akka.config
import akka.AkkaException
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
import net.lag.configgy.{Config => CConfig, Configgy, ParseException}
import java.net.InetSocketAddress
@ -58,9 +59,13 @@ object Config {
Configgy.configure(configFile)
println("Config loaded from -Dakka.config=" + configFile)
} catch {
case e: ParseException => throw new ConfigurationException(
case cause: ParseException =>
val e = new ConfigurationException(
"Config could not be loaded from -Dakka.config=" + configFile +
"\n\tdue to: " + e.toString)
"\n\tdue to: " + cause.toString)
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
throw e
}
Configgy.config
} else if (getClass.getClassLoader.getResource(confName) ne null) {
@ -68,9 +73,12 @@ object Config {
Configgy.configureFromResource(confName, getClass.getClassLoader)
println("Config [" + confName + "] loaded from the application classpath.")
} catch {
case e: ParseException => throw new ConfigurationException(
case cause: ParseException =>
val e = new ConfigurationException(
"Can't load '" + confName + "' config file from application classpath," +
"\n\tdue to: " + e.toString)
"\n\tdue to: " + cause.toString)
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
throw e
}
Configgy.config
} else if (HOME.isDefined) {
@ -81,10 +89,13 @@ object Config {
"AKKA_HOME is defined as [" + HOME.getOrElse(throwNoAkkaHomeException) +
"], config loaded from [" + configFile + "].")
} catch {
case e: ParseException => throw new ConfigurationException(
case cause: ParseException =>
val e = throw new ConfigurationException(
"AKKA_HOME is defined as [" + HOME.get + "] " +
"\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," +
"\n\tdue to: " + e.toString)
"\n\tdue to: " + cause.toString)
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
throw e
}
Configgy.config
} else {

View file

@ -7,7 +7,7 @@ package akka.dataflow
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import akka.actor.{Actor, ActorRef}
import akka.actor.{Actor, ActorRef, ErrorHandler, ErrorHandlerEvent}
import akka.actor.Actor._
import akka.dispatch.CompletableFuture
import akka.AkkaException
@ -148,6 +148,7 @@ object DataFlow {
(out !! Get).as[T]
} catch {
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
out ! Exit
throw e
}

View file

@ -4,7 +4,7 @@
package akka.dispatch
import akka.actor.{ActorRef, IllegalActorStateException}
import akka.actor.{ActorRef, IllegalActorStateException, ErrorHandler, ErrorHandlerEvent}
import akka.util.{ReflectiveAccess, Switch}
import java.util.Queue
@ -132,6 +132,7 @@ class ExecutorBasedEventDrivenDispatcher(
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, _name)
mbox.dispatcherLock.unlock()
throw e
}

View file

@ -6,15 +6,15 @@ package akka.dispatch
import akka.AkkaException
import akka.actor.Actor.spawn
import akka.actor.{Actor, ErrorHandler, ErrorHandlerEvent}
import akka.routing.Dispatcher
import akka.japi.Procedure
import java.util.concurrent.locks.ReentrantLock
import akka.japi.Procedure
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit}
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
import akka.actor.Actor
import annotation.tailrec
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
import annotation.tailrec
class FutureTimeoutException(message: String) extends AkkaException(message)
@ -33,8 +33,13 @@ object Futures {
(body: => T): Future[T] = {
val f = new DefaultCompletableFuture[T](timeout)
spawn({
try { f completeWithResult body }
catch { case e => f completeWithException e}
try {
f completeWithResult body
} catch {
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
f completeWithException e
}
})(dispatcher)
f
}
@ -97,7 +102,9 @@ object Futures {
results.clear //Do not retain the values since someone can hold onto the Future for a long time
result completeWithResult r
} catch {
case e: Exception => result completeWithException e
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
result completeWithException e
}
}
}
@ -255,7 +262,9 @@ sealed trait Future[T] {
fa complete (try {
Right(f(v.right.get))
} catch {
case e => Left(e)
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
Left(e)
})
}
}
@ -281,7 +290,9 @@ sealed trait Future[T] {
try {
f(v.right.get) onComplete (fa.completeWith(_))
} catch {
case e => fa completeWithException e
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
fa completeWithException e
}
}
}
@ -309,7 +320,9 @@ sealed trait Future[T] {
if (p(r)) Right(r)
else Left(new MatchError(r))
} catch {
case e => Left(e)
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
Left(e)
})
}
}

View file

@ -9,7 +9,8 @@ import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import akka.util. {Duration}
import akka.util.Duration
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
object ThreadPoolConfig {
type Bounds = Int
@ -207,8 +208,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
})
} catch {
case e: RejectedExecutionException =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
semaphore.release
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
throw e
}
}

View file

@ -4,7 +4,7 @@
package akka.routing
import akka.actor. {Actor, ActorRef}
import akka.actor.{Actor, ActorRef, ErrorHandler, ErrorHandlerEvent}
/**
* Actor pooling
@ -83,7 +83,9 @@ trait DefaultActorPool extends ActorPool
try {
future completeWithResult (delegate !! msg).getOrElse(None)
} catch {
case ex => future completeWithException ex
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
future completeWithException e
}
}
}

View file

@ -6,6 +6,7 @@ package akka.util
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
import java.util.concurrent.atomic. {AtomicBoolean}
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -125,6 +126,7 @@ class Switch(startAsOn: Boolean = false) {
action
} catch {
case t =>
ErrorHandler notifyListeners ErrorHandlerEvent(t, this)
switch.compareAndSet(!from, from) //Revert status
throw t
}

View file

@ -125,6 +125,7 @@ object ReflectiveAccess {
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e =>
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
None
}
@ -141,6 +142,7 @@ object ReflectiveAccess {
Some(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e =>
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
None
}
@ -152,9 +154,11 @@ object ReflectiveAccess {
Option(instance.get(null).asInstanceOf[T])
} catch {
case e: ClassNotFoundException => {
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
None
}
case ei: ExceptionInInitializerError => {
//ErrorHandler notifyListeners ErrorHandlerEvent(ei, this)
throw ei
}
}
@ -163,6 +167,8 @@ object ReflectiveAccess {
assert(fqn ne null)
Some(classloader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case e => None
case e =>
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
None
}
}

View file

@ -5,6 +5,7 @@
package akka.http
import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import javax.servlet.http.HttpServlet
@ -388,6 +389,7 @@ trait RequestMethod {
}
} catch {
case io =>
ErrorHandler notifyListeners ErrorHandlerEvent(io, this)
false
}
}
@ -405,7 +407,8 @@ trait RequestMethod {
pipe.complete
}
} catch {
case io: IOException => {}
case io: IOException =>
ErrorHandler notifyListeners ErrorHandlerEvent(io, this)
}
}

View file

@ -7,6 +7,7 @@ package akka.http
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
import Types._
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
/**
* @author Garrick Evans
@ -35,6 +36,7 @@ trait Servlet30Context extends AsyncListener {
}
catch {
case ex: IllegalStateException =>
ErrorHandler notifyListeners ErrorHandlerEvent(ex, this)
false
}
}

View file

@ -23,6 +23,7 @@
package akka.security
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
import akka.actor.Actor._
import akka.config.Config
@ -368,6 +369,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] {
Some(UserInfo(user, null, rolesFor(user)))
} catch {
case e: PrivilegedActionException => {
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
return None
}
}

View file

@ -16,6 +16,7 @@ import akka.remoteinterface._
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException
import akka.actor.Actor._
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
import akka.util._
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
@ -428,6 +429,7 @@ class ActiveRemoteClientHandler(
}
} catch {
case e: Exception =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
}
@ -484,6 +486,7 @@ class ActiveRemoteClientHandler(
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem =>
ErrorHandler notifyListeners ErrorHandlerEvent(problem, this)
UnparsableException(classname, exception.getMessage)
}
}
@ -554,8 +557,8 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.releaseExternalResources
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: java.nio.channels.ClosedChannelException => {}
case e => {}
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
}
}
}
@ -587,6 +590,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
}
} catch {
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
notifyListeners(RemoteServerError(e, this))
}
this
@ -899,6 +903,7 @@ class RemoteServerHandler(
val actorRef =
try { createActor(actorInfo, channel).start } catch {
case e: SecurityException =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
server.notifyListeners(RemoteServerError(e, server))
return
@ -985,7 +990,9 @@ class RemoteServerHandler(
write(channel, messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
case e: Throwable =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
server.notifyListeners(RemoteServerError(e, server))
}
messageReceiver.invoke(typedActor, args: _*) match {
@ -1000,9 +1007,11 @@ class RemoteServerHandler(
}
} catch {
case e: InvocationTargetException =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
case e: Throwable =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
}
@ -1062,6 +1071,7 @@ class RemoteServerHandler(
actorRef
} catch {
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
server.notifyListeners(RemoteServerError(e, server))
throw e
}
@ -1128,6 +1138,7 @@ class RemoteServerHandler(
newInstance
} catch {
case e =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
server.notifyListeners(RemoteServerError(e, server))
throw e
}

View file

@ -6,6 +6,7 @@ package akka.transactor
import akka.config.Config
import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.commitbarriers.CountDownCommitBarrier
@ -136,7 +137,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
} catch {
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
case e: IllegalStateException => ()
case e: IllegalStateException =>
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
}
result
}

View file

@ -14,6 +14,8 @@ akka {
enable-jmx = on # expose the configuration through JMX
default-error-handler = on # register the default error handler listener which logs errors to STDOUT
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
# Can be used to bootstrap your application(s)
# Should be the FQN (Fully Qualified Name) of the boot class which needs to have a default constructor