Moved EventHandler to 'akka.event' plus added 'error' method without exception param
This commit is contained in:
parent
74f7d474ab
commit
6429086b51
19 changed files with 206 additions and 174 deletions
|
|
@ -19,7 +19,7 @@ import java.net.{InetAddress, UnknownHostException}
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@serializable abstract class AkkaException(message: String) extends RuntimeException(message) {
|
||||
@serializable abstract class AkkaException(message: String = "") extends RuntimeException(message) {
|
||||
import AkkaException._
|
||||
val exceptionName = getClass.getName
|
||||
|
||||
|
|
|
|||
|
|
@ -76,163 +76,6 @@ class ActorKilledException private[akka](message: String) extends AkkaEx
|
|||
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
|
||||
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* Error handler.
|
||||
*
|
||||
* Create, add and remove a listener:
|
||||
* <pre>
|
||||
* val errorHandlerEventListener = Actor.actorOf(new Actor {
|
||||
* self.dispatcher = EventHandler.EventHandlerDispatcher
|
||||
*
|
||||
* def receive = {
|
||||
* case EventHandler.Error(cause, instance, message) => ...
|
||||
* case EventHandler.Warning(instance, message) => ...
|
||||
* case EventHandler.Info(instance, message) => ...
|
||||
* case EventHandler.Debug(instance, message) => ...
|
||||
* }
|
||||
* })
|
||||
*
|
||||
* EventHandler.addListener(errorHandlerEventListener)
|
||||
* ...
|
||||
* EventHandler.removeListener(errorHandlerEventListener)
|
||||
* </pre>
|
||||
*
|
||||
* Log an error event:
|
||||
* <pre>
|
||||
* EventHandler.notify(EventHandler.Error(exception, this, message.toString))
|
||||
* </pre>
|
||||
* Or use the direct methods (better performance):
|
||||
* <pre>
|
||||
* EventHandler.error(exception, this, message.toString)
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object EventHandler extends ListenerManagement {
|
||||
import java.io.{StringWriter, PrintWriter}
|
||||
import java.text.DateFormat
|
||||
import java.util.Date
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
val ErrorLevel = 1
|
||||
val WarningLevel = 2
|
||||
val InfoLevel = 3
|
||||
val DebugLevel = 4
|
||||
|
||||
sealed trait Event {
|
||||
@transient val thread: Thread = Thread.currentThread
|
||||
}
|
||||
case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event
|
||||
case class Warning(instance: AnyRef, message: String = "") extends Event
|
||||
case class Info(instance: AnyRef, message: String = "") extends Event
|
||||
case class Debug(instance: AnyRef, message: String = "") extends Event
|
||||
|
||||
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
|
||||
val warning = "[WARN] [%s] [%s] [%s] %s".intern
|
||||
val info = "[INFO] [%s] [%s] [%s] %s".intern
|
||||
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
|
||||
val generic = "[GENERIC] [%s] [%s]".intern
|
||||
val ID = "event:handler".intern
|
||||
|
||||
val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
|
||||
|
||||
val level: Int = config.getString("akka.event-handler-level", "DEBUG") match {
|
||||
case "ERROR" => ErrorLevel
|
||||
case "WARNING" => WarningLevel
|
||||
case "INFO" => InfoLevel
|
||||
case "DEBUG" => DebugLevel
|
||||
case unknown => throw new ConfigurationException(
|
||||
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
|
||||
}
|
||||
|
||||
def notify(event: => AnyRef) = notifyListeners(event)
|
||||
|
||||
def notify[T <: Event : ClassManifest](event: => T) {
|
||||
if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event)
|
||||
}
|
||||
|
||||
def error(cause: Throwable, instance: AnyRef, message: => String) = {
|
||||
if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message))
|
||||
}
|
||||
|
||||
def warning(instance: AnyRef, message: => String) = {
|
||||
if (level >= WarningLevel) notifyListeners(Warning(instance, message))
|
||||
}
|
||||
|
||||
def info(instance: AnyRef, message: => String) = {
|
||||
if (level >= InfoLevel) notifyListeners(Info(instance, message))
|
||||
}
|
||||
|
||||
def debug(instance: AnyRef, message: => String) = {
|
||||
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
|
||||
}
|
||||
|
||||
def formattedTimestamp = DateFormat.getInstance.format(new Date)
|
||||
|
||||
def stackTraceFor(e: Throwable) = {
|
||||
val sw = new StringWriter
|
||||
val pw = new PrintWriter(sw)
|
||||
e.printStackTrace(pw)
|
||||
sw.toString
|
||||
}
|
||||
|
||||
private def levelFor(eventClass: Class[_ <: Event]) = {
|
||||
if (eventClass.isInstanceOf[Error]) ErrorLevel
|
||||
else if (eventClass.isInstanceOf[Warning]) WarningLevel
|
||||
else if (eventClass.isInstanceOf[Info]) InfoLevel
|
||||
else if (eventClass.isInstanceOf[Debug]) DebugLevel
|
||||
else DebugLevel
|
||||
}
|
||||
|
||||
class DefaultListener extends Actor {
|
||||
self.id = ID
|
||||
self.dispatcher = EventHandlerDispatcher
|
||||
|
||||
def receive = {
|
||||
case event @ Error(cause, instance, message) =>
|
||||
println(error.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message,
|
||||
stackTraceFor(cause)))
|
||||
case event @ Warning(instance, message) =>
|
||||
println(warning.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message))
|
||||
case event @ Info(instance, message) =>
|
||||
println(info.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message))
|
||||
case event @ Debug(instance, message) =>
|
||||
println(debug.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message))
|
||||
case event =>
|
||||
println(generic.format(formattedTimestamp, event.toString))
|
||||
}
|
||||
}
|
||||
|
||||
config.getList("akka.event-handlers") foreach { listenerName =>
|
||||
try {
|
||||
ReflectiveAccess.getClassFor[Actor](listenerName) map {
|
||||
clazz => addListener(Actor.actorOf(clazz).start)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + listenerName +
|
||||
"] due to [" + e.toString + "]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This message is thrown by default when an Actors behavior doesn't match a message
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch._
|
||||
import akka.config.Config._
|
||||
import akka.config.Supervision._
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import scala.collection.JavaConversions
|
|||
|
||||
import java.util.concurrent._
|
||||
|
||||
import akka.event.EventHandler
|
||||
import akka.AkkaException
|
||||
|
||||
object Scheduler {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.config
|
||||
|
||||
import akka.AkkaException
|
||||
import akka.actor.EventHandler
|
||||
import akka.event.EventHandler
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.lang.reflect.Method
|
||||
|
|
|
|||
|
|
@ -7,7 +7,8 @@ package akka.dataflow
|
|||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
||||
|
||||
import akka.actor.{Actor, ActorRef, EventHandler}
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.{Actor, ActorRef}
|
||||
import akka.actor.Actor._
|
||||
import akka.dispatch.CompletableFuture
|
||||
import akka.AkkaException
|
||||
|
|
|
|||
|
|
@ -4,7 +4,8 @@
|
|||
|
||||
package akka.dispatch
|
||||
|
||||
import akka.actor.{ActorRef, IllegalActorStateException, EventHandler}
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.{ActorRef, IllegalActorStateException}
|
||||
import akka.util.{ReflectiveAccess, Switch}
|
||||
|
||||
import java.util.Queue
|
||||
|
|
|
|||
|
|
@ -5,7 +5,8 @@
|
|||
package akka.dispatch
|
||||
|
||||
import akka.AkkaException
|
||||
import akka.actor.{Actor, EventHandler}
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.Actor
|
||||
import akka.routing.Dispatcher
|
||||
import akka.japi.{ Procedure, Function => JFunc }
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.dispatch
|
|||
|
||||
import java.util.concurrent._
|
||||
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
|
||||
import akka.event.EventHandler
|
||||
import akka.config.Configuration
|
||||
import akka.config.Config.TIME_UNIT
|
||||
import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess}
|
||||
|
|
@ -43,7 +44,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () =
|
|||
|
||||
object MessageDispatcher {
|
||||
val UNSCHEDULED = 0
|
||||
val SCHEDULED = 1
|
||||
val SCHEDULED = 1
|
||||
val RESCHEDULED = 2
|
||||
|
||||
implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher
|
||||
|
|
@ -55,10 +56,10 @@ object MessageDispatcher {
|
|||
trait MessageDispatcher {
|
||||
import MessageDispatcher._
|
||||
|
||||
protected val uuids = new ConcurrentSkipListSet[Uuid]
|
||||
protected val uuids = new ConcurrentSkipListSet[Uuid]
|
||||
protected val futures = new ConcurrentSkipListSet[Uuid]
|
||||
protected val guard = new ReentrantGuard
|
||||
protected val active = new Switch(false)
|
||||
protected val guard = new ReentrantGuard
|
||||
protected val active = new Switch(false)
|
||||
|
||||
private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger}
|
|||
import ThreadPoolExecutor.CallerRunsPolicy
|
||||
|
||||
import akka.util.Duration
|
||||
import akka.actor.EventHandler
|
||||
import akka.event.EventHandler
|
||||
|
||||
object ThreadPoolConfig {
|
||||
type Bounds = Int
|
||||
|
|
|
|||
180
akka-actor/src/main/scala/akka/event/EventHandler.scala
Normal file
180
akka-actor/src/main/scala/akka/event/EventHandler.scala
Normal file
|
|
@ -0,0 +1,180 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.event
|
||||
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
import akka.dispatch._
|
||||
import akka.config.Config._
|
||||
import akka.config.ConfigurationException
|
||||
import akka.util.{ListenerManagement, ReflectiveAccess}
|
||||
import akka.AkkaException
|
||||
|
||||
/**
|
||||
* Event handler.
|
||||
* <p/>
|
||||
* Create, add and remove a listener:
|
||||
* <pre>
|
||||
* val eventHandlerListener = Actor.actorOf(new Actor {
|
||||
* self.dispatcher = EventHandler.EventHandlerDispatcher
|
||||
*
|
||||
* def receive = {
|
||||
* case EventHandler.Error(cause, instance, message) => ...
|
||||
* case EventHandler.Warning(instance, message) => ...
|
||||
* case EventHandler.Info(instance, message) => ...
|
||||
* case EventHandler.Debug(instance, message) => ...
|
||||
* case genericEvent => ...
|
||||
* }
|
||||
* })
|
||||
*
|
||||
* EventHandler.addListener(eventHandlerListener)
|
||||
* ...
|
||||
* EventHandler.removeListener(eventHandlerListener)
|
||||
* </pre>
|
||||
* <p/>
|
||||
* However best is probably to register the listener in the 'akka.conf'
|
||||
* configuration file.
|
||||
* <p/>
|
||||
* Log an error event:
|
||||
* <pre>
|
||||
* EventHandler.notify(EventHandler.Error(exception, this, message.toString))
|
||||
* </pre>
|
||||
* Or use the direct methods (better performance):
|
||||
* <pre>
|
||||
* EventHandler.error(exception, this, message.toString)
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object EventHandler extends ListenerManagement {
|
||||
import java.io.{StringWriter, PrintWriter}
|
||||
import java.text.DateFormat
|
||||
import java.util.Date
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
val ErrorLevel = 1
|
||||
val WarningLevel = 2
|
||||
val InfoLevel = 3
|
||||
val DebugLevel = 4
|
||||
|
||||
sealed trait Event {
|
||||
@transient val thread: Thread = Thread.currentThread
|
||||
}
|
||||
case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event
|
||||
case class Warning(instance: AnyRef, message: String = "") extends Event
|
||||
case class Info(instance: AnyRef, message: String = "") extends Event
|
||||
case class Debug(instance: AnyRef, message: String = "") extends Event
|
||||
|
||||
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
|
||||
val warning = "[WARN] [%s] [%s] [%s] %s".intern
|
||||
val info = "[INFO] [%s] [%s] [%s] %s".intern
|
||||
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
|
||||
val generic = "[GENERIC] [%s] [%s]".intern
|
||||
val ID = "event:handler".intern
|
||||
|
||||
class EventHandlerException extends AkkaException
|
||||
|
||||
val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
|
||||
|
||||
val level: Int = config.getString("akka.event-handler-level", "DEBUG") match {
|
||||
case "ERROR" => ErrorLevel
|
||||
case "WARNING" => WarningLevel
|
||||
case "INFO" => InfoLevel
|
||||
case "DEBUG" => DebugLevel
|
||||
case unknown => throw new ConfigurationException(
|
||||
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
|
||||
}
|
||||
|
||||
def notify(event: => AnyRef) = notifyListeners(event)
|
||||
|
||||
def notify[T <: Event : ClassManifest](event: => T) {
|
||||
if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event)
|
||||
}
|
||||
|
||||
def error(cause: Throwable, instance: AnyRef, message: => String) = {
|
||||
if (level >= ErrorLevel) notifyListeners(Error(cause, instance, message))
|
||||
}
|
||||
|
||||
def error(instance: AnyRef, message: => String) = {
|
||||
if (level >= ErrorLevel) notifyListeners(Error(new EventHandlerException, instance, message))
|
||||
}
|
||||
|
||||
def warning(instance: AnyRef, message: => String) = {
|
||||
if (level >= WarningLevel) notifyListeners(Warning(instance, message))
|
||||
}
|
||||
|
||||
def info(instance: AnyRef, message: => String) = {
|
||||
if (level >= InfoLevel) notifyListeners(Info(instance, message))
|
||||
}
|
||||
|
||||
def debug(instance: AnyRef, message: => String) = {
|
||||
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
|
||||
}
|
||||
|
||||
def formattedTimestamp = DateFormat.getInstance.format(new Date)
|
||||
|
||||
def stackTraceFor(e: Throwable) = {
|
||||
val sw = new StringWriter
|
||||
val pw = new PrintWriter(sw)
|
||||
e.printStackTrace(pw)
|
||||
sw.toString
|
||||
}
|
||||
|
||||
private def levelFor(eventClass: Class[_ <: Event]) = {
|
||||
if (eventClass.isInstanceOf[Error]) ErrorLevel
|
||||
else if (eventClass.isInstanceOf[Warning]) WarningLevel
|
||||
else if (eventClass.isInstanceOf[Info]) InfoLevel
|
||||
else if (eventClass.isInstanceOf[Debug]) DebugLevel
|
||||
else DebugLevel
|
||||
}
|
||||
|
||||
class DefaultListener extends Actor {
|
||||
self.id = ID
|
||||
self.dispatcher = EventHandlerDispatcher
|
||||
|
||||
def receive = {
|
||||
case event @ Error(cause, instance, message) =>
|
||||
println(error.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message,
|
||||
stackTraceFor(cause)))
|
||||
case event @ Warning(instance, message) =>
|
||||
println(warning.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message))
|
||||
case event @ Info(instance, message) =>
|
||||
println(info.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message))
|
||||
case event @ Debug(instance, message) =>
|
||||
println(debug.format(
|
||||
formattedTimestamp,
|
||||
event.thread.getName,
|
||||
instance.getClass.getSimpleName,
|
||||
message))
|
||||
case event =>
|
||||
println(generic.format(formattedTimestamp, event.toString))
|
||||
}
|
||||
}
|
||||
|
||||
config.getList("akka.event-handlers") foreach { listenerName =>
|
||||
try {
|
||||
ReflectiveAccess.getClassFor[Actor](listenerName) map {
|
||||
clazz => addListener(Actor.actorOf(clazz).start)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + listenerName +
|
||||
"] due to [" + e.toString + "]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -6,7 +6,7 @@ package akka.util
|
|||
|
||||
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
|
||||
import java.util.concurrent.atomic. {AtomicBoolean}
|
||||
import akka.actor.EventHandler
|
||||
import akka.event.EventHandler
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.http
|
||||
|
||||
import akka.actor.{ActorRegistry, ActorRef, Actor}
|
||||
import akka.actor.EventHandler
|
||||
import akka.event.EventHandler
|
||||
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||
import javax.servlet.http.HttpServlet
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.http
|
|||
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
|
||||
import Types._
|
||||
|
||||
import akka.actor.EventHandler
|
||||
import akka.event.EventHandler
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@
|
|||
package akka.security
|
||||
|
||||
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
|
||||
import akka.actor.EventHandler
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.Actor._
|
||||
import akka.config.Config
|
||||
|
||||
|
|
|
|||
|
|
@ -13,13 +13,14 @@ import akka.serialization.RemoteActorSerialization._
|
|||
import akka.japi.Creator
|
||||
import akka.config.Config._
|
||||
import akka.remoteinterface._
|
||||
import akka.actor.{PoisonPill, EventHandler, Index,
|
||||
import akka.actor.{PoisonPill, Index,
|
||||
ActorInitializationException, LocalActorRef, newUuid,
|
||||
ActorRegistry, Actor, RemoteActorRef,
|
||||
TypedActor, ActorRef, IllegalActorStateException,
|
||||
RemoteActorSystemMessage, uuidFrom, Uuid,
|
||||
Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.AkkaException
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.Actor._
|
||||
import akka.util._
|
||||
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.logging.slf4j
|
|||
|
||||
import org.slf4j.{Logger => SLFLogger, LoggerFactory => SLFLoggerFactory}
|
||||
|
||||
import akka.event.EventHandler
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package akka.testkit
|
||||
|
||||
import akka.actor.{ActorRef, EventHandler}
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation}
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.LinkedList
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ akka {
|
|||
|
||||
time-unit = "seconds" # Time unit for all timeout properties throughout the config
|
||||
|
||||
event-handlers = ["akka.actor.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
|
||||
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
|
||||
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
|
||||
|
||||
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue