Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-03-19 17:52:59 +01:00
commit d5ffc7e739
18 changed files with 187 additions and 152 deletions

View file

@ -28,7 +28,9 @@ import akka.japi. {Creator, Procedure}
/* Marker trait to show which Messages are automatically handled by Akka */
sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage {
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
extends AutoReceivedMessage with LifeCycleMessage {
/**
* Java API
*/
@ -103,6 +105,7 @@ class ActorTimeoutException private[akka](message: String) extends AkkaEx
* <pre>
* EventHandler.error(exception, this, message.toString)
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EventHandler extends ListenerManagement {
@ -222,7 +225,6 @@ object EventHandler extends ListenerManagement {
addListener(Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]).start)
} catch {
case e: Exception =>
e.printStackTrace
new ConfigurationException(
"Event Handler specified in config can't be loaded [" + listenerName +
"] due to [" + e.toString + "]")
@ -234,7 +236,7 @@ object EventHandler extends ListenerManagement {
* This message is thrown by default when an Actors behavior doesn't match a message
*/
case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {
override def getMessage() = "Actor %s does not handle [%s]".format(ref,msg)
override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg)
override def fillInStackTrace() = this //Don't waste cycles generating stack trace
}
@ -255,7 +257,7 @@ object Actor extends ListenerManagement {
val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
tf.setAccessible(true)
val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]]
subclassAudits.synchronized {subclassAudits.clear}
subclassAudits synchronized {subclassAudits.clear}
}
}
Runtime.getRuntime.addShutdownHook(new Thread(hook))
@ -265,11 +267,11 @@ object Actor extends ListenerManagement {
val registry = new ActorRegistry
lazy val remote: RemoteSupport = {
ReflectiveAccess.
Remote.
defaultRemoteSupport.
map(_()).
getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath"))
ReflectiveAccess
.Remote
.defaultRemoteSupport
.map(_())
.getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath"))
}
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
@ -456,9 +458,8 @@ trait Actor {
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed?
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))'")
optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed?
optRef.asInstanceOf[Some[ActorRef]]
}
@ -557,7 +558,7 @@ trait Actor {
* by default it throws an UnhandledMessageException
*/
def unhandled(msg: Any) {
throw new UnhandledMessageException(msg,self)
throw new UnhandledMessageException(msg, self)
}
/**
@ -578,19 +579,16 @@ trait Actor {
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
*/
def become(behavior: Receive, discardOld: Boolean = true) {
if (discardOld)
unbecome
if (discardOld) unbecome
self.hotswap = self.hotswap.push(behavior)
}
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
def unbecome: Unit = {
def unbecome(): Unit = {
val h = self.hotswap
if (h.nonEmpty)
self.hotswap = h.pop
if (h.nonEmpty) self.hotswap = h.pop
}
// =========================================
@ -607,7 +605,7 @@ trait Actor {
}
private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match {
case HotSwap(code,discardOld) => become(code(self),discardOld)
case HotSwap(code, discardOld) => become(code(self), discardOld)
case RevertHotSwap => unbecome
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
@ -616,8 +614,7 @@ trait Actor {
case Restart(reason) => throw reason
case PoisonPill =>
val f = self.senderFuture
if(f.isDefined)
f.get.completeWithException(new ActorKilledException("PoisonPill"))
if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
self.stop
}

View file

@ -34,6 +34,23 @@ private[akka] object ActorRefInternals {
object SHUTDOWN extends StatusType
}
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
abstract class Channel[T] {
/**
* Sends the specified message to the channel
* Scala API
*/
def !(msg: T): Unit
/**
* Sends the specified message to the channel
* Java API
*/
def sendOneWay(msg: T): Unit = this.!(msg)
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -633,7 +650,7 @@ class LocalActorRef private[akka] (
/**
* Starts up the actor and its message queue.
*/
def start: ActorRef = guard.withGuard {
def start(): ActorRef = guard.withGuard {
if (isShutdown) throw new ActorStartException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
@ -687,11 +704,16 @@ class LocalActorRef private[akka] (
* <p/>
* To be invoked from within the actor itself.
*/
def link(actorRef: ActorRef) = guard.withGuard {
if (actorRef.supervisor.isDefined) throw new IllegalActorStateException(
def link(actorRef: ActorRef): Unit = guard.withGuard {
val actorRefSupervisor = actorRef.supervisor
val hasSupervisorAlready = actorRefSupervisor.isDefined
if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy
else if (hasSupervisorAlready) throw new IllegalActorStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(this)
else {
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(this)
}
}
/**
@ -825,8 +847,8 @@ class LocalActorRef private[akka] (
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case e: Throwable =>
EventHandler notify EventHandler.Error(e, this, messageHandle.message.toString)
case e =>
EventHandler.error(e, this, messageHandle.message.toString)
throw e
}
}
@ -842,10 +864,8 @@ class LocalActorRef private[akka] (
dead.restart(reason, maxRetries, within)
case _ =>
if (_supervisor.isDefined)
notifySupervisorWithMessage(Exit(this, reason))
else
dead.stop
if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
else dead.stop
}
}
@ -880,7 +900,7 @@ class LocalActorRef private[akka] (
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
def performRestart {
def performRestart() {
val failedActor = actorInstance.get
failedActor match {
@ -891,20 +911,19 @@ class LocalActorRef private[akka] (
case _ =>
failedActor.preRestart(reason)
val freshActor = newActor
setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call
setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.preStart
freshActor.postRestart(reason)
}
}
def tooManyRestarts {
def tooManyRestarts() {
_supervisor.foreach { sup =>
// can supervisor handle the notification?
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
}
stop
}
@ -917,12 +936,15 @@ class LocalActorRef private[akka] (
case Temporary =>
shutDownTemporaryActor(this)
true
case _ => // either permanent or none where default is permanent
val success = try {
performRestart
true
} catch {
case e => false //An error or exception here should trigger a retry
case e =>
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry
} finally {
currentMessage = null
}
@ -935,27 +957,25 @@ class LocalActorRef private[akka] (
}
}
} else {
tooManyRestarts
true //Done
tooManyRestarts()
true // done
}
if (success)
() //Alles gut
else
attemptRestart
if (success) () // alles gut
else attemptRestart()
}
attemptRestart() //Tailrecursion
attemptRestart() // recur
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
val i = _linkedActors.values.iterator
while(i.hasNext) {
while (i.hasNext) {
val actorRef = i.next
actorRef.lifeCycle match {
// either permanent or none where default is permanent
case Temporary => shutDownTemporaryActor(actorRef)
case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
}
}
@ -963,8 +983,7 @@ class LocalActorRef private[akka] (
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled
if (_supervisor.isDefined) {
if (homeAddress.isDefined)
Actor.remote.registerSupervisorForActor(this)
if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
}
@ -983,14 +1002,12 @@ class LocalActorRef private[akka] (
temporaryActor.stop
_linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
// if last temporary actor is gone, then unlink me from supervisor
if (_linkedActors.isEmpty)
notifySupervisorWithMessage(UnlinkAndStop(this))
if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this))
true
}
private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
EventHandler notify EventHandler.Error(reason, this, message.toString)
EventHandler.error(reason, this, message.toString)
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@ -1013,7 +1030,7 @@ class LocalActorRef private[akka] (
//Scoped stop all linked actors, to avoid leaking the 'i' val
{
val i = _linkedActors.values.iterator
while(i.hasNext) {
while (i.hasNext) {
i.next.stop
i.remove
}
@ -1032,7 +1049,7 @@ class LocalActorRef private[akka] (
val someSelfField = clazz.getDeclaredField("someSelf")
selfField.setAccessible(true)
someSelfField.setAccessible(true)
selfField.set(actor,value)
selfField.set(actor, value)
someSelfField.set(actor, if (value ne null) Some(value) else null)
true
} catch {
@ -1044,11 +1061,11 @@ class LocalActorRef private[akka] (
val parent = clazz.getSuperclass
if (parent eq null)
throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait")
lookupAndSetSelfFields(parent,actor,value)
lookupAndSetSelfFields(parent, actor, value)
}
}
lookupAndSetSelfFields(actor.getClass,actor,value)
lookupAndSetSelfFields(actor.getClass, actor, value)
}
private def initializeActorInstance = {
@ -1102,7 +1119,11 @@ private[akka] case class RemoteActorRef private[akka] (
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = Actor.remote.send[T](message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, actorType, loader)
val future = Actor.remote.send[T](
message, senderOption, senderFuture,
homeAddress.get, timeout,
false, this, None,
actorType, loader)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
@ -1179,7 +1200,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
*/
def id: String
def id_=(id: String): Unit /**
def id_=(id: String): Unit
/**
* User overridable callback/setting.
* <p/>
* Defines the life-cycle for a supervised actor.
@ -1195,11 +1218,11 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* <p/>
* Can be one of:
* <pre>
* faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
* faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
* </pre>
* Or:
* <pre>
* faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
* faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
* </pre>
*/
@volatile
@ -1267,8 +1290,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
future.await
} catch {
case e: FutureTimeoutException =>
if (isMessageJoinPoint) throw e
else None
if (isMessageJoinPoint) {
EventHandler.error(e, this, e.getMessage)
throw e
} else None
}
future.resultOrException
} else throw new ActorInitializationException(
@ -1380,20 +1405,3 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
}
}
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
abstract class Channel[T] {
/**
* Sends the specified message to the channel
* Scala API
*/
def !(msg: T): Unit
/**
* Sends the specified message to the channel
* Java API
*/
def sendOneWay(msg: T): Unit = this.!(msg)
}

View file

@ -38,8 +38,9 @@ object Scheduler {
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message)
throw SchedulerException(message + " could not be scheduled on " + receiver, e)
val error = SchedulerException(message + " could not be scheduled on " + receiver, e)
EventHandler.error(error, this, "%s @ %s".format(receiver, message))
throw error
}
}
@ -59,8 +60,9 @@ object Scheduler {
service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
throw SchedulerException("Failed to schedule a Runnable", e)
val error = SchedulerException("Failed to schedule a Runnable", e)
EventHandler.error(error, this, error.getMessage)
throw error
}
}
@ -73,9 +75,10 @@ object Scheduler {
new Runnable { def run = receiver ! message },
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message)
throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
case e: Exception =>
val error = SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
EventHandler.error(e, this, receiver + " @ " + message)
throw error
}
}
@ -95,8 +98,9 @@ object Scheduler {
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
throw SchedulerException("Failed to scheduleOnce a Runnable", e)
val error = SchedulerException("Failed to scheduleOnce a Runnable", e)
EventHandler.error(e, this, error.getMessage)
throw error
}
}

View file

@ -77,7 +77,7 @@ object Config {
}
} catch {
case e =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
throw e
}
}

View file

@ -148,7 +148,7 @@ object DataFlow {
(out !! Get).as[T]
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
out ! Exit
throw e
}

View file

@ -102,7 +102,7 @@ class ExecutorBasedEventDrivenDispatcher(
try executorService.get() execute invocation
catch {
case e: RejectedExecutionException =>
EventHandler notify EventHandler.Warning(this, e.toString)
EventHandler.warning(this, e.toString)
throw e
}
}
@ -149,7 +149,7 @@ class ExecutorBasedEventDrivenDispatcher(
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
EventHandler notify EventHandler.Warning(this, e.toString)
EventHandler.warning(this, e.toString)
mbox.dispatcherLock.unlock()
throw e
}

View file

@ -87,7 +87,7 @@ object Futures {
result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun)
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
result completeWithException e
} finally {
results.clear
@ -266,7 +266,7 @@ sealed trait Future[+T] {
else Left(new MatchError(r))
} catch {
case e: Exception =>
EventHandler notifyListeners EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
Left(e)
}
}
@ -294,7 +294,7 @@ sealed trait Future[+T] {
Right(f(v.right.get))
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
Left(e)
})
}
@ -322,7 +322,7 @@ sealed trait Future[+T] {
fa.completeWith(f(v.right.get))
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
fa completeWithException e
}
}
@ -352,7 +352,7 @@ sealed trait Future[+T] {
else Left(new MatchError(r))
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
Left(e)
})
}

View file

@ -36,7 +36,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () =
Right(function.apply)
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
Left(e)
})
}

View file

@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration
import akka.actor.{EventHandler}
import akka.actor.EventHandler
object ThreadPoolConfig {
type Bounds = Int
@ -208,10 +208,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
})
} catch {
case e: RejectedExecutionException =>
EventHandler notify EventHandler.Warning(this, e.toString)
EventHandler.warning(this, e.toString)
semaphore.release
case e: Throwable =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
throw e
}
}

View file

@ -4,7 +4,7 @@
package akka.routing
import akka.actor.{Actor, ActorRef, EventHandler, PoisonPill}
import akka.actor.{Actor, ActorRef, PoisonPill}
import java.util.concurrent.TimeUnit
/**

View file

@ -6,7 +6,7 @@ package akka.util
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
import java.util.concurrent.atomic. {AtomicBoolean}
import akka.actor.{EventHandler}
import akka.actor.EventHandler
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -120,15 +120,15 @@ class SimpleLock {
class Switch(startAsOn: Boolean = false) {
private val switch = new AtomicBoolean(startAsOn)
protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized {
protected def transcend(from: Boolean, action: => Unit): Boolean = synchronized {
if (switch.compareAndSet(from, !from)) {
try {
action
} catch {
case t: Throwable =>
EventHandler notify EventHandler.Error(t, this)
switch.compareAndSet(!from, from) //Revert status
throw t
case e: Throwable =>
EventHandler.error(e, this, e.getMessage)
switch.compareAndSet(!from, from) // revert status
throw e
}
true
} else false

View file

@ -24,7 +24,6 @@ object SupervisorSpec {
class PingPong1Actor extends Actor {
import self._
//dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case Ping =>
messageLog.put("ping")
@ -34,11 +33,9 @@ object SupervisorSpec {
oneWayLog.put("oneway")
case Die =>
println("******************** GOT DIE 1")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
println("******************** restart 1")
messageLog.put(reason.getMessage)
}
}
@ -50,11 +47,9 @@ object SupervisorSpec {
messageLog.put("ping")
reply("pong")
case Die =>
println("******************** GOT DIE 2")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
println("******************** restart 2")
messageLog.put(reason.getMessage)
}
}
@ -66,12 +61,10 @@ object SupervisorSpec {
messageLog.put("ping")
reply("pong")
case Die =>
println("******************** GOT DIE 3")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
println("******************** restart 3")
messageLog.put(reason.getMessage)
}
}
@ -84,12 +77,10 @@ object SupervisorSpec {
messageLog.put("ping")
reply("pong")
case Die =>
println("******************** GOT DIE 3")
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
override def postRestart(reason: Throwable) {
println("******************** restart temporary")
messageLog.put(reason.getMessage)
}
}
@ -114,17 +105,6 @@ class SupervisorSpec extends JUnitSuite {
var pingpong3: ActorRef = _
var temporaryActor: ActorRef = _
/*
@Test def shouldStartServer = {
clearMessageLogs
val sup = getSingleActorAllForOneSupervisor
sup.start
expect("pong") {
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
}
*/
@Test def shoulNotRestartProgrammaticallyLinkedTemporaryActor = {
clearMessageLogs
val master = actorOf[Master].start

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.dispatch.Dispatchers
import akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, Permanent}
import Actor._
class SupervisorTreeSpec extends WordSpec with MustMatchers {
var log = ""
case object Die
class Chainer(myId: String, a: Option[ActorRef] = None) extends Actor {
self.id = myId
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000)
a.foreach(self.link(_))
def receive = {
case Die => throw new Exception(self.id + " is dying...")
}
override def preRestart(reason: Throwable) {
log += self.id
}
}
"In a 3 levels deep supervisor tree (linked in the constructor) we" should {
"be able to kill the middle actor and see itself and its child restarted" in {
log = "INIT"
val lastActor = actorOf(new Chainer("lastActor")).start
val middleActor = actorOf(new Chainer("middleActor", Some(lastActor))).start
val headActor = actorOf(new Chainer("headActor", Some(middleActor))).start
middleActor ! Die
Thread.sleep(100)
log must equal ("INITmiddleActorlastActor")
}
}
}

View file

@ -5,7 +5,7 @@
package akka.http
import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.actor.{EventHandler}
import akka.actor.EventHandler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import javax.servlet.http.HttpServlet
@ -389,7 +389,7 @@ trait RequestMethod {
}
} catch {
case io: Exception =>
EventHandler notify EventHandler.Error(io, this)
EventHandler.error(io, this, io.getMessage)
false
}
}
@ -408,7 +408,7 @@ trait RequestMethod {
}
} catch {
case io: IOException =>
EventHandler notify EventHandler.Error(io, this)
EventHandler.error(io, this, io.getMessage)
}
}

View file

@ -7,7 +7,7 @@ package akka.http
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
import Types._
import akka.actor.{EventHandler}
import akka.actor.EventHandler
/**
* @author Garrick Evans
@ -35,8 +35,8 @@ trait Servlet30Context extends AsyncListener {
true
}
catch {
case ex: IllegalStateException =>
EventHandler notify EventHandler.Error(ex, this)
case e: IllegalStateException =>
EventHandler.error(e, this, e.getMessage)
false
}
}

View file

@ -23,7 +23,7 @@
package akka.security
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
import akka.actor.{EventHandler}
import akka.actor.EventHandler
import akka.actor.Actor._
import akka.config.Config
@ -369,7 +369,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] {
Some(UserInfo(user, null, rolesFor(user)))
} catch {
case e: PrivilegedActionException => {
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
None
}
}

View file

@ -36,12 +36,13 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
import java.net.{ SocketAddress, InetSocketAddress }
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import scala.collection.mutable.{ HashMap }
import scala.reflect.BeanProperty
import java.net.{ SocketAddress, InetSocketAddress }
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean}
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -434,7 +435,7 @@ class ActiveRemoteClientHandler(
}
} catch {
case e: Throwable =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
}
@ -483,7 +484,7 @@ class ActiveRemoteClientHandler(
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Throwable =>
EventHandler notify EventHandler.Error(problem, this)
EventHandler.error(problem, this, problem.getMessage)
UnparsableException(classname, exception.getMessage)
}
}
@ -563,7 +564,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
}
}
}
@ -596,7 +597,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
}
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
notifyListeners(RemoteServerError(e, this))
}
this
@ -869,7 +870,7 @@ class RemoteServerHandler(
val actorRef =
try { createActor(actorInfo, channel) } catch {
case e: SecurityException =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
server.notifyListeners(RemoteServerError(e, server))
return
@ -979,7 +980,7 @@ class RemoteServerHandler(
write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
}
@ -991,11 +992,11 @@ class RemoteServerHandler(
}
} catch {
case e: InvocationTargetException =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
}
@ -1055,7 +1056,7 @@ class RemoteServerHandler(
actorRef.start //Start it where it's created
} catch {
case e: Throwable =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
throw e
}
@ -1122,7 +1123,7 @@ class RemoteServerHandler(
newInstance
} catch {
case e: Throwable =>
EventHandler notify EventHandler.Error(e, this)
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
throw e
}

View file

@ -6,7 +6,6 @@ package akka.transactor
import akka.config.Config
import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
import akka.actor.{EventHandler}
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.commitbarriers.CountDownCommitBarrier