From b566e9393d92fd5303a79fdd7196c0ba734917d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 23 Aug 2013 14:39:21 +0200 Subject: [PATCH] =act, rem, clu #3521: make serialize-messages work with core modules --- .../src/test/resources/reference.conf | 5 +++ .../scala/akka/actor/ActorLifeCycleSpec.scala | 4 +-- .../scala/akka/actor/ActorWithStashSpec.scala | 1 + .../scala/akka/actor/DeathWatchSpec.scala | 13 +++++--- .../test/scala/akka/actor/FSMActorSpec.scala | 7 ++-- .../scala/akka/actor/ReceiveTimeoutSpec.scala | 7 ++-- .../akka/actor/RestartStrategySpec.scala | 2 +- .../test/scala/akka/actor/SchedulerSpec.scala | 1 + .../akka/actor/SupervisorHierarchySpec.scala | 1 + .../scala/akka/actor/SupervisorMiscSpec.scala | 1 + .../scala/akka/actor/SupervisorSpec.scala | 2 +- .../scala/akka/actor/SupervisorTreeSpec.scala | 2 +- .../akka/actor/dispatch/ActorModelSpec.scala | 2 +- .../test/scala/akka/config/ConfigSpec.scala | 7 ++-- .../test/scala/akka/dispatch/FutureSpec.scala | 5 +-- .../akka/dispatch/MailboxConfigSpec.scala | 1 + .../scala/akka/event/EventStreamSpec.scala | 4 ++- .../scala/akka/io/DelimiterFramingSpec.scala | 7 ++-- .../scala/akka/io/TcpConnectionSpec.scala | 12 +++++-- .../test/scala/akka/pattern/PatternSpec.scala | 2 +- .../test/scala/akka/routing/ResizerSpec.scala | 1 + .../test/scala/akka/routing/RoutingSpec.scala | 1 + .../akka/serialization/SerializeSpec.scala | 1 + .../src/main/scala/akka/actor/FSM.scala | 8 +++-- akka-actor/src/main/scala/akka/actor/IO.scala | 13 +++++--- .../src/main/scala/akka/actor/dsl/Inbox.scala | 30 +++++++++++------ .../scala/akka/actor/dungeon/Dispatch.scala | 13 +++++--- .../src/main/scala/akka/io/Pipelines.scala | 7 ++-- .../main/scala/akka/io/SelectionHandler.scala | 5 +-- akka-actor/src/main/scala/akka/io/Tcp.scala | 2 +- .../main/scala/akka/io/TcpConnection.scala | 2 +- .../src/main/scala/akka/io/TcpListener.scala | 6 ++-- .../scala/akka/io/TcpPipelineHandler.scala | 6 ++-- .../scala/akka/cluster/ClusterDaemon.scala | 7 ++-- .../scala/akka/cluster/StressSpec.scala | 2 ++ .../src/test/resources/reference.conf | 7 +++- .../cluster/SerializationChecksSpec.scala | 19 +++++++++++ .../pattern/ClusterSingletonManager.scala | 3 +- .../contrib/pattern/ReliableProxySpec.scala | 3 +- .../akka/remote/testconductor/Conductor.scala | 4 +-- .../akka/remote/testconductor/Player.scala | 4 +-- .../testconductor/TestConductorSpec.scala | 7 ++-- .../src/test/resources/reference.conf | 6 ++++ .../akka/remote/SerializationChecksSpec.scala | 19 +++++++++++ .../remote/testconductor/BarrierSpec.scala | 6 ++-- .../src/main/scala/akka/remote/Endpoint.scala | 8 +++-- .../akka/remote/RemoteActorRefProvider.scala | 1 + .../src/main/scala/akka/remote/Remoting.scala | 8 +++-- .../transport/AbstractTransportAdapter.scala | 2 +- .../transport/AkkaProtocolTransport.scala | 14 ++++---- .../transport/ThrottlerTransportAdapter.scala | 33 ++++++++++++------- .../akka/remote/transport/Transport.scala | 6 ++-- akka-remote/src/test/resources/reference.conf | 7 +++- .../test/scala/akka/remote/RemotingSpec.scala | 3 ++ ...SerializationChecksPlainRemotingSpec.scala | 19 +++++++++++ 55 files changed, 260 insertions(+), 109 deletions(-) create mode 100644 akka-actor-tests/src/test/resources/reference.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/SerializationChecksSpec.scala create mode 100644 akka-remote-tests/src/test/resources/reference.conf create mode 100644 akka-remote-tests/src/test/scala/akka/remote/SerializationChecksSpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/SerializationChecksPlainRemotingSpec.scala diff --git a/akka-actor-tests/src/test/resources/reference.conf b/akka-actor-tests/src/test/resources/reference.conf new file mode 100644 index 0000000000..a5b04a466c --- /dev/null +++ b/akka-actor-tests/src/test/resources/reference.conf @@ -0,0 +1,5 @@ +akka { + actor { + serialize-messages = on + } +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 79ddb15440..14fbd13756 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -31,7 +31,7 @@ object ActorLifeCycleSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { +class ActorLifeCycleSpec extends AkkaSpec("akka.actor.serialize-messages=off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { import ActorLifeCycleSpec._ "An Actor" must { @@ -44,7 +44,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def postRestart(reason: Throwable) { report("postRestart") } - }) + }).withDeploy(Deploy.local) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) expectMsg(("preStart", id, 0)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala index b8232e76a6..c904950d79 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala @@ -72,6 +72,7 @@ object ActorWithStashSpec { } val testConf = """ + akka.actor.serialize-messages = off my-dispatcher { mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 6213642f61..c6e0735c51 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -28,6 +28,12 @@ object DeathWatchSpec { * and therefore the `Terminated` message is wrapped. */ case class WrappedTerminated(t: Terminated) + + case class W(ref: ActorRef) + case class U(ref: ActorRef) + case class FF(fail: Failed) + + case class Latches(t1: TestLatch, t2: TestLatch) extends NoSerializationVerificationNeeded } trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout ⇒ @@ -126,7 +132,6 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "fail a monitor which does not handle Terminated()" in { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { - case class FF(fail: Failed) val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(child, cause, 0)), child) @@ -185,13 +190,11 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "discard Terminated when unwatched between sysmsg and processing" in { - case class W(ref: ActorRef) - case class U(ref: ActorRef) class Watcher extends Actor { def receive = { case W(ref) ⇒ context watch ref case U(ref) ⇒ context unwatch ref - case (t1: TestLatch, t2: TestLatch) ⇒ + case Latches(t1: TestLatch, t2: TestLatch) ⇒ t1.countDown() Await.ready(t2, 3.seconds) } @@ -201,7 +204,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher") val p = TestProbe() w ! W(p.ref) - w ! ((t1, t2)) + w ! Latches(t1, t2) Await.ready(t1, 3.seconds) watch(p.ref) system stop p.ref diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index b37a9b15aa..e4ab624d1e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -34,6 +34,9 @@ object FSMActorSpec { case object Locked extends LockState case object Open extends LockState + case object Hello + case object Bye + class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] { import latches._ @@ -144,8 +147,6 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } val answerLatch = TestLatch() - object Hello - object Bye val tester = system.actorOf(Props(new Actor { def receive = { case Hello ⇒ lock ! "hello" @@ -254,7 +255,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im "log events and transitions if asked to do so" in { import scala.collection.JavaConverters._ - val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", + val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", "akka.actor.serialize-messages" -> "off", "akka.actor.debug.fsm" -> true).asJava).withFallback(system.settings.config) val fsmEventSystem = ActorSystem("fsmEvent", config) try { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 0e77d4a7a9..af06f59c5f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -11,8 +11,13 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import java.util.concurrent.TimeoutException +object ReceiveTimeoutSpec { + case object Tick +} + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ReceiveTimeoutSpec extends AkkaSpec { + import ReceiveTimeoutSpec._ "An actor with receive timeout" must { @@ -33,7 +38,6 @@ class ReceiveTimeoutSpec extends AkkaSpec { "reschedule timeout after regular receive" in { val timeoutLatch = TestLatch() - case object Tick val timeoutActor = system.actorOf(Props(new Actor { context.setReceiveTimeout(500 milliseconds) @@ -53,7 +57,6 @@ class ReceiveTimeoutSpec extends AkkaSpec { "be able to turn off timeout if desired" in { val count = new AtomicInteger(0) val timeoutLatch = TestLatch() - case object Tick val timeoutActor = system.actorOf(Props(new Actor { context.setReceiveTimeout(500 milliseconds) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 169c84b8e5..a33a4a594a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -19,7 +19,7 @@ import scala.concurrent.duration._ import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RestartStrategySpec extends AkkaSpec with DefaultTimeout { +class RestartStrategySpec extends AkkaSpec("akka.actor.serialize-messages = off") with DefaultTimeout { override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index c40f9b02ff..49d3cae077 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -22,6 +22,7 @@ object SchedulerSpec { val testConf = ConfigFactory.parseString(""" akka.scheduler.implementation = akka.actor.DefaultScheduler akka.scheduler.ticks-per-wheel = 32 + akka.actor.serialize-messages = off """).withFallback(AkkaSpec.testConf) val testConfRevolver = ConfigFactory.parseString(""" diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index ffb2bbb2d5..1e541c7942 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -71,6 +71,7 @@ object SupervisorHierarchySpec { type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator" } akka.loglevel = INFO + akka.actor.serialize-messages = off akka.actor.debug.fsm = on """) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 444f5087da..2f05b18ceb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -17,6 +17,7 @@ import scala.util.control.NonFatal object SupervisorMiscSpec { val config = """ + akka.actor.serialize-messages = off pinned-dispatcher { executor = thread-pool-executor type = PinnedDispatcher diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 2be02f1478..2fcdcf9a9c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -67,7 +67,7 @@ object SupervisorSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { +class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { import SupervisorSpec._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index f10ae024f3..d05904f7dc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -15,7 +15,7 @@ import akka.dispatch.Dispatchers import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { +class SupervisorTreeSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender with DefaultTimeout { "In a 3 levels deep supervisor tree (linked in the constructor) we" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 14f03799dc..4cf06127e8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -28,7 +28,7 @@ import scala.annotation.tailrec object ActorModelSpec { - sealed trait ActorModelMessage + sealed trait ActorModelMessage extends NoSerializationVerificationNeeded case class TryReply(expect: Any) extends ActorModelMessage diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 5a6b6892de..e51131dc94 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -28,8 +28,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin settings.ConfigVersion must equal("2.3-SNAPSHOT") getBoolean("akka.daemonic") must equal(false) - getBoolean("akka.actor.serialize-messages") must equal(false) - settings.SerializeAllMessages must equal(false) + + // WARNING: This setting must be off in the default reference.conf, but must be on when running + // the test suite. + getBoolean("akka.actor.serialize-messages") must equal(true) + settings.SerializeAllMessages must equal(true) getInt("akka.scheduler.ticks-per-wheel") must equal(512) getMilliseconds("akka.scheduler.tick-duration") must equal(10) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index cfaf34c183..59cbb80ed9 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -49,6 +49,9 @@ object FutureSpec { sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) } } + + case class Req[T](req: T) + case class Res[T](res: T) } class JavaFutureSpec extends JavaFutureTests with JUnitSuiteLike @@ -268,8 +271,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "support pattern matching within a for-comprehension" in { filterException[NoSuchElementException] { - case class Req[T](req: T) - case class Res[T](res: T) val actor = system.actorOf(Props(new Actor { def receive = { case Req(s: String) ⇒ sender ! Res(s.length) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index ce90666293..99c7a0a4fb 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -241,6 +241,7 @@ class SingleConsumerOnlyMailboxSpec extends MailboxSpec { object SingleConsumerOnlyMailboxVerificationSpec { case object Ping val mailboxConf = ConfigFactory.parseString(""" + akka.actor.serialize-messages = off test-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" throughput = 1 diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 1f59969782..23fb529cb9 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -11,12 +11,13 @@ import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ import akka.event.Logging.InitializeLogger import akka.pattern.gracefulStop -import akka.testkit.{ TestProbe, AkkaSpec } +import akka.testkit.{ EventFilter, TestEvent, TestProbe, AkkaSpec } object EventStreamSpec { val config = ConfigFactory.parseString(""" akka { + actor.serialize-messages = off stdout-loglevel = WARNING loglevel = INFO loggers = ["akka.event.EventStreamSpec$MyLog", "%s"] @@ -25,6 +26,7 @@ object EventStreamSpec { val configUnhandled = ConfigFactory.parseString(""" akka { + actor.serialize-messages = off stdout-loglevel = WARNING loglevel = DEBUG actor.debug.unhandled = on diff --git a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala index 5966d15857..32c1b8e137 100644 --- a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala @@ -14,7 +14,12 @@ import akka.io.TcpPipelineHandler.Management import akka.actor.ActorRef import akka.actor.Deploy +object DelimiterFramingSpec { + case class Listener(ref: ActorRef) +} + class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") { + import DelimiterFramingSpec._ val addresses = TestUtils.temporaryServerAddresses(4) @@ -110,8 +115,6 @@ class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on" } } - case class Listener(ref: ActorRef) - class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging { import Tcp.Connected diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index ea492f82ae..9560548293 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -23,10 +23,17 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.{ Helpers, ByteString } import akka.TestUtils._ +object TcpConnectionSpec { + case object Ack extends Event + case class Registration(channel: SelectableChannel, initialOps: Int) extends NoSerializationVerificationNeeded +} + class TcpConnectionSpec extends AkkaSpec(""" akka.io.tcp.register-timeout = 500ms akka.actor.serialize-creators = on """) { + import TcpConnectionSpec._ + // Helper to avoid Windows localization specific differences def ignoreIfWindows(): Unit = if (Helpers.isWindows) { @@ -725,7 +732,7 @@ class TcpConnectionSpec extends AkkaSpec(""" } def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = - registerCallReceiver.ref.tell(channel -> initialOps, channelActor) + registerCallReceiver.ref.tell(Registration(channel, initialOps), channelActor) def setServerSocketOptions() = () @@ -755,7 +762,7 @@ class TcpConnectionSpec extends AkkaSpec(""" lazy val clientSideChannel = connectionActor.underlyingActor.channel override def run(body: ⇒ Unit): Unit = super.run { - registerCallReceiver.expectMsg(clientSideChannel -> OP_CONNECT) + registerCallReceiver.expectMsg(Registration(clientSideChannel, OP_CONNECT)) registerCallReceiver.sender must be(connectionActor) body } @@ -903,5 +910,4 @@ class TcpConnectionSpec extends AkkaSpec(""" } } - object Ack extends Event } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index fda9850fe8..4da15664ed 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -23,7 +23,7 @@ object PatternSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PatternSpec extends AkkaSpec { +class PatternSpec extends AkkaSpec("akka.actor.serialize-messages = off") { implicit val ec = system.dispatcher import PatternSpec._ diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 398a931ca6..803dd9b9d1 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -18,6 +18,7 @@ import scala.util.Try object ResizerSpec { val config = """ + akka.actor.serialize-messages = off akka.actor.deployment { /router1 { router = round-robin diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 8387cfc2c3..a95b6e94e8 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger object RoutingSpec { val config = """ + akka.actor.serialize-messages = off akka.actor.deployment { /router1 { router = round-robin diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 2f99e9d1d7..fdaa7b6b49 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -25,6 +25,7 @@ object SerializationTests { val serializeConf = """ akka { actor { + serialize-messages = off serializers { test = "akka.serialization.TestSerializer" } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index c77d3fe040..3644bc237b 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -87,7 +87,9 @@ object FSM { /** * INTERNAL API */ - private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) { + // FIXME: what about the cancellable? + private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) + extends NoSerializationVerificationNeeded { private var ref: Option[Cancellable] = _ private val scheduler = context.system.scheduler private implicit val executionContext = context.dispatcher @@ -676,13 +678,13 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging { * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an * `Event`, which allows pattern matching to extract both state and data. */ - case class Event(event: Any, stateData: D) + case class Event(event: Any, stateData: D) extends NoSerializationVerificationNeeded /** * Case class representing the state of the [[akka.actor.FSM]] whithin the * `onTermination` block. */ - case class StopEvent(reason: Reason, currentState: S, stateData: D) + case class StopEvent(reason: Reason, currentState: S, stateData: D) extends NoSerializationVerificationNeeded } /** diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index c23e8ec31a..7382569ad0 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -243,7 +243,7 @@ object IO { /** * Messages used to communicate with an [[akka.actor.IOManager]]. */ - sealed trait IOMessage + sealed trait IOMessage extends NoSerializationVerificationNeeded /** * Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel @@ -912,6 +912,13 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { require(readBufferSize <= Int.MaxValue && readBufferSize > 0) require(selectInterval > 0) } + + /** + * INTERNAL API + * + * unique message that is sent to ourself to initiate the next select + */ + private[akka] case object Select } /** @@ -922,6 +929,7 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { final class IOManagerActor(val settings: Settings) extends Actor with ActorLogging { import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT } import settings.{ defaultBacklog, selectInterval, readBufferSize } + import IOManager.Select private type ReadChannel = ReadableByteChannel with SelectableChannel private type WriteChannel = WritableByteChannel with SelectableChannel @@ -956,9 +964,6 @@ final class IOManagerActor(val settings: Settings) extends Actor with ActorLoggi */ private var fastSelect = false - /** unique message that is sent to ourself to initiate the next select */ - private case object Select - /** This method should be called after receiving any message */ private def run() { if (!running) { diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index 0dd8afe1c2..6cbeb13d2b 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -21,16 +21,10 @@ import akka.pattern.ask import akka.actor.ActorDSL import akka.actor.Props -trait Inbox { this: ActorDSL.type ⇒ - - protected trait InboxExtension { this: Extension ⇒ - val DSLInboxQueueSize = config.getInt("inbox-size") - - val inboxNr = new AtomicInteger - val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize) - - def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet) - } +/** + * INTERNAL API + */ +private[akka] object Inbox { private sealed trait Query { def deadline: Deadline @@ -45,6 +39,22 @@ trait Inbox { this: ActorDSL.type ⇒ } private case class StartWatch(target: ActorRef) private case object Kick + +} + +trait Inbox { this: ActorDSL.type ⇒ + + import Inbox._ + + protected trait InboxExtension { this: Extension ⇒ + val DSLInboxQueueSize = config.getInt("inbox-size") + + val inboxNr = new AtomicInteger + val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize) + + def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet) + } + private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 6c0b9971e1..2d1604de65 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -107,10 +107,15 @@ private[akka] trait Dispatch { this: ActorCell ⇒ def sendMessage(msg: Envelope): Unit = try { - val m = msg.message.asInstanceOf[AnyRef] - if (system.settings.SerializeAllMessages && !m.isInstanceOf[NoSerializationVerificationNeeded]) { - val s = SerializationExtension(system) - s.deserialize(s.serialize(m).get, m.getClass).get + if (system.settings.SerializeAllMessages) { + val unwrapped = (msg.message match { + case DeadLetter(wrapped, _, _) ⇒ wrapped + case other ⇒ other + }).asInstanceOf[AnyRef] + if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) { + val s = SerializationExtension(system) + s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get + } } dispatcher.dispatch(this, msg) } catch handleException diff --git a/akka-actor/src/main/scala/akka/io/Pipelines.scala b/akka-actor/src/main/scala/akka/io/Pipelines.scala index 79c785f340..75e44af13f 100644 --- a/akka-actor/src/main/scala/akka/io/Pipelines.scala +++ b/akka-actor/src/main/scala/akka/io/Pipelines.scala @@ -736,6 +736,11 @@ object BackpressureBuffer { */ trait LowWatermarkReached extends Tcp.Event case object LowWatermarkReached extends LowWatermarkReached + + /** + * INTERNAL API + */ + private[io] case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event } /** @@ -769,8 +774,6 @@ class BackpressureBuffer(lowBytes: Long, highBytes: Long, maxBytes: Long) require(highBytes >= lowBytes, "highWatermark needs to be at least as large as lowWatermark") require(maxBytes >= highBytes, "maxCapacity needs to be at least as large as highWatermark") - case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event - override def apply(ctx: HasLogging) = new PipePair[Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] { import Tcp._ diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index e37e387737..59e126e9a6 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -54,7 +54,7 @@ private[io] trait ChannelRegistry { * a result of it having called `register` on the `ChannelRegistry`. * Enables a channel actor to directly schedule interest setting tasks to the selector mgmt. dispatcher. */ -private[io] trait ChannelRegistration { +private[io] trait ChannelRegistration extends NoSerializationVerificationNeeded { def enableInterest(op: Int) def disableInterest(op: Int) } @@ -66,8 +66,9 @@ private[io] object SelectionHandler { } case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: ChannelRegistry ⇒ Props) + extends NoSerializationVerificationNeeded - case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) } + case class Retry(command: WorkerForCommand, retriesLeft: Int) extends NoSerializationVerificationNeeded { require(retriesLeft >= 0) } case object ChannelConnectable case object ChannelAcceptable diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index c2b8a311d3..e6b0c6e1bd 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -94,7 +94,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { /** * The common interface for [[Command]] and [[Event]]. */ - sealed trait Message + sealed trait Message extends NoSerializationVerificationNeeded /// COMMANDS diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 627c6a7513..839c9af554 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -437,7 +437,7 @@ private[io] object TcpConnection { // INTERNAL MESSAGES /** Informs actor that no writing was possible but there is still work remaining */ - case class SendBufferFull(remainingWrite: PendingWrite) + case class SendBufferFull(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded /** Informs actor that a pending file write has finished */ case object WriteFileFinished /** Informs actor that a pending WriteFile failed */ diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 040e09ef88..c18bd8a310 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -8,7 +8,7 @@ import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel } import java.net.InetSocketAddress import scala.annotation.tailrec import scala.util.control.NonFatal -import akka.actor.{ Props, ActorLogging, ActorRef, Actor } +import akka.actor._ import akka.io.SelectionHandler._ import akka.io.Tcp._ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } @@ -18,11 +18,11 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } */ private[io] object TcpListener { - case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage { + case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage with NoSerializationVerificationNeeded { def failureMessage = FailedRegisterIncoming(channel) } - case class FailedRegisterIncoming(channel: SocketChannel) + case class FailedRegisterIncoming(channel: SocketChannel) extends NoSerializationVerificationNeeded } diff --git a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala index 3d4d2295bc..261d5cdb82 100644 --- a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala +++ b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala @@ -6,7 +6,7 @@ package akka.io import scala.beans.BeanProperty import scala.util.{ Failure, Success } -import akka.actor.{ Actor, ActorContext, ActorRef, Props, Terminated } +import akka.actor._ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString import akka.event.Logging @@ -50,12 +50,12 @@ object TcpPipelineHandler { /** * Wrapper class for commands to be sent to the [[TcpPipelineHandler]] actor. */ - case class Command(@BeanProperty cmd: Cmd) + case class Command(@BeanProperty cmd: Cmd) extends NoSerializationVerificationNeeded /** * Wrapper class for events emitted by the [[TcpPipelineHandler]] actor. */ - case class Event(@BeanProperty evt: Evt) + case class Event(@BeanProperty evt: Evt) extends NoSerializationVerificationNeeded } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8ac8f72f11..d03d21fa29 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -9,14 +9,11 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal -import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } -import akka.actor.OneForOneStrategy +import akka.actor._ import akka.actor.SupervisorStrategy.Stop import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ -import akka.actor.ActorSelection import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } -import akka.actor.Deploy /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -132,7 +129,7 @@ private[cluster] object InternalClusterAction { * Comand to [[akka.cluster.ClusterDaemon]] to create a * [[akka.cluster.OnMemberUpListener]]. */ - case class AddOnMemberUpListener(callback: Runnable) + case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded sealed trait SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index addbced1f2..9c4a96fe79 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -118,6 +118,8 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { exercise-actors = on } + akka.actor.serialize-messages = off + akka.actor.serialize-creators = off akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster { auto-down = on diff --git a/akka-cluster/src/test/resources/reference.conf b/akka-cluster/src/test/resources/reference.conf index f05a5c454e..4948247054 100644 --- a/akka-cluster/src/test/resources/reference.conf +++ b/akka-cluster/src/test/resources/reference.conf @@ -1 +1,6 @@ -akka.actor.serialize-creators=on \ No newline at end of file +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/SerializationChecksSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SerializationChecksSpec.scala new file mode 100644 index 0000000000..81011ed890 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/SerializationChecksSpec.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster + +import akka.testkit.AkkaSpec + +class SerializationChecksSpec extends AkkaSpec { + + "Settings serialize-messages and serialize-creators" must { + + "be on for tests" in { + system.settings.SerializeAllCreators must be(true) + system.settings.SerializeAllMessages must be(true) + } + + } + +} diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index 6cfe995efb..59adf4607f 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import scala.collection.immutable import akka.actor.Actor import akka.actor.Actor.Receive +import akka.actor.Deploy import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.ActorSelection @@ -38,7 +39,7 @@ object ClusterSingletonManager { maxTakeOverRetries: Int = 15, retryInterval: FiniteDuration = 1.second): Props = Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role, - maxHandOverRetries, maxTakeOverRetries, retryInterval) + maxHandOverRetries, maxTakeOverRetries, retryInterval).withDeploy(Deploy.local) /** * Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]. diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala index 9020794c06..f7803a3138 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala @@ -12,6 +12,7 @@ import org.scalatest.BeforeAndAfterEach import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.actor.Props import akka.actor.Actor +import akka.actor.Deploy import akka.testkit.ImplicitSender import scala.concurrent.duration._ import akka.actor.FSM @@ -60,7 +61,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod def receive = { case x ⇒ testActor ! x } - }), "echo") + }).withDeploy(Deploy.local), "echo") } enterBarrier("initialize") diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 681544f692..986c4448ba 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -4,7 +4,7 @@ package akka.remote.testconductor import language.postfixOps -import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props } +import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, NoSerializationVerificationNeeded } import RemoteConnection.getAddrString import TestConductorProtocol._ import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent } @@ -362,7 +362,7 @@ private[akka] object Controller { class ClientDisconnectedException(msg: String) extends AkkaException(msg) with NoStackTrace case object GetNodes case object GetSockAddr - case class CreateServerFSM(channel: Channel) + case class CreateServerFSM(channel: Channel) extends NoSerializationVerificationNeeded case class NodeInfo(name: RoleName, addr: Address, fsm: ActorRef) } diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index f0d8a1a43f..043ff76568 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -6,7 +6,7 @@ package akka.remote.testconductor import language.postfixOps import java.util.concurrent.TimeoutException -import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler } +import akka.actor._ import akka.remote.testconductor.RemoteConnection.getAddrString import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Await, Future } @@ -122,7 +122,7 @@ private[akka] object ClientFSM { case class Data(channel: Option[Channel], runningOp: Option[(String, ActorRef)]) - case class Connected(channel: Channel) + case class Connected(channel: Channel) extends NoSerializationVerificationNeeded case class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace case object Disconnected } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index 233139d24e..b6d5dd0259 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -5,8 +5,7 @@ package akka.remote.testconductor import language.postfixOps import com.typesafe.config.ConfigFactory -import akka.actor.Props -import akka.actor.Actor +import akka.actor.{Props, Actor, ActorIdentity, Identify, Deploy} import scala.concurrent.Await import scala.concurrent.Awaitable import scala.concurrent.duration._ @@ -16,8 +15,6 @@ import java.net.InetSocketAddress import java.net.InetAddress import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig } import akka.remote.transport.ThrottlerTransportAdapter.Direction -import akka.actor.Identify -import akka.actor.ActorIdentity object TestConductorMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false)) @@ -50,7 +47,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with ST def receive = { case x ⇒ testActor ! x; sender ! x } - }), "echo") + }).withDeploy(Deploy.local), "echo") } enterBarrier("name") diff --git a/akka-remote-tests/src/test/resources/reference.conf b/akka-remote-tests/src/test/resources/reference.conf new file mode 100644 index 0000000000..471b1efcfd --- /dev/null +++ b/akka-remote-tests/src/test/resources/reference.conf @@ -0,0 +1,6 @@ +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/SerializationChecksSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/SerializationChecksSpec.scala new file mode 100644 index 0000000000..7d6e742b6b --- /dev/null +++ b/akka-remote-tests/src/test/scala/akka/remote/SerializationChecksSpec.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import akka.testkit.AkkaSpec + +class SerializationChecksSpec extends AkkaSpec { + + "Settings serialize-messages and serialize-creators" must { + + "be on for tests" in { + system.settings.SerializeAllCreators must be(true) + system.settings.SerializeAllMessages must be(true) + } + + } + +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index fe24dc1da1..321e856ac9 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -5,7 +5,7 @@ package akka.remote.testconductor import language.postfixOps -import akka.actor.{ Props, AddressFromURIString, ActorRef, Actor, OneForOneStrategy, SupervisorStrategy, PoisonPill } +import akka.actor._ import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter, TestProbe, TimingTest } import scala.concurrent.duration._ import akka.event.Logging @@ -531,7 +531,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { def receive = { case x: InetSocketAddress ⇒ testActor ! controller } - })) + }).withDeploy(Deploy.local)) val actor = expectMsgType[ActorRef] f(actor) actor ! PoisonPill // clean up so network connections don't accumulate during test run @@ -550,7 +550,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { def receive = { case _ ⇒ sender ! barrier } - })) ! "" + }).withDeploy(Deploy.local)) ! "" expectMsgType[ActorRef] } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 0a4b9291c6..4c460287da 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -388,13 +388,15 @@ private[remote] object EndpointWriter { * used instead. * @param handle Handle of the new inbound association. */ - case class TakeOver(handle: AkkaProtocolHandle) + case class TakeOver(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded case object BackoffTimer case object FlushAndStop case object AckIdleCheckTimer case class StopReading(writer: ActorRef) case class StoppedReading(writer: ActorRef) + case class Handle(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded + case class OutboundAck(ack: Ack) sealed trait State @@ -475,7 +477,7 @@ private[remote] class EndpointWriter( reader = startReadEndpoint(h) Writing case None ⇒ - transport.associate(remoteAddress) pipeTo self + transport.associate(remoteAddress).mapTo[AkkaProtocolHandle].map(Handle(_)) pipeTo self Initializing }, stateData = ()) @@ -489,7 +491,7 @@ private[remote] class EndpointWriter( publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e), Logging.WarningLevel) case Event(Status.Failure(e), _) ⇒ publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel) - case Event(inboundHandle: AkkaProtocolHandle, _) ⇒ + case Event(Handle(inboundHandle), _) ⇒ // Assert handle == None? context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid) handle = Some(inboundHandle) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a84e572955..0fe85857e6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -24,6 +24,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } */ private[akka] object RemoteActorRefProvider { private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) + extends NoSerializationVerificationNeeded sealed trait TerminatorState case object Uninitialized extends TerminatorState diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index ab2ce17756..fecae6d90a 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -83,7 +83,7 @@ private[remote] object Remoting { } } - case class RegisterTransportActor(props: Props, name: String) + case class RegisterTransportActor(props: Props, name: String) extends NoSerializationVerificationNeeded private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { override def supervisorStrategy = OneForOneStrategy() { @@ -233,7 +233,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc private[remote] object EndpointManager { // Messages between Remoting and EndpointManager - sealed trait RemotingCommand + sealed trait RemotingCommand extends NoSerializationVerificationNeeded case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand case object StartupFinished extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand @@ -250,10 +250,12 @@ private[remote] object EndpointManager { case class ManagementCommandAck(status: Boolean) // Messages internal to EndpointManager - case object Prune + case object Prune extends NoSerializationVerificationNeeded case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]], results: Seq[(Transport, Address, Promise[AssociationEventListener])]) + extends NoSerializationVerificationNeeded case class ListensFailure(addressesPromise: Promise[Seq[(Transport, Address)]], cause: Throwable) + extends NoSerializationVerificationNeeded // Helper class to store address pairs case class Link(localAddress: Address, remoteAddress: Address) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 4e70a18cad..937274b7a1 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -120,7 +120,7 @@ abstract class AbstractTransportAdapterHandle(val originalLocalAddress: Address, } object ActorTransportAdapter { - sealed trait TransportOperation + sealed trait TransportOperation extends NoSerializationVerificationNeeded case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 9a1ad860c7..3ef720b4ee 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -91,7 +91,7 @@ private[remote] class AkkaProtocolTransport( protected def managerProps = { val wt = wrappedTransport val s = settings - Props(classOf[AkkaProtocolManager], wt, s) + Props(classOf[AkkaProtocolManager], wt, s).withDeploy(Deploy.local) } } @@ -185,9 +185,11 @@ private[transport] object ProtocolStateActor { */ case object Open extends AssociationState - case object HeartbeatTimer + case object HeartbeatTimer extends NoSerializationVerificationNeeded - case class HandleListenerRegistered(listener: HandleEventListener) + case class Handle(handle: AssociationHandle) extends NoSerializationVerificationNeeded + + case class HandleListenerRegistered(listener: HandleEventListener) extends NoSerializationVerificationNeeded sealed trait ProtocolStateData trait InitialProtocolStateData extends ProtocolStateData @@ -251,7 +253,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat initialData match { case d: OutboundUnassociated ⇒ - d.transport.associate(d.remoteAddress) pipeTo self + d.transport.associate(d.remoteAddress).map(Handle(_)) pipeTo self startWith(Closed, d) case d: InboundUnassociated ⇒ @@ -266,7 +268,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat statusPromise.failure(e) stop() - case Event(wrappedHandle: AssociationHandle, OutboundUnassociated(_, statusPromise, _)) ⇒ + case Event(Handle(wrappedHandle), OutboundUnassociated(_, statusPromise, _)) ⇒ wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self)) if (sendAssociate(wrappedHandle, localHandshakeInfo)) { failureDetector.heartbeat() @@ -275,7 +277,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } else { // Underlying transport was busy -- Associate could not be sent - setTimer("associate-retry", wrappedHandle, RARP(context.system).provider.remoteSettings.BackoffPeriod, repeat = false) + setTimer("associate-retry", Handle(wrappedHandle), RARP(context.system).provider.remoteSettings.BackoffPeriod, repeat = false) stay() } diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 2f80ccf475..94c47241c0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -8,7 +8,7 @@ import akka.pattern.{ PromiseActorRef, ask, pipe } import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.AkkaPduCodec.Associate import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } -import akka.remote.transport.ThrottlerManager.Checkin +import akka.remote.transport.ThrottlerManager.{ Listener, Handle, ListenerAndMode, Checkin } import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.transport.Transport._ import akka.util.{ Timeout, ByteString } @@ -90,7 +90,7 @@ object ThrottlerTransportAdapter { def getInstance = this } - sealed trait ThrottleMode { + sealed trait ThrottleMode extends NoSerializationVerificationNeeded { def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration } @@ -183,8 +183,16 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA * INTERNAL API */ private[transport] object ThrottlerManager { - case class OriginResolved() - case class Checkin(origin: Address, handle: ThrottlerHandle) + case class Checkin(origin: Address, handle: ThrottlerHandle) extends NoSerializationVerificationNeeded + + case class AssociateResult(handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) + extends NoSerializationVerificationNeeded + + case class ListenerAndMode(listener: HandleEventListener, mode: ThrottleMode) extends NoSerializationVerificationNeeded + + case class Handle(handle: ThrottlerHandle) extends NoSerializationVerificationNeeded + + case class Listener(listener: HandleEventListener) extends NoSerializationVerificationNeeded } /** @@ -192,6 +200,7 @@ private[transport] object ThrottlerManager { */ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager { + import ThrottlerManager._ import context.dispatcher private var throttlingModes = Map[Address, (ThrottleMode, Direction)]() @@ -202,20 +211,20 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A override def ready: Receive = { case InboundAssociation(handle) ⇒ val wrappedHandle = wrapHandle(handle, associationListener, inbound = true) - wrappedHandle.throttlerActor ! wrappedHandle + wrappedHandle.throttlerActor ! Handle(wrappedHandle) case AssociateUnderlying(remoteAddress, statusPromise) ⇒ wrappedTransport.associate(remoteAddress) onComplete { // Slight modification of pipe, only success is sent, failure is propagated to a separate future - case Success(handle) ⇒ self ! ((handle, statusPromise)) + case Success(handle) ⇒ self ! AssociateResult(handle, statusPromise) case Failure(e) ⇒ statusPromise.failure(e) } // Finished outbound association and got back the handle - case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒ //FIXME switch to a real message iso Tuple2 + case AssociateResult(handle, statusPromise) ⇒ val wrappedHandle = wrapHandle(handle, associationListener, inbound = false) val naked = nakedAddress(handle.remoteAddress) val inMode = getInboundMode(naked) wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked)) - wrappedHandle.readHandlerPromise.future map { _ -> inMode } pipeTo wrappedHandle.throttlerActor + wrappedHandle.readHandlerPromise.future map { ListenerAndMode(_, inMode) } pipeTo wrappedHandle.throttlerActor handleTable ::= naked -> wrappedHandle statusPromise.success(wrappedHandle) case SetThrottle(address, direction, mode) ⇒ @@ -356,7 +365,7 @@ private[transport] class ThrottledAssociation( } when(WaitExposedHandle) { - case Event(handle: ThrottlerHandle, Uninitialized) ⇒ + case Event(Handle(handle), Uninitialized) ⇒ // register to downstream layer and wait for origin originalHandle.readHandlerPromise.success(ActorHandleEventListener(self)) goto(WaitOrigin) using ExposedHandle(handle) @@ -385,7 +394,7 @@ private[transport] class ThrottledAssociation( stop() } else { associationHandler notify InboundAssociation(exposedHandle) - exposedHandle.readHandlerPromise.future pipeTo self + exposedHandle.readHandlerPromise.future.map(Listener(_)) pipeTo self goto(WaitUpstreamListener) } finally sender ! SetThrottleAck } @@ -394,14 +403,14 @@ private[transport] class ThrottledAssociation( case Event(InboundPayload(p), _) ⇒ throttledMessages = throttledMessages enqueue p stay() - case Event(listener: HandleEventListener, _) ⇒ + case Event(Listener(listener), _) ⇒ upstreamListener = listener self ! Dequeue goto(Throttling) } when(WaitModeAndUpstreamListener) { - case Event((listener: HandleEventListener, mode: ThrottleMode), _) ⇒ + case Event(ListenerAndMode(listener: HandleEventListener, mode: ThrottleMode), _) ⇒ upstreamListener = listener inboundThrottleMode = mode self ! Dequeue diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 42d88f9a3a..f71b797afe 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -4,7 +4,7 @@ package akka.remote.transport import scala.concurrent.{ Promise, Future } -import akka.actor.{ ActorRef, Address } +import akka.actor.{ NoSerializationVerificationNeeded, ActorRef, Address } import akka.util.ByteString import akka.remote.transport.AssociationHandle.HandleEventListener import akka.AkkaException @@ -12,7 +12,7 @@ import scala.util.control.NoStackTrace object Transport { - trait AssociationEvent + trait AssociationEvent extends NoSerializationVerificationNeeded /** * Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address, @@ -145,7 +145,7 @@ object AssociationHandle { /** * Trait for events that the registered listener for an [[akka.remote.transport.AssociationHandle]] might receive. */ - sealed trait HandleEvent + sealed trait HandleEvent extends NoSerializationVerificationNeeded /** * Message sent to the listener registered to an association (via the Promise returned by diff --git a/akka-remote/src/test/resources/reference.conf b/akka-remote/src/test/resources/reference.conf index f05a5c454e..471b1efcfd 100644 --- a/akka-remote/src/test/resources/reference.conf +++ b/akka-remote/src/test/resources/reference.conf @@ -1 +1,6 @@ -akka.actor.serialize-creators=on \ No newline at end of file +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index ed10fb8c5c..66534ef5d6 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -500,6 +500,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to serialize a local actor ref from another actor system" in { val config = ConfigFactory.parseString(""" + # Additional internal serialization verification need so be off, otherwise it triggers two error messages + # instead of one: one for the internal check, and one for the actual remote send -- tripping off this test + akka.actor.serialize-messages = off akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"] akka.remote.test.local-address = "test://other-system@localhost:12347" """).withFallback(remoteSystem.settings.config) diff --git a/akka-remote/src/test/scala/akka/remote/SerializationChecksPlainRemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/SerializationChecksPlainRemotingSpec.scala new file mode 100644 index 0000000000..33434a560c --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/SerializationChecksPlainRemotingSpec.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import akka.testkit.AkkaSpec + +class SerializationChecksPlainRemotingSpec extends AkkaSpec { + + "Settings serialize-messages and serialize-creators" must { + + "be on for tests" in { + system.settings.SerializeAllCreators must be(true) + system.settings.SerializeAllMessages must be(true) + } + + } + +}