diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 7c676c4ec0..6213642f61 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -87,7 +87,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout case "ping" ⇒ sender ! "pong" case t: Terminated ⇒ testActor ! WrappedTerminated(t) } - })) + }).withDeploy(Deploy.local)) monitor2 ! "ping" @@ -133,7 +133,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout super.handleFailure(context, child, cause, stats, children) } } - val supervisor = system.actorOf(Props(new Supervisor(strategy))) + val supervisor = system.actorOf(Props(new Supervisor(strategy)).withDeploy(Deploy.local)) val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration) val brother = Await.result((supervisor ? Props(new Actor { @@ -166,7 +166,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout context unbecome } } - })) + }).withDeploy(Deploy.local)) parent ! "NKOTB" expectMsg("GREEN") @@ -198,7 +198,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } val t1, t2 = TestLatch() - val w = system.actorOf(Props(new Watcher), "myDearWatcher") + val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher") val p = TestProbe() w ! W(p.ref) w ! ((t1, t2)) diff --git a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala index c8d89f7c6e..0f3849963b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala @@ -6,16 +6,15 @@ package akka.io import java.net.InetSocketAddress import java.security.MessageDigest - import scala.concurrent.Await import scala.concurrent.duration.{ Duration, DurationInt } import scala.concurrent.forkjoin.ThreadLocalRandom - import akka.actor.{ Actor, ActorContext, ActorLogging, ActorRef, Props, ReceiveTimeout, Stash, Terminated } import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext } import akka.pattern.ask import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.util.{ ByteString, Timeout } +import akka.actor.Deploy object BackpressureSpec { @@ -38,7 +37,7 @@ object BackpressureSpec { val init = TcpPipelineHandler.withLogger(log, new TcpReadWriteAdapter >> new BackpressureBuffer(10000, 1000000, Long.MaxValue)) - val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline") sender ! Tcp.Register(handler) unstashAll() context.become(connected(init, handler)) @@ -78,6 +77,8 @@ object BackpressureSpec { val failed: Receive = { case _ ⇒ sender ! Failed } + + override def postRestart(thr: Throwable): Unit = context.stop(self) } case object GetPort @@ -113,7 +114,7 @@ object BackpressureSpec { val init = TcpPipelineHandler.withLogger(log, new TcpReadWriteAdapter >> new BackpressureBuffer(10000, 1000000, Long.MaxValue)) - val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline") sender ! Tcp.Register(handler) unstashAll() context.become(connected(init, handler)) @@ -147,10 +148,12 @@ object BackpressureSpec { val failed: Receive = { case _ ⇒ sender ! Failed } + + override def postRestart(thr: Throwable): Unit = context.stop(self) } } -class BackpressureSpec extends AkkaSpec with ImplicitSender { +class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with ImplicitSender { import BackpressureSpec._ diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index 22aad8ae09..71d3f61a50 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -9,7 +9,11 @@ import Tcp._ import akka.TestUtils import TestUtils._ -class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4") +class CapacityLimitSpec extends AkkaSpec(""" + akka.loglevel = ERROR + akka.io.tcp.max-channels = 4 + akka.actor.serialize-creators = on + """) with TcpIntegrationSpecSupport { "The TCP transport implementation" should { diff --git a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala index 42a1f8f6a7..6673e9976e 100644 --- a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala @@ -12,8 +12,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ import akka.io.TcpPipelineHandler.Management import akka.actor.ActorRef +import akka.actor.Deploy -class DelimiterFramingSpec extends AkkaSpec { +class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") { val addresses = TestUtils.temporaryServerAddresses(4) @@ -40,7 +41,7 @@ class DelimiterFramingSpec extends AkkaSpec { val counter = new AtomicInteger def testSetup(serverAddress: InetSocketAddress, delimiter: String, includeDelimiter: Boolean): Unit = { - val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter)) + val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter).withDeploy(Deploy.local)) val probe = TestProbe() probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) probe.expectMsgType[Tcp.Bound] @@ -68,7 +69,7 @@ class DelimiterFramingSpec extends AkkaSpec { import init._ - val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref), + val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref).withDeploy(Deploy.local), "client" + counter.incrementAndGet()) probe.send(connection, Tcp.Register(handler)) @@ -128,7 +129,7 @@ class DelimiterFramingSpec extends AkkaSpec { import init._ val connection = sender - val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline") connection ! Tcp.Register(handler) diff --git a/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala b/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala index 1d09b92bfd..b1f2d04487 100644 --- a/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala @@ -12,7 +12,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.Try import scala.util.Success -class PipelineSpec extends AkkaSpec { +class PipelineSpec extends AkkaSpec("akka.actor.serialize-creators = on") { trait Level1 trait Level2 diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 13d67e5e4b..1da50bfa97 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -23,7 +23,10 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.{ Helpers, ByteString } import akka.TestUtils._ -class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { +class TcpConnectionSpec extends AkkaSpec(""" + akka.io.tcp.register-timeout = 500ms + akka.actor.serialize-creators = on + """) { // Helper to avoid Windows localization specific differences def ignoreIfWindows(): Unit = if (Helpers.isWindows) { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 383e0fe225..52fa79688a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -12,7 +12,10 @@ import TestUtils._ import akka.testkit.EventFilter import java.io.IOException -class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegrationSpecSupport { +class TcpIntegrationSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.serialize-creators = on + """) with TcpIntegrationSpecSupport { "The TCP transport implementation" should { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 44757f182e..74d2bd70b6 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -15,7 +15,10 @@ import akka.io.SelectionHandler._ import akka.TestUtils import Tcp._ -class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { +class TcpListenerSpec extends AkkaSpec(""" + akka.io.tcp.batch-accept-limit = 2 + akka.actor.serialize-creators = on + """) { "A TcpListener" must { @@ -130,7 +133,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { private class ListenerParent extends Actor with ChannelRegistry { val listener = context.actorOf( props = Props(classOf[TcpListener], selectorRouter.ref, Tcp(system), this, bindCommander.ref, - Bind(handler.ref, endpoint, 100, Nil)), + Bind(handler.ref, endpoint, 100, Nil)).withDeploy(Deploy.local), name = "test-listener-" + counter.next()) parent.watch(listener) def receive: Receive = { diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala index d3a7415f99..eedca77a41 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala @@ -10,7 +10,10 @@ import akka.util.ByteString import java.net.InetSocketAddress import akka.actor.ActorRef -class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { +class UdpConnectedIntegrationSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.serialize-creators = on + """) with ImplicitSender { val addresses = temporaryServerAddresses(3, udp = true) diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index 4b278b4eeb..20c98cbb30 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -10,7 +10,9 @@ import akka.util.ByteString import java.net.InetSocketAddress import akka.actor.ActorRef -class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { +class UdpIntegrationSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.serialize-creators = on""") with ImplicitSender { val addresses = temporaryServerAddresses(3, udp = true) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index da6e4cd33d..2f99e9d1d7 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -276,7 +276,7 @@ class VerifySerializabilitySpec extends AkkaSpec(SerializationTests.verifySerial })) system stop c - intercept[java.io.NotSerializableException] { + intercept[IllegalArgumentException] { val d = system.actorOf(Props(new NonSerializableActor(system))) } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6fe23e8bc4..96c16a8e91 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -75,7 +75,8 @@ akka { serialize-messages = off # Serializes and deserializes creators (in Props) to ensure that they can be - # sent over the network, this is only intended for testing. + # sent over the network, this is only intended for testing. Purely local deployments + # as marked with deploy.scope == LocalScope are exempt from verification. serialize-creators = off # Timeout for send operations to top-level actors which are in the process diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 25833918c1..7b271cc305 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -16,6 +16,7 @@ import scala.annotation.tailrec object Deploy { final val NoDispatcherGiven = "" final val NoMailboxGiven = "" + val local = Deploy(scope = LocalScope) } /** @@ -99,6 +100,7 @@ abstract class LocalScope extends Scope * which do not set a different scope. It is also the only scope handled by * the LocalActorRefProvider. */ +@SerialVersionUID(1L) case object LocalScope extends LocalScope { /** * Java API: get the singleton instance @@ -113,6 +115,7 @@ case object LocalScope extends LocalScope { */ @SerialVersionUID(1L) abstract class NoScopeGiven extends Scope +@SerialVersionUID(1L) case object NoScopeGiven extends NoScopeGiven { def withFallback(other: Scope): Scope = other diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index ec944c854f..0bd4d50a05 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -168,12 +168,15 @@ private[akka] trait Children { this: ActorCell ⇒ } private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { - if (cell.system.settings.SerializeAllCreators) { - val ser = SerializationExtension(cell.system) - props.args forall (arg ⇒ - arg.isInstanceOf[NoSerializationVerificationNeeded] || - ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null) - } + if (cell.system.settings.SerializeAllCreators && props.deploy.scope != LocalScope) + try { + val ser = SerializationExtension(cell.system) + props.args forall (arg ⇒ + arg.isInstanceOf[NoSerializationVerificationNeeded] || + ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null) + } catch { + case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) + } /* * in case we are currently terminating, fail external attachChild requests * (internal calls cannot happen anyway because we are suspended) diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 10852f4372..b3a2486163 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -79,7 +79,7 @@ private[io] object SelectionHandler { override def supervisorStrategy = connectionSupervisorStrategy val selectorPool = context.actorOf( - props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)), + props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)).withDeploy(Deploy.local), name = "selectors") final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = { @@ -253,7 +253,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A if (MaxChannelsPerSelector == -1 || childCount < MaxChannelsPerSelector) { val newName = sequenceNumber.toString sequenceNumber += 1 - val child = context.actorOf(props = cmd.childProps(registry).withDispatcher(WorkerDispatcher), name = newName) + val child = context.actorOf(props = cmd.childProps(registry).withDispatcher(WorkerDispatcher).withDeploy(Deploy.local), name = newName) childCount += 1 if (MaxChannelsPerSelector > 0) context.watch(child) // we don't need to watch if we aren't limited } else { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 0e7fbf6e1a..e4dbd04d1f 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -469,7 +469,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { */ val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher), + props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local), name = "IO-TCP") } diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 7b98427e19..8bf89e8dc1 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -215,7 +215,7 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension { val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(classOf[UdpManager], this), + props = Props(classOf[UdpManager], this).withDeploy(Deploy.local), name = "IO-UDP-FF") } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnected.scala b/akka-actor/src/main/scala/akka/io/UdpConnected.scala index d0490cac2b..66fbab1e6d 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnected.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnected.scala @@ -151,7 +151,7 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension { val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(classOf[UdpConnectedManager], this), + props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local), name = "IO-UDP-CONN") } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 3e5b3d137d..7d382bad05 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -489,14 +489,17 @@ case object NoRouter extends NoRouter { */ private[akka] trait OverrideUnsetConfig[T <: RouterConfig] extends RouterConfig { - final def overrideUnsetConfig(other: RouterConfig): RouterConfig = { - val wssConf: OverrideUnsetConfig[T] = if ((this.supervisorStrategy eq Router.defaultSupervisorStrategy) - && (other.supervisorStrategy ne Router.defaultSupervisorStrategy)) - this.withSupervisorStrategy(other.supervisorStrategy).asInstanceOf[OverrideUnsetConfig[T]] - else this - if (wssConf.resizer.isEmpty && other.resizer.isDefined) wssConf.withResizer(other.resizer.get) - else wssConf - } + final def overrideUnsetConfig(other: RouterConfig): RouterConfig = + if (other == NoRouter) this // NoRouter is the default, hence “neutral” + else { + val wssConf: OverrideUnsetConfig[T] = + if ((this.supervisorStrategy eq Router.defaultSupervisorStrategy) + && (other.supervisorStrategy ne Router.defaultSupervisorStrategy)) + this.withSupervisorStrategy(other.supervisorStrategy).asInstanceOf[OverrideUnsetConfig[T]] + else this + if (wssConf.resizer.isEmpty && other.resizer.isDefined) wssConf.withResizer(other.resizer.get) + else wssConf + } def withSupervisorStrategy(strategy: SupervisorStrategy): T diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 9125e10222..e10175f346 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -145,7 +145,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(classOf[ClusterDaemon], settings). - withDispatcher(UseDispatcher), name = "cluster") + withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster") } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 288e8736b4..82ceff0914 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -65,7 +65,7 @@ private[akka] class ClusterActorRefProvider( failureDetector, heartbeatInterval = WatchHeartBeatInterval, unreachableReaperInterval = WatchUnreachableReaperInterval, - heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher") + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter).withDeploy(Deploy.local), "remote-watcher") } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index d2f08d15eb..76bb96a75f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -17,6 +17,7 @@ import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.actor.ActorSelection import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.actor.Deploy /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -167,7 +168,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac def receive = { case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg case AddOnMemberUpListener(code) ⇒ - context.actorOf(Props(classOf[OnMemberUpListener], code)) + context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local)) case PublisherCreated(publisher) ⇒ if (settings.MetricsEnabled) { // metrics must be started after core/publisher to be able diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 6998fbb6c8..2dffc5c9d0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -10,6 +10,7 @@ import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } import akka.cluster.ClusterEvent._ import akka.actor.PoisonPill import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.actor.Deploy /** * INTERNAL API @@ -67,7 +68,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes } } - }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") + }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") } def self: Member = { diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala index 4698ee3780..3c7076c2ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala @@ -5,7 +5,6 @@ package akka.cluster.routing import java.util.Arrays - import scala.concurrent.forkjoin.ThreadLocalRandom import scala.collection.immutable import akka.actor.Actor @@ -24,6 +23,7 @@ import akka.cluster.StandardMetrics.HeapMemory import akka.event.Logging import akka.japi.Util.immutableSeq import akka.routing._ +import akka.actor.Deploy object AdaptiveLoadBalancingRouter { private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() { @@ -179,7 +179,7 @@ trait AdaptiveLoadBalancingRouterLike { this: RouterConfig ⇒ metricsSelector.weights(metrics))) } - }).withDispatcher(routerDispatcher), name = "metricsListener") + }).withDispatcher(routerDispatcher).withDeploy(Deploy.local), name = "metricsListener") def getNext(): ActorRef = weightedRoutees match { case Some(weighted) ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 897a8bc2df..31f5a1952c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -26,6 +26,7 @@ import akka.actor.ActorRef import akka.remote.RemoteWatcher import akka.actor.ActorSystem import akka.cluster.MultiNodeClusterSpec.EndActor +import akka.actor.Deploy object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -93,7 +94,7 @@ abstract class ClusterDeathWatchSpec watchEstablished.countDown case Terminated(actor) ⇒ testActor ! actor.path } - }), name = "observer1") + }).withDeploy(Deploy.local), name = "observer1") watchEstablished.await enterBarrier("watch-established") @@ -113,7 +114,7 @@ abstract class ClusterDeathWatchSpec } runOn(second, third, fourth) { - system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject") + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject") enterBarrier("subjected-started") enterBarrier("watch-established") runOn(third) { @@ -148,7 +149,7 @@ abstract class ClusterDeathWatchSpec def receive = { case t: Terminated ⇒ testActor ! t.actor.path } - }), name = "observer3") + }).withDeploy(Deploy.local), name = "observer3") expectMsg(path) } @@ -158,7 +159,7 @@ abstract class ClusterDeathWatchSpec "be able to watch actor before node joins cluster, ClusterRemoteWatcher takes over from RemoteWatcher" taggedAs LongRunningTest in within(20 seconds) { runOn(fifth) { - system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject5") + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject5") } enterBarrier("subjected-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 8fe06de445..8fc63d482c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import akka.actor.Deploy object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -66,7 +67,7 @@ abstract class LeaderLeavingSpec case MemberExited(m) if m.address == oldLeaderAddress ⇒ exitingLatch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") enterBarrier("leader-left") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 6c5657cece..1f08a114b1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -13,6 +13,7 @@ import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import akka.actor.Deploy object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -58,7 +59,7 @@ abstract class MembershipChangeListenerExitingSpec removedLatch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") exitingLatch.await removedLatch.await @@ -76,7 +77,7 @@ abstract class MembershipChangeListenerExitingSpec exitingLatch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") exitingLatch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 2fbdc298df..7d8cb84fa9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -10,6 +10,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Props import akka.actor.Actor +import akka.actor.Deploy object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -49,7 +50,7 @@ abstract class MembershipChangeListenerUpSpec latch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("listener-1-registered") cluster.join(first) latch.await @@ -76,7 +77,7 @@ abstract class MembershipChangeListenerUpSpec latch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("listener-2-registered") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index d546c8f808..5ccb725aa4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import akka.actor.Deploy object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -51,7 +52,7 @@ abstract class NodeLeavingAndExitingSpec case _: MemberRemoved ⇒ // not tested here } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index f7d8987e9a..6dbc8d4ba9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -17,6 +17,7 @@ import akka.actor.Props import akka.actor.Actor import akka.actor.RootActorPath import akka.cluster.MemberStatus._ +import akka.actor.Deploy object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") @@ -74,7 +75,7 @@ abstract class RestartFirstSeedNodeSpec seedNode1Address = a sender ! "ok" } - }), name = "address-receiver") + }).withDeploy(Deploy.local), name = "address-receiver") enterBarrier("seed1-address-receiver-ready") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index cdb9b60792..a51b023b25 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -585,7 +585,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { require(level >= 1) def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width) val indexedChildren = - 0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector + 0 until width map { i ⇒ context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString) } toVector def receive = { case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward ((idx, job)) @@ -762,7 +762,7 @@ abstract class StressSpec def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = { runOn(roles.head) { - val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings), + val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings).withDeploy(Deploy.local), name = "result" + step) if (includeInHistory && infolog) aggregator ! ReportTo(Some(clusterResultHistory)) else aggregator ! ReportTo(None) @@ -1017,7 +1017,7 @@ abstract class StressSpec val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) runOn(masterRoles: _*) { reportResult { - val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree), + val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local), name = "master-" + myself.name) m ! Begin import system.dispatcher @@ -1149,7 +1149,7 @@ abstract class StressSpec "start routers that are running while nodes are joining" taggedAs LongRunningTest in { runOn(roles.take(3): _*) { - system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), + system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), name = "master-" + myself.name) ! Begin } } @@ -1245,7 +1245,7 @@ abstract class StressSpec "start routers that are running while nodes are removed" taggedAs LongRunningTest in { if (exerciseActors) { runOn(roles.take(3): _*) { - system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), + system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), name = "master-" + myself.name) ! Begin } } diff --git a/akka-cluster/src/test/resources/reference.conf b/akka-cluster/src/test/resources/reference.conf new file mode 100644 index 0000000000..f05a5c454e --- /dev/null +++ b/akka-cluster/src/test/resources/reference.conf @@ -0,0 +1 @@ +akka.actor.serialize-creators=on \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/SerializeCreatorsVerificationSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SerializeCreatorsVerificationSpec.scala new file mode 100644 index 0000000000..206984d0dd --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/SerializeCreatorsVerificationSpec.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit.AkkaSpec + +class SerializeCreatorsVerificationSpec extends AkkaSpec { + + "serialize-creators must be on" in { + system.settings.SerializeAllCreators must be === true + } + +} \ No newline at end of file diff --git a/akka-docs/rst/java/remoting.rst b/akka-docs/rst/java/remoting.rst index f76b889d7f..512d25be6a 100644 --- a/akka-docs/rst/java/remoting.rst +++ b/akka-docs/rst/java/remoting.rst @@ -111,7 +111,11 @@ actor systems has to have a JAR containing the class. arguments to the actor being created, do not make the factory a non-static inner class: this will inherently capture a reference to its enclosing object, which in most cases is not serializable. It is best to make a static - inner class which implements :class:`UntypedActorFactory`. + inner class which implements :class:`Creator`. + + Serializability of all Props can be tested by setting the configuration item + ``akka.actor.serialize-creators=on``. Only Props whose ``deploy`` has + ``LocalScope`` are exempt from this check. .. note:: diff --git a/akka-docs/rst/java/zeromq.rst b/akka-docs/rst/java/zeromq.rst index e026e7bbe4..c598211de1 100644 --- a/akka-docs/rst/java/zeromq.rst +++ b/akka-docs/rst/java/zeromq.rst @@ -13,7 +13,7 @@ ZeroMQ is very opinionated when it comes to multi-threading so configuration opt The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library. The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out. -.. info:: +.. note:: The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported. diff --git a/akka-docs/rst/scala/remoting.rst b/akka-docs/rst/scala/remoting.rst index ec7c09491f..31cf8667b5 100644 --- a/akka-docs/rst/scala/remoting.rst +++ b/akka-docs/rst/scala/remoting.rst @@ -120,6 +120,10 @@ actor systems has to have a JAR containing the class. most cases is not serializable. It is best to create a factory method in the companion object of the actor’s class. + Serializability of all Props can be tested by setting the configuration item + ``akka.actor.serialize-creators=on``. Only Props whose ``deploy`` has + ``LocalScope`` are exempt from this check. + .. note:: You can use asterisks as wildcard matches for the actor paths, so you could specify: diff --git a/akka-docs/rst/scala/zeromq.rst b/akka-docs/rst/scala/zeromq.rst index 2984f26b85..272862a422 100644 --- a/akka-docs/rst/scala/zeromq.rst +++ b/akka-docs/rst/scala/zeromq.rst @@ -13,7 +13,7 @@ ZeroMQ is very opinionated when it comes to multi-threading so configuration opt The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library. The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out. -.. info:: +.. note:: The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported. diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 5fe89c860c..681544f692 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -25,6 +25,7 @@ import scala.reflect.classTag import akka.ConfigurationException import akka.AkkaException import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.actor.Deploy /** * The conductor is the one orchestrating the test: it governs the @@ -413,7 +414,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP case CreateServerFSM(channel) ⇒ val (ip, port) = channel.getRemoteAddress match { case s: InetSocketAddress ⇒ (s.getAddress.getHostAddress, s.getPort) } val name = ip + ":" + port + "-server" + generation.next - sender ! context.actorOf(Props(classOf[ServerFSM], self, channel), name) + sender ! context.actorOf(Props(classOf[ServerFSM], self, channel).withDeploy(Deploy.local), name) case c @ NodeInfo(name, addr, fsm) ⇒ barrier forward c if (nodes contains name) { diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index e728234edf..d9dd4dd758 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -308,7 +308,7 @@ private[remote] class ReliableDeliverySupervisor( AkkaPduProtobufCodec, receiveBuffers = receiveBuffers, reliableDeliverySupervisor = Some(self)) - .withDispatcher("akka.remote.writer-dispatcher"), + .withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local), "endpointWriter")) } } @@ -581,7 +581,7 @@ private[remote] class EndpointWriter( val newReader = context.watch(context.actorOf( EndpointReader(localAddress, remoteAddress, transport, settings, codec, - msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers), + msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())) handle.readHandlerPromise.success(ActorHandleEventListener(newReader)) Some(newReader) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 01c45e06b3..990e6d32b5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -191,7 +191,8 @@ private[akka] class RemoteActorRefProvider( failureDetector, heartbeatInterval = WatchHeartBeatInterval, unreachableReaperInterval = WatchUnreachableReaperInterval, - heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher") + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter).withDeploy(Deploy.local), + "remote-watcher") } protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = { @@ -265,17 +266,15 @@ private[akka] class RemoteActorRefProvider( case d @ Deploy(_, _, _, RemoteScope(addr), _, _) ⇒ if (hasAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) - } else { - try { - val localAddress = transport.localAddressForRemote(addr) - val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). - withUid(path.uid) - new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) - } catch { - case NonFatal(e) ⇒ - log.error(e, "Error while looking up address [{}]", addr) - new EmptyLocalActorRef(this, path, eventStream) - } + } else if (props.deploy.scope == LocalScope) { + throw new IllegalArgumentException(s"configuration requested remote deployment for local-only Props at [$path]") + } else try { + val localAddress = transport.localAddressForRemote(addr) + val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). + withUid(path.uid) + new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) + } catch { + case NonFatal(e) ⇒ throw new IllegalArgumentException(s"remote deployment failed for [$path]", e) } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 90401ca176..5446b4a2b7 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -89,7 +89,7 @@ private[remote] object Remoting { } def receive = { - case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props, name) + case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props.withDeploy(Deploy.local), name) } } @@ -155,7 +155,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ log.info("Starting remoting") val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf( - Props(classOf[EndpointManager], provider.remoteSettings.config, log), Remoting.EndpointManagerName) + Props(classOf[EndpointManager], provider.remoteSettings.config, log).withDeploy(Deploy.local), Remoting.EndpointManagerName) endpointManager = Some(manager) try { @@ -624,7 +624,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport, endpointSettings, AkkaPduProtobufCodec, - receiveBuffers).withDispatcher("akka.remote.writer-dispatcher"), + receiveBuffers).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) else context.watch(context.actorOf(EndpointWriter( handleOption, @@ -634,7 +634,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpointSettings, AkkaPduProtobufCodec, receiveBuffers, - reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher"), + reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 115ac04777..f8d390a81f 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -121,7 +121,7 @@ private[transport] class AkkaProtocolManager( stateActorAssociationHandler, stateActorSettings, AkkaPduProtobufCodec, - failureDetector), actorNameFor(handle.remoteAddress)) + failureDetector).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress)) case AssociateUnderlying(remoteAddress, statusPromise) ⇒ val stateActorLocalAddress = localAddress @@ -135,7 +135,7 @@ private[transport] class AkkaProtocolManager( stateActorWrappedTransport, stateActorSettings, AkkaPduProtobufCodec, - failureDetector), actorNameFor(remoteAddress)) + failureDetector).withDeploy(Deploy.local), actorNameFor(remoteAddress)) } private def createTransportFailureDetector(): FailureDetector = diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 0132c0f86c..be42d58f07 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -288,7 +288,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val managerRef = self ThrottlerHandle( originalHandle, - context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound), "throttler" + nextId())) + context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound).withDeploy(Deploy.local), + "throttler" + nextId())) } } diff --git a/akka-remote/src/test/resources/reference.conf b/akka-remote/src/test/resources/reference.conf new file mode 100644 index 0000000000..f05a5c454e --- /dev/null +++ b/akka-remote/src/test/resources/reference.conf @@ -0,0 +1 @@ +akka.actor.serialize-creators=on \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala index 4b737b0ff9..49d2d5dece 100644 --- a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala +++ b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala @@ -28,9 +28,7 @@ import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStream import java.net.{ InetSocketAddress, SocketException } import java.security.{ KeyStore, SecureRandom } import java.util.concurrent.atomic.AtomicInteger - import scala.concurrent.duration.DurationInt - import akka.TestUtils import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import akka.event.{ Logging, LoggingAdapter } @@ -42,6 +40,7 @@ import akka.remote.security.provider.AkkaProvider import akka.testkit.{ AkkaSpec, TestProbe } import akka.util.{ ByteString, Timeout } import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLServerSocket, SSLSocket, TrustManagerFactory } +import akka.actor.Deploy // TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies class SslTlsSupportSpec extends AkkaSpec { @@ -71,7 +70,7 @@ class SslTlsSupportSpec extends AkkaSpec { "work between a Java client and a akka server" in { val serverAddress = TestUtils.temporaryServerAddress() val probe = TestProbe() - val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server1")) + val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server1")) expectMsg(Tcp.Bound) val client = new JavaSslClient(serverAddress) @@ -83,7 +82,7 @@ class SslTlsSupportSpec extends AkkaSpec { "work between a akka client and a akka server" in { val serverAddress = TestUtils.temporaryServerAddress() val probe = TestProbe() - val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server2")) + val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server2")) expectMsg(Tcp.Bound) val client = new AkkaSslClient(serverAddress) @@ -111,7 +110,7 @@ class SslTlsSupportSpec extends AkkaSpec { import init._ - val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref), + val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref).withDeploy(Deploy.local), "client" + counter.incrementAndGet()) probe.send(connection, Tcp.Register(handler)) @@ -163,11 +162,11 @@ class SslTlsSupportSpec extends AkkaSpec { new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000)) val connection = sender - val handler = context.actorOf(Props(new AkkaSslHandler(init))) + val handler = context.actorOf(Props(new AkkaSslHandler(init)).withDeploy(Deploy.local)) //#server context watch handler //#server - val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler)) + val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler).withDeploy(Deploy.local)) connection ! Tcp.Register(pipeline) //#server diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index a3a3c0dc31..1ee8254960 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -44,7 +44,7 @@ akka { def receive = { case t: Terminated ⇒ testActor ! t.actor.path } - }), name = "observer2") + }).withDeploy(Deploy.local), name = "observer2") expectMsg(60.seconds, path) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 5f524029f0..4d758ef06a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -12,7 +12,7 @@ object RemoteDeployerSpec { val deployerConf = ConfigFactory.parseString(""" akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.actor.deployment { - /user/service2 { + /service2 { router = round-robin nr-of-instances = 3 remote = "akka://sys@wallace:2552" @@ -34,7 +34,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "A RemoteDeployer" must { "be able to parse 'akka.actor.deployment._' with specified remote nodes" in { - val service = "/user/service2" + val service = "/service2" val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) deployment must be(Some( @@ -46,6 +46,12 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "mydispatcher"))) } + "reject remote deployment when the source requires LocalScope" in { + intercept[IllegalArgumentException] { + system.actorOf(Props.empty.withDeploy(Deploy.local), "service2") + }.getMessage must be === "configuration requested remote deployment for local-only Props at [akka://RemoteDeployerSpec/user/service2]" + } + } } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 9592a9509f..afc49a7904 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -164,14 +164,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D case x: Int ⇒ sender ! byteStringOfSize(x) case x ⇒ sender ! x } - }), bigBounceId) + }).withDeploy(Deploy.local), bigBounceId) val bigBounceHere = system.actorFor(s"akka.test://remote-sys@localhost:12346/user/$bigBounceId") val eventForwarder = system.actorOf(Props(new Actor { def receive = { case x ⇒ testActor ! x } - })) + }).withDeploy(Deploy.local)) system.eventStream.subscribe(eventForwarder, classOf[AssociationErrorEvent]) system.eventStream.subscribe(eventForwarder, classOf[DisassociatedEvent]) try { @@ -368,10 +368,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } }), "looker2") // child is configured to be deployed on remoteSystem - l ! (Props[Echo1], "child") + l ! ((Props[Echo1], "child")) val child = expectMsgType[ActorRef] // grandchild is configured to be deployed on RemotingSpec (system) - child ! (Props[Echo1], "grandchild") + child ! ((Props[Echo1], "grandchild")) val grandchild = expectMsgType[ActorRef] grandchild.asInstanceOf[ActorRefScope].isLocal must be(true) grandchild ! 53 @@ -399,7 +399,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D child ! PoisonPill expectMsg("postStop") expectMsgType[Terminated].actor must be === child - l ! (Props[Echo1], "child") + l ! ((Props[Echo1], "child")) val child2 = expectMsgType[ActorRef] child2 ! Identify("idReq2") expectMsg(ActorIdentity("idReq2", Some(child2))) diff --git a/akka-remote/src/test/scala/akka/remote/SerializeCreatorsVerificationSpec.scala b/akka-remote/src/test/scala/akka/remote/SerializeCreatorsVerificationSpec.scala new file mode 100644 index 0000000000..6b2a20238a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/SerializeCreatorsVerificationSpec.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.remote + +import akka.testkit.AkkaSpec + +class SerializeCreatorsVerificationSpec extends AkkaSpec { + + "serialize-creators must be on" in { + system.settings.SerializeAllCreators must be === true + } + +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala index bbd347a1eb..64a6163608 100644 --- a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala @@ -20,6 +20,7 @@ import org.junit.runner.RunWith import akka.actor.Terminated import scala.concurrent.duration._ import akka.actor.PoisonPill +import akka.actor.Deploy @RunWith(classOf[JUnitRunner]) class UntrustedSpec extends AkkaSpec(""" @@ -51,7 +52,7 @@ akka.loglevel = DEBUG case d @ Debug(_, _, msg: String) if msg contains "dropping" ⇒ testActor ! d case _ ⇒ } - }), "debugSniffer"), classOf[Logging.Debug]) + }).withDeploy(Deploy.local), "debugSniffer"), classOf[Logging.Debug]) "UntrustedMode" must { @@ -77,7 +78,7 @@ akka.loglevel = DEBUG def receive = { case x ⇒ testActor forward x } - })) + }).withDeploy(Deploy.local)) within(1.second) { expectMsgType[Logging.Debug] expectNoMsg diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index bba95e6d9c..486422d2ce 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -13,6 +13,7 @@ import com.google.protobuf.{ ByteString ⇒ PByteString } import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.{ Await, Promise } +import akka.actor.Deploy object AkkaProtocolSpec { @@ -129,7 +130,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(handle.readHandlerPromise.isCompleted) } @@ -143,7 +144,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) reader ! testAssociate(uid = 33, cookie = None) @@ -178,7 +179,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) // a stray message will force a disassociate reader ! testHeartbeat @@ -205,7 +206,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, 42, None)) failureDetector.called must be(true) @@ -238,7 +239,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) reader ! testAssociate(uid = 33, Some("xyzzy")) @@ -257,7 +258,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) // Send the correct cookie reader ! testAssociate(uid = 33, Some("abcde")) @@ -290,7 +291,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = Some("abcde"))) } @@ -308,7 +309,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) @@ -343,7 +344,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) @@ -378,7 +379,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) @@ -416,7 +417,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))