Fixed things from review. See #1310

This commit is contained in:
Patrik Nordwall 2012-02-01 17:38:12 +01:00
parent c447f46224
commit 5033647176
14 changed files with 58 additions and 57 deletions

View file

@ -7,15 +7,15 @@ import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class HarmlessSpec extends AkkaSpec with MustMatchers {
class NonFatalSpec extends AkkaSpec with MustMatchers {
"A Harmless extractor" must {
"A NonFatal extractor" must {
"match ordinary RuntimeException" in {
try {
throw new RuntimeException("Boom")
} catch {
case Harmless(e) // as expected
case NonFatal(e) // as expected
}
}
@ -29,7 +29,7 @@ class HarmlessSpec extends AkkaSpec with MustMatchers {
try {
blowUp(0)
} catch {
case Harmless(e) assert(false)
case NonFatal(e) assert(false)
}
}
}
@ -39,7 +39,7 @@ class HarmlessSpec extends AkkaSpec with MustMatchers {
try {
throw new InterruptedException("Simulated InterruptedException")
} catch {
case Harmless(e) assert(false)
case NonFatal(e) assert(false)
}
}
}
@ -49,13 +49,13 @@ class HarmlessSpec extends AkkaSpec with MustMatchers {
throw new InterruptedException("Simulated InterruptedException")
} catch {
case _: InterruptedException // as expected
case Harmless(e) assert(false)
case NonFatal(e) assert(false)
}
try {
throw new RuntimeException("Simulated RuntimeException")
} catch {
case Harmless(_) | _: InterruptedException // as expected
case NonFatal(_) | _: InterruptedException // as expected
}
}

View file

@ -14,7 +14,7 @@ import akka.util.{ Duration, Helpers }
import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.util.Harmless
import akka.util.NonFatal
//TODO: everything here for current compatibility - could be limited more
@ -363,7 +363,7 @@ private[akka] class ActorCell(
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case Harmless(e)
case NonFatal(e)
try {
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
@ -395,7 +395,7 @@ private[akka] class ActorCell(
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
} catch {
case Harmless(e) try {
case NonFatal(e) try {
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@ -459,7 +459,7 @@ private[akka] class ActorCell(
case ChildTerminated(child) handleChildTerminated(child)
}
} catch {
case Harmless(e)
case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message))
//TODO FIXME How should problems here be handled???
throw e
@ -475,19 +475,23 @@ private[akka] class ActorCell(
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
// actor can be null when creation fails fatal error
case msg if actor == null
case msg actor(msg)
// FIXME: actor can be null when creation fails with fatal error, why?
case msg if actor == null
system.eventStream.publish(Warning(self.path.toString, this.getClass, "Ignoring message due to null actor [%s]" format msg))
case msg actor(msg)
}
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
val ex = ActorInterruptedException(e)
actor.supervisorStrategy.handleSupervisorFailing(self, children)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
case Harmless(e)
case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@ -497,7 +501,7 @@ private[akka] class ActorCell(
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case Harmless(e)
case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
throw e
}

View file

@ -346,7 +346,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
log.error(cause, "Uncaught error from thread [{}]", thread.getName)
cause match {
case Harmless(_) | _: InterruptedException
case NonFatal(_) | _: InterruptedException
case _ shutdown()
}
}

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.japi.{ Creator, Option JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Timeout, Harmless }
import akka.util.{ Timeout, NonFatal }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.dispatch._
@ -270,7 +270,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
sender ! m(me)
}
} catch {
case Harmless(e)
case NonFatal(e)
sender ! Status.Failure(e)
throw e
}

View file

@ -15,7 +15,7 @@ import com.typesafe.config.Config
import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension
import akka.jsr166y.ForkJoinPool
import akka.util.Harmless
import akka.util.NonFatal
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) {
@ -81,7 +81,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl
try {
runnable.run()
} catch {
case Harmless(e)
case NonFatal(e)
eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
} finally {
cleanup()

View file

@ -17,7 +17,7 @@ import akka.util.{ Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger }
import akka.dispatch.Await.CanAwait
import java.util.concurrent._
import akka.util.Harmless
import akka.util.NonFatal
object Await {
sealed trait CanAwait
@ -142,7 +142,7 @@ object Future {
try {
Right(body)
} catch {
case Harmless(e) Left(e)
case NonFatal(e) Left(e)
}
}
})
@ -322,16 +322,19 @@ object Future {
try {
next.apply()
} catch {
case Harmless(e) logError("Future.dispatchTask", e)
case NonFatal(e) logError(e)
}
}
} finally { _taskStack.remove() }
})
}
private def logError(logSource: String, problem: Throwable)(implicit executor: ExecutionContext): Unit = {
/**
* Internal API, do not call
*/
private[akka] def logError(problem: Throwable)(implicit executor: ExecutionContext): Unit = {
executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, logSource, this.getClass, problem.getMessage))
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, "Future", this.getClass, problem.getMessage))
case other problem.printStackTrace()
}
}
@ -489,7 +492,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
future complete (try {
Right(f(res))
} catch {
case Harmless(e) Left(e)
case NonFatal(e) Left(e)
})
}
future
@ -536,7 +539,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
try {
p completeWith f(r)
} catch {
case Harmless(e) p complete Left(e)
case NonFatal(e) p complete Left(e)
case t p complete Left(new ExecutionException(t)); throw t
}
}
@ -575,18 +578,12 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case r @ Right(res) p complete (try {
if (pred(res)) r else Left(new MatchError(res))
} catch {
case Harmless(e) Left(e)
case NonFatal(e) Left(e)
})
}
p
}
protected def logError(logSource: String, problem: Throwable): Unit = {
executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, logSource, this.getClass, problem.getMessage))
case other problem.printStackTrace()
}
}
}
object Promise {
@ -661,7 +658,7 @@ trait Promise[T] extends Future[T] {
try {
fr completeWith cont(thisPromise)
} catch {
case Harmless(e) fr failure e
case NonFatal(e) fr failure e
}
}
fr
@ -674,7 +671,7 @@ trait Promise[T] extends Future[T] {
try {
fr completeWith cont(f)
} catch {
case Harmless(e) fr failure e
case NonFatal(e) fr failure e
}
}
fr
@ -782,7 +779,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
}
private final def notifyCompleted(func: Either[Throwable, T] Unit, result: Either[Throwable, T]) {
try { func(result) } catch { case Harmless(e) logError("Future.onComplete", e) }
try { func(result) } catch { case NonFatal(e) Future.logError(e) }
}
}

View file

@ -197,7 +197,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
if (nextMessage eq null) nextMessage = systemDrain()
}
} catch {
case Harmless(e)
case NonFatal(e)
actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
throw e
}

View file

@ -4,7 +4,7 @@
package akka.util
/**
* Extractor of harmless Throwables. Will not match fatal errors
* Extractor of non-fatal Throwables. Will not match fatal errors
* like VirtualMachineError (OutOfMemoryError, StackOverflowError)
* ThreadDeath, and InterruptedException.
*
@ -13,11 +13,11 @@ package akka.util
* try {
* // dangerous stuff
* } catch {
* case Harmless(e) => log.error(e, "Something not that bad")
* case NonFatal(e) => log.error(e, "Something not that bad")
* }
* }}}
*/
object Harmless {
object NonFatal {
def unapply(t: Throwable): Option[Throwable] = t match {
// VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors

View file

@ -11,7 +11,7 @@ import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.Harmless
import akka.util.NonFatal
class FileBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new FileBasedMailbox(owner)
@ -25,7 +25,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi
val queuePath = settings.QueuePath
private val queue = try {
try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case Harmless(_) {} }
try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case NonFatal(_) {} }
val queue = new filequeue.PersistentQueue(queuePath, name, settings, log)
queue.setup // replays journal
queue.discardExpired
@ -69,7 +69,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi
queue.remove
true
} catch {
case Harmless(_) false
case NonFatal(_) false
}
}

View file

@ -21,7 +21,7 @@ import java.io._
import java.nio.{ ByteBuffer, ByteOrder }
import java.nio.channels.FileChannel
import akka.event.LoggingAdapter
import akka.util.Harmless
import akka.util.NonFatal
// returned from journal replay
sealed trait JournalItem
@ -100,7 +100,7 @@ class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter)
close()
queueFile.delete
} catch {
case Harmless(_)
case NonFatal(_)
}
}

View file

@ -11,7 +11,7 @@ import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.Harmless
import akka.util.NonFatal
class RedisBasedMailboxException(message: String) extends AkkaException(message)
@ -48,7 +48,7 @@ class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) w
envelope
} catch {
case e: java.util.NoSuchElementException null
case Harmless(e)
case NonFatal(e)
log.error(e, "Couldn't dequeue from Redis-based mailbox")
throw e
}
@ -74,7 +74,7 @@ class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) w
clients = connect()
body
}
case Harmless(e)
case NonFatal(e)
val error = new RedisBasedMailboxException("Could not connect to Redis server, due to: " + e.getMessage)
log.error(error, error.getMessage)
throw error

View file

@ -12,7 +12,7 @@ import akka.cluster.zookeeper.ZooKeeperQueue
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.Harmless
import akka.util.NonFatal
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
@ -46,7 +46,7 @@ class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owne
} catch {
case e: java.util.NoSuchElementException null
case e: InterruptedException null
case Harmless(e)
case NonFatal(e)
log.error(e, "Couldn't dequeue from ZooKeeper-based mailbox, due to: " + e.getMessage)
throw e
}

View file

@ -21,7 +21,7 @@ import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef }
import akka.util.Harmless
import akka.util.NonFatal
/**
* Provides the implementation of the Netty remote support
@ -78,7 +78,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
try {
remoteClients foreach {
case (_, client) try client.shutdown() catch {
case Harmless(e) log.error(e, "failure while shutting down [{}]", client)
case NonFatal(e) log.error(e, "failure while shutting down [{}]", client)
}
}
remoteClients.clear()

View file

@ -12,7 +12,7 @@ import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSy
import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration }
import akka.util.Harmless
import akka.util.NonFatal
/*
* Locking rules:
@ -223,7 +223,7 @@ class CallingThreadDispatcher(
Thread.currentThread().interrupt()
intex = ie
true
case Harmless(e)
case NonFatal(e)
log.error(e, "Error during message processing")
queue.leave
false
@ -233,7 +233,7 @@ class CallingThreadDispatcher(
false
} else false
} catch {
case Harmless(e) queue.leave; throw e
case NonFatal(e) queue.leave; throw e
} finally {
mbox.ctdLock.unlock
}