diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 7c676c4ec0..6213642f61 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -87,7 +87,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout case "ping" ⇒ sender ! "pong" case t: Terminated ⇒ testActor ! WrappedTerminated(t) } - })) + }).withDeploy(Deploy.local)) monitor2 ! "ping" @@ -133,7 +133,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout super.handleFailure(context, child, cause, stats, children) } } - val supervisor = system.actorOf(Props(new Supervisor(strategy))) + val supervisor = system.actorOf(Props(new Supervisor(strategy)).withDeploy(Deploy.local)) val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration) val brother = Await.result((supervisor ? Props(new Actor { @@ -166,7 +166,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout context unbecome } } - })) + }).withDeploy(Deploy.local)) parent ! "NKOTB" expectMsg("GREEN") @@ -198,7 +198,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } val t1, t2 = TestLatch() - val w = system.actorOf(Props(new Watcher), "myDearWatcher") + val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher") val p = TestProbe() w ! W(p.ref) w ! ((t1, t2)) diff --git a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala index c8d89f7c6e..1217acbc27 100644 --- a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala @@ -6,16 +6,15 @@ package akka.io import java.net.InetSocketAddress import java.security.MessageDigest - import scala.concurrent.Await import scala.concurrent.duration.{ Duration, DurationInt } import scala.concurrent.forkjoin.ThreadLocalRandom - import akka.actor.{ Actor, ActorContext, ActorLogging, ActorRef, Props, ReceiveTimeout, Stash, Terminated } import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext } import akka.pattern.ask import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.util.{ ByteString, Timeout } +import akka.actor.Deploy object BackpressureSpec { @@ -38,7 +37,7 @@ object BackpressureSpec { val init = TcpPipelineHandler.withLogger(log, new TcpReadWriteAdapter >> new BackpressureBuffer(10000, 1000000, Long.MaxValue)) - val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline") sender ! Tcp.Register(handler) unstashAll() context.become(connected(init, handler)) @@ -78,6 +77,8 @@ object BackpressureSpec { val failed: Receive = { case _ ⇒ sender ! Failed } + + override def postRestart(thr: Throwable): Unit = context.stop(self) } case object GetPort @@ -113,7 +114,7 @@ object BackpressureSpec { val init = TcpPipelineHandler.withLogger(log, new TcpReadWriteAdapter >> new BackpressureBuffer(10000, 1000000, Long.MaxValue)) - val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline") sender ! Tcp.Register(handler) unstashAll() context.become(connected(init, handler)) @@ -147,10 +148,12 @@ object BackpressureSpec { val failed: Receive = { case _ ⇒ sender ! Failed } + + override def postRestart(thr: Throwable): Unit = context.stop(self) } } -class BackpressureSpec extends AkkaSpec with ImplicitSender { +class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with ImplicitSender { import BackpressureSpec._ diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index 22aad8ae09..71d3f61a50 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -9,7 +9,11 @@ import Tcp._ import akka.TestUtils import TestUtils._ -class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4") +class CapacityLimitSpec extends AkkaSpec(""" + akka.loglevel = ERROR + akka.io.tcp.max-channels = 4 + akka.actor.serialize-creators = on + """) with TcpIntegrationSpecSupport { "The TCP transport implementation" should { diff --git a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala index 42a1f8f6a7..e2c441d08d 100644 --- a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala @@ -12,8 +12,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ import akka.io.TcpPipelineHandler.Management import akka.actor.ActorRef +import akka.actor.Deploy -class DelimiterFramingSpec extends AkkaSpec { +class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") { val addresses = TestUtils.temporaryServerAddresses(4) @@ -40,7 +41,7 @@ class DelimiterFramingSpec extends AkkaSpec { val counter = new AtomicInteger def testSetup(serverAddress: InetSocketAddress, delimiter: String, includeDelimiter: Boolean): Unit = { - val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter)) + val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter).withDeploy(Deploy.local)) val probe = TestProbe() probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) probe.expectMsgType[Tcp.Bound] @@ -68,7 +69,7 @@ class DelimiterFramingSpec extends AkkaSpec { import init._ - val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref), + val handler = system.actorOf(TcpPipelineHandler.props(init, connection, probe.ref).withDeploy(Deploy.local), "client" + counter.incrementAndGet()) probe.send(connection, Tcp.Register(handler)) @@ -128,7 +129,7 @@ class DelimiterFramingSpec extends AkkaSpec { import init._ val connection = sender - val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline") connection ! Tcp.Register(handler) diff --git a/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala b/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala index 1d09b92bfd..b1f2d04487 100644 --- a/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala @@ -12,7 +12,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.Try import scala.util.Success -class PipelineSpec extends AkkaSpec { +class PipelineSpec extends AkkaSpec("akka.actor.serialize-creators = on") { trait Level1 trait Level2 diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 49305848ba..843593409c 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -23,7 +23,10 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.{ Helpers, ByteString } import akka.TestUtils._ -class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { +class TcpConnectionSpec extends AkkaSpec(""" + akka.io.tcp.register-timeout = 500ms + akka.actor.serialize-creators = on + """) { // Helper to avoid Windows localization specific differences def ignoreIfWindows(): Unit = if (Helpers.isWindows) { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 383e0fe225..52fa79688a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -12,7 +12,10 @@ import TestUtils._ import akka.testkit.EventFilter import java.io.IOException -class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegrationSpecSupport { +class TcpIntegrationSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.serialize-creators = on + """) with TcpIntegrationSpecSupport { "The TCP transport implementation" should { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 44757f182e..74d2bd70b6 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -15,7 +15,10 @@ import akka.io.SelectionHandler._ import akka.TestUtils import Tcp._ -class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { +class TcpListenerSpec extends AkkaSpec(""" + akka.io.tcp.batch-accept-limit = 2 + akka.actor.serialize-creators = on + """) { "A TcpListener" must { @@ -130,7 +133,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { private class ListenerParent extends Actor with ChannelRegistry { val listener = context.actorOf( props = Props(classOf[TcpListener], selectorRouter.ref, Tcp(system), this, bindCommander.ref, - Bind(handler.ref, endpoint, 100, Nil)), + Bind(handler.ref, endpoint, 100, Nil)).withDeploy(Deploy.local), name = "test-listener-" + counter.next()) parent.watch(listener) def receive: Receive = { diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala index d3a7415f99..eedca77a41 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala @@ -10,7 +10,10 @@ import akka.util.ByteString import java.net.InetSocketAddress import akka.actor.ActorRef -class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { +class UdpConnectedIntegrationSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.serialize-creators = on + """) with ImplicitSender { val addresses = temporaryServerAddresses(3, udp = true) diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index 4b278b4eeb..20c98cbb30 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -10,7 +10,9 @@ import akka.util.ByteString import java.net.InetSocketAddress import akka.actor.ActorRef -class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { +class UdpIntegrationSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.serialize-creators = on""") with ImplicitSender { val addresses = temporaryServerAddresses(3, udp = true) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index da6e4cd33d..2f99e9d1d7 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -276,7 +276,7 @@ class VerifySerializabilitySpec extends AkkaSpec(SerializationTests.verifySerial })) system stop c - intercept[java.io.NotSerializableException] { + intercept[IllegalArgumentException] { val d = system.actorOf(Props(new NonSerializableActor(system))) } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 7d74a2f9cd..6782b7999a 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -75,7 +75,8 @@ akka { serialize-messages = off # Serializes and deserializes creators (in Props) to ensure that they can be - # sent over the network, this is only intended for testing. + # sent over the network, this is only intended for testing. Purely local deployments + # as marked with deploy.scope == LocalScope are exempt from verification. serialize-creators = off # Timeout for send operations to top-level actors which are in the process diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 25833918c1..7b271cc305 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -16,6 +16,7 @@ import scala.annotation.tailrec object Deploy { final val NoDispatcherGiven = "" final val NoMailboxGiven = "" + val local = Deploy(scope = LocalScope) } /** @@ -99,6 +100,7 @@ abstract class LocalScope extends Scope * which do not set a different scope. It is also the only scope handled by * the LocalActorRefProvider. */ +@SerialVersionUID(1L) case object LocalScope extends LocalScope { /** * Java API: get the singleton instance @@ -113,6 +115,7 @@ case object LocalScope extends LocalScope { */ @SerialVersionUID(1L) abstract class NoScopeGiven extends Scope +@SerialVersionUID(1L) case object NoScopeGiven extends NoScopeGiven { def withFallback(other: Scope): Scope = other diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index ec944c854f..0bd4d50a05 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -168,12 +168,15 @@ private[akka] trait Children { this: ActorCell ⇒ } private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { - if (cell.system.settings.SerializeAllCreators) { - val ser = SerializationExtension(cell.system) - props.args forall (arg ⇒ - arg.isInstanceOf[NoSerializationVerificationNeeded] || - ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null) - } + if (cell.system.settings.SerializeAllCreators && props.deploy.scope != LocalScope) + try { + val ser = SerializationExtension(cell.system) + props.args forall (arg ⇒ + arg.isInstanceOf[NoSerializationVerificationNeeded] || + ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null) + } catch { + case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) + } /* * in case we are currently terminating, fail external attachChild requests * (internal calls cannot happen anyway because we are suspended) diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 10852f4372..b3a2486163 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -79,7 +79,7 @@ private[io] object SelectionHandler { override def supervisorStrategy = connectionSupervisorStrategy val selectorPool = context.actorOf( - props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)), + props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)).withDeploy(Deploy.local), name = "selectors") final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = { @@ -253,7 +253,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A if (MaxChannelsPerSelector == -1 || childCount < MaxChannelsPerSelector) { val newName = sequenceNumber.toString sequenceNumber += 1 - val child = context.actorOf(props = cmd.childProps(registry).withDispatcher(WorkerDispatcher), name = newName) + val child = context.actorOf(props = cmd.childProps(registry).withDispatcher(WorkerDispatcher).withDeploy(Deploy.local), name = newName) childCount += 1 if (MaxChannelsPerSelector > 0) context.watch(child) // we don't need to watch if we aren't limited } else { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 98ba12e309..4f3bb3aa16 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -470,7 +470,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { */ val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher), + props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local), name = "IO-TCP") } diff --git a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala index 193d1d7e72..5c49d92957 100644 --- a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala +++ b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala @@ -94,15 +94,9 @@ object TcpPipelineHandler { case class TcpEvent(@BeanProperty evt: Tcp.Event) extends Tcp.Command /** - * Scala API: create [[Props]] for a pipeline handler + * create [[Props]] for a pipeline handler */ - def apply[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) = - Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler) - - /** - * Java API: create [[Props]] for a pipeline handler - */ - def create[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) = + def props[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) = Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler) } diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 7b98427e19..8bf89e8dc1 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -215,7 +215,7 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension { val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(classOf[UdpManager], this), + props = Props(classOf[UdpManager], this).withDeploy(Deploy.local), name = "IO-UDP-FF") } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnected.scala b/akka-actor/src/main/scala/akka/io/UdpConnected.scala index d0490cac2b..66fbab1e6d 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnected.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnected.scala @@ -151,7 +151,7 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension { val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(classOf[UdpConnectedManager], this), + props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local), name = "IO-UDP-CONN") } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 3e5b3d137d..7d382bad05 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -489,14 +489,17 @@ case object NoRouter extends NoRouter { */ private[akka] trait OverrideUnsetConfig[T <: RouterConfig] extends RouterConfig { - final def overrideUnsetConfig(other: RouterConfig): RouterConfig = { - val wssConf: OverrideUnsetConfig[T] = if ((this.supervisorStrategy eq Router.defaultSupervisorStrategy) - && (other.supervisorStrategy ne Router.defaultSupervisorStrategy)) - this.withSupervisorStrategy(other.supervisorStrategy).asInstanceOf[OverrideUnsetConfig[T]] - else this - if (wssConf.resizer.isEmpty && other.resizer.isDefined) wssConf.withResizer(other.resizer.get) - else wssConf - } + final def overrideUnsetConfig(other: RouterConfig): RouterConfig = + if (other == NoRouter) this // NoRouter is the default, hence “neutral” + else { + val wssConf: OverrideUnsetConfig[T] = + if ((this.supervisorStrategy eq Router.defaultSupervisorStrategy) + && (other.supervisorStrategy ne Router.defaultSupervisorStrategy)) + this.withSupervisorStrategy(other.supervisorStrategy).asInstanceOf[OverrideUnsetConfig[T]] + else this + if (wssConf.resizer.isEmpty && other.resizer.isDefined) wssConf.withResizer(other.resizer.get) + else wssConf + } def withSupervisorStrategy(strategy: SupervisorStrategy): T diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 9125e10222..e10175f346 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -145,7 +145,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(classOf[ClusterDaemon], settings). - withDispatcher(UseDispatcher), name = "cluster") + withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster") } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 7c668cbc01..99d261f012 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -17,6 +17,7 @@ import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.actor.ActorSelection import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.actor.Deploy /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -167,7 +168,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac def receive = { case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg case AddOnMemberUpListener(code) ⇒ - context.actorOf(Props(classOf[OnMemberUpListener], code)) + context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local)) case PublisherCreated(publisher) ⇒ if (settings.MetricsEnabled) { // metrics must be started after core/publisher to be able diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 222f7d922c..b1d280f45a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -10,6 +10,7 @@ import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } import akka.cluster.ClusterEvent._ import akka.actor.PoisonPill import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.actor.Deploy /** * INTERNAL API @@ -67,7 +68,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes } } - }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") + }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") } def self: Member = { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index c3dde99738..a98eaa1f57 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -13,6 +13,7 @@ import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved import akka.remote.FailureDetectorRegistry import akka.remote.RemoteWatcher +import akka.actor.Deploy /** * INTERNAL API @@ -27,7 +28,7 @@ private[cluster] object ClusterRemoteWatcher { unreachableReaperInterval: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration): Props = Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, - heartbeatExpectedResponseAfter) + heartbeatExpectedResponseAfter).withDeploy(Deploy.local) } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala index 4698ee3780..3c7076c2ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala @@ -5,7 +5,6 @@ package akka.cluster.routing import java.util.Arrays - import scala.concurrent.forkjoin.ThreadLocalRandom import scala.collection.immutable import akka.actor.Actor @@ -24,6 +23,7 @@ import akka.cluster.StandardMetrics.HeapMemory import akka.event.Logging import akka.japi.Util.immutableSeq import akka.routing._ +import akka.actor.Deploy object AdaptiveLoadBalancingRouter { private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() { @@ -179,7 +179,7 @@ trait AdaptiveLoadBalancingRouterLike { this: RouterConfig ⇒ metricsSelector.weights(metrics))) } - }).withDispatcher(routerDispatcher), name = "metricsListener") + }).withDispatcher(routerDispatcher).withDeploy(Deploy.local), name = "metricsListener") def getNext(): ActorRef = weightedRoutees match { case Some(weighted) ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 897a8bc2df..31f5a1952c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -26,6 +26,7 @@ import akka.actor.ActorRef import akka.remote.RemoteWatcher import akka.actor.ActorSystem import akka.cluster.MultiNodeClusterSpec.EndActor +import akka.actor.Deploy object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -93,7 +94,7 @@ abstract class ClusterDeathWatchSpec watchEstablished.countDown case Terminated(actor) ⇒ testActor ! actor.path } - }), name = "observer1") + }).withDeploy(Deploy.local), name = "observer1") watchEstablished.await enterBarrier("watch-established") @@ -113,7 +114,7 @@ abstract class ClusterDeathWatchSpec } runOn(second, third, fourth) { - system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject") + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject") enterBarrier("subjected-started") enterBarrier("watch-established") runOn(third) { @@ -148,7 +149,7 @@ abstract class ClusterDeathWatchSpec def receive = { case t: Terminated ⇒ testActor ! t.actor.path } - }), name = "observer3") + }).withDeploy(Deploy.local), name = "observer3") expectMsg(path) } @@ -158,7 +159,7 @@ abstract class ClusterDeathWatchSpec "be able to watch actor before node joins cluster, ClusterRemoteWatcher takes over from RemoteWatcher" taggedAs LongRunningTest in within(20 seconds) { runOn(fifth) { - system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject5") + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject5") } enterBarrier("subjected-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 8fe06de445..8fc63d482c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import akka.actor.Deploy object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -66,7 +67,7 @@ abstract class LeaderLeavingSpec case MemberExited(m) if m.address == oldLeaderAddress ⇒ exitingLatch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") enterBarrier("leader-left") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 6c5657cece..1f08a114b1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -13,6 +13,7 @@ import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import akka.actor.Deploy object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -58,7 +59,7 @@ abstract class MembershipChangeListenerExitingSpec removedLatch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") exitingLatch.await removedLatch.await @@ -76,7 +77,7 @@ abstract class MembershipChangeListenerExitingSpec exitingLatch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") exitingLatch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 2fbdc298df..7d8cb84fa9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -10,6 +10,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Props import akka.actor.Actor +import akka.actor.Deploy object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -49,7 +50,7 @@ abstract class MembershipChangeListenerUpSpec latch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("listener-1-registered") cluster.join(first) latch.await @@ -76,7 +77,7 @@ abstract class MembershipChangeListenerUpSpec latch.countDown() case _ ⇒ // ignore } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("listener-2-registered") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index d546c8f808..5ccb725aa4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import akka.actor.Deploy object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -51,7 +52,7 @@ abstract class NodeLeavingAndExitingSpec case _: MemberRemoved ⇒ // not tested here } - })), classOf[MemberEvent]) + }).withDeploy(Deploy.local)), classOf[MemberEvent]) enterBarrier("registered-listener") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index f7d8987e9a..6dbc8d4ba9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -17,6 +17,7 @@ import akka.actor.Props import akka.actor.Actor import akka.actor.RootActorPath import akka.cluster.MemberStatus._ +import akka.actor.Deploy object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") @@ -74,7 +75,7 @@ abstract class RestartFirstSeedNodeSpec seedNode1Address = a sender ! "ok" } - }), name = "address-receiver") + }).withDeploy(Deploy.local), name = "address-receiver") enterBarrier("seed1-address-receiver-ready") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index c72d82efff..35bb05a6fb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -591,7 +591,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { require(level >= 1) def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width) val indexedChildren = - 0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector + 0 until width map { i ⇒ context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString) } toVector def receive = { case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward ((idx, job)) @@ -769,7 +769,7 @@ abstract class StressSpec def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = { runOn(roles.head) { - val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings), + val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings).withDeploy(Deploy.local), name = "result" + step) if (includeInHistory && infolog) aggregator ! ReportTo(Some(clusterResultHistory)) else aggregator ! ReportTo(None) @@ -1024,7 +1024,7 @@ abstract class StressSpec val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) runOn(masterRoles: _*) { reportResult { - val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree), + val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local), name = "master-" + myself.name) m ! Begin import system.dispatcher @@ -1156,7 +1156,7 @@ abstract class StressSpec "start routers that are running while nodes are joining" taggedAs LongRunningTest in { runOn(roles.take(3): _*) { - system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), + system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), name = "master-" + myself.name) ! Begin } } @@ -1252,7 +1252,7 @@ abstract class StressSpec "start routers that are running while nodes are removed" taggedAs LongRunningTest in { if (exerciseActors) { runOn(roles.take(3): _*) { - system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), + system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), name = "master-" + myself.name) ! Begin } } diff --git a/akka-cluster/src/test/resources/reference.conf b/akka-cluster/src/test/resources/reference.conf new file mode 100644 index 0000000000..f05a5c454e --- /dev/null +++ b/akka-cluster/src/test/resources/reference.conf @@ -0,0 +1 @@ +akka.actor.serialize-creators=on \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/SerializeCreatorsVerificationSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SerializeCreatorsVerificationSpec.scala new file mode 100644 index 0000000000..206984d0dd --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/SerializeCreatorsVerificationSpec.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit.AkkaSpec + +class SerializeCreatorsVerificationSpec extends AkkaSpec { + + "serialize-creators must be on" in { + system.settings.SerializeAllCreators must be === true + } + +} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java b/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java index 5d0bf86626..aa87296ed1 100644 --- a/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java +++ b/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java @@ -92,7 +92,7 @@ public class SslDocTest { // create handler for pipeline, setting ourselves as payload recipient final ActorRef handler = getContext().actorOf( - TcpPipelineHandler.create(init, getSender(), getSelf())); + TcpPipelineHandler.props(init, getSender(), getSelf())); // register the SSL handler with the connection getSender().tell(TcpMessage.register(handler), getSelf()); @@ -157,7 +157,7 @@ public class SslDocTest { // create handler for pipeline, setting ourselves as payload recipient final ActorRef handler = getContext().actorOf( - TcpPipelineHandler.create(init, getSender(), getSelf())); + TcpPipelineHandler.props(init, getSender(), getSelf())); // register the SSL handler with the connection getSender().tell(TcpMessage.register(handler), getSelf()); diff --git a/akka-docs/rst/java/remoting.rst b/akka-docs/rst/java/remoting.rst index f76b889d7f..512d25be6a 100644 --- a/akka-docs/rst/java/remoting.rst +++ b/akka-docs/rst/java/remoting.rst @@ -111,7 +111,11 @@ actor systems has to have a JAR containing the class. arguments to the actor being created, do not make the factory a non-static inner class: this will inherently capture a reference to its enclosing object, which in most cases is not serializable. It is best to make a static - inner class which implements :class:`UntypedActorFactory`. + inner class which implements :class:`Creator`. + + Serializability of all Props can be tested by setting the configuration item + ``akka.actor.serialize-creators=on``. Only Props whose ``deploy`` has + ``LocalScope`` are exempt from this check. .. note:: diff --git a/akka-docs/rst/scala/remoting.rst b/akka-docs/rst/scala/remoting.rst index ec7c09491f..31cf8667b5 100644 --- a/akka-docs/rst/scala/remoting.rst +++ b/akka-docs/rst/scala/remoting.rst @@ -120,6 +120,10 @@ actor systems has to have a JAR containing the class. most cases is not serializable. It is best to create a factory method in the companion object of the actor’s class. + Serializability of all Props can be tested by setting the configuration item + ``akka.actor.serialize-creators=on``. Only Props whose ``deploy`` has + ``LocalScope`` are exempt from this check. + .. note:: You can use asterisks as wildcard matches for the actor paths, so you could specify: diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 5fe89c860c..681544f692 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -25,6 +25,7 @@ import scala.reflect.classTag import akka.ConfigurationException import akka.AkkaException import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.actor.Deploy /** * The conductor is the one orchestrating the test: it governs the @@ -413,7 +414,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP case CreateServerFSM(channel) ⇒ val (ip, port) = channel.getRemoteAddress match { case s: InetSocketAddress ⇒ (s.getAddress.getHostAddress, s.getPort) } val name = ip + ":" + port + "-server" + generation.next - sender ! context.actorOf(Props(classOf[ServerFSM], self, channel), name) + sender ! context.actorOf(Props(classOf[ServerFSM], self, channel).withDeploy(Deploy.local), name) case c @ NodeInfo(name, addr, fsm) ⇒ barrier forward c if (nodes contains name) { diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index e728234edf..5f8eb81926 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -147,7 +147,7 @@ private[remote] object ReliableDeliverySupervisor { case object Ungate case class GotUid(uid: Int) - def apply( + def props( handleOrActive: Option[AkkaProtocolHandle], localAddress: Address, remoteAddress: Address, @@ -299,7 +299,7 @@ private[remote] class ReliableDeliverySupervisor( } private def createWriter(): ActorRef = { - context.watch(context.actorOf(EndpointWriter( + context.watch(context.actorOf(EndpointWriter.props( handleOrActive = currentHandle, localAddress = localAddress, remoteAddress = remoteAddress, @@ -308,7 +308,7 @@ private[remote] class ReliableDeliverySupervisor( AkkaPduProtobufCodec, receiveBuffers = receiveBuffers, reliableDeliverySupervisor = Some(self)) - .withDispatcher("akka.remote.writer-dispatcher"), + .withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local), "endpointWriter")) } } @@ -339,7 +339,7 @@ private[remote] abstract class EndpointActor( */ private[remote] object EndpointWriter { - def apply( + def props( handleOrActive: Option[AkkaProtocolHandle], localAddress: Address, remoteAddress: Address, @@ -580,8 +580,8 @@ private[remote] class EndpointWriter( private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = { val newReader = context.watch(context.actorOf( - EndpointReader(localAddress, remoteAddress, transport, settings, codec, - msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers), + EndpointReader.props(localAddress, remoteAddress, transport, settings, codec, + msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())) handle.readHandlerPromise.success(ActorHandleEventListener(newReader)) Some(newReader) @@ -603,7 +603,7 @@ private[remote] class EndpointWriter( */ private[remote] object EndpointReader { - def apply( + def props( localAddress: Address, remoteAddress: Address, transport: Transport, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 2873e4720d..2245542b4e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -191,7 +191,8 @@ private[akka] class RemoteActorRefProvider( failureDetector, heartbeatInterval = WatchHeartBeatInterval, unreachableReaperInterval = WatchUnreachableReaperInterval, - heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher") + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), + "remote-watcher") } protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = { @@ -268,17 +269,15 @@ private[akka] class RemoteActorRefProvider( case d @ Deploy(_, _, _, RemoteScope(addr), _, _) ⇒ if (hasAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) - } else { - try { - val localAddress = transport.localAddressForRemote(addr) - val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). - withUid(path.uid) - new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) - } catch { - case NonFatal(e) ⇒ - log.error(e, "Error while looking up address [{}]", addr) - new EmptyLocalActorRef(this, path, eventStream) - } + } else if (props.deploy.scope == LocalScope) { + throw new IllegalArgumentException(s"configuration requested remote deployment for local-only Props at [$path]") + } else try { + val localAddress = transport.localAddressForRemote(addr) + val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). + withUid(path.uid) + new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) + } catch { + case NonFatal(e) ⇒ throw new IllegalArgumentException(s"remote deployment failed for [$path]", e) } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 68bc4c683d..f8904a2256 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -17,6 +17,7 @@ import akka.ConfigurationException import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.actor.InternalActorRef import akka.dispatch.sysmsg.DeathWatchNotification +import akka.actor.Deploy /** * INTERNAL API @@ -32,7 +33,7 @@ private[akka] object RemoteWatcher { unreachableReaperInterval: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration): Props = Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, - heartbeatExpectedResponseAfter) + heartbeatExpectedResponseAfter).withDeploy(Deploy.local) case class WatchRemote(watchee: ActorRef, watcher: ActorRef) case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 90401ca176..50e3e86607 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -89,7 +89,7 @@ private[remote] object Remoting { } def receive = { - case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props, name) + case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props.withDeploy(Deploy.local), name) } } @@ -155,7 +155,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ log.info("Starting remoting") val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf( - Props(classOf[EndpointManager], provider.remoteSettings.config, log), Remoting.EndpointManagerName) + Props(classOf[EndpointManager], provider.remoteSettings.config, log).withDeploy(Deploy.local), Remoting.EndpointManagerName) endpointManager = Some(manager) try { @@ -617,16 +617,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends writing: Boolean): ActorRef = { assert(transportMapping contains localAddress) - if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor( + if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor.props( handleOption, localAddress, remoteAddress, transport, endpointSettings, AkkaPduProtobufCodec, - receiveBuffers).withDispatcher("akka.remote.writer-dispatcher"), + receiveBuffers).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) - else context.watch(context.actorOf(EndpointWriter( + else context.watch(context.actorOf(EndpointWriter.props( handleOption, localAddress, remoteAddress, @@ -634,7 +634,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpointSettings, AkkaPduProtobufCodec, receiveBuffers, - reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher"), + reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 115ac04777..f8d390a81f 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -121,7 +121,7 @@ private[transport] class AkkaProtocolManager( stateActorAssociationHandler, stateActorSettings, AkkaPduProtobufCodec, - failureDetector), actorNameFor(handle.remoteAddress)) + failureDetector).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress)) case AssociateUnderlying(remoteAddress, statusPromise) ⇒ val stateActorLocalAddress = localAddress @@ -135,7 +135,7 @@ private[transport] class AkkaProtocolManager( stateActorWrappedTransport, stateActorSettings, AkkaPduProtobufCodec, - failureDetector), actorNameFor(remoteAddress)) + failureDetector).withDeploy(Deploy.local), actorNameFor(remoteAddress)) } private def createTransportFailureDetector(): FailureDetector = diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 0132c0f86c..be42d58f07 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -288,7 +288,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val managerRef = self ThrottlerHandle( originalHandle, - context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound), "throttler" + nextId())) + context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound).withDeploy(Deploy.local), + "throttler" + nextId())) } } diff --git a/akka-remote/src/test/resources/reference.conf b/akka-remote/src/test/resources/reference.conf new file mode 100644 index 0000000000..f05a5c454e --- /dev/null +++ b/akka-remote/src/test/resources/reference.conf @@ -0,0 +1 @@ +akka.actor.serialize-creators=on \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala index 6a8841d4c9..589d582682 100644 --- a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala +++ b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala @@ -28,9 +28,7 @@ import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStream import java.net.{ InetSocketAddress, SocketException } import java.security.{ KeyStore, SecureRandom } import java.util.concurrent.atomic.AtomicInteger - import scala.concurrent.duration.DurationInt - import akka.TestUtils import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import akka.event.{ Logging, LoggingAdapter } @@ -42,6 +40,7 @@ import akka.remote.security.provider.AkkaProvider import akka.testkit.{ AkkaSpec, TestProbe } import akka.util.{ ByteString, Timeout } import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLServerSocket, SSLSocket, TrustManagerFactory } +import akka.actor.Deploy // TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies class SslTlsSupportSpec extends AkkaSpec { @@ -71,7 +70,7 @@ class SslTlsSupportSpec extends AkkaSpec { "work between a Java client and a akka server" in { val serverAddress = TestUtils.temporaryServerAddress() val probe = TestProbe() - val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server1")) + val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server1")) expectMsg(Tcp.Bound) val client = new JavaSslClient(serverAddress) @@ -83,7 +82,7 @@ class SslTlsSupportSpec extends AkkaSpec { "work between a akka client and a akka server" in { val serverAddress = TestUtils.temporaryServerAddress() val probe = TestProbe() - val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server2")) + val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server2")) expectMsg(Tcp.Bound) val client = new AkkaSslClient(serverAddress) @@ -111,7 +110,7 @@ class SslTlsSupportSpec extends AkkaSpec { import init._ - val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref), + val handler = system.actorOf(TcpPipelineHandler.props(init, connection, probe.ref).withDeploy(Deploy.local), "client" + counter.incrementAndGet()) probe.send(connection, Tcp.Register(handler)) @@ -163,11 +162,12 @@ class SslTlsSupportSpec extends AkkaSpec { new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000)) val connection = sender - val handler = context.actorOf(Props(new AkkaSslHandler(init))) + val handler = context.actorOf(Props(new AkkaSslHandler(init)).withDeploy(Deploy.local)) //#server context watch handler //#server - val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler)) + val pipeline = context.actorOf(TcpPipelineHandler.props( + init, sender, handler).withDeploy(Deploy.local)) connection ! Tcp.Register(pipeline) //#server diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index a3a3c0dc31..1ee8254960 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -44,7 +44,7 @@ akka { def receive = { case t: Terminated ⇒ testActor ! t.actor.path } - }), name = "observer2") + }).withDeploy(Deploy.local), name = "observer2") expectMsg(60.seconds, path) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 5f524029f0..4d758ef06a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -12,7 +12,7 @@ object RemoteDeployerSpec { val deployerConf = ConfigFactory.parseString(""" akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.actor.deployment { - /user/service2 { + /service2 { router = round-robin nr-of-instances = 3 remote = "akka://sys@wallace:2552" @@ -34,7 +34,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "A RemoteDeployer" must { "be able to parse 'akka.actor.deployment._' with specified remote nodes" in { - val service = "/user/service2" + val service = "/service2" val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) deployment must be(Some( @@ -46,6 +46,12 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "mydispatcher"))) } + "reject remote deployment when the source requires LocalScope" in { + intercept[IllegalArgumentException] { + system.actorOf(Props.empty.withDeploy(Deploy.local), "service2") + }.getMessage must be === "configuration requested remote deployment for local-only Props at [akka://RemoteDeployerSpec/user/service2]" + } + } } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 9592a9509f..afc49a7904 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -164,14 +164,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D case x: Int ⇒ sender ! byteStringOfSize(x) case x ⇒ sender ! x } - }), bigBounceId) + }).withDeploy(Deploy.local), bigBounceId) val bigBounceHere = system.actorFor(s"akka.test://remote-sys@localhost:12346/user/$bigBounceId") val eventForwarder = system.actorOf(Props(new Actor { def receive = { case x ⇒ testActor ! x } - })) + }).withDeploy(Deploy.local)) system.eventStream.subscribe(eventForwarder, classOf[AssociationErrorEvent]) system.eventStream.subscribe(eventForwarder, classOf[DisassociatedEvent]) try { @@ -368,10 +368,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } }), "looker2") // child is configured to be deployed on remoteSystem - l ! (Props[Echo1], "child") + l ! ((Props[Echo1], "child")) val child = expectMsgType[ActorRef] // grandchild is configured to be deployed on RemotingSpec (system) - child ! (Props[Echo1], "grandchild") + child ! ((Props[Echo1], "grandchild")) val grandchild = expectMsgType[ActorRef] grandchild.asInstanceOf[ActorRefScope].isLocal must be(true) grandchild ! 53 @@ -399,7 +399,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D child ! PoisonPill expectMsg("postStop") expectMsgType[Terminated].actor must be === child - l ! (Props[Echo1], "child") + l ! ((Props[Echo1], "child")) val child2 = expectMsgType[ActorRef] child2 ! Identify("idReq2") expectMsg(ActorIdentity("idReq2", Some(child2))) diff --git a/akka-remote/src/test/scala/akka/remote/SerializeCreatorsVerificationSpec.scala b/akka-remote/src/test/scala/akka/remote/SerializeCreatorsVerificationSpec.scala new file mode 100644 index 0000000000..6b2a20238a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/SerializeCreatorsVerificationSpec.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.remote + +import akka.testkit.AkkaSpec + +class SerializeCreatorsVerificationSpec extends AkkaSpec { + + "serialize-creators must be on" in { + system.settings.SerializeAllCreators must be === true + } + +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala index bbd347a1eb..64a6163608 100644 --- a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala @@ -20,6 +20,7 @@ import org.junit.runner.RunWith import akka.actor.Terminated import scala.concurrent.duration._ import akka.actor.PoisonPill +import akka.actor.Deploy @RunWith(classOf[JUnitRunner]) class UntrustedSpec extends AkkaSpec(""" @@ -51,7 +52,7 @@ akka.loglevel = DEBUG case d @ Debug(_, _, msg: String) if msg contains "dropping" ⇒ testActor ! d case _ ⇒ } - }), "debugSniffer"), classOf[Logging.Debug]) + }).withDeploy(Deploy.local), "debugSniffer"), classOf[Logging.Debug]) "UntrustedMode" must { @@ -77,7 +78,7 @@ akka.loglevel = DEBUG def receive = { case x ⇒ testActor forward x } - })) + }).withDeploy(Deploy.local)) within(1.second) { expectMsgType[Logging.Debug] expectNoMsg diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index bba95e6d9c..486422d2ce 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -13,6 +13,7 @@ import com.google.protobuf.{ ByteString ⇒ PByteString } import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.{ Await, Promise } +import akka.actor.Deploy object AkkaProtocolSpec { @@ -129,7 +130,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(handle.readHandlerPromise.isCompleted) } @@ -143,7 +144,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) reader ! testAssociate(uid = 33, cookie = None) @@ -178,7 +179,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) // a stray message will force a disassociate reader ! testHeartbeat @@ -205,7 +206,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, 42, None)) failureDetector.called must be(true) @@ -238,7 +239,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) reader ! testAssociate(uid = 33, Some("xyzzy")) @@ -257,7 +258,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) // Send the correct cookie reader ! testAssociate(uid = 33, Some("abcde")) @@ -290,7 +291,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = Some("abcde"))) } @@ -308,7 +309,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) @@ -343,7 +344,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) @@ -378,7 +379,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None)) @@ -416,7 +417,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(conf), codec, - failureDetector)) + failureDetector).withDeploy(Deploy.local)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))