From 5033647176af367a1e1f17474b142933a86317cf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 1 Feb 2012 17:38:12 +0100 Subject: [PATCH] Fixed things from review. See #1310 --- ...{HarmlessSpec.scala => NonFatalSpec.scala} | 14 ++++----- .../src/main/scala/akka/actor/ActorCell.scala | 22 +++++++------ .../main/scala/akka/actor/ActorSystem.scala | 2 +- .../main/scala/akka/actor/TypedActor.scala | 4 +-- .../akka/dispatch/AbstractDispatcher.scala | 4 +-- .../src/main/scala/akka/dispatch/Future.scala | 31 +++++++++---------- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../util/{Harmless.scala => NonFatal.scala} | 6 ++-- .../actor/mailbox/FiledBasedMailbox.scala | 6 ++-- .../actor/mailbox/filequeue/Journal.scala | 4 +-- .../actor/mailbox/RedisBasedMailbox.scala | 6 ++-- .../actor/mailbox/ZooKeeperBasedMailbox.scala | 4 +-- .../remote/netty/NettyRemoteSupport.scala | 4 +-- .../testkit/CallingThreadDispatcher.scala | 6 ++-- 14 files changed, 58 insertions(+), 57 deletions(-) rename akka-actor-tests/src/test/scala/akka/util/{HarmlessSpec.scala => NonFatalSpec.scala} (77%) rename akka-actor/src/main/scala/akka/util/{Harmless.scala => NonFatal.scala} (80%) diff --git a/akka-actor-tests/src/test/scala/akka/util/HarmlessSpec.scala b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala similarity index 77% rename from akka-actor-tests/src/test/scala/akka/util/HarmlessSpec.scala rename to akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala index 5ac96c2d34..f7d9789ec3 100644 --- a/akka-actor-tests/src/test/scala/akka/util/HarmlessSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala @@ -7,15 +7,15 @@ import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class HarmlessSpec extends AkkaSpec with MustMatchers { +class NonFatalSpec extends AkkaSpec with MustMatchers { - "A Harmless extractor" must { + "A NonFatal extractor" must { "match ordinary RuntimeException" in { try { throw new RuntimeException("Boom") } catch { - case Harmless(e) ⇒ // as expected + case NonFatal(e) ⇒ // as expected } } @@ -29,7 +29,7 @@ class HarmlessSpec extends AkkaSpec with MustMatchers { try { blowUp(0) } catch { - case Harmless(e) ⇒ assert(false) + case NonFatal(e) ⇒ assert(false) } } } @@ -39,7 +39,7 @@ class HarmlessSpec extends AkkaSpec with MustMatchers { try { throw new InterruptedException("Simulated InterruptedException") } catch { - case Harmless(e) ⇒ assert(false) + case NonFatal(e) ⇒ assert(false) } } } @@ -49,13 +49,13 @@ class HarmlessSpec extends AkkaSpec with MustMatchers { throw new InterruptedException("Simulated InterruptedException") } catch { case _: InterruptedException ⇒ // as expected - case Harmless(e) ⇒ assert(false) + case NonFatal(e) ⇒ assert(false) } try { throw new RuntimeException("Simulated RuntimeException") } catch { - case Harmless(_) | _: InterruptedException ⇒ // as expected + case NonFatal(_) | _: InterruptedException ⇒ // as expected } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c445b7b35c..30f8b8a4d3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -14,7 +14,7 @@ import akka.util.{ Duration, Helpers } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension -import akka.util.Harmless +import akka.util.NonFatal //TODO: everything here for current compatibility - could be limited more @@ -363,7 +363,7 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { - case Harmless(e) ⇒ + case NonFatal(e) ⇒ try { system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted @@ -395,7 +395,7 @@ private[akka] class ActorCell( actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) } catch { - case Harmless(e) ⇒ try { + case NonFatal(e) ⇒ try { system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -459,7 +459,7 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case Harmless(e) ⇒ + case NonFatal(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message)) //TODO FIXME How should problems here be handled??? throw e @@ -475,19 +475,23 @@ private[akka] class ActorCell( cancelReceiveTimeout() // FIXME: leave this here??? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - // actor can be null when creation fails fatal error - case msg if actor == null ⇒ - case msg ⇒ actor(msg) + // FIXME: actor can be null when creation fails with fatal error, why? + case msg if actor == null ⇒ + system.eventStream.publish(Warning(self.path.toString, this.getClass, "Ignoring message due to null actor [%s]" format msg)) + case msg ⇒ actor(msg) } currentMessage = null // reset current message after successful invocation } catch { case e: InterruptedException ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) // make sure that InterruptedException does not leave this thread val ex = ActorInterruptedException(e) actor.supervisorStrategy.handleSupervisorFailing(self, children) parent.tell(Failed(ex), self) throw e //Re-throw InterruptedExceptions as expected - case Harmless(e) ⇒ + case NonFatal(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -497,7 +501,7 @@ private[akka] class ActorCell( checkReceiveTimeout // Reschedule receive timeout } } catch { - case Harmless(e) ⇒ + case NonFatal(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) throw e } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 9087781d58..ee8d603b2c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -346,7 +346,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten def uncaughtException(thread: Thread, cause: Throwable): Unit = { log.error(cause, "Uncaught error from thread [{}]", thread.getName) cause match { - case Harmless(_) | _: InterruptedException ⇒ + case NonFatal(_) | _: InterruptedException ⇒ case _ ⇒ shutdown() } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 64818561af..300949b277 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -6,7 +6,7 @@ package akka.actor import akka.japi.{ Creator, Option ⇒ JOption } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } -import akka.util.{ Timeout, Harmless } +import akka.util.{ Timeout, NonFatal } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serialization, SerializationExtension } import akka.dispatch._ @@ -270,7 +270,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi sender ! m(me) } } catch { - case Harmless(e) ⇒ + case NonFatal(e) ⇒ sender ! Status.Failure(e) throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index d029791f8a..bc1494a6b2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -15,7 +15,7 @@ import com.typesafe.config.Config import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension import akka.jsr166y.ForkJoinPool -import akka.util.Harmless +import akka.util.NonFatal final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -81,7 +81,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl try { runnable.run() } catch { - case Harmless(e) ⇒ + case NonFatal(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) } finally { cleanup() diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d2b5be434b..15a9251047 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -17,7 +17,7 @@ import akka.util.{ Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger } import akka.dispatch.Await.CanAwait import java.util.concurrent._ -import akka.util.Harmless +import akka.util.NonFatal object Await { sealed trait CanAwait @@ -142,7 +142,7 @@ object Future { try { Right(body) } catch { - case Harmless(e) ⇒ Left(e) + case NonFatal(e) ⇒ Left(e) } } }) @@ -322,16 +322,19 @@ object Future { try { next.apply() } catch { - case Harmless(e) ⇒ logError("Future.dispatchTask", e) + case NonFatal(e) ⇒ logError(e) } } } finally { _taskStack.remove() } }) } - private def logError(logSource: String, problem: Throwable)(implicit executor: ExecutionContext): Unit = { + /** + * Internal API, do not call + */ + private[akka] def logError(problem: Throwable)(implicit executor: ExecutionContext): Unit = { executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, logSource, this.getClass, problem.getMessage)) + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, "Future", this.getClass, problem.getMessage)) case other ⇒ problem.printStackTrace() } } @@ -489,7 +492,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { future complete (try { Right(f(res)) } catch { - case Harmless(e) ⇒ Left(e) + case NonFatal(e) ⇒ Left(e) }) } future @@ -536,7 +539,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { try { p completeWith f(r) } catch { - case Harmless(e) ⇒ p complete Left(e) + case NonFatal(e) ⇒ p complete Left(e) case t ⇒ p complete Left(new ExecutionException(t)); throw t } } @@ -575,18 +578,12 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case r @ Right(res) ⇒ p complete (try { if (pred(res)) r else Left(new MatchError(res)) } catch { - case Harmless(e) ⇒ Left(e) + case NonFatal(e) ⇒ Left(e) }) } p } - protected def logError(logSource: String, problem: Throwable): Unit = { - executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, logSource, this.getClass, problem.getMessage)) - case other ⇒ problem.printStackTrace() - } - } } object Promise { @@ -661,7 +658,7 @@ trait Promise[T] extends Future[T] { try { fr completeWith cont(thisPromise) } catch { - case Harmless(e) ⇒ fr failure e + case NonFatal(e) ⇒ fr failure e } } fr @@ -674,7 +671,7 @@ trait Promise[T] extends Future[T] { try { fr completeWith cont(f) } catch { - case Harmless(e) ⇒ fr failure e + case NonFatal(e) ⇒ fr failure e } } fr @@ -782,7 +779,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case Harmless(e) ⇒ logError("Future.onComplete", e) } + try { func(result) } catch { case NonFatal(e) ⇒ Future.logError(e) } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 7489b59a98..0f42ffa4e4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -197,7 +197,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue if (nextMessage eq null) nextMessage = systemDrain() } } catch { - case Harmless(e) ⇒ + case NonFatal(e) ⇒ actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } diff --git a/akka-actor/src/main/scala/akka/util/Harmless.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala similarity index 80% rename from akka-actor/src/main/scala/akka/util/Harmless.scala rename to akka-actor/src/main/scala/akka/util/NonFatal.scala index 6a15538f2d..36bb1960e9 100644 --- a/akka-actor/src/main/scala/akka/util/Harmless.scala +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -4,7 +4,7 @@ package akka.util /** - * Extractor of harmless Throwables. Will not match fatal errors + * Extractor of non-fatal Throwables. Will not match fatal errors * like VirtualMachineError (OutOfMemoryError, StackOverflowError) * ThreadDeath, and InterruptedException. * @@ -13,11 +13,11 @@ package akka.util * try { * // dangerous stuff * } catch { - * case Harmless(e) => log.error(e, "Something not that bad") + * case NonFatal(e) => log.error(e, "Something not that bad") * } * }}} */ -object Harmless { +object NonFatal { def unapply(t: Throwable): Option[Throwable] = t match { // VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 320e63233a..8da17a4cc9 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -11,7 +11,7 @@ import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config -import akka.util.Harmless +import akka.util.NonFatal class FileBasedMailboxType(config: Config) extends MailboxType { override def create(owner: ActorContext) = new FileBasedMailbox(owner) @@ -25,7 +25,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi val queuePath = settings.QueuePath private val queue = try { - try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case Harmless(_) ⇒ {} } + try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case NonFatal(_) ⇒ {} } val queue = new filequeue.PersistentQueue(queuePath, name, settings, log) queue.setup // replays journal queue.discardExpired @@ -69,7 +69,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi queue.remove true } catch { - case Harmless(_) ⇒ false + case NonFatal(_) ⇒ false } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala index 64aba1b4af..65910fb158 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala @@ -21,7 +21,7 @@ import java.io._ import java.nio.{ ByteBuffer, ByteOrder } import java.nio.channels.FileChannel import akka.event.LoggingAdapter -import akka.util.Harmless +import akka.util.NonFatal // returned from journal replay sealed trait JournalItem @@ -100,7 +100,7 @@ class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter) close() queueFile.delete } catch { - case Harmless(_) ⇒ + case NonFatal(_) ⇒ } } diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 38493d2ceb..5d4e09e65e 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -11,7 +11,7 @@ import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config -import akka.util.Harmless +import akka.util.NonFatal class RedisBasedMailboxException(message: String) extends AkkaException(message) @@ -48,7 +48,7 @@ class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) w envelope } catch { case e: java.util.NoSuchElementException ⇒ null - case Harmless(e) ⇒ + case NonFatal(e) ⇒ log.error(e, "Couldn't dequeue from Redis-based mailbox") throw e } @@ -74,7 +74,7 @@ class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) w clients = connect() body } - case Harmless(e) ⇒ + case NonFatal(e) ⇒ val error = new RedisBasedMailboxException("Could not connect to Redis server, due to: " + e.getMessage) log.error(error, error.getMessage) throw error diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 4b197689bb..81e4c378eb 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -12,7 +12,7 @@ import akka.cluster.zookeeper.ZooKeeperQueue import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config -import akka.util.Harmless +import akka.util.NonFatal class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) @@ -46,7 +46,7 @@ class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owne } catch { case e: java.util.NoSuchElementException ⇒ null case e: InterruptedException ⇒ null - case Harmless(e) ⇒ + case NonFatal(e) ⇒ log.error(e, "Couldn't dequeue from ZooKeeper-based mailbox, due to: " + e.getMessage) throw e } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f9131338bf..b2fb06c180 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -21,7 +21,7 @@ import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef } -import akka.util.Harmless +import akka.util.NonFatal /** * Provides the implementation of the Netty remote support @@ -78,7 +78,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor try { remoteClients foreach { case (_, client) ⇒ try client.shutdown() catch { - case Harmless(e) ⇒ log.error(e, "failure while shutting down [{}]", client) + case NonFatal(e) ⇒ log.error(e, "failure while shutting down [{}]", client) } } remoteClients.clear() diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 70ec56f60b..8282ee58f5 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -12,7 +12,7 @@ import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSy import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } -import akka.util.Harmless +import akka.util.NonFatal /* * Locking rules: @@ -223,7 +223,7 @@ class CallingThreadDispatcher( Thread.currentThread().interrupt() intex = ie true - case Harmless(e) ⇒ + case NonFatal(e) ⇒ log.error(e, "Error during message processing") queue.leave false @@ -233,7 +233,7 @@ class CallingThreadDispatcher( false } else false } catch { - case Harmless(e) ⇒ queue.leave; throw e + case NonFatal(e) ⇒ queue.leave; throw e } finally { mbox.ctdLock.unlock }