diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 79b3afdfdb..a14f41f2f3 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -28,7 +28,9 @@ import akka.japi. {Creator, Procedure} /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage { +case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) + extends AutoReceivedMessage with LifeCycleMessage { + /** * Java API */ @@ -103,6 +105,7 @@ class ActorTimeoutException private[akka](message: String) extends AkkaEx *
* EventHandler.error(exception, this, message.toString) *+ * * @author Jonas Bonér */ object EventHandler extends ListenerManagement { @@ -222,7 +225,6 @@ object EventHandler extends ListenerManagement { addListener(Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]).start) } catch { case e: Exception => - e.printStackTrace new ConfigurationException( "Event Handler specified in config can't be loaded [" + listenerName + "] due to [" + e.toString + "]") @@ -234,7 +236,7 @@ object EventHandler extends ListenerManagement { * This message is thrown by default when an Actors behavior doesn't match a message */ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception { - override def getMessage() = "Actor %s does not handle [%s]".format(ref,msg) + override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg) override def fillInStackTrace() = this //Don't waste cycles generating stack trace } @@ -255,7 +257,7 @@ object Actor extends ListenerManagement { val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") tf.setAccessible(true) val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] - subclassAudits.synchronized {subclassAudits.clear} + subclassAudits synchronized {subclassAudits.clear} } } Runtime.getRuntime.addShutdownHook(new Thread(hook)) @@ -265,11 +267,11 @@ object Actor extends ListenerManagement { val registry = new ActorRegistry lazy val remote: RemoteSupport = { - ReflectiveAccess. - Remote. - defaultRemoteSupport. - map(_()). - getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) + ReflectiveAccess + .Remote + .defaultRemoteSupport + .map(_()) + .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath")) } private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis @@ -456,9 +458,8 @@ trait Actor { "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + "\n\tEither use:" + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + - "\n\t\t'val actor = Actor.actor { case msg => .. } }'") - optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") + optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? optRef.asInstanceOf[Some[ActorRef]] } @@ -557,7 +558,7 @@ trait Actor { * by default it throws an UnhandledMessageException */ def unhandled(msg: Any) { - throw new UnhandledMessageException(msg,self) + throw new UnhandledMessageException(msg, self) } /** @@ -578,19 +579,16 @@ trait Actor { * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack */ def become(behavior: Receive, discardOld: Boolean = true) { - if (discardOld) - unbecome - + if (discardOld) unbecome self.hotswap = self.hotswap.push(behavior) } /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ - def unbecome: Unit = { + def unbecome(): Unit = { val h = self.hotswap - if (h.nonEmpty) - self.hotswap = h.pop + if (h.nonEmpty) self.hotswap = h.pop } // ========================================= @@ -607,7 +605,7 @@ trait Actor { } private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { - case HotSwap(code,discardOld) => become(code(self),discardOld) + case HotSwap(code, discardOld) => become(code(self), discardOld) case RevertHotSwap => unbecome case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) @@ -616,8 +614,7 @@ trait Actor { case Restart(reason) => throw reason case PoisonPill => val f = self.senderFuture - if(f.isDefined) - f.get.completeWithException(new ActorKilledException("PoisonPill")) + if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) self.stop } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 2ec64543e3..32c03acf05 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -34,6 +34,23 @@ private[akka] object ActorRefInternals { object SHUTDOWN extends StatusType } +/** + * Abstraction for unification of sender and senderFuture for later reply + */ +abstract class Channel[T] { + + /** + * Sends the specified message to the channel + * Scala API + */ + def !(msg: T): Unit + + /** + * Sends the specified message to the channel + * Java API + */ + def sendOneWay(msg: T): Unit = this.!(msg) +} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -633,7 +650,7 @@ class LocalActorRef private[akka] ( /** * Starts up the actor and its message queue. */ - def start: ActorRef = guard.withGuard { + def start(): ActorRef = guard.withGuard { if (isShutdown) throw new ActorStartException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { @@ -687,11 +704,16 @@ class LocalActorRef private[akka] ( * * To be invoked from within the actor itself. */ - def link(actorRef: ActorRef) = guard.withGuard { - if (actorRef.supervisor.isDefined) throw new IllegalActorStateException( + def link(actorRef: ActorRef): Unit = guard.withGuard { + val actorRefSupervisor = actorRef.supervisor + val hasSupervisorAlready = actorRefSupervisor.isDefined + if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy + else if (hasSupervisorAlready) throw new IllegalActorStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) + else { + _linkedActors.put(actorRef.uuid, actorRef) + actorRef.supervisor = Some(this) + } } /** @@ -825,8 +847,8 @@ class LocalActorRef private[akka] ( checkReceiveTimeout // Reschedule receive timeout } } catch { - case e: Throwable => - EventHandler notify EventHandler.Error(e, this, messageHandle.message.toString) + case e => + EventHandler.error(e, this, messageHandle.message.toString) throw e } } @@ -842,10 +864,8 @@ class LocalActorRef private[akka] ( dead.restart(reason, maxRetries, within) case _ => - if (_supervisor.isDefined) - notifySupervisorWithMessage(Exit(this, reason)) - else - dead.stop + if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) + else dead.stop } } @@ -880,7 +900,7 @@ class LocalActorRef private[akka] ( } protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart { + def performRestart() { val failedActor = actorInstance.get failedActor match { @@ -891,20 +911,19 @@ class LocalActorRef private[akka] ( case _ => failedActor.preRestart(reason) val freshActor = newActor - setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call + setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor + actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call freshActor.preStart freshActor.postRestart(reason) } } - def tooManyRestarts { + def tooManyRestarts() { _supervisor.foreach { sup => // can supervisor handle the notification? val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) } - stop } @@ -917,12 +936,15 @@ class LocalActorRef private[akka] ( case Temporary => shutDownTemporaryActor(this) true + case _ => // either permanent or none where default is permanent val success = try { performRestart true } catch { - case e => false //An error or exception here should trigger a retry + case e => + EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) + false // an error or exception here should trigger a retry } finally { currentMessage = null } @@ -935,27 +957,25 @@ class LocalActorRef private[akka] ( } } } else { - tooManyRestarts - true //Done + tooManyRestarts() + true // done } - if (success) - () //Alles gut - else - attemptRestart + if (success) () // alles gut + else attemptRestart() } - attemptRestart() //Tailrecursion + attemptRestart() // recur } protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { val i = _linkedActors.values.iterator - while(i.hasNext) { + while (i.hasNext) { val actorRef = i.next actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary => shutDownTemporaryActor(actorRef) - case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) + case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) } } } @@ -963,8 +983,7 @@ class LocalActorRef private[akka] ( protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - if (homeAddress.isDefined) - Actor.remote.registerSupervisorForActor(this) + if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -983,14 +1002,12 @@ class LocalActorRef private[akka] ( temporaryActor.stop _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor - if (_linkedActors.isEmpty) - notifySupervisorWithMessage(UnlinkAndStop(this)) - + if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) true } private def handleExceptionInDispatch(reason: Throwable, message: Any) = { - EventHandler notify EventHandler.Error(reason, this, message.toString) + EventHandler.error(reason, this, message.toString) //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -1013,7 +1030,7 @@ class LocalActorRef private[akka] ( //Scoped stop all linked actors, to avoid leaking the 'i' val { val i = _linkedActors.values.iterator - while(i.hasNext) { + while (i.hasNext) { i.next.stop i.remove } @@ -1032,7 +1049,7 @@ class LocalActorRef private[akka] ( val someSelfField = clazz.getDeclaredField("someSelf") selfField.setAccessible(true) someSelfField.setAccessible(true) - selfField.set(actor,value) + selfField.set(actor, value) someSelfField.set(actor, if (value ne null) Some(value) else null) true } catch { @@ -1044,11 +1061,11 @@ class LocalActorRef private[akka] ( val parent = clazz.getSuperclass if (parent eq null) throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") - lookupAndSetSelfFields(parent,actor,value) + lookupAndSetSelfFields(parent, actor, value) } } - lookupAndSetSelfFields(actor.getClass,actor,value) + lookupAndSetSelfFields(actor.getClass, actor, value) } private def initializeActorInstance = { @@ -1102,7 +1119,11 @@ private[akka] case class RemoteActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = Actor.remote.send[T](message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, actorType, loader) + val future = Actor.remote.send[T]( + message, senderOption, senderFuture, + homeAddress.get, timeout, + false, this, None, + actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } @@ -1179,7 +1200,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => */ def id: String - def id_=(id: String): Unit /** + def id_=(id: String): Unit + + /** * User overridable callback/setting. * * Defines the life-cycle for a supervised actor. @@ -1195,11 +1218,11 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * * Can be one of: *
- * faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
+ * faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
*
* Or:
*
- * faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
+ * faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
*
*/
@volatile
@@ -1267,8 +1290,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
future.await
} catch {
case e: FutureTimeoutException =>
- if (isMessageJoinPoint) throw e
- else None
+ if (isMessageJoinPoint) {
+ EventHandler.error(e, this, e.getMessage)
+ throw e
+ } else None
}
future.resultOrException
} else throw new ActorInitializationException(
@@ -1380,20 +1405,3 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
}
}
-
-/**
- * Abstraction for unification of sender and senderFuture for later reply
- */
-abstract class Channel[T] {
- /**
- * Sends the specified message to the channel
- * Scala API
- */
- def !(msg: T): Unit
-
- /**
- * Sends the specified message to the channel
- * Java API
- */
- def sendOneWay(msg: T): Unit = this.!(msg)
-}
diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala
index 851c9afa8d..01f4282874 100644
--- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala
+++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala
@@ -38,8 +38,9 @@ object Scheduler {
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message)
- throw SchedulerException(message + " could not be scheduled on " + receiver, e)
+ val error = SchedulerException(message + " could not be scheduled on " + receiver, e)
+ EventHandler.error(error, this, "%s @ %s".format(receiver, message))
+ throw error
}
}
@@ -59,8 +60,9 @@ object Scheduler {
service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
- throw SchedulerException("Failed to schedule a Runnable", e)
+ val error = SchedulerException("Failed to schedule a Runnable", e)
+ EventHandler.error(error, this, error.getMessage)
+ throw error
}
}
@@ -73,9 +75,10 @@ object Scheduler {
new Runnable { def run = receiver ! message },
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
- case e: Exception =>
- EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message)
- throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
+ case e: Exception =>
+ val error = SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
+ EventHandler.error(e, this, receiver + " @ " + message)
+ throw error
}
}
@@ -95,8 +98,9 @@ object Scheduler {
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
- throw SchedulerException("Failed to scheduleOnce a Runnable", e)
+ val error = SchedulerException("Failed to scheduleOnce a Runnable", e)
+ EventHandler.error(e, this, error.getMessage)
+ throw error
}
}
diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala
index c67cbf8591..1d9185e98d 100644
--- a/akka-actor/src/main/scala/akka/config/Config.scala
+++ b/akka-actor/src/main/scala/akka/config/Config.scala
@@ -77,7 +77,7 @@ object Config {
}
} catch {
case e =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
throw e
}
}
diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala
index 5b50886b64..fec6f04d45 100644
--- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala
+++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala
@@ -148,7 +148,7 @@ object DataFlow {
(out !! Get).as[T]
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
out ! Exit
throw e
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 2061e558e6..c15a26e00d 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -102,7 +102,7 @@ class ExecutorBasedEventDrivenDispatcher(
try executorService.get() execute invocation
catch {
case e: RejectedExecutionException =>
- EventHandler notify EventHandler.Warning(this, e.toString)
+ EventHandler.warning(this, e.toString)
throw e
}
}
@@ -149,7 +149,7 @@ class ExecutorBasedEventDrivenDispatcher(
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
- EventHandler notify EventHandler.Warning(this, e.toString)
+ EventHandler.warning(this, e.toString)
mbox.dispatcherLock.unlock()
throw e
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index 944e5b4847..7ad12913be 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -87,7 +87,7 @@ object Futures {
result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun)
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
result completeWithException e
} finally {
results.clear
@@ -266,7 +266,7 @@ sealed trait Future[+T] {
else Left(new MatchError(r))
} catch {
case e: Exception =>
- EventHandler notifyListeners EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
Left(e)
}
}
@@ -294,7 +294,7 @@ sealed trait Future[+T] {
Right(f(v.right.get))
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
Left(e)
})
}
@@ -322,7 +322,7 @@ sealed trait Future[+T] {
fa.completeWith(f(v.right.get))
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
fa completeWithException e
}
}
@@ -352,7 +352,7 @@ sealed trait Future[+T] {
else Left(new MatchError(r))
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
Left(e)
})
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
index ee31535f28..9eb46d5c30 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
@@ -36,7 +36,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () =
Right(function.apply)
} catch {
case e: Exception =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
Left(e)
})
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index 553f3d986f..7e15ed69c3 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration
-import akka.actor.{EventHandler}
+import akka.actor.EventHandler
object ThreadPoolConfig {
type Bounds = Int
@@ -208,10 +208,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
})
} catch {
case e: RejectedExecutionException =>
- EventHandler notify EventHandler.Warning(this, e.toString)
+ EventHandler.warning(this, e.toString)
semaphore.release
case e: Throwable =>
- EventHandler notify EventHandler.Error(e, this)
+ EventHandler.error(e, this, e.getMessage)
throw e
}
}
diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala
index bdef95ec3c..8d431541f7 100644
--- a/akka-actor/src/main/scala/akka/routing/Pool.scala
+++ b/akka-actor/src/main/scala/akka/routing/Pool.scala
@@ -4,7 +4,7 @@
package akka.routing
-import akka.actor.{Actor, ActorRef, EventHandler, PoisonPill}
+import akka.actor.{Actor, ActorRef, PoisonPill}
import java.util.concurrent.TimeUnit
/**
diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala
index 1869dbb5e3..055fdab3b0 100644
--- a/akka-actor/src/main/scala/akka/util/LockUtil.scala
+++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala
@@ -6,7 +6,7 @@ package akka.util
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
import java.util.concurrent.atomic. {AtomicBoolean}
-import akka.actor.{EventHandler}
+import akka.actor.EventHandler
/**
* @author Jonas Bonér
@@ -120,15 +120,15 @@ class SimpleLock {
class Switch(startAsOn: Boolean = false) {
private val switch = new AtomicBoolean(startAsOn)
- protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized {
+ protected def transcend(from: Boolean, action: => Unit): Boolean = synchronized {
if (switch.compareAndSet(from, !from)) {
try {
action
} catch {
- case t: Throwable =>
- EventHandler notify EventHandler.Error(t, this)
- switch.compareAndSet(!from, from) //Revert status
- throw t
+ case e: Throwable =>
+ EventHandler.error(e, this, e.getMessage)
+ switch.compareAndSet(!from, from) // revert status
+ throw e
}
true
} else false
diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala
index b966eea5ec..ecc6dbfb4b 100644
--- a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala
+++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala
@@ -24,7 +24,6 @@ object SupervisorSpec {
class PingPong1Actor extends Actor {
import self._
- //dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case Ping =>
messageLog.put("ping")
@@ -34,11 +33,9 @@ object SupervisorSpec {
oneWayLog.put("oneway")
case Die =>
- println("******************** GOT DIE 1")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
- println("******************** restart 1")
messageLog.put(reason.getMessage)
}
}
@@ -50,11 +47,9 @@ object SupervisorSpec {
messageLog.put("ping")
reply("pong")
case Die =>
- println("******************** GOT DIE 2")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
- println("******************** restart 2")
messageLog.put(reason.getMessage)
}
}
@@ -66,12 +61,10 @@ object SupervisorSpec {
messageLog.put("ping")
reply("pong")
case Die =>
- println("******************** GOT DIE 3")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
- println("******************** restart 3")
messageLog.put(reason.getMessage)
}
}
@@ -84,12 +77,10 @@ object SupervisorSpec {
messageLog.put("ping")
reply("pong")
case Die =>
- println("******************** GOT DIE 3")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
- println("******************** restart temporary")
messageLog.put(reason.getMessage)
}
}
@@ -114,17 +105,6 @@ class SupervisorSpec extends JUnitSuite {
var pingpong3: ActorRef = _
var temporaryActor: ActorRef = _
-/*
- @Test def shouldStartServer = {
- clearMessageLogs
- val sup = getSingleActorAllForOneSupervisor
- sup.start
-
- expect("pong") {
- (pingpong1 !! (Ping, 5000)).getOrElse("nil")
- }
- }
-*/
@Test def shoulNotRestartProgrammaticallyLinkedTemporaryActor = {
clearMessageLogs
val master = actorOf[Master].start
diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala
new file mode 100644
index 0000000000..cb694d7408
--- /dev/null
+++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB