make LocalScope mean “purely local” and avoid Props serialization check, see #3210
This commit is contained in:
parent
a4af04ca09
commit
92db59183e
51 changed files with 205 additions and 117 deletions
|
|
@ -87,7 +87,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
case "ping" ⇒ sender ! "pong"
|
||||
case t: Terminated ⇒ testActor ! WrappedTerminated(t)
|
||||
}
|
||||
}))
|
||||
}).withDeploy(Deploy.local))
|
||||
|
||||
monitor2 ! "ping"
|
||||
|
||||
|
|
@ -133,7 +133,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
super.handleFailure(context, child, cause, stats, children)
|
||||
}
|
||||
}
|
||||
val supervisor = system.actorOf(Props(new Supervisor(strategy)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(strategy)).withDeploy(Deploy.local))
|
||||
|
||||
val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration)
|
||||
val brother = Await.result((supervisor ? Props(new Actor {
|
||||
|
|
@ -166,7 +166,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
context unbecome
|
||||
}
|
||||
}
|
||||
}))
|
||||
}).withDeploy(Deploy.local))
|
||||
|
||||
parent ! "NKOTB"
|
||||
expectMsg("GREEN")
|
||||
|
|
@ -198,7 +198,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
}
|
||||
|
||||
val t1, t2 = TestLatch()
|
||||
val w = system.actorOf(Props(new Watcher), "myDearWatcher")
|
||||
val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher")
|
||||
val p = TestProbe()
|
||||
w ! W(p.ref)
|
||||
w ! ((t1, t2))
|
||||
|
|
|
|||
|
|
@ -6,16 +6,15 @@ package akka.io
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
import java.security.MessageDigest
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration.{ Duration, DurationInt }
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
|
||||
import akka.actor.{ Actor, ActorContext, ActorLogging, ActorRef, Props, ReceiveTimeout, Stash, Terminated }
|
||||
import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext }
|
||||
import akka.pattern.ask
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||
import akka.util.{ ByteString, Timeout }
|
||||
import akka.actor.Deploy
|
||||
|
||||
object BackpressureSpec {
|
||||
|
||||
|
|
@ -38,7 +37,7 @@ object BackpressureSpec {
|
|||
val init = TcpPipelineHandler.withLogger(log,
|
||||
new TcpReadWriteAdapter >>
|
||||
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
|
||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline")
|
||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
||||
sender ! Tcp.Register(handler)
|
||||
unstashAll()
|
||||
context.become(connected(init, handler))
|
||||
|
|
@ -78,6 +77,8 @@ object BackpressureSpec {
|
|||
val failed: Receive = {
|
||||
case _ ⇒ sender ! Failed
|
||||
}
|
||||
|
||||
override def postRestart(thr: Throwable): Unit = context.stop(self)
|
||||
}
|
||||
|
||||
case object GetPort
|
||||
|
|
@ -113,7 +114,7 @@ object BackpressureSpec {
|
|||
val init = TcpPipelineHandler.withLogger(log,
|
||||
new TcpReadWriteAdapter >>
|
||||
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
|
||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline")
|
||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
||||
sender ! Tcp.Register(handler)
|
||||
unstashAll()
|
||||
context.become(connected(init, handler))
|
||||
|
|
@ -147,10 +148,12 @@ object BackpressureSpec {
|
|||
val failed: Receive = {
|
||||
case _ ⇒ sender ! Failed
|
||||
}
|
||||
|
||||
override def postRestart(thr: Throwable): Unit = context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
class BackpressureSpec extends AkkaSpec with ImplicitSender {
|
||||
class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with ImplicitSender {
|
||||
|
||||
import BackpressureSpec._
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,11 @@ import Tcp._
|
|||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
|
||||
class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4")
|
||||
class CapacityLimitSpec extends AkkaSpec("""
|
||||
akka.loglevel = ERROR
|
||||
akka.io.tcp.max-channels = 4
|
||||
akka.actor.serialize-creators = on
|
||||
""")
|
||||
with TcpIntegrationSpecSupport {
|
||||
|
||||
"The TCP transport implementation" should {
|
||||
|
|
|
|||
|
|
@ -12,8 +12,9 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
import scala.concurrent.duration._
|
||||
import akka.io.TcpPipelineHandler.Management
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Deploy
|
||||
|
||||
class DelimiterFramingSpec extends AkkaSpec {
|
||||
class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") {
|
||||
|
||||
val addresses = TestUtils.temporaryServerAddresses(4)
|
||||
|
||||
|
|
@ -40,7 +41,7 @@ class DelimiterFramingSpec extends AkkaSpec {
|
|||
val counter = new AtomicInteger
|
||||
|
||||
def testSetup(serverAddress: InetSocketAddress, delimiter: String, includeDelimiter: Boolean): Unit = {
|
||||
val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter))
|
||||
val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter).withDeploy(Deploy.local))
|
||||
val probe = TestProbe()
|
||||
probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
|
||||
probe.expectMsgType[Tcp.Bound]
|
||||
|
|
@ -68,7 +69,7 @@ class DelimiterFramingSpec extends AkkaSpec {
|
|||
|
||||
import init._
|
||||
|
||||
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref),
|
||||
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref).withDeploy(Deploy.local),
|
||||
"client" + counter.incrementAndGet())
|
||||
probe.send(connection, Tcp.Register(handler))
|
||||
|
||||
|
|
@ -128,7 +129,7 @@ class DelimiterFramingSpec extends AkkaSpec {
|
|||
import init._
|
||||
|
||||
val connection = sender
|
||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline")
|
||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
||||
|
||||
connection ! Tcp.Register(handler)
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
|
|||
import scala.util.Try
|
||||
import scala.util.Success
|
||||
|
||||
class PipelineSpec extends AkkaSpec {
|
||||
class PipelineSpec extends AkkaSpec("akka.actor.serialize-creators = on") {
|
||||
|
||||
trait Level1
|
||||
trait Level2
|
||||
|
|
|
|||
|
|
@ -23,7 +23,10 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
|
|||
import akka.util.{ Helpers, ByteString }
|
||||
import akka.TestUtils._
|
||||
|
||||
class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") {
|
||||
class TcpConnectionSpec extends AkkaSpec("""
|
||||
akka.io.tcp.register-timeout = 500ms
|
||||
akka.actor.serialize-creators = on
|
||||
""") {
|
||||
// Helper to avoid Windows localization specific differences
|
||||
def ignoreIfWindows(): Unit =
|
||||
if (Helpers.isWindows) {
|
||||
|
|
|
|||
|
|
@ -12,7 +12,10 @@ import TestUtils._
|
|||
import akka.testkit.EventFilter
|
||||
import java.io.IOException
|
||||
|
||||
class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegrationSpecSupport {
|
||||
class TcpIntegrationSpec extends AkkaSpec("""
|
||||
akka.loglevel = INFO
|
||||
akka.actor.serialize-creators = on
|
||||
""") with TcpIntegrationSpecSupport {
|
||||
|
||||
"The TCP transport implementation" should {
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,10 @@ import akka.io.SelectionHandler._
|
|||
import akka.TestUtils
|
||||
import Tcp._
|
||||
|
||||
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
||||
class TcpListenerSpec extends AkkaSpec("""
|
||||
akka.io.tcp.batch-accept-limit = 2
|
||||
akka.actor.serialize-creators = on
|
||||
""") {
|
||||
|
||||
"A TcpListener" must {
|
||||
|
||||
|
|
@ -130,7 +133,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
|||
private class ListenerParent extends Actor with ChannelRegistry {
|
||||
val listener = context.actorOf(
|
||||
props = Props(classOf[TcpListener], selectorRouter.ref, Tcp(system), this, bindCommander.ref,
|
||||
Bind(handler.ref, endpoint, 100, Nil)),
|
||||
Bind(handler.ref, endpoint, 100, Nil)).withDeploy(Deploy.local),
|
||||
name = "test-listener-" + counter.next())
|
||||
parent.watch(listener)
|
||||
def receive: Receive = {
|
||||
|
|
|
|||
|
|
@ -10,7 +10,10 @@ import akka.util.ByteString
|
|||
import java.net.InetSocketAddress
|
||||
import akka.actor.ActorRef
|
||||
|
||||
class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
|
||||
class UdpConnectedIntegrationSpec extends AkkaSpec("""
|
||||
akka.loglevel = INFO
|
||||
akka.actor.serialize-creators = on
|
||||
""") with ImplicitSender {
|
||||
|
||||
val addresses = temporaryServerAddresses(3, udp = true)
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,9 @@ import akka.util.ByteString
|
|||
import java.net.InetSocketAddress
|
||||
import akka.actor.ActorRef
|
||||
|
||||
class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
|
||||
class UdpIntegrationSpec extends AkkaSpec("""
|
||||
akka.loglevel = INFO
|
||||
akka.actor.serialize-creators = on""") with ImplicitSender {
|
||||
|
||||
val addresses = temporaryServerAddresses(3, udp = true)
|
||||
|
||||
|
|
|
|||
|
|
@ -276,7 +276,7 @@ class VerifySerializabilitySpec extends AkkaSpec(SerializationTests.verifySerial
|
|||
}))
|
||||
system stop c
|
||||
|
||||
intercept[java.io.NotSerializableException] {
|
||||
intercept[IllegalArgumentException] {
|
||||
val d = system.actorOf(Props(new NonSerializableActor(system)))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,8 @@ akka {
|
|||
serialize-messages = off
|
||||
|
||||
# Serializes and deserializes creators (in Props) to ensure that they can be
|
||||
# sent over the network, this is only intended for testing.
|
||||
# sent over the network, this is only intended for testing. Purely local deployments
|
||||
# as marked with deploy.scope == LocalScope are exempt from verification.
|
||||
serialize-creators = off
|
||||
|
||||
# Timeout for send operations to top-level actors which are in the process
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import scala.annotation.tailrec
|
|||
object Deploy {
|
||||
final val NoDispatcherGiven = ""
|
||||
final val NoMailboxGiven = ""
|
||||
val local = Deploy(scope = LocalScope)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -99,6 +100,7 @@ abstract class LocalScope extends Scope
|
|||
* which do not set a different scope. It is also the only scope handled by
|
||||
* the LocalActorRefProvider.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case object LocalScope extends LocalScope {
|
||||
/**
|
||||
* Java API: get the singleton instance
|
||||
|
|
@ -113,6 +115,7 @@ case object LocalScope extends LocalScope {
|
|||
*/
|
||||
@SerialVersionUID(1L)
|
||||
abstract class NoScopeGiven extends Scope
|
||||
@SerialVersionUID(1L)
|
||||
case object NoScopeGiven extends NoScopeGiven {
|
||||
def withFallback(other: Scope): Scope = other
|
||||
|
||||
|
|
|
|||
|
|
@ -168,12 +168,15 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
}
|
||||
|
||||
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
|
||||
if (cell.system.settings.SerializeAllCreators) {
|
||||
val ser = SerializationExtension(cell.system)
|
||||
props.args forall (arg ⇒
|
||||
arg.isInstanceOf[NoSerializationVerificationNeeded] ||
|
||||
ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null)
|
||||
}
|
||||
if (cell.system.settings.SerializeAllCreators && props.deploy.scope != LocalScope)
|
||||
try {
|
||||
val ser = SerializationExtension(cell.system)
|
||||
props.args forall (arg ⇒
|
||||
arg.isInstanceOf[NoSerializationVerificationNeeded] ||
|
||||
ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
|
||||
}
|
||||
/*
|
||||
* in case we are currently terminating, fail external attachChild requests
|
||||
* (internal calls cannot happen anyway because we are suspended)
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ private[io] object SelectionHandler {
|
|||
override def supervisorStrategy = connectionSupervisorStrategy
|
||||
|
||||
val selectorPool = context.actorOf(
|
||||
props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
|
||||
props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)).withDeploy(Deploy.local),
|
||||
name = "selectors")
|
||||
|
||||
final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = {
|
||||
|
|
@ -253,7 +253,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A
|
|||
if (MaxChannelsPerSelector == -1 || childCount < MaxChannelsPerSelector) {
|
||||
val newName = sequenceNumber.toString
|
||||
sequenceNumber += 1
|
||||
val child = context.actorOf(props = cmd.childProps(registry).withDispatcher(WorkerDispatcher), name = newName)
|
||||
val child = context.actorOf(props = cmd.childProps(registry).withDispatcher(WorkerDispatcher).withDeploy(Deploy.local), name = newName)
|
||||
childCount += 1
|
||||
if (MaxChannelsPerSelector > 0) context.watch(child) // we don't need to watch if we aren't limited
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -469,7 +469,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
*/
|
||||
val manager: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher),
|
||||
props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local),
|
||||
name = "IO-TCP")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -215,7 +215,7 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
|
||||
val manager: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
props = Props(classOf[UdpManager], this),
|
||||
props = Props(classOf[UdpManager], this).withDeploy(Deploy.local),
|
||||
name = "IO-UDP-FF")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
|
||||
val manager: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
props = Props(classOf[UdpConnectedManager], this),
|
||||
props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local),
|
||||
name = "IO-UDP-CONN")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -489,14 +489,17 @@ case object NoRouter extends NoRouter {
|
|||
*/
|
||||
private[akka] trait OverrideUnsetConfig[T <: RouterConfig] extends RouterConfig {
|
||||
|
||||
final def overrideUnsetConfig(other: RouterConfig): RouterConfig = {
|
||||
val wssConf: OverrideUnsetConfig[T] = if ((this.supervisorStrategy eq Router.defaultSupervisorStrategy)
|
||||
&& (other.supervisorStrategy ne Router.defaultSupervisorStrategy))
|
||||
this.withSupervisorStrategy(other.supervisorStrategy).asInstanceOf[OverrideUnsetConfig[T]]
|
||||
else this
|
||||
if (wssConf.resizer.isEmpty && other.resizer.isDefined) wssConf.withResizer(other.resizer.get)
|
||||
else wssConf
|
||||
}
|
||||
final def overrideUnsetConfig(other: RouterConfig): RouterConfig =
|
||||
if (other == NoRouter) this // NoRouter is the default, hence “neutral”
|
||||
else {
|
||||
val wssConf: OverrideUnsetConfig[T] =
|
||||
if ((this.supervisorStrategy eq Router.defaultSupervisorStrategy)
|
||||
&& (other.supervisorStrategy ne Router.defaultSupervisorStrategy))
|
||||
this.withSupervisorStrategy(other.supervisorStrategy).asInstanceOf[OverrideUnsetConfig[T]]
|
||||
else this
|
||||
if (wssConf.resizer.isEmpty && other.resizer.isDefined) wssConf.withResizer(other.resizer.get)
|
||||
else wssConf
|
||||
}
|
||||
|
||||
def withSupervisorStrategy(strategy: SupervisorStrategy): T
|
||||
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
// create supervisor for daemons under path "/system/cluster"
|
||||
private val clusterDaemons: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(classOf[ClusterDaemon], settings).
|
||||
withDispatcher(UseDispatcher), name = "cluster")
|
||||
withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ private[akka] class ClusterActorRefProvider(
|
|||
failureDetector,
|
||||
heartbeatInterval = WatchHeartBeatInterval,
|
||||
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter).withDeploy(Deploy.local), "remote-watcher")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import akka.cluster.MemberStatus._
|
|||
import akka.cluster.ClusterEvent._
|
||||
import akka.actor.ActorSelection
|
||||
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
|
||||
import akka.actor.Deploy
|
||||
|
||||
/**
|
||||
* Base trait for all cluster messages. All ClusterMessage's are serializable.
|
||||
|
|
@ -167,7 +168,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
|||
def receive = {
|
||||
case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg
|
||||
case AddOnMemberUpListener(code) ⇒
|
||||
context.actorOf(Props(classOf[OnMemberUpListener], code))
|
||||
context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local))
|
||||
case PublisherCreated(publisher) ⇒
|
||||
if (settings.MetricsEnabled) {
|
||||
// metrics must be started after core/publisher to be able
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props }
|
|||
import akka.cluster.ClusterEvent._
|
||||
import akka.actor.PoisonPill
|
||||
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
|
||||
import akka.actor.Deploy
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -67,7 +68,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes
|
||||
}
|
||||
}
|
||||
}).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener")
|
||||
}).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener")
|
||||
}
|
||||
|
||||
def self: Member = {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
package akka.cluster.routing
|
||||
|
||||
import java.util.Arrays
|
||||
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import scala.collection.immutable
|
||||
import akka.actor.Actor
|
||||
|
|
@ -24,6 +23,7 @@ import akka.cluster.StandardMetrics.HeapMemory
|
|||
import akka.event.Logging
|
||||
import akka.japi.Util.immutableSeq
|
||||
import akka.routing._
|
||||
import akka.actor.Deploy
|
||||
|
||||
object AdaptiveLoadBalancingRouter {
|
||||
private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() {
|
||||
|
|
@ -179,7 +179,7 @@ trait AdaptiveLoadBalancingRouterLike { this: RouterConfig ⇒
|
|||
metricsSelector.weights(metrics)))
|
||||
}
|
||||
|
||||
}).withDispatcher(routerDispatcher), name = "metricsListener")
|
||||
}).withDispatcher(routerDispatcher).withDeploy(Deploy.local), name = "metricsListener")
|
||||
|
||||
def getNext(): ActorRef = weightedRoutees match {
|
||||
case Some(weighted) ⇒
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import akka.actor.ActorRef
|
|||
import akka.remote.RemoteWatcher
|
||||
import akka.actor.ActorSystem
|
||||
import akka.cluster.MultiNodeClusterSpec.EndActor
|
||||
import akka.actor.Deploy
|
||||
|
||||
object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -93,7 +94,7 @@ abstract class ClusterDeathWatchSpec
|
|||
watchEstablished.countDown
|
||||
case Terminated(actor) ⇒ testActor ! actor.path
|
||||
}
|
||||
}), name = "observer1")
|
||||
}).withDeploy(Deploy.local), name = "observer1")
|
||||
|
||||
watchEstablished.await
|
||||
enterBarrier("watch-established")
|
||||
|
|
@ -113,7 +114,7 @@ abstract class ClusterDeathWatchSpec
|
|||
}
|
||||
|
||||
runOn(second, third, fourth) {
|
||||
system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject")
|
||||
system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject")
|
||||
enterBarrier("subjected-started")
|
||||
enterBarrier("watch-established")
|
||||
runOn(third) {
|
||||
|
|
@ -148,7 +149,7 @@ abstract class ClusterDeathWatchSpec
|
|||
def receive = {
|
||||
case t: Terminated ⇒ testActor ! t.actor.path
|
||||
}
|
||||
}), name = "observer3")
|
||||
}).withDeploy(Deploy.local), name = "observer3")
|
||||
|
||||
expectMsg(path)
|
||||
}
|
||||
|
|
@ -158,7 +159,7 @@ abstract class ClusterDeathWatchSpec
|
|||
|
||||
"be able to watch actor before node joins cluster, ClusterRemoteWatcher takes over from RemoteWatcher" taggedAs LongRunningTest in within(20 seconds) {
|
||||
runOn(fifth) {
|
||||
system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject5")
|
||||
system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject5")
|
||||
}
|
||||
enterBarrier("subjected-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import scala.concurrent.duration._
|
|||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.actor.Deploy
|
||||
|
||||
object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -66,7 +67,7 @@ abstract class LeaderLeavingSpec
|
|||
case MemberExited(m) if m.address == oldLeaderAddress ⇒ exitingLatch.countDown()
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
}).withDeploy(Deploy.local)), classOf[MemberEvent])
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
enterBarrier("leader-left")
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import scala.concurrent.duration._
|
|||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.actor.Deploy
|
||||
|
||||
object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -58,7 +59,7 @@ abstract class MembershipChangeListenerExitingSpec
|
|||
removedLatch.countDown()
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
}).withDeploy(Deploy.local)), classOf[MemberEvent])
|
||||
enterBarrier("registered-listener")
|
||||
exitingLatch.await
|
||||
removedLatch.await
|
||||
|
|
@ -76,7 +77,7 @@ abstract class MembershipChangeListenerExitingSpec
|
|||
exitingLatch.countDown()
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
}).withDeploy(Deploy.local)), classOf[MemberEvent])
|
||||
enterBarrier("registered-listener")
|
||||
exitingLatch.await
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.remote.testkit.MultiNodeSpec
|
|||
import akka.testkit._
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Deploy
|
||||
|
||||
object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -49,7 +50,7 @@ abstract class MembershipChangeListenerUpSpec
|
|||
latch.countDown()
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
}).withDeploy(Deploy.local)), classOf[MemberEvent])
|
||||
enterBarrier("listener-1-registered")
|
||||
cluster.join(first)
|
||||
latch.await
|
||||
|
|
@ -76,7 +77,7 @@ abstract class MembershipChangeListenerUpSpec
|
|||
latch.countDown()
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
}).withDeploy(Deploy.local)), classOf[MemberEvent])
|
||||
enterBarrier("listener-2-registered")
|
||||
|
||||
runOn(third) {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import scala.concurrent.duration._
|
|||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.actor.Deploy
|
||||
|
||||
object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -51,7 +52,7 @@ abstract class NodeLeavingAndExitingSpec
|
|||
case _: MemberRemoved ⇒ // not tested here
|
||||
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
}).withDeploy(Deploy.local)), classOf[MemberEvent])
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
runOn(third) {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import akka.actor.Props
|
|||
import akka.actor.Actor
|
||||
import akka.actor.RootActorPath
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.actor.Deploy
|
||||
|
||||
object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig {
|
||||
val seed1 = role("seed1")
|
||||
|
|
@ -74,7 +75,7 @@ abstract class RestartFirstSeedNodeSpec
|
|||
seedNode1Address = a
|
||||
sender ! "ok"
|
||||
}
|
||||
}), name = "address-receiver")
|
||||
}).withDeploy(Deploy.local), name = "address-receiver")
|
||||
enterBarrier("seed1-address-receiver-ready")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -585,7 +585,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
require(level >= 1)
|
||||
def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width)
|
||||
val indexedChildren =
|
||||
0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector
|
||||
0 until width map { i ⇒ context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString) } toVector
|
||||
|
||||
def receive = {
|
||||
case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward ((idx, job))
|
||||
|
|
@ -762,7 +762,7 @@ abstract class StressSpec
|
|||
|
||||
def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = {
|
||||
runOn(roles.head) {
|
||||
val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings),
|
||||
val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings).withDeploy(Deploy.local),
|
||||
name = "result" + step)
|
||||
if (includeInHistory && infolog) aggregator ! ReportTo(Some(clusterResultHistory))
|
||||
else aggregator ! ReportTo(None)
|
||||
|
|
@ -1017,7 +1017,7 @@ abstract class StressSpec
|
|||
val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)
|
||||
runOn(masterRoles: _*) {
|
||||
reportResult {
|
||||
val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree),
|
||||
val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local),
|
||||
name = "master-" + myself.name)
|
||||
m ! Begin
|
||||
import system.dispatcher
|
||||
|
|
@ -1149,7 +1149,7 @@ abstract class StressSpec
|
|||
|
||||
"start routers that are running while nodes are joining" taggedAs LongRunningTest in {
|
||||
runOn(roles.take(3): _*) {
|
||||
system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false),
|
||||
system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
|
||||
name = "master-" + myself.name) ! Begin
|
||||
}
|
||||
}
|
||||
|
|
@ -1245,7 +1245,7 @@ abstract class StressSpec
|
|||
"start routers that are running while nodes are removed" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false),
|
||||
system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
|
||||
name = "master-" + myself.name) ! Begin
|
||||
}
|
||||
}
|
||||
|
|
|
|||
1
akka-cluster/src/test/resources/reference.conf
Normal file
1
akka-cluster/src/test/resources/reference.conf
Normal file
|
|
@ -0,0 +1 @@
|
|||
akka.actor.serialize-creators=on
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
class SerializeCreatorsVerificationSpec extends AkkaSpec {
|
||||
|
||||
"serialize-creators must be on" in {
|
||||
system.settings.SerializeAllCreators must be === true
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -111,7 +111,11 @@ actor systems has to have a JAR containing the class.
|
|||
arguments to the actor being created, do not make the factory a non-static
|
||||
inner class: this will inherently capture a reference to its enclosing
|
||||
object, which in most cases is not serializable. It is best to make a static
|
||||
inner class which implements :class:`UntypedActorFactory`.
|
||||
inner class which implements :class:`Creator<T extends Actor>`.
|
||||
|
||||
Serializability of all Props can be tested by setting the configuration item
|
||||
``akka.actor.serialize-creators=on``. Only Props whose ``deploy`` has
|
||||
``LocalScope`` are exempt from this check.
|
||||
|
||||
.. note::
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ ZeroMQ is very opinionated when it comes to multi-threading so configuration opt
|
|||
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
|
||||
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
|
||||
|
||||
.. info::
|
||||
.. note::
|
||||
|
||||
The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported.
|
||||
|
||||
|
|
|
|||
|
|
@ -120,6 +120,10 @@ actor systems has to have a JAR containing the class.
|
|||
most cases is not serializable. It is best to create a factory method in the
|
||||
companion object of the actor’s class.
|
||||
|
||||
Serializability of all Props can be tested by setting the configuration item
|
||||
``akka.actor.serialize-creators=on``. Only Props whose ``deploy`` has
|
||||
``LocalScope`` are exempt from this check.
|
||||
|
||||
.. note::
|
||||
|
||||
You can use asterisks as wildcard matches for the actor paths, so you could specify:
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ ZeroMQ is very opinionated when it comes to multi-threading so configuration opt
|
|||
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
|
||||
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
|
||||
|
||||
.. info::
|
||||
.. note::
|
||||
|
||||
The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported.
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import scala.reflect.classTag
|
|||
import akka.ConfigurationException
|
||||
import akka.AkkaException
|
||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||
import akka.actor.Deploy
|
||||
|
||||
/**
|
||||
* The conductor is the one orchestrating the test: it governs the
|
||||
|
|
@ -413,7 +414,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
|
|||
case CreateServerFSM(channel) ⇒
|
||||
val (ip, port) = channel.getRemoteAddress match { case s: InetSocketAddress ⇒ (s.getAddress.getHostAddress, s.getPort) }
|
||||
val name = ip + ":" + port + "-server" + generation.next
|
||||
sender ! context.actorOf(Props(classOf[ServerFSM], self, channel), name)
|
||||
sender ! context.actorOf(Props(classOf[ServerFSM], self, channel).withDeploy(Deploy.local), name)
|
||||
case c @ NodeInfo(name, addr, fsm) ⇒
|
||||
barrier forward c
|
||||
if (nodes contains name) {
|
||||
|
|
|
|||
|
|
@ -308,7 +308,7 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
AkkaPduProtobufCodec,
|
||||
receiveBuffers = receiveBuffers,
|
||||
reliableDeliverySupervisor = Some(self))
|
||||
.withDispatcher("akka.remote.writer-dispatcher"),
|
||||
.withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local),
|
||||
"endpointWriter"))
|
||||
}
|
||||
}
|
||||
|
|
@ -581,7 +581,7 @@ private[remote] class EndpointWriter(
|
|||
val newReader =
|
||||
context.watch(context.actorOf(
|
||||
EndpointReader(localAddress, remoteAddress, transport, settings, codec,
|
||||
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers),
|
||||
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local),
|
||||
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
|
||||
handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
|
||||
Some(newReader)
|
||||
|
|
|
|||
|
|
@ -191,7 +191,8 @@ private[akka] class RemoteActorRefProvider(
|
|||
failureDetector,
|
||||
heartbeatInterval = WatchHeartBeatInterval,
|
||||
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter).withDeploy(Deploy.local),
|
||||
"remote-watcher")
|
||||
}
|
||||
|
||||
protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = {
|
||||
|
|
@ -265,17 +266,15 @@ private[akka] class RemoteActorRefProvider(
|
|||
case d @ Deploy(_, _, _, RemoteScope(addr), _, _) ⇒
|
||||
if (hasAddress(addr)) {
|
||||
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
|
||||
} else {
|
||||
try {
|
||||
val localAddress = transport.localAddressForRemote(addr)
|
||||
val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
|
||||
withUid(path.uid)
|
||||
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
log.error(e, "Error while looking up address [{}]", addr)
|
||||
new EmptyLocalActorRef(this, path, eventStream)
|
||||
}
|
||||
} else if (props.deploy.scope == LocalScope) {
|
||||
throw new IllegalArgumentException(s"configuration requested remote deployment for local-only Props at [$path]")
|
||||
} else try {
|
||||
val localAddress = transport.localAddressForRemote(addr)
|
||||
val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
|
||||
withUid(path.uid)
|
||||
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ throw new IllegalArgumentException(s"remote deployment failed for [$path]", e)
|
||||
}
|
||||
|
||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ private[remote] object Remoting {
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props, name)
|
||||
case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props.withDeploy(Deploy.local), name)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -155,7 +155,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
|||
case None ⇒
|
||||
log.info("Starting remoting")
|
||||
val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
Props(classOf[EndpointManager], provider.remoteSettings.config, log), Remoting.EndpointManagerName)
|
||||
Props(classOf[EndpointManager], provider.remoteSettings.config, log).withDeploy(Deploy.local), Remoting.EndpointManagerName)
|
||||
endpointManager = Some(manager)
|
||||
|
||||
try {
|
||||
|
|
@ -624,7 +624,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
transport,
|
||||
endpointSettings,
|
||||
AkkaPduProtobufCodec,
|
||||
receiveBuffers).withDispatcher("akka.remote.writer-dispatcher"),
|
||||
receiveBuffers).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local),
|
||||
"reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
|
||||
else context.watch(context.actorOf(EndpointWriter(
|
||||
handleOption,
|
||||
|
|
@ -634,7 +634,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
endpointSettings,
|
||||
AkkaPduProtobufCodec,
|
||||
receiveBuffers,
|
||||
reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher"),
|
||||
reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local),
|
||||
"endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -121,7 +121,7 @@ private[transport] class AkkaProtocolManager(
|
|||
stateActorAssociationHandler,
|
||||
stateActorSettings,
|
||||
AkkaPduProtobufCodec,
|
||||
failureDetector), actorNameFor(handle.remoteAddress))
|
||||
failureDetector).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress))
|
||||
|
||||
case AssociateUnderlying(remoteAddress, statusPromise) ⇒
|
||||
val stateActorLocalAddress = localAddress
|
||||
|
|
@ -135,7 +135,7 @@ private[transport] class AkkaProtocolManager(
|
|||
stateActorWrappedTransport,
|
||||
stateActorSettings,
|
||||
AkkaPduProtobufCodec,
|
||||
failureDetector), actorNameFor(remoteAddress))
|
||||
failureDetector).withDeploy(Deploy.local), actorNameFor(remoteAddress))
|
||||
}
|
||||
|
||||
private def createTransportFailureDetector(): FailureDetector =
|
||||
|
|
|
|||
|
|
@ -288,7 +288,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
|
|||
val managerRef = self
|
||||
ThrottlerHandle(
|
||||
originalHandle,
|
||||
context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound), "throttler" + nextId()))
|
||||
context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound).withDeploy(Deploy.local),
|
||||
"throttler" + nextId()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
1
akka-remote/src/test/resources/reference.conf
Normal file
1
akka-remote/src/test/resources/reference.conf
Normal file
|
|
@ -0,0 +1 @@
|
|||
akka.actor.serialize-creators=on
|
||||
|
|
@ -28,9 +28,7 @@ import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStream
|
|||
import java.net.{ InetSocketAddress, SocketException }
|
||||
import java.security.{ KeyStore, SecureRandom }
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.concurrent.duration.DurationInt
|
||||
|
||||
import akka.TestUtils
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
|
|
@ -42,6 +40,7 @@ import akka.remote.security.provider.AkkaProvider
|
|||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
import akka.util.{ ByteString, Timeout }
|
||||
import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLServerSocket, SSLSocket, TrustManagerFactory }
|
||||
import akka.actor.Deploy
|
||||
|
||||
// TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies
|
||||
class SslTlsSupportSpec extends AkkaSpec {
|
||||
|
|
@ -71,7 +70,7 @@ class SslTlsSupportSpec extends AkkaSpec {
|
|||
"work between a Java client and a akka server" in {
|
||||
val serverAddress = TestUtils.temporaryServerAddress()
|
||||
val probe = TestProbe()
|
||||
val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server1"))
|
||||
val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server1"))
|
||||
expectMsg(Tcp.Bound)
|
||||
|
||||
val client = new JavaSslClient(serverAddress)
|
||||
|
|
@ -83,7 +82,7 @@ class SslTlsSupportSpec extends AkkaSpec {
|
|||
"work between a akka client and a akka server" in {
|
||||
val serverAddress = TestUtils.temporaryServerAddress()
|
||||
val probe = TestProbe()
|
||||
val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server2"))
|
||||
val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server2"))
|
||||
expectMsg(Tcp.Bound)
|
||||
|
||||
val client = new AkkaSslClient(serverAddress)
|
||||
|
|
@ -111,7 +110,7 @@ class SslTlsSupportSpec extends AkkaSpec {
|
|||
|
||||
import init._
|
||||
|
||||
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref),
|
||||
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref).withDeploy(Deploy.local),
|
||||
"client" + counter.incrementAndGet())
|
||||
probe.send(connection, Tcp.Register(handler))
|
||||
|
||||
|
|
@ -163,11 +162,11 @@ class SslTlsSupportSpec extends AkkaSpec {
|
|||
new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000))
|
||||
|
||||
val connection = sender
|
||||
val handler = context.actorOf(Props(new AkkaSslHandler(init)))
|
||||
val handler = context.actorOf(Props(new AkkaSslHandler(init)).withDeploy(Deploy.local))
|
||||
//#server
|
||||
context watch handler
|
||||
//#server
|
||||
val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler))
|
||||
val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler).withDeploy(Deploy.local))
|
||||
|
||||
connection ! Tcp.Register(pipeline)
|
||||
//#server
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ akka {
|
|||
def receive = {
|
||||
case t: Terminated ⇒ testActor ! t.actor.path
|
||||
}
|
||||
}), name = "observer2")
|
||||
}).withDeploy(Deploy.local), name = "observer2")
|
||||
|
||||
expectMsg(60.seconds, path)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ object RemoteDeployerSpec {
|
|||
val deployerConf = ConfigFactory.parseString("""
|
||||
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
akka.actor.deployment {
|
||||
/user/service2 {
|
||||
/service2 {
|
||||
router = round-robin
|
||||
nr-of-instances = 3
|
||||
remote = "akka://sys@wallace:2552"
|
||||
|
|
@ -34,7 +34,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
|
|||
"A RemoteDeployer" must {
|
||||
|
||||
"be able to parse 'akka.actor.deployment._' with specified remote nodes" in {
|
||||
val service = "/user/service2"
|
||||
val service = "/service2"
|
||||
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1))
|
||||
|
||||
deployment must be(Some(
|
||||
|
|
@ -46,6 +46,12 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
|
|||
"mydispatcher")))
|
||||
}
|
||||
|
||||
"reject remote deployment when the source requires LocalScope" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
system.actorOf(Props.empty.withDeploy(Deploy.local), "service2")
|
||||
}.getMessage must be === "configuration requested remote deployment for local-only Props at [akka://RemoteDeployerSpec/user/service2]"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -164,14 +164,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
case x: Int ⇒ sender ! byteStringOfSize(x)
|
||||
case x ⇒ sender ! x
|
||||
}
|
||||
}), bigBounceId)
|
||||
}).withDeploy(Deploy.local), bigBounceId)
|
||||
val bigBounceHere = system.actorFor(s"akka.test://remote-sys@localhost:12346/user/$bigBounceId")
|
||||
|
||||
val eventForwarder = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x ⇒ testActor ! x
|
||||
}
|
||||
}))
|
||||
}).withDeploy(Deploy.local))
|
||||
system.eventStream.subscribe(eventForwarder, classOf[AssociationErrorEvent])
|
||||
system.eventStream.subscribe(eventForwarder, classOf[DisassociatedEvent])
|
||||
try {
|
||||
|
|
@ -368,10 +368,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
}
|
||||
}), "looker2")
|
||||
// child is configured to be deployed on remoteSystem
|
||||
l ! (Props[Echo1], "child")
|
||||
l ! ((Props[Echo1], "child"))
|
||||
val child = expectMsgType[ActorRef]
|
||||
// grandchild is configured to be deployed on RemotingSpec (system)
|
||||
child ! (Props[Echo1], "grandchild")
|
||||
child ! ((Props[Echo1], "grandchild"))
|
||||
val grandchild = expectMsgType[ActorRef]
|
||||
grandchild.asInstanceOf[ActorRefScope].isLocal must be(true)
|
||||
grandchild ! 53
|
||||
|
|
@ -399,7 +399,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
child ! PoisonPill
|
||||
expectMsg("postStop")
|
||||
expectMsgType[Terminated].actor must be === child
|
||||
l ! (Props[Echo1], "child")
|
||||
l ! ((Props[Echo1], "child"))
|
||||
val child2 = expectMsgType[ActorRef]
|
||||
child2 ! Identify("idReq2")
|
||||
expectMsg(ActorIdentity("idReq2", Some(child2)))
|
||||
|
|
|
|||
|
|
@ -0,0 +1,15 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
class SerializeCreatorsVerificationSpec extends AkkaSpec {
|
||||
|
||||
"serialize-creators must be on" in {
|
||||
system.settings.SerializeAllCreators must be === true
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,6 +20,7 @@ import org.junit.runner.RunWith
|
|||
import akka.actor.Terminated
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Deploy
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class UntrustedSpec extends AkkaSpec("""
|
||||
|
|
@ -51,7 +52,7 @@ akka.loglevel = DEBUG
|
|||
case d @ Debug(_, _, msg: String) if msg contains "dropping" ⇒ testActor ! d
|
||||
case _ ⇒
|
||||
}
|
||||
}), "debugSniffer"), classOf[Logging.Debug])
|
||||
}).withDeploy(Deploy.local), "debugSniffer"), classOf[Logging.Debug])
|
||||
|
||||
"UntrustedMode" must {
|
||||
|
||||
|
|
@ -77,7 +78,7 @@ akka.loglevel = DEBUG
|
|||
def receive = {
|
||||
case x ⇒ testActor forward x
|
||||
}
|
||||
}))
|
||||
}).withDeploy(Deploy.local))
|
||||
within(1.second) {
|
||||
expectMsgType[Logging.Debug]
|
||||
expectNoMsg
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import com.google.protobuf.{ ByteString ⇒ PByteString }
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ Await, Promise }
|
||||
import akka.actor.Deploy
|
||||
|
||||
object AkkaProtocolSpec {
|
||||
|
||||
|
|
@ -129,7 +130,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
ActorAssociationEventListener(testActor),
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(handle.readHandlerPromise.isCompleted)
|
||||
}
|
||||
|
|
@ -143,7 +144,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
ActorAssociationEventListener(testActor),
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
reader ! testAssociate(uid = 33, cookie = None)
|
||||
|
||||
|
|
@ -178,7 +179,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
ActorAssociationEventListener(testActor),
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
// a stray message will force a disassociate
|
||||
reader ! testHeartbeat
|
||||
|
|
@ -205,7 +206,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
transport,
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(lastActivityIsAssociate(registry, 42, None))
|
||||
failureDetector.called must be(true)
|
||||
|
|
@ -238,7 +239,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
ActorAssociationEventListener(testActor),
|
||||
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
reader ! testAssociate(uid = 33, Some("xyzzy"))
|
||||
|
||||
|
|
@ -257,7 +258,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
ActorAssociationEventListener(testActor),
|
||||
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
// Send the correct cookie
|
||||
reader ! testAssociate(uid = 33, Some("abcde"))
|
||||
|
|
@ -290,7 +291,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
transport,
|
||||
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = Some("abcde")))
|
||||
}
|
||||
|
|
@ -308,7 +309,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
transport,
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
|
||||
|
||||
|
|
@ -343,7 +344,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
transport,
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
|
||||
|
||||
|
|
@ -378,7 +379,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
transport,
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
|
||||
|
||||
|
|
@ -416,7 +417,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
transport,
|
||||
new AkkaProtocolSettings(conf),
|
||||
codec,
|
||||
failureDetector))
|
||||
failureDetector).withDeploy(Deploy.local))
|
||||
|
||||
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue