Added the ErrorHandler notifications to all try-catch blocks
This commit is contained in:
parent
ce00125aba
commit
6f6d459dcd
17 changed files with 162 additions and 56 deletions
|
|
@ -73,19 +73,40 @@ class ActorKilledException private[akka](message: String) extends AkkaEx
|
||||||
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
|
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
|
||||||
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
|
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
|
||||||
|
|
||||||
|
sealed trait ErrorHandlerEventLevel {
|
||||||
|
def asString: String
|
||||||
|
}
|
||||||
|
object HighErrorHandlerEventLevel extends ErrorHandlerEventLevel {
|
||||||
|
def asString = "high"
|
||||||
|
}
|
||||||
|
object MediumErrorHandlerEventLevel extends ErrorHandlerEventLevel {
|
||||||
|
def asString = "medium"
|
||||||
|
}
|
||||||
|
object LowErrorHandlerEventLevel extends ErrorHandlerEventLevel {
|
||||||
|
def asString = "low"
|
||||||
|
}
|
||||||
|
|
||||||
case class ErrorHandlerEvent(
|
case class ErrorHandlerEvent(
|
||||||
@BeanProperty val cause: Throwable,
|
@BeanProperty val cause: Throwable,
|
||||||
@BeanProperty val instance: AnyRef,
|
@BeanProperty val instance: AnyRef,
|
||||||
@BeanProperty val message: String = "") {
|
@BeanProperty val message: String = "",
|
||||||
|
@BeanProperty val level: ErrorHandlerEventLevel = MediumErrorHandlerEventLevel) {
|
||||||
@BeanProperty val thread: Thread = Thread.currentThread
|
@BeanProperty val thread: Thread = Thread.currentThread
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME add flume listener
|
||||||
|
// document writing custom
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Error handler.
|
* Error handler.
|
||||||
|
*
|
||||||
|
* Create, add and remove a listener:
|
||||||
* <pre>
|
* <pre>
|
||||||
* val errorHandlerEventListener = new Actor {
|
* val errorHandlerEventListener = new Actor {
|
||||||
|
* self.dispatcher = ErrorHandler.ErrorHandlerDispatcher
|
||||||
|
*
|
||||||
* def receive = {
|
* def receive = {
|
||||||
* case ErrorHandlerEvent(cause: Throwable, message: String) =>
|
* case ErrorHandlerEvent(cause, message, level) =>
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
|
|
@ -94,36 +115,51 @@ case class ErrorHandlerEvent(
|
||||||
* ErrorHandler.removeListener(errorHandlerEventListener)
|
* ErrorHandler.removeListener(errorHandlerEventListener)
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
* Log an error event:
|
||||||
|
* <pre>
|
||||||
|
* ErrorHandler notifyListeners ErrorHandlerEvent(reason, this, message.toString)
|
||||||
|
* </pre>
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object ErrorHandler extends ListenerManagement {
|
object ErrorHandler extends ListenerManagement {
|
||||||
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
|
import java.io.{StringWriter, PrintWriter}
|
||||||
|
import java.text.DateFormat
|
||||||
|
import java.util.Date
|
||||||
|
import akka.dispatch.Dispatchers
|
||||||
|
|
||||||
|
val error = "[error:%s] [%s] [%s] [%s] %s\n%s".intern
|
||||||
|
val ID = "default:error:handler"
|
||||||
|
val ErrorHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
|
||||||
|
|
||||||
|
def formattedTimestamp = DateFormat.getInstance.format(new Date)
|
||||||
|
|
||||||
|
def stackTraceFor(e: Throwable) = {
|
||||||
|
val sw = new StringWriter
|
||||||
|
val pw = new PrintWriter(sw)
|
||||||
|
e.printStackTrace(pw)
|
||||||
|
sw.toString
|
||||||
|
}
|
||||||
|
|
||||||
class DefaultListener extends Actor {
|
class DefaultListener extends Actor {
|
||||||
import java.io.{StringWriter, PrintWriter}
|
self.id = ID
|
||||||
import java.net.{InetAddress, UnknownHostException}
|
self.dispatcher = ErrorHandlerDispatcher
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case event @ ErrorHandlerEvent(cause, instance, message) =>
|
case event @ ErrorHandlerEvent(cause, instance, message, level) =>
|
||||||
val stackTrace = {
|
|
||||||
val sw = new StringWriter
|
|
||||||
val pw = new PrintWriter(sw)
|
|
||||||
cause.printStackTrace(pw)
|
|
||||||
sw.toString
|
|
||||||
}
|
|
||||||
val time = java.text.DateFormat.getInstance.format(new java.util.Date)
|
|
||||||
val log = error.format(
|
val log = error.format(
|
||||||
time,
|
level.asString,
|
||||||
|
formattedTimestamp,
|
||||||
event.thread.getName,
|
event.thread.getName,
|
||||||
instance.getClass.getSimpleName,
|
instance.getClass.getSimpleName,
|
||||||
message,
|
message,
|
||||||
stackTrace)
|
stackTraceFor(cause))
|
||||||
println(log)
|
println(log)
|
||||||
case _ => {}
|
case _ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
addListener(Actor.actorOf[DefaultListener].start) // FIXME configurable in config (on/off)
|
if (config.getBool("akka.default-error-handler", true))
|
||||||
|
addListener(Actor.actorOf[DefaultListener].start) // FIXME configurable in config (on/off)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -819,6 +819,7 @@ class LocalActorRef private[akka] (
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e =>
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, messageHandle.message.toString)
|
||||||
throw e
|
throw e
|
||||||
} finally {
|
} finally {
|
||||||
currentMessage = null //TODO: Don't reset this, we might want to resend the message
|
currentMessage = null //TODO: Don't reset this, we might want to resend the message
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,9 @@ object Scheduler {
|
||||||
new Runnable { def run = receiver ! message },
|
new Runnable { def run = receiver ! message },
|
||||||
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, receiver + " @ " + message)
|
||||||
|
throw SchedulerException(message + " could not be scheduled on " + receiver, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,9 +56,11 @@ object Scheduler {
|
||||||
*/
|
*/
|
||||||
def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
|
def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
|
||||||
try {
|
try {
|
||||||
service.scheduleAtFixedRate(runnable,initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception => throw SchedulerException("Failed to schedule a Runnable", e)
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
throw SchedulerException("Failed to schedule a Runnable", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -69,7 +73,9 @@ object Scheduler {
|
||||||
new Runnable { def run = receiver ! message },
|
new Runnable { def run = receiver ! message },
|
||||||
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception => throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, receiver + " @ " + message)
|
||||||
|
throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -88,7 +94,9 @@ object Scheduler {
|
||||||
try {
|
try {
|
||||||
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception => throw SchedulerException("Failed to scheduleOnce a Runnable", e)
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
throw SchedulerException("Failed to scheduleOnce a Runnable", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
package akka.config
|
package akka.config
|
||||||
|
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
import net.lag.configgy.{Config => CConfig, Configgy, ParseException}
|
import net.lag.configgy.{Config => CConfig, Configgy, ParseException}
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
|
@ -58,9 +59,13 @@ object Config {
|
||||||
Configgy.configure(configFile)
|
Configgy.configure(configFile)
|
||||||
println("Config loaded from -Dakka.config=" + configFile)
|
println("Config loaded from -Dakka.config=" + configFile)
|
||||||
} catch {
|
} catch {
|
||||||
case e: ParseException => throw new ConfigurationException(
|
case cause: ParseException =>
|
||||||
"Config could not be loaded from -Dakka.config=" + configFile +
|
val e = new ConfigurationException(
|
||||||
"\n\tdue to: " + e.toString)
|
"Config could not be loaded from -Dakka.config=" + configFile +
|
||||||
|
"\n\tdue to: " + cause.toString)
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
throw e
|
||||||
|
|
||||||
}
|
}
|
||||||
Configgy.config
|
Configgy.config
|
||||||
} else if (getClass.getClassLoader.getResource(confName) ne null) {
|
} else if (getClass.getClassLoader.getResource(confName) ne null) {
|
||||||
|
|
@ -68,9 +73,12 @@ object Config {
|
||||||
Configgy.configureFromResource(confName, getClass.getClassLoader)
|
Configgy.configureFromResource(confName, getClass.getClassLoader)
|
||||||
println("Config [" + confName + "] loaded from the application classpath.")
|
println("Config [" + confName + "] loaded from the application classpath.")
|
||||||
} catch {
|
} catch {
|
||||||
case e: ParseException => throw new ConfigurationException(
|
case cause: ParseException =>
|
||||||
"Can't load '" + confName + "' config file from application classpath," +
|
val e = new ConfigurationException(
|
||||||
"\n\tdue to: " + e.toString)
|
"Can't load '" + confName + "' config file from application classpath," +
|
||||||
|
"\n\tdue to: " + cause.toString)
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
throw e
|
||||||
}
|
}
|
||||||
Configgy.config
|
Configgy.config
|
||||||
} else if (HOME.isDefined) {
|
} else if (HOME.isDefined) {
|
||||||
|
|
@ -81,10 +89,13 @@ object Config {
|
||||||
"AKKA_HOME is defined as [" + HOME.getOrElse(throwNoAkkaHomeException) +
|
"AKKA_HOME is defined as [" + HOME.getOrElse(throwNoAkkaHomeException) +
|
||||||
"], config loaded from [" + configFile + "].")
|
"], config loaded from [" + configFile + "].")
|
||||||
} catch {
|
} catch {
|
||||||
case e: ParseException => throw new ConfigurationException(
|
case cause: ParseException =>
|
||||||
"AKKA_HOME is defined as [" + HOME.get + "] " +
|
val e = throw new ConfigurationException(
|
||||||
"\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," +
|
"AKKA_HOME is defined as [" + HOME.get + "] " +
|
||||||
"\n\tdue to: " + e.toString)
|
"\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," +
|
||||||
|
"\n\tdue to: " + cause.toString)
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
throw e
|
||||||
}
|
}
|
||||||
Configgy.config
|
Configgy.config
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ package akka.dataflow
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
||||||
|
|
||||||
import akka.actor.{Actor, ActorRef}
|
import akka.actor.{Actor, ActorRef, ErrorHandler, ErrorHandlerEvent}
|
||||||
import akka.actor.Actor._
|
import akka.actor.Actor._
|
||||||
import akka.dispatch.CompletableFuture
|
import akka.dispatch.CompletableFuture
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
|
|
@ -148,6 +148,7 @@ object DataFlow {
|
||||||
(out !! Get).as[T]
|
(out !! Get).as[T]
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception =>
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
out ! Exit
|
out ! Exit
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
package akka.dispatch
|
package akka.dispatch
|
||||||
|
|
||||||
import akka.actor.{ActorRef, IllegalActorStateException}
|
import akka.actor.{ActorRef, IllegalActorStateException, ErrorHandler, ErrorHandlerEvent}
|
||||||
import akka.util.{ReflectiveAccess, Switch}
|
import akka.util.{ReflectiveAccess, Switch}
|
||||||
|
|
||||||
import java.util.Queue
|
import java.util.Queue
|
||||||
|
|
@ -132,6 +132,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
executorService.get() execute mbox
|
executorService.get() execute mbox
|
||||||
} catch {
|
} catch {
|
||||||
case e: RejectedExecutionException =>
|
case e: RejectedExecutionException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this, _name)
|
||||||
mbox.dispatcherLock.unlock()
|
mbox.dispatcherLock.unlock()
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,15 +6,15 @@ package akka.dispatch
|
||||||
|
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
import akka.actor.Actor.spawn
|
import akka.actor.Actor.spawn
|
||||||
|
import akka.actor.{Actor, ErrorHandler, ErrorHandlerEvent}
|
||||||
import akka.routing.Dispatcher
|
import akka.routing.Dispatcher
|
||||||
|
import akka.japi.Procedure
|
||||||
|
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
import akka.japi.Procedure
|
|
||||||
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit}
|
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit}
|
||||||
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
|
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
|
||||||
import akka.actor.Actor
|
|
||||||
import annotation.tailrec
|
|
||||||
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
||||||
|
import annotation.tailrec
|
||||||
|
|
||||||
class FutureTimeoutException(message: String) extends AkkaException(message)
|
class FutureTimeoutException(message: String) extends AkkaException(message)
|
||||||
|
|
||||||
|
|
@ -33,8 +33,13 @@ object Futures {
|
||||||
(body: => T): Future[T] = {
|
(body: => T): Future[T] = {
|
||||||
val f = new DefaultCompletableFuture[T](timeout)
|
val f = new DefaultCompletableFuture[T](timeout)
|
||||||
spawn({
|
spawn({
|
||||||
try { f completeWithResult body }
|
try {
|
||||||
catch { case e => f completeWithException e}
|
f completeWithResult body
|
||||||
|
} catch {
|
||||||
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
f completeWithException e
|
||||||
|
}
|
||||||
})(dispatcher)
|
})(dispatcher)
|
||||||
f
|
f
|
||||||
}
|
}
|
||||||
|
|
@ -97,7 +102,9 @@ object Futures {
|
||||||
results.clear //Do not retain the values since someone can hold onto the Future for a long time
|
results.clear //Do not retain the values since someone can hold onto the Future for a long time
|
||||||
result completeWithResult r
|
result completeWithResult r
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception => result completeWithException e
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
result completeWithException e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -255,7 +262,9 @@ sealed trait Future[T] {
|
||||||
fa complete (try {
|
fa complete (try {
|
||||||
Right(f(v.right.get))
|
Right(f(v.right.get))
|
||||||
} catch {
|
} catch {
|
||||||
case e => Left(e)
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
Left(e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -281,7 +290,9 @@ sealed trait Future[T] {
|
||||||
try {
|
try {
|
||||||
f(v.right.get) onComplete (fa.completeWith(_))
|
f(v.right.get) onComplete (fa.completeWith(_))
|
||||||
} catch {
|
} catch {
|
||||||
case e => fa completeWithException e
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
fa completeWithException e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -309,7 +320,9 @@ sealed trait Future[T] {
|
||||||
if (p(r)) Right(r)
|
if (p(r)) Right(r)
|
||||||
else Left(new MatchError(r))
|
else Left(new MatchError(r))
|
||||||
} catch {
|
} catch {
|
||||||
case e => Left(e)
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
Left(e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,8 @@ import java.util.concurrent._
|
||||||
import atomic.{AtomicLong, AtomicInteger}
|
import atomic.{AtomicLong, AtomicInteger}
|
||||||
import ThreadPoolExecutor.CallerRunsPolicy
|
import ThreadPoolExecutor.CallerRunsPolicy
|
||||||
|
|
||||||
import akka.util. {Duration}
|
import akka.util.Duration
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
|
|
||||||
object ThreadPoolConfig {
|
object ThreadPoolConfig {
|
||||||
type Bounds = Int
|
type Bounds = Int
|
||||||
|
|
@ -207,8 +208,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
|
||||||
})
|
})
|
||||||
} catch {
|
} catch {
|
||||||
case e: RejectedExecutionException =>
|
case e: RejectedExecutionException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
semaphore.release
|
semaphore.release
|
||||||
case e =>
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
package akka.routing
|
package akka.routing
|
||||||
|
|
||||||
import akka.actor. {Actor, ActorRef}
|
import akka.actor.{Actor, ActorRef, ErrorHandler, ErrorHandlerEvent}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actor pooling
|
* Actor pooling
|
||||||
|
|
@ -83,7 +83,9 @@ trait DefaultActorPool extends ActorPool
|
||||||
try {
|
try {
|
||||||
future completeWithResult (delegate !! msg).getOrElse(None)
|
future completeWithResult (delegate !! msg).getOrElse(None)
|
||||||
} catch {
|
} catch {
|
||||||
case ex => future completeWithException ex
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
future completeWithException e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.util
|
||||||
|
|
||||||
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
|
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
|
||||||
import java.util.concurrent.atomic. {AtomicBoolean}
|
import java.util.concurrent.atomic. {AtomicBoolean}
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
|
|
@ -125,7 +126,8 @@ class Switch(startAsOn: Boolean = false) {
|
||||||
action
|
action
|
||||||
} catch {
|
} catch {
|
||||||
case t =>
|
case t =>
|
||||||
switch.compareAndSet(!from,from) //Revert status
|
ErrorHandler notifyListeners ErrorHandlerEvent(t, this)
|
||||||
|
switch.compareAndSet(!from, from) //Revert status
|
||||||
throw t
|
throw t
|
||||||
}
|
}
|
||||||
true
|
true
|
||||||
|
|
|
||||||
|
|
@ -125,6 +125,7 @@ object ReflectiveAccess {
|
||||||
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||||
} catch {
|
} catch {
|
||||||
case e =>
|
case e =>
|
||||||
|
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -141,6 +142,7 @@ object ReflectiveAccess {
|
||||||
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||||
} catch {
|
} catch {
|
||||||
case e =>
|
case e =>
|
||||||
|
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -152,9 +154,11 @@ object ReflectiveAccess {
|
||||||
Option(instance.get(null).asInstanceOf[T])
|
Option(instance.get(null).asInstanceOf[T])
|
||||||
} catch {
|
} catch {
|
||||||
case e: ClassNotFoundException => {
|
case e: ClassNotFoundException => {
|
||||||
|
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
case ei: ExceptionInInitializerError => {
|
case ei: ExceptionInInitializerError => {
|
||||||
|
//ErrorHandler notifyListeners ErrorHandlerEvent(ei, this)
|
||||||
throw ei
|
throw ei
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -163,6 +167,8 @@ object ReflectiveAccess {
|
||||||
assert(fqn ne null)
|
assert(fqn ne null)
|
||||||
Some(classloader.loadClass(fqn).asInstanceOf[Class[T]])
|
Some(classloader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||||
} catch {
|
} catch {
|
||||||
case e => None
|
case e =>
|
||||||
|
//ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
package akka.http
|
package akka.http
|
||||||
|
|
||||||
import akka.actor.{ActorRegistry, ActorRef, Actor}
|
import akka.actor.{ActorRegistry, ActorRef, Actor}
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
|
|
||||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||||
import javax.servlet.http.HttpServlet
|
import javax.servlet.http.HttpServlet
|
||||||
|
|
@ -388,6 +389,7 @@ trait RequestMethod {
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case io =>
|
case io =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(io, this)
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -405,7 +407,8 @@ trait RequestMethod {
|
||||||
pipe.complete
|
pipe.complete
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case io: IOException => {}
|
case io: IOException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(io, this)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ package akka.http
|
||||||
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
|
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
|
||||||
import Types._
|
import Types._
|
||||||
|
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Garrick Evans
|
* @author Garrick Evans
|
||||||
|
|
@ -35,6 +36,7 @@ trait Servlet30Context extends AsyncListener {
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
case ex: IllegalStateException =>
|
case ex: IllegalStateException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(ex, this)
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@
|
||||||
package akka.security
|
package akka.security
|
||||||
|
|
||||||
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
|
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
import akka.actor.Actor._
|
import akka.actor.Actor._
|
||||||
import akka.config.Config
|
import akka.config.Config
|
||||||
|
|
||||||
|
|
@ -368,6 +369,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] {
|
||||||
Some(UserInfo(user, null, rolesFor(user)))
|
Some(UserInfo(user, null, rolesFor(user)))
|
||||||
} catch {
|
} catch {
|
||||||
case e: PrivilegedActionException => {
|
case e: PrivilegedActionException => {
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
return None
|
return None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import akka.remoteinterface._
|
||||||
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
import akka.actor.Actor._
|
import akka.actor.Actor._
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
import akka.util._
|
import akka.util._
|
||||||
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
|
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
|
||||||
|
|
||||||
|
|
@ -428,6 +429,7 @@ class ActiveRemoteClientHandler(
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception =>
|
case e: Exception =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
|
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
@ -484,6 +486,7 @@ class ActiveRemoteClientHandler(
|
||||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||||
} catch {
|
} catch {
|
||||||
case problem =>
|
case problem =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(problem, this)
|
||||||
UnparsableException(classname, exception.getMessage)
|
UnparsableException(classname, exception.getMessage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -554,8 +557,8 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
|
||||||
bootstrap.releaseExternalResources
|
bootstrap.releaseExternalResources
|
||||||
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
|
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
|
||||||
} catch {
|
} catch {
|
||||||
case e: java.nio.channels.ClosedChannelException => {}
|
case e =>
|
||||||
case e => {}
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -587,6 +590,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e =>
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
notifyListeners(RemoteServerError(e, this))
|
notifyListeners(RemoteServerError(e, this))
|
||||||
}
|
}
|
||||||
this
|
this
|
||||||
|
|
@ -899,6 +903,7 @@ class RemoteServerHandler(
|
||||||
val actorRef =
|
val actorRef =
|
||||||
try { createActor(actorInfo, channel).start } catch {
|
try { createActor(actorInfo, channel).start } catch {
|
||||||
case e: SecurityException =>
|
case e: SecurityException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
|
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
return
|
return
|
||||||
|
|
@ -985,7 +990,9 @@ class RemoteServerHandler(
|
||||||
|
|
||||||
write(channel, messageBuilder.build)
|
write(channel, messageBuilder.build)
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
case e: Throwable =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
}
|
}
|
||||||
|
|
||||||
messageReceiver.invoke(typedActor, args: _*) match {
|
messageReceiver.invoke(typedActor, args: _*) match {
|
||||||
|
|
@ -1000,9 +1007,11 @@ class RemoteServerHandler(
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e: InvocationTargetException =>
|
case e: InvocationTargetException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
|
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
case e: Throwable =>
|
case e: Throwable =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
|
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
}
|
}
|
||||||
|
|
@ -1062,6 +1071,7 @@ class RemoteServerHandler(
|
||||||
actorRef
|
actorRef
|
||||||
} catch {
|
} catch {
|
||||||
case e =>
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
@ -1128,6 +1138,7 @@ class RemoteServerHandler(
|
||||||
newInstance
|
newInstance
|
||||||
} catch {
|
} catch {
|
||||||
case e =>
|
case e =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
server.notifyListeners(RemoteServerError(e, server))
|
server.notifyListeners(RemoteServerError(e, server))
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.transactor
|
||||||
|
|
||||||
import akka.config.Config
|
import akka.config.Config
|
||||||
import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
|
import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
|
||||||
|
import akka.actor.{ErrorHandler, ErrorHandlerEvent}
|
||||||
|
|
||||||
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
||||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||||
|
|
@ -136,7 +137,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
|
||||||
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
|
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
|
||||||
} catch {
|
} catch {
|
||||||
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
|
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
|
||||||
case e: IllegalStateException => ()
|
case e: IllegalStateException =>
|
||||||
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
||||||
}
|
}
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,13 +6,15 @@
|
||||||
# Modify as needed.
|
# Modify as needed.
|
||||||
|
|
||||||
akka {
|
akka {
|
||||||
version = "1.1-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
|
version = "1.1-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
|
||||||
|
|
||||||
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
|
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
|
||||||
|
|
||||||
time-unit = "seconds" # Time unit for all timeout properties throughout the config
|
time-unit = "seconds" # Time unit for all timeout properties throughout the config
|
||||||
|
|
||||||
enable-jmx = on # expose the configuration through JMX
|
enable-jmx = on # expose the configuration through JMX
|
||||||
|
|
||||||
|
default-error-handler = on # register the default error handler listener which logs errors to STDOUT
|
||||||
|
|
||||||
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
||||||
# Can be used to bootstrap your application(s)
|
# Can be used to bootstrap your application(s)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue