!rem #3765: Change the defaults for remoting
- removed retry-window and related settings - removed gate-invalid-addresses-for - gate is now mandatory - remoting has a dedicated dispatcher by default - updated tests to work with changed timings - added doc section for association lifecycle
This commit is contained in:
parent
a04a784121
commit
cf58402dd9
20 changed files with 699 additions and 143 deletions
|
|
@ -49,10 +49,18 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
class RemoteChild extends Actor {
|
||||
import context.dispatcher
|
||||
context.system.scheduler.scheduleOnce(500.millis, self, "boom")
|
||||
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
context.system.scheduler.scheduleOnce(2.seconds, self, "boom")
|
||||
sender ! "hello"
|
||||
case "boom" ⇒ throw new SimulatedException
|
||||
case x ⇒ sender ! x
|
||||
}
|
||||
}
|
||||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case m ⇒ sender ! m
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -70,8 +78,8 @@ class SurviveNetworkInstabilityMultiJvmNode8 extends SurviveNetworkInstabilitySp
|
|||
|
||||
abstract class SurviveNetworkInstabilitySpec
|
||||
extends MultiNodeSpec(SurviveNetworkInstabilityMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender {
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender {
|
||||
|
||||
import SurviveNetworkInstabilityMultiJvmSpec._
|
||||
|
||||
|
|
@ -85,15 +93,31 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
awaitAssert(clusterView.unreachableMembers.map(_.address) should be(expected))
|
||||
}
|
||||
|
||||
system.actorOf(Props[Echo], "echo")
|
||||
|
||||
def assertCanTalk(alive: RoleName*): Unit = {
|
||||
runOn(alive: _*) {
|
||||
for (to ← alive) {
|
||||
val sel = system.actorSelection(node(to) / "user" / "echo")
|
||||
awaitAssert {
|
||||
sel ! "ping"
|
||||
expectMsg(1.second, "ping")
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("ping-ok")
|
||||
}
|
||||
|
||||
"A network partition tolerant cluster" must {
|
||||
|
||||
"reach initial convergence" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(first, second, third, fourth, fifth)
|
||||
|
||||
enterBarrier("after-1")
|
||||
assertCanTalk(first, second, third, fourth, fifth)
|
||||
}
|
||||
|
||||
"heal after a broken pair" taggedAs LongRunningTest in within(30.seconds) {
|
||||
"heal after a broken pair" taggedAs LongRunningTest in within(45.seconds) {
|
||||
runOn(first) {
|
||||
testConductor.blackhole(first, second, Direction.Both).await
|
||||
}
|
||||
|
|
@ -119,9 +143,10 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
|
||||
awaitAllReachable()
|
||||
enterBarrier("after-2")
|
||||
assertCanTalk(first, second, third, fourth, fifth)
|
||||
}
|
||||
|
||||
"heal after one isolated node" taggedAs LongRunningTest in within(30.seconds) {
|
||||
"heal after one isolated node" taggedAs LongRunningTest in within(45.seconds) {
|
||||
val others = Vector(second, third, fourth, fifth)
|
||||
runOn(first) {
|
||||
for (other ← others) {
|
||||
|
|
@ -145,9 +170,10 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
enterBarrier("repair-3")
|
||||
awaitAllReachable()
|
||||
enterBarrier("after-3")
|
||||
assertCanTalk((others :+ first): _*)
|
||||
}
|
||||
|
||||
"heal two isolated islands" taggedAs LongRunningTest in within(30.seconds) {
|
||||
"heal two isolated islands" taggedAs LongRunningTest in within(45.seconds) {
|
||||
val island1 = Vector(first, second)
|
||||
val island2 = Vector(third, fourth, fifth)
|
||||
runOn(first) {
|
||||
|
|
@ -175,9 +201,10 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
enterBarrier("repair-4")
|
||||
awaitAllReachable()
|
||||
enterBarrier("after-4")
|
||||
assertCanTalk((island1 ++ island2): _*)
|
||||
}
|
||||
|
||||
"heal after unreachable when ring is changed" taggedAs LongRunningTest in within(45.seconds) {
|
||||
"heal after unreachable when ring is changed" taggedAs LongRunningTest in within(60.seconds) {
|
||||
val joining = Vector(sixth, seventh)
|
||||
val others = Vector(second, third, fourth, fifth)
|
||||
runOn(first) {
|
||||
|
|
@ -220,9 +247,10 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
awaitMembersUp(roles.size - 1)
|
||||
}
|
||||
enterBarrier("after-5")
|
||||
assertCanTalk((joining ++ others): _*)
|
||||
}
|
||||
|
||||
"down and remove quarantined node" taggedAs LongRunningTest in within(45.seconds) {
|
||||
"down and remove quarantined node" taggedAs LongRunningTest in within(60.seconds) {
|
||||
val others = Vector(first, third, fourth, fifth, sixth, seventh)
|
||||
|
||||
runOn(second) {
|
||||
|
|
@ -269,9 +297,10 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
}
|
||||
|
||||
enterBarrier("after-6")
|
||||
assertCanTalk(others: _*)
|
||||
}
|
||||
|
||||
"continue and move Joining to Up after downing of one half" taggedAs LongRunningTest in within(45.seconds) {
|
||||
"continue and move Joining to Up after downing of one half" taggedAs LongRunningTest in within(60.seconds) {
|
||||
// note that second is already removed in previous step
|
||||
val side1 = Vector(first, third, fourth)
|
||||
val side1AfterJoin = side1 :+ eighth
|
||||
|
|
@ -331,8 +360,9 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
}
|
||||
|
||||
enterBarrier("after-7")
|
||||
assertCanTalk((side1AfterJoin): _*)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -359,7 +359,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
|
|||
enterBarrier("after-7")
|
||||
}
|
||||
|
||||
"rebalance to nodes with less shards" in within(30 seconds) {
|
||||
"rebalance to nodes with less shards" in within(60 seconds) {
|
||||
|
||||
runOn(fourth) {
|
||||
// third, fourth and fifth are still alive
|
||||
|
|
|
|||
BIN
akka-docs/rst/images/association_lifecycle.png
Normal file
BIN
akka-docs/rst/images/association_lifecycle.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 81 KiB |
447
akka-docs/rst/images/association_lifecycle.svg
Normal file
447
akka-docs/rst/images/association_lifecycle.svg
Normal file
|
|
@ -0,0 +1,447 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!-- Created with Inkscape (http://www.inkscape.org/) -->
|
||||
|
||||
<svg
|
||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:cc="http://creativecommons.org/ns#"
|
||||
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
|
||||
xmlns:svg="http://www.w3.org/2000/svg"
|
||||
xmlns="http://www.w3.org/2000/svg"
|
||||
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
|
||||
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
|
||||
width="642.48157"
|
||||
height="649.93738"
|
||||
id="svg2"
|
||||
version="1.1"
|
||||
inkscape:version="0.48.2 r9819"
|
||||
sodipodi:docname="association_lifecycle.svg"
|
||||
inkscape:export-filename="D:\workspace\akka\association_lifecycle.png"
|
||||
inkscape:export-xdpi="86.823746"
|
||||
inkscape:export-ydpi="86.823746">
|
||||
<defs
|
||||
id="defs4">
|
||||
<marker
|
||||
inkscape:stockid="Arrow2Lstart"
|
||||
orient="auto"
|
||||
refY="0"
|
||||
refX="0"
|
||||
id="Arrow2Lstart"
|
||||
style="overflow:visible">
|
||||
<path
|
||||
id="path3946"
|
||||
style="font-size:12px;fill-rule:evenodd;stroke-width:0.625;stroke-linejoin:round"
|
||||
d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 C 6.97309,-1.6296469 6.9831476,1.6157441 8.7185878,4.0337352 z"
|
||||
transform="matrix(1.1,0,0,1.1,1.1,0)"
|
||||
inkscape:connector-curvature="0" />
|
||||
</marker>
|
||||
<marker
|
||||
inkscape:stockid="Arrow2Lend"
|
||||
orient="auto"
|
||||
refY="0"
|
||||
refX="0"
|
||||
id="Arrow2Lend"
|
||||
style="overflow:visible">
|
||||
<path
|
||||
id="path3949"
|
||||
style="font-size:12px;fill-rule:evenodd;stroke-width:0.625;stroke-linejoin:round"
|
||||
d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 C 6.97309,-1.6296469 6.9831476,1.6157441 8.7185878,4.0337352 z"
|
||||
transform="matrix(-1.1,0,0,-1.1,-1.1,0)"
|
||||
inkscape:connector-curvature="0" />
|
||||
</marker>
|
||||
</defs>
|
||||
<sodipodi:namedview
|
||||
id="base"
|
||||
pagecolor="#ffffff"
|
||||
bordercolor="#666666"
|
||||
borderopacity="1.0"
|
||||
inkscape:pageopacity="0.0"
|
||||
inkscape:pageshadow="2"
|
||||
inkscape:zoom="0.94342858"
|
||||
inkscape:cx="319.2676"
|
||||
inkscape:cy="328.22305"
|
||||
inkscape:document-units="px"
|
||||
inkscape:current-layer="layer1"
|
||||
showgrid="false"
|
||||
inkscape:window-width="1600"
|
||||
inkscape:window-height="838"
|
||||
inkscape:window-x="-8"
|
||||
inkscape:window-y="-8"
|
||||
inkscape:window-maximized="1"
|
||||
fit-margin-top="40"
|
||||
fit-margin-left="20"
|
||||
fit-margin-right="20"
|
||||
fit-margin-bottom="20" />
|
||||
<metadata
|
||||
id="metadata7">
|
||||
<rdf:RDF>
|
||||
<cc:Work
|
||||
rdf:about="">
|
||||
<dc:format>image/svg+xml</dc:format>
|
||||
<dc:type
|
||||
rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
|
||||
<dc:title />
|
||||
</cc:Work>
|
||||
</rdf:RDF>
|
||||
</metadata>
|
||||
<g
|
||||
inkscape:label="Layer 1"
|
||||
inkscape:groupmode="layer"
|
||||
id="layer1"
|
||||
transform="translate(-52.77964,-42.189247)">
|
||||
<rect
|
||||
style="fill:#ffffff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none;stroke-dashoffset:0"
|
||||
id="rect3777"
|
||||
width="205.52536"
|
||||
height="179.72238"
|
||||
x="358.72018"
|
||||
y="490.40427"
|
||||
rx="10.392074"
|
||||
ry="10.392074" />
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:28px;font-style:normal;font-variant:normal;font-weight:bold;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans Bold"
|
||||
x="374.21671"
|
||||
y="524.16431"
|
||||
id="text3779"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3781"
|
||||
x="374.21671"
|
||||
y="524.16431">Quarantined</tspan></text>
|
||||
<rect
|
||||
ry="10.392074"
|
||||
rx="10.392074"
|
||||
y="86.890343"
|
||||
x="131.78986"
|
||||
height="54.048653"
|
||||
width="85.145142"
|
||||
id="rect3783"
|
||||
style="fill:#ffffff;fill-opacity:1;stroke:#000000;stroke-width:3.99999952;stroke-linejoin:round;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none;stroke-dashoffset:0" />
|
||||
<text
|
||||
sodipodi:linespacing="125%"
|
||||
id="text3785"
|
||||
y="122.42919"
|
||||
x="148.2804"
|
||||
style="font-size:28px;font-style:normal;font-variant:normal;font-weight:bold;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans Bold"
|
||||
xml:space="preserve"><tspan
|
||||
y="122.42919"
|
||||
x="148.2804"
|
||||
id="tspan3787"
|
||||
sodipodi:role="line">Idle</tspan></text>
|
||||
<rect
|
||||
style="fill:#ffffff;fill-opacity:1;stroke:#000000;stroke-width:3.99999976;stroke-linejoin:round;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none;stroke-dashoffset:0"
|
||||
id="rect3789"
|
||||
width="195.46362"
|
||||
height="115.50124"
|
||||
x="360.57117"
|
||||
y="84.669167"
|
||||
rx="10.392074"
|
||||
ry="10.392074" />
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:28px;font-style:normal;font-variant:normal;font-weight:bold;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans Bold"
|
||||
x="377.24457"
|
||||
y="120.20802"
|
||||
id="text3791"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
x="377.24457"
|
||||
y="120.20802"
|
||||
id="tspan3795">Active</tspan></text>
|
||||
<rect
|
||||
ry="10.392074"
|
||||
rx="10.392074"
|
||||
y="278.65201"
|
||||
x="74.77964"
|
||||
height="107.12277"
|
||||
width="199.16559"
|
||||
id="rect3799"
|
||||
style="fill:#ffffff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none;stroke-dashoffset:0" />
|
||||
<text
|
||||
sodipodi:linespacing="125%"
|
||||
id="text3801"
|
||||
y="314.19086"
|
||||
x="88.425385"
|
||||
style="font-size:28px;font-style:normal;font-variant:normal;font-weight:bold;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans Bold"
|
||||
xml:space="preserve"><tspan
|
||||
id="tspan3803"
|
||||
y="314.19086"
|
||||
x="88.425385"
|
||||
sodipodi:role="line">Gated</tspan></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:20px;font-style:normal;font-variant:normal;font-weight:bold;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans Bold"
|
||||
x="105.87613"
|
||||
y="319.3736"
|
||||
id="text3809"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3811"
|
||||
x="105.87613"
|
||||
y="319.3736" /></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:14px;font-style:normal;font-variant:normal;font-weight:bold;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans Bold"
|
||||
x="142.89577"
|
||||
y="169.8143"
|
||||
id="text3815"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3817"
|
||||
x="142.89577"
|
||||
y="169.8143" /></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:12px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="117.73823"
|
||||
y="253.59573"
|
||||
id="text3819"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3821"
|
||||
x="117.73823"
|
||||
y="253.59573">Gate time </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="117.73823"
|
||||
y="268.59573"
|
||||
id="tspan3853">elapses</tspan></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:12px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="226.32596"
|
||||
y="72.187294"
|
||||
id="text3825"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3827"
|
||||
x="226.32596"
|
||||
y="72.187294">● Message send to</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="226.32596"
|
||||
y="87.187294"
|
||||
id="tspan3829">remote system</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="226.32596"
|
||||
y="102.18729"
|
||||
id="tspan3072">● Successful inbound</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="226.32596"
|
||||
y="117.18729"
|
||||
id="tspan3074">connection</tspan></text>
|
||||
<path
|
||||
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#Arrow2Lend)"
|
||||
d="M 394.62921,200.17039 C 393.60715,201.53315 376.11707,230.87967 348.73727,254.97352 316.61464,283.241 273.94523,306.78692 273.94523,306.78692"
|
||||
id="path3831"
|
||||
inkscape:connector-curvature="0"
|
||||
sodipodi:nodetypes="csc" />
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:12px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="290.97424"
|
||||
y="310.48886"
|
||||
id="text3833"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3835"
|
||||
x="290.97424"
|
||||
y="310.48886">Communication</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="290.97424"
|
||||
y="325.48886"
|
||||
id="tspan3837">failure:</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="290.97424"
|
||||
y="340.48886"
|
||||
id="tspan6955">● Failed TCP connection</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="290.97424"
|
||||
y="355.48886"
|
||||
id="tspan3839">● Transport FD trigger</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="290.97424"
|
||||
y="370.48886"
|
||||
id="tspan3841">● Name lookup failure</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="290.97424"
|
||||
y="385.48886"
|
||||
id="tspan3843">● Remote system shutdown</tspan></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:14px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#555753;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="376.8598"
|
||||
y="141.97742"
|
||||
id="text3845"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3847"
|
||||
x="376.8598"
|
||||
y="141.97742">(Connecting or Connected)</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="376.8598"
|
||||
y="159.47742"
|
||||
id="tspan3849">Messages are delivered</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="376.8598"
|
||||
y="176.97742"
|
||||
id="tspan3851">or buffered if needed</tspan></text>
|
||||
<path
|
||||
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#Arrow2Lend)"
|
||||
d="M 231.74286,278.652 C 231.74286,278.652 274.94957,227.53908 296.73868,210.51012 318.52778,193.48115 359.09037,174.99705 359.09037,174.99705"
|
||||
id="path3855"
|
||||
inkscape:connector-curvature="0"
|
||||
sodipodi:nodetypes="czc" />
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:12px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="252.03088"
|
||||
y="171.36469"
|
||||
id="text3857"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3859"
|
||||
x="252.03088"
|
||||
y="171.36469">Successful</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="252.03088"
|
||||
y="186.36469"
|
||||
id="tspan3861">inbound </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="252.03088"
|
||||
y="201.36469"
|
||||
id="tspan3885">connection</tspan></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:14px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#555753;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="89.587494"
|
||||
y="334.18146"
|
||||
id="text3863"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3865"
|
||||
x="89.587494"
|
||||
y="334.18146">All outbound messages </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="89.587494"
|
||||
y="351.68146"
|
||||
id="tspan3869">destined to the gated system</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="89.587494"
|
||||
y="369.18146"
|
||||
id="tspan3867">are dropped</tspan></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:12px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="520.59717"
|
||||
y="221.50888"
|
||||
id="text3877"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="221.50888"
|
||||
id="tspan6941">Catastrophic communication</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="236.50888"
|
||||
id="tspan6945">failure:</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="251.50888"
|
||||
id="tspan6939">● Remote DeathWatch trigger</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="266.50888"
|
||||
id="tspan3881">● System message </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="281.50888"
|
||||
id="tspan3883">delivery failure</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="296.50888"
|
||||
id="tspan3887">● Cluster MemberRemoved </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="520.59717"
|
||||
y="311.50888"
|
||||
id="tspan3062">event</tspan></text>
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:14px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#555753;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="375.36417"
|
||||
y="545.63568"
|
||||
id="text3889"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3891"
|
||||
x="375.36417"
|
||||
y="545.63568">All outbound and inbound</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="375.36417"
|
||||
y="563.13568"
|
||||
id="tspan3893">messages arriving from the</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="375.36417"
|
||||
y="580.63568"
|
||||
id="tspan3895">quarantined system are</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="375.36417"
|
||||
y="598.13568"
|
||||
id="tspan3897">dropped. Remote system</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="375.36417"
|
||||
y="615.63568"
|
||||
id="tspan6949">must be restarted to be able</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="375.36417"
|
||||
y="633.13568"
|
||||
id="tspan6951">establish communication </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="375.36417"
|
||||
y="650.63568"
|
||||
id="tspan6953">again.</tspan></text>
|
||||
<path
|
||||
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-start:url(#Arrow2Lstart);marker-end:none"
|
||||
d="M 173.25185,141.67938 173.25185,280.13278"
|
||||
id="path3899"
|
||||
inkscape:connector-curvature="0" />
|
||||
<path
|
||||
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#Arrow2Lend)"
|
||||
d="M 216.19462,124.28486 359.83076,124.28486"
|
||||
id="path3901"
|
||||
inkscape:connector-curvature="0" />
|
||||
<path
|
||||
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#Arrow2Lend)"
|
||||
d="M 513.83241,198.68961 513.83241,488.92348"
|
||||
id="path3903"
|
||||
inkscape:connector-curvature="0" />
|
||||
<path
|
||||
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-start:url(#Arrow2Lstart);marker-end:none"
|
||||
d="M 470.88964,200.17039 470.88964,489.66387"
|
||||
id="path3905"
|
||||
inkscape:connector-curvature="0" />
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-size:12px;font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Franklin Gothic Medium;-inkscape-font-specification:Sans"
|
||||
x="350.03479"
|
||||
y="432.1474"
|
||||
id="text3907"
|
||||
sodipodi:linespacing="125%"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan3909"
|
||||
x="350.03479"
|
||||
y="432.1474">Successful</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="350.03479"
|
||||
y="447.1474"
|
||||
id="tspan3915">inbound our outbound</tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="350.03479"
|
||||
y="462.1474"
|
||||
id="tspan3913">connection from/to </tspan><tspan
|
||||
sodipodi:role="line"
|
||||
x="350.03479"
|
||||
y="477.1474"
|
||||
id="tspan3923"><tspan
|
||||
style="font-style:oblique;-inkscape-font-specification:'Franklin Gothic Medium, Oblique'"
|
||||
id="tspan6947">restarted</tspan> system</tspan></text>
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 18 KiB |
|
|
@ -149,6 +149,30 @@ you can advise the system to create a child on that remote node like so:
|
|||
|
||||
.. includecode:: code/docs/remoting/RemoteDeploymentDocTest.java#deploy
|
||||
|
||||
|
||||
Lifecycle and Failure Recovery Model
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. image:: ../images/association_lifecycle.png
|
||||
:align: center
|
||||
:width: 620
|
||||
|
||||
Each link with a remote system can be in one of the four states as illustrated above. Before any communication
|
||||
happens with a remote system at a given ``Address`` the state of the association is ``Idle``. The first time a message
|
||||
is attempted to be sent to the remote system or an inbound connection is accepted the state of the link transitions to
|
||||
``Active`` denoting that the two systems has messages to send or receive and no failures were encountered so far.
|
||||
When a communication failure happens and the connection is lost between the two systems the link becomes ``Gated``.
|
||||
|
||||
In this state the system will not attempt to connect to the remote host and all outbound messages will be dropped. The time
|
||||
while the link is in the ``Gated`` state is controlled by the setting ``akka.remote.retry-gate-closed-for``:
|
||||
after this time elapses the link state transitions to ``Idle`` again. ``Gate`` is one-sided in the
|
||||
sense that whenever a successful *inbound* connection is accepted from a remote system during ``Gate`` it automatically
|
||||
transitions to ``Active`` and communication resumes immediately.
|
||||
|
||||
In the face of communication failures that are unrecoverable because the state of the participating systems are inconsistent,
|
||||
the remote system becomes ``Quarantined``. Unlike ``Gate``, quarantining is permanent and lasts until one of the systems
|
||||
is restarted. After a restart communication can be resumed again and the link can become ``Active`` again.
|
||||
|
||||
Watching Remote Actors
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,8 @@ configured time of unreachability. This feature is disabled by default, as it al
|
|||
|
||||
During the deprecation phase ``akka.cluster.auto-down=on`` is interpreted at as instant auto-down.
|
||||
|
||||
|
||||
|
||||
=======
|
||||
Routers
|
||||
=======
|
||||
|
|
@ -99,6 +101,43 @@ Changed cluster expected-response-after configuration
|
|||
Configuration property ``akka.cluster.failure-detector.heartbeat-request.expected-response-after``
|
||||
has been renamed to ``akka.cluster.failure-detector.expected-response-after``.
|
||||
|
||||
Removed automatic retry feature from Remoting in favor of retry-gate
|
||||
====================================================================
|
||||
|
||||
The retry-gate feature is now the only failure handling strategy in Remoting. This change means that when remoting detects faulty
|
||||
connections it goes into a gated state where all buffered and subsequent remote messages are dropped until the configurable
|
||||
time defined by the configuration key ``akka.remote.retry-gate-closed-for`` elapses after the failure event. This
|
||||
behavior prevents reconnect storms and unbounded buffer growth during network instabilities. After the configured
|
||||
time elapses the gate is lifted and a new connection will be attempted when there are new remote messages to be
|
||||
delivered.
|
||||
|
||||
In concert with this change all settings related to the old reconnect behavior (``akka.remote.retry-window`` and
|
||||
``akka.remote.maximum-retries-in-window``) were removed.
|
||||
|
||||
The timeout setting ``akka.remote.gate-invalid-addresses-for`` that controlled the gate interval for certain failure
|
||||
events is also removed and all gating intervals are now controlled by the ``akka.remote.retry-gate-closed-for`` setting
|
||||
instead.
|
||||
|
||||
Reduced default sensitivity settings for transport failure detector in Remoting
|
||||
===============================================================================
|
||||
|
||||
Since the most commonly used transport with Remoting is TCP, which provides proper connection termination events the failure detector sensitivity
|
||||
setting ``akka.remote.transport-failure-detector.acceptable-heartbeat-pause`` now defaults to 20 seconds to reduce load induced
|
||||
false-positive failure detection events in remoting. In case a non-connection-oriented protocol is used it is recommended
|
||||
to change this and the ``akka.remote.transport-failure-detector.heartbeat-interval`` setting to a more sensitive value.
|
||||
|
||||
Quarantine is now permanent
|
||||
===========================
|
||||
|
||||
The setting that controlled the length of quarantine ``akka.remote.quarantine-systems-for`` has been removed. The only
|
||||
setting available now is ``akka.remote.prune-quarantine-marker-after`` which influences how long quarantine tombstones
|
||||
are kept around to avoid long-term memory leaks. This new setting defaults to 5 days.
|
||||
|
||||
Remoting uses a dedicated dispatcher by default
|
||||
===============================================
|
||||
|
||||
The default value of ``akka.remote.use-dispatcher`` has been changed to a dedicated dispatcher.
|
||||
|
||||
Dataflow is Deprecated
|
||||
======================
|
||||
|
||||
|
|
|
|||
|
|
@ -156,6 +156,29 @@ you can advise the system to create a child on that remote node like so:
|
|||
|
||||
.. includecode:: code/docs/remoting/RemoteDeploymentDocSpec.scala#deploy
|
||||
|
||||
Lifecycle and Failure Recovery Model
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. image:: ../images/association_lifecycle.png
|
||||
:align: center
|
||||
:width: 620
|
||||
|
||||
Each link with a remote system can be in one of the four states as illustrated above. Before any communication
|
||||
happens with a remote system at a given ``Address`` the state of the association is ``Idle``. The first time a message
|
||||
is attempted to be sent to the remote system or an inbound connection is accepted the state of the link transitions to
|
||||
``Active`` denoting that the two systems has messages to send or receive and no failures were encountered so far.
|
||||
When a communication failure happens and the connection is lost between the two systems the link becomes ``Gated``.
|
||||
|
||||
In this state the system will not attempt to connect to the remote host and all outbound messages will be dropped. The time
|
||||
while the link is in the ``Gated`` state is controlled by the setting ``akka.remote.retry-gate-closed-for``:
|
||||
after this time elapses the link state transitions to ``Idle`` again. ``Gate`` is one-sided in the
|
||||
sense that whenever a successful *inbound* connection is accepted from a remote system during ``Gate`` it automatically
|
||||
transitions to ``Active`` and communication resumes immediately.
|
||||
|
||||
In the face of communication failures that are unrecoverable because the state of the participating systems are inconsistent,
|
||||
the remote system becomes ``Quarantined``. Unlike ``Gate``, quarantining is permanent and lasts until one of the systems
|
||||
is restarted. After a restart communication can be resumed again and the link can become ``Active`` again.
|
||||
|
||||
Watching Remote Actors
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
|
|||
|
|
@ -28,7 +28,9 @@ object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig {
|
|||
ConfigFactory.parseString("""
|
||||
akka.loglevel = INFO
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
""")))
|
||||
## Use a tighter setting than the default, otherwise it takes 20s for DeathWatch to trigger
|
||||
akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3 s
|
||||
""")))
|
||||
|
||||
case class WatchIt(watchee: ActorRef)
|
||||
case class UnwatchIt(watchee: ActorRef)
|
||||
|
|
|
|||
|
|
@ -30,7 +30,9 @@ object RemoteNodeRestartDeathWatchMultiJvmSpec extends MultiNodeConfig {
|
|||
ConfigFactory.parseString("""
|
||||
akka.loglevel = INFO
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
""")))
|
||||
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
|
||||
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
|
||||
""")))
|
||||
|
||||
testTransport(on = true)
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,9 @@ object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig {
|
|||
ConfigFactory.parseString("""
|
||||
akka.loglevel = INFO
|
||||
akka.remote.log-remote-lifecycle-events = INFO
|
||||
#akka.remote.retry-gate-closed-for = 0.5 s
|
||||
## Keep it tight, otherwise reestablishing a connection takes too much time
|
||||
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
|
||||
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
|
||||
akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s
|
||||
akka.remote.gate-invalid-addresses-for = 0.5 s
|
||||
""")))
|
||||
|
|
@ -87,13 +89,9 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
|||
// Trigger reconnect attempt and also queue up a system message to be in limbo state (UID of remote system
|
||||
// is unknown, and system message is pending)
|
||||
system.stop(subject)
|
||||
subject ! "hello"
|
||||
subject ! "hello"
|
||||
subject ! "hello"
|
||||
|
||||
// Get rid of old system -- now SHUTDOWN is lost
|
||||
testConductor.shutdown(second).await
|
||||
expectTerminated(subject, 10.seconds)
|
||||
|
||||
// At this point the second node is restarting, while the first node is trying to reconnect without resetting
|
||||
// the system message send state
|
||||
|
|
@ -102,8 +100,10 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
|||
within(30.seconds) {
|
||||
// retry because the Subject actor might not be started yet
|
||||
awaitAssert {
|
||||
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "echo"
|
||||
expectMsg(1.second, "echo")
|
||||
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject")
|
||||
expectMsgPF(1 second) {
|
||||
case ActorIdentity("subject", Some(ref)) ⇒ true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +115,10 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
|||
watch(subjectNew)
|
||||
|
||||
subjectNew ! "shutdown"
|
||||
expectTerminated(subjectNew)
|
||||
fishForMessage(5.seconds) {
|
||||
case _: ActorIdentity ⇒ false
|
||||
case Terminated(subjectNew) ⇒ true
|
||||
}
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ akka {
|
|||
# that since remoting can load arbitrary 3rd party drivers (see
|
||||
# "enabled-transport" and "adapters" entries) it is not guaranteed that
|
||||
# every module will respect this setting.
|
||||
use-dispatcher = ""
|
||||
use-dispatcher = "akka.remote.default-remote-dispatcher"
|
||||
|
||||
### Security settings
|
||||
|
||||
|
|
@ -157,7 +157,7 @@ akka {
|
|||
implementation-class = "akka.remote.PhiAccrualFailureDetector"
|
||||
|
||||
# How often keep-alive heartbeat messages should be sent to each connection.
|
||||
heartbeat-interval = 1 s
|
||||
heartbeat-interval = 4 s
|
||||
|
||||
# Defines the failure detector threshold.
|
||||
# A low threshold is prone to generate many wrong suspicions but ensures
|
||||
|
|
@ -181,7 +181,7 @@ akka {
|
|||
# This margin is important to be able to survive sudden, occasional,
|
||||
# pauses in heartbeat arrivals, due to for example garbage collect or
|
||||
# network drop.
|
||||
acceptable-heartbeat-pause = 3 s
|
||||
acceptable-heartbeat-pause = 10 s
|
||||
}
|
||||
|
||||
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
||||
|
|
@ -219,7 +219,7 @@ akka {
|
|||
# This margin is important to be able to survive sudden, occasional,
|
||||
# pauses in heartbeat arrivals, due to for example garbage collect or
|
||||
# network drop.
|
||||
acceptable-heartbeat-pause = 4 s
|
||||
acceptable-heartbeat-pause = 10 s
|
||||
|
||||
|
||||
# How often to check for nodes marked as unreachable by the failure
|
||||
|
|
@ -237,35 +237,20 @@ akka {
|
|||
# address as failed. This configuration option controls how much time should
|
||||
# be elapsed before reattempting a new connection. While the address is
|
||||
# gated, all messages sent to the address are delivered to dead-letters.
|
||||
# If this setting is 0, the remoting will always immediately reattempt
|
||||
# to establish a failed outbound connection and will buffer writes until
|
||||
# it succeeds.
|
||||
retry-gate-closed-for = 0 s
|
||||
# Since this setting limits the rate of reconnects setting it to a
|
||||
# very short interval (i.e. less than a second) may result in a storm of
|
||||
# reconnect attempts.
|
||||
retry-gate-closed-for = 5 s
|
||||
|
||||
# If the retry gate function is disabled (see retry-gate-closed-for) the
|
||||
# remoting subsystem will always attempt to reestablish failed outbound
|
||||
# connections. The settings below together control the maximum number of
|
||||
# reattempts in a given time window. The number of reattempts during
|
||||
# a window of "retry-window" will be maximum "maximum-retries-in-window".
|
||||
retry-window = 60 s
|
||||
maximum-retries-in-window = 3
|
||||
|
||||
# The length of time to gate an address whose name lookup has failed
|
||||
# or has explicitly signalled that it will not accept connections
|
||||
# (remote system is shutting down or the requesting system is quarantined).
|
||||
# No connection attempts will be made to an address while it remains
|
||||
# gated. Any messages sent to a gated address will be directed to dead
|
||||
# letters instead. Name lookups are costly, and the time to recovery
|
||||
# is typically large, therefore this setting should be a value in the
|
||||
# order of seconds or minutes.
|
||||
gate-invalid-addresses-for = 60 s
|
||||
|
||||
# This settings controls how long a system will be quarantined after
|
||||
# catastrophic communication failures that result in the loss of system
|
||||
# messages. Quarantining prevents communication with the remote system
|
||||
# of a given UID. This function can be disabled by setting the value
|
||||
# to "off".
|
||||
quarantine-systems-for = 60s
|
||||
# After catastrophic communication failures that result in the loss of system
|
||||
# messages or after the remote DeathWatch triggers the remote system gets
|
||||
# quarantined to prevent inconsistent behavior.
|
||||
# This setting controls how long the Quarantine marker will be kept around
|
||||
# before being removed to avoid long-term memory leaks.
|
||||
# WARNING: DO NOT change this to a small value to re-enable communication with
|
||||
# quarantined nodes. Such feature is not supported and any behavior between
|
||||
# the affected systems after lifting the quarantine is undefined.
|
||||
prune-quarantine-marker-after = 5 d
|
||||
|
||||
# This setting defines the maximum number of unacknowledged system messages
|
||||
# allowed for a remote system. If this limit is reached the remote system is
|
||||
|
|
@ -280,11 +265,12 @@ akka {
|
|||
# an individual ack.
|
||||
system-message-ack-piggyback-timeout = 0.3 s
|
||||
|
||||
# This setting defines the time after messages that have not been
|
||||
# This setting defines the time after internal management signals
|
||||
# between actors (used for DeathWatch and supervision) that have not been
|
||||
# explicitly acknowledged or negatively acknowledged are resent.
|
||||
# Messages that were negatively acknowledged are always immediately
|
||||
# resent.
|
||||
resend-interval = 1 s
|
||||
resend-interval = 2 s
|
||||
|
||||
### Transports and adapters
|
||||
|
||||
|
|
@ -488,6 +474,19 @@ akka {
|
|||
debug = off
|
||||
}
|
||||
|
||||
### Default dispatcher for the remoting subsystem
|
||||
|
||||
default-remote-dispatcher {
|
||||
type = Dispatcher
|
||||
executor = "fork-join-executor"
|
||||
fork-join-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 2
|
||||
parallelism-max = 2
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -187,12 +187,10 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
val transport: AkkaProtocolTransport,
|
||||
val settings: RemoteSettings,
|
||||
val codec: AkkaPduCodec,
|
||||
val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor {
|
||||
val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor with ActorLogging {
|
||||
import ReliableDeliverySupervisor._
|
||||
import context.dispatcher
|
||||
|
||||
def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
|
||||
|
||||
var autoResendTimer: Option[Cancellable] = None
|
||||
|
||||
def scheduleAutoResend(): Unit = if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
|
||||
|
|
@ -206,20 +204,18 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
scheduleAutoResend()
|
||||
}
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
|
||||
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
|
||||
case e @ (_: AssociationProblem) ⇒ Escalate
|
||||
case NonFatal(e) ⇒
|
||||
log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason is: [{}].",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage)
|
||||
uidConfirmed = false // Need confirmation of UID again
|
||||
if (retryGateEnabled) {
|
||||
context.become(gated)
|
||||
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
|
||||
context.unwatch(writer)
|
||||
currentHandle = None
|
||||
context.parent ! StoppedReading(self)
|
||||
Stop
|
||||
} else {
|
||||
Restart
|
||||
}
|
||||
context.become(gated)
|
||||
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
|
||||
context.unwatch(writer)
|
||||
currentHandle = None
|
||||
context.parent ! StoppedReading(self)
|
||||
Stop
|
||||
}
|
||||
|
||||
var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
|
||||
|
|
|
|||
|
|
@ -60,20 +60,8 @@ final class RemoteSettings(val config: Config) {
|
|||
config.getMillisDuration("akka.remote.retry-gate-closed-for")
|
||||
} requiring (_ >= Duration.Zero, "retry-gate-closed-for must be >= 0")
|
||||
|
||||
val UnknownAddressGateClosedFor: FiniteDuration = {
|
||||
config.getMillisDuration("akka.remote.gate-invalid-addresses-for")
|
||||
} requiring (_ > Duration.Zero, "gate-invalid-addresses-for must be > 0")
|
||||
|
||||
val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections")
|
||||
|
||||
val MaximumRetriesInWindow: Int = {
|
||||
getInt("akka.remote.maximum-retries-in-window")
|
||||
} requiring (_ > 0, "maximum-retries-in-window must be > 0")
|
||||
|
||||
val RetryWindow: FiniteDuration = {
|
||||
config.getMillisDuration("akka.remote.retry-window")
|
||||
} requiring (_ > Duration.Zero, "retry-window must be > 0")
|
||||
|
||||
val BackoffPeriod: FiniteDuration = {
|
||||
config.getMillisDuration("akka.remote.backoff-interval")
|
||||
} requiring (_ > Duration.Zero, "backoff-interval must be > 0")
|
||||
|
|
@ -90,10 +78,9 @@ final class RemoteSettings(val config: Config) {
|
|||
getInt("akka.remote.system-message-buffer-size")
|
||||
} requiring (_ > 0, "system-message-buffer-size must be > 0")
|
||||
|
||||
val QuarantineDuration: Duration = {
|
||||
if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Undefined
|
||||
else config.getMillisDuration("akka.remote.quarantine-systems-for").requiring(_ > Duration.Zero,
|
||||
"quarantine-systems-for must be > 0 or off")
|
||||
val QuarantineDuration: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.prune-quarantine-marker-after"), MILLISECONDS).requiring(_ > Duration.Zero,
|
||||
"prune-quarantine-marker-after must be > 0 ms")
|
||||
}
|
||||
|
||||
val CommandAckTimeout: Timeout = {
|
||||
|
|
|
|||
|
|
@ -395,16 +395,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒
|
||||
log.warning("Tried to associate with unreachable remote address [{}]. " +
|
||||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
|
||||
remoteAddress, settings.UnknownAddressGateClosedFor.toMillis, reason.getMessage)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case ShutDownAssociation(localAddress, remoteAddress, _) ⇒
|
||||
log.debug("Remote system with address [{}] has shut down. " +
|
||||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
|
||||
remoteAddress, settings.UnknownAddressGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
|
|
@ -419,13 +419,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
endpoints.markAsFailed(sender, Deadline.now + d)
|
||||
case _ ⇒
|
||||
}
|
||||
log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
|
|
@ -482,22 +479,18 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender
|
||||
|
||||
case Quarantine(address, uid) ⇒
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
// Stop writers
|
||||
endpoints.writableEndpointWithPolicyFor(address) match {
|
||||
case Some(Pass(endpoint)) ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
// Stop inbound read-only associations
|
||||
endpoints.readOnlyEndpointFor(address) match {
|
||||
case Some(endpoint) ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
endpoints.markAsQuarantined(address, uid, Deadline.now + d)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
|
||||
case _ ⇒ // Ignore
|
||||
// Stop writers
|
||||
endpoints.writableEndpointWithPolicyFor(address) match {
|
||||
case Some(Pass(endpoint)) ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
// Stop inbound read-only associations
|
||||
endpoints.readOnlyEndpointFor(address) match {
|
||||
case Some(endpoint) ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
|
||||
|
||||
case s @ Send(message, senderOption, recipientRef, _) ⇒
|
||||
val recipientAddress = recipientRef.path.address
|
||||
|
|
|
|||
|
|
@ -453,8 +453,6 @@ private[transport] class ThrottledAssociation(
|
|||
sender ! SetThrottleAck
|
||||
stay()
|
||||
case Event(Disassociated(info), _) ⇒
|
||||
if (upstreamListener ne null) upstreamListener notify Disassociated(info)
|
||||
originalHandle.disassociate()
|
||||
stop()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -380,7 +380,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
|||
|
||||
// TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960
|
||||
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match {
|
||||
case Address(_, _, Some(host), Some(port)) ⇒ Future { new InetSocketAddress(InetAddress.getByName(host), port) }
|
||||
case Address(_, _, Some(host), Some(port)) ⇒ Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } }
|
||||
case _ ⇒ Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information."))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
akka.remote.netty.tcp.port = 0
|
||||
""") {
|
||||
|
||||
"Remoting" must {
|
||||
"Remoting" should {
|
||||
|
||||
"contain correct configuration values in reference.conf" in {
|
||||
val remoteSettings = RARP(system).provider.remoteSettings
|
||||
|
|
@ -33,17 +33,14 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
ShutdownTimeout.duration should be(10 seconds)
|
||||
FlushWait should be(2 seconds)
|
||||
StartupTimeout.duration should be(10 seconds)
|
||||
RetryGateClosedFor should be(Duration.Zero)
|
||||
UnknownAddressGateClosedFor should be(1 minute)
|
||||
Dispatcher should equal("")
|
||||
RetryGateClosedFor should be(5 seconds)
|
||||
Dispatcher should equal("akka.remote.default-remote-dispatcher")
|
||||
UsePassiveConnections should be(true)
|
||||
MaximumRetriesInWindow should be(3)
|
||||
RetryWindow should be(60 seconds)
|
||||
BackoffPeriod should be(10 millis)
|
||||
SysMsgAckTimeout should be(0.3 seconds)
|
||||
SysResendTimeout should be(1 seconds)
|
||||
SysResendTimeout should be(2 seconds)
|
||||
SysMsgBufferSize should be(1000)
|
||||
QuarantineDuration should be(60 seconds)
|
||||
QuarantineDuration should be(5 days)
|
||||
CommandAckTimeout.duration should be(30 seconds)
|
||||
Transports.size should be(1)
|
||||
Transports.head._1 should be(classOf[akka.remote.transport.netty.NettyTransport].getName)
|
||||
|
|
@ -58,7 +55,7 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
WatchUnreachableReaperInterval should be(1 second)
|
||||
WatchFailureDetectorConfig.getDouble("threshold") should be(10.0 +- 0.0001)
|
||||
WatchFailureDetectorConfig.getInt("max-sample-size") should be(200)
|
||||
WatchFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(4 seconds)
|
||||
WatchFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(10 seconds)
|
||||
WatchFailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis)
|
||||
|
||||
remoteSettings.config.getString("akka.remote.log-frame-size-exceeding") should be("off")
|
||||
|
|
@ -72,10 +69,10 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
SecureCookie should equal(None)
|
||||
|
||||
TransportFailureDetectorImplementationClass should be(classOf[PhiAccrualFailureDetector].getName)
|
||||
TransportHeartBeatInterval should equal(1.seconds)
|
||||
TransportHeartBeatInterval should equal(4.seconds)
|
||||
TransportFailureDetectorConfig.getDouble("threshold") should be(7.0 +- 0.0001)
|
||||
TransportFailureDetectorConfig.getInt("max-sample-size") should be(100)
|
||||
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(3 seconds)
|
||||
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(10 seconds)
|
||||
TransportFailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ object AkkaProtocolStressTest {
|
|||
val configA: Config = ConfigFactory parseString ("""
|
||||
akka {
|
||||
#loglevel = DEBUG
|
||||
actor.serialize-messages = off
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
|
||||
remote.log-remote-lifecycle-events = on
|
||||
|
|
@ -22,13 +23,12 @@ object AkkaProtocolStressTest {
|
|||
threshold = 1.0
|
||||
max-sample-size = 2
|
||||
min-std-deviation = 1 ms
|
||||
acceptable-heartbeat-pause = 0.01 s
|
||||
## We want lots of lost connections in this test, keep it sensitive
|
||||
heartbeat-interval = 1 s
|
||||
acceptable-heartbeat-pause = 1 s
|
||||
}
|
||||
remote.retry-window = 1 s
|
||||
# This test drops messages, but dropping too much will make it fail. The reason is that this test
|
||||
# expects at least a few of the final messages to arrive to prove that the Remoting does not get stuck
|
||||
# in an irrecoverable state. The retry limit enabled case is covered by the SystemMessageDelivery tests.
|
||||
remote.maximum-retries-in-window = 100
|
||||
## Keep gate duration in this test for a reasonably low value otherwise too much messages are dropped
|
||||
remote.retry-gate-closed-for = 1 s
|
||||
|
||||
remote.netty.tcp {
|
||||
applied-adapters = ["gremlin"]
|
||||
|
|
@ -38,6 +38,8 @@ object AkkaProtocolStressTest {
|
|||
}
|
||||
""")
|
||||
|
||||
object ResendFinal
|
||||
|
||||
class SequenceVerifier(remote: ActorRef, controller: ActorRef) extends Actor {
|
||||
import context.dispatcher
|
||||
|
||||
|
|
@ -58,13 +60,25 @@ object AkkaProtocolStressTest {
|
|||
if (seq > maxSeq) {
|
||||
losses += seq - maxSeq - 1
|
||||
maxSeq = seq
|
||||
if (seq > limit * 0.9) {
|
||||
// Due to the (bursty) lossyness of gate, we are happy with receiving at least one message from the upper
|
||||
// half (> 50000). Since messages are sent in bursts of 2000 0.5 seconds apart, this is reasonable.
|
||||
// The purpose of this test is not reliable delivery (there is a gremlin with 30% loss anyway) but respecting
|
||||
// the proper ordering.
|
||||
if (seq > limit * 0.5) {
|
||||
controller ! ((maxSeq, losses))
|
||||
context.system.scheduler.schedule(1.second, 1.second, self, ResendFinal)
|
||||
context.become(done)
|
||||
}
|
||||
} else {
|
||||
controller ! s"Received out of order message. Previous: ${maxSeq} Received: ${seq}"
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the other side eventually "gets the message"
|
||||
def done: Receive = {
|
||||
case ResendFinal ⇒
|
||||
controller ! ((maxSeq, losses))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,17 +36,19 @@ object SystemMessageDeliveryStressTest {
|
|||
akka {
|
||||
#loglevel = DEBUG
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
actor.serialize-messages = off
|
||||
|
||||
remote.log-remote-lifecycle-events = on
|
||||
|
||||
remote.failure-detector {
|
||||
remote.transport-failure-detector {
|
||||
threshold = 1.0
|
||||
max-sample-size = 2
|
||||
min-std-deviation = 1 ms
|
||||
acceptable-heartbeat-pause = 0.01 s
|
||||
heartbeat-interval = 500 ms
|
||||
acceptable-heartbeat-pause = 2 s
|
||||
}
|
||||
remote.retry-window = 1 s
|
||||
remote.maximum-retries-in-window = 2
|
||||
## Keep this setting tight, otherwise the test takes a long time or times out
|
||||
remote.resend-interval = 0.5 s
|
||||
remote.use-passive-connections = on
|
||||
|
||||
remote.netty.tcp {
|
||||
|
|
@ -142,12 +144,9 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
|
|||
|
||||
}
|
||||
|
||||
class SystemMessageDeliveryDefault extends SystemMessageDeliveryStressTest("retry gate off, passive connections on", "")
|
||||
class SystemMessageDeliveryRetryGate extends SystemMessageDeliveryStressTest("retry gate on, passive connections on",
|
||||
class SystemMessageDeliveryRetryGate extends SystemMessageDeliveryStressTest("passive connections on",
|
||||
"akka.remote.retry-gate-closed-for = 0.5 s")
|
||||
class SystemMessageDeliveryNoPassive extends SystemMessageDeliveryStressTest("retry gate off, passive connections off",
|
||||
"akka.remote.use-passive-connections = off")
|
||||
class SystemMessageDeliveryNoPassiveRetryGate extends SystemMessageDeliveryStressTest("retry gate on, passive connections off",
|
||||
class SystemMessageDeliveryNoPassiveRetryGate extends SystemMessageDeliveryStressTest("passive connections off",
|
||||
"""
|
||||
akka.remote.use-passive-connections = off
|
||||
akka.remote.retry-gate-closed-for = 0.5 s
|
||||
|
|
|
|||
|
|
@ -19,6 +19,9 @@ object ThrottlerTransportAdapterSpec {
|
|||
|
||||
remote.netty.tcp.hostname = "localhost"
|
||||
remote.log-remote-lifecycle-events = off
|
||||
remote.retry-gate-closed-for = 1 s
|
||||
remote.transport-failure-detector.heartbeat-interval = 1 s
|
||||
remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
|
||||
|
||||
remote.netty.tcp.applied-adapters = ["trttl"]
|
||||
remote.netty.tcp.port = 0
|
||||
|
|
@ -115,7 +118,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
|
|||
here ! "Blackhole 3"
|
||||
false
|
||||
}
|
||||
}, 5.seconds)
|
||||
}, 15.seconds)
|
||||
|
||||
here ! "Cleanup"
|
||||
fishForMessage(5.seconds) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue