diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6f48fd235e..7b216969ba 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -214,19 +214,19 @@ private[akka] trait Cell { */ def start(): this.type /** - * Recursively suspend this actor and all its children. Must not throw exceptions. + * Recursively suspend this actor and all its children. Is only allowed to throw Fatal Throwables. */ def suspend(): Unit /** - * Recursively resume this actor and all its children. Must not throw exceptions. + * Recursively resume this actor and all its children. Is only allowed to throw Fatal Throwables. */ def resume(causedByFailure: Throwable): Unit /** - * Restart this actor (will recursively restart or stop all children). Must not throw exceptions. + * Restart this actor (will recursively restart or stop all children). Is only allowed to throw Fatal Throwables. */ def restart(cause: Throwable): Unit /** - * Recursively terminate this actor and all its children. Must not throw exceptions. + * Recursively terminate this actor and all its children. Is only allowed to throw Fatal Throwables. */ def stop(): Unit /** @@ -246,16 +246,26 @@ private[akka] trait Cell { * Get the stats for the named child, if that exists. */ def getChildByName(name: String): Option[ChildStats] + /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. - * Must not throw exceptions. + * Is only allowed to throw Fatal Throwables. */ - def tell(message: Any, sender: ActorRef): Unit + def sendMessage(msg: Envelope): Unit + /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. - * Must not throw exceptions. + * Is only allowed to throw Fatal Throwables. + */ + final def sendMessage(message: Any, sender: ActorRef): Unit = + sendMessage(Envelope(message, sender, system)) + + /** + * Enqueue a message to be sent to the actor; may or may not actually + * schedule the actor to run, depending on which type of cell it is. + * Is only allowed to throw Fatal Throwables. */ def sendSystemMessage(msg: SystemMessage): Unit /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index d33821cd92..df8948a03d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -350,7 +350,7 @@ private[akka] class LocalActorRef private[akka] ( override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message) - override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.tell(message, sender) + override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender) override def restart(cause: Throwable): Unit = actorCell.restart(cause) diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 300481ad06..62cca3477b 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -151,7 +151,7 @@ private[akka] class RepointableActorRef( } } else this - def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.tell(message, sender) + def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender) def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) @@ -181,7 +181,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, while (!queue.isEmpty) { queue.poll() match { case s: SystemMessage ⇒ cell.sendSystemMessage(s) - case e: Envelope ⇒ cell.tell(e.message, e.sender) + case e: Envelope ⇒ cell.sendMessage(e) } } } finally { @@ -203,21 +203,20 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def getChildByName(name: String): Option[ChildRestartStats] = None - def tell(message: Any, sender: ActorRef): Unit = { - val useSender = if (sender eq Actor.noSender) system.deadLetters else sender + def sendMessage(msg: Envelope): Unit = { if (lock.tryLock(timeout.length, timeout.unit)) { try { val cell = self.underlying if (cellIsReady(cell)) { - cell.tell(message, useSender) - } else if (!queue.offer(Envelope(message, useSender, system))) { - system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure")) - system.deadLetters ! DeadLetter(message, useSender, self) + cell.sendMessage(msg) + } else if (!queue.offer(msg)) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + msg.message.getClass + " due to enqueue failure")) + system.deadLetters ! DeadLetter(msg.message, msg.sender, self) } } finally lock.unlock() } else { - system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout")) - system.deadLetters ! DeadLetter(message, useSender, self) + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + msg.message.getClass + " due to lock timeout")) + system.deadLetters ! DeadLetter(msg.message, msg.sender, self) } } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 1d3b498c2b..9bce41fc4f 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -5,12 +5,13 @@ package akka.actor.dungeon import scala.annotation.tailrec -import akka.actor.{ ActorRef, ActorCell } import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } import akka.event.Logging.Error import akka.util.Unsafe import scala.util.control.NonFatal import akka.dispatch.NullMessage +import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell } +import akka.serialization.SerializationExtension private[akka] trait Dispatch { this: ActorCell ⇒ @@ -102,9 +103,16 @@ private[akka] trait Dispatch { this: ActorCell ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) } - def tell(message: Any, sender: ActorRef): Unit = - try dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) - catch { + def sendMessage(msg: Envelope): Unit = + try { + val m = msg.message.asInstanceOf[AnyRef] + if (m eq null) throw new InvalidMessageException("Message is null") + if (system.settings.SerializeAllMessages && !m.isInstanceOf[NoSerializationVerificationNeeded]) { + val s = SerializationExtension(system) + s.deserialize(s.serialize(m).get, m.getClass).get + } + dispatcher.dispatch(this, msg) + } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 5d470746f4..ff59242c39 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -9,27 +9,19 @@ import akka.event.Logging.{ Error, LogEventException } import akka.actor._ import akka.event.EventStream import com.typesafe.config.Config -import akka.serialization.SerializationExtension import akka.util.{ Unsafe, Index } import scala.annotation.tailrec import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } import scala.concurrent.duration.Duration -import scala.concurrent.{ ExecutionContext, Await, Awaitable } -import scala.util.control.NonFatal +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal final case class Envelope private (val message: Any, val sender: ActorRef) object Envelope { - def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = { - val msg = message.asInstanceOf[AnyRef] - if (msg eq null) throw new InvalidMessageException("Message is null") - if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) { - val ser = SerializationExtension(system) - ser.deserialize(ser.serialize(msg).get, msg.getClass).get - } - new Envelope(message, sender) - } + def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = + new Envelope(message, if (sender ne Actor.noSender) sender else system.deadLetters) } /** diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index ee98de0f10..14d2de39a9 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -10,7 +10,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import akka.actor._ import akka.ConfigurationException -import akka.dispatch.Dispatchers +import akka.dispatch.{ Envelope, Dispatchers } import akka.pattern.pipe import akka.japi.Util.immutableSeq import com.typesafe.config.Config @@ -118,25 +118,24 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo * resizer is invoked asynchronously, i.e. not necessarily before the * message has been sent. */ - override def tell(message: Any, sender: ActorRef): Unit = { - val s = if (sender eq null) system.deadLetters else sender - val msg = message match { + override def sendMessage(msg: Envelope): Unit = { + val message = msg.message match { case wrapped: RouterEnvelope ⇒ wrapped.message case m ⇒ m } - applyRoute(s, message) foreach { - case Destination(snd, `self`) ⇒ - super.tell(msg, snd) - case Destination(snd, recipient) ⇒ + applyRoute(msg.sender, msg.message) foreach { + case Destination(sender, `self`) ⇒ + super.sendMessage(Envelope(message, sender, system)) + case Destination(sender, recipient) ⇒ resize() // only resize when the message target is one of the routees - recipient.tell(msg, snd) + recipient.tell(message, sender) } } def resize(): Unit = for (r ← routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) - super.tell(Router.Resize, self) + super.sendMessage(Envelope(Router.Resize, self, system)) } } diff --git a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala index 0841130f34..3e1d2c6908 100644 --- a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala +++ b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala @@ -293,4 +293,4 @@ class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[ data.copy(queue = queue.drop(nrOfMsgToSend), callsLeftInThisPeriod = data.callsLeftInThisPeriod - nrOfMsgToSend) } -} +} diff --git a/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala b/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala index 6b799df30c..4194afba01 100644 --- a/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala +++ b/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala @@ -44,8 +44,8 @@ class OsgiActorSystemFactory(val context: BundleContext, val fallbackClassLoader def actorSystemConfig(context: BundleContext): Config = { val bundleSymbolicName = context.getBundle.getSymbolicName val bundleId = context.getBundle.getBundleId - val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x => s"etc/$x") - val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) => x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y)))) + val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x ⇒ s"etc/$x") + val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) ⇒ x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y)))) applicationConfiguration.withFallback(ConfigFactory.load(classloader).withFallback(ConfigFactory.defaultReference(OsgiActorSystemFactory.akkaActorClassLoader))) }