Rename addr to address in non-public API #21874
This commit is contained in:
parent
ad1ffeda2b
commit
f623d10522
24 changed files with 85 additions and 85 deletions
|
|
@ -139,18 +139,18 @@ class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) {
|
||||||
val ser = SerializationExtension(system)
|
val ser = SerializationExtension(system)
|
||||||
import ser._
|
import ser._
|
||||||
|
|
||||||
val addr = Address("120", "Monroe Street", "Santa Clara", "95050")
|
val address = Address("120", "Monroe Street", "Santa Clara", "95050")
|
||||||
val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050"))
|
val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050"))
|
||||||
|
|
||||||
"Serialization" must {
|
"Serialization" must {
|
||||||
|
|
||||||
"have correct bindings" in {
|
"have correct bindings" in {
|
||||||
ser.bindings.collectFirst { case (c, s) if c == addr.getClass ⇒ s.getClass } should ===(Some(classOf[JavaSerializer]))
|
ser.bindings.collectFirst { case (c, s) if c == address.getClass ⇒ s.getClass } should ===(Some(classOf[JavaSerializer]))
|
||||||
ser.bindings.collectFirst { case (c, s) if c == classOf[PlainMessage] ⇒ s.getClass } should ===(Some(classOf[NoopSerializer]))
|
ser.bindings.collectFirst { case (c, s) if c == classOf[PlainMessage] ⇒ s.getClass } should ===(Some(classOf[NoopSerializer]))
|
||||||
}
|
}
|
||||||
|
|
||||||
"serialize Address" in {
|
"serialize Address" in {
|
||||||
assert(deserialize(serialize(addr).get, classOf[Address]).get === addr)
|
assert(deserialize(serialize(address).get, classOf[Address]).get === address)
|
||||||
}
|
}
|
||||||
|
|
||||||
"serialize Person" in {
|
"serialize Person" in {
|
||||||
|
|
|
||||||
|
|
@ -54,8 +54,8 @@ object ActorPath {
|
||||||
* Parse string as actor path; throws java.net.MalformedURLException if unable to do so.
|
* Parse string as actor path; throws java.net.MalformedURLException if unable to do so.
|
||||||
*/
|
*/
|
||||||
def fromString(s: String): ActorPath = s match {
|
def fromString(s: String): ActorPath = s match {
|
||||||
case ActorPathExtractor(addr, elems) ⇒ RootActorPath(addr) / elems
|
case ActorPathExtractor(address, elems) ⇒ RootActorPath(address) / elems
|
||||||
case _ ⇒ throw new MalformedURLException("cannot parse as ActorPath: " + s)
|
case _ ⇒ throw new MalformedURLException("cannot parse as ActorPath: " + s)
|
||||||
}
|
}
|
||||||
|
|
||||||
private final val ValidSymbols = """-_.*$+:@&=,!~';"""
|
private final val ValidSymbols = """-_.*$+:@&=,!~';"""
|
||||||
|
|
@ -367,10 +367,10 @@ final class ChildActorPath private[akka] (val parent: ActorPath, val name: Strin
|
||||||
appendUidFragment(sb).toString
|
appendUidFragment(sb).toString
|
||||||
}
|
}
|
||||||
|
|
||||||
private def addressStringLengthDiff(addr: Address): Int = {
|
private def addressStringLengthDiff(address: Address): Int = {
|
||||||
val r = root
|
val r = root
|
||||||
if (r.address.host.isDefined) 0
|
if (r.address.host.isDefined) 0
|
||||||
else (addr.toString.length - r.address.toString.length)
|
else (address.toString.length - r.address.toString.length)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@ object Dns extends ExtensionId[DnsExt] with ExtensionIdProvider {
|
||||||
|
|
||||||
@throws[UnknownHostException]
|
@throws[UnknownHostException]
|
||||||
def addr: InetAddress = addrOption match {
|
def addr: InetAddress = addrOption match {
|
||||||
case Some(addr) ⇒ addr
|
case Some(ipAddress) ⇒ ipAddress
|
||||||
case None ⇒ throw new UnknownHostException(name)
|
case None ⇒ throw new UnknownHostException(name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,4 +96,4 @@ object IpVersionSelector {
|
||||||
case "true" ⇒ ipv6 orElse ipv4
|
case "true" ⇒ ipv6 orElse ipv4
|
||||||
case _ ⇒ ipv4 orElse ipv6
|
case _ ⇒ ipv4 orElse ipv6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -372,7 +372,7 @@ abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMe
|
||||||
val (sum, count) = acc(address)
|
val (sum, count) = acc(address)
|
||||||
acc + (address → ((sum + capacity, count + 1)))
|
acc + (address → ((sum + capacity, count + 1)))
|
||||||
}.map {
|
}.map {
|
||||||
case (addr, (sum, count)) ⇒ addr → (sum / count)
|
case (address, (sum, count)) ⇒ address → (sum / count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -434,7 +434,7 @@ abstract class CapacityMetricsSelector extends MetricsSelector {
|
||||||
val (_, min) = capacity.minBy { case (_, c) ⇒ c }
|
val (_, min) = capacity.minBy { case (_, c) ⇒ c }
|
||||||
// lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
|
// lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
|
||||||
val divisor = math.max(0.01, min)
|
val divisor = math.max(0.01, min)
|
||||||
capacity map { case (addr, c) ⇒ (addr → math.round((c) / divisor).toInt) }
|
capacity map { case (address, c) ⇒ (address → math.round((c) / divisor).toInt) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
} requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")
|
} requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")
|
||||||
|
|
||||||
val SeedNodes: immutable.IndexedSeq[Address] =
|
val SeedNodes: immutable.IndexedSeq[Address] =
|
||||||
immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector
|
immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(address) ⇒ address }.toVector
|
||||||
val SeedNodeTimeout: FiniteDuration = cc.getMillisDuration("seed-node-timeout")
|
val SeedNodeTimeout: FiniteDuration = cc.getMillisDuration("seed-node-timeout")
|
||||||
val RetryUnsuccessfulJoinAfter: Duration = {
|
val RetryUnsuccessfulJoinAfter: Duration = {
|
||||||
val key = "retry-unsuccessful-join-after"
|
val key = "retry-unsuccessful-join-after"
|
||||||
|
|
|
||||||
|
|
@ -238,9 +238,9 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
|
|
||||||
val sys1 = ActorSystem("AdditionalSys", system.settings.config)
|
val sys1 = ActorSystem("AdditionalSys", system.settings.config)
|
||||||
val addr = Cluster(sys1).selfAddress
|
val address = Cluster(sys1).selfAddress
|
||||||
try {
|
try {
|
||||||
Cluster(sys1).join(addr)
|
Cluster(sys1).join(address)
|
||||||
new TestKit(sys1) with ImplicitSender {
|
new TestKit(sys1) with ImplicitSender {
|
||||||
|
|
||||||
val r = newReplicator(sys1)
|
val r = newReplicator(sys1)
|
||||||
|
|
@ -276,11 +276,11 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
|
||||||
"AdditionalSys",
|
"AdditionalSys",
|
||||||
// use the same port
|
// use the same port
|
||||||
ConfigFactory.parseString(s"""
|
ConfigFactory.parseString(s"""
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
akka.remote.netty.tcp.port = ${addr.port.get}
|
akka.remote.netty.tcp.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
try {
|
try {
|
||||||
Cluster(sys2).join(addr)
|
Cluster(sys2).join(address)
|
||||||
new TestKit(sys2) with ImplicitSender {
|
new TestKit(sys2) with ImplicitSender {
|
||||||
|
|
||||||
val r2: ActorRef = newReplicator(sys2)
|
val r2: ActorRef = newReplicator(sys2)
|
||||||
|
|
|
||||||
|
|
@ -140,10 +140,10 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
|
||||||
enterBarrier("pruned")
|
enterBarrier("pruned")
|
||||||
|
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
val addr = cluster2.selfAddress
|
val address = cluster2.selfAddress
|
||||||
val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
akka.remote.netty.tcp.port = ${addr.port.get}
|
akka.remote.netty.tcp.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
val cluster3 = Cluster(sys3)
|
val cluster3 = Cluster(sys3)
|
||||||
val replicator3 = startReplicator(sys3)
|
val replicator3 = startReplicator(sys3)
|
||||||
|
|
|
||||||
|
|
@ -324,9 +324,9 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
|
||||||
}
|
}
|
||||||
|
|
||||||
when(Initial, stateTimeout = 10 seconds) {
|
when(Initial, stateTimeout = 10 seconds) {
|
||||||
case Event(Hello(name, addr), _) ⇒
|
case Event(Hello(name, address), _) ⇒
|
||||||
roleName = RoleName(name)
|
roleName = RoleName(name)
|
||||||
controller ! NodeInfo(roleName, addr, self)
|
controller ! NodeInfo(roleName, address, self)
|
||||||
goto(Ready)
|
goto(Ready)
|
||||||
case Event(x: NetworkOp, _) ⇒
|
case Event(x: NetworkOp, _) ⇒
|
||||||
log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
|
log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
|
||||||
|
|
@ -426,7 +426,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
|
||||||
val (ip, port) = channel.getRemoteAddress match { case s: InetSocketAddress ⇒ (s.getAddress.getHostAddress, s.getPort) }
|
val (ip, port) = channel.getRemoteAddress match { case s: InetSocketAddress ⇒ (s.getAddress.getHostAddress, s.getPort) }
|
||||||
val name = ip + ":" + port + "-server" + generation.next
|
val name = ip + ":" + port + "-server" + generation.next
|
||||||
sender() ! context.actorOf(Props(classOf[ServerFSM], self, channel).withDeploy(Deploy.local), name)
|
sender() ! context.actorOf(Props(classOf[ServerFSM], self, channel).withDeploy(Deploy.local), name)
|
||||||
case c @ NodeInfo(name, addr, fsm) ⇒
|
case c @ NodeInfo(name, address, fsm) ⇒
|
||||||
barrier forward c
|
barrier forward c
|
||||||
if (nodes contains name) {
|
if (nodes contains name) {
|
||||||
if (initialParticipants > 0) {
|
if (initialParticipants > 0) {
|
||||||
|
|
@ -442,7 +442,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
|
||||||
initialParticipants = 0
|
initialParticipants = 0
|
||||||
}
|
}
|
||||||
if (addrInterest contains name) {
|
if (addrInterest contains name) {
|
||||||
addrInterest(name) foreach (_ ! ToClient(AddressReply(name, addr)))
|
addrInterest(name) foreach (_ ! ToClient(AddressReply(name, address)))
|
||||||
addrInterest -= name
|
addrInterest -= name
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,8 +75,8 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
|
||||||
case x: NetworkOp ⇒
|
case x: NetworkOp ⇒
|
||||||
val w = TCP.Wrapper.newBuilder
|
val w = TCP.Wrapper.newBuilder
|
||||||
x match {
|
x match {
|
||||||
case Hello(name, addr) ⇒
|
case Hello(name, address) ⇒
|
||||||
w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr))
|
w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(address))
|
||||||
case EnterBarrier(name, timeout) ⇒
|
case EnterBarrier(name, timeout) ⇒
|
||||||
val barrier = TCP.EnterBarrier.newBuilder.setName(name)
|
val barrier = TCP.EnterBarrier.newBuilder.setName(name)
|
||||||
timeout foreach (t ⇒ barrier.setTimeout(t.toNanos))
|
timeout foreach (t ⇒ barrier.setTimeout(t.toNanos))
|
||||||
|
|
@ -101,8 +101,8 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
|
||||||
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.ShutdownAbrupt))
|
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.ShutdownAbrupt))
|
||||||
case GetAddress(node) ⇒
|
case GetAddress(node) ⇒
|
||||||
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name))
|
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name))
|
||||||
case AddressReply(node, addr) ⇒
|
case AddressReply(node, address) ⇒
|
||||||
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name).setAddr(addr))
|
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name).setAddr(address))
|
||||||
case _: Done ⇒
|
case _: Done ⇒
|
||||||
w.setDone("")
|
w.setDone("")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -226,9 +226,9 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
||||||
log.warning("did not expect {}", op)
|
log.warning("did not expect {}", op)
|
||||||
}
|
}
|
||||||
stay using d.copy(runningOp = None)
|
stay using d.copy(runningOp = None)
|
||||||
case AddressReply(node, addr) ⇒
|
case AddressReply(node, address) ⇒
|
||||||
runningOp match {
|
runningOp match {
|
||||||
case Some((_, requester)) ⇒ requester ! addr
|
case Some((_, requester)) ⇒ requester ! address
|
||||||
case None ⇒ log.warning("did not expect {}", op)
|
case None ⇒ log.warning("did not expect {}", op)
|
||||||
}
|
}
|
||||||
stay using d.copy(runningOp = None)
|
stay using d.copy(runningOp = None)
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ abstract class RemoteNodeRestartDeathWatchSpec(multiNodeConfig: RemoteNodeRestar
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
system.actorOf(Props[Subject], "subject")
|
system.actorOf(Props[Subject], "subject")
|
||||||
enterBarrier("actors-started")
|
enterBarrier("actors-started")
|
||||||
|
|
||||||
|
|
@ -112,8 +112,8 @@ abstract class RemoteNodeRestartDeathWatchSpec(multiNodeConfig: RemoteNodeRestar
|
||||||
Await.ready(system.whenTerminated, 30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp.port = ${addr.port.get}
|
akka.remote.netty.tcp.port = ${address.port.get}
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject")
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ abstract class RemoteNodeRestartGateSpec
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
val firstAddress = node(first).address
|
val firstAddress = node(first).address
|
||||||
|
|
||||||
enterBarrier("gated")
|
enterBarrier("gated")
|
||||||
|
|
@ -99,8 +99,8 @@ abstract class RemoteNodeRestartGateSpec
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.retry-gate-closed-for = 0.5 s
|
akka.remote.retry-gate-closed-for = 0.5 s
|
||||||
akka.remote.netty.tcp {
|
akka.remote.netty.tcp {
|
||||||
hostname = ${addr.host.get}
|
hostname = ${address.host.get}
|
||||||
port = ${addr.port.get}
|
port = ${address.port.get}
|
||||||
}
|
}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -123,7 +123,7 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
system.actorOf(Props[Subject], "subject")
|
system.actorOf(Props[Subject], "subject")
|
||||||
system.actorOf(Props[Subject], "sysmsgBarrier")
|
system.actorOf(Props[Subject], "sysmsgBarrier")
|
||||||
val path = node(first)
|
val path = node(first)
|
||||||
|
|
@ -134,8 +134,8 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
||||||
Await.ready(system.whenTerminated, 30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp.port = ${addr.port.get}
|
akka.remote.netty.tcp.port = ${address.port.get}
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject")
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
system.actorOf(Props[Subject], "subject")
|
system.actorOf(Props[Subject], "subject")
|
||||||
enterBarrier("actors-started")
|
enterBarrier("actors-started")
|
||||||
|
|
||||||
|
|
@ -112,8 +112,8 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie
|
||||||
Await.ready(system.whenTerminated, 30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp.port = ${addr.port.get}
|
akka.remote.netty.tcp.port = ${address.port.get}
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject")
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -97,7 +97,7 @@ abstract class RemoteRestartedQuarantinedSpec
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
val firstAddress = node(first).address
|
val firstAddress = node(first).address
|
||||||
system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
|
system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
|
||||||
|
|
||||||
|
|
@ -125,8 +125,8 @@ abstract class RemoteRestartedQuarantinedSpec
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.retry-gate-closed-for = 0.5 s
|
akka.remote.retry-gate-closed-for = 0.5 s
|
||||||
akka.remote.netty.tcp {
|
akka.remote.netty.tcp {
|
||||||
hostname = ${addr.host.get}
|
hostname = ${address.host.get}
|
||||||
port = ${addr.port.get}
|
port = ${address.port.get}
|
||||||
}
|
}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -99,13 +99,13 @@ abstract class HandshakeRestartReceiverSpec
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
enterBarrier("before-shutdown")
|
enterBarrier("before-shutdown")
|
||||||
|
|
||||||
Await.result(system.whenTerminated, 10.seconds)
|
Await.result(system.whenTerminated, 10.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject2")
|
freshSystem.actorOf(Props[Subject], "subject2")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ abstract class RemoteRestartedQuarantinedSpec extends RemotingMultiNodeSpec(Remo
|
||||||
}
|
}
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
val firstAddress = node(first).address
|
val firstAddress = node(first).address
|
||||||
system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
|
system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
|
||||||
|
|
||||||
|
|
@ -106,7 +106,7 @@ abstract class RemoteRestartedQuarantinedSpec extends RemotingMultiNodeSpec(Remo
|
||||||
Await.result(system.whenTerminated, 10.seconds)
|
Await.result(system.whenTerminated, 10.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.artery.canonical.port = ${addr.port.get}
|
akka.remote.artery.canonical.port = ${address.port.get}
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
|
|
||||||
val probe = TestProbe()(freshSystem)
|
val probe = TestProbe()(freshSystem)
|
||||||
|
|
|
||||||
|
|
@ -299,8 +299,8 @@ private[akka] class RemoteActorRefProvider(
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match {
|
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match {
|
||||||
case d @ Deploy(_, _, _, RemoteScope(addr), _, _) ⇒
|
case d @ Deploy(_, _, _, RemoteScope(address), _, _) ⇒
|
||||||
if (hasAddress(addr)) {
|
if (hasAddress(address)) {
|
||||||
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
|
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
|
||||||
} else if (props.deploy.scope == LocalScope) {
|
} else if (props.deploy.scope == LocalScope) {
|
||||||
throw new ConfigurationException(s"configuration requested remote deployment for local-only Props at [$path]")
|
throw new ConfigurationException(s"configuration requested remote deployment for local-only Props at [$path]")
|
||||||
|
|
@ -313,8 +313,8 @@ private[akka] class RemoteActorRefProvider(
|
||||||
case NonFatal(e) ⇒ throw new ConfigurationException(
|
case NonFatal(e) ⇒ throw new ConfigurationException(
|
||||||
s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]", e)
|
s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]", e)
|
||||||
}
|
}
|
||||||
val localAddress = transport.localAddressForRemote(addr)
|
val localAddress = transport.localAddressForRemote(address)
|
||||||
val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
|
val rpath = (RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
|
||||||
withUid(path.uid)
|
withUid(path.uid)
|
||||||
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
|
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
||||||
|
|
@ -468,10 +468,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
||||||
handle ← if (isDatagram)
|
handle ← if (isDatagram)
|
||||||
Future {
|
Future {
|
||||||
readyChannel.getRemoteAddress match {
|
readyChannel.getRemoteAddress match {
|
||||||
case addr: InetSocketAddress ⇒
|
case address: InetSocketAddress ⇒
|
||||||
val handle = new UdpAssociationHandle(localAddress, remoteAddress, readyChannel, NettyTransport.this)
|
val handle = new UdpAssociationHandle(localAddress, remoteAddress, readyChannel, NettyTransport.this)
|
||||||
handle.readHandlerPromise.future.foreach {
|
handle.readHandlerPromise.future.foreach {
|
||||||
listener ⇒ udpConnectionTable.put(addr, listener)
|
listener ⇒ udpConnectionTable.put(address, listener)
|
||||||
}
|
}
|
||||||
handle
|
handle
|
||||||
case unknown ⇒ throw new NettyTransportException(s"Unknown outbound remote address type [${unknown.getClass.getName}]")
|
case unknown ⇒ throw new NettyTransportException(s"Unknown outbound remote address type [${unknown.getClass.getName}]")
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import scala.collection.JavaConverters._
|
||||||
|
|
||||||
class DaemonicSpec extends AkkaSpec {
|
class DaemonicSpec extends AkkaSpec {
|
||||||
|
|
||||||
def addr(sys: ActorSystem, proto: String) =
|
def getOtherAddress(sys: ActorSystem, proto: String) =
|
||||||
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
||||||
|
|
||||||
def unusedPort = {
|
def unusedPort = {
|
||||||
|
|
@ -38,7 +38,7 @@ class DaemonicSpec extends AkkaSpec {
|
||||||
akka.log-dead-letters-during-shutdown = off
|
akka.log-dead-letters-during-shutdown = off
|
||||||
"""))
|
"""))
|
||||||
|
|
||||||
val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
|
val unusedAddress = getOtherAddress(daemonicSystem, "tcp").copy(port = Some(unusedPort))
|
||||||
val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor")
|
val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor")
|
||||||
selection ! "whatever"
|
selection ! "whatever"
|
||||||
Thread.sleep(2.seconds.dilated.toMillis)
|
Thread.sleep(2.seconds.dilated.toMillis)
|
||||||
|
|
|
||||||
|
|
@ -144,11 +144,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
"/gonk" → "tcp",
|
"/gonk" → "tcp",
|
||||||
"/zagzag" → "udp",
|
"/zagzag" → "udp",
|
||||||
"/roghtaar" → "ssl.tcp")
|
"/roghtaar" → "ssl.tcp")
|
||||||
) deploy(system, Deploy(name, scope = RemoteScope(addr(remoteSystem, proto))))
|
) deploy(system, Deploy(name, scope = RemoteScope(getOtherAddress(remoteSystem, proto))))
|
||||||
|
|
||||||
def addr(sys: ActorSystem, proto: String) =
|
def getOtherAddress(sys: ActorSystem, proto: String) =
|
||||||
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
||||||
def port(sys: ActorSystem, proto: String) = addr(sys, proto).port.get
|
def port(sys: ActorSystem, proto: String) = getOtherAddress(sys, proto).port.get
|
||||||
def deploy(sys: ActorSystem, d: Deploy) {
|
def deploy(sys: ActorSystem, d: Deploy) {
|
||||||
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
|
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
|
||||||
}
|
}
|
||||||
|
|
@ -239,8 +239,8 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
EventFilter.warning(pattern = "received dead letter.*")))
|
EventFilter.warning(pattern = "received dead letter.*")))
|
||||||
sys.actorOf(Props[Echo2], name = "echo")
|
sys.actorOf(Props[Echo2], name = "echo")
|
||||||
}
|
}
|
||||||
val moreRefs = moreSystems map (sys ⇒ system.actorSelection(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
|
val moreRefs = moreSystems map (sys ⇒ system.actorSelection(RootActorPath(getOtherAddress(sys, "tcp")) / "user" / "echo"))
|
||||||
val aliveEcho = system.actorSelection(RootActorPath(addr(remoteSystem, "tcp")) / "user" / "echo")
|
val aliveEcho = system.actorSelection(RootActorPath(getOtherAddress(remoteSystem, "tcp")) / "user" / "echo")
|
||||||
val n = 100
|
val n = 100
|
||||||
|
|
||||||
// first everything is up and running
|
// first everything is up and running
|
||||||
|
|
@ -549,13 +549,13 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
try {
|
try {
|
||||||
val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy")
|
val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy")
|
||||||
// check that we use the specified transport address instead of the default
|
// check that we use the specified transport address instead of the default
|
||||||
val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "tcp"))
|
val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(getOtherAddress(otherSystem, "tcp"))
|
||||||
val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo")
|
val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo")
|
||||||
val proxyTcp = system.actorOf(Props(classOf[Proxy], remoteEchoHereTcp, testActor), "proxy-tcp")
|
val proxyTcp = system.actorOf(Props(classOf[Proxy], remoteEchoHereTcp, testActor), "proxy-tcp")
|
||||||
proxyTcp ! otherGuy
|
proxyTcp ! otherGuy
|
||||||
expectMsg(3.seconds, ("pong", otherGuyRemoteTcp))
|
expectMsg(3.seconds, ("pong", otherGuyRemoteTcp))
|
||||||
// now check that we fall back to default when we haven't got a corresponding transport
|
// now check that we fall back to default when we haven't got a corresponding transport
|
||||||
val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test"))
|
val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(getOtherAddress(otherSystem, "test"))
|
||||||
val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo")
|
val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo")
|
||||||
val proxySsl = system.actorOf(Props(classOf[Proxy], remoteEchoHereSsl, testActor), "proxy-ssl")
|
val proxySsl = system.actorOf(Props(classOf[Proxy], remoteEchoHereSsl, testActor), "proxy-ssl")
|
||||||
EventFilter.warning(start = "Error while resolving ActorRef", occurrences = 1).intercept {
|
EventFilter.warning(start = "Error while resolving ActorRef", occurrences = 1).intercept {
|
||||||
|
|
|
||||||
|
|
@ -73,21 +73,21 @@ akka.loglevel = DEBUG
|
||||||
akka.actor.provider = remote
|
akka.actor.provider = remote
|
||||||
akka.remote.netty.tcp.port = 0
|
akka.remote.netty.tcp.port = 0
|
||||||
"""))
|
"""))
|
||||||
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
|
|
||||||
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
||||||
|
|
||||||
lazy val remoteDaemon = {
|
lazy val remoteDaemon = {
|
||||||
{
|
{
|
||||||
val p = TestProbe()(client)
|
val p = TestProbe()(client)
|
||||||
client.actorSelection(RootActorPath(addr) / receptionist.path.elements).tell(IdentifyReq("/remote"), p.ref)
|
client.actorSelection(RootActorPath(address) / receptionist.path.elements).tell(IdentifyReq("/remote"), p.ref)
|
||||||
p.expectMsgType[ActorIdentity].ref.get
|
p.expectMsgType[ActorIdentity].ref.get
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val target2 = {
|
lazy val target2 = {
|
||||||
val p = TestProbe()(client)
|
val p = TestProbe()(client)
|
||||||
client.actorSelection(RootActorPath(addr) / receptionist.path.elements).tell(
|
client.actorSelection(RootActorPath(address) / receptionist.path.elements).tell(
|
||||||
IdentifyReq("child2"), p.ref)
|
IdentifyReq("child2"), p.ref)
|
||||||
p.expectMsgType[ActorIdentity].ref.get
|
p.expectMsgType[ActorIdentity].ref.get
|
||||||
}
|
}
|
||||||
|
|
@ -102,7 +102,7 @@ akka.loglevel = DEBUG
|
||||||
"UntrustedMode" must {
|
"UntrustedMode" must {
|
||||||
|
|
||||||
"allow actor selection to configured white list" in {
|
"allow actor selection to configured white list" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements)
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements)
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectMsg("hello")
|
expectMsg("hello")
|
||||||
}
|
}
|
||||||
|
|
@ -144,14 +144,14 @@ akka.loglevel = DEBUG
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection" in {
|
"discard actor selection" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / testActor.path.elements)
|
val sel = client.actorSelection(RootActorPath(address) / testActor.path.elements)
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection with non root anchor" in {
|
"discard actor selection with non root anchor" in {
|
||||||
val p = TestProbe()(client)
|
val p = TestProbe()(client)
|
||||||
client.actorSelection(RootActorPath(addr) / receptionist.path.elements).tell(
|
client.actorSelection(RootActorPath(address) / receptionist.path.elements).tell(
|
||||||
Identify(None), p.ref)
|
Identify(None), p.ref)
|
||||||
val clientReceptionistRef = p.expectMsgType[ActorIdentity].ref.get
|
val clientReceptionistRef = p.expectMsgType[ActorIdentity].ref.get
|
||||||
|
|
||||||
|
|
@ -161,19 +161,19 @@ akka.loglevel = DEBUG
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection to child of matching white list" in {
|
"discard actor selection to child of matching white list" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements / "child1")
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements / "child1")
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection with wildcard" in {
|
"discard actor selection with wildcard" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements / "*")
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements / "*")
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection containing harmful message" in {
|
"discard actor selection containing harmful message" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements)
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements)
|
||||||
sel ! PoisonPill
|
sel ! PoisonPill
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -74,21 +74,21 @@ class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with Impli
|
||||||
import UntrustedSpec._
|
import UntrustedSpec._
|
||||||
|
|
||||||
val client = newRemoteSystem(name = Some("UntrustedSpec-client"))
|
val client = newRemoteSystem(name = Some("UntrustedSpec-client"))
|
||||||
val addr = RARP(system).provider.getDefaultAddress
|
val address = RARP(system).provider.getDefaultAddress
|
||||||
|
|
||||||
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
||||||
|
|
||||||
lazy val remoteDaemon = {
|
lazy val remoteDaemon = {
|
||||||
{
|
{
|
||||||
val p = TestProbe()(client)
|
val p = TestProbe()(client)
|
||||||
client.actorSelection(RootActorPath(addr) / receptionist.path.elements).tell(IdentifyReq("/remote"), p.ref)
|
client.actorSelection(RootActorPath(address) / receptionist.path.elements).tell(IdentifyReq("/remote"), p.ref)
|
||||||
p.expectMsgType[ActorIdentity].ref.get
|
p.expectMsgType[ActorIdentity].ref.get
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val target2 = {
|
lazy val target2 = {
|
||||||
val p = TestProbe()(client)
|
val p = TestProbe()(client)
|
||||||
client.actorSelection(RootActorPath(addr) / receptionist.path.elements).tell(
|
client.actorSelection(RootActorPath(address) / receptionist.path.elements).tell(
|
||||||
IdentifyReq("child2"), p.ref)
|
IdentifyReq("child2"), p.ref)
|
||||||
p.expectMsgType[ActorIdentity].ref.get
|
p.expectMsgType[ActorIdentity].ref.get
|
||||||
}
|
}
|
||||||
|
|
@ -99,7 +99,7 @@ class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with Impli
|
||||||
"UntrustedMode" must {
|
"UntrustedMode" must {
|
||||||
|
|
||||||
"allow actor selection to configured white list" in {
|
"allow actor selection to configured white list" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements)
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements)
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectMsg("hello")
|
expectMsg("hello")
|
||||||
}
|
}
|
||||||
|
|
@ -141,14 +141,14 @@ class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with Impli
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection" in {
|
"discard actor selection" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / testActor.path.elements)
|
val sel = client.actorSelection(RootActorPath(address) / testActor.path.elements)
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection with non root anchor" in {
|
"discard actor selection with non root anchor" in {
|
||||||
val p = TestProbe()(client)
|
val p = TestProbe()(client)
|
||||||
client.actorSelection(RootActorPath(addr) / receptionist.path.elements).tell(
|
client.actorSelection(RootActorPath(address) / receptionist.path.elements).tell(
|
||||||
Identify(None), p.ref)
|
Identify(None), p.ref)
|
||||||
val clientReceptionistRef = p.expectMsgType[ActorIdentity].ref.get
|
val clientReceptionistRef = p.expectMsgType[ActorIdentity].ref.get
|
||||||
|
|
||||||
|
|
@ -158,19 +158,19 @@ class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with Impli
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection to child of matching white list" in {
|
"discard actor selection to child of matching white list" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements / "child1")
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements / "child1")
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection with wildcard" in {
|
"discard actor selection with wildcard" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements / "*")
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements / "*")
|
||||||
sel ! "hello"
|
sel ! "hello"
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"discard actor selection containing harmful message" in {
|
"discard actor selection containing harmful message" in {
|
||||||
val sel = client.actorSelection(RootActorPath(addr) / receptionist.path.elements)
|
val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements)
|
||||||
sel ! PoisonPill
|
sel ! PoisonPill
|
||||||
expectNoMsg(1.second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -378,8 +378,8 @@ private[stream] object ConnectionSourceStage {
|
||||||
@InternalApi private[akka] object TcpIdleTimeout {
|
@InternalApi private[akka] object TcpIdleTimeout {
|
||||||
def apply(idleTimeout: FiniteDuration, remoteAddress: Option[InetSocketAddress]): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
|
def apply(idleTimeout: FiniteDuration, remoteAddress: Option[InetSocketAddress]): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
|
||||||
val connectionToString = remoteAddress match {
|
val connectionToString = remoteAddress match {
|
||||||
case Some(addr) ⇒ s" on connection to [$addr]"
|
case Some(address) ⇒ s" on connection to [$address]"
|
||||||
case _ ⇒ ""
|
case _ ⇒ ""
|
||||||
}
|
}
|
||||||
|
|
||||||
val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
|
val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue