diff --git a/LICENSE b/LICENSE index 011cb9d98a..c14a2bed2f 100755 --- a/LICENSE +++ b/LICENSE @@ -17,4 +17,4 @@ the License. --------------- Licenses for dependency projects can be found here: -[http://doc.akkasource.org/licenses] +[http://doc.akka.io/licenses] diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 8ab4f925a6..73072b2894 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -4,7 +4,6 @@ package akka -import akka.util.Logging import akka.actor.newUuid import java.io.{StringWriter, PrintWriter} @@ -34,16 +33,9 @@ import java.net.{InetAddress, UnknownHostException} printStackTrace(pw) sw.toString } - - private lazy val _log = { - AkkaException.log.slf4j.error(toString) - () - } - - def log: Unit = _log } -object AkkaException extends Logging { +object AkkaException { val hostname = try { InetAddress.getLocalHost.getHostName } catch { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 84583c39da..6de117966a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -1,4 +1,14 @@ /** + + + + + + + + + + * Copyright (C) 2009-2011 Scalable Solutions AB */ @@ -8,13 +18,14 @@ import akka.dispatch._ import akka.config.Config._ import akka.config.Supervision._ import akka.util.Helpers.{narrow, narrowSilently} +import akka.util.ListenerManagement import akka.AkkaException import java.util.concurrent.TimeUnit import java.net.InetSocketAddress import scala.reflect.BeanProperty -import akka.util. {ReflectiveAccess, Logging, Duration} +import akka.util. {ReflectiveAccess, Duration} import akka.remoteinterface.RemoteSupport import akka.japi. {Creator, Procedure} @@ -66,14 +77,111 @@ case class MaximumNumberOfRestartsWithinTimeRangeReached( @BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage // Exceptions for Actors -class ActorStartException private[akka](message: String) extends AkkaException(message) -class IllegalActorStateException private[akka](message: String) extends AkkaException(message) -class ActorKilledException private[akka](message: String) extends AkkaException(message) +class ActorStartException private[akka](message: String) extends AkkaException(message) +class IllegalActorStateException private[akka](message: String) extends AkkaException(message) +class ActorKilledException private[akka](message: String) extends AkkaException(message) class ActorInitializationException private[akka](message: String) extends AkkaException(message) -class ActorTimeoutException private[akka](message: String) extends AkkaException(message) +class ActorTimeoutException private[akka](message: String) extends AkkaException(message) /** - * This message is thrown by default when an Actors behavior doesn't match a message + * Error handler. + * + * Create, add and remove a listener: + *
+ * val errorHandlerEventListener = Actor.actorOf(new Actor {
+ *   self.dispatcher = EventHandler.EventHandlerDispatcher
+ *
+ *   def receive = {
+ *     case EventHandler.Error(cause, instance, message) => ...
+ *     case EventHandler.Warning(cause, instance, message) => ...
+ *     case EventHandler.Info(instance, message) => ...
+ *     case EventHandler.Debug(instance, message) => ...
+ *   }
+ * }).start
+ * 
+ * EventHandler.addListener(errorHandlerEventListener)
+ * ...
+ * EventHandler.removeListener(errorHandlerEventListener)
+ * 
+ * + * Log an error event: + *
+ * EventHandler notifyListeners EventHandler.Error(exception, this, message.toString)
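+ * 
+ * Warning, info and debug events are published the same way, for example:
+ * EventHandler notifyListeners EventHandler.Warning(exception, this, message.toString)
+ * EventHandler notifyListeners EventHandler.Info(this, message.toString)
+ * EventHandler notifyListeners EventHandler.Debug(this, message.toString)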
+ * 
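+ * 
+ * The console listener that is registered by default at the bottom of this object
+ * can be switched off via the "akka.default-error-handler" configuration flag,
+ * e.g. in akka.conf:
+ * akka {
+ *   default-error-handler = off
+ * }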
+ * @author Jonas Bonér + */ +object EventHandler extends ListenerManagement { + import java.io.{StringWriter, PrintWriter} + import java.text.DateFormat + import java.util.Date + import akka.dispatch.Dispatchers + + sealed trait Event { + val thread: Thread = Thread.currentThread + } + case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event + case class Warning(cause: Throwable, instance: AnyRef, message: String = "") extends Event + case class Info(instance: AnyRef, message: String = "") extends Event + case class Debug(instance: AnyRef, message: String = "") extends Event + + val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val warning = "[WARN] [%s] [%s] [%s] %s\n%s".intern + val info = "[INFO] [%s] [%s] [%s] %s".intern + val debug = "[DEBUG] [%s] [%s] [%s] %s".intern + val ID = "default:error:handler".intern + + val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build + + def formattedTimestamp = DateFormat.getInstance.format(new Date) + + def stackTraceFor(e: Throwable) = { + val sw = new StringWriter + val pw = new PrintWriter(sw) + e.printStackTrace(pw) + sw.toString + } + + class DefaultListener extends Actor { + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + case event @ Error(cause, instance, message) => + println(error.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message, + stackTraceFor(cause))) + case event @ Warning(cause, instance, message) => + println(warning.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message, + stackTraceFor(cause))) + case event @ Info(instance, message) => + println(info.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case event @ Debug(instance, message) => + println(debug.format( + formattedTimestamp, + event.thread.getName, + instance.getClass.getSimpleName, + message)) + case _ => {} + } + } + + if (config.getBool("akka.default-error-handler", true)) + addListener(Actor.actorOf[DefaultListener].start) // FIXME configurable in config (on/off) +} + +/** + * This message is thrown by default when an Actors behavior doesn't match a message */ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception { override def getMessage() = "Actor %s does not handle [%s]".format(ref,msg) @@ -85,7 +193,8 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception * * @author Jonas Bonér */ -object Actor extends Logging { +object Actor extends ListenerManagement { + /** * Add shutdown cleanups */ @@ -93,7 +202,6 @@ object Actor extends Logging { val hook = new Runnable { override def run { // Clear Thread.subclassAudits - log.slf4j.info("Clearing subclass audits") val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") tf.setAccessible(true) val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] @@ -279,7 +387,7 @@ object Actor extends Logging { * * @author Jonas Bonér */ -trait Actor extends Logging { +trait Actor { /** * Type alias because traits cannot have companion objects. @@ -353,14 +461,14 @@ trait Actor extends Logging { *
    *   def receive =  {
    *     case Ping =>
-   *       log.slf4j.info("got a 'Ping' message")
+   *       println("got a 'Ping' message")
    *       self.reply("pong")
    *
    *     case OneWay =>
-   *       log.slf4j.info("got a 'OneWay' message")
+   *       println("got a 'OneWay' message")
    *
    *     case unknown =>
-   *       log.slf4j.warn("unknown message [{}], ignoring", unknown)
+   *       println("unknown message: " + unknown)
    * }
    * 
*/ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e34099b96c..7b2487b9a2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -22,7 +22,7 @@ import scala.reflect.BeanProperty import scala.collection.immutable.Stack import scala.annotation.tailrec -private[akka] object ActorRefInternals extends Logging { +private[akka] object ActorRefInternals { /** * LifeCycles for ActorRefs. @@ -68,9 +68,6 @@ private[akka] object ActorRefInternals extends Logging { * @author Jonas Bonér */ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => - //Reuse same logger - import Actor.log - // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = newUuid @@ -525,7 +522,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] def checkReceiveTimeout = { cancelReceiveTimeout if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed - log.slf4j.debug("Scheduling timeout for {}", this) _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) } } @@ -534,7 +530,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal if (_futureTimeout.isDefined) { _futureTimeout.get.cancel(true) _futureTimeout = None - log.slf4j.debug("Timeout canceled for {}", this) } } } @@ -686,7 +681,6 @@ class LocalActorRef private[akka] ( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") linkedActors.put(actorRef.uuid, actorRef) actorRef.supervisor = Some(this) - Actor.log.slf4j.debug("Linking actor [{}] to actor [{}]", actorRef, this) } /** @@ -699,7 +693,6 @@ class LocalActorRef private[akka] ( "Actor [" + actorRef + "] is not a linked actor, can't unlink") linkedActors.remove(actorRef.uuid) actorRef.supervisor = None - Actor.log.slf4j.debug("Unlinking actor [{}] from actor [{}]", actorRef, this) } /** @@ -804,7 +797,8 @@ class LocalActorRef private[akka] ( else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) - dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + dispatcher dispatchMessage new MessageInvocation( + this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) future.get } } @@ -813,25 +807,24 @@ class LocalActorRef private[akka] ( * Callback for the dispatcher. This is the single entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { - if (isShutdown) Actor.log.slf4j.warn("Actor [{}] is shut down,\n\tignoring message [{}]", toString, messageHandle) - else { + if (!isShutdown) { currentMessage = messageHandle try { - Actor.log.slf4j.trace("Invoking actor with message: {}\n", messageHandle) try { cancelReceiveTimeout // FIXME: leave this here? 
actor(messageHandle.message) currentMessage = null // reset current message after successful invocation } catch { - case e: InterruptedException => { currentMessage = null } // received message while actor is shutting down, ignore - case e => handleExceptionInDispatch(e, messageHandle.message) + case e: InterruptedException => + currentMessage = null // received message while actor is shutting down, ignore + case e => + handleExceptionInDispatch(e, messageHandle.message) } finally { checkReceiveTimeout // Reschedule receive timeout } } catch { - case e => - Actor.log.slf4j.error("Could not invoke actor [{}]", this) - Actor.log.slf4j.error("Problem", e) + case e: Throwable => + EventHandler notifyListeners EventHandler.Error(e, this, messageHandle.message.toString) throw e } } @@ -846,7 +839,7 @@ class LocalActorRef private[akka] ( dead.restart(reason, maxRetries, within) case _ => - if(_supervisor.isDefined) + if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) else dead.stop @@ -885,42 +878,28 @@ class LocalActorRef private[akka] ( protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { def performRestart { - Actor.log.slf4j.info("Restarting actor [{}] configured as PERMANENT.", id) val failedActor = actorInstance.get failedActor match { case p: Proxyable => //p.swapProxiedActor(freshActor) //TODO: broken - Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id) failedActor.preRestart(reason) - Actor.log.slf4j.debug("Invoking 'postRestart' for failed actor instance [{}].", id) failedActor.postRestart(reason) case _ => - Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id) failedActor.preRestart(reason) val freshActor = newActor setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call freshActor.preStart - Actor.log.slf4j.debug("Invoking 'postRestart' for new actor instance [{}].", id) freshActor.postRestart(reason) } } def tooManyRestarts { - Actor.log.slf4j.warn( - "Maximum number of restarts [{}] within time range [{}] reached." + - "\n\tWill *not* restart actor [{}] anymore." + - "\n\tLast exception causing restart was" + - "\n\t[{}].", - Array[AnyRef](maxNrOfRetries, withinTimeRange, this, reason)) _supervisor.foreach { sup => // can supervisor handle the notification? 
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) - else Actor.log.slf4j.warn( - "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + - "\n\tCan't send the message to the supervisor [{}].", sup) } stop @@ -940,20 +919,15 @@ class LocalActorRef private[akka] ( performRestart true } catch { - case e => Actor.log.slf4j.debug("Unexpected exception during restart",e) - false //An error or exception here should trigger a retry + case e => false //An error or exception here should trigger a retry } finally { currentMessage = null } - - Actor.log.slf4j.debug("Restart: {} for [{}].", success, id) - if (success) { _status = ActorRefInternals.RUNNING dispatcher.resume(this) restartLinkedActors(reason,maxNrOfRetries,withinTimeRange) } - success } } @@ -1002,25 +976,18 @@ class LocalActorRef private[akka] ( } private def shutDownTemporaryActor(temporaryActor: ActorRef) { - Actor.log.slf4j.info("Actor [{}] configured as TEMPORARY and will not be restarted.", temporaryActor.id) temporaryActor.stop linkedActors.remove(temporaryActor.uuid) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor if (linkedActors.isEmpty) { - Actor.log.slf4j.info( - "All linked actors have died permanently (they were all configured as TEMPORARY)" + - "\n\tshutting down and unlinking supervisor actor as well [{}].", - temporaryActor.id) notifySupervisorWithMessage(UnlinkAndStop(this)) } - true } private def handleExceptionInDispatch(reason: Throwable, message: Any) = { - Actor.log.slf4j.error("Exception when invoking \n\tactor [{}] \n\twith message [{}]", this, message) - Actor.log.slf4j.error("Problem", reason) - + EventHandler notifyListeners EventHandler.Error(reason, this, message.toString) + //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -1030,7 +997,7 @@ class LocalActorRef private[akka] ( else { lifeCycle match { case Temporary => shutDownTemporaryActor(this) - case _ => dispatcher.resume(this) //Resume processing for this actor + case _ => dispatcher.resume(this) //Resume processing for this actor } } } @@ -1060,9 +1027,7 @@ class LocalActorRef private[akka] ( case e: NoSuchFieldException => false } - if (success) { - true - } + if (success) true else { val parent = clazz.getSuperclass if (parent eq null) @@ -1076,7 +1041,6 @@ class LocalActorRef private[akka] ( private def initializeActorInstance = { actor.preStart // run actor preStart - Actor.log.slf4j.trace("[{}] has started", toString) Actor.registry.register(this) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 18fa30d740..03bef32d25 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -255,7 +255,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag * Shuts down and unregisters all actors in the system. 
*/ def shutdownAll() { - log.slf4j.info("Shutting down all actors in the system...") if (TypedActorModule.isEnabled) { val elements = actorsByUUID.elements while (elements.hasMoreElements) { @@ -270,7 +269,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag } actorsByUUID.clear actorsById.clear - log.slf4j.info("All actors have been shut down and unregistered from ActorRegistry") } } diff --git a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala index 6600f486a5..4b96f9ab5d 100644 --- a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala +++ b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala @@ -8,13 +8,13 @@ import java.io.File import java.net.{URL, URLClassLoader} import java.util.jar.JarFile -import akka.util.{Bootable, Logging} +import akka.util.{Bootable} import akka.config.Config._ /** * Handles all modules in the deploy directory (load and unload) */ -trait BootableActorLoaderService extends Bootable with Logging { +trait BootableActorLoaderService extends Bootable { val BOOT_CLASSES = config.getList("akka.boot") lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader @@ -25,7 +25,6 @@ trait BootableActorLoaderService extends Bootable with Logging { val DEPLOY = HOME.get + "/deploy" val DEPLOY_DIR = new File(DEPLOY) if (!DEPLOY_DIR.exists) { - log.slf4j.error("Could not find a deploy directory at [{}]", DEPLOY) System.exit(-1) } val filesToDeploy = DEPLOY_DIR.listFiles.toArray.toList @@ -41,8 +40,6 @@ trait BootableActorLoaderService extends Bootable with Logging { } } val toDeploy = filesToDeploy.map(_.toURI.toURL) - log.slf4j.info("Deploying applications from [{}]: [{}]", DEPLOY, toDeploy) - log.slf4j.debug("Loading dependencies [{}]", dependencyJars) val allJars = toDeploy ::: dependencyJars new URLClassLoader(allJars.toArray,Thread.currentThread.getContextClassLoader) @@ -50,12 +47,9 @@ trait BootableActorLoaderService extends Bootable with Logging { }) abstract override def onLoad = { - applicationLoader.foreach(_ => log.slf4j.info("Creating /deploy class-loader")) - super.onLoad for (loader <- applicationLoader; clazz <- BOOT_CLASSES) { - log.slf4j.info("Loading boot class [{}]", clazz) loader.loadClass(clazz).newInstance } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala old mode 100755 new mode 100644 index f84c35837a..046685f22d --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -291,19 +291,16 @@ trait FSM[S, D] { private val handleEventDefault: StateFunction = { case Event(value, stateData) => - log.slf4j.warn("Event {} not handled in state {}, staying at current state", value, currentState.stateName) stay } private var handleEvent: StateFunction = handleEventDefault private var terminateEvent: PartialFunction[StopEvent[S,D], Unit] = { case StopEvent(Failure(cause), _, _) => - log.slf4j.error("Stopping because of a failure with cause {}", cause) - case StopEvent(reason, _, _) => log.slf4j.info("Stopping because of reason: {}", reason) + case StopEvent(reason, _, _) => } private var transitionEvent: TransitionHandler = (from, to) => { - log.slf4j.debug("Transitioning from state {} to {}", from, to) } override final protected def receive: Receive = { @@ -376,7 +373,6 @@ trait FSM[S, D] { } private def terminate(reason: Reason) = { - timers.foreach{ case (timer, t) => 
log.slf4j.info("Canceling timer {}", timer); t.cancel} terminateEvent.apply(StopEvent(reason, currentState.stateName, currentState.stateData)) self.stop } @@ -405,7 +401,7 @@ trait FSM[S, D] { def replying(replyValue: Any): State = { self.sender match { case Some(sender) => sender ! replyValue - case None => log.slf4j.error("Unable to send reply value {}, no sender reference to reply to", replyValue) + case None => } this } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 1a81740368..5fab4f8046 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -19,31 +19,27 @@ import scala.collection.JavaConversions import java.util.concurrent._ -import akka.util.Logging import akka.AkkaException -object Scheduler extends Logging { +object Scheduler { import Actor._ case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e) @volatile private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - log.slf4j.info("Starting up Scheduler") - /** * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay */ def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - log.slf4j.trace( - "Schedule scheduled event\n\tevent = [{}]\n\treceiver = [{}]\n\tinitialDelay = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]", - Array[AnyRef](message, receiver, initialDelay.asInstanceOf[AnyRef], delay.asInstanceOf[AnyRef], timeUnit)) try { service.scheduleAtFixedRate( new Runnable { def run = receiver ! message }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e: Exception => throw SchedulerException(message + " could not be scheduled on " + receiver, e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this, receiver + " @ " + message) + throw SchedulerException(message + " could not be scheduled on " + receiver, e) } } @@ -59,14 +55,12 @@ object Scheduler extends Logging { * avoid blocking operations since this is executed in the schedulers thread */ def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - log.slf4j.trace( - "Schedule scheduled event\n\trunnable = [{}]\n\tinitialDelay = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]", - Array[AnyRef](runnable, initialDelay.asInstanceOf[AnyRef], delay.asInstanceOf[AnyRef], timeUnit)) - try { - service.scheduleAtFixedRate(runnable,initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e: Exception => throw SchedulerException("Failed to schedule a Runnable", e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + throw SchedulerException("Failed to schedule a Runnable", e) } } @@ -74,15 +68,14 @@ object Scheduler extends Logging { * Schedules to send the specified message to the receiver after delay */ def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - log.slf4j.trace( - "Schedule one-time event\n\tevent = [{}]\n\treceiver = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]", - Array[AnyRef](message, receiver, delay.asInstanceOf[AnyRef], timeUnit)) try { service.schedule( new Runnable { def run = receiver ! 
message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e: Exception => throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this, receiver + " @ " + message) + throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) } } @@ -98,23 +91,20 @@ object Scheduler extends Logging { * avoid blocking operations since the runnable is executed in the schedulers thread */ def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - log.slf4j.trace( - "Schedule one-time event\n\trunnable = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]", - Array[AnyRef](runnable, delay.asInstanceOf[AnyRef], timeUnit)) try { service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e: Exception => throw SchedulerException("Failed to scheduleOnce a Runnable", e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + throw SchedulerException("Failed to scheduleOnce a Runnable", e) } } def shutdown: Unit = synchronized { - log.slf4j.info("Shutting down Scheduler") service.shutdown } def restart: Unit = synchronized { - log.slf4j.info("Restarting Scheduler") shutdown service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) } diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index efba95aa07..d9e77dcbcb 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -76,7 +76,7 @@ object Supervisor { * * @author Jonas Bonér */ -case class SupervisorFactory(val config: SupervisorConfig) extends Logging { +case class SupervisorFactory(val config: SupervisorConfig) { def newInstance: Supervisor = newInstanceFor(config) @@ -166,11 +166,6 @@ final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) exten // FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor case MaximumNumberOfRestartsWithinTimeRangeReached( victim, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) => - Actor.log.slf4j.warn( - "Declaratively configured supervisor received a [MaximumNumberOfRestartsWithinTimeRangeReached] notification," + - "\n\tbut there is currently no way of handling it in a declaratively configured supervisor." + - "\n\tIf you want to be able to handle this error condition then you need to create the supervision tree programatically." 
+ - "\n\tThis will be supported in the future.") case unknown => throw new SupervisorException( "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") } diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index ac7daa2dc6..bf2208d960 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -62,8 +62,6 @@ import scala.reflect.BeanProperty */ abstract class UntypedActor extends Actor { - def logger = log.logger //Give the Java guys a break - def getContext(): ActorRef = self final protected def receive = { diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index cb3731679b..1be08b14ae 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -5,7 +5,7 @@ package akka.config import akka.AkkaException -import akka.util.Logging +import akka.actor.{EventHandler} import net.lag.configgy.{Config => CConfig, Configgy, ParseException} import java.net.InetSocketAddress @@ -19,7 +19,7 @@ class ModuleNotAvailableException(message: String) extends AkkaException(message * * @author Jonas Bonér */ -object Config extends Logging { +object Config { val VERSION = "1.1-SNAPSHOT" val HOME = { @@ -57,40 +57,49 @@ object Config extends Logging { val configFile = System.getProperty("akka.config", "") try { Configgy.configure(configFile) - log.slf4j.info("Config loaded from -Dakka.config={}", configFile) + println("Config loaded from -Dakka.config=" + configFile) } catch { - case e: ParseException => throw new ConfigurationException( - "Config could not be loaded from -Dakka.config=" + configFile + - "\n\tdue to: " + e.toString) + case cause: ParseException => + val e = new ConfigurationException( + "Config could not be loaded from -Dakka.config=" + configFile + + "\n\tdue to: " + cause.toString) + EventHandler notifyListeners EventHandler.Error(e, this) + throw e + } Configgy.config } else if (getClass.getClassLoader.getResource(confName) ne null) { try { Configgy.configureFromResource(confName, getClass.getClassLoader) - log.slf4j.info("Config [{}] loaded from the application classpath.",confName) + println("Config [" + confName + "] loaded from the application classpath.") } catch { - case e: ParseException => throw new ConfigurationException( - "Can't load '" + confName + "' config file from application classpath," + - "\n\tdue to: " + e.toString) + case cause: ParseException => + val e = new ConfigurationException( + "Can't load '" + confName + "' config file from application classpath," + + "\n\tdue to: " + cause.toString) + EventHandler notifyListeners EventHandler.Error(e, this) + throw e } Configgy.config } else if (HOME.isDefined) { try { val configFile = HOME.get + "/config/" + confName Configgy.configure(configFile) - log.slf4j.info( - "AKKA_HOME is defined as [{}], config loaded from [{}].", - HOME.getOrElse(throwNoAkkaHomeException), - configFile) + println( + "AKKA_HOME is defined as [" + HOME.getOrElse(throwNoAkkaHomeException) + + "], config loaded from [" + configFile + "].") } catch { - case e: ParseException => throw new ConfigurationException( - "AKKA_HOME is defined as [" + HOME.get + "] " + - "\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," + - "\n\tdue to: " + e.toString) + case cause: ParseException => + val e = throw new 
ConfigurationException( + "AKKA_HOME is defined as [" + HOME.get + "] " + + "\n\tbut the 'akka.conf' config file can not be found at [" + HOME.get + "/config/"+ confName + "]," + + "\n\tdue to: " + cause.toString) + EventHandler notifyListeners EventHandler.Error(e, this) + throw e } Configgy.config } else { - log.slf4j.warn( + println( "\nCan't load '" + confName + "'." + "\nOne of the three ways of locating the '" + confName + "' file needs to be defined:" + "\n\t1. Define the '-Dakka.config=...' system property option." + diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index 3abf6c8aca..7e1cb6f99d 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -7,7 +7,7 @@ package akka.dataflow import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} -import akka.actor.{Actor, ActorRef} +import akka.actor.{Actor, ActorRef, EventHandler} import akka.actor.Actor._ import akka.dispatch.CompletableFuture import akka.AkkaException @@ -148,6 +148,7 @@ object DataFlow { (out !! Get).as[T] } catch { case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) out ! Exit throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index fe221dba58..357b5e9e80 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.actor.{Actor, ActorRef} import akka.actor.newUuid import akka.config.Config._ -import akka.util.{Duration, Logging} +import akka.util.{Duration} import net.lag.configgy.ConfigMap @@ -46,7 +46,7 @@ import java.util.concurrent.TimeUnit * * @author Jonas Bonér */ -object Dispatchers extends Logging { +object Dispatchers { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). map(time => Duration(time, TIME_UNIT)). 
diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 3f83887673..b312878315 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -4,7 +4,7 @@ package akka.dispatch -import akka.actor.{ActorRef, IllegalActorStateException} +import akka.actor.{ActorRef, IllegalActorStateException, EventHandler} import akka.util.{ReflectiveAccess, Switch} import java.util.Queue @@ -132,6 +132,7 @@ class ExecutorBasedEventDrivenDispatcher( executorService.get() execute mbox } catch { case e: RejectedExecutionException => + EventHandler notifyListeners EventHandler.Warning(e, this, _name) mbox.dispatcherLock.unlock() throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 045f268d30..451cdf8b80 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -153,4 +153,4 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( recipient // nothing found, reuse same start index next time } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 0cdd9cafdd..b76e73132d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -6,15 +6,15 @@ package akka.dispatch import akka.AkkaException import akka.actor.Actor.spawn +import akka.actor.{Actor, EventHandler} import akka.routing.Dispatcher +import akka.japi.Procedure import java.util.concurrent.locks.ReentrantLock -import akka.japi.Procedure import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit} import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS} -import akka.actor.Actor -import annotation.tailrec import java.util.concurrent.atomic. 
{AtomicBoolean, AtomicInteger} +import annotation.tailrec class FutureTimeoutException(message: String) extends AkkaException(message) @@ -33,8 +33,13 @@ object Futures { (body: => T): Future[T] = { val f = new DefaultCompletableFuture[T](timeout) spawn({ - try { f completeWithResult body } - catch { case e => f completeWithException e} + try { + f completeWithResult body + } catch { + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + f completeWithException e + } })(dispatcher) f } @@ -97,7 +102,9 @@ object Futures { results.clear //Do not retain the values since someone can hold onto the Future for a long time result completeWithResult r } catch { - case e: Exception => result completeWithException e + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + result completeWithException e } } } @@ -255,7 +262,9 @@ sealed trait Future[T] { fa complete (try { Right(f(v.right.get)) } catch { - case e => Left(e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + Left(e) }) } } @@ -281,7 +290,9 @@ sealed trait Future[T] { try { f(v.right.get) onComplete (fa.completeWith(_)) } catch { - case e => fa completeWithException e + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + fa completeWithException e } } } @@ -309,7 +320,9 @@ sealed trait Future[T] { if (p(r)) Right(r) else Left(new MatchError(r)) } catch { - case e => Left(e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + Left(e) }) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 7253b43845..cce4d2e871 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -7,7 +7,7 @@ package akka.dispatch import java.util.concurrent._ import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} -import akka.util.{Switch, ReentrantGuard, Logging, HashCode, ReflectiveAccess} +import akka.util.{Switch, ReentrantGuard, HashCode, ReflectiveAccess} import akka.actor._ /** @@ -99,11 +99,11 @@ trait MessageDispatcher { */ def stopAllAttachedActors { val i = uuids.iterator - while(i.hasNext()) { + while (i.hasNext()) { val uuid = i.next() Actor.registry.actorFor(uuid) match { case Some(actor) => actor.stop - case None => + case None => {} } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index cb2f2b98dc..9efd64b576 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -9,7 +9,8 @@ import java.util.concurrent._ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy -import akka.util. 
{Duration, Logging} +import akka.util.Duration +import akka.actor.{EventHandler} object ThreadPoolConfig { type Bounds = Int @@ -170,22 +171,19 @@ object MonitorableThread { * @author Jonas Bonér */ class MonitorableThread(runnable: Runnable, name: String) - extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging { + extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) { setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable) = - log.slf4j.error("Thread.UncaughtException", cause) + def uncaughtException(thread: Thread, cause: Throwable) = {} }) override def run = { val debug = MonitorableThread.debugLifecycle - log.slf4j.debug("Created thread {}", getName) try { MonitorableThread.alive.incrementAndGet super.run } finally { MonitorableThread.alive.decrementAndGet - log.slf4j.debug("Exiting thread {}", getName) } } } @@ -210,15 +208,16 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend }) } catch { case e: RejectedExecutionException => + EventHandler notifyListeners EventHandler.Warning(e, this) semaphore.release - case e => - log.slf4j.error("Unexpected exception", e) + case e: Throwable => + EventHandler notifyListeners EventHandler.Error(e, this) throw e } } } -trait ExecutorServiceDelegate extends ExecutorService with Logging { +trait ExecutorServiceDelegate extends ExecutorService { def executor: ExecutorService @@ -254,7 +253,6 @@ trait LazyExecutorService extends ExecutorServiceDelegate { def createExecutor: ExecutorService lazy val executor = { - log.slf4j.info("Lazily initializing ExecutorService for ",this) createExecutor } } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 3b870731e7..4e78446396 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap import akka.AkkaException import reflect.BeanProperty -trait RemoteModule extends Logging { +trait RemoteModule { val UUID_PREFIX = "uuid:" def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 2a66d5eece..d8437590d1 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -4,7 +4,7 @@ package akka.routing -import akka.actor. {Actor, ActorRef} +import akka.actor.{Actor, ActorRef, EventHandler} /** * Actor pooling @@ -14,8 +14,8 @@ import akka.actor. {Actor, ActorRef} * * Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message. * Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types. - * The first determines the size itself - either fixed or bounded. - * The second determines how to adjust of the pool according to some internal pressure characteristic. + * The first determines the size itself - either fixed or bounded. + * The second determines how to adjust of the pool according to some internal pressure characteristic. * Filters - A filter can be used to refine the raw pressure value returned from a capacitor. 
* * It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say @@ -27,8 +27,8 @@ import akka.actor. {Actor, ActorRef} object ActorPool { - case object Stat - case class Stats(size:Int) + case object Stat + case class Stats(size:Int) } /** @@ -36,95 +36,93 @@ object ActorPool */ trait ActorPool { - def instance():ActorRef - def capacity(delegates:Seq[ActorRef]):Int - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] + def instance():ActorRef + def capacity(delegates:Seq[ActorRef]):Int + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] } /** * A default implementation of a pool, on each message to route, - * - checks the current capacity and adjusts accordingly if needed - * - routes the incoming message to a selection set of delegate actors + * - checks the current capacity and adjusts accordingly if needed + * - routes the incoming message to a selection set of delegate actors */ trait DefaultActorPool extends ActorPool { - this: Actor => - - import ActorPool._ - import collection.mutable.LinkedList - import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached - - - protected var _delegates = LinkedList[ActorRef]() - private var _lastCapacityChange = 0 - private var _lastSelectorCount = 0 - - - override def postStop = _delegates foreach {_ stop} - - protected def _route:Receive = - { - // - // for testing... - // - case Stat => - self reply_? Stats(_delegates length) - - case max:MaximumNumberOfRestartsWithinTimeRangeReached => - log.error("Pooled actor will be removed after exceeding maxium number of restart retries. ["+max.victim.toString+"]") - _delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)} + this: Actor => + + import ActorPool._ + import collection.mutable.LinkedList + import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached + + + protected var _delegates = LinkedList[ActorRef]() + private var _lastCapacityChange = 0 + private var _lastSelectorCount = 0 + + + override def postStop = _delegates foreach {_ stop} + + protected def _route:Receive = + { + // + // for testing... + // + case Stat => + self reply_? Stats(_delegates length) + + case max:MaximumNumberOfRestartsWithinTimeRangeReached => + _delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)} - case msg => - _capacity - _select foreach {delegate => - self.senderFuture match { - case None => delegate ! msg - case Some(future) => - Actor.spawn { - try { - future completeWithResult (delegate !! msg).getOrElse(None) - } catch { - case ex => future completeWithException ex - } - } - } - } - } - - private def _capacity = - { - _lastCapacityChange = capacity(_delegates) - if (_lastCapacityChange > 0) { - _delegates ++= { - for (i <- 0 until _lastCapacityChange) yield { - val delegate = instance() - self startLink delegate - delegate - } - } - log.slf4j.debug("Pool capacity increased by {}", _lastCapacityChange) - } - else if (_lastCapacityChange < 0) { - val s = _delegates splitAt(_delegates.length + _lastCapacityChange) - s._2 foreach {_ stop} - _delegates = s._1 - - log.slf4j.debug("Pool capacity decreased by {}", -1*_lastCapacityChange) - } - } - - private def _select = - { - val s = select(_delegates) - _lastSelectorCount = s._2 - s._1 - } + case msg => + _capacity + _select foreach {delegate => + self.senderFuture match { + case None => delegate ! msg + case Some(future) => + Actor.spawn { + try { + future completeWithResult (delegate !! 
msg).getOrElse(None) + } catch { + case e => + EventHandler notifyListeners EventHandler.Error(e, this) + future completeWithException e + } + } + } + } + } + + private def _capacity = + { + _lastCapacityChange = capacity(_delegates) + if (_lastCapacityChange > 0) { + _delegates ++= { + for (i <- 0 until _lastCapacityChange) yield { + val delegate = instance() + self startLink delegate + delegate + } + } + } + else if (_lastCapacityChange < 0) { + val s = _delegates splitAt(_delegates.length + _lastCapacityChange) + s._2 foreach {_ stop} + _delegates = s._1 + } + } + + private def _select = + { + val s = select(_delegates) + _lastSelectorCount = s._2 + s._1 + } } /** * Selectors - * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool + * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool **/ /** @@ -132,24 +130,24 @@ trait DefaultActorPool extends ActorPool */ trait SmallestMailboxSelector { - def selectionCount:Int - def partialFill:Boolean - - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = - { - var set:Seq[ActorRef] = Nil - var take = { - if (partialFill) math.min(selectionCount, delegates.length) - else selectionCount - } - - while (take > 0) { - set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set - take -= set.size - } + def selectionCount:Int + def partialFill:Boolean + + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = + { + var set:Seq[ActorRef] = Nil + var take = { + if (partialFill) math.min(selectionCount, delegates.length) + else selectionCount + } + + while (take > 0) { + set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set + take -= set.size + } - (set.iterator, set.size) - } + (set.iterator, set.size) + } } /** @@ -157,33 +155,33 @@ trait SmallestMailboxSelector */ trait RoundRobinSelector { - private var _last:Int = -1; - - def selectionCount:Int - def partialFill:Boolean - - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = - { - val length = delegates.length - val take = { - if (partialFill) math.min(selectionCount, length) - else selectionCount - } - - var set = for (i <- 0 to take) yield { - _last += 1 - if (_last >= length) _last = 0 - delegates(_last) - } - - (set.iterator, set.size) - } + private var _last:Int = -1; + + def selectionCount:Int + def partialFill:Boolean + + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = + { + val length = delegates.length + val take = { + if (partialFill) math.min(selectionCount, length) + else selectionCount + } + + var set = for (i <- 0 to take) yield { + _last += 1 + if (_last >= length) _last = 0 + delegates(_last) + } + + (set.iterator, set.size) + } } /** * Capacitors - * These traits define how to alter the size of the pool + * These traits define how to alter the size of the pool */ /** @@ -191,14 +189,14 @@ trait RoundRobinSelector */ trait FixedSizeCapacitor { - def limit:Int - - def capacity(delegates:Seq[ActorRef]):Int = - { - val d = limit - delegates.size - if (d>0) d - else 0 - } + def limit:Int + + def capacity(delegates:Seq[ActorRef]):Int = + { + val d = limit - delegates.size + if (d>0) d + else 0 + } } /** @@ -206,22 +204,22 @@ trait FixedSizeCapacitor */ trait BoundedCapacitor { - def lowerBound:Int - def upperBound:Int - - def capacity(delegates:Seq[ActorRef]):Int = - { - val current = delegates length - var delta = _eval(delegates) - val proposed = current + delta - - if 
(proposed < lowerBound) delta += (lowerBound - proposed) - else if (proposed > upperBound) delta -= (proposed - upperBound) - - delta - } - - protected def _eval(delegates:Seq[ActorRef]):Int + def lowerBound:Int + def upperBound:Int + + def capacity(delegates:Seq[ActorRef]):Int = + { + val current = delegates length + var delta = _eval(delegates) + val proposed = current + delta + + if (proposed < lowerBound) delta += (lowerBound - proposed) + else if (proposed > upperBound) delta -= (proposed - upperBound) + + delta + } + + protected def _eval(delegates:Seq[ActorRef]):Int } /** @@ -229,14 +227,14 @@ trait BoundedCapacitor */ trait MailboxPressureCapacitor { - def pressureThreshold:Int - - def pressure(delegates:Seq[ActorRef]):Int = - { - var n = 0; - delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1} - n - } + def pressureThreshold:Int + + def pressure(delegates:Seq[ActorRef]):Int = + { + var n = 0; + delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1} + n + } } /** @@ -244,12 +242,12 @@ trait MailboxPressureCapacitor */ trait ActiveFuturesPressureCapacitor { - def pressure(delegates:Seq[ActorRef]):Int = - { - var n = 0; - delegates foreach {d => if (d.senderFuture.isDefined) n+=1} - n - } + def pressure(delegates:Seq[ActorRef]):Int = + { + var n = 0; + delegates foreach {d => if (d.senderFuture.isDefined) n+=1} + n + } } @@ -257,12 +255,12 @@ trait ActiveFuturesPressureCapacitor */ trait CapacityStrategy { - import ActorPool._ - - def pressure(delegates:Seq[ActorRef]):Int - def filter(pressure:Int, capacity:Int):Int - - protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size) + import ActorPool._ + + def pressure(delegates:Seq[ActorRef]):Int + def filter(pressure:Int, capacity:Int):Int + + protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size) } trait FixedCapacityStrategy extends FixedSizeCapacitor @@ -288,7 +286,7 @@ trait Filter // are updated consistently. ramping up is always + and backing off // is always - and each should return 0 otherwise... 
// - rampup (pressure, capacity) + backoff (pressure, capacity) + rampup (pressure, capacity) + backoff (pressure, capacity) } } @@ -352,7 +350,7 @@ trait RunningMeanBackoff _capacity += capacity if (capacity > 0 && pressure/capacity < backoffThreshold && - _capacity > 0 && _pressure/_capacity < backoffThreshold) { + _capacity > 0 && _pressure/_capacity < backoffThreshold) { math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt } else diff --git a/akka-actor/src/main/scala/akka/util/Crypt.scala b/akka-actor/src/main/scala/akka/util/Crypt.scala index c7bb520edc..63d6aff4df 100644 --- a/akka-actor/src/main/scala/akka/util/Crypt.scala +++ b/akka-actor/src/main/scala/akka/util/Crypt.scala @@ -9,7 +9,7 @@ import java.security.{MessageDigest, SecureRandom} /** * @author Jonas Bonér */ -object Crypt extends Logging { +object Crypt { val hex = "0123456789ABCDEF" val lineSeparator = System.getProperty("line.separator") @@ -24,7 +24,6 @@ object Crypt extends Logging { def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1")) def generateSecureCookie: String = { - log.slf4j.info("Generating secure cookie...") val bytes = Array.fill(32)(0.byteValue) random.nextBytes(bytes) sha1(bytes) diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index c2bfcca0af..8949f73f8b 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -7,7 +7,7 @@ package akka.util /** * @author Jonas Bonér */ -object Helpers extends Logging { +object Helpers { implicit def null2Option[T](t: T): Option[T] = Option(t) @@ -42,8 +42,6 @@ object Helpers extends Logging { narrow(o) } catch { case e: ClassCastException => - log.slf4j.warn("Cannot narrow {} to expected type {}!", o, implicitly[Manifest[T]].erasure.getName) - log.slf4j.trace("narrowSilently", e) None } diff --git a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala index aab92c7601..916fac9c6a 100644 --- a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala @@ -13,7 +13,7 @@ import akka.actor.ActorRef * * @author Martin Krasser */ -trait ListenerManagement extends Logging { +trait ListenerManagement { private val listeners = new ConcurrentSkipListSet[ActorRef] @@ -50,14 +50,13 @@ trait ListenerManagement extends Logging { */ def hasListener(listener: ActorRef): Boolean = listeners.contains(listener) - protected def notifyListeners(message: => Any) { + protected[akka] def notifyListeners(message: => Any) { if (hasListeners) { val msg = message val iterator = listeners.iterator while (iterator.hasNext) { val listener = iterator.next if (listener.isRunning) listener ! msg - else log.slf4j.warn("Can't notify [{}] since it is not running.", listener) } } } @@ -65,12 +64,11 @@ trait ListenerManagement extends Logging { /** * Execute f with each listener as argument. 
*/ - protected def foreachListener(f: (ActorRef) => Unit) { + protected[akka] def foreachListener(f: (ActorRef) => Unit) { val iterator = listeners.iterator while (iterator.hasNext) { val listener = iterator.next if (listener.isRunning) f(listener) - else log.slf4j.warn("Can't notify [{}] since it is not running.", listener) } } } diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index f9d594e2de..131f0a5a49 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -6,6 +6,7 @@ package akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} import java.util.concurrent.atomic. {AtomicBoolean} +import akka.actor.{EventHandler} /** * @author Jonas Bonér @@ -124,8 +125,9 @@ class Switch(startAsOn: Boolean = false) { try { action } catch { - case t => - switch.compareAndSet(!from,from) //Revert status + case t: Throwable => + EventHandler notifyListeners EventHandler.Error(t, this) + switch.compareAndSet(!from, from) //Revert status throw t } true diff --git a/akka-actor/src/main/scala/akka/util/Logging.scala b/akka-actor/src/main/scala/akka/util/Logging.scala deleted file mode 100644 index 87973b5e2c..0000000000 --- a/akka-actor/src/main/scala/akka/util/Logging.scala +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.util - -import org.slf4j.{Logger => SLFLogger,LoggerFactory => SLFLoggerFactory} - -/** - * Base trait for all classes that wants to be able use the logging infrastructure. - * - * @author Jonas Bonér - */ -trait Logging { - @transient val log = Logger(this.getClass.getName) -} - -/** - * Scala SLF4J wrapper - * - * Example: - *
- * class Foo extends Logging {
- *   log.info("My foo is %s","alive")
- *   log.error(new Exception(),"My foo is %s","broken")
- * }
- * 
- * - * The logger uses String.format: - * http://download-llnw.oracle.com/javase/6/docs/api/java/lang/String.html#format(java.lang.String,%20java.lang.Object...) - * - * If you want to use underlying slf4j Logger, do: - * log.slf4j.info("My foo is {}","alive") - * log.slf4j.error("My foo is broken",new Exception()) - */ -class Logger(val slf4j: SLFLogger) { - final def name = logger.getName - final def logger = slf4j - - final def trace_? = logger.isTraceEnabled - final def debug_? = logger.isDebugEnabled - final def info_? = logger.isInfoEnabled - final def warning_? = logger.isWarnEnabled - final def error_? = logger.isErrorEnabled - - //Trace - final def trace(t: Throwable, fmt: => String, arg: Any, argN: Any*) { - trace(t,message(fmt,arg,argN:_*)) - } - - final def trace(t: Throwable, msg: => String) { - if (trace_?) logger.trace(msg,t) - } - - final def trace(fmt: => String, arg: Any, argN: Any*) { - trace(message(fmt,arg,argN:_*)) - } - - final def trace(msg: => String) { - if (trace_?) logger trace msg - } - - //Debug - final def debug(t: Throwable, fmt: => String, arg: Any, argN: Any*) { - debug(t,message(fmt,arg,argN:_*)) - } - - final def debug(t: Throwable, msg: => String) { - if (debug_?) logger.debug(msg,t) - } - - final def debug(fmt: => String, arg: Any, argN: Any*) { - debug(message(fmt,arg,argN:_*)) - } - - final def debug(msg: => String) { - if (debug_?) logger debug msg - } - - //Info - final def info(t: Throwable, fmt: => String, arg: Any, argN: Any*) { - info(t,message(fmt,arg,argN:_*)) - } - - final def info(t: Throwable, msg: => String) { - if (info_?) logger.info(msg,t) - } - - final def info(fmt: => String, arg: Any, argN: Any*) { - info(message(fmt,arg,argN:_*)) - } - - final def info(msg: => String) { - if (info_?) logger info msg - } - - //Warning - final def warning(t: Throwable, fmt: => String, arg: Any, argN: Any*) { - warning(t,message(fmt,arg,argN:_*)) - } - - final def warn(t: Throwable, fmt: => String, arg: Any, argN: Any*) = warning(t, fmt, arg, argN) - - final def warning(t: Throwable, msg: => String) { - if (warning_?) logger.warn(msg,t) - } - - final def warn(t: Throwable, msg: => String) = warning(t, msg) - - final def warning(fmt: => String, arg: Any, argN: Any*) { - warning(message(fmt,arg,argN:_*)) - } - - final def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN:_*) - - final def warning(msg: => String) { - if (warning_?) logger warn msg - } - - final def warn(msg: => String) = warning(msg) - - //Error - final def error(t: Throwable, fmt: => String, arg: Any, argN: Any*) { - error(t,message(fmt,arg,argN:_*)) - } - - final def error(t: Throwable, msg: => String) { - if (error_?) logger.error(msg,t) - } - - final def error(fmt: => String, arg: Any, argN: Any*) { - error(message(fmt,arg,argN:_*)) - } - - final def error(msg: => String) { - if (error_?) logger error msg - } - - protected final def message(fmt: String, arg: Any, argN: Any*) : String = { - if ((argN eq null) || argN.isEmpty) fmt.format(arg) - else fmt.format((arg +: argN):_*) - } -} - -/** - * Logger factory - * - * ex. 
- * - * val logger = Logger("my.cool.logger") - * val logger = Logger(classOf[Banana]) - * val rootLogger = Logger.root - * - */ -object Logger { - - /* Uncomment to be able to debug what logging configuration will be used - { - import org.slf4j.LoggerFactory - import ch.qos.logback.classic.LoggerContext - import ch.qos.logback.core.util.StatusPrinter - - // print logback's internal status - StatusPrinter.print(LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]) - }*/ - - def apply(logger: String) : Logger = new Logger(SLFLoggerFactory getLogger logger) - def apply(clazz: Class[_]) : Logger = apply(clazz.getName) - def root : Logger = apply(SLFLogger.ROOT_LOGGER_NAME) -} diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 26d9902af8..aab68f2086 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -17,7 +17,7 @@ import akka.actor._ * * @author Jonas Bonér */ -object ReflectiveAccess extends Logging { +object ReflectiveAccess { val loader = getClass.getClassLoader @@ -124,9 +124,7 @@ object ReflectiveAccess extends Logging { ctor.setAccessible(true) Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { - case e => - log.slf4j.warn("Could not instantiate class [{}]", clazz.getName) - log.slf4j.warn("createInstance",e.getCause) + case e: Exception => None } @@ -142,9 +140,7 @@ object ReflectiveAccess extends Logging { ctor.setAccessible(true) Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { - case e => - log.slf4j.warn("Could not instantiate class [{}]", fqn) - log.slf4j.warn("createInstance",e.getCause) + case e: Exception => None } @@ -156,13 +152,9 @@ object ReflectiveAccess extends Logging { Option(instance.get(null).asInstanceOf[T]) } catch { case e: ClassNotFoundException => { - log.slf4j.debug("Could not get object [{}]", fqn) - log.slf4j.debug("getObjectFor", e) None } case ei: ExceptionInInitializerError => { - log.slf4j.error("Exception in initializer for object [{}]",fqn) - log.slf4j.error("Cause was:",ei.getCause) throw ei } } @@ -171,6 +163,7 @@ object ReflectiveAccess extends Logging { assert(fqn ne null) Some(classloader.loadClass(fqn).asInstanceOf[Class[T]]) } catch { - case e => None + case e: Exception => + None } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala index 345bc0457f..d5d6b28edc 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -60,11 +60,9 @@ object ActorRefSpec { case "complex2" => replyActor ! "complexRequest2" case "simple" => replyActor ! 
"simpleRequest" case "complexReply" => { - println("got complex reply") latch.countDown } case "simpleReply" => { - println("got simple reply") latch.countDown } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala index b25e96e87c..5213557048 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -43,7 +43,6 @@ object FSMActorSpec { goto(Open) using CodeState("", code) forMax timeout } case wrong => { - log.slf4j.error("Wrong code {}", wrong) stay using CodeState("", code) } } @@ -61,7 +60,6 @@ object FSMActorSpec { whenUnhandled { case Event(_, stateData) => { - log.slf4j.info("Unhandled") unhandledLatch.open stay } @@ -83,12 +81,10 @@ object FSMActorSpec { initialize private def doLock() { - log.slf4j.info("Locked") lockedLatch.open } private def doUnlock = { - log.slf4j.info("Unlocked") unlockedLatch.open } } diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index af0be95de4..b966eea5ec 100644 --- a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -498,18 +498,16 @@ class SupervisorSpec extends JUnitSuite { val inits = new AtomicInteger(0) val dyingActor = actorOf(new Actor { self.lifeCycle = Permanent - log.slf4j.debug("Creating dying actor, attempt: " + inits.incrementAndGet) + inits.incrementAndGet if (!(inits.get % 2 != 0)) throw new IllegalStateException("Don't wanna!") - def receive = { case Ping => self.reply_?("pong") case Die => throw new Exception("expected") } }) - val supervisor = Supervisor( SupervisorConfig( diff --git a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala index dce9100e1f..bce649b43b 100644 --- a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala @@ -2,7 +2,6 @@ package akka.actor.routing import akka.actor.Actor import akka.actor.Actor._ -import akka.util.Logging import org.scalatest.Suite import org.junit.runner.RunWith @@ -15,7 +14,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import akka.routing._ @RunWith(classOf[JUnitRunner]) -class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers with Logging { +class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers { import Routing._ @Test def testDispatcher = { @@ -181,315 +180,315 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers for(a <- List(t1,t2,d1,d2)) a.stop } - // Actor Pool Capacity Tests - - // - // make sure the pool is of the fixed, expected capacity - // - @Test def testFixedCapacityActorPool = { + // Actor Pool Capacity Tests + + // + // make sure the pool is of the fixed, expected capacity + // + @Test def testFixedCapacityActorPool = { - val latch = new CountDownLatch(2) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with FixedCapacityStrategy - with SmallestMailboxSelector - { - def factory = actorOf(new Actor { - def receive = { - case _ => - counter.incrementAndGet - latch.countDown - } - }) - - def limit = 2 - def selectionCount = 1 - def partialFill = true - def instance = factory - def receive = _route - } - - val pool = actorOf(new TestPool).start - 
pool ! "a" - pool ! "b" - val done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - counter.get must be (2) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - pool stop - } - - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of blocking pooled actors - // - @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { + val latch = new CountDownLatch(2) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with FixedCapacityStrategy + with SmallestMailboxSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + counter.incrementAndGet + latch.countDown + } + }) + + def limit = 2 + def selectionCount = 1 + def partialFill = true + def instance = factory + def receive = _route + } + + val pool = actorOf(new TestPool).start + pool ! "a" + pool ! "b" + val done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + counter.get must be (2) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + pool stop + } + + // + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of blocking pooled actors + // + @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with ActiveFuturesPressureCapacitor - with SmallestMailboxSelector + var latch = new CountDownLatch(3) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with ActiveFuturesPressureCapacitor + with SmallestMailboxSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet - latch.countDown - } - }) + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + counter.incrementAndGet + latch.countDown + } + }) - def lowerBound = 2 - def upperBound = 4 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } + def lowerBound = 2 + def upperBound = 4 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + } - // - // first message should create the minimum number of delgates - // - val pool = actorOf(new TestPool).start - pool ! 1 - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + // + // first message should create the minimum number of delgates + // + val pool = actorOf(new TestPool).start + pool ! 1 + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool !!! t - Thread.sleep(50) - } - } - - // - // 2 more should go thru w/out triggering more - // - loops = 2 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + var loops = 0 + def loop(t:Int) = { + latch = new CountDownLatch(loops) + counter.set(0) + for (m <- 0 until loops) { + pool !!! 
t + Thread.sleep(50) + } + } + + // + // 2 more should go thru w/out triggering more + // + loops = 2 + loop(500) + var done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - // - // a whole bunch should max it out - // - loops = 10 - loop(500) + // + // a whole bunch should max it out + // + loops = 10 + loop(500) - done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) - - pool stop - } + done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) + + pool stop + } - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of messages in the delegate mailboxes - // - @Test def testBoundedCapacityActorPoolWithMailboxPressure = { + // + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of messages in the delegate mailboxes + // + @Test def testBoundedCapacityActorPoolWithMailboxPressure = { - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector + var latch = new CountDownLatch(3) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet - latch.countDown - } - }) + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + counter.incrementAndGet + latch.countDown + } + }) - def lowerBound = 2 - def upperBound = 4 - def pressureThreshold = 3 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } + def lowerBound = 2 + def upperBound = 4 + def pressureThreshold = 3 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + } - val pool = actorOf(new TestPool).start + val pool = actorOf(new TestPool).start - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool ! t - } - } - - // - // send a few messages and observe pool at its lower bound - // - loops = 3 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + var loops = 0 + def loop(t:Int) = { + latch = new CountDownLatch(loops) + counter.set(0) + for (m <- 0 until loops) { + pool ! t + } + } + + // + // send a few messages and observe pool at its lower bound + // + loops = 3 + loop(500) + var done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! 
ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - // - // send a bunch over the theshold and observe an increment - // - loops = 15 - loop(500) + // + // send a bunch over the theshold and observe an increment + // + loops = 15 + loop(500) - done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) - - pool stop - } - - // Actor Pool Selector Tests - - @Test def testRoundRobinSelector = { + done = latch.await(10,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) + + pool stop + } + + // Actor Pool Selector Tests + + @Test def testRoundRobinSelector = { - var latch = new CountDownLatch(2) - val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - class TestPool1 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector + var latch = new CountDownLatch(2) + val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] + + class TestPool1 extends Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown - } - }) - - def limit = 1 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = true - def instance = factory - def receive = _route - } - - val pool1 = actorOf(new TestPool1).start - pool1 ! "a" - pool1 ! "b" - var done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (1) - pool1 stop - - class TestPool2 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch.countDown + } + }) + + def limit = 1 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = true + def instance = factory + def receive = _route + } + + val pool1 = actorOf(new TestPool1).start + pool1 ! "a" + pool1 ! "b" + var done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + delegates.size must be (1) + pool1 stop + + class TestPool2 extends Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown - } - }) - - def limit = 2 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = false - def instance = factory - def receive = _route - } + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch.countDown + } + }) + + def limit = 2 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = false + def instance = factory + def receive = _route + } - latch = new CountDownLatch(2) - delegates clear - - val pool2 = actorOf(new TestPool2).start - pool2 ! "a" - pool2 ! "b" - done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (2) - pool2 stop - } - - // Actor Pool Filter Tests - - // - // reuse previous test to max pool then observe filter reducing capacity over time - // - @Test def testBoundedCapacityActorPoolWithBackoffFilter = { + latch = new CountDownLatch(2) + delegates clear + + val pool2 = actorOf(new TestPool2).start + pool2 ! 
"a" + pool2 ! "b" + done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + delegates.size must be (2) + pool2 stop + } + + // Actor Pool Filter Tests + + // + // reuse previous test to max pool then observe filter reducing capacity over time + // + @Test def testBoundedCapacityActorPoolWithBackoffFilter = { - var latch = new CountDownLatch(10) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with Filter - with RunningMeanBackoff - with BasicRampup - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - latch.countDown - } - }) + var latch = new CountDownLatch(10) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with Filter + with RunningMeanBackoff + with BasicRampup + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + latch.countDown + } + }) - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = factory - def receive = _route - } + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = factory + def receive = _route + } - // - // put some pressure on the pool - // - val pool = actorOf(new TestPool).start - for (m <- 0 to 10) pool ! 250 - Thread.sleep(5) - val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size - z must be >= (2) - var done = latch.await(10,TimeUnit.SECONDS) - done must be (true) + // + // put some pressure on the pool + // + val pool = actorOf(new TestPool).start + for (m <- 0 to 10) pool ! 250 + Thread.sleep(5) + val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size + z must be >= (2) + var done = latch.await(10,TimeUnit.SECONDS) + done must be (true) - - // - // - // - for (m <- 0 to 3) { - pool ! 1 - Thread.sleep(500) - } - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) - - pool stop - } - - + + // + // + // + for (m <- 0 to 3) { + pool ! 1 + Thread.sleep(500) + } + (pool !! 
ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) + + pool stop + } + + } diff --git a/akka-http/src/main/scala/akka/http/DefaultAkkaLoader.scala b/akka-http/src/main/scala/akka/http/DefaultAkkaLoader.scala index b7c09afde8..35a9918bc1 100644 --- a/akka-http/src/main/scala/akka/http/DefaultAkkaLoader.scala +++ b/akka-http/src/main/scala/akka/http/DefaultAkkaLoader.scala @@ -5,7 +5,7 @@ package akka.http import akka.config.Config -import akka.util.{Logging, Bootable} +import akka.util.{Bootable} import akka.remote.BootableRemoteActorService import akka.actor.BootableActorLoaderService import akka.servlet.AkkaLoader diff --git a/akka-http/src/main/scala/akka/http/EmbeddedAppServer.scala b/akka-http/src/main/scala/akka/http/EmbeddedAppServer.scala index d881459817..6ca38736f7 100644 --- a/akka-http/src/main/scala/akka/http/EmbeddedAppServer.scala +++ b/akka-http/src/main/scala/akka/http/EmbeddedAppServer.scala @@ -9,7 +9,7 @@ import javax.servlet.ServletConfig import java.io.File import akka.actor.BootableActorLoaderService -import akka.util.{Bootable, Logging} +import akka.util.Bootable import org.eclipse.jetty.xml.XmlConfiguration import org.eclipse.jetty.server.{Handler, Server} @@ -20,7 +20,7 @@ import akka.AkkaException /** * Handles the Akka Comet Support (load/unload) */ -trait EmbeddedAppServer extends Bootable with Logging { +trait EmbeddedAppServer extends Bootable { self: BootableActorLoaderService => import akka.config.Config._ @@ -39,7 +39,6 @@ trait EmbeddedAppServer extends Bootable with Logging { abstract override def onLoad = { super.onLoad if (isRestEnabled) { - log.slf4j.info("Attempting to start Akka HTTP service") val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(error("microkernel-server.xml not found!"))) @@ -64,15 +63,11 @@ trait EmbeddedAppServer extends Bootable with Logging { s.start() s } - log.slf4j.info("Akka HTTP service started") } } abstract override def onUnload = { super.onUnload - server foreach { t => - log.slf4j.info("Shutting down REST service (Jersey)") - t.stop() - } + server foreach { _.stop() } } } diff --git a/akka-http/src/main/scala/akka/http/JettyContinuation.scala b/akka-http/src/main/scala/akka/http/JettyContinuation.scala index c1c96f485c..9c02cf03ff 100644 --- a/akka-http/src/main/scala/akka/http/JettyContinuation.scala +++ b/akka-http/src/main/scala/akka/http/JettyContinuation.scala @@ -14,8 +14,7 @@ import Types._ /** * @author Garrick Evans */ -trait JettyContinuation extends ContinuationListener with akka.util.Logging -{ +trait JettyContinuation extends ContinuationListener { import javax.servlet.http.HttpServletResponse import MistSettings._ @@ -79,7 +78,6 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging // unexpected continution state(s) - log and do nothing // case _ => { - log.slf4j.warn("Received continuation in unexpected state: "+continuation.isInitial+" "+continuation.isSuspended+" "+continuation.isExpired+" "+continuation.isResumed) //continuation.cancel None } diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index dcad75456c..53a6d8f6d0 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -4,8 +4,8 @@ package akka.http -import akka.util.Logging import akka.actor.{ActorRegistry, ActorRef, Actor} +import akka.actor.{EventHandler} import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.HttpServlet @@ 
-63,7 +63,7 @@ import Types._ /** * */ -trait Mist extends Logging { +trait Mist { import javax.servlet.{ServletContext} import MistSettings._ @@ -118,16 +118,11 @@ trait Mist extends Logging { val server = context.getServerInfo val (major, minor) = (context.getMajorVersion, context.getMinorVersion) - log.slf4j.info("Initializing Akka HTTP on {} with Servlet API {}.{}",Array[AnyRef](server, major: java.lang.Integer, minor: java.lang.Integer)) - _factory = if (major >= 3) { - log.slf4j.info("Supporting Java asynchronous contexts.") Some(Servlet30ContextMethodFactory) } else if (server.toLowerCase startsWith JettyServer) { - log.slf4j.info("Supporting Jetty asynchronous continuations.") Some(JettyContinuationMethodFactory) } else { - log.slf4j.error("No asynchronous request handling can be supported.") None } } @@ -185,7 +180,7 @@ class AkkaMistFilter extends Filter with Mist { case "POST" => mistify(hreq, hres)(_factory.get.Post) case "PUT" => mistify(hreq, hres)(_factory.get.Put) case "TRACE" => mistify(hreq, hres)(_factory.get.Trace) - case unknown => log.slf4j.warn("Unknown http method: {}",unknown) + case unknown => {} } chain.doFilter(req,res) case _ => chain.doFilter(req,res) @@ -270,7 +265,6 @@ trait Endpoint { this: Actor => */ protected def _na(uri: String, req: RequestMethod) = { req.NotFound("No endpoint available for [" + uri + "]") - log.slf4j.debug("No endpoint available for [{}]", uri) } } @@ -300,7 +294,7 @@ class RootEndpoint extends Actor with Endpoint { def recv: Receive = { case NoneAvailable(uri, req) => _na(uri, req) - case unknown => log.slf4j.error("Unexpected message sent to root endpoint. [{}]", unknown) + case unknown => {} } /** @@ -319,8 +313,7 @@ class RootEndpoint extends Actor with Endpoint { * * @author Garrick Evans */ -trait RequestMethod extends Logging -{ +trait RequestMethod { import java.io.IOException import javax.servlet.http.{HttpServletResponse, HttpServletRequest} @@ -387,7 +380,6 @@ trait RequestMethod extends Logging case Some(pipe) => { try { if (!suspended) { - log.slf4j.warn("Attempt to complete an expired connection.") false } else { @@ -396,14 +388,13 @@ trait RequestMethod extends Logging true } } catch { - case io => - log.slf4j.error("Failed to write data to connection on resume - the client probably disconnected", io) + case io: Exception => + EventHandler notifyListeners EventHandler.Error(io, this) false } } case None => - log.slf4j.error("Attempt to complete request with no context.") false } @@ -411,24 +402,17 @@ trait RequestMethod extends Logging context match { case Some(pipe) => { try { - if (!suspended) { - log.slf4j.warn("Attempt to complete an expired connection.") - } - else { + if (suspended) { response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume") pipe.complete } - } - catch { - case io: IOException => log.slf4j.error("Request completed with internal error.", io) - } - finally { - log.slf4j.error("Request completed with internal error.", t) + } catch { + case io: IOException => + EventHandler notifyListeners EventHandler.Error(io, this) } } - case None => - log.slf4j.error("Attempt to complete request with no context", t) + case None => {} } } diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala index 974ece230d..1caeeb83ef 100644 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ b/akka-http/src/main/scala/akka/http/Servlet30Context.scala @@ -7,12 +7,12 @@ package 
akka.http import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent}; import Types._ +import akka.actor.{EventHandler} /** * @author Garrick Evans */ -trait Servlet30Context extends AsyncListener with akka.util.Logging -{ +trait Servlet30Context extends AsyncListener { import javax.servlet.http.HttpServletResponse import MistSettings._ @@ -36,7 +36,7 @@ trait Servlet30Context extends AsyncListener with akka.util.Logging } catch { case ex: IllegalStateException => - log.slf4j.info("Cannot update timeout - already returned to container") + EventHandler notifyListeners EventHandler.Error(ex, this) false } } @@ -46,8 +46,8 @@ trait Servlet30Context extends AsyncListener with akka.util.Logging // def onComplete(e: AsyncEvent) {} def onError(e: AsyncEvent) = e.getThrowable match { - case null => log.slf4j.warn("Error occured...") - case t => log.slf4j.warn("Error occured", t) + case null => {} + case t => {} } def onStartAsync(e: AsyncEvent) {} def onTimeout(e: AsyncEvent) = { diff --git a/akka-http/src/main/scala/akka/security/Security.scala b/akka-http/src/main/scala/akka/security/Security.scala index 553984a22e..6105c5265e 100644 --- a/akka-http/src/main/scala/akka/security/Security.scala +++ b/akka-http/src/main/scala/akka/security/Security.scala @@ -23,9 +23,9 @@ package akka.security import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} +import akka.actor.{EventHandler} import akka.actor.Actor._ import akka.config.Config -import akka.util.Logging import com.sun.jersey.api.model.AbstractMethod import com.sun.jersey.spi.container.{ResourceFilterFactory, ContainerRequest, ContainerRequestFilter, ContainerResponse, ContainerResponseFilter, ResourceFilter} @@ -69,9 +69,9 @@ case class SpnegoCredentials(token: Array[Byte]) extends Credentials /** * Jersey Filter for invocation intercept and authorization/authentication */ -class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging { +class AkkaSecurityFilterFactory extends ResourceFilterFactory { class Filter(actor: ActorRef, rolesAllowed: Option[List[String]]) - extends ResourceFilter with ContainerRequestFilter with Logging { + extends ResourceFilter with ContainerRequestFilter { override def getRequestFilter: ContainerRequestFilter = this @@ -91,7 +91,6 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging { throw new WebApplicationException(r.asInstanceOf[Response]) case None => throw new WebApplicationException(408) case unknown => { - log.slf4j.warn("Authenticator replied with unexpected result [{}]", unknown) throw new WebApplicationException(Response.Status.INTERNAL_SERVER_ERROR) } } @@ -244,7 +243,7 @@ trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] { * class to create an authenticator. 
Don't forget to set the authenticator FQN in the * rest-part of the akka config */ -trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging { +trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] { import LiftUtils._ private object InvalidateNonces @@ -257,8 +256,7 @@ trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] w case InvalidateNonces => val ts = System.currentTimeMillis nonceMap.filter(tuple => (ts - tuple._2) < nonceValidityPeriod) - case unknown => - log.slf4j.error("Don't know what to do with: ", unknown) + case unknown => {} } //Schedule the invalidation of nonces @@ -345,7 +343,7 @@ import org.ietf.jgss.GSSContext import org.ietf.jgss.GSSCredential import org.ietf.jgss.GSSManager -trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] with Logging { +trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] { override def unauthorized = Response.status(401).header("WWW-Authenticate", "Negotiate").build @@ -371,8 +369,8 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] w Some(UserInfo(user, null, rolesFor(user))) } catch { case e: PrivilegedActionException => { - log.slf4j.error("Action not allowed", e) - return None + EventHandler notifyListeners EventHandler.Error(e, this) + None } } } diff --git a/akka-http/src/main/scala/akka/servlet/AkkaLoader.scala b/akka-http/src/main/scala/akka/servlet/AkkaLoader.scala index 6aef44d733..db689c9dac 100644 --- a/akka-http/src/main/scala/akka/servlet/AkkaLoader.scala +++ b/akka-http/src/main/scala/akka/servlet/AkkaLoader.scala @@ -6,12 +6,12 @@ package akka.servlet import akka.config.Config import akka.actor.Actor -import akka.util. {Switch, Logging, Bootable} +import akka.util. 
{Switch, Bootable} /* * This class is responsible for booting up a stack of bundles and then shutting them down */ -class AkkaLoader extends Logging { +class AkkaLoader { private val hasBooted = new Switch(false) @volatile private var _bundles: Option[Bootable] = None @@ -23,50 +23,50 @@ class AkkaLoader extends Logging { */ def boot(withBanner: Boolean, b : Bootable): Unit = hasBooted switchOn { if (withBanner) printBanner - log.slf4j.info("Starting Akka...") + println("Starting Akka...") b.onLoad Thread.currentThread.setContextClassLoader(getClass.getClassLoader) _bundles = Some(b) - log.slf4j.info("Akka started successfully") + println("Akka started successfully") } /* * Shutdown, well, shuts down the bundles used in boot */ def shutdown: Unit = hasBooted switchOff { - log.slf4j.info("Shutting down Akka...") + println("Shutting down Akka...") _bundles.foreach(_.onUnload) _bundles = None Actor.shutdownHook.run - log.slf4j.info("Akka succesfully shut down") + println("Akka succesfully shut down") } private def printBanner = { - log.slf4j.info("==================================================") - log.slf4j.info(" t") - log.slf4j.info(" t t t") - log.slf4j.info(" t t tt t") - log.slf4j.info(" tt t t tt t") - log.slf4j.info(" t ttttttt t ttt t") - log.slf4j.info(" t tt ttt t ttt t") - log.slf4j.info(" t t ttt t ttt t t") - log.slf4j.info(" tt t ttt ttt ttt t") - log.slf4j.info(" t t ttt ttt t tt t") - log.slf4j.info(" t ttt ttt t t") - log.slf4j.info(" tt ttt ttt t") - log.slf4j.info(" ttt ttt") - log.slf4j.info(" tttttttt ttt ttt ttt ttt tttttttt") - log.slf4j.info(" ttt tt ttt ttt ttt ttt ttt ttt") - log.slf4j.info(" ttt ttt ttt ttt ttt ttt ttt ttt") - log.slf4j.info(" ttt ttt ttt ttt ttt tt ttt ttt") - log.slf4j.info(" tttt ttttttttt tttttttt tttt") - log.slf4j.info(" ttttttttt ttt ttt ttt ttt ttttttttt") - log.slf4j.info(" ttt ttt ttt ttt ttt ttt ttt ttt") - log.slf4j.info(" ttt ttt ttt ttt ttt ttt ttt ttt") - log.slf4j.info(" ttt tt ttt ttt ttt ttt ttt ttt") - log.slf4j.info(" tttttttt ttt ttt ttt ttt tttttttt") - log.slf4j.info("==================================================") - log.slf4j.info(" Running version {}", Config.VERSION) - log.slf4j.info("==================================================") + println("==================================================") + println(" t") + println(" t t t") + println(" t t tt t") + println(" tt t t tt t") + println(" t ttttttt t ttt t") + println(" t tt ttt t ttt t") + println(" t t ttt t ttt t t") + println(" tt t ttt ttt ttt t") + println(" t t ttt ttt t tt t") + println(" t ttt ttt t t") + println(" tt ttt ttt t") + println(" ttt ttt") + println(" tttttttt ttt ttt ttt ttt tttttttt") + println(" ttt tt ttt ttt ttt ttt ttt ttt") + println(" ttt ttt ttt ttt ttt ttt ttt ttt") + println(" ttt ttt ttt ttt ttt tt ttt ttt") + println(" tttt ttttttttt tttttttt tttt") + println(" ttttttttt ttt ttt ttt ttt ttttttttt") + println(" ttt ttt ttt ttt ttt ttt ttt ttt") + println(" ttt ttt ttt ttt ttt ttt ttt ttt") + println(" ttt tt ttt ttt ttt ttt ttt ttt") + println(" tttttttt ttt ttt ttt ttt tttttttt") + println("==================================================") + println(" Running version {}", Config.VERSION) + println("==================================================") } } diff --git a/akka-http/src/main/scala/akka/servlet/Initializer.scala b/akka-http/src/main/scala/akka/servlet/Initializer.scala index cef7463f59..75b036d635 100644 --- a/akka-http/src/main/scala/akka/servlet/Initializer.scala +++ 
b/akka-http/src/main/scala/akka/servlet/Initializer.scala
@@ -7,7 +7,7 @@ package akka.servlet
 import akka.remote.BootableRemoteActorService
 import akka.actor.BootableActorLoaderService
 import akka.config.Config
-import akka.util.{Logging, Bootable}
+import akka.util.Bootable
 import javax.servlet.{ServletContextListener, ServletContextEvent}
diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala
index 3a1b5c967f..bd586ce939 100644
--- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala
+++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala
@@ -6,14 +6,14 @@ package akka.remote
 import akka.config.Config.config
 import akka.actor. {Actor, BootableActorLoaderService}
-import akka.util. {ReflectiveAccess, Bootable, Logging}
+import akka.util. {ReflectiveAccess, Bootable}
 /**
  * This bundle/service is responsible for booting up and shutting down the remote actors facility
 *

* It is used in Kernel */ -trait BootableRemoteActorService extends Bootable with Logging { +trait BootableRemoteActorService extends Bootable { self: BootableActorLoaderService => protected lazy val remoteServerThread = new Thread(new Runnable() { @@ -24,18 +24,14 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onLoad = { if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) { - log.slf4j.info("Initializing Remote Actors Service...") startRemoteService - log.slf4j.info("Remote Actors Service initialized") } super.onLoad } abstract override def onUnload = { - log.slf4j.info("Shutting down Remote Actors Service") Actor.remote.shutdown if (remoteServerThread.isAlive) remoteServerThread.join(1000) - log.slf4j.info("Remote Actors Service has been shut down") super.onUnload } } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 90336649e7..d70b83d987 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -10,7 +10,7 @@ import akka.util._ import com.google.protobuf.{Message, ByteString} -object MessageSerializer extends Logging { +object MessageSerializer { private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b0ae2a9b38..db0fbe2937 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,6 +16,7 @@ import akka.remoteinterface._ import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.AkkaException import akka.actor.Actor._ +import akka.actor.{EventHandler} import akka.util._ import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} @@ -38,7 +39,7 @@ import scala.reflect.BeanProperty import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic. 
{AtomicReference, AtomicLong, AtomicBoolean} -trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging => +trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement => private val remoteClients = new HashMap[Address, RemoteClient] private val remoteActors = new Index[Address, Uuid] private val lock = new ReadWriteGuard @@ -142,7 +143,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem */ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, - val remoteAddress: InetSocketAddress) extends Logging { + val remoteAddress: InetSocketAddress) { val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort @@ -194,7 +195,6 @@ abstract class RemoteClient private[akka] ( def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { - log.slf4j.debug("sending message: {} has future {}", request, senderFuture) if (isRunning) { if (request.getOneWay) { currentChannel.write(request).addListener(new ChannelFutureListener { @@ -272,22 +272,17 @@ class ActiveRemoteClient private[akka] ( bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) - log.slf4j.info("Starting remote client connection to [{}]", remoteAddress) - // Wait until the connection attempt succeeds or fails. connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) - log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress) - log.slf4j.debug("Remote client connection failed", connection.getCause) false } else { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { if(isRunning) { - log.slf4j.debug("Reaping expired futures awaiting completion from [{}]", remoteAddress) val i = futures.entrySet.iterator while(i.hasNext) { val e = i.next @@ -304,15 +299,12 @@ class ActiveRemoteClient private[akka] ( case true => true case false if reconnectIfAlreadyConnected => isAuthenticated.set(false) - log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress) openChannels.remove(connection.getChannel) connection.getChannel.close connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. 
if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) - log.slf4j.error("Reconnection to [{}] has failed", remoteAddress) - log.slf4j.debug("Reconnection failed", connection.getCause) false } else true case false => false @@ -320,7 +312,6 @@ class ActiveRemoteClient private[akka] ( } def shutdown = runSwitch switchOff { - log.slf4j.info("Shutting down {}", name) notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop timer = null @@ -329,7 +320,6 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null - log.slf4j.info("{} has been shut down", name) } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -339,7 +329,6 @@ class ActiveRemoteClient private[akka] ( } else { val timeLeft = RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart) if (timeLeft > 0) { - log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) true } else false } @@ -399,12 +388,11 @@ class ActiveRemoteClientHandler( val remoteAddress: SocketAddress, val timer: HashedWheelTimer, val client: ActiveRemoteClient) - extends SimpleChannelUpstreamHandler with Logging { + extends SimpleChannelUpstreamHandler { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { - log.slf4j.debug(event.toString) } super.handleUpstream(ctx, event) } @@ -414,8 +402,6 @@ class ActiveRemoteClientHandler( event.getMessage match { case reply: RemoteMessageProtocol => val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) - log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) - log.slf4j.debug("Trying to map back to future: {}",replyUuid) val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] if (reply.hasMessage) { @@ -442,9 +428,9 @@ class ActiveRemoteClientHandler( throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) } } catch { - case e: Exception => + case e: Throwable => + EventHandler notifyListeners EventHandler.Error(e, this) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) - log.slf4j.error("Unexpected exception in remote client handler", e) throw e } } @@ -465,7 +451,6 @@ class ActiveRemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) - log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -482,16 +467,11 @@ class ActiveRemoteClientHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress)) - log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress)) - if (event.getCause ne null) - log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause) - else - log.slf4j.error("Unexpected exception from downstream in 
remote client: {}", event) - + if (event.getCause ne null) event.getCause.printStackTrace event.getChannel.close } @@ -505,9 +485,8 @@ class ActiveRemoteClientHandler( .getConstructor(Array[Class[_]](classOf[String]): _*) .newInstance(exception.getMessage).asInstanceOf[Throwable] } catch { - case problem => - log.debug("Couldn't parse exception returned from RemoteServer",problem) - log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage) + case problem: Throwable => + EventHandler notifyListeners EventHandler.Error(problem, this) UnparsableException(classname, exception.getMessage) } } @@ -578,8 +557,8 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, bootstrap.releaseExternalResources serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { - case e: java.nio.channels.ClosedChannelException => {} - case e => serverModule.log.slf4j.warn("Could not close remote server channel in a graceful way") + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) } } } @@ -607,12 +586,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { try { _isRunning switchOn { - log.slf4j.debug("Starting up remote server on {}:{}",_hostname, _port) currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) } } catch { - case e => - log.slf4j.error("Could not start up remote server", e) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) notifyListeners(RemoteServerError(e, this)) } this @@ -622,7 +600,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => _isRunning switchOff { currentServer.getAndSet(None) foreach { instance => - log.slf4j.debug("Shutting down remote server on {}:{}",instance.host, instance.port) instance.shutdown } } @@ -634,7 +611,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => * @param typedActor typed actor to register */ def registerTypedActor(id: String, typedActor: AnyRef): Unit = guard withGuard { - log.slf4j.debug("Registering server side remote typed actor [{}] with id [{}]", typedActor.getClass.getName, id) if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid) else registerTypedActor(id, typedActor, typedActors) } @@ -645,7 +621,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => * @param typedActor typed actor to register */ def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = guard withGuard { - log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id) registerTypedPerSessionActor(id, () => factory, typedActorsFactories) } @@ -655,13 +630,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. 
*/ def register(id: String, actorRef: ActorRef): Unit = guard withGuard { - log.slf4j.debug("Registering server side remote actor [{}] with id [{}]", actorRef.actorClass.getName, id) if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid) else register(id, actorRef, actors) } def registerByUuid(actorRef: ActorRef): Unit = guard withGuard { - log.slf4j.debug("Registering remote actor {} to it's uuid {}", actorRef, actorRef.uuid) register(actorRef.uuid.toString, actorRef, actorsByUuid) } @@ -678,7 +651,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized { - log.slf4j.debug("Registering server side remote session actor with id [{}]", id) registerPerSession(id, () => factory, actorsFactories) } @@ -702,7 +674,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => */ def unregister(actorRef: ActorRef): Unit = guard withGuard { if (_isRunning.isOn) { - log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array[AnyRef](actorRef.actorClass.getName, actorRef.id, actorRef.uuid)) actors.remove(actorRef.id, actorRef) actorsByUuid.remove(actorRef.uuid, actorRef) } @@ -715,7 +686,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => */ def unregister(id: String): Unit = guard withGuard { if (_isRunning.isOn) { - log.slf4j.info("Unregistering server side remote actor with id [{}]", id) if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length)) else { val actorRef = actors get id @@ -732,7 +702,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => */ def unregisterPerSession(id: String): Unit = { if (_isRunning.isOn) { - log.slf4j.info("Unregistering server side remote session actor with id [{}]", id) actorsFactories.remove(id) } } @@ -744,7 +713,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => */ def unregisterTypedActor(id: String):Unit = guard withGuard { if (_isRunning.isOn) { - log.slf4j.info("Unregistering server side remote typed actor with id [{}]", id) if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length)) else typedActors.remove(id) } @@ -818,7 +786,7 @@ class RemoteServerHandler( val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], - val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging { + val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { import RemoteServerSettings._ val CHANNEL_INIT = "channel-init".intern @@ -855,7 +823,6 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) - log.slf4j.debug("Remote client [{}] connected to [{}]", clientAddress, server.name) if (SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) // Begin handshake. 
@@ -876,17 +843,16 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { import scala.collection.JavaConversions.asScalaIterable val clientAddress = getClientAddress(ctx) - log.slf4j.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name) // stop all session actors for (map <- Option(sessionActors.remove(event.getChannel)); actor <- asScalaIterable(map.values)) { - try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) } + try { actor.stop } catch { case e: Exception => } } // stop all typed session actors for (map <- Option(typedSessionActors.remove(event.getChannel)); actor <- asScalaIterable(map.values)) { - try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) } + try { TypedActor.stop(actor) } catch { case e: Exception => } } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) @@ -894,13 +860,11 @@ class RemoteServerHandler( override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - log.slf4j.debug("Remote client [{}] channel closed from [{}]", clientAddress, server.name) server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { - log.slf4j.debug(event.toString) } super.handleUpstream(ctx, event) } @@ -914,7 +878,6 @@ class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - log.slf4j.error("Unexpected exception from remote downstream", event.getCause) event.getChannel.close server.notifyListeners(RemoteServerError(event.getCause, server)) } @@ -926,7 +889,6 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { - log.slf4j.debug("Received RemoteMessageProtocol[\n{}]",request) request.getActorInfo.getActorType match { case SCALA_ACTOR => dispatchToActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel) @@ -937,11 +899,11 @@ class RemoteServerHandler( private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { val actorInfo = request.getActorInfo - log.slf4j.debug("Dispatching to remote actor [{}:{}]", actorInfo.getTarget, actorInfo.getUuid) val actorRef = try { createActor(actorInfo, channel).start } catch { case e: SecurityException => + EventHandler notifyListeners EventHandler.Error(e, this) write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) server.notifyListeners(RemoteServerError(e, server)) return @@ -967,16 +929,13 @@ class RemoteServerHandler( None, Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). 
onComplete(f => { - log.slf4j.debug("Future was completed, now flushing to remote!") val result = f.result val exception = f.exception if (exception.isDefined) { - log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass) write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) } else if (result.isDefined) { - log.slf4j.debug("Returning result from actor invocation [{}]",result.get) val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), @@ -1004,7 +963,6 @@ class RemoteServerHandler( private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = { val actorInfo = request.getActorInfo val typedActorInfo = actorInfo.getTypedActorInfo - log.slf4j.debug("Dispatching to remote typed actor [{} :: {}]", typedActorInfo.getMethod, typedActorInfo.getInterface) val typedActor = createTypedActor(actorInfo, channel) val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList @@ -1031,9 +989,10 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) write(channel, messageBuilder.build) - log.slf4j.debug("Returning result from remote typed actor invocation [{}]", result) } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + server.notifyListeners(RemoteServerError(e, server)) } messageReceiver.invoke(typedActor, args: _*) match { @@ -1048,9 +1007,11 @@ class RemoteServerHandler( } } catch { case e: InvocationTargetException => + EventHandler notifyListeners EventHandler.Error(e, this) write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) - case e: Throwable => + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) } @@ -1100,7 +1061,6 @@ class RemoteServerHandler( if (UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) @@ -1110,8 +1070,8 @@ class RemoteServerHandler( server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid actorRef } catch { - case e => - log.slf4j.error("Could not create remote actor instance", e) + case e: Throwable => + EventHandler notifyListeners EventHandler.Error(e, this) server.notifyListeners(RemoteServerError(e, server)) throw e } @@ -1167,8 +1127,6 @@ class RemoteServerHandler( if (UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - log.slf4j.info("Creating a new remote typed actor:\n\t[{} :: {}]", interfaceClassname, targetClassname) - val (interfaceClass, targetClass) = if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), applicationLoader.get.loadClass(targetClassname)) @@ -1179,8 +1137,8 @@ class RemoteServerHandler( 
       server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid
       newInstance
     } catch {
-      case e =>
-        log.slf4j.error("Could not create remote typed actor instance", e)
+      case e: Throwable =>
+        EventHandler notifyListeners EventHandler.Error(e, this)
         server.notifyListeners(RemoteServerError(e, server))
         throw e
     }
@@ -1201,8 +1159,6 @@ class RemoteServerHandler(
   private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
     val actorInfo = request.getActorInfo
-    log.slf4j.error("Could not invoke remote actor [{}]", actorInfo.getTarget)
-    log.slf4j.debug("Could not invoke remote actor", exception)
     val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
       None,
       Right(request.getUuid),
@@ -1230,7 +1186,6 @@ class RemoteServerHandler(
       "The remote client [" + clientAddress + "] does not have a secure cookie.")
     if (!(request.getCookie == SECURE_COOKIE.get)) throw new SecurityException(
       "The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
-    log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress)
   }
 }
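The RemoteServer hunks above swap the deleted log.slf4j calls for EventHandler notifications alongside the existing RemoteServerError listener events. A minimal sketch of that reporting pattern, assuming only the akka.actor.EventHandler API the added lines rely on (the two-argument Error(cause, instance) and notifyListeners); the object and method names below are illustrative, not part of the patch:

    import akka.actor.EventHandler

    object RemoteErrorReporting {
      // Publish a failure to any registered EventHandler listeners, mirroring the
      // `EventHandler notifyListeners EventHandler.Error(e, this)` lines added above.
      def report(cause: Throwable, source: AnyRef): Unit =
        EventHandler.notifyListeners(EventHandler.Error(cause, source))
    }

Anything registered as an EventHandler listener receives these Error events in place of the old SLF4J output.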
diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
index 99445594f2..12ee2228f5 100644
--- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
+++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
@@ -170,7 +170,6 @@ object ActorSerialization {
   private[akka] def fromProtobufToLocalActorRef[T <: Actor](
     protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
-    Actor.log.slf4j.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
     val serializer =
       if (format.isInstanceOf[SerializerBasedActorFormat[_]])
@@ -248,7 +247,6 @@ object RemoteActorSerialization {
    * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
    */
   private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
-    Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol)
     val ref = RemoteActorRef(
       protocol.getClassOrServiceName,
       protocol.getActorClassname,
@@ -256,8 +254,6 @@ object RemoteActorSerialization {
       protocol.getHomeAddress.getPort,
       protocol.getTimeout,
       loader)
-
-    Actor.log.slf4j.debug("Newly deserialized RemoteActorRef has uuid: {}", ref.uuid)
     ref
   }
@@ -267,8 +263,6 @@ object RemoteActorSerialization {
   def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
     import ar._
-    Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",actorClassName, ar.homeAddress)
-
     Actor.remote.registerByUuid(ar)
     RemoteActorRefProtocol.newBuilder
@@ -396,7 +390,6 @@ object TypedActorSerialization {
   private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef](
     protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = {
-    Actor.log.slf4j.debug("Deserializing SerializedTypedActorRefProtocol to LocalActorRef:\n" + protocol)
     val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader)
     val intfClass = toClass(loader, protocol.getInterfaceName)
     TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U]
@@ -436,7 +429,6 @@ object RemoteTypedActorSerialization {
    * Deserializes a RemoteTypedActorRefProtocol Protocol Buffers (protobuf) Message into AW RemoteActorRef proxy.
    */
   private[akka] def fromProtobufToRemoteTypedActorRef[T](protocol: RemoteTypedActorRefProtocol, loader: Option[ClassLoader]): T = {
-    Actor.log.slf4j.debug("Deserializing RemoteTypedActorRefProtocol to AW RemoteActorRef proxy:\n" + protocol)
     val actorRef = RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getActorRef, loader)
     val intfClass = TypedActorSerialization.toClass(loader, protocol.getInterfaceName)
     TypedActor.createProxyForRemoteActorRef(intfClass, actorRef).asInstanceOf[T]
diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSample.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSample.scala
index a8990e2b88..6b11f73f10 100644
--- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSample.scala
+++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSample.scala
@@ -1,7 +1,6 @@
 package akka.actor.remote
 import akka.actor.{Actor, ActorRegistry}
-import akka.util.Logging
 import Actor._
@@ -41,11 +40,10 @@ object ServerInitiatedRemoteActorServer {
   }
 }
-object ServerInitiatedRemoteActorClient extends Logging {
+object ServerInitiatedRemoteActorClient {
   def main(args: Array[String]) = {
     val actor = Actor.remote.actorFor("hello-service", "localhost", 2552)
     val result = actor !! "Hello"
-    log.slf4j.info("Result from Remote Actor: {}", result)
   }
 }
diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala
index c762c2da7d..5a2c580e5e 100644
--- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala
+++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala
@@ -78,7 +78,7 @@ class Hakker(name: String,left: ActorRef, right: ActorRef) extends Actor {
   //back to think about how he should obtain his chopsticks :-)
   def waiting_for(chopstickToWaitFor: ActorRef, otherChopstick: ActorRef): Receive = {
     case Taken(`chopstickToWaitFor`) =>
-      log.info("%s has picked up %s and %s, and starts to eat",name,left.id,right.id)
+      println("%s has picked up %s and %s, and starts to eat",name,left.id,right.id)
       become(eating)
       Scheduler.scheduleOnce(self,Think,5,TimeUnit.SECONDS)
@@ -108,14 +108,14 @@ class Hakker(name: String,left: ActorRef, right: ActorRef) extends Actor {
       become(thinking)
       left ! Put(self)
       right ! Put(self)
-      log.info("%s puts down his chopsticks and starts to think",name)
+      println("%s puts down his chopsticks and starts to think",name)
       Scheduler.scheduleOnce(self,Eat,5,TimeUnit.SECONDS)
   }
   //All hakkers start in a non-eating state
   def receive = {
     case Think =>
-      log.info("%s starts to think",name)
+      println("%s starts to think",name)
       become(thinking)
       Scheduler.scheduleOnce(self,Eat,5,TimeUnit.SECONDS)
   }
diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala
index 7a4641c35e..3273136690 100644
--- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala
+++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala
@@ -89,7 +89,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
   when(Waiting) {
     case Event(Think, _) =>
-      log.info("%s starts to think", name)
+      println("%s starts to think", name)
       startThinking(5 seconds)
   }
@@ -128,7 +128,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
   }
   private def startEating(left: ActorRef, right: ActorRef): State = {
-    log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id)
+    println("%s has picked up %s and %s, and starts to eat", name, left.id, right.id)
     goto(Eating) using TakenChopsticks(Some(left), Some(right)) forMax (5 seconds)
   }
@@ -147,7 +147,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
   // then he puts down his chopsticks and starts to think
   when(Eating) {
     case Event(StateTimeout, _) =>
-      log.info("%s puts down his chopsticks and starts to think", name)
+      println("%s puts down his chopsticks and starts to think", name)
       left ! Put
       right ! Put
       startThinking(5 seconds)
diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
index f0adb42899..3477ff5783 100644
--- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
+++ b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
@@ -5,35 +5,29 @@ package sample.remote
 import akka.actor.Actor._
-import akka.util.Logging
 import akka.actor. {ActorRegistry, Actor}
 import Actor.remote
 class RemoteHelloWorldActor extends Actor {
   def receive = {
     case "Hello" =>
-      log.slf4j.info("Received 'Hello'")
       self.reply("World")
   }
 }
-object ClientManagedRemoteActorServer extends Logging {
+object ClientManagedRemoteActorServer {
   def run = {
     remote.start("localhost", 2552)
-    log.slf4j.info("Remote node started")
   }
   def main(args: Array[String]) = run
 }
-object ClientManagedRemoteActorClient extends Logging {
+object ClientManagedRemoteActorClient {
   def run = {
     val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start
-    log.slf4j.info("Remote actor created, moved to the server")
-    log.slf4j.info("Sending 'Hello' to remote actor")
     val result = actor !! "Hello"
-    log.slf4j.info("Result from Remote Actor: '{}'", result.get)
   }
   def main(args: Array[String]) = run
diff --git a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala
index 96e8d1debf..4776c19004 100644
--- a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala
+++ b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala
@@ -5,37 +5,30 @@ package sample.remote
 import akka.actor.Actor._
-import akka.util.Logging
 import akka.actor. {ActorRegistry, Actor}
 class HelloWorldActor extends Actor {
   def receive = {
     case "Hello" =>
-      log.slf4j.info("Received 'Hello'")
       self.reply("World")
   }
 }
-object ServerManagedRemoteActorServer extends Logging {
+object ServerManagedRemoteActorServer {
   def run = {
     Actor.remote.start("localhost", 2552)
-    log.slf4j.info("Remote node started")
     Actor.remote.register("hello-service", actorOf[HelloWorldActor])
-    log.slf4j.info("Remote actor registered and started")
   }
   def main(args: Array[String]) = run
 }
-object ServerManagedRemoteActorClient extends Logging {
+object ServerManagedRemoteActorClient {
   def run = {
     val actor = Actor.remote.actorFor("hello-service", "localhost", 2552)
-    log.slf4j.info("Remote client created")
-    log.slf4j.info("Sending 'Hello' to remote actor")
     val result = actor !! "Hello"
-    log.slf4j.info("Result from Remote Actor: '{}'", result.get)
   }
   def main(args: Array[String]) = run
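With the Logging trait gone, the remote sample clients above no longer show the reply they receive. A hedged usage sketch, assuming the "hello-service" actor registered by ServerManagedRemoteActorServer is running on localhost:2552 and that !! returns the reply wrapped in an Option (as the deleted result.get calls imply); the object name is illustrative:

    object ServerManagedRemoteActorClientWithOutput {
      def main(args: Array[String]): Unit = {
        // Look up the server-managed actor exactly as the sample client does.
        val actor = akka.actor.Actor.remote.actorFor("hello-service", "localhost", 2552)
        // !! blocks for the reply; print it instead of logging it.
        val result = actor !! "Hello"
        println("Result from Remote Actor: " + result)
      }
    }

Further up, the two DiningHakkers sample hunks swap log.info for println. Note that println takes a single argument, so the varargs-style calls shown there are auto-tupled and print a tuple rather than the formatted sentence. A minimal sketch of the formatted alternative, with hypothetical stand-in values for the sample's names:

    object FormattedPrintln {
      def main(args: Array[String]): Unit = {
        // Stand-ins for the hakker name and chopstick ids used in the samples.
        val (name, leftId, rightId) = ("Ghosh", "left-chopstick", "right-chopstick")
        // Apply the format string explicitly; println("...", a, b, c) passes one tuple.
        println("%s has picked up %s and %s, and starts to eat".format(name, leftId, rightId))
      }
    }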
diff --git a/akka-stm/src/main/scala/akka/stm/Transaction.scala b/akka-stm/src/main/scala/akka/stm/Transaction.scala
index 48b9beaf2d..b2f0caaf07 100644
--- a/akka-stm/src/main/scala/akka/stm/Transaction.scala
+++ b/akka-stm/src/main/scala/akka/stm/Transaction.scala
@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable.HashMap
-import akka.util.{Logging, ReflectiveAccess}
+import akka.util.ReflectiveAccess
 import akka.config.Config._
 import akka.config.ModuleNotAvailableException
 import akka.AkkaException
@@ -88,7 +88,7 @@ object Transaction {
  * The Akka-specific Transaction class.
  * For integration with persistence modules and JTA support.
  */
-@serializable class Transaction extends Logging {
+@serializable class Transaction {
   val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
   val STATE_RETRIES = config.getInt("akka.storage.max-retries",10)
@@ -102,17 +102,13 @@ object Transaction {
     if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer)
     else None
-  log.slf4j.trace("Creating transaction " + toString)
-
   // --- public methods ---------
   def begin = synchronized {
-    log.slf4j.trace("Starting transaction " + toString)
     jta.foreach { _.beginWithStmSynchronization(this) }
   }
   def commitPersistentState = synchronized {
-    log.trace("Committing transaction " + toString)
     retry(STATE_RETRIES){
       persistentStateMap.valuesIterator.foreach(_.commit)
       persistentStateMap.clear
@@ -125,14 +121,12 @@ object Transaction {
   }
   def abort = synchronized {
-    log.slf4j.trace("Aborting transaction " + toString)
     jta.foreach(_.rollback)
     persistentStateMap.valuesIterator.foreach(_.abort)
     persistentStateMap.clear
   }
   def retry(tries:Int)(block: => Unit):Unit={
-    log.debug("Trying commit of persistent data structures")
     if(tries==0){
       throw new TransactionRetryException("Exhausted Retries while committing persistent state")
     }
@@ -140,7 +134,6 @@ object Transaction {
       block
     } catch{
       case e:Exception=>{
-        log.warn(e,"Exception while committing persistent state, retrying")
         retry(tries-1){block}
       }
     }
@@ -169,8 +162,6 @@ object Transaction {
   //have no possibility of kicking a diffferent type with the same uuid out of a transction
   private[akka] def register(uuid: String, storage: Committable with Abortable) = {
     if(persistentStateMap.getOrElseUpdate(uuid, {storage}) ne storage){
-      log.error("existing:"+System.identityHashCode(persistentStateMap.get(uuid).get))
-      log.error("new:"+System.identityHashCode(storage))
       throw new IllegalStateException("attempted to register an instance of persistent data structure for id [%s] when there is already a different instance registered".format(uuid))
     }
   }
diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala
index 730bad3f9c..028f615216 100644
--- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala
+++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala
@@ -6,6 +6,7 @@ package akka.transactor
 import akka.config.Config
 import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
+import akka.actor.{EventHandler}
 import org.multiverse.api.{Transaction => MultiverseTransaction}
 import org.multiverse.commitbarriers.CountDownCommitBarrier
@@ -132,12 +133,7 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
       factory.addHooks
       val result = body
       val timeout = factory.config.timeout
-      try {
-        barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
-      } catch {
-        // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
-        case e: IllegalStateException => ()
-      }
+      barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
       result
     }
   })
diff --git a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala
index 5132cbbeb1..367ef5ac5f 100644
--- a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala
+++ b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala
@@ -18,7 +18,6 @@ object CoordinatedIncrement {
     implicit val txFactory = TransactionFactory(timeout = 3 seconds)
     def increment = {
-      log.slf4j.info(name + ": incrementing")
       count alter (_ + 1)
     }
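The Coordinated.scala hunk above removes the workaround that swallowed the IllegalStateException older Multiverse releases could throw from tryJoinCommit. A hedged sketch of how a caller still on an affected Multiverse version could reinstate that guard locally, using only the types already imported in that file (the object, method, and parameter names are illustrative):

    import java.util.concurrent.TimeUnit
    import org.multiverse.api.{Transaction => MultiverseTransaction}
    import org.multiverse.commitbarriers.CountDownCommitBarrier

    object JoinCommitGuard {
      // Join the commit barrier but ignore the spurious IllegalStateException
      // that the removed catch block used to cover.
      def tryJoinIgnoringSpuriousError(barrier: CountDownCommitBarrier,
                                       tx: MultiverseTransaction,
                                       length: Long,
                                       unit: TimeUnit): Unit = {
        try {
          barrier.tryJoinCommit(tx, length, unit)
        } catch {
          case e: IllegalStateException => ()
        }
      }
    }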
diff --git a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala
index 2adfaeca5f..b6f8405e08 100644
--- a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala
+++ b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala
@@ -26,7 +26,6 @@ object FickleFriends {
     implicit val txFactory = TransactionFactory(timeout = 3 seconds)
     def increment = {
-      log.slf4j.info(name + ": incrementing")
       count alter (_ + 1)
     }
@@ -65,7 +64,6 @@ object FickleFriends {
     implicit val txFactory = TransactionFactory(timeout = 3 seconds)
     def increment = {
-      log.slf4j.info(name + ": incrementing")
       count alter (_ + 1)
     }
diff --git a/akka-stm/src/test/scala/transactor/TransactorSpec.scala b/akka-stm/src/test/scala/transactor/TransactorSpec.scala
index f10a0e1d2c..212abff1d4 100644
--- a/akka-stm/src/test/scala/transactor/TransactorSpec.scala
+++ b/akka-stm/src/test/scala/transactor/TransactorSpec.scala
@@ -20,7 +20,6 @@ object TransactorIncrement {
     override def transactionFactory = TransactionFactory(timeout = 3 seconds)
     def increment = {
-      log.slf4j.info(name + ": incrementing")
       count alter (_ + 1)
     }
@@ -32,7 +31,7 @@ object TransactorIncrement {
     }
     override def before = {
-      case i: Increment => log.slf4j.info(name + ": before transaction")
+      case i: Increment =>
     }
     def atomically = {
@@ -44,7 +43,7 @@ object TransactorIncrement {
     }
     override def after = {
-      case i: Increment => log.slf4j.info(name + ": after transaction")
+      case i: Increment =>
     }
     override def normally = {
diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
index a7f29dc731..790e9824a1 100644
--- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -431,7 +431,7 @@ trait TypedActorFactory {
  *
  * @author Jonas Bonér
 */
-object TypedActor extends Logging {
+object TypedActor {
   import Actor.actorOf
   val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
@@ -576,8 +576,6 @@ object TypedActor extends Logging {
     actorRef.timeout = config.timeout
-    //log.slf4j.debug("config._host for {} is {} but homeAddress is {} and on ref {}",Array[AnyRef](intfClass, config._host, typedActor.context.homeAddress,actorRef.homeAddress))
-
     val remoteAddress = actorRef match {
       case remote: RemoteActorRef => remote.homeAddress
       case local: LocalActorRef if local.clientManaged => local.homeAddress
diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
index 81bde84281..315467f2ee 100644
--- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
+++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala
@@ -24,7 +24,7 @@ import com.google.inject._
 *
 * @author Jonas Bonér
 */
-private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBase with Logging {
+private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBase {
   private var injector: Injector = _
   private var supervisor: Option[Supervisor] = None
   private var faultHandlingStrategy: FaultHandlingStrategy = NoFaultHandlingStrategy
@@ -43,7 +43,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
    * @return the typed actors for the class
    */
   def getInstance[T](clazz: Class[T]): List[T] = synchronized {
-    log.slf4j.debug("Retrieving typed actor [{}]", clazz.getName)
     if (injector eq null) throw new IllegalActorStateException(
       "inject() and/or supervise() must be called before invoking getInstance(clazz)")
     val (proxy, targetInstance, component) =
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 150cbc6773..6aa1432c8a 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -6,13 +6,15 @@
 # Modify as needed.
 akka {
-  version = "1.1-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
+  version = "1.1-SNAPSHOT"        # Akka version, checked against the runtime version of Akka.
 
-  enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
+  enabled-modules = []            # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
 
-  time-unit = "seconds" # Time unit for all timeout properties throughout the config
+  time-unit = "seconds"           # Time unit for all timeout properties throughout the config
 
-  enable-jmx = on # expose the configuration through JMX
+  enable-jmx = on                 # expose the configuration through JMX
+
+  default-error-handler = on      # register the default error handler listener which logs errors to STDOUT
 
   # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
   # Can be used to bootstrap your application(s)
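The akka-reference.conf hunk above introduces a default-error-handler switch that registers a default listener which logs errors to STDOUT. A hedged sketch of reading that switch with the same Config API this patch uses elsewhere (config.getBool in Transaction.scala); the "akka." key prefix and the true fallback are assumptions, only the setting name itself appears in the conf file:

    import akka.config.Config.config

    object DefaultErrorHandlerSetting {
      // Assumed key: the conf entry nested under the akka { ... } block above.
      val useDefaultErrorHandler: Boolean = config.getBool("akka.default-error-handler", true)
    }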
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 2ceb21d6fd..7d278d5384 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -26,7 +26,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
   val javaCompileSettings = Seq("-Xlint:unchecked")
-  override def compileOptions = super.compileOptions ++ scalaCompileSettings.map(CompileOption)
+  override def compileOptions = super.compileOptions ++ scalaCompileSettings.map(CompileOption)
   override def javaCompileOptions = super.javaCompileOptions ++ javaCompileSettings.map(JavaCompileOption)
   // -------------------------------------------------------------------------------------------------------------------
@@ -103,7 +103,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
   lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
   lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
   lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsRelRepo)
-  lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback", sbt.DefaultMavenRepository)
   lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo)
   lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo)
@@ -115,12 +114,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
   lazy val JERSEY_VERSION = "1.3"
   lazy val MULTIVERSE_VERSION = "0.6.2"
   lazy val SCALATEST_VERSION = "1.3"
-  lazy val LOGBACK_VERSION = "0.9.24"
-  lazy val SLF4J_VERSION = "1.6.0"
   lazy val JETTY_VERSION = "7.1.6.v20100715"
   lazy val JAVAX_SERVLET_VERSION = "3.0"
-
   // -------------------------------------------------------------------------------------------------------------------
   // Dependencies
   // -------------------------------------------------------------------------------------------------------------------
@@ -161,7 +157,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
     lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1
-    lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" //ApacheV2
+    lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" //ApacheV2
     lazy val multiverse_test = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" //ApacheV2
     lazy val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" % "compile" //ApacheV2
@@ -170,11 +166,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
     lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" //MIT
-    lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2
+    lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2
     lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2
-    lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" //LGPL 2.1
-
     lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2
     // Test
@@ -303,7 +297,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
 class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
   val configgy = Dependencies.configgy
-  val logback = Dependencies.logback
   // testing
   val junit = Dependencies.junit