diff --git a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala new file mode 100644 index 0000000000..c7d93b4b01 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.util + +import org.scalatest.matchers.MustMatchers +import akka.testkit.AkkaSpec + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class NonFatalSpec extends AkkaSpec with MustMatchers { + + "A NonFatal extractor" must { + + "match ordinary RuntimeException" in { + try { + throw new RuntimeException("Boom") + } catch { + case NonFatal(e) ⇒ // as expected + } + } + + "not match StackOverflowError" in { + //not @tailrec + def blowUp(n: Long): Long = { + blowUp(n + 1) + 1 + } + + intercept[StackOverflowError] { + try { + blowUp(0) + } catch { + case NonFatal(e) ⇒ assert(false) + } + } + } + + "not match InterruptedException" in { + intercept[InterruptedException] { + try { + throw new InterruptedException("Simulated InterruptedException") + } catch { + case NonFatal(e) ⇒ assert(false) + } + } + } + + } + + "A NonFatalOrInterrupted extractor" must { + + "match InterruptedException" in { + try { + throw new InterruptedException("Simulated InterruptedException") + } catch { + case NonFatalOrInterrupted(e) ⇒ // as expected + } + } + + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index acce1f5ee3..089b187354 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -14,6 +14,8 @@ import akka.util.{ Duration, Helpers } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension +import akka.util.NonFatal +import akka.util.NonFatalOrInterrupted //TODO: everything here for current compatibility - could be limited more @@ -362,8 +364,7 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { - // TODO catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ + case NonFatal(e) ⇒ try { system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted @@ -395,8 +396,7 @@ private[akka] class ActorCell( actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) } catch { - // TODO catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ try { + case NonFatal(e) ⇒ try { system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -460,7 +460,7 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case e ⇒ //Should we really catch everything here? + case NonFatal(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message)) //TODO FIXME How should problems here be handled??? throw e @@ -476,31 +476,32 @@ private[akka] class ActorCell( cancelReceiveTimeout() // FIXME: leave this here??? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + // actor can be null when creation fails fatal error + case msg if actor == null ⇒ case msg ⇒ actor(msg) } currentMessage = null // reset current message after successful invocation } catch { - case e ⇒ + case NonFatalOrInterrupted(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) - // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) - - // make sure that InterruptedException does not leave this thread - if (e.isInstanceOf[InterruptedException]) { - val ex = ActorInterruptedException(e) - actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(ex), self) - throw e //Re-throw InterruptedExceptions as expected - } else { - actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(e), self) + e match { + case ie: InterruptedException ⇒ + // make sure that InterruptedException does not leave this thread + val ex = ActorInterruptedException(ie) + actor.supervisorStrategy.handleSupervisorFailing(self, children) + parent.tell(Failed(ex), self) + throw ie //Re-throw InterruptedExceptions as expected + case _ ⇒ + actor.supervisorStrategy.handleSupervisorFailing(self, children) + parent.tell(Failed(e), self) } } finally { checkReceiveTimeout // Reschedule receive timeout } } catch { - case e ⇒ + case NonFatal(e) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) throw e } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index e3235a5cec..473649a3fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -341,8 +341,19 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten final val settings: Settings = new Settings(applicationConfig, name) + protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = + new Thread.UncaughtExceptionHandler() { + def uncaughtException(thread: Thread, cause: Throwable): Unit = { + log.error(cause, "Uncaught error from thread [{}]", thread.getName) + cause match { + case NonFatalOrInterrupted(e) ⇒ + case _ ⇒ shutdown() + } + } + } + final val threadFactory: MonitorableThreadFactory = - MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader)) + MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader), uncaughtExceptionHandler) def logConfiguration(): Unit = log.info(settings.toString) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 69c8e44fc3..300949b277 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -6,7 +6,7 @@ package akka.actor import akka.japi.{ Creator, Option ⇒ JOption } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } -import akka.util.{ Timeout } +import akka.util.{ Timeout, NonFatal } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serialization, SerializationExtension } import akka.dispatch._ @@ -270,7 +270,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi sender ! m(me) } } catch { - case t: Throwable ⇒ sender ! Status.Failure(t); throw t + case NonFatal(e) ⇒ + sender ! Status.Failure(e) + throw e } } } finally { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 943eeb2b33..bc1494a6b2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -15,6 +15,7 @@ import com.typesafe.config.Config import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension import akka.jsr166y.ForkJoinPool +import akka.util.NonFatal final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -80,8 +81,8 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl try { runnable.run() } catch { - // TODO catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) + case NonFatal(e) ⇒ + eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) } finally { cleanup() } @@ -166,9 +167,9 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext try { executeTask(invocation) } catch { - case e ⇒ + case t ⇒ inhabitantsUpdater.decrementAndGet(this) - throw e + throw t } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 495faba5d6..218b3035ea 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,19 +7,17 @@ package akka.dispatch import akka.event.Logging.Error import scala.Option import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption } - import scala.util.continuations._ - import java.util.concurrent.TimeUnit.NANOSECONDS import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } - import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger } import akka.dispatch.Await.CanAwait import java.util.concurrent._ +import akka.util.NonFatal object Await { sealed trait CanAwait @@ -144,8 +142,7 @@ object Future { try { Right(body) } catch { - // TODO catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ Left(e) + case NonFatal(e) ⇒ Left(e) } } }) @@ -325,12 +322,12 @@ object Future { try { next.apply() } catch { - case e ⇒ - // TODO catching all and continue isn't good for OOME, ticket #1418 + case NonFatal(e) ⇒ executor match { case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage)) case other ⇒ + // TODO printStackTrace e.printStackTrace() } } @@ -492,9 +489,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { future complete (try { Right(f(res)) } catch { - case e ⇒ - logError("Future.map", e) - Left(e) + case NonFatal(e) ⇒ Left(e) }) } future @@ -541,9 +536,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { try { p completeWith f(r) } catch { - case e ⇒ - p complete Left(e) - logError("Future.flatMap", e) + case NonFatal(e) ⇒ p complete Left(e) } } p @@ -581,9 +574,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case r @ Right(res) ⇒ p complete (try { if (pred(res)) r else Left(new MatchError(res)) } catch { - case e ⇒ - logError("Future.filter", e) - Left(e) + case NonFatal(e) ⇒ Left(e) }) } p @@ -669,9 +660,7 @@ trait Promise[T] extends Future[T] { try { fr completeWith cont(thisPromise) } catch { - case e ⇒ - logError("Promise.completeWith", e) - fr failure e + case NonFatal(e) ⇒ fr failure e } } fr @@ -684,9 +673,7 @@ trait Promise[T] extends Future[T] { try { fr completeWith cont(f) } catch { - case e ⇒ - logError("Promise.completeWith", e) - fr failure e + case NonFatal(e) ⇒ fr failure e } } fr @@ -794,7 +781,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case e ⇒ logError("Future onComplete-callback raised an exception", e) } + try { func(result) } catch { case NonFatal(e) ⇒ logError("Future onComplete-callback raised an exception", e) } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index d2bc7ff01d..0f42ffa4e4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -197,7 +197,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue if (nextMessage eq null) nextMessage = systemDrain() } } catch { - case e ⇒ + case NonFatal(e) ⇒ actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index cf09a8ecdd..7eb90b8ef0 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -108,7 +108,6 @@ class BoundedBlockingQueue[E <: AnyRef]( throw ie } false - // TODO catching all and continue isn't good for OOME, ticket #1418 case e ⇒ notFull.signal() result = e diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala new file mode 100644 index 0000000000..b4bbbaed27 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.util + +/** + * Extractor of non-fatal Throwables. Will not match + * VirtualMachineError (OutOfMemoryError, StackOverflowError) + * and InterruptedException. + * + * Usage to catch all non-fatal throwables: + * {{{ + * try { + * // dangerous stuff + * } catch { + * case NonFatal(e) => log.error(e, "Something not that bad") + * } + * }}} + */ +object NonFatal { + + def unapply(t: Throwable): Option[Throwable] = t match { + // VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors + case e: VirtualMachineError ⇒ None + case e: ThreadDeath ⇒ None + case e: InterruptedException ⇒ None + case e ⇒ Some(e) + } + +} + +/** + * Match InterruptedException and the same as + * [[akka.util.NonFatal]]. + */ +object NonFatalOrInterrupted { + def unapply(t: Throwable): Option[Throwable] = t match { + case e: InterruptedException ⇒ Some(e) + case e ⇒ NonFatal.unapply(t) + } +} \ No newline at end of file