=act, rem, clu #3521: make serialize-messages work with core modules

This commit is contained in:
Endre Sándor Varga 2013-08-23 14:39:21 +02:00
parent d319e3f4d6
commit b566e9393d
55 changed files with 260 additions and 109 deletions

View file

@ -0,0 +1,5 @@
akka {
actor {
serialize-messages = on
}
}

View file

@ -31,7 +31,7 @@ object ActorLifeCycleSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { class ActorLifeCycleSpec extends AkkaSpec("akka.actor.serialize-messages=off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
import ActorLifeCycleSpec._ import ActorLifeCycleSpec._
"An Actor" must { "An Actor" must {
@ -44,7 +44,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
override def postRestart(reason: Throwable) { report("postRestart") } override def postRestart(reason: Throwable) { report("postRestart") }
}) }).withDeploy(Deploy.local)
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0)) expectMsg(("preStart", id, 0))

View file

@ -72,6 +72,7 @@ object ActorWithStashSpec {
} }
val testConf = """ val testConf = """
akka.actor.serialize-messages = off
my-dispatcher { my-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
} }

View file

@ -28,6 +28,12 @@ object DeathWatchSpec {
* and therefore the `Terminated` message is wrapped. * and therefore the `Terminated` message is wrapped.
*/ */
case class WrappedTerminated(t: Terminated) case class WrappedTerminated(t: Terminated)
case class W(ref: ActorRef)
case class U(ref: ActorRef)
case class FF(fail: Failed)
case class Latches(t1: TestLatch, t2: TestLatch) extends NoSerializationVerificationNeeded
} }
trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
@ -126,7 +132,6 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"fail a monitor which does not handle Terminated()" in { "fail a monitor which does not handle Terminated()" in {
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
case class FF(fail: Failed)
val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) { val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
testActor.tell(FF(Failed(child, cause, 0)), child) testActor.tell(FF(Failed(child, cause, 0)), child)
@ -185,13 +190,11 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
} }
"discard Terminated when unwatched between sysmsg and processing" in { "discard Terminated when unwatched between sysmsg and processing" in {
case class W(ref: ActorRef)
case class U(ref: ActorRef)
class Watcher extends Actor { class Watcher extends Actor {
def receive = { def receive = {
case W(ref) context watch ref case W(ref) context watch ref
case U(ref) context unwatch ref case U(ref) context unwatch ref
case (t1: TestLatch, t2: TestLatch) case Latches(t1: TestLatch, t2: TestLatch)
t1.countDown() t1.countDown()
Await.ready(t2, 3.seconds) Await.ready(t2, 3.seconds)
} }
@ -201,7 +204,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher") val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher")
val p = TestProbe() val p = TestProbe()
w ! W(p.ref) w ! W(p.ref)
w ! ((t1, t2)) w ! Latches(t1, t2)
Await.ready(t1, 3.seconds) Await.ready(t1, 3.seconds)
watch(p.ref) watch(p.ref)
system stop p.ref system stop p.ref

View file

@ -34,6 +34,9 @@ object FSMActorSpec {
case object Locked extends LockState case object Locked extends LockState
case object Open extends LockState case object Open extends LockState
case object Hello
case object Bye
class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] { class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] {
import latches._ import latches._
@ -144,8 +147,6 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
} }
val answerLatch = TestLatch() val answerLatch = TestLatch()
object Hello
object Bye
val tester = system.actorOf(Props(new Actor { val tester = system.actorOf(Props(new Actor {
def receive = { def receive = {
case Hello lock ! "hello" case Hello lock ! "hello"
@ -254,7 +255,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
"log events and transitions if asked to do so" in { "log events and transitions if asked to do so" in {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", "akka.actor.serialize-messages" -> "off",
"akka.actor.debug.fsm" -> true).asJava).withFallback(system.settings.config) "akka.actor.debug.fsm" -> true).asJava).withFallback(system.settings.config)
val fsmEventSystem = ActorSystem("fsmEvent", config) val fsmEventSystem = ActorSystem("fsmEvent", config)
try { try {

View file

@ -11,8 +11,13 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await import scala.concurrent.Await
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
object ReceiveTimeoutSpec {
case object Tick
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReceiveTimeoutSpec extends AkkaSpec { class ReceiveTimeoutSpec extends AkkaSpec {
import ReceiveTimeoutSpec._
"An actor with receive timeout" must { "An actor with receive timeout" must {
@ -33,7 +38,6 @@ class ReceiveTimeoutSpec extends AkkaSpec {
"reschedule timeout after regular receive" in { "reschedule timeout after regular receive" in {
val timeoutLatch = TestLatch() val timeoutLatch = TestLatch()
case object Tick
val timeoutActor = system.actorOf(Props(new Actor { val timeoutActor = system.actorOf(Props(new Actor {
context.setReceiveTimeout(500 milliseconds) context.setReceiveTimeout(500 milliseconds)
@ -53,7 +57,6 @@ class ReceiveTimeoutSpec extends AkkaSpec {
"be able to turn off timeout if desired" in { "be able to turn off timeout if desired" in {
val count = new AtomicInteger(0) val count = new AtomicInteger(0)
val timeoutLatch = TestLatch() val timeoutLatch = TestLatch()
case object Tick
val timeoutActor = system.actorOf(Props(new Actor { val timeoutActor = system.actorOf(Props(new Actor {
context.setReceiveTimeout(500 milliseconds) context.setReceiveTimeout(500 milliseconds)

View file

@ -19,7 +19,7 @@ import scala.concurrent.duration._
import akka.pattern.ask import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RestartStrategySpec extends AkkaSpec with DefaultTimeout { class RestartStrategySpec extends AkkaSpec("akka.actor.serialize-messages = off") with DefaultTimeout {
override def atStartup { override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))

View file

@ -22,6 +22,7 @@ object SchedulerSpec {
val testConf = ConfigFactory.parseString(""" val testConf = ConfigFactory.parseString("""
akka.scheduler.implementation = akka.actor.DefaultScheduler akka.scheduler.implementation = akka.actor.DefaultScheduler
akka.scheduler.ticks-per-wheel = 32 akka.scheduler.ticks-per-wheel = 32
akka.actor.serialize-messages = off
""").withFallback(AkkaSpec.testConf) """).withFallback(AkkaSpec.testConf)
val testConfRevolver = ConfigFactory.parseString(""" val testConfRevolver = ConfigFactory.parseString("""

View file

@ -71,6 +71,7 @@ object SupervisorHierarchySpec {
type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator" type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator"
} }
akka.loglevel = INFO akka.loglevel = INFO
akka.actor.serialize-messages = off
akka.actor.debug.fsm = on akka.actor.debug.fsm = on
""") """)

View file

@ -17,6 +17,7 @@ import scala.util.control.NonFatal
object SupervisorMiscSpec { object SupervisorMiscSpec {
val config = """ val config = """
akka.actor.serialize-messages = off
pinned-dispatcher { pinned-dispatcher {
executor = thread-pool-executor executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher

View file

@ -67,7 +67,7 @@ object SupervisorSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
import SupervisorSpec._ import SupervisorSpec._

View file

@ -15,7 +15,7 @@ import akka.dispatch.Dispatchers
import akka.pattern.ask import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { class SupervisorTreeSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender with DefaultTimeout {
"In a 3 levels deep supervisor tree (linked in the constructor) we" must { "In a 3 levels deep supervisor tree (linked in the constructor) we" must {

View file

@ -28,7 +28,7 @@ import scala.annotation.tailrec
object ActorModelSpec { object ActorModelSpec {
sealed trait ActorModelMessage sealed trait ActorModelMessage extends NoSerializationVerificationNeeded
case class TryReply(expect: Any) extends ActorModelMessage case class TryReply(expect: Any) extends ActorModelMessage

View file

@ -28,8 +28,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
settings.ConfigVersion must equal("2.3-SNAPSHOT") settings.ConfigVersion must equal("2.3-SNAPSHOT")
getBoolean("akka.daemonic") must equal(false) getBoolean("akka.daemonic") must equal(false)
getBoolean("akka.actor.serialize-messages") must equal(false)
settings.SerializeAllMessages must equal(false) // WARNING: This setting must be off in the default reference.conf, but must be on when running
// the test suite.
getBoolean("akka.actor.serialize-messages") must equal(true)
settings.SerializeAllMessages must equal(true)
getInt("akka.scheduler.ticks-per-wheel") must equal(512) getInt("akka.scheduler.ticks-per-wheel") must equal(512)
getMilliseconds("akka.scheduler.tick-duration") must equal(10) getMilliseconds("akka.scheduler.tick-duration") must equal(10)

View file

@ -49,6 +49,9 @@ object FutureSpec {
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
} }
} }
case class Req[T](req: T)
case class Res[T](res: T)
} }
class JavaFutureSpec extends JavaFutureTests with JUnitSuiteLike class JavaFutureSpec extends JavaFutureTests with JUnitSuiteLike
@ -268,8 +271,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"support pattern matching within a for-comprehension" in { "support pattern matching within a for-comprehension" in {
filterException[NoSuchElementException] { filterException[NoSuchElementException] {
case class Req[T](req: T)
case class Res[T](res: T)
val actor = system.actorOf(Props(new Actor { val actor = system.actorOf(Props(new Actor {
def receive = { def receive = {
case Req(s: String) sender ! Res(s.length) case Req(s: String) sender ! Res(s.length)

View file

@ -241,6 +241,7 @@ class SingleConsumerOnlyMailboxSpec extends MailboxSpec {
object SingleConsumerOnlyMailboxVerificationSpec { object SingleConsumerOnlyMailboxVerificationSpec {
case object Ping case object Ping
val mailboxConf = ConfigFactory.parseString(""" val mailboxConf = ConfigFactory.parseString("""
akka.actor.serialize-messages = off
test-dispatcher { test-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
throughput = 1 throughput = 1

View file

@ -11,12 +11,13 @@ import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.event.Logging.InitializeLogger import akka.event.Logging.InitializeLogger
import akka.pattern.gracefulStop import akka.pattern.gracefulStop
import akka.testkit.{ TestProbe, AkkaSpec } import akka.testkit.{ EventFilter, TestEvent, TestProbe, AkkaSpec }
object EventStreamSpec { object EventStreamSpec {
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
akka { akka {
actor.serialize-messages = off
stdout-loglevel = WARNING stdout-loglevel = WARNING
loglevel = INFO loglevel = INFO
loggers = ["akka.event.EventStreamSpec$MyLog", "%s"] loggers = ["akka.event.EventStreamSpec$MyLog", "%s"]
@ -25,6 +26,7 @@ object EventStreamSpec {
val configUnhandled = ConfigFactory.parseString(""" val configUnhandled = ConfigFactory.parseString("""
akka { akka {
actor.serialize-messages = off
stdout-loglevel = WARNING stdout-loglevel = WARNING
loglevel = DEBUG loglevel = DEBUG
actor.debug.unhandled = on actor.debug.unhandled = on

View file

@ -14,7 +14,12 @@ import akka.io.TcpPipelineHandler.Management
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.Deploy import akka.actor.Deploy
object DelimiterFramingSpec {
case class Listener(ref: ActorRef)
}
class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") { class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") {
import DelimiterFramingSpec._
val addresses = TestUtils.temporaryServerAddresses(4) val addresses = TestUtils.temporaryServerAddresses(4)
@ -110,8 +115,6 @@ class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on"
} }
} }
case class Listener(ref: ActorRef)
class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging { class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging {
import Tcp.Connected import Tcp.Connected

View file

@ -23,10 +23,17 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
import akka.util.{ Helpers, ByteString } import akka.util.{ Helpers, ByteString }
import akka.TestUtils._ import akka.TestUtils._
object TcpConnectionSpec {
case object Ack extends Event
case class Registration(channel: SelectableChannel, initialOps: Int) extends NoSerializationVerificationNeeded
}
class TcpConnectionSpec extends AkkaSpec(""" class TcpConnectionSpec extends AkkaSpec("""
akka.io.tcp.register-timeout = 500ms akka.io.tcp.register-timeout = 500ms
akka.actor.serialize-creators = on akka.actor.serialize-creators = on
""") { """) {
import TcpConnectionSpec._
// Helper to avoid Windows localization specific differences // Helper to avoid Windows localization specific differences
def ignoreIfWindows(): Unit = def ignoreIfWindows(): Unit =
if (Helpers.isWindows) { if (Helpers.isWindows) {
@ -725,7 +732,7 @@ class TcpConnectionSpec extends AkkaSpec("""
} }
def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit =
registerCallReceiver.ref.tell(channel -> initialOps, channelActor) registerCallReceiver.ref.tell(Registration(channel, initialOps), channelActor)
def setServerSocketOptions() = () def setServerSocketOptions() = ()
@ -755,7 +762,7 @@ class TcpConnectionSpec extends AkkaSpec("""
lazy val clientSideChannel = connectionActor.underlyingActor.channel lazy val clientSideChannel = connectionActor.underlyingActor.channel
override def run(body: Unit): Unit = super.run { override def run(body: Unit): Unit = super.run {
registerCallReceiver.expectMsg(clientSideChannel -> OP_CONNECT) registerCallReceiver.expectMsg(Registration(clientSideChannel, OP_CONNECT))
registerCallReceiver.sender must be(connectionActor) registerCallReceiver.sender must be(connectionActor)
body body
} }
@ -903,5 +910,4 @@ class TcpConnectionSpec extends AkkaSpec("""
} }
} }
object Ack extends Event
} }

View file

@ -23,7 +23,7 @@ object PatternSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PatternSpec extends AkkaSpec { class PatternSpec extends AkkaSpec("akka.actor.serialize-messages = off") {
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
import PatternSpec._ import PatternSpec._

View file

@ -18,6 +18,7 @@ import scala.util.Try
object ResizerSpec { object ResizerSpec {
val config = """ val config = """
akka.actor.serialize-messages = off
akka.actor.deployment { akka.actor.deployment {
/router1 { /router1 {
router = round-robin router = round-robin

View file

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
object RoutingSpec { object RoutingSpec {
val config = """ val config = """
akka.actor.serialize-messages = off
akka.actor.deployment { akka.actor.deployment {
/router1 { /router1 {
router = round-robin router = round-robin

View file

@ -25,6 +25,7 @@ object SerializationTests {
val serializeConf = """ val serializeConf = """
akka { akka {
actor { actor {
serialize-messages = off
serializers { serializers {
test = "akka.serialization.TestSerializer" test = "akka.serialization.TestSerializer"
} }

View file

@ -87,7 +87,9 @@ object FSM {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) { // FIXME: what about the cancellable?
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext)
extends NoSerializationVerificationNeeded {
private var ref: Option[Cancellable] = _ private var ref: Option[Cancellable] = _
private val scheduler = context.system.scheduler private val scheduler = context.system.scheduler
private implicit val executionContext = context.dispatcher private implicit val executionContext = context.dispatcher
@ -676,13 +678,13 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
* All messages sent to the [[akka.actor.FSM]] will be wrapped inside an * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an
* `Event`, which allows pattern matching to extract both state and data. * `Event`, which allows pattern matching to extract both state and data.
*/ */
case class Event(event: Any, stateData: D) case class Event(event: Any, stateData: D) extends NoSerializationVerificationNeeded
/** /**
* Case class representing the state of the [[akka.actor.FSM]] whithin the * Case class representing the state of the [[akka.actor.FSM]] whithin the
* `onTermination` block. * `onTermination` block.
*/ */
case class StopEvent(reason: Reason, currentState: S, stateData: D) case class StopEvent(reason: Reason, currentState: S, stateData: D) extends NoSerializationVerificationNeeded
} }
/** /**

View file

@ -243,7 +243,7 @@ object IO {
/** /**
* Messages used to communicate with an [[akka.actor.IOManager]]. * Messages used to communicate with an [[akka.actor.IOManager]].
*/ */
sealed trait IOMessage sealed trait IOMessage extends NoSerializationVerificationNeeded
/** /**
* Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel * Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel
@ -912,6 +912,13 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider {
require(readBufferSize <= Int.MaxValue && readBufferSize > 0) require(readBufferSize <= Int.MaxValue && readBufferSize > 0)
require(selectInterval > 0) require(selectInterval > 0)
} }
/**
* INTERNAL API
*
* unique message that is sent to ourself to initiate the next select
*/
private[akka] case object Select
} }
/** /**
@ -922,6 +929,7 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider {
final class IOManagerActor(val settings: Settings) extends Actor with ActorLogging { final class IOManagerActor(val settings: Settings) extends Actor with ActorLogging {
import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT } import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT }
import settings.{ defaultBacklog, selectInterval, readBufferSize } import settings.{ defaultBacklog, selectInterval, readBufferSize }
import IOManager.Select
private type ReadChannel = ReadableByteChannel with SelectableChannel private type ReadChannel = ReadableByteChannel with SelectableChannel
private type WriteChannel = WritableByteChannel with SelectableChannel private type WriteChannel = WritableByteChannel with SelectableChannel
@ -956,9 +964,6 @@ final class IOManagerActor(val settings: Settings) extends Actor with ActorLoggi
*/ */
private var fastSelect = false private var fastSelect = false
/** unique message that is sent to ourself to initiate the next select */
private case object Select
/** This method should be called after receiving any message */ /** This method should be called after receiving any message */
private def run() { private def run() {
if (!running) { if (!running) {

View file

@ -21,16 +21,10 @@ import akka.pattern.ask
import akka.actor.ActorDSL import akka.actor.ActorDSL
import akka.actor.Props import akka.actor.Props
trait Inbox { this: ActorDSL.type /**
* INTERNAL API
protected trait InboxExtension { this: Extension */
val DSLInboxQueueSize = config.getInt("inbox-size") private[akka] object Inbox {
val inboxNr = new AtomicInteger
val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize)
def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet)
}
private sealed trait Query { private sealed trait Query {
def deadline: Deadline def deadline: Deadline
@ -45,6 +39,22 @@ trait Inbox { this: ActorDSL.type ⇒
} }
private case class StartWatch(target: ActorRef) private case class StartWatch(target: ActorRef)
private case object Kick private case object Kick
}
trait Inbox { this: ActorDSL.type
import Inbox._
protected trait InboxExtension { this: Extension
val DSLInboxQueueSize = config.getInt("inbox-size")
val inboxNr = new AtomicInteger
val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize)
def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet)
}
private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] {
def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time
} }

View file

@ -107,10 +107,15 @@ private[akka] trait Dispatch { this: ActorCell ⇒
def sendMessage(msg: Envelope): Unit = def sendMessage(msg: Envelope): Unit =
try { try {
val m = msg.message.asInstanceOf[AnyRef] if (system.settings.SerializeAllMessages) {
if (system.settings.SerializeAllMessages && !m.isInstanceOf[NoSerializationVerificationNeeded]) { val unwrapped = (msg.message match {
val s = SerializationExtension(system) case DeadLetter(wrapped, _, _) wrapped
s.deserialize(s.serialize(m).get, m.getClass).get case other other
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
}
} }
dispatcher.dispatch(this, msg) dispatcher.dispatch(this, msg)
} catch handleException } catch handleException

View file

@ -736,6 +736,11 @@ object BackpressureBuffer {
*/ */
trait LowWatermarkReached extends Tcp.Event trait LowWatermarkReached extends Tcp.Event
case object LowWatermarkReached extends LowWatermarkReached case object LowWatermarkReached extends LowWatermarkReached
/**
* INTERNAL API
*/
private[io] case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event
} }
/** /**
@ -769,8 +774,6 @@ class BackpressureBuffer(lowBytes: Long, highBytes: Long, maxBytes: Long)
require(highBytes >= lowBytes, "highWatermark needs to be at least as large as lowWatermark") require(highBytes >= lowBytes, "highWatermark needs to be at least as large as lowWatermark")
require(maxBytes >= highBytes, "maxCapacity needs to be at least as large as highWatermark") require(maxBytes >= highBytes, "maxCapacity needs to be at least as large as highWatermark")
case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event
override def apply(ctx: HasLogging) = new PipePair[Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] { override def apply(ctx: HasLogging) = new PipePair[Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] {
import Tcp._ import Tcp._

View file

@ -54,7 +54,7 @@ private[io] trait ChannelRegistry {
* a result of it having called `register` on the `ChannelRegistry`. * a result of it having called `register` on the `ChannelRegistry`.
* Enables a channel actor to directly schedule interest setting tasks to the selector mgmt. dispatcher. * Enables a channel actor to directly schedule interest setting tasks to the selector mgmt. dispatcher.
*/ */
private[io] trait ChannelRegistration { private[io] trait ChannelRegistration extends NoSerializationVerificationNeeded {
def enableInterest(op: Int) def enableInterest(op: Int)
def disableInterest(op: Int) def disableInterest(op: Int)
} }
@ -66,8 +66,9 @@ private[io] object SelectionHandler {
} }
case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: ChannelRegistry Props) case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: ChannelRegistry Props)
extends NoSerializationVerificationNeeded
case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) } case class Retry(command: WorkerForCommand, retriesLeft: Int) extends NoSerializationVerificationNeeded { require(retriesLeft >= 0) }
case object ChannelConnectable case object ChannelConnectable
case object ChannelAcceptable case object ChannelAcceptable

View file

@ -94,7 +94,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
/** /**
* The common interface for [[Command]] and [[Event]]. * The common interface for [[Command]] and [[Event]].
*/ */
sealed trait Message sealed trait Message extends NoSerializationVerificationNeeded
/// COMMANDS /// COMMANDS

View file

@ -437,7 +437,7 @@ private[io] object TcpConnection {
// INTERNAL MESSAGES // INTERNAL MESSAGES
/** Informs actor that no writing was possible but there is still work remaining */ /** Informs actor that no writing was possible but there is still work remaining */
case class SendBufferFull(remainingWrite: PendingWrite) case class SendBufferFull(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded
/** Informs actor that a pending file write has finished */ /** Informs actor that a pending file write has finished */
case object WriteFileFinished case object WriteFileFinished
/** Informs actor that a pending WriteFile failed */ /** Informs actor that a pending WriteFile failed */

View file

@ -8,7 +8,7 @@ import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel }
import java.net.InetSocketAddress import java.net.InetSocketAddress
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.actor._
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.io.Tcp._ import akka.io.Tcp._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
@ -18,11 +18,11 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
*/ */
private[io] object TcpListener { private[io] object TcpListener {
case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage { case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage with NoSerializationVerificationNeeded {
def failureMessage = FailedRegisterIncoming(channel) def failureMessage = FailedRegisterIncoming(channel)
} }
case class FailedRegisterIncoming(channel: SocketChannel) case class FailedRegisterIncoming(channel: SocketChannel) extends NoSerializationVerificationNeeded
} }

View file

@ -6,7 +6,7 @@ package akka.io
import scala.beans.BeanProperty import scala.beans.BeanProperty
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
import akka.actor.{ Actor, ActorContext, ActorRef, Props, Terminated } import akka.actor._
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.ByteString import akka.util.ByteString
import akka.event.Logging import akka.event.Logging
@ -50,12 +50,12 @@ object TcpPipelineHandler {
/** /**
* Wrapper class for commands to be sent to the [[TcpPipelineHandler]] actor. * Wrapper class for commands to be sent to the [[TcpPipelineHandler]] actor.
*/ */
case class Command(@BeanProperty cmd: Cmd) case class Command(@BeanProperty cmd: Cmd) extends NoSerializationVerificationNeeded
/** /**
* Wrapper class for events emitted by the [[TcpPipelineHandler]] actor. * Wrapper class for events emitted by the [[TcpPipelineHandler]] actor.
*/ */
case class Event(@BeanProperty evt: Evt) case class Event(@BeanProperty evt: Evt) extends NoSerializationVerificationNeeded
} }
/** /**

View file

@ -9,14 +9,11 @@ import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop import akka.actor.SupervisorStrategy.Stop
import akka.cluster.MemberStatus._ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import akka.actor.ActorSelection
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.actor.Deploy
/** /**
* Base trait for all cluster messages. All ClusterMessage's are serializable. * Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -132,7 +129,7 @@ private[cluster] object InternalClusterAction {
* Comand to [[akka.cluster.ClusterDaemon]] to create a * Comand to [[akka.cluster.ClusterDaemon]] to create a
* [[akka.cluster.OnMemberUpListener]]. * [[akka.cluster.OnMemberUpListener]].
*/ */
case class AddOnMemberUpListener(callback: Runnable) case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded
sealed trait SubscriptionMessage sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage

View file

@ -118,6 +118,8 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
exercise-actors = on exercise-actors = on
} }
akka.actor.serialize-messages = off
akka.actor.serialize-creators = off
akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.cluster { akka.cluster {
auto-down = on auto-down = on

View file

@ -1 +1,6 @@
akka.actor.serialize-creators=on akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit.AkkaSpec
class SerializationChecksSpec extends AkkaSpec {
"Settings serialize-messages and serialize-creators" must {
"be on for tests" in {
system.settings.SerializeAllCreators must be(true)
system.settings.SerializeAllMessages must be(true)
}
}
}

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
import scala.collection.immutable import scala.collection.immutable
import akka.actor.Actor import akka.actor.Actor
import akka.actor.Actor.Receive import akka.actor.Actor.Receive
import akka.actor.Deploy
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.ActorSelection import akka.actor.ActorSelection
@ -38,7 +39,7 @@ object ClusterSingletonManager {
maxTakeOverRetries: Int = 15, maxTakeOverRetries: Int = 15,
retryInterval: FiniteDuration = 1.second): Props = retryInterval: FiniteDuration = 1.second): Props =
Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role, Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role,
maxHandOverRetries, maxTakeOverRetries, retryInterval) maxHandOverRetries, maxTakeOverRetries, retryInterval).withDeploy(Deploy.local)
/** /**
* Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]. * Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]].

View file

@ -12,6 +12,7 @@ import org.scalatest.BeforeAndAfterEach
import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.actor.Props import akka.actor.Props
import akka.actor.Actor import akka.actor.Actor
import akka.actor.Deploy
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.FSM import akka.actor.FSM
@ -60,7 +61,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
def receive = { def receive = {
case x testActor ! x case x testActor ! x
} }
}), "echo") }).withDeploy(Deploy.local), "echo")
} }
enterBarrier("initialize") enterBarrier("initialize")

View file

@ -4,7 +4,7 @@
package akka.remote.testconductor package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props } import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, NoSerializationVerificationNeeded }
import RemoteConnection.getAddrString import RemoteConnection.getAddrString
import TestConductorProtocol._ import TestConductorProtocol._
import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent } import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent }
@ -362,7 +362,7 @@ private[akka] object Controller {
class ClientDisconnectedException(msg: String) extends AkkaException(msg) with NoStackTrace class ClientDisconnectedException(msg: String) extends AkkaException(msg) with NoStackTrace
case object GetNodes case object GetNodes
case object GetSockAddr case object GetSockAddr
case class CreateServerFSM(channel: Channel) case class CreateServerFSM(channel: Channel) extends NoSerializationVerificationNeeded
case class NodeInfo(name: RoleName, addr: Address, fsm: ActorRef) case class NodeInfo(name: RoleName, addr: Address, fsm: ActorRef)
} }

View file

@ -6,7 +6,7 @@ package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler } import akka.actor._
import akka.remote.testconductor.RemoteConnection.getAddrString import akka.remote.testconductor.RemoteConnection.getAddrString
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Await, Future } import scala.concurrent.{ ExecutionContext, Await, Future }
@ -122,7 +122,7 @@ private[akka] object ClientFSM {
case class Data(channel: Option[Channel], runningOp: Option[(String, ActorRef)]) case class Data(channel: Option[Channel], runningOp: Option[(String, ActorRef)])
case class Connected(channel: Channel) case class Connected(channel: Channel) extends NoSerializationVerificationNeeded
case class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace case class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace
case object Disconnected case object Disconnected
} }

View file

@ -5,8 +5,7 @@ package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.Props import akka.actor.{Props, Actor, ActorIdentity, Identify, Deploy}
import akka.actor.Actor
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Awaitable import scala.concurrent.Awaitable
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -16,8 +15,6 @@ import java.net.InetSocketAddress
import java.net.InetAddress import java.net.InetAddress
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig } import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig }
import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.actor.Identify
import akka.actor.ActorIdentity
object TestConductorMultiJvmSpec extends MultiNodeConfig { object TestConductorMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false)) commonConfig(debugConfig(on = false))
@ -50,7 +47,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with ST
def receive = { def receive = {
case x testActor ! x; sender ! x case x testActor ! x; sender ! x
} }
}), "echo") }).withDeploy(Deploy.local), "echo")
} }
enterBarrier("name") enterBarrier("name")

View file

@ -0,0 +1,6 @@
akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit.AkkaSpec
class SerializationChecksSpec extends AkkaSpec {
"Settings serialize-messages and serialize-creators" must {
"be on for tests" in {
system.settings.SerializeAllCreators must be(true)
system.settings.SerializeAllMessages must be(true)
}
}
}

View file

@ -5,7 +5,7 @@ package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import akka.actor.{ Props, AddressFromURIString, ActorRef, Actor, OneForOneStrategy, SupervisorStrategy, PoisonPill } import akka.actor._
import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest } import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest }
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.event.Logging import akka.event.Logging
@ -531,7 +531,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
def receive = { def receive = {
case x: InetSocketAddress testActor ! controller case x: InetSocketAddress testActor ! controller
} }
})) }).withDeploy(Deploy.local))
val actor = expectMsgType[ActorRef] val actor = expectMsgType[ActorRef]
f(actor) f(actor)
actor ! PoisonPill // clean up so network connections don't accumulate during test run actor ! PoisonPill // clean up so network connections don't accumulate during test run
@ -550,7 +550,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
def receive = { def receive = {
case _ sender ! barrier case _ sender ! barrier
} }
})) ! "" }).withDeploy(Deploy.local)) ! ""
expectMsgType[ActorRef] expectMsgType[ActorRef]
} }

View file

@ -388,13 +388,15 @@ private[remote] object EndpointWriter {
* used instead. * used instead.
* @param handle Handle of the new inbound association. * @param handle Handle of the new inbound association.
*/ */
case class TakeOver(handle: AkkaProtocolHandle) case class TakeOver(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded
case object BackoffTimer case object BackoffTimer
case object FlushAndStop case object FlushAndStop
case object AckIdleCheckTimer case object AckIdleCheckTimer
case class StopReading(writer: ActorRef) case class StopReading(writer: ActorRef)
case class StoppedReading(writer: ActorRef) case class StoppedReading(writer: ActorRef)
case class Handle(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded
case class OutboundAck(ack: Ack) case class OutboundAck(ack: Ack)
sealed trait State sealed trait State
@ -475,7 +477,7 @@ private[remote] class EndpointWriter(
reader = startReadEndpoint(h) reader = startReadEndpoint(h)
Writing Writing
case None case None
transport.associate(remoteAddress) pipeTo self transport.associate(remoteAddress).mapTo[AkkaProtocolHandle].map(Handle(_)) pipeTo self
Initializing Initializing
}, },
stateData = ()) stateData = ())
@ -489,7 +491,7 @@ private[remote] class EndpointWriter(
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e), Logging.WarningLevel) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e), Logging.WarningLevel)
case Event(Status.Failure(e), _) case Event(Status.Failure(e), _)
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel) publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel)
case Event(inboundHandle: AkkaProtocolHandle, _) case Event(Handle(inboundHandle), _)
// Assert handle == None? // Assert handle == None?
context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid) context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid)
handle = Some(inboundHandle) handle = Some(inboundHandle)

View file

@ -24,6 +24,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
*/ */
private[akka] object RemoteActorRefProvider { private[akka] object RemoteActorRefProvider {
private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
extends NoSerializationVerificationNeeded
sealed trait TerminatorState sealed trait TerminatorState
case object Uninitialized extends TerminatorState case object Uninitialized extends TerminatorState

View file

@ -83,7 +83,7 @@ private[remote] object Remoting {
} }
} }
case class RegisterTransportActor(props: Props, name: String) case class RegisterTransportActor(props: Props, name: String) extends NoSerializationVerificationNeeded
private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
override def supervisorStrategy = OneForOneStrategy() { override def supervisorStrategy = OneForOneStrategy() {
@ -233,7 +233,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
private[remote] object EndpointManager { private[remote] object EndpointManager {
// Messages between Remoting and EndpointManager // Messages between Remoting and EndpointManager
sealed trait RemotingCommand sealed trait RemotingCommand extends NoSerializationVerificationNeeded
case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand
case object StartupFinished extends RemotingCommand case object StartupFinished extends RemotingCommand
case object ShutdownAndFlush extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand
@ -250,10 +250,12 @@ private[remote] object EndpointManager {
case class ManagementCommandAck(status: Boolean) case class ManagementCommandAck(status: Boolean)
// Messages internal to EndpointManager // Messages internal to EndpointManager
case object Prune case object Prune extends NoSerializationVerificationNeeded
case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]], case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]],
results: Seq[(Transport, Address, Promise[AssociationEventListener])]) results: Seq[(Transport, Address, Promise[AssociationEventListener])])
extends NoSerializationVerificationNeeded
case class ListensFailure(addressesPromise: Promise[Seq[(Transport, Address)]], cause: Throwable) case class ListensFailure(addressesPromise: Promise[Seq[(Transport, Address)]], cause: Throwable)
extends NoSerializationVerificationNeeded
// Helper class to store address pairs // Helper class to store address pairs
case class Link(localAddress: Address, remoteAddress: Address) case class Link(localAddress: Address, remoteAddress: Address)

View file

@ -120,7 +120,7 @@ abstract class AbstractTransportAdapterHandle(val originalLocalAddress: Address,
} }
object ActorTransportAdapter { object ActorTransportAdapter {
sealed trait TransportOperation sealed trait TransportOperation extends NoSerializationVerificationNeeded
case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation
case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation

View file

@ -91,7 +91,7 @@ private[remote] class AkkaProtocolTransport(
protected def managerProps = { protected def managerProps = {
val wt = wrappedTransport val wt = wrappedTransport
val s = settings val s = settings
Props(classOf[AkkaProtocolManager], wt, s) Props(classOf[AkkaProtocolManager], wt, s).withDeploy(Deploy.local)
} }
} }
@ -185,9 +185,11 @@ private[transport] object ProtocolStateActor {
*/ */
case object Open extends AssociationState case object Open extends AssociationState
case object HeartbeatTimer case object HeartbeatTimer extends NoSerializationVerificationNeeded
case class HandleListenerRegistered(listener: HandleEventListener) case class Handle(handle: AssociationHandle) extends NoSerializationVerificationNeeded
case class HandleListenerRegistered(listener: HandleEventListener) extends NoSerializationVerificationNeeded
sealed trait ProtocolStateData sealed trait ProtocolStateData
trait InitialProtocolStateData extends ProtocolStateData trait InitialProtocolStateData extends ProtocolStateData
@ -251,7 +253,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
initialData match { initialData match {
case d: OutboundUnassociated case d: OutboundUnassociated
d.transport.associate(d.remoteAddress) pipeTo self d.transport.associate(d.remoteAddress).map(Handle(_)) pipeTo self
startWith(Closed, d) startWith(Closed, d)
case d: InboundUnassociated case d: InboundUnassociated
@ -266,7 +268,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
statusPromise.failure(e) statusPromise.failure(e)
stop() stop()
case Event(wrappedHandle: AssociationHandle, OutboundUnassociated(_, statusPromise, _)) case Event(Handle(wrappedHandle), OutboundUnassociated(_, statusPromise, _))
wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self)) wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self))
if (sendAssociate(wrappedHandle, localHandshakeInfo)) { if (sendAssociate(wrappedHandle, localHandshakeInfo)) {
failureDetector.heartbeat() failureDetector.heartbeat()
@ -275,7 +277,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
} else { } else {
// Underlying transport was busy -- Associate could not be sent // Underlying transport was busy -- Associate could not be sent
setTimer("associate-retry", wrappedHandle, RARP(context.system).provider.remoteSettings.BackoffPeriod, repeat = false) setTimer("associate-retry", Handle(wrappedHandle), RARP(context.system).provider.remoteSettings.BackoffPeriod, repeat = false)
stay() stay()
} }

View file

@ -8,7 +8,7 @@ import akka.pattern.{ PromiseActorRef, ask, pipe }
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.AkkaPduCodec.Associate import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.ThrottlerManager.Checkin import akka.remote.transport.ThrottlerManager.{ Listener, Handle, ListenerAndMode, Checkin }
import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.transport.Transport._ import akka.remote.transport.Transport._
import akka.util.{ Timeout, ByteString } import akka.util.{ Timeout, ByteString }
@ -90,7 +90,7 @@ object ThrottlerTransportAdapter {
def getInstance = this def getInstance = this
} }
sealed trait ThrottleMode { sealed trait ThrottleMode extends NoSerializationVerificationNeeded {
def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration
} }
@ -183,8 +183,16 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
* INTERNAL API * INTERNAL API
*/ */
private[transport] object ThrottlerManager { private[transport] object ThrottlerManager {
case class OriginResolved() case class Checkin(origin: Address, handle: ThrottlerHandle) extends NoSerializationVerificationNeeded
case class Checkin(origin: Address, handle: ThrottlerHandle)
case class AssociateResult(handle: AssociationHandle, statusPromise: Promise[AssociationHandle])
extends NoSerializationVerificationNeeded
case class ListenerAndMode(listener: HandleEventListener, mode: ThrottleMode) extends NoSerializationVerificationNeeded
case class Handle(handle: ThrottlerHandle) extends NoSerializationVerificationNeeded
case class Listener(listener: HandleEventListener) extends NoSerializationVerificationNeeded
} }
/** /**
@ -192,6 +200,7 @@ private[transport] object ThrottlerManager {
*/ */
private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager { private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager {
import ThrottlerManager._
import context.dispatcher import context.dispatcher
private var throttlingModes = Map[Address, (ThrottleMode, Direction)]() private var throttlingModes = Map[Address, (ThrottleMode, Direction)]()
@ -202,20 +211,20 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
override def ready: Receive = { override def ready: Receive = {
case InboundAssociation(handle) case InboundAssociation(handle)
val wrappedHandle = wrapHandle(handle, associationListener, inbound = true) val wrappedHandle = wrapHandle(handle, associationListener, inbound = true)
wrappedHandle.throttlerActor ! wrappedHandle wrappedHandle.throttlerActor ! Handle(wrappedHandle)
case AssociateUnderlying(remoteAddress, statusPromise) case AssociateUnderlying(remoteAddress, statusPromise)
wrappedTransport.associate(remoteAddress) onComplete { wrappedTransport.associate(remoteAddress) onComplete {
// Slight modification of pipe, only success is sent, failure is propagated to a separate future // Slight modification of pipe, only success is sent, failure is propagated to a separate future
case Success(handle) self ! ((handle, statusPromise)) case Success(handle) self ! AssociateResult(handle, statusPromise)
case Failure(e) statusPromise.failure(e) case Failure(e) statusPromise.failure(e)
} }
// Finished outbound association and got back the handle // Finished outbound association and got back the handle
case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) //FIXME switch to a real message iso Tuple2 case AssociateResult(handle, statusPromise)
val wrappedHandle = wrapHandle(handle, associationListener, inbound = false) val wrappedHandle = wrapHandle(handle, associationListener, inbound = false)
val naked = nakedAddress(handle.remoteAddress) val naked = nakedAddress(handle.remoteAddress)
val inMode = getInboundMode(naked) val inMode = getInboundMode(naked)
wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked)) wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked))
wrappedHandle.readHandlerPromise.future map { _ -> inMode } pipeTo wrappedHandle.throttlerActor wrappedHandle.readHandlerPromise.future map { ListenerAndMode(_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= naked -> wrappedHandle handleTable ::= naked -> wrappedHandle
statusPromise.success(wrappedHandle) statusPromise.success(wrappedHandle)
case SetThrottle(address, direction, mode) case SetThrottle(address, direction, mode)
@ -356,7 +365,7 @@ private[transport] class ThrottledAssociation(
} }
when(WaitExposedHandle) { when(WaitExposedHandle) {
case Event(handle: ThrottlerHandle, Uninitialized) case Event(Handle(handle), Uninitialized)
// register to downstream layer and wait for origin // register to downstream layer and wait for origin
originalHandle.readHandlerPromise.success(ActorHandleEventListener(self)) originalHandle.readHandlerPromise.success(ActorHandleEventListener(self))
goto(WaitOrigin) using ExposedHandle(handle) goto(WaitOrigin) using ExposedHandle(handle)
@ -385,7 +394,7 @@ private[transport] class ThrottledAssociation(
stop() stop()
} else { } else {
associationHandler notify InboundAssociation(exposedHandle) associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self exposedHandle.readHandlerPromise.future.map(Listener(_)) pipeTo self
goto(WaitUpstreamListener) goto(WaitUpstreamListener)
} finally sender ! SetThrottleAck } finally sender ! SetThrottleAck
} }
@ -394,14 +403,14 @@ private[transport] class ThrottledAssociation(
case Event(InboundPayload(p), _) case Event(InboundPayload(p), _)
throttledMessages = throttledMessages enqueue p throttledMessages = throttledMessages enqueue p
stay() stay()
case Event(listener: HandleEventListener, _) case Event(Listener(listener), _)
upstreamListener = listener upstreamListener = listener
self ! Dequeue self ! Dequeue
goto(Throttling) goto(Throttling)
} }
when(WaitModeAndUpstreamListener) { when(WaitModeAndUpstreamListener) {
case Event((listener: HandleEventListener, mode: ThrottleMode), _) case Event(ListenerAndMode(listener: HandleEventListener, mode: ThrottleMode), _)
upstreamListener = listener upstreamListener = listener
inboundThrottleMode = mode inboundThrottleMode = mode
self ! Dequeue self ! Dequeue

View file

@ -4,7 +4,7 @@
package akka.remote.transport package akka.remote.transport
import scala.concurrent.{ Promise, Future } import scala.concurrent.{ Promise, Future }
import akka.actor.{ ActorRef, Address } import akka.actor.{ NoSerializationVerificationNeeded, ActorRef, Address }
import akka.util.ByteString import akka.util.ByteString
import akka.remote.transport.AssociationHandle.HandleEventListener import akka.remote.transport.AssociationHandle.HandleEventListener
import akka.AkkaException import akka.AkkaException
@ -12,7 +12,7 @@ import scala.util.control.NoStackTrace
object Transport { object Transport {
trait AssociationEvent trait AssociationEvent extends NoSerializationVerificationNeeded
/** /**
* Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address, * Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address,
@ -145,7 +145,7 @@ object AssociationHandle {
/** /**
* Trait for events that the registered listener for an [[akka.remote.transport.AssociationHandle]] might receive. * Trait for events that the registered listener for an [[akka.remote.transport.AssociationHandle]] might receive.
*/ */
sealed trait HandleEvent sealed trait HandleEvent extends NoSerializationVerificationNeeded
/** /**
* Message sent to the listener registered to an association (via the Promise returned by * Message sent to the listener registered to an association (via the Promise returned by

View file

@ -1 +1,6 @@
akka.actor.serialize-creators=on akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -500,6 +500,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to serialize a local actor ref from another actor system" in { "be able to serialize a local actor ref from another actor system" in {
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
# Additional internal serialization verification need so be off, otherwise it triggers two error messages
# instead of one: one for the internal check, and one for the actual remote send -- tripping off this test
akka.actor.serialize-messages = off
akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"] akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"]
akka.remote.test.local-address = "test://other-system@localhost:12347" akka.remote.test.local-address = "test://other-system@localhost:12347"
""").withFallback(remoteSystem.settings.config) """).withFallback(remoteSystem.settings.config)

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit.AkkaSpec
class SerializationChecksPlainRemotingSpec extends AkkaSpec {
"Settings serialize-messages and serialize-creators" must {
"be on for tests" in {
system.settings.SerializeAllCreators must be(true)
system.settings.SerializeAllMessages must be(true)
}
}
}