diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 2ec64543e3..c01dacf53e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -633,7 +633,7 @@ class LocalActorRef private[akka] ( /** * Starts up the actor and its message queue. */ - def start: ActorRef = guard.withGuard { + def start(): ActorRef = guard.withGuard { if (isShutdown) throw new ActorStartException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { @@ -687,11 +687,16 @@ class LocalActorRef private[akka] ( *
* To be invoked from within the actor itself. */ - def link(actorRef: ActorRef) = guard.withGuard { - if (actorRef.supervisor.isDefined) throw new IllegalActorStateException( + def link(actorRef: ActorRef): Unit = guard.withGuard { + val actorRefSupervisor = actorRef.supervisor + val hasSupervisorAlready = actorRefSupervisor.isDefined + if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy + else if (hasSupervisorAlready) throw new IllegalActorStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) + else { + _linkedActors.put(actorRef.uuid, actorRef) + actorRef.supervisor = Some(this) + } } /** @@ -825,8 +830,8 @@ class LocalActorRef private[akka] ( checkReceiveTimeout // Reschedule receive timeout } } catch { - case e: Throwable => - EventHandler notify EventHandler.Error(e, this, messageHandle.message.toString) + case e => + EventHandler.error(e, this, messageHandle.message.toString) throw e } } @@ -842,10 +847,8 @@ class LocalActorRef private[akka] ( dead.restart(reason, maxRetries, within) case _ => - if (_supervisor.isDefined) - notifySupervisorWithMessage(Exit(this, reason)) - else - dead.stop + if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) + else dead.stop } } @@ -880,7 +883,7 @@ class LocalActorRef private[akka] ( } protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart { + def performRestart() { val failedActor = actorInstance.get failedActor match { @@ -891,20 +894,19 @@ class LocalActorRef private[akka] ( case _ => failedActor.preRestart(reason) val freshActor = newActor - setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call + setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor + actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call freshActor.preStart freshActor.postRestart(reason) } } - def tooManyRestarts { + def tooManyRestarts() { _supervisor.foreach { sup => // can supervisor handle the notification? val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) } - stop } @@ -917,12 +919,15 @@ class LocalActorRef private[akka] ( case Temporary => shutDownTemporaryActor(this) true + case _ => // either permanent or none where default is permanent val success = try { performRestart true } catch { - case e => false //An error or exception here should trigger a retry + case e => + EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) + false // an error or exception here should trigger a retry } finally { currentMessage = null } @@ -935,27 +940,25 @@ class LocalActorRef private[akka] ( } } } else { - tooManyRestarts - true //Done + tooManyRestarts() + true // done } - if (success) - () //Alles gut - else - attemptRestart + if (success) () // alles gut + else attemptRestart() } - attemptRestart() //Tailrecursion + attemptRestart() // recur } protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { val i = _linkedActors.values.iterator - while(i.hasNext) { + while (i.hasNext) { val actorRef = i.next actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary => shutDownTemporaryActor(actorRef) - case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) + case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) } } } @@ -963,8 +966,7 @@ class LocalActorRef private[akka] ( protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - if (homeAddress.isDefined) - Actor.remote.registerSupervisorForActor(this) + if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -983,14 +985,12 @@ class LocalActorRef private[akka] ( temporaryActor.stop _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor - if (_linkedActors.isEmpty) - notifySupervisorWithMessage(UnlinkAndStop(this)) - + if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) true } private def handleExceptionInDispatch(reason: Throwable, message: Any) = { - EventHandler notify EventHandler.Error(reason, this, message.toString) + EventHandler.error(reason, this, message.toString) //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -1013,7 +1013,7 @@ class LocalActorRef private[akka] ( //Scoped stop all linked actors, to avoid leaking the 'i' val { val i = _linkedActors.values.iterator - while(i.hasNext) { + while (i.hasNext) { i.next.stop i.remove } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 851c9afa8d..01f4282874 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -38,8 +38,9 @@ object Scheduler { initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message) - throw SchedulerException(message + " could not be scheduled on " + receiver, e) + val error = SchedulerException(message + " could not be scheduled on " + receiver, e) + EventHandler.error(error, this, "%s @ %s".format(receiver, message)) + throw error } } @@ -59,8 +60,9 @@ object Scheduler { service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) - throw SchedulerException("Failed to schedule a Runnable", e) + val error = SchedulerException("Failed to schedule a Runnable", e) + EventHandler.error(error, this, error.getMessage) + throw error } } @@ -73,9 +75,10 @@ object Scheduler { new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e: Exception => - EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message) - throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) + case e: Exception => + val error = SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) + EventHandler.error(e, this, receiver + " @ " + message) + throw error } } @@ -95,8 +98,9 @@ object Scheduler { service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) - throw SchedulerException("Failed to scheduleOnce a Runnable", e) + val error = SchedulerException("Failed to scheduleOnce a Runnable", e) + EventHandler.error(e, this, error.getMessage) + throw error } } diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index c67cbf8591..1d9185e98d 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -77,7 +77,7 @@ object Config { } } catch { case e => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index 5b50886b64..fec6f04d45 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -148,7 +148,7 @@ object DataFlow { (out !! Get).as[T] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) out ! Exit throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 2061e558e6..c15a26e00d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -102,7 +102,7 @@ class ExecutorBasedEventDrivenDispatcher( try executorService.get() execute invocation catch { case e: RejectedExecutionException => - EventHandler notify EventHandler.Warning(this, e.toString) + EventHandler.warning(this, e.toString) throw e } } @@ -149,7 +149,7 @@ class ExecutorBasedEventDrivenDispatcher( executorService.get() execute mbox } catch { case e: RejectedExecutionException => - EventHandler notify EventHandler.Warning(this, e.toString) + EventHandler.warning(this, e.toString) mbox.dispatcherLock.unlock() throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3d37ec2ef8..4555bf614a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -87,7 +87,7 @@ object Futures { result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) result completeWithException e } finally { results.clear @@ -266,7 +266,7 @@ sealed trait Future[+T] { else Left(new MatchError(r)) } catch { case e: Exception => - EventHandler notifyListeners EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) } } @@ -294,7 +294,7 @@ sealed trait Future[+T] { Right(f(v.right.get)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) }) } @@ -322,7 +322,7 @@ sealed trait Future[+T] { f(v.right.get) onComplete (fa.completeWith(_)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) fa completeWithException e } } @@ -352,7 +352,7 @@ sealed trait Future[+T] { else Left(new MatchError(r)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) }) } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index ee31535f28..9eb46d5c30 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -36,7 +36,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () = Right(function.apply) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) }) } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 553f3d986f..7e15ed69c3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration -import akka.actor.{EventHandler} +import akka.actor.EventHandler object ThreadPoolConfig { type Bounds = Int @@ -208,10 +208,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend }) } catch { case e: RejectedExecutionException => - EventHandler notify EventHandler.Warning(this, e.toString) + EventHandler.warning(this, e.toString) semaphore.release case e: Throwable => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index bdef95ec3c..8d431541f7 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -4,7 +4,7 @@ package akka.routing -import akka.actor.{Actor, ActorRef, EventHandler, PoisonPill} +import akka.actor.{Actor, ActorRef, PoisonPill} import java.util.concurrent.TimeUnit /** diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 1869dbb5e3..055fdab3b0 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -6,7 +6,7 @@ package akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} import java.util.concurrent.atomic. {AtomicBoolean} -import akka.actor.{EventHandler} +import akka.actor.EventHandler /** * @author Jonas Bonér @@ -120,15 +120,15 @@ class SimpleLock { class Switch(startAsOn: Boolean = false) { private val switch = new AtomicBoolean(startAsOn) - protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized { + protected def transcend(from: Boolean, action: => Unit): Boolean = synchronized { if (switch.compareAndSet(from, !from)) { try { action } catch { - case t: Throwable => - EventHandler notify EventHandler.Error(t, this) - switch.compareAndSet(!from, from) //Revert status - throw t + case e: Throwable => + EventHandler.error(e, this, e.getMessage) + switch.compareAndSet(!from, from) // revert status + throw e } true } else false diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala new file mode 100644 index 0000000000..cb694d7408 --- /dev/null +++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB