From e7a410dc0cb3102fbea5871abb6093491f46e334 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 18 Mar 2011 23:04:48 +0100 Subject: [PATCH 1/3] Fixed bug with restarting supervised supervisor that had done linking in constructor + Changed all calls to EventHandler to use direct 'error' and 'warning' methods for improved performance --- .../src/main/scala/akka/actor/ActorRef.scala | 66 +++++++++---------- .../src/main/scala/akka/actor/Scheduler.scala | 22 ++++--- .../src/main/scala/akka/config/Config.scala | 2 +- .../main/scala/akka/dataflow/DataFlow.scala | 2 +- .../ExecutorBasedEventDrivenDispatcher.scala | 4 +- .../src/main/scala/akka/dispatch/Future.scala | 10 +-- .../scala/akka/dispatch/MessageHandling.scala | 2 +- .../akka/dispatch/ThreadPoolBuilder.scala | 6 +- .../src/main/scala/akka/routing/Pool.scala | 2 +- .../src/main/scala/akka/util/LockUtil.scala | 12 ++-- .../actor/supervisor/SupervisorTreeSpec.scala | 46 +++++++++++++ akka-http/src/main/scala/akka/http/Mist.scala | 6 +- .../scala/akka/http/Servlet30Context.scala | 6 +- .../main/scala/akka/security/Security.scala | 4 +- .../remote/netty/NettyRemoteSupport.scala | 27 ++++---- .../scala/akka/transactor/Coordinated.scala | 1 - 16 files changed, 134 insertions(+), 84 deletions(-) create mode 100644 akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 2ec64543e3..c01dacf53e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -633,7 +633,7 @@ class LocalActorRef private[akka] ( /** * Starts up the actor and its message queue. */ - def start: ActorRef = guard.withGuard { + def start(): ActorRef = guard.withGuard { if (isShutdown) throw new ActorStartException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { @@ -687,11 +687,16 @@ class LocalActorRef private[akka] ( *

* To be invoked from within the actor itself. */ - def link(actorRef: ActorRef) = guard.withGuard { - if (actorRef.supervisor.isDefined) throw new IllegalActorStateException( + def link(actorRef: ActorRef): Unit = guard.withGuard { + val actorRefSupervisor = actorRef.supervisor + val hasSupervisorAlready = actorRefSupervisor.isDefined + if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy + else if (hasSupervisorAlready) throw new IllegalActorStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) + else { + _linkedActors.put(actorRef.uuid, actorRef) + actorRef.supervisor = Some(this) + } } /** @@ -825,8 +830,8 @@ class LocalActorRef private[akka] ( checkReceiveTimeout // Reschedule receive timeout } } catch { - case e: Throwable => - EventHandler notify EventHandler.Error(e, this, messageHandle.message.toString) + case e => + EventHandler.error(e, this, messageHandle.message.toString) throw e } } @@ -842,10 +847,8 @@ class LocalActorRef private[akka] ( dead.restart(reason, maxRetries, within) case _ => - if (_supervisor.isDefined) - notifySupervisorWithMessage(Exit(this, reason)) - else - dead.stop + if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) + else dead.stop } } @@ -880,7 +883,7 @@ class LocalActorRef private[akka] ( } protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart { + def performRestart() { val failedActor = actorInstance.get failedActor match { @@ -891,20 +894,19 @@ class LocalActorRef private[akka] ( case _ => failedActor.preRestart(reason) val freshActor = newActor - setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call + setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor + actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call freshActor.preStart freshActor.postRestart(reason) } } - def tooManyRestarts { + def tooManyRestarts() { _supervisor.foreach { sup => // can supervisor handle the notification? val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) } - stop } @@ -917,12 +919,15 @@ class LocalActorRef private[akka] ( case Temporary => shutDownTemporaryActor(this) true + case _ => // either permanent or none where default is permanent val success = try { performRestart true } catch { - case e => false //An error or exception here should trigger a retry + case e => + EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) + false // an error or exception here should trigger a retry } finally { currentMessage = null } @@ -935,27 +940,25 @@ class LocalActorRef private[akka] ( } } } else { - tooManyRestarts - true //Done + tooManyRestarts() + true // done } - if (success) - () //Alles gut - else - attemptRestart + if (success) () // alles gut + else attemptRestart() } - attemptRestart() //Tailrecursion + attemptRestart() // recur } protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { val i = _linkedActors.values.iterator - while(i.hasNext) { + while (i.hasNext) { val actorRef = i.next actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary => shutDownTemporaryActor(actorRef) - case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) + case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) } } } @@ -963,8 +966,7 @@ class LocalActorRef private[akka] ( protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - if (homeAddress.isDefined) - Actor.remote.registerSupervisorForActor(this) + if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -983,14 +985,12 @@ class LocalActorRef private[akka] ( temporaryActor.stop _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor - if (_linkedActors.isEmpty) - notifySupervisorWithMessage(UnlinkAndStop(this)) - + if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) true } private def handleExceptionInDispatch(reason: Throwable, message: Any) = { - EventHandler notify EventHandler.Error(reason, this, message.toString) + EventHandler.error(reason, this, message.toString) //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -1013,7 +1013,7 @@ class LocalActorRef private[akka] ( //Scoped stop all linked actors, to avoid leaking the 'i' val { val i = _linkedActors.values.iterator - while(i.hasNext) { + while (i.hasNext) { i.next.stop i.remove } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 851c9afa8d..01f4282874 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -38,8 +38,9 @@ object Scheduler { initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message) - throw SchedulerException(message + " could not be scheduled on " + receiver, e) + val error = SchedulerException(message + " could not be scheduled on " + receiver, e) + EventHandler.error(error, this, "%s @ %s".format(receiver, message)) + throw error } } @@ -59,8 +60,9 @@ object Scheduler { service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) - throw SchedulerException("Failed to schedule a Runnable", e) + val error = SchedulerException("Failed to schedule a Runnable", e) + EventHandler.error(error, this, error.getMessage) + throw error } } @@ -73,9 +75,10 @@ object Scheduler { new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e: Exception => - EventHandler notify EventHandler.Error(e, this, receiver + " @ " + message) - throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) + case e: Exception => + val error = SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) + EventHandler.error(e, this, receiver + " @ " + message) + throw error } } @@ -95,8 +98,9 @@ object Scheduler { service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) - throw SchedulerException("Failed to scheduleOnce a Runnable", e) + val error = SchedulerException("Failed to scheduleOnce a Runnable", e) + EventHandler.error(e, this, error.getMessage) + throw error } } diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index c67cbf8591..1d9185e98d 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -77,7 +77,7 @@ object Config { } } catch { case e => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index 5b50886b64..fec6f04d45 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -148,7 +148,7 @@ object DataFlow { (out !! Get).as[T] } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) out ! Exit throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 2061e558e6..c15a26e00d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -102,7 +102,7 @@ class ExecutorBasedEventDrivenDispatcher( try executorService.get() execute invocation catch { case e: RejectedExecutionException => - EventHandler notify EventHandler.Warning(this, e.toString) + EventHandler.warning(this, e.toString) throw e } } @@ -149,7 +149,7 @@ class ExecutorBasedEventDrivenDispatcher( executorService.get() execute mbox } catch { case e: RejectedExecutionException => - EventHandler notify EventHandler.Warning(this, e.toString) + EventHandler.warning(this, e.toString) mbox.dispatcherLock.unlock() throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3d37ec2ef8..4555bf614a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -87,7 +87,7 @@ object Futures { result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) result completeWithException e } finally { results.clear @@ -266,7 +266,7 @@ sealed trait Future[+T] { else Left(new MatchError(r)) } catch { case e: Exception => - EventHandler notifyListeners EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) } } @@ -294,7 +294,7 @@ sealed trait Future[+T] { Right(f(v.right.get)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) }) } @@ -322,7 +322,7 @@ sealed trait Future[+T] { f(v.right.get) onComplete (fa.completeWith(_)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) fa completeWithException e } } @@ -352,7 +352,7 @@ sealed trait Future[+T] { else Left(new MatchError(r)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) }) } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index ee31535f28..9eb46d5c30 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -36,7 +36,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () = Right(function.apply) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) Left(e) }) } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 553f3d986f..7e15ed69c3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration -import akka.actor.{EventHandler} +import akka.actor.EventHandler object ThreadPoolConfig { type Bounds = Int @@ -208,10 +208,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend }) } catch { case e: RejectedExecutionException => - EventHandler notify EventHandler.Warning(this, e.toString) + EventHandler.warning(this, e.toString) semaphore.release case e: Throwable => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index bdef95ec3c..8d431541f7 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -4,7 +4,7 @@ package akka.routing -import akka.actor.{Actor, ActorRef, EventHandler, PoisonPill} +import akka.actor.{Actor, ActorRef, PoisonPill} import java.util.concurrent.TimeUnit /** diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 1869dbb5e3..055fdab3b0 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -6,7 +6,7 @@ package akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} import java.util.concurrent.atomic. {AtomicBoolean} -import akka.actor.{EventHandler} +import akka.actor.EventHandler /** * @author Jonas Bonér @@ -120,15 +120,15 @@ class SimpleLock { class Switch(startAsOn: Boolean = false) { private val switch = new AtomicBoolean(startAsOn) - protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized { + protected def transcend(from: Boolean, action: => Unit): Boolean = synchronized { if (switch.compareAndSet(from, !from)) { try { action } catch { - case t: Throwable => - EventHandler notify EventHandler.Error(t, this) - switch.compareAndSet(!from, from) //Revert status - throw t + case e: Throwable => + EventHandler.error(e, this, e.getMessage) + switch.compareAndSet(!from, from) // revert status + throw e } true } else false diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala new file mode 100644 index 0000000000..cb694d7408 --- /dev/null +++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.dispatch.Dispatchers +import akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, Permanent} +import Actor._ + +class SupervisorTreeSpec extends WordSpec with MustMatchers { + + var log = "" + case object Die + class Chainer(myId: String, a: Option[ActorRef] = None) extends Actor { + self.id = myId + self.lifeCycle = Permanent + self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000) + a.foreach(self.link(_)) + + def receive = { + case Die => throw new Exception(self.id + " is dying...") + } + + override def preRestart(reason: Throwable) { + log += self.id + } + } + + "In a 3 levels deep supervisor tree (linked in the constructor) we" should { + + "be able to kill the middle actor and see itself and its child restarted" in { + log = "INIT" + + val lastActor = actorOf(new Chainer("lastActor")).start + val middleActor = actorOf(new Chainer("middleActor", Some(lastActor))).start + val headActor = actorOf(new Chainer("headActor", Some(middleActor))).start + + middleActor ! Die + Thread.sleep(100) + log must equal ("INITmiddleActorlastActor") + } + } +} diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index f144b69f29..6a7adbe2cf 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -5,7 +5,7 @@ package akka.http import akka.actor.{ActorRegistry, ActorRef, Actor} -import akka.actor.{EventHandler} +import akka.actor.EventHandler import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.HttpServlet @@ -389,7 +389,7 @@ trait RequestMethod { } } catch { case io: Exception => - EventHandler notify EventHandler.Error(io, this) + EventHandler.error(io, this, io.getMessage) false } } @@ -408,7 +408,7 @@ trait RequestMethod { } } catch { case io: IOException => - EventHandler notify EventHandler.Error(io, this) + EventHandler.error(io, this, io.getMessage) } } diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala index 3bc6c9261b..6ce3d1041c 100644 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ b/akka-http/src/main/scala/akka/http/Servlet30Context.scala @@ -7,7 +7,7 @@ package akka.http import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent}; import Types._ -import akka.actor.{EventHandler} +import akka.actor.EventHandler /** * @author Garrick Evans @@ -35,8 +35,8 @@ trait Servlet30Context extends AsyncListener { true } catch { - case ex: IllegalStateException => - EventHandler notify EventHandler.Error(ex, this) + case e: IllegalStateException => + EventHandler.error(e, this, e.getMessage) false } } diff --git a/akka-http/src/main/scala/akka/security/Security.scala b/akka-http/src/main/scala/akka/security/Security.scala index 97b25e1b3e..9f16d54886 100644 --- a/akka-http/src/main/scala/akka/security/Security.scala +++ b/akka-http/src/main/scala/akka/security/Security.scala @@ -23,7 +23,7 @@ package akka.security import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} -import akka.actor.{EventHandler} +import akka.actor.EventHandler import akka.actor.Actor._ import akka.config.Config @@ -369,7 +369,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] { Some(UserInfo(user, null, rolesFor(user))) } catch { case e: PrivilegedActionException => { - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) None } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8556f8e7ac..00c5f3ab81 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -31,12 +31,13 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler -import java.net.{ SocketAddress, InetSocketAddress } -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } import scala.collection.mutable.{ HashMap } import scala.reflect.BeanProperty + +import java.net.{ SocketAddress, InetSocketAddress } import java.lang.reflect.InvocationTargetException -import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } +import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean} object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -429,7 +430,7 @@ class ActiveRemoteClientHandler( } } catch { case e: Throwable => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) throw e } @@ -478,7 +479,7 @@ class ActiveRemoteClientHandler( .newInstance(exception.getMessage).asInstanceOf[Throwable] } catch { case problem: Throwable => - EventHandler notify EventHandler.Error(problem, this) + EventHandler.error(problem, this, problem.getMessage) UnparsableException(classname, exception.getMessage) } } @@ -558,7 +559,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) } } } @@ -591,7 +592,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) notifyListeners(RemoteServerError(e, this)) } this @@ -862,7 +863,7 @@ class RemoteServerHandler( val actorRef = try { createActor(actorInfo, channel) } catch { case e: SecurityException => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) server.notifyListeners(RemoteServerError(e, server)) return @@ -978,7 +979,7 @@ class RemoteServerHandler( write(channel, RemoteEncoder.encode(messageBuilder.build)) } catch { case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) server.notifyListeners(RemoteServerError(e, server)) } @@ -994,11 +995,11 @@ class RemoteServerHandler( } } catch { case e: InvocationTargetException => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) case e: Exception => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) } @@ -1058,7 +1059,7 @@ class RemoteServerHandler( actorRef.start //Start it where it's created } catch { case e: Throwable => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) server.notifyListeners(RemoteServerError(e, server)) throw e } @@ -1125,7 +1126,7 @@ class RemoteServerHandler( newInstance } catch { case e: Throwable => - EventHandler notify EventHandler.Error(e, this) + EventHandler.error(e, this, e.getMessage) server.notifyListeners(RemoteServerError(e, server)) throw e } diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala index 028f615216..2de7747607 100644 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala @@ -6,7 +6,6 @@ package akka.transactor import akka.config.Config import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory} -import akka.actor.{EventHandler} import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.commitbarriers.CountDownCommitBarrier From 0dba3ac29c802a667550f5d38edc8263e3258e75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 19 Mar 2011 01:06:26 +0100 Subject: [PATCH 2/3] removed some println --- .../actor/supervisor/SupervisorSpec.scala | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index b966eea5ec..ecc6dbfb4b 100644 --- a/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -24,7 +24,6 @@ object SupervisorSpec { class PingPong1Actor extends Actor { import self._ - //dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case Ping => messageLog.put("ping") @@ -34,11 +33,9 @@ object SupervisorSpec { oneWayLog.put("oneway") case Die => - println("******************** GOT DIE 1") throw new RuntimeException("Expected exception; to test fault-tolerance") } override def postRestart(reason: Throwable) { - println("******************** restart 1") messageLog.put(reason.getMessage) } } @@ -50,11 +47,9 @@ object SupervisorSpec { messageLog.put("ping") reply("pong") case Die => - println("******************** GOT DIE 2") throw new RuntimeException("Expected exception; to test fault-tolerance") } override def postRestart(reason: Throwable) { - println("******************** restart 2") messageLog.put(reason.getMessage) } } @@ -66,12 +61,10 @@ object SupervisorSpec { messageLog.put("ping") reply("pong") case Die => - println("******************** GOT DIE 3") throw new RuntimeException("Expected exception; to test fault-tolerance") } override def postRestart(reason: Throwable) { - println("******************** restart 3") messageLog.put(reason.getMessage) } } @@ -84,12 +77,10 @@ object SupervisorSpec { messageLog.put("ping") reply("pong") case Die => - println("******************** GOT DIE 3") throw new RuntimeException("Expected exception; to test fault-tolerance") } override def postRestart(reason: Throwable) { - println("******************** restart temporary") messageLog.put(reason.getMessage) } } @@ -114,17 +105,6 @@ class SupervisorSpec extends JUnitSuite { var pingpong3: ActorRef = _ var temporaryActor: ActorRef = _ -/* - @Test def shouldStartServer = { - clearMessageLogs - val sup = getSingleActorAllForOneSupervisor - sup.start - - expect("pong") { - (pingpong1 !! (Ping, 5000)).getOrElse("nil") - } - } -*/ @Test def shoulNotRestartProgrammaticallyLinkedTemporaryActor = { clearMessageLogs val master = actorOf[Master].start From 24a1d39b3cc94d8e7f2f207b983b21fece1b04ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 19 Mar 2011 01:37:48 +0100 Subject: [PATCH 3/3] added event handler logging + minor reformatting and cleanup --- .../src/main/scala/akka/actor/Actor.scala | 41 ++++++------ .../src/main/scala/akka/actor/ActorRef.scala | 62 +++++++++++-------- 2 files changed, 54 insertions(+), 49 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 47b99cbebe..e022cf1401 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -28,7 +28,9 @@ import akka.japi. {Creator, Procedure} /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage { +case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) + extends AutoReceivedMessage with LifeCycleMessage { + /** * Java API */ @@ -103,6 +105,7 @@ class ActorTimeoutException private[akka](message: String) extends AkkaEx *

  * EventHandler.error(exception, this, message.toString)
  * 
+ * * @author Jonas Bonér */ object EventHandler extends ListenerManagement { @@ -222,7 +225,6 @@ object EventHandler extends ListenerManagement { addListener(Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]).start) } catch { case e: Exception => - e.printStackTrace new ConfigurationException( "Event Handler specified in config can't be loaded [" + listenerName + "] due to [" + e.toString + "]") @@ -234,7 +236,7 @@ object EventHandler extends ListenerManagement { * This message is thrown by default when an Actors behavior doesn't match a message */ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception { - override def getMessage() = "Actor %s does not handle [%s]".format(ref,msg) + override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg) override def fillInStackTrace() = this //Don't waste cycles generating stack trace } @@ -255,7 +257,7 @@ object Actor extends ListenerManagement { val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") tf.setAccessible(true) val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] - subclassAudits.synchronized {subclassAudits.clear} + subclassAudits synchronized {subclassAudits.clear} } } Runtime.getRuntime.addShutdownHook(new Thread(hook)) @@ -265,11 +267,11 @@ object Actor extends ListenerManagement { val registry = new ActorRegistry lazy val remote: RemoteSupport = { - ReflectiveAccess. - Remote. - defaultRemoteSupport. - map(_()). - getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) + ReflectiveAccess + .Remote + .defaultRemoteSupport + .map(_()) + .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath")) } private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis @@ -456,9 +458,8 @@ trait Actor { "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + "\n\tEither use:" + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + - "\n\t\t'val actor = Actor.actor { case msg => .. } }'") - optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") + optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? optRef.asInstanceOf[Some[ActorRef]] } @@ -557,7 +558,7 @@ trait Actor { * by default it throws an UnhandledMessageException */ def unhandled(msg: Any) { - throw new UnhandledMessageException(msg,self) + throw new UnhandledMessageException(msg, self) } /** @@ -578,19 +579,16 @@ trait Actor { * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack */ def become(behavior: Receive, discardOld: Boolean = true) { - if (discardOld) - unbecome - + if (discardOld) unbecome self.hotswap = self.hotswap.push(behavior) } /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ - def unbecome: Unit = { + def unbecome(): Unit = { val h = self.hotswap - if (h.nonEmpty) - self.hotswap = h.pop + if (h.nonEmpty) self.hotswap = h.pop } // ========================================= @@ -607,7 +605,7 @@ trait Actor { } private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { - case HotSwap(code,discardOld) => become(code(self),discardOld) + case HotSwap(code, discardOld) => become(code(self), discardOld) case RevertHotSwap => unbecome case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) @@ -616,8 +614,7 @@ trait Actor { case Restart(reason) => throw reason case PoisonPill => val f = self.senderFuture - if(f.isDefined) - f.get.completeWithException(new ActorKilledException("PoisonPill")) + if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) self.stop } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c01dacf53e..32c03acf05 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -34,6 +34,23 @@ private[akka] object ActorRefInternals { object SHUTDOWN extends StatusType } +/** + * Abstraction for unification of sender and senderFuture for later reply + */ +abstract class Channel[T] { + + /** + * Sends the specified message to the channel + * Scala API + */ + def !(msg: T): Unit + + /** + * Sends the specified message to the channel + * Java API + */ + def sendOneWay(msg: T): Unit = this.!(msg) +} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -966,7 +983,7 @@ class LocalActorRef private[akka] ( protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) + if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -1032,7 +1049,7 @@ class LocalActorRef private[akka] ( val someSelfField = clazz.getDeclaredField("someSelf") selfField.setAccessible(true) someSelfField.setAccessible(true) - selfField.set(actor,value) + selfField.set(actor, value) someSelfField.set(actor, if (value ne null) Some(value) else null) true } catch { @@ -1044,11 +1061,11 @@ class LocalActorRef private[akka] ( val parent = clazz.getSuperclass if (parent eq null) throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") - lookupAndSetSelfFields(parent,actor,value) + lookupAndSetSelfFields(parent, actor, value) } } - lookupAndSetSelfFields(actor.getClass,actor,value) + lookupAndSetSelfFields(actor.getClass, actor, value) } private def initializeActorInstance = { @@ -1102,7 +1119,11 @@ private[akka] case class RemoteActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = Actor.remote.send[T](message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, actorType, loader) + val future = Actor.remote.send[T]( + message, senderOption, senderFuture, + homeAddress.get, timeout, + false, this, None, + actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } @@ -1179,7 +1200,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => */ def id: String - def id_=(id: String): Unit /** + def id_=(id: String): Unit + + /** * User overridable callback/setting. *

* Defines the life-cycle for a supervised actor. @@ -1195,11 +1218,11 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => *

* Can be one of: *

-   *  faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
+   *  faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
    * 
* Or: *
-   *  faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
+   *  faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
    * 
*/ @volatile @@ -1267,8 +1290,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => future.await } catch { case e: FutureTimeoutException => - if (isMessageJoinPoint) throw e - else None + if (isMessageJoinPoint) { + EventHandler.error(e, this, e.getMessage) + throw e + } else None } future.resultOrException } else throw new ActorInitializationException( @@ -1380,20 +1405,3 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) } } - -/** - * Abstraction for unification of sender and senderFuture for later reply - */ -abstract class Channel[T] { - /** - * Sends the specified message to the channel - * Scala API - */ - def !(msg: T): Unit - - /** - * Sends the specified message to the channel - * Java API - */ - def sendOneWay(msg: T): Unit = this.!(msg) -}