Preserving Envelopes in UnstartedActorCell as requested by Mr Pink

This commit is contained in:
Viktor Klang 2013-01-17 18:24:57 +01:00
parent 2dd3697b5c
commit c38be1bc4a
8 changed files with 55 additions and 47 deletions

View file

@ -214,19 +214,19 @@ private[akka] trait Cell {
*/
def start(): this.type
/**
* Recursively suspend this actor and all its children. Must not throw exceptions.
* Recursively suspend this actor and all its children. Is only allowed to throw Fatal Throwables.
*/
def suspend(): Unit
/**
* Recursively resume this actor and all its children. Must not throw exceptions.
* Recursively resume this actor and all its children. Is only allowed to throw Fatal Throwables.
*/
def resume(causedByFailure: Throwable): Unit
/**
* Restart this actor (will recursively restart or stop all children). Must not throw exceptions.
* Restart this actor (will recursively restart or stop all children). Is only allowed to throw Fatal Throwables.
*/
def restart(cause: Throwable): Unit
/**
* Recursively terminate this actor and all its children. Must not throw exceptions.
* Recursively terminate this actor and all its children. Is only allowed to throw Fatal Throwables.
*/
def stop(): Unit
/**
@ -246,16 +246,26 @@ private[akka] trait Cell {
* Get the stats for the named child, if that exists.
*/
def getChildByName(name: String): Option[ChildStats]
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions.
* Is only allowed to throw Fatal Throwables.
*/
def tell(message: Any, sender: ActorRef): Unit
def sendMessage(msg: Envelope): Unit
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions.
* Is only allowed to throw Fatal Throwables.
*/
final def sendMessage(message: Any, sender: ActorRef): Unit =
sendMessage(Envelope(message, sender, system))
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
* Is only allowed to throw Fatal Throwables.
*/
def sendSystemMessage(msg: SystemMessage): Unit
/**

View file

@ -350,7 +350,7 @@ private[akka] class LocalActorRef private[akka] (
override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.tell(message, sender)
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)
override def restart(cause: Throwable): Unit = actorCell.restart(cause)

View file

@ -151,7 +151,7 @@ private[akka] class RepointableActorRef(
}
} else this
def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.tell(message, sender)
def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender)
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
@ -181,7 +181,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
while (!queue.isEmpty) {
queue.poll() match {
case s: SystemMessage cell.sendSystemMessage(s)
case e: Envelope cell.tell(e.message, e.sender)
case e: Envelope cell.sendMessage(e)
}
}
} finally {
@ -203,21 +203,20 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer
def getChildByName(name: String): Option[ChildRestartStats] = None
def tell(message: Any, sender: ActorRef): Unit = {
val useSender = if (sender eq Actor.noSender) system.deadLetters else sender
def sendMessage(msg: Envelope): Unit = {
if (lock.tryLock(timeout.length, timeout.unit)) {
try {
val cell = self.underlying
if (cellIsReady(cell)) {
cell.tell(message, useSender)
} else if (!queue.offer(Envelope(message, useSender, system))) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure"))
system.deadLetters ! DeadLetter(message, useSender, self)
cell.sendMessage(msg)
} else if (!queue.offer(msg)) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + msg.message.getClass + " due to enqueue failure"))
system.deadLetters ! DeadLetter(msg.message, msg.sender, self)
}
} finally lock.unlock()
} else {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout"))
system.deadLetters ! DeadLetter(message, useSender, self)
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + msg.message.getClass + " due to lock timeout"))
system.deadLetters ! DeadLetter(msg.message, msg.sender, self)
}
}

View file

@ -5,12 +5,13 @@
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.actor.{ ActorRef, ActorCell }
import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create }
import akka.event.Logging.Error
import akka.util.Unsafe
import scala.util.control.NonFatal
import akka.dispatch.NullMessage
import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell }
import akka.serialization.SerializationExtension
private[akka] trait Dispatch { this: ActorCell
@ -102,9 +103,16 @@ private[akka] trait Dispatch { this: ActorCell ⇒
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
def tell(message: Any, sender: ActorRef): Unit =
try dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
catch {
def sendMessage(msg: Envelope): Unit =
try {
val m = msg.message.asInstanceOf[AnyRef]
if (m eq null) throw new InvalidMessageException("Message is null")
if (system.settings.SerializeAllMessages && !m.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(m).get, m.getClass).get
}
dispatcher.dispatch(this, msg)
} catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}

View file

@ -9,27 +9,19 @@ import akka.event.Logging.{ Error, LogEventException }
import akka.actor._
import akka.event.EventStream
import com.typesafe.config.Config
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Index }
import scala.annotation.tailrec
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Await, Awaitable }
import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
final case class Envelope private (val message: Any, val sender: ActorRef)
object Envelope {
def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = {
val msg = message.asInstanceOf[AnyRef]
if (msg eq null) throw new InvalidMessageException("Message is null")
if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(system)
ser.deserialize(ser.serialize(msg).get, msg.getClass).get
}
new Envelope(message, sender)
}
def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope =
new Envelope(message, if (sender ne Actor.noSender) sender else system.deadLetters)
}
/**

View file

@ -10,7 +10,7 @@ import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
import akka.ConfigurationException
import akka.dispatch.Dispatchers
import akka.dispatch.{ Envelope, Dispatchers }
import akka.pattern.pipe
import akka.japi.Util.immutableSeq
import com.typesafe.config.Config
@ -118,25 +118,24 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
* resizer is invoked asynchronously, i.e. not necessarily before the
* message has been sent.
*/
override def tell(message: Any, sender: ActorRef): Unit = {
val s = if (sender eq null) system.deadLetters else sender
val msg = message match {
override def sendMessage(msg: Envelope): Unit = {
val message = msg.message match {
case wrapped: RouterEnvelope wrapped.message
case m m
}
applyRoute(s, message) foreach {
case Destination(snd, `self`)
super.tell(msg, snd)
case Destination(snd, recipient)
applyRoute(msg.sender, msg.message) foreach {
case Destination(sender, `self`)
super.sendMessage(Envelope(message, sender, system))
case Destination(sender, recipient)
resize() // only resize when the message target is one of the routees
recipient.tell(msg, snd)
recipient.tell(message, sender)
}
}
def resize(): Unit =
for (r routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true))
super.tell(Router.Resize, self)
super.sendMessage(Envelope(Router.Resize, self, system))
}
}

View file

@ -293,4 +293,4 @@ class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[
data.copy(queue = queue.drop(nrOfMsgToSend), callsLeftInThisPeriod = data.callsLeftInThisPeriod - nrOfMsgToSend)
}
}
}

View file

@ -44,8 +44,8 @@ class OsgiActorSystemFactory(val context: BundleContext, val fallbackClassLoader
def actorSystemConfig(context: BundleContext): Config = {
val bundleSymbolicName = context.getBundle.getSymbolicName
val bundleId = context.getBundle.getBundleId
val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x => s"etc/$x")
val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) => x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y))))
val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x s"etc/$x")
val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y))))
applicationConfiguration.withFallback(ConfigFactory.load(classloader).withFallback(ConfigFactory.defaultReference(OsgiActorSystemFactory.akkaActorClassLoader)))
}