From c447f462249f58b823bd1a229d5fa45525d2c949 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 1 Feb 2012 14:40:12 +0100 Subject: [PATCH] Some polish on the error handling. See #1310 --- ...{NonFatalSpec.scala => HarmlessSpec.scala} | 26 +++++++----- .../src/main/scala/akka/actor/ActorCell.scala | 32 +++++++-------- .../main/scala/akka/actor/ActorSystem.scala | 4 +- .../main/scala/akka/actor/TypedActor.scala | 4 +- .../akka/dispatch/AbstractDispatcher.scala | 4 +- .../src/main/scala/akka/dispatch/Future.scala | 37 +++++++++-------- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../src/main/scala/akka/util/Harmless.scala | 29 +++++++++++++ .../src/main/scala/akka/util/NonFatal.scala | 41 ------------------- .../actor/mailbox/FiledBasedMailbox.scala | 6 +-- .../actor/mailbox/filequeue/Journal.scala | 3 +- .../actor/mailbox/RedisBasedMailbox.scala | 5 ++- .../actor/mailbox/ZooKeeperBasedMailbox.scala | 3 +- .../remote/netty/NettyRemoteSupport.scala | 6 +-- .../testkit/CallingThreadDispatcher.scala | 12 +++--- 15 files changed, 101 insertions(+), 113 deletions(-) rename akka-actor-tests/src/test/scala/akka/util/{NonFatalSpec.scala => HarmlessSpec.scala} (62%) create mode 100644 akka-actor/src/main/scala/akka/util/Harmless.scala delete mode 100644 akka-actor/src/main/scala/akka/util/NonFatal.scala diff --git a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala b/akka-actor-tests/src/test/scala/akka/util/HarmlessSpec.scala similarity index 62% rename from akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala rename to akka-actor-tests/src/test/scala/akka/util/HarmlessSpec.scala index c7d93b4b01..5ac96c2d34 100644 --- a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/HarmlessSpec.scala @@ -7,15 +7,15 @@ import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class NonFatalSpec extends AkkaSpec with MustMatchers { +class HarmlessSpec extends AkkaSpec with MustMatchers { - "A NonFatal extractor" must { + "A Harmless extractor" must { "match ordinary RuntimeException" in { try { throw new RuntimeException("Boom") } catch { - case NonFatal(e) ⇒ // as expected + case Harmless(e) ⇒ // as expected } } @@ -29,7 +29,7 @@ class NonFatalSpec extends AkkaSpec with MustMatchers { try { blowUp(0) } catch { - case NonFatal(e) ⇒ assert(false) + case Harmless(e) ⇒ assert(false) } } } @@ -39,22 +39,26 @@ class NonFatalSpec extends AkkaSpec with MustMatchers { try { throw new InterruptedException("Simulated InterruptedException") } catch { - case NonFatal(e) ⇒ assert(false) + case Harmless(e) ⇒ assert(false) } } } - } - - "A NonFatalOrInterrupted extractor" must { - - "match InterruptedException" in { + "be used together with InterruptedException" in { try { throw new InterruptedException("Simulated InterruptedException") } catch { - case NonFatalOrInterrupted(e) ⇒ // as expected + case _: InterruptedException ⇒ // as expected + case Harmless(e) ⇒ assert(false) + } + + try { + throw new RuntimeException("Simulated RuntimeException") + } catch { + case Harmless(_) | _: InterruptedException ⇒ // as expected } } } + } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 089b187354..c445b7b35c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -14,8 +14,7 @@ import akka.util.{ Duration, Helpers } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension -import akka.util.NonFatal -import akka.util.NonFatalOrInterrupted +import akka.util.Harmless //TODO: everything here for current compatibility - could be limited more @@ -364,7 +363,7 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { - case NonFatal(e) ⇒ + case Harmless(e) ⇒ try { system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted @@ -396,7 +395,7 @@ private[akka] class ActorCell( actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) } catch { - case NonFatal(e) ⇒ try { + case Harmless(e) ⇒ try { system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -460,7 +459,7 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case NonFatal(e) ⇒ + case Harmless(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message)) //TODO FIXME How should problems here be handled??? throw e @@ -482,26 +481,23 @@ private[akka] class ActorCell( } currentMessage = null // reset current message after successful invocation } catch { - case NonFatalOrInterrupted(e) ⇒ + case e: InterruptedException ⇒ + // make sure that InterruptedException does not leave this thread + val ex = ActorInterruptedException(e) + actor.supervisorStrategy.handleSupervisorFailing(self, children) + parent.tell(Failed(ex), self) + throw e //Re-throw InterruptedExceptions as expected + case Harmless(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) - e match { - case ie: InterruptedException ⇒ - // make sure that InterruptedException does not leave this thread - val ex = ActorInterruptedException(ie) - actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(ex), self) - throw ie //Re-throw InterruptedExceptions as expected - case _ ⇒ - actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(e), self) - } + actor.supervisorStrategy.handleSupervisorFailing(self, children) + parent.tell(Failed(e), self) } finally { checkReceiveTimeout // Reschedule receive timeout } } catch { - case NonFatal(e) ⇒ + case Harmless(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) throw e } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 473649a3fb..9087781d58 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -346,8 +346,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten def uncaughtException(thread: Thread, cause: Throwable): Unit = { log.error(cause, "Uncaught error from thread [{}]", thread.getName) cause match { - case NonFatalOrInterrupted(e) ⇒ - case _ ⇒ shutdown() + case Harmless(_) | _: InterruptedException ⇒ + case _ ⇒ shutdown() } } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 300949b277..64818561af 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -6,7 +6,7 @@ package akka.actor import akka.japi.{ Creator, Option ⇒ JOption } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } -import akka.util.{ Timeout, NonFatal } +import akka.util.{ Timeout, Harmless } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serialization, SerializationExtension } import akka.dispatch._ @@ -270,7 +270,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi sender ! m(me) } } catch { - case NonFatal(e) ⇒ + case Harmless(e) ⇒ sender ! Status.Failure(e) throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index bc1494a6b2..d029791f8a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -15,7 +15,7 @@ import com.typesafe.config.Config import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension import akka.jsr166y.ForkJoinPool -import akka.util.NonFatal +import akka.util.Harmless final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -81,7 +81,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl try { runnable.run() } catch { - case NonFatal(e) ⇒ + case Harmless(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) } finally { cleanup() diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 218b3035ea..d2b5be434b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -17,7 +17,7 @@ import akka.util.{ Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger } import akka.dispatch.Await.CanAwait import java.util.concurrent._ -import akka.util.NonFatal +import akka.util.Harmless object Await { sealed trait CanAwait @@ -142,7 +142,7 @@ object Future { try { Right(body) } catch { - case NonFatal(e) ⇒ Left(e) + case Harmless(e) ⇒ Left(e) } } }) @@ -322,19 +322,19 @@ object Future { try { next.apply() } catch { - case NonFatal(e) ⇒ - executor match { - case m: MessageDispatcher ⇒ - m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage)) - case other ⇒ - // TODO printStackTrace - e.printStackTrace() - } + case Harmless(e) ⇒ logError("Future.dispatchTask", e) } } } finally { _taskStack.remove() } }) } + + private def logError(logSource: String, problem: Throwable)(implicit executor: ExecutionContext): Unit = { + executor match { + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, logSource, this.getClass, problem.getMessage)) + case other ⇒ problem.printStackTrace() + } + } } sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { @@ -489,7 +489,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { future complete (try { Right(f(res)) } catch { - case NonFatal(e) ⇒ Left(e) + case Harmless(e) ⇒ Left(e) }) } future @@ -536,7 +536,8 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { try { p completeWith f(r) } catch { - case NonFatal(e) ⇒ p complete Left(e) + case Harmless(e) ⇒ p complete Left(e) + case t ⇒ p complete Left(new ExecutionException(t)); throw t } } p @@ -574,15 +575,15 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case r @ Right(res) ⇒ p complete (try { if (pred(res)) r else Left(new MatchError(res)) } catch { - case NonFatal(e) ⇒ Left(e) + case Harmless(e) ⇒ Left(e) }) } p } - protected def logError(msg: String, problem: Throwable): Unit = { + protected def logError(logSource: String, problem: Throwable): Unit = { executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, this.getClass, problem.getMessage)) + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, logSource, this.getClass, problem.getMessage)) case other ⇒ problem.printStackTrace() } } @@ -660,7 +661,7 @@ trait Promise[T] extends Future[T] { try { fr completeWith cont(thisPromise) } catch { - case NonFatal(e) ⇒ fr failure e + case Harmless(e) ⇒ fr failure e } } fr @@ -673,7 +674,7 @@ trait Promise[T] extends Future[T] { try { fr completeWith cont(f) } catch { - case NonFatal(e) ⇒ fr failure e + case Harmless(e) ⇒ fr failure e } } fr @@ -781,7 +782,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case NonFatal(e) ⇒ logError("Future onComplete-callback raised an exception", e) } + try { func(result) } catch { case Harmless(e) ⇒ logError("Future.onComplete", e) } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 0f42ffa4e4..7489b59a98 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -197,7 +197,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue if (nextMessage eq null) nextMessage = systemDrain() } } catch { - case NonFatal(e) ⇒ + case Harmless(e) ⇒ actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } diff --git a/akka-actor/src/main/scala/akka/util/Harmless.scala b/akka-actor/src/main/scala/akka/util/Harmless.scala new file mode 100644 index 0000000000..6a15538f2d --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/Harmless.scala @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.util + +/** + * Extractor of harmless Throwables. Will not match fatal errors + * like VirtualMachineError (OutOfMemoryError, StackOverflowError) + * ThreadDeath, and InterruptedException. + * + * Usage to catch all harmless throwables: + * {{{ + * try { + * // dangerous stuff + * } catch { + * case Harmless(e) => log.error(e, "Something not that bad") + * } + * }}} + */ +object Harmless { + + def unapply(t: Throwable): Option[Throwable] = t match { + // VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException ⇒ None + case e ⇒ Some(e) + } + +} + diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala deleted file mode 100644 index b4bbbaed27..0000000000 --- a/akka-actor/src/main/scala/akka/util/NonFatal.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.util - -/** - * Extractor of non-fatal Throwables. Will not match - * VirtualMachineError (OutOfMemoryError, StackOverflowError) - * and InterruptedException. - * - * Usage to catch all non-fatal throwables: - * {{{ - * try { - * // dangerous stuff - * } catch { - * case NonFatal(e) => log.error(e, "Something not that bad") - * } - * }}} - */ -object NonFatal { - - def unapply(t: Throwable): Option[Throwable] = t match { - // VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors - case e: VirtualMachineError ⇒ None - case e: ThreadDeath ⇒ None - case e: InterruptedException ⇒ None - case e ⇒ Some(e) - } - -} - -/** - * Match InterruptedException and the same as - * [[akka.util.NonFatal]]. - */ -object NonFatalOrInterrupted { - def unapply(t: Throwable): Option[Throwable] = t match { - case e: InterruptedException ⇒ Some(e) - case e ⇒ NonFatal.unapply(t) - } -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 0f18f1ec58..320e63233a 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -11,6 +11,7 @@ import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config +import akka.util.Harmless class FileBasedMailboxType(config: Config) extends MailboxType { override def create(owner: ActorContext) = new FileBasedMailbox(owner) @@ -24,7 +25,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi val queuePath = settings.QueuePath private val queue = try { - try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case e ⇒ {} } + try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case Harmless(_) ⇒ {} } val queue = new filequeue.PersistentQueue(queuePath, name, settings, log) queue.setup // replays journal queue.discardExpired @@ -68,8 +69,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi queue.remove true } catch { - // TODO catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ false + case Harmless(_) ⇒ false } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala index 6b8fa8c568..64aba1b4af 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/Journal.scala @@ -21,6 +21,7 @@ import java.io._ import java.nio.{ ByteBuffer, ByteOrder } import java.nio.channels.FileChannel import akka.event.LoggingAdapter +import akka.util.Harmless // returned from journal replay sealed trait JournalItem @@ -99,7 +100,7 @@ class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter) close() queueFile.delete } catch { - case _ ⇒ + case Harmless(_) ⇒ } } diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 3c33663b73..38493d2ceb 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -11,6 +11,7 @@ import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config +import akka.util.Harmless class RedisBasedMailboxException(message: String) extends AkkaException(message) @@ -47,7 +48,7 @@ class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) w envelope } catch { case e: java.util.NoSuchElementException ⇒ null - case e ⇒ + case Harmless(e) ⇒ log.error(e, "Couldn't dequeue from Redis-based mailbox") throw e } @@ -73,7 +74,7 @@ class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) w clients = connect() body } - case e ⇒ + case Harmless(e) ⇒ val error = new RedisBasedMailboxException("Could not connect to Redis server, due to: " + e.getMessage) log.error(error, error.getMessage) throw error diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 1420a8a543..4b197689bb 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -12,6 +12,7 @@ import akka.cluster.zookeeper.ZooKeeperQueue import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config +import akka.util.Harmless class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) @@ -45,7 +46,7 @@ class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owne } catch { case e: java.util.NoSuchElementException ⇒ null case e: InterruptedException ⇒ null - case e ⇒ + case Harmless(e) ⇒ log.error(e, "Couldn't dequeue from ZooKeeper-based mailbox, due to: " + e.getMessage) throw e } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e9fe83dd7e..f9131338bf 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -9,21 +9,19 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.Executors - import scala.collection.mutable.HashMap - import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture } import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.{ ChannelHandlerContext, Channel } import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor import org.jboss.netty.util.HashedWheelTimer - import akka.actor.{ Address, ActorSystemImpl, ActorRef } import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef } +import akka.util.Harmless /** * Provides the implementation of the Netty remote support @@ -80,7 +78,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor try { remoteClients foreach { case (_, client) ⇒ try client.shutdown() catch { - case e ⇒ log.error(e, "failure while shutting down [{}]", client) + case Harmless(e) ⇒ log.error(e, "failure while shutting down [{}]", client) } } remoteClients.clear() diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index af2c44611a..70ec56f60b 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -6,15 +6,13 @@ package akka.testkit import java.lang.ref.WeakReference import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList - import scala.annotation.tailrec - import com.typesafe.config.Config - import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } +import akka.util.Harmless /* * Locking rules: @@ -225,7 +223,7 @@ class CallingThreadDispatcher( Thread.currentThread().interrupt() intex = ie true - case e ⇒ + case Harmless(e) ⇒ log.error(e, "Error during message processing") queue.leave false @@ -235,7 +233,7 @@ class CallingThreadDispatcher( false } else false } catch { - case e ⇒ queue.leave; throw e + case Harmless(e) ⇒ queue.leave; throw e } finally { mbox.ctdLock.unlock } @@ -294,8 +292,8 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with override def cleanUp(): Unit = { /* - * This is called from dispatcher.unregister, i.e. under this.lock. If - * another thread obtained a reference to this mailbox and enqueues after + * This is called from dispatcher.unregister, i.e. under this.lock. If + * another thread obtained a reference to this mailbox and enqueues after * the gather operation, tough luck: no guaranteed delivery to deadLetters. */ suspendSwitch.locked {