Verifying serializability of messages when serialize is on

This commit is contained in:
Viktor Klang 2011-12-27 16:22:24 +01:00
parent fc35d6b20f
commit 5ff8f4e2a4
7 changed files with 35 additions and 79 deletions

View file

@ -75,7 +75,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
result result
} }
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters) def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system)
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
q must not be null q must not be null

View file

@ -308,7 +308,7 @@ private[akka] class ActorCell(
} }
final def tell(message: Any, sender: ActorRef): Unit = final def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)) dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system))
final def sender: ActorRef = currentMessage match { final def sender: ActorRef = currentMessage match {
case null system.deadLetters case null system.deadLetters

View file

@ -58,9 +58,9 @@ object ActorSystem {
def create(): ActorSystem = apply() def create(): ActorSystem = apply()
def apply(): ActorSystem = apply("default") def apply(): ActorSystem = apply("default")
class Settings(cfg: Config, val name: String) { class Settings(cfg: Config, final val name: String) {
val config: Config = { final val config: Config = {
val config = cfg.withFallback(ConfigFactory.defaultReference) val config = cfg.withFallback(ConfigFactory.defaultReference)
config.checkValid(ConfigFactory.defaultReference, "akka") config.checkValid(ConfigFactory.defaultReference, "akka")
config config
@ -69,81 +69,38 @@ object ActorSystem {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import config._ import config._
val ConfigVersion = getString("akka.version") final val ConfigVersion = getString("akka.version")
val ProviderClass = getString("akka.actor.provider") final val ProviderClass = getString("akka.actor.provider")
val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
val LogLevel = getString("akka.loglevel") final val LogLevel = getString("akka.loglevel")
val StdoutLogLevel = getString("akka.stdout-loglevel") final val StdoutLogLevel = getString("akka.stdout-loglevel")
val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala
val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart") final val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart")
val AddLoggingReceive = getBoolean("akka.actor.debug.receive") final val AddLoggingReceive = getBoolean("akka.actor.debug.receive")
val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive")
val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle")
val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm")
val DebugEventStream = getBoolean("akka.actor.debug.event-stream") final val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
val Home = config.getString("akka.home") match { final val Home = config.getString("akka.home") match {
case "" None case "" None
case x Some(x) case x Some(x)
} }
val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
if (ConfigVersion != Version) if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
override def toString: String = config.root.render override def toString: String = config.root.render
} }
// TODO move to migration kit
object OldConfigurationLoader {
val defaultConfig: Config = {
val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig
cfg.withFallback(ConfigFactory.defaultReference).resolve(ConfigResolveOptions.defaults)
}
// file extensions (.conf, .json, .properties), are handled by parseFileAnySyntax
val defaultLocation: String = (systemMode orElse envMode).map("akka." + _).getOrElse("akka")
private def envMode = System.getenv("AKKA_MODE") match {
case null | "" None
case value Some(value)
}
private def systemMode = System.getProperty("akka.mode") match {
case null | "" None
case value Some(value)
}
private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false)
private def fromProperties = try {
val property = Option(System.getProperty("akka.config"))
property.map(p
ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseFileAnySyntax(new File(p), configParseOptions)))
} catch { case _ None }
private def fromClasspath = try {
Option(ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseResourcesAnySyntax(ActorSystem.getClass, "/" + defaultLocation, configParseOptions)))
} catch { case _ None }
private def fromHome = try {
Option(ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseFileAnySyntax(new File(GlobalHome.get + "/config/" + defaultLocation), configParseOptions)))
} catch { case _ None }
private def emptyConfig = ConfigFactory.systemProperties
}
} }
/** /**
@ -323,7 +280,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
import ActorSystem._ import ActorSystem._
val settings = new Settings(applicationConfig, name) final val settings = new Settings(applicationConfig, name)
def logConfiguration(): Unit = log.info(settings.toString) def logConfiguration(): Unit = log.info(settings.toString)

View file

@ -6,21 +6,21 @@ package akka.dispatch
import java.util.concurrent._ import java.util.concurrent._
import akka.event.Logging.Error import akka.event.Logging.Error
import akka.util.{ Duration, Switch, ReentrantGuard } import akka.util.Duration
import atomic.{ AtomicInteger, AtomicLong }
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
import akka.actor._ import akka.actor._
import akka.actor.ActorSystem import akka.actor.ActorSystem
import locks.ReentrantLock
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.event.EventStream import akka.event.EventStream
import akka.actor.ActorSystem.Settings
import com.typesafe.config.Config import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicReference
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension
final case class Envelope(val message: Any, val sender: ActorRef) { final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null")
else if (system.settings.SerializeAllMessages) SerializationExtension(system).serialize(message.asInstanceOf[AnyRef]) match {
case Left(t) throw t
case Right(_) //Just verify that it works to serialize it
}
} }
object SystemMessage { object SystemMessage {
@ -103,7 +103,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
def name: String def name: String
/** /**
* Identfier of this dispatcher, corresponds to the full key * Identifier of this dispatcher, corresponds to the full key
* of the dispatcher configuration. * of the dispatcher configuration.
*/ */
def id: String def id: String

View file

@ -67,7 +67,7 @@ trait DurableMessageSerialization {
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage) val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage)
val sender = deserializeActorRef(durableMessage.getSender) val sender = deserializeActorRef(durableMessage.getSender)
new Envelope(message, sender) new Envelope(message, sender)(owner.system)
} }
} }

View file

@ -72,7 +72,7 @@ class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) {
doc match { doc match {
case Some(msg) { case Some(msg) {
log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg) log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg)
envelopePromise.success(msg.envelope()) envelopePromise.success(msg.envelope(system))
log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise) log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise)
} }
case None case None

View file

@ -10,9 +10,8 @@ import org.bson.io.OutputBuffer
import org.bson.types.ObjectId import org.bson.types.ObjectId
import java.io.InputStream import java.io.InputStream
import org.bson.collection._ import org.bson.collection._
import akka.actor.LocalActorRef
import akka.actor.ActorRef
import akka.dispatch.Envelope import akka.dispatch.Envelope
import akka.actor.{ ActorSystem, LocalActorRef, ActorRef }
/** /**
* A container message for durable mailbox messages, which can be easily stuffed into * A container message for durable mailbox messages, which can be easily stuffed into
@ -32,7 +31,7 @@ case class MongoDurableMessage(
val sender: ActorRef, val sender: ActorRef,
val _id: ObjectId = new ObjectId) { val _id: ObjectId = new ObjectId) {
def envelope() = Envelope(message, sender) def envelope(system: ActorSystem) = Envelope(message, sender)(system)
} }
// vim: set ts=2 sw=2 sts=2 et: // vim: set ts=2 sw=2 sts=2 et: