From 1bd41e948e7e31f145b85df337c1e3d58697e22c Mon Sep 17 00:00:00 2001 From: Gert Vanthienen Date: Wed, 16 May 2012 23:49:42 +0200 Subject: [PATCH 01/21] Add sbtosgi plugin configuration to generate OSGi metadata --- project/AkkaBuild.scala | 74 +++++++++++++++++++++++++++++++++++------ project/plugins.sbt | 2 ++ 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 4804c0f796..a291abae7d 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -11,6 +11,8 @@ import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOpti import com.typesafe.schoir.SchoirPlugin.schoirSettings import com.typesafe.sbtscalariform.ScalariformPlugin import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys +import com.typesafe.sbtosgi.OsgiPlugin.osgiSettings +import com.typesafe.sbtosgi.OsgiKeys import java.lang.Boolean.getBoolean import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments, sphinxTags } @@ -45,7 +47,7 @@ object AkkaBuild extends Build { lazy val actor = Project( id = "akka-actor", base = file("akka-actor"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.actor ++ Seq( autoCompilerPlugins := true, libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) }, scalacOptions += "-P:continuations:enable", @@ -79,7 +81,7 @@ object AkkaBuild extends Build { id = "akka-remote", base = file("akka-remote"), dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"), - settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ Seq( + settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ OSGi.remote ++ Seq( libraryDependencies ++= Dependencies.remote, // disable parallel tests parallelExecution in Test := false, @@ -98,7 +100,7 @@ object AkkaBuild extends Build { id = "akka-cluster", base = file("akka-cluster"), dependencies = Seq(remote, remote % "test->test", testkit % "test->test"), - settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ Seq( + settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ OSGi.cluster ++ Seq( libraryDependencies ++= Dependencies.cluster, // disable parallel tests parallelExecution in Test := false, @@ -117,7 +119,7 @@ object AkkaBuild extends Build { id = "akka-slf4j", base = file("akka-slf4j"), dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.slf4j ++ Seq( libraryDependencies ++= Dependencies.slf4j ) ) @@ -126,7 +128,7 @@ object AkkaBuild extends Build { id = "akka-agent", base = file("akka-agent"), dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.agent ++ Seq( libraryDependencies ++= Dependencies.agent ) ) @@ -135,7 +137,7 @@ object AkkaBuild extends Build { id = "akka-transactor", base = file("akka-transactor"), dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.transactor ++ Seq( libraryDependencies ++= Dependencies.transactor ) ) @@ -153,7 +155,7 @@ object AkkaBuild extends Build { id = "akka-mailboxes-common", base = file("akka-durable-mailboxes/akka-mailboxes-common"), dependencies = Seq(remote, testkit % "compile;test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.mailboxesCommon ++ Seq( libraryDependencies ++= Dependencies.mailboxes ) ) @@ -162,7 
+164,7 @@ object AkkaBuild extends Build { id = "akka-file-mailbox", base = file("akka-durable-mailboxes/akka-file-mailbox"), dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.fileMailbox ++ Seq( libraryDependencies ++= Dependencies.fileMailbox ) ) @@ -171,7 +173,7 @@ object AkkaBuild extends Build { id = "akka-zeromq", base = file("akka-zeromq"), dependencies = Seq(actor, testkit % "test;test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.zeroMQ ++ Seq( libraryDependencies ++= Dependencies.zeroMQ ) ) @@ -189,7 +191,7 @@ object AkkaBuild extends Build { id = "akka-camel", base = file("akka-camel"), dependencies = Seq(actor, slf4j, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.camel ++ Seq( libraryDependencies ++= Dependencies.camel ) ) @@ -435,3 +437,55 @@ object Dependency { val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 } } + +// OSGi settings + +object OSGi { + + val actor = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka*", "com.typesafe.config.*", "com.eaio.*", "org.jboss.netty.akka.util"), + OsgiKeys.importPackage := Seq(scalaImport(), "*;resolution:=optional"), + OsgiKeys.privatePackage := Seq("org.jboss.netty.akka.util.internal") + ) + + val agent = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.agent.*") + ) + + val camel = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.camel.*", "akka.camelexamples"), + OsgiKeys.importPackage := Seq(scalaImport(), akkaImport(), "org.apache.camel.*") + ) + + val cluster = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.cluster.*") + ) + + val fileMailbox = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.actor.mailbox.*") + ) + + val mailboxesCommon = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.actor.mailbox.*") + ) + + val remote = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.remote.*", "akka.routing.*", "akka.serialization.*") + ) + + val slf4j = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.event.slf4j.*") + ) + + val transactor = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.transactor.*") + ) + + val zeroMQ = osgiSettings ++ Seq( + OsgiKeys.exportPackage := Seq("akka.zeromq.*") + ) + + def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.9.1,2.10)\"".format(packageName) + def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,3)\"".format(packageName) + +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 80ff9db95a..f5355bd1d2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -7,6 +7,8 @@ addSbtPlugin("com.typesafe.schoir" % "schoir" % "0.1.2") addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.3.1") +addSbtPlugin("com.typesafe.sbtosgi" % "sbtosgi" % "0.2.0") + resolvers ++= Seq( "less is" at "http://repo.lessis.me", "coda" at "http://repo.codahale.com") From 80ca257f25bd6de74c346c140979e0f5e791daea Mon Sep 17 00:00:00 2001 From: Gert Vanthienen Date: Thu, 17 May 2012 08:09:59 +0200 Subject: [PATCH 02/21] sun.misc.Unsafe should be loaded through boot delegation instead --- akka-docs/additional/index.rst | 1 + akka-docs/additional/osgi.rst | 10 ++++++++++ project/AkkaBuild.scala | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 akka-docs/additional/osgi.rst diff --git a/akka-docs/additional/index.rst b/akka-docs/additional/index.rst index 
b3c89356c9..284586d59d 100644 --- a/akka-docs/additional/index.rst +++ b/akka-docs/additional/index.rst @@ -6,3 +6,4 @@ Additional Information recipes language-bindings + osgi diff --git a/akka-docs/additional/osgi.rst b/akka-docs/additional/osgi.rst new file mode 100644 index 0000000000..aea554ef9c --- /dev/null +++ b/akka-docs/additional/osgi.rst @@ -0,0 +1,10 @@ +Akka in OSGi +============ + +Configuring the OSGi Framework +------------------------------ + +To use Akka in an OSGi environment, the ``org.osgi.framework.bootdelegation`` +property must be set to always delegate the ``sun.misc`` package to the boot classloader +instead of resolving it through the normal OSGi class space. + diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a291abae7d..59f7c62a04 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -444,7 +444,7 @@ object OSGi { val actor = osgiSettings ++ Seq( OsgiKeys.exportPackage := Seq("akka*", "com.typesafe.config.*", "com.eaio.*", "org.jboss.netty.akka.util"), - OsgiKeys.importPackage := Seq(scalaImport(), "*;resolution:=optional"), + OsgiKeys.importPackage := Seq("!sun.misc", scalaImport()), OsgiKeys.privatePackage := Seq("org.jboss.netty.akka.util.internal") ) From 82a29bbd3b94a2d7f141996b8543b30c427c6df0 Mon Sep 17 00:00:00 2001 From: Gert Vanthienen Date: Fri, 18 May 2012 15:09:52 +0200 Subject: [PATCH 03/21] Tighten akka import version range and make some more packages private --- project/AkkaBuild.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 59f7c62a04..5066d3929a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -443,9 +443,9 @@ object Dependency { object OSGi { val actor = osgiSettings ++ Seq( - OsgiKeys.exportPackage := Seq("akka*", "com.typesafe.config.*", "com.eaio.*", "org.jboss.netty.akka.util"), + OsgiKeys.exportPackage := Seq("akka*", "com.typesafe.config.*"), OsgiKeys.importPackage := Seq("!sun.misc", scalaImport()), - OsgiKeys.privatePackage := Seq("org.jboss.netty.akka.util.internal") + OsgiKeys.privatePackage := Seq("org.jboss.netty.akka.util.*", "com.eaio.*") ) val agent = osgiSettings ++ Seq( @@ -486,6 +486,6 @@ object OSGi { ) def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.9.1,2.10)\"".format(packageName) - def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,3)\"".format(packageName) + def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName) } From 2e4f01b6123c2acc7d3e704f4555069bf4ba5065 Mon Sep 17 00:00:00 2001 From: Gert Vanthienen Date: Thu, 24 May 2012 23:39:13 +0200 Subject: [PATCH 04/21] Inadventently deleted akka-intellij-code-style.jar --- .../_sphinx/static/akka-intellij-code-style.jar | Bin 0 -> 8587 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 akka-docs/_sphinx/static/akka-intellij-code-style.jar diff --git a/akka-docs/_sphinx/static/akka-intellij-code-style.jar b/akka-docs/_sphinx/static/akka-intellij-code-style.jar new file mode 100644 index 0000000000000000000000000000000000000000..55866c22c522de2f21a4fb2a6c26ec30ad3c5486 GIT binary patch literal 8587 zcmWIWW@h1H00GJ33DzJQhPfG77?Sf-Qj1F}b5e`-|D9rB04Ww>;9%fjKvyisz<{hc zF*`dk9;7)QtXr=lH|OmQ$NVM(3AP8;`xm{ObXR8An*^gPA#as9n;MI5wwkQ7N@rE~ zI<`MP@QUkE5#jAKzqmc9{=KyJl8`}EyfL59whr~c8x33AwODjy-0~+E+-b0@TzMsN z^-9S{eB6&7TF*8VxFnhFU^VxWSJ`eM{or#6!ORCbo_QPk#XYRH_z)n^zU=N%vyTcM z8u6X%r;m0yE|6E3xpX)$PR@ck$!gnnxd@SUn`gg193l7NO!%JHA7_|btb1o+W|%9V 
zx|n_WBLCLFrdykq-h8t8lOO-i70w%WpYGA5Ic*P1k>{f-#}7OoQunA9p1vH%mHnbs?o-sqn83SiSJ{16Yze*SrM3G0#;&PX z)_RjwUxU}qQ=?b2| z$LCL3wyc&pz?+@pgM)Hg3nK$VgCKHznxaL%AOizKenClQeqOPDeo01Z5hx0u#{4gS zv`g^+-@l@IZ%f|Y-7`aAl6R5st->CTea2>6eT&R^u3Ko+i$=8wuCU!ng{`DX*|4Fj; zp&LDOCCnu>1(+kZ2*iDOwC&qV1(Ek2-i) zyk|Kl;@%o?Roc~W%LbVjfyc_lS6jCyG5<2}^JIBtP$+YlTUTPzgkID9Y_Hgq@(Eo! zDfZtcOxvH&abL#1@x6MVe_^x#I-i&e*AE?k_Ji|$!B&UL3<;0TIl=E1+HEpjxnJnn zo;?=o2JJrG2ik&j*!w4GYOE=nGfnYiS6P0!=vAk-Ol99)!GYR%cJ=sNW?TA2)OV9e?BnDm(a#o}Y!E%WZ}OA# z-`ni2?AVf77sj^jXXxgt6HSTVCSJX4t!BP-P4==>o>XshpLKo9LcX6!+I-G`!nvkZ z-gAvYeeVkxPYS6nbDtRU_ujUBd>5u2I~K5+`>$v03yGdp_5oL>Doc1?P&TYBb?V%g z^KjDT8|554?4mrCR=+hOi_V)(Ff}o;-kN83*2CGm{af21pL1J&S1qra%U~73l`Nwx ze%#cIxzqR9sWS)WyUt>H^jr0(^xJDz*_(B8^iQ5YXa2lg$ojwRlSvs`lWu+~pYX@p zH(#jYqv4ZxT@D&u9x4(RDuFsDRAy9MR!RD}%Sfa)h==pg$-SFiy%cNrj^;~Ht-8|N zrz;)Leob%Ec@Ew;rn{P*Dy;WsO16iL05dQS-ab`pX8Id ze_)ErMTzGcJ~LbE472j3CUERl54`8%q4a;-M3;l^=Z~H{Zu#}`c5{>)@5QLlcRUe?ET{jO!>7cf(4s$W?dgX5 z6V#Odb>u1(uZT)((<|k@=^VW4zLdo^!HJ7X73N8K#2r^vsVY_4-sfRHWs{?ylF{bl zo-Upt?y0AKC0yOS(%-Ew{(79;tZDc3Ppn$JB}M7B%dC|Vru6>I&`|~Rl6BA z3|a1M6PC#Em;|WK^nclJD;vOCY$m+S`|l}NmV_nzE9&^U913~*S99syoV1HQaoLRK zs>}ty7ivkG{W^3dxykoq<-x^CUvr8prq5io@0o4sv-y_icZE9k{p7j1>_Teiv#`fa z8^5Z4{(P$8ft!Ec^2n1n z$~DdgPJGW2e6c6;-wlC?-FtqoJ$?FgkVEa^Wk-)cJHKB|MQ%^Yv}VCqM`|AkZx@=&l&dt0#n=a1eRqqJ+Y&egB=F4#5gZS_I!GF7wm zEz=a2PrkY}FN&+(N5% z*@u#y8=vfAc%FUtZO>(CC7EB`-A`l+ZT9!W0m|M4^9b?M2M^7R6CfkGd6W?c3!Vn}#k*p?Ax zuR3Slrab3tnfzHk(%v(!>pa{uJB^J$dqdJZ#y`s+E}CdHbEaI*75&3=6xYfn7gl9S zCArS`$=-eB<)XqTELS|w_w)S8e&qV~wea$MuD9_l5smeNGY;?f*8FnPewyIlc>C+? z&diZ}T`jDmf6PuSk9G2y57t{NKIAnlo_YA2`YYDLUxjIGQiU65tmA!eBJ?2Y*c`jF zI;W+R17F^lF0k&%o!!~br(WLO&meeo#vR>P2etJr&P4t#5{Y^7Sn|x8tsJj+AA9M0 z&2Ynf5gi^IOSV_DFRrvWYIs?!cDm}smxTr2w#i*6_|>~pd*8<-2b1rweirOv%y=(x zK>xd3t18#mk3HhQrdp)6e>mRIq+6KA3k$*}fao7kr6j=9pe zwJ!BBa%xKMf3QR7d%e8M8;xUe1t0$GzL-$#!FbwyzPrHR#cI9Be67lA)PGkqIh^7S zn)JS{^ES&-{~t%XW$za^O-?t;+Lgj))Zd`7oijyL_vQS_C6`;5#T(8%`hEAdBn$Bt ztDo*BGoKzanSV5w3R(JJZ9h_R)Ap$Ge~!<8<7$>Y6nyq}uf_WJ{LgRk z{%u^(Kl5cQ`{eI0HD`#dIoEjEV4m@*b0;#R<}9<~o^<52aCgq5B>wQa=$z~X@rdAk z#+Tw`+O7Q;x2x$Km}Z(V)i?e}U+bl5W&Y;n^W4*QudmvDuQHzxO!;?d zRnLUiF1GK_|4V=Mx`llz#~#hucUDBNiDBo9WL~qLd2h_&!iM|H-50hoEZ!F!STAkt zcRzf~yKDancKuzPk|9vOz3^d4Rf@!}K&=SPKegA-)E+uBlX=FTq;(l#l@sdu3rhbO zHMmaS;e9%zFzV=maxvxfKS3g?W-L~Hdko%HTwEpcSXWSc)`_?2XAiS(>zdFJDVh3Z zb<=&X=hNj>Tz$Efd_M$6U%Q%E|J3ESP3T1*@1W2tH@m&2EqH6kGIj3egMTM%Tw;5B zdAIZy-E)0;>deWXYnFXq8oJJKmM+`rn-LS^W9#?W{5=u(ApWG3`HZ_2jI2+dCYSR+ z6Q3@xe@k5cyuMe>^&fAH+_#xaANe`+cUtP#S${OLUmo!<*%~!f`9;@*0+Z7xAL?d3 zUVOV{`_bPkg&&8fD^?|Lk&oW?Cq-iF8NQbSwV#}-I2OM5eH3ov74JVUIsIqaf5V>I zPjg%>jHi4p|7{o?^G0Oa6_dFKWG7aB+;wne&)!!FraGTio;2m#wDRt;^X2RZ9M*8% zkGtBTySg&sZg^PQvDTPhznGaa8s$X3%$Qmkdj0yFcNWa;7Z*Lc$I2|l)jdh6+vswK zN@H|r_hxUFmdh*!%Uv`*ikfx=8TttcbHpF{uA8(>wLT^*Vq)pR{49?0vLZ%-%4HL8 z%zjcHu}VGQvhLE^&S!$s*aB-;XP#Ez3W?1A$G=IF$GLp(pGV7frtJ2YN_-gk!upLy z#$AECZ{&J^vS|O8_{2PGKO@)6zF=RmLeJDPAuB`eS0Z~1n|XGwXqynAqi&*RJoCqd z$@xjMstYx=ug z(XWN`?GFE5HTUi7zvoxg7D4e)6>bab9-WlSw)%zRs(I%EHp>YESLd;=6h9baQ)rcJltDJnkD` zj~-R!x-GMUYj@?rGSM6NPDI7NU3p_)#&-KTvGa<*{TA&16dXHi=7UZDonEd^d}92- zq{CnhU&HK-e|r=+92fg(^!fdU!;_AcdQ82)Eawzo%FZn-Sq#6`zMniPYa8c;%?VfL zoc|;ExS&XF+s(H@Vb_#?%au&|c_jC-X5Zodv&;M$X4xK`+<*Q0^>AKCBm3La@}6@_ zKZ(7Ru;R(BGo_rh3%oD4^5?9r-j(~uQ>sB}vqs+c5`%?(Nq<((ysNR`?4<{cv3;}7 zFPL`pB>P6yZJ)k-zWI5R`Qy#bBLWPQEdH(y%(=haZ~eK`e#IM}l%6$t{-)VBea)&T z^X6{%ukX0==<)h9d8<^mM@@e#uQ>DW{|}Ll-d8iC*S_vv%%OW;-`Gq3xVqm9M|r(% z{+~jBKRR4^B){k9Ot%HTyH)4iy3nb0QT*-a)$%6i6OBIYEqq$BUjO{Q`o37BWxna} 
zE>5dGXQ#5pM#*(jsFD2^y>sXK?T;`0KJ{2l-OaS+OYBe7YqQ_lXSd&`??m|Vugfho z;NL6WTrG6?h-qT0B>+S!QLq|IYU-Kb+TEH@C1{_o$Isv+3`cp40EXt$7@(7}s+1 z<{5Q{KdO`O^?CpP*_(5qe%Yr6i}l8guln}{3O$xOvd1)VmB;V<&)iP89NxQQ&#Hx$ zvvuZP+j4J`-rKnbAD8(2e$MeA-f-UWOZ8qm9~%d}|M|bOsQd1gb>bIi95;7wxc4;4 z?a3cDb@s^fv+aI<(D%#!s*}EoT_^qVf+;cPd;gx{yW>=I;@%_i;H`dVR8p2pCOtiN z<@QteP!kWa%S_#;OtYu^d{gF%n|8*gdxCmb&f+aMN)|6-&p(p)GVtivG;!u5N7l)T zhg_8wzR7>2r1TmH_`SNrblc`LnHtxU1*=9U|a6VxyBG79Ajot;y0cxDzy?;&EtM<|Nda&1?e|y_1q4J9k%uGIiY)C z=go&E=Dgf2PQSV%zHmN%o3%4@-Ol@Q*|y%+24ejGes8F{I_bF)`9i>vr9uK_nho&%U0f(x$dvhUH|`VpkDk>Jy)}Keg=jUevn=~ zWC-VL>{Tn+@C}IOV_;xN%gjj)NzEs_4s z)8&zaO4ia-?ccCv^VL$gHMHi7B)C zmIOpC5)V3gTG`^jIjckU*WSMW@Oyqa$h$GMvf(8R3=9((kiDB1egnk|pg<_W;#G8Y z!VC-y2zABDi8+a&;f=S`;{6X9h#bBDQ^b6#K<~wavl-bU6aH}DE||?S-$#~pddb^2 zR!8#R-(ZsxF{&||xc%POJH`34K6_uwSSQL*=vT!*R^uZz|@JV7<;A)l)yII?q7cFYt@3!HPiQl;{ro9i(TzDOD?QGu8 zqQKQ=0%ts*aN19KdEtwkjT}phC(ESUDFZd9Tim`=i}p|d70*)J+S)(=W!ZycHVaE-xrAJI zKA)pg;PcevHZ$Y0{2#VOD*UE9zw@N$oxAggGcWA^j+J|k#h4jYy_yhOzz6^-8t_J7qi*TcV@eTkJRTkIvsG?ER+6y_ItybA9vpl zn!uY8tRZ>f_-}SE2fZtsCcJLg_oR8Z`mMJXc21F;eFqp$My=k(m67XkxzG8;%gG*b zP7d0!9!pXVyVkC0*;6>#pzlufBR9|cOH(-07It%e-=y0n6e781ik>A~$Vo#B2vm*{m`xIaOpw&BY{=RFS6VSlRBe#(J%98cH`$}S+5;G9^G$SxXLN# zWXJVOx>HzK|Kkj+|zNi_Ueet)r{PDm^@s|&M%9+EXYc2Kfrpc+} znMpSdPCZ_!`sJ#WWukHP{+?LV?l`k~H~sQwEPeUstXNL&%&&(sD-Ol!oXO9a)xYQs zf7E^J+j{q{<$Q}{?%gacm3ht%ihvXCJ9j!VGB8YGMvj1_fL@e=Vo8(;C{Il)PAw|S zOinEZXR7Gv;@jJV>dvn}uwoXU?PKPhFTL}!ju*aBoB69q$MWTwnZ|M*ExKAF2bdP9 znojoh*5iQoMf84LF;^ zes_XKBfF~Di_iV`f3|$z^m}3-TiPAY3pby=zoXvOt@p|;a&^DN+=XdH`OlYjYi`)M zD$~r;`_A5FuV$<=n= zH*NKcGNzoE5U6?4WbQmeuVcLBo3Gz|vb$Y9f7i6r;w;xb9>3ReXM!bL`2nSG8@E1t zcxQQGmXOtb$N3NS`&>_4aS;4}IY3~3lj6ncQvwgNb!T>*e>r9KHXbXD1((DVy>7k# z<0dJ3YwEY>8$=#to;asEcl~_5Aa>0S-2~mVf6|P>tE}?G2eV2GtuMH z75jtE`5hc5HBQ-VIk{DV=h;Sv$xDA_Jk{Xl2xb@5VV>HqGxJvI!$(q+XZ%R+;>_s& ztFgoARn+{#n{Ly>kL%WUrtt2xJQbq)Tjs+=rZ&IMfVi`B65pt; z-q@v9UY?L}DX%gtWp{dl)5bim(2k<$b>}j!Zp?l2ym;A*n3?Jo&)63CXubb#|0Z5; z&OS?>rRy&}5dCy8()!Tlhqn(Go4;Hg=ean1ug+Af&sm%1gbQXJQwZ^Fw_0s>S$yZl zws#FKfmZw1@vmK%_jTfRhdsd=4YiJmM!U98NdEC+iMa}s(}M}DhV42Vn6v2UEuRzlg;D_y~ma>=KOl~|G$2_<(%KSe@zQ|XT+R)Gg;_mix;!O$J&&4Eh}Ap zn%7S|_+PW-y7yb5RljGeOuN5cQSW=v^fMcSK6c0XFaMNrW@**OocA~5^x2=Ql}7Q! 
zMqd{Gc~(1$Ws`Skxzo*M^C~sJPWcv-F0j#=qvS^U(FBZb&&M3!a``u8o=A%=RX8LOzQ8R&tKxZu(c~QxH`lGXbzt?@gK{!F-mCZA zvpcZFZiA2X<|#{Z!Y0jMdt+Iy<2v^|h0QA$<$jrDG%aQ7@m04?qh9}RTAK3i&q~gP z&0APnFX{Z9|8UaFudlLBEm@&fJm-}5PCXq(iRi4jZ3niB#>ckqc&)=aUF%@b#;udH z{bgCFOJ3!vjos_ zYZ@F0e$pV|I$_z987?>LE{DZDtBosOx_4f;sFm4cWvzz%hNjd$?_bkTEO*@;KX1$3 zGvZTPA0&DFir)5M17|q*`Z$lQ^`{qK{`~mw!w(PktMt#JXwM z+B)CK7ri$3^?0EY7Z^SIB+?-89-f1GQEd)^#(ewHuq{Z4=I)BdrqE^$87 zo?~B^OP;YWYdF96b6H2c;$HCy`afe5%JO@qd!t_IUb&xq)?VInVV$U@55Jw#zDG%c zjdvd%IW{TTGq0<7A&>W~S4XlAD4!SoP=5P}|KjTbA5N{0x{$kib?%9+tJj(F?v%L3 z(sUwV@zju5@wnYbJ5)}UE&DE3EZKZ1(p2Z6N4s>}j@|D$)}`bus68QZPwhgkiM3wp zq*?P%c^|KQnd)+7>dXC8vuC`#eX4HZLk|8(v$yLM))f~oe0lHSWp<0dyD!VGUj90M z)2jX9S8LewTz#wWgeN_>;k)B==bKpgZHGC3r28)2pZxRq!EYM1)l>G_di{?}@VpZz zf9HO=w06$*zeSIyT}pn-ukh~uY1W2wpEoz>|GCZjob^hyLh0|dhrUI#N_=HVefVj` z@vfbl--{K!@7P$S^l`?l+w-EQ++FnMvD3{xS$XkQdD|b)PW;evrd7J-pihLF&R&&O z_Rbgl99ZkP&pO_6S>}G9ed`ioEtS{=q4o%EhXXx_Y|VbJy(4qtc0ci&j z)i*D_x+3MT=;Sxs|JPaTobHEOvl2 zIY4cC1_t!$Nm&L42G6{b)SR45F9lB*S4Rc+ocyH39EIT2l9J54^x^<-MkX!>h-UP; zYS6?ssCR_GpqXm$M0GKAnwybHgaOjp1ZhT|G6hX^BQ)v~oc0De1ZEb<7eDn79XHS< zE5fY5j98{zLE2&dz}CA%Xg@5-fYH}OcmveiL+(T=A~Z5EFucdmi>2>}>=g7iI3%E9 zW^NKf?q7lQ!9oC}54lwj>WRU0F)-Xh(FgCBp&NkQDg`zE5eBRfLADsy6+qSxYP%vQ zF;LqTp Date: Fri, 25 May 2012 17:27:24 +0200 Subject: [PATCH 05/21] Added more logging to Cluster's who to gossip selection process. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 76e3356143..e7d672d051 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -795,6 +795,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * @return 'true' if it gossiped to a "deputy" member. */ private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = { + log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", remoteAddress, addresses.mkString(", ")) if (addresses.isEmpty) false else { val peers = addresses filter (_ != remoteAddress) // filter out myself From d99b1cd7f00d0d091b91195226369fb0e798c1bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 May 2012 17:29:29 +0200 Subject: [PATCH 06/21] Rewritten old in-memory ClientDowningSpec into multi-node specs. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split it up into two different specs: - ClientDowningNodeThatIsUnreachableMultiJvmSpec - ClientDowningNodeThatIsUpMultiJvmSpec Signed-off-by: Jonas Bonér --- ...ientDowningNodeThatIsUnreachableSpec.scala | 111 ++++++++++++++ .../ClientDowningNodeThatIsUpSpec.scala | 108 +++++++++++++ .../akka/cluster/ClientDowningSpec.scala | 145 ------------------ 3 files changed, 219 insertions(+), 145 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala delete mode 100644 akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala new file mode 100644 index 0000000000..8e02420050 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address + +object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + val waitForConvergence = 20 seconds + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka { + #loglevel = "DEBUG" + #stdout-loglevel = "DEBUG" + cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off + } + } + """))) +} + +class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec + +class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ + + override def initialParticipants = 4 + + def node = Cluster(system) + + def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { + awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address))), waitForConvergence) + } + + "Client of a 4 node cluster" must { + + "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { + runOn(first) { + node.self + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + + // kill 'third' node + testConductor.shutdown(third, 0) + testConductor.removeNode(third) + + // mark 'third' node as DOWN + 
node.down(thirdAddress) + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + node.latestGossip.members.exists(_.address == thirdAddress) must be(false) + testConductor.enter("await-completion") + } + + runOn(second) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + + runOn(third) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + } + + runOn(fourth) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala new file mode 100644 index 0000000000..52d37a4ed3 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address + +object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + val waitForConvergence = 20 seconds + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka { + #loglevel = "DEBUG" + #stdout-loglevel = "DEBUG" + cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off + } + } + """))) +} + +class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec + +class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import ClientDowningNodeThatIsUpMultiJvmSpec._ + + override def initialParticipants = 4 + + def node = Cluster(system) + + def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { + awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port))), waitForConvergence) + } + + "Client of a 4 node cluster" must { + + "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { + runOn(first) { + node.self + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = 
node(third).address + + // mark 'third' node as DOWN + testConductor.removeNode(third) + node.down(thirdAddress) + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + node.latestGossip.members.exists(_.address == thirdAddress) must be(false) + testConductor.enter("await-completion") + } + + runOn(second) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + + runOn(third) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + } + + runOn(fourth) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala deleted file mode 100644 index 0e7b0ed330..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ - -import com.typesafe.config._ - -import java.net.InetSocketAddress - -class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with ImplicitSender { - val portPrefix = 1 - - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - var node4: Cluster = _ - - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - var system4: ActorSystemImpl = _ - - try { - "Client of a 4 node cluster" must { - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Cluster(system1) - val fd1 = node1.failureDetector - val address1 = node1.remoteAddress - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Cluster(system2) - val fd2 = node2.failureDetector - val address2 = node2.remoteAddress - - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote3 = 
system3.provider.asInstanceOf[RemoteActorRefProvider] - node3 = Cluster(system3) - val fd3 = node3.failureDetector - val address3 = node3.remoteAddress - - // ======= NODE 4 ======== - system4 = ActorSystem("system4", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d553 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider] - node4 = Cluster(system4) - val fd4 = node4.failureDetector - val address4 = node4.remoteAddress - - "be able to DOWN a node that is UP" taggedAs LongRunningTest in { - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: node4 :: Nil) - - node3.shutdown() - system3.shutdown() - - // client marks node3 as DOWN - node1.down(address3) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node1 :: node2 :: node4 :: Nil) - - node1.latestGossip.members.size must be(3) - node1.latestGossip.members.exists(_.address == address3) must be(false) - } - - "be able to DOWN a node that is UNREACHABLE" taggedAs LongRunningTest in { - node4.shutdown() - system4.shutdown() - - // clien marks node4 as DOWN - node2.down(address4) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node1 :: node2 :: Nil) - - node1.latestGossip.members.size must be(2) - node1.latestGossip.members.exists(_.address == address4) must be(false) - node1.latestGossip.members.exists(_.address == address3) must be(false) - } - } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } - - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - - if (node4 ne null) node4.shutdown() - if (system4 ne null) system4.shutdown() - } -} From ea99a1f315e716da9782ec86b10c96800fc75196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 May 2012 17:43:03 +0200 Subject: [PATCH 07/21] Simplified config and removed old too-long timeout. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- ...ientDowningNodeThatIsUnreachableSpec.scala | 22 +++++++------------ .../ClientDowningNodeThatIsUpSpec.scala | 22 +++++++------------ 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 8e02420050..95510a701d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -17,18 +17,12 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - val waitForConvergence = 20 seconds - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka { - #loglevel = "DEBUG" - #stdout-loglevel = "DEBUG" - cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } + akka.cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off } """))) } @@ -46,9 +40,9 @@ class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowning def node = Cluster(system) def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address))), waitForConvergence) + awaitCond(node.latestGossip.members.size == nrOfMembers) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address)))) } "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 52d37a4ed3..b92a45f2e4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -17,18 +17,12 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - val waitForConvergence = 20 seconds - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka { - #loglevel = "DEBUG" - #stdout-loglevel = "DEBUG" - cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } + akka.cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off } """))) } @@ -46,9 +40,9 @@ class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatI def node = Cluster(system) def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) - awaitCond(node.latestGossip.members.forall(_.status == 
MemberStatus.Up), waitForConvergence) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port))), waitForConvergence) + awaitCond(node.latestGossip.members.size == nrOfMembers) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port)))) } "Client of a 4 node cluster" must { From 57313cc9e0cbbc0bde9a2c7f33bc3c21b46a8eb7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 13:03:57 +0200 Subject: [PATCH 08/21] Move LeaderElectionSpec to multi-jvm. See #2113 --- .../scala/akka/cluster/LeaderElectionSpec.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename akka-cluster/src/{test => multi-jvm}/scala/akka/cluster/LeaderElectionSpec.scala (100%) diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala From 597271052f9c650ec8c7df5cc8318ccfd0be4018 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 14:48:00 +0200 Subject: [PATCH 09/21] Port LeaderElectionSpec to MultiNodeSpec. See #2113 --- .../akka/cluster/LeaderElectionSpec.scala | 212 ++++++++---------- .../akka/cluster/MultiNodeClusterSpec.scala | 20 +- 2 files changed, 110 insertions(+), 122 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index c262fad8c3..56cfbee75d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -4,128 +4,100 @@ package akka.cluster +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ -import com.typesafe.config._ +object LeaderElectionMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val forth = role("forth") -import java.net.InetSocketAddress + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down = off + """)). 
+ withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec + +abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ + + override def initialParticipants = 4 + + val firstAddress = node(first).address + val myAddress = node(mySelf).address + + // sorted in the order used by the cluster + val roles = Seq(first, second, third, forth).sorted + + "A cluster of three nodes" must { + + "be able to 'elect' a single leader" in { + // make sure that the first cluster is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + cluster.join(firstAddress) + awaitUpConvergence(numberOfMembers = 4) + cluster.isLeader must be(mySelf == roles.head) + testConductor.enter("after") + } + + def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { + val currentRoles = roles.drop(alreadyShutdown) + currentRoles.size must be >= (2) + + runOn(currentRoles.head) { + cluster.shutdown() + testConductor.enter("after-shutdown") + testConductor.enter("after-down") + } + + // runOn previously shutdown cluster nodes + if ((roles diff currentRoles).contains(mySelf)) { + testConductor.enter("after-shutdown") + testConductor.enter("after-down") + } + + // runOn remaining cluster nodes + if (currentRoles.tail.contains(mySelf)) { + + testConductor.enter("after-shutdown") + + runOn(currentRoles.last) { + // user marks the shutdown leader as DOWN + val leaderAddress = node(currentRoles.head).address + cluster.down(leaderAddress) + } + + testConductor.enter("after-down") + + awaitUpConvergence(currentRoles.size - 1) + val nextExpectedLeader = currentRoles.tail.head + cluster.isLeader must be(mySelf == nextExpectedLeader) + } + + testConductor.enter("after") + } + + "be able to 're-elect' a single leader after leader has left" in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) + } + + "be able to 're-elect' a single leader after leader has left (again)" in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) + } + } -class LeaderElectionSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 5 - - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - - try { - "A cluster of three nodes" must { - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node1 = Cluster(system1) - val address1 = node1.remoteAddress - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node2 = Cluster(system2) - val address2 = node2.remoteAddress - - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = 
"akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node3 = Cluster(system3) - val address3 = node3.remoteAddress - - "be able to 'elect' a single leader" taggedAs LongRunningTest in { - - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: Nil) - - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node1.shutdown() - system1.shutdown() - - // user marks node1 as DOWN - node2.down(address1) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node2 :: node3 :: Nil) - - // check leader - node2.isLeader must be(true) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node2.shutdown() - system2.shutdown() - - // user marks node2 as DOWN - node3.down(address2) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node3 :: Nil) - - // check leader - node3.isLeader must be(true) - } - } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } - - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 873d819dbb..48f1d0b520 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -45,8 +45,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ */ def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) { nodesInCluster.length must not be (0) - import Member.addressOrdering - val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1 + val expectedLeader = roleOfLeader(nodesInCluster) cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) } @@ -60,4 +59,21 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ awaitCond(cluster.convergence.isDefined, 10 seconds) } + def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + nodesInCluster.length must not be (0) + nodesInCluster.sorted.head + } + + /** + * Sort the roles in the order used by the cluster. + */ + implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { + import Member.addressOrdering + def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) + } + + def roleName(address: Address): Option[RoleName] = { + testConductor.getNodes.await.find(node(_).address == address) + } + } \ No newline at end of file From a44bd10fc33e1ce4284c9d0cec79a7131466d71c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 27 May 2012 19:15:31 +0200 Subject: [PATCH 10/21] Tag as LongRunningTest. 
See 2113 --- .../scala/akka/cluster/LeaderElectionSpec.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 56cfbee75d..007ab941dc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -33,15 +33,14 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp override def initialParticipants = 4 - val firstAddress = node(first).address - val myAddress = node(mySelf).address + lazy val firstAddress = node(first).address // sorted in the order used by the cluster - val roles = Seq(first, second, third, forth).sorted + lazy val roles = Seq(first, second, third, forth).sorted - "A cluster of three nodes" must { + "A cluster of four nodes" must { - "be able to 'elect' a single leader" in { + "be able to 'elect' a single leader" taggedAs LongRunningTest in { // make sure that the first cluster is started before other join runOn(first) { cluster @@ -91,11 +90,11 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp testConductor.enter("after") } - "be able to 're-elect' a single leader after leader has left" in { + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) } - "be able to 're-elect' a single leader after leader has left (again)" in { + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) } } From fcaa4751b39bf3d8bfb0d3c4caa7ca81b653e188 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 27 May 2012 19:20:30 +0200 Subject: [PATCH 11/21] Pass include/exclude tags arguments to multi-jvm tests, see #2139 --- project/AkkaBuild.scala | 53 +++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 13c90ed61e..2ffe034c3c 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -85,7 +85,7 @@ object AkkaBuild extends Build { extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, - scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"), + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, jvmOptions in MultiJvm := defaultMultiJvmOptions, test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } ) @@ -101,7 +101,7 @@ object AkkaBuild extends Build { extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, - scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"), + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, jvmOptions in MultiJvm := defaultMultiJvmOptions, test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } ) @@ -118,7 +118,7 @@ object AkkaBuild extends Build { extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, - scalatestOptions in MultiJvm := Seq("-r", 
"org.scalatest.akka.QuietReporter"), + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, jvmOptions in MultiJvm := defaultMultiJvmOptions, test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } ) @@ -298,7 +298,7 @@ object AkkaBuild extends Build { val defaultExcludedTags = Seq("timing", "long-running") - val defaultMultiJvmOptions: Seq[String] = { + lazy val defaultMultiJvmOptions: Seq[String] = { (System.getProperty("akka.test.timefactor") match { case null => Nil case x => List("-Dakka.test.timefactor=" + x) @@ -306,6 +306,31 @@ object AkkaBuild extends Build { (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil) } + // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) + lazy val defaultExcludeTestNames: Seq[String] = { + val exclude = System.getProperty("akka.test.names.exclude", "") + if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq + } + + // for excluding tests by tag (or use system property: -Dakka.test.tags.exclude=timing) + lazy val defaultExcludeTestTags: Seq[String] = { + val exclude = System.getProperty("akka.test.tags.exclude", "") + if (exclude.isEmpty) defaultExcludedTags else exclude.split(",").toSeq + } + + // for including tests by tag (or use system property: -Dakka.test.tags.include=timing) + lazy val defaultIncludeTestTags: Seq[String] = { + val include = System.getProperty("akka.test.tags.include", "") + if (include.isEmpty) Seq.empty else include.split(",").toSeq + } + + lazy val defaultMultiJvmScalatestOptions: Seq[String] = { + val excludeTags = (defaultExcludeTestTags.toSet -- defaultIncludeTestTags.toSet).toSeq + Seq("-r", "org.scalatest.akka.QuietReporter") ++ + (if (excludeTags.isEmpty) Seq.empty else Seq("-l", excludeTags.mkString(" "))) ++ + (if (defaultIncludeTestTags.isEmpty) Seq.empty else Seq("-n", defaultIncludeTestTags.mkString(" "))) + } + lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq( resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/", @@ -318,23 +343,9 @@ object AkkaBuild extends Build { parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean, - // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) - excludeTestNames := { - val exclude = System.getProperty("akka.test.names.exclude", "") - if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq - }, - - // for excluding tests by tag (or use system property: -Dakka.test.tags.exclude=timing) - excludeTestTags := { - val exclude = System.getProperty("akka.test.tags.exclude", "") - if (exclude.isEmpty) defaultExcludedTags else exclude.split(",").toSeq - }, - - // for including tests by tag (or use system property: -Dakka.test.tags.include=timing) - includeTestTags := { - val include = System.getProperty("akka.test.tags.include", "") - if (include.isEmpty) Seq.empty else include.split(",").toSeq - }, + excludeTestNames := defaultExcludeTestNames, + excludeTestTags := defaultExcludeTestTags, + includeTestTags := defaultIncludeTestTags, // add filters for tests excluded by name testOptions in Test <++= excludeTestNames map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) }, From 6993064cdeb3c2e8863e0e07a6e876a7bab5f701 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 27 May 2012 19:21:38 +0200 Subject: [PATCH 12/21] Revert "Commented out the cluster tests because it's not possible to exclude them right now" This reverts commit 
2abe5308dabf452885eaad2c1b63c3ce34774dfe. --- .../akka/cluster/JoinTwoClustersSpec.scala | 180 +++++++++--------- .../MembershipChangeListenerSpec.scala | 154 +++++++-------- .../akka/cluster/NodeMembershipSpec.scala | 140 +++++++------- .../scala/akka/cluster/NodeStartupSpec.scala | 148 +++++++------- 4 files changed, 311 insertions(+), 311 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 87129a7a7c..4bbe703405 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -1,90 +1,90 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. -// */ -// -//package akka.cluster -// -//import org.scalatest.BeforeAndAfter -//import com.typesafe.config.ConfigFactory -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { -// val a1 = role("a1") -// val a2 = role("a2") -// val b1 = role("b1") -// val b2 = role("b2") -// val c1 = role("c1") -// val c2 = role("c2") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -// -//abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import JoinTwoClustersMultiJvmSpec._ -// -// override def initialParticipants = 6 -// -// after { -// testConductor.enter("after") -// } -// -// val a1Address = node(a1).address -// val b1Address = node(b1).address -// val c1Address = node(c1).address -// -// "Three different clusters (A, B and C)" must { -// -// "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { -// -// runOn(a1, a2) { -// cluster.join(a1Address) -// } -// runOn(b1, b2) { -// cluster.join(b1Address) -// } -// runOn(c1, c2) { -// cluster.join(c1Address) -// } -// -// awaitUpConvergence(numberOfMembers = 2) -// -// assertLeader(a1, a2) -// assertLeader(b1, b2) -// assertLeader(c1, c2) -// -// runOn(b2) { -// cluster.join(a1Address) -// } -// -// runOn(a1, a2, b1, b2) { -// awaitUpConvergence(numberOfMembers = 4) -// } -// -// assertLeader(a1, a2, b1, b2) -// assertLeader(c1, c2) -// -// } -// -// "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { -// -// runOn(b2) { -// cluster.join(c1Address) -// } -// -// awaitUpConvergence(numberOfMembers = 6) -// -// assertLeader(a1, a2, b1, b2, c1, c2) -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { + val a1 = role("a1") + val a2 = role("a2") + val b1 = role("b1") + val b2 = role("b2") + val c1 = role("c1") + val c2 = role("c2") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec + +abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import JoinTwoClustersMultiJvmSpec._ + + override def initialParticipants = 6 + + after { + testConductor.enter("after") + } + + val a1Address = node(a1).address + val b1Address = node(b1).address + val c1Address = node(c1).address + + "Three different clusters (A, B and C)" must { + + "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { + + runOn(a1, a2) { + cluster.join(a1Address) + } + runOn(b1, b2) { + cluster.join(b1Address) + } + runOn(c1, c2) { + cluster.join(c1Address) + } + + awaitUpConvergence(numberOfMembers = 2) + + assertLeader(a1, a2) + assertLeader(b1, b2) + assertLeader(c1, c2) + + runOn(b2) { + cluster.join(a1Address) + } + + runOn(a1, a2, b1, b2) { + awaitUpConvergence(numberOfMembers = 4) + } + + assertLeader(a1, a2, b1, b2) + assertLeader(c1, c2) + + } + + "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { + + runOn(b2) { + cluster.join(c1Address) + } + + awaitUpConvergence(numberOfMembers = 6) + + assertLeader(a1, a2, b1, b2, c1, c2) + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 6bb0f556d5..64019c102c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -1,77 +1,77 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. 
-// */ -//package akka.cluster -// -//import scala.collection.immutable.SortedSet -//import org.scalatest.BeforeAndAfter -//import com.typesafe.config.ConfigFactory -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { -// val first = role("first") -// val second = role("second") -// val third = role("third") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec -//class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec -//class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec -// -//abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) -// with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import MembershipChangeListenerMultiJvmSpec._ -// -// override def initialParticipants = 3 -// -// after { -// testConductor.enter("after") -// } -// -// "A set of connected cluster systems" must { -// -// val firstAddress = node(first).address -// val secondAddress = node(second).address -// -// "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { -// -// runOn(first, second) { -// cluster.join(firstAddress) -// val latch = TestLatch() -// cluster.registerListener(new MembershipChangeListener { -// def notify(members: SortedSet[Member]) { -// if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) -// latch.countDown() -// } -// }) -// latch.await -// cluster.convergence.isDefined must be(true) -// } -// -// } -// -// "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { -// -// runOn(third) { -// cluster.join(firstAddress) -// } -// -// val latch = TestLatch() -// cluster.registerListener(new MembershipChangeListener { -// def notify(members: SortedSet[Member]) { -// if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) -// latch.countDown() -// } -// }) -// latch.await -// cluster.convergence.isDefined must be(true) -// -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec + +abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerMultiJvmSpec._ + + override def initialParticipants = 3 + + after { + testConductor.enter("after") + } + + "A set of connected cluster systems" must { + + val firstAddress = node(first).address + val secondAddress = node(second).address + + "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + runOn(first, second) { + cluster.join(firstAddress) + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + latch.await + cluster.convergence.isDefined must be(true) + } + + } + + "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + runOn(third) { + cluster.join(firstAddress) + } + + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + latch.await + cluster.convergence.isDefined must be(true) + + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 21defd1d97..f96265ac5a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -1,70 +1,70 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. 
-// */ -//package akka.cluster -// -//import com.typesafe.config.ConfigFactory -//import org.scalatest.BeforeAndAfter -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object NodeMembershipMultiJvmSpec extends MultiNodeConfig { -// val first = role("first") -// val second = role("second") -// val third = role("third") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec -//class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec -//class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -// -//abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import NodeMembershipMultiJvmSpec._ -// -// override def initialParticipants = 3 -// -// after { -// testConductor.enter("after") -// } -// -// val firstAddress = node(first).address -// val secondAddress = node(second).address -// val thirdAddress = node(third).address -// -// "A set of connected cluster systems" must { -// -// "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { -// -// runOn(first, second) { -// cluster.join(firstAddress) -// awaitCond(cluster.latestGossip.members.size == 2) -// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress) -// awaitCond { -// cluster.latestGossip.members.forall(_.status == MemberStatus.Up) -// } -// awaitCond(cluster.convergence.isDefined) -// } -// -// } -// -// "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { -// -// runOn(third) { -// cluster.join(firstAddress) -// } -// -// awaitCond(cluster.latestGossip.members.size == 3) -// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress) -// awaitCond { -// cluster.latestGossip.members.forall(_.status == MemberStatus.Up) -// } -// awaitCond(cluster.convergence.isDefined) -// -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object NodeMembershipMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec + +abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeMembershipMultiJvmSpec._ + + override def initialParticipants = 3 + + after { + testConductor.enter("after") + } + + val firstAddress = node(first).address + val secondAddress = node(second).address + val thirdAddress = node(third).address + + "A set of connected cluster systems" must { + + "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + + runOn(first, second) { + cluster.join(firstAddress) + awaitCond(cluster.latestGossip.members.size == 2) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress) + awaitCond { + cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + } + awaitCond(cluster.convergence.isDefined) + } + + } + + "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + + runOn(third) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.size == 3) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress) + awaitCond { + cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + } + awaitCond(cluster.convergence.isDefined) + + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index ff4c06215d..65cd7891a9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -1,74 +1,74 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. 
-// */ -//package akka.cluster -// -//import com.typesafe.config.ConfigFactory -//import org.scalatest.BeforeAndAfter -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object NodeStartupMultiJvmSpec extends MultiNodeConfig { -// val first = role("first") -// val second = role("second") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class NodeStartupMultiJvmNode1 extends NodeStartupSpec -//class NodeStartupMultiJvmNode2 extends NodeStartupSpec -// -//abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import NodeStartupMultiJvmSpec._ -// -// override def initialParticipants = 2 -// -// after { -// testConductor.enter("after") -// } -// -// val firstAddress = node(first).address -// val secondAddress = node(second).address -// -// "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { -// -// "be a singleton cluster when started up" taggedAs LongRunningTest in { -// runOn(first) { -// awaitCond(cluster.isSingletonCluster) -// // FIXME #2117 singletonCluster should reach convergence -// //awaitCond(cluster.convergence.isDefined) -// } -// } -// -// "be in 'Joining' phase when started up" taggedAs LongRunningTest in { -// runOn(first) { -// val members = cluster.latestGossip.members -// members.size must be(1) -// -// val joiningMember = members find (_.address == firstAddress) -// joiningMember must not be (None) -// joiningMember.get.status must be(MemberStatus.Joining) -// } -// } -// } -// -// "A second cluster node" must { -// "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { -// -// runOn(second) { -// cluster.join(firstAddress) -// } -// -// awaitCond { -// cluster.latestGossip.members.exists { member ⇒ -// member.address == secondAddress && member.status == MemberStatus.Up -// } -// } -// cluster.latestGossip.members.size must be(2) -// awaitCond(cluster.convergence.isDefined) -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object NodeStartupMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class NodeStartupMultiJvmNode1 extends NodeStartupSpec +class NodeStartupMultiJvmNode2 extends NodeStartupSpec + +abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeStartupMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + val firstAddress = node(first).address + val secondAddress = node(second).address + + "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { + + "be a singleton cluster when started up" taggedAs LongRunningTest in { + runOn(first) { + awaitCond(cluster.isSingletonCluster) + // FIXME #2117 singletonCluster should reach convergence + //awaitCond(cluster.convergence.isDefined) + } + } + + "be in 'Joining' phase when started up" taggedAs LongRunningTest in { + runOn(first) { + val members = cluster.latestGossip.members + members.size must be(1) + + val joiningMember = members find (_.address == firstAddress) + joiningMember must not be (None) + joiningMember.get.status must be(MemberStatus.Joining) + } + } + } + + "A second cluster node" must { + "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond { + cluster.latestGossip.members.exists { member ⇒ + member.address == secondAddress && member.status == MemberStatus.Up + } + } + cluster.latestGossip.members.size must be(2) + awaitCond(cluster.convergence.isDefined) + } + } + +} From 4786078565f7599f087d4b3605fd86e5cb0c79c9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 27 May 2012 19:46:42 +0200 Subject: [PATCH 13/21] Adjust cluster tests to not fail when excluded, see 2139 --- .../scala/akka/cluster/JoinTwoClustersSpec.scala | 11 ++++++++--- .../akka/cluster/MembershipChangeListenerSpec.scala | 12 +++++++++--- .../scala/akka/cluster/NodeMembershipSpec.scala | 12 +++++++++--- .../scala/akka/cluster/NodeStartupSpec.scala | 4 ++-- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 4bbe703405..6a7ebcee86 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -38,13 +38,18 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm testConductor.enter("after") } - val a1Address = node(a1).address - val b1Address = node(b1).address - val c1Address = node(c1).address + lazy val a1Address = node(a1).address + lazy val b1Address = node(b1).address + lazy val c1Address = node(c1).address "Three different clusters (A, B and C)" must { "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(a1, b1, c1) { + cluster + } + testConductor.enter("first-started") runOn(a1, a2) { cluster.join(a1Address) 
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 64019c102c..dc915912ee 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -33,13 +33,19 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan testConductor.enter("after") } + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + "A set of connected cluster systems" must { - val firstAddress = node(first).address - val secondAddress = node(second).address - "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + runOn(first, second) { cluster.join(firstAddress) val latch = TestLatch() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index f96265ac5a..232d6ca0e7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -31,14 +31,20 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp testConductor.enter("after") } - val firstAddress = node(first).address - val secondAddress = node(second).address - val thirdAddress = node(third).address + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address "A set of connected cluster systems" must { "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + runOn(first, second) { cluster.join(firstAddress) awaitCond(cluster.latestGossip.members.size == 2) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index 65cd7891a9..fcbcce746f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -29,8 +29,8 @@ abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) wi testConductor.enter("after") } - val firstAddress = node(first).address - val secondAddress = node(second).address + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { From fc56d40d9e9fa9dd2346c92d0004b4ad5fe9cdf8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 27 May 2012 20:44:34 +0200 Subject: [PATCH 14/21] Increased some timeouts of BarrierSpec --- .../akka/remote/testconductor/BarrierSpec.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index b8bce31708..f66e120195 100644 --- 
a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -84,7 +84,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.send(barrier, EnterBarrier("bar")) noMsg(a, b) - within(1 second) { + within(2 second) { b.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -100,7 +100,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) b.send(barrier, EnterBarrier("bar")) noMsg(a, b, c) - within(1 second) { + within(2 second) { c.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -119,7 +119,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! RemoveClient(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) - b.within(1 second) { + b.within(2 second) { barrier ! RemoveClient(C) b.expectMsg(ToClient(BarrierResult("bar", true))) } @@ -265,7 +265,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with b.expectMsg(ToClient(Done)) a.send(barrier, EnterBarrier("bar")) noMsg(a, b) - within(1 second) { + within(2 second) { b.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -284,7 +284,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with c.expectMsg(ToClient(Done)) b.send(barrier, EnterBarrier("bar")) noMsg(a, b, c) - within(1 second) { + within(2 second) { c.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -306,7 +306,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! Remove(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) - b.within(1 second) { + b.within(2 second) { barrier ! Remove(C) b.expectMsg(ToClient(BarrierResult("bar", true))) } From 50806243903f6242878a6d7206a92d92016068f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 27 May 2012 21:22:30 +0200 Subject: [PATCH 15/21] Incorporated feedback - switched to MultiNodeClusterSpec etc. 
--- ...ientDowningNodeThatIsUnreachableSpec.scala | 60 ++++++------------- .../ClientDowningNodeThatIsUpSpec.scala | 60 ++++++------------- .../akka/cluster/MultiNodeClusterSpec.scala | 14 ++++- 3 files changed, 47 insertions(+), 87 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 95510a701d..a80c0a3caa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ import akka.actor.Address object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { @@ -17,14 +16,9 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } - """))) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")). + withFallback(MultiNodeClusterSpec.clusterConfig)) } class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec @@ -32,25 +26,20 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeT class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +class ClientDowningNodeThatIsUnreachableSpec + extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 - def node = Cluster(system) - - def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address)))) - } - "Client of a 4 node cluster" must { "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { runOn(first) { - node.self - assertMemberRing(nrOfMembers = 4) + cluster.self + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address @@ -60,44 +49,31 @@ class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowning testConductor.removeNode(third) // mark 'third' node as DOWN - node.down(thirdAddress) + cluster.down(thirdAddress) testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) - node.latestGossip.members.exists(_.address == thirdAddress) must be(false) - testConductor.enter("await-completion") - } - - runOn(second) { - 
node.join(node(first).address) - - assertMemberRing(nrOfMembers = 4) - testConductor.enter("all-up") - - val thirdAddress = node(third).address - testConductor.enter("down-third-node") - - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) testConductor.enter("await-completion") } runOn(third) { - node.join(node(first).address) + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") } - runOn(fourth) { - node.join(node(first).address) + runOn(second, fourth) { + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) testConductor.enter("await-completion") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index b92a45f2e4..adfc7aa514 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ import akka.actor.Address object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { @@ -17,14 +16,9 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } - """))) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")). 
+ withFallback(MultiNodeClusterSpec.clusterConfig)) } class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec @@ -32,69 +26,51 @@ class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSp class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +class ClientDowningNodeThatIsUpSpec + extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { import ClientDowningNodeThatIsUpMultiJvmSpec._ override def initialParticipants = 4 - def node = Cluster(system) - - def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port)))) - } - "Client of a 4 node cluster" must { "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { runOn(first) { - node.self - assertMemberRing(nrOfMembers = 4) + cluster.self + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address // mark 'third' node as DOWN testConductor.removeNode(third) - node.down(thirdAddress) + cluster.down(thirdAddress) testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) - node.latestGossip.members.exists(_.address == thirdAddress) must be(false) - testConductor.enter("await-completion") - } - - runOn(second) { - node.join(node(first).address) - - assertMemberRing(nrOfMembers = 4) - testConductor.enter("all-up") - - val thirdAddress = node(third).address - testConductor.enter("down-third-node") - - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) testConductor.enter("await-completion") } runOn(third) { - node.join(node(first).address) + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") } - runOn(fourth) { - node.join(node(first).address) + runOn(second, fourth) { + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) testConductor.enter("await-completion") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 873d819dbb..cadbb7b298 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -51,8 +51,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ } /** - * Wait until the expected number of members has status Up - * and convergence has been 
reached. + * Wait until the expected number of members has status Up and convergence has been reached. */ def awaitUpConvergence(numberOfMembers: Int): Unit = { awaitCond(cluster.latestGossip.members.size == numberOfMembers) @@ -60,4 +59,13 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ awaitCond(cluster.convergence.isDefined, 10 seconds) } -} \ No newline at end of file + /** + * Wait until the expected number of members has status Up and convergence has been reached. + * Also asserts that nodes in the 'canNotBePartOfRing' are *not* part of the cluster ring. + */ + def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { + awaitCond(cluster.latestGossip.members.size == nrOfMembers) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(canNotBePartOfRing forall (address => !(cluster.latestGossip.members exists (_.address.port == address.port)))) + } +} From 4ec49f6ac1d8a63bae380262d4bc9e175073da9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 27 May 2012 21:55:33 +0200 Subject: [PATCH 16/21] Fixed indeterministic ordering bug in test --- .../cluster/ClientDowningNodeThatIsUnreachableSpec.scala | 6 +++--- .../scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index a80c0a3caa..3a4148e3f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -40,9 +40,9 @@ class ClientDowningNodeThatIsUnreachableSpec runOn(first) { cluster.self awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") // kill 'third' node testConductor.shutdown(third, 0) @@ -66,11 +66,11 @@ class ClientDowningNodeThatIsUnreachableSpec runOn(second, fourth) { cluster.join(node(first).address) - awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") + testConductor.enter("down-third-node") awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index adfc7aa514..0f48951305 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -40,9 +40,9 @@ class ClientDowningNodeThatIsUpSpec runOn(first) { cluster.self awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") // mark 'third' node as DOWN testConductor.removeNode(third) @@ -56,18 +56,17 @@ class ClientDowningNodeThatIsUpSpec runOn(third) { cluster.join(node(first).address) - awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") } runOn(second, fourth) { cluster.join(node(first).address) - awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") + testConductor.enter("down-third-node") 
awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) From 04ba2cf4908649fbaef0c9e99b71192937f66cd8 Mon Sep 17 00:00:00 2001 From: viktorklang Date: Sun, 27 May 2012 23:18:32 +0300 Subject: [PATCH 17/21] Clarified Awaitable doc. --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 54ec2d08b4..e3c7f8348c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -68,6 +68,7 @@ object Await { * WARNING: Blocking operation, use with caution. * * @throws [[java.util.concurrent.TimeoutException]] if times out + * @throws [[java.lang.Throwable]] (throws clause is Exception due to Java) if there was a problem * @return The returned value as returned by Awaitable.result */ @throws(classOf[Exception]) From c7a4aa5163028a26d25aea79b19d46426c79e443 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 27 May 2012 23:43:05 +0200 Subject: [PATCH 18/21] Fixing wording in use of BalancingDispatcher as routerDispatcher --- akka-actor/src/main/scala/akka/routing/Routing.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index e60e46c247..2f585a1790 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -33,7 +33,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) throw new ConfigurationException( "Configuration for actor [" + _path.toString + - "] is invalid - you can not use a 'BalancingDispatcher' together with any type of 'Router'") + "] is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.") /* * CAUTION: RoutedActorRef is PROBLEMATIC From 5fd1aad0d07863bc8812e73f301641bd027de286 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Mon, 28 May 2012 10:33:59 +0200 Subject: [PATCH 19/21] added docs about BalancingDispatcher and routers, see #2080 --- akka-docs/java/dispatchers.rst | 2 ++ akka-docs/java/routing.rst | 41 +++++++++++++++++++++++++++++---- akka-docs/scala/dispatchers.rst | 2 ++ akka-docs/scala/routing.rst | 36 +++++++++++++++++++++++++---- 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index 9260fc11e5..2723883e9c 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -72,6 +72,8 @@ There are 4 different types of message dispatchers: - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. + - All the actors share a single Mailbox that they get their messages from. + - It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. 
- Sharability: Actors of the same type only diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 38cf3a1a80..16aa4cee6f 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -380,11 +380,16 @@ The dispatcher for created children of the router will be taken from makes sense to configure the :class:`BalancingDispatcher` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by -stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. -(You can however use it for the **Routees**) +stealing it from their siblings. -The “head” router, of course, cannot run on the same balancing dispatcher, -because it does not process the same messages, hence this special actor does +.. note:: + + If you provide a collection of actors to route to, then they will still use the same dispatcher + that was configured for them in their ``Props``, it is not possible to change an actor's dispatcher + after it has been created. + +The “head” router cannot always run on the same dispatcher, because it +does not process the same type of messages, hence this special actor does not use the dispatcher configured in :class:`Props`, but takes the ``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to the actor system’s default dispatcher. All standard routers allow setting this @@ -393,3 +398,31 @@ implement the method in a suitable way. .. includecode:: code/docs/jrouting/CustomRouterDocTestBase.java#dispatchers +.. note:: + + It is not allowed to configure the ``routerDispatcher`` to be a + :class:`BalancingDispatcher` since the messages meant for the special + router actor cannot be processed by any other actor. + +At first glance there seems to be an overlap between the +:class:`BalancingDispatcher` and Routers, but they complement each other. +The balancing dispatcher is in charge of running the actors while the routers +are in charge of deciding which message goes where. A router can also have +children that span multiple actor systems, even remote ones, but a dispatcher +lives inside a single actor system. + +When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher` +there are some configuration settings to take into account. + +- There can only be ``nr-of-instances`` messages being processed at the same + time no matter how many threads are configured for the + :class:`BalancingDispatcher`. + +- Having ``throughput`` set to a low number makes no sense since you will only + be handing off to another actor that processes the same :class:`Mailbox` + as yourself, which can be costly. Either the message just got into the + mailbox and you can receive it as well as anybody else, or everybody else + is busy and you are the only one available to receive the message. + +- Resizing the number of routees only introduces inertia, since resizing + is performed at specified intervals, but work stealing is instantaneous. diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 100b882b5b..cea9ee6e0a 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -73,6 +73,8 @@ There are 4 different types of message dispatchers: - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. + - All the actors share a single Mailbox that they get their messages from.
+ - It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. - Sharability: Actors of the same type only diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 25f582e085..5a37b3471a 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -380,9 +380,7 @@ The dispatcher for created children of the router will be taken from makes sense to configure the :class:`BalancingDispatcher` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by -stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. -(You can however use it for the **Routees**) - +stealing it from their siblings. .. note:: @@ -390,8 +388,8 @@ stealing it from their siblings. Note that you can **not** use a ``BalancingDisp that was configured for them in their ``Props``, it is not possible to change an actors dispatcher after it has been created. -The “head” router, of course, cannot run on the same balancing dispatcher, -because it does not process the same messages, hence this special actor does +The “head” router cannot always run on the same dispatcher, because it +does not process the same type of messages, hence this special actor does not use the dispatcher configured in :class:`Props`, but takes the ``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to the actor system’s default dispatcher. All standard routers allow setting this @@ -400,3 +398,31 @@ implement the method in a suitable way. .. includecode:: code/docs/routing/RouterDocSpec.scala#dispatchers +.. note:: + + It is not allowed to configure the ``routerDispatcher`` to be a + :class:`BalancingDispatcher` since the messages meant for the special + router actor cannot be processed by any other actor. + +At first glance there seems to be an overlap between the +:class:`BalancingDispatcher` and Routers, but they complement each other. +The balancing dispatcher is in charge of running the actors while the routers +are in charge of deciding which message goes where. A router can also have +children that span multiple actor systems, even remote ones, but a dispatcher +lives inside a single actor system. + +When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher` +there are some configuration settings to take into account. + +- There can only be ``nr-of-instances`` messages being processed at the same + time no matter how many threads are configured for the + :class:`BalancingDispatcher`. + +- Having ``throughput`` set to a low number makes no sense since you will only + be handing off to another actor that processes the same :class:`Mailbox` + as yourself, which can be costly. Either the message just got into the + mailbox and you can receive it as well as anybody else, or everybody else + is busy and you are the only one available to receive the message. + +- Resizing the number of routees only introduces inertia, since resizing + is performed at specified intervals, but work stealing is instantaneous. From e3eec7e344c26cc912add339e611f5a8786029e9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 May 2012 11:05:02 +0200 Subject: [PATCH 20/21] LeaderElectionSpec with hard exits.
See #2113 and #2138 --- .../src/main/scala/akka/actor/Props.scala | 2 +- .../cluster/HardExitLeaderElectionSpec.scala | 108 ++++++++++++++++++ .../akka/cluster/LeaderElectionSpec.scala | 8 +- 3 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index f6552179c3..dfd6200fd3 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -146,7 +146,7 @@ case class Props( /** * Returns a new Props with the specified creator set. - * + * * The creator must not return the same instance multiple times. * * Scala API. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala new file mode 100644 index 0000000000..0360e4f1b8 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object HardExitLeaderElectionMultiJvmSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down = off + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class HardExitLeaderElectionMultiJvmNode1 extends HardExitLeaderElectionSpec +class HardExitLeaderElectionMultiJvmNode2 extends HardExitLeaderElectionSpec +class HardExitLeaderElectionMultiJvmNode3 extends HardExitLeaderElectionSpec +class HardExitLeaderElectionMultiJvmNode4 extends HardExitLeaderElectionSpec +class HardExitLeaderElectionMultiJvmNode5 extends HardExitLeaderElectionSpec + +abstract class HardExitLeaderElectionSpec extends MultiNodeSpec(HardExitLeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { + import HardExitLeaderElectionMultiJvmSpec._ + + override def initialParticipants = 5 + + lazy val firstAddress = node(first).address + + // sorted in the order used by the cluster + lazy val roles = Seq(first, second, third, fourth).sorted + + "A cluster of four nodes" must { + + "be able to 'elect' a single leader" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + if (mySelf != controller) { + cluster.join(firstAddress) + awaitUpConvergence(numberOfMembers = roles.size) + cluster.isLeader must be(mySelf == roles.head) + } + testConductor.enter("after") + } + + def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { + val currentRoles = roles.drop(alreadyShutdown) + currentRoles.size must be >= (2) + val leader = currentRoles.head + val aUser = currentRoles.last + + mySelf match { + + case `controller` ⇒ + testConductor.enter("before-shutdown") + testConductor.shutdown(leader, 0) + testConductor.removeNode(leader) + testConductor.enter("after-shutdown", "after-down", "completed") + + case `leader` ⇒ + testConductor.enter("before-shutdown") + // this node will be shutdown by the controller and doesn't participate in 
more barriers + + case `aUser` ⇒ + val leaderAddress = node(leader).address + testConductor.enter("before-shutdown", "after-shutdown") + // user marks the shutdown leader as DOWN + cluster.down(leaderAddress) + testConductor.enter("after-down", "completed") + + case _ if currentRoles.tail.contains(mySelf) ⇒ + // remaining cluster nodes, not shutdown + testConductor.enter("before-shutdown", "after-shutdown", "after-down") + + awaitUpConvergence(currentRoles.size - 1) + val nextExpectedLeader = currentRoles.tail.head + cluster.isLeader must be(mySelf == nextExpectedLeader) + + testConductor.enter("completed") + + } + + } + + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) + } + + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 007ab941dc..886556de54 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -13,7 +13,7 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") - val forth = role("forth") + val fourth = role("fourth") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" @@ -36,19 +36,19 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp lazy val firstAddress = node(first).address // sorted in the order used by the cluster - lazy val roles = Seq(first, second, third, forth).sorted + lazy val roles = Seq(first, second, third, fourth).sorted "A cluster of four nodes" must { "be able to 'elect' a single leader" taggedAs LongRunningTest in { - // make sure that the first cluster is started before other join + // make sure that the node-to-join is started before other join runOn(first) { cluster } testConductor.enter("first-started") cluster.join(firstAddress) - awaitUpConvergence(numberOfMembers = 4) + awaitUpConvergence(numberOfMembers = roles.size) cluster.isLeader must be(mySelf == roles.head) testConductor.enter("after") } From 59dd754819764327f4cb1e5c4aad7d04cc3425f1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 May 2012 13:55:22 +0200 Subject: [PATCH 21/21] Use only the hard exit LeaderElectionSpec, see #2113 --- .../cluster/HardExitLeaderElectionSpec.scala | 108 ------------------ .../akka/cluster/LeaderElectionSpec.scala | 58 +++++----- 2 files changed, 32 insertions(+), 134 deletions(-) delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala deleted file mode 100644 index 0360e4f1b8..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/HardExitLeaderElectionSpec.scala +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. 
- */ - -package akka.cluster - -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object HardExitLeaderElectionMultiJvmSpec extends MultiNodeConfig { - val controller = role("controller") - val first = role("first") - val second = role("second") - val third = role("third") - val fourth = role("fourth") - - commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString(""" - akka.cluster.auto-down = off - """)). - withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class HardExitLeaderElectionMultiJvmNode1 extends HardExitLeaderElectionSpec -class HardExitLeaderElectionMultiJvmNode2 extends HardExitLeaderElectionSpec -class HardExitLeaderElectionMultiJvmNode3 extends HardExitLeaderElectionSpec -class HardExitLeaderElectionMultiJvmNode4 extends HardExitLeaderElectionSpec -class HardExitLeaderElectionMultiJvmNode5 extends HardExitLeaderElectionSpec - -abstract class HardExitLeaderElectionSpec extends MultiNodeSpec(HardExitLeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { - import HardExitLeaderElectionMultiJvmSpec._ - - override def initialParticipants = 5 - - lazy val firstAddress = node(first).address - - // sorted in the order used by the cluster - lazy val roles = Seq(first, second, third, fourth).sorted - - "A cluster of four nodes" must { - - "be able to 'elect' a single leader" taggedAs LongRunningTest in { - // make sure that the node-to-join is started before other join - runOn(first) { - cluster - } - testConductor.enter("first-started") - - if (mySelf != controller) { - cluster.join(firstAddress) - awaitUpConvergence(numberOfMembers = roles.size) - cluster.isLeader must be(mySelf == roles.head) - } - testConductor.enter("after") - } - - def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { - val currentRoles = roles.drop(alreadyShutdown) - currentRoles.size must be >= (2) - val leader = currentRoles.head - val aUser = currentRoles.last - - mySelf match { - - case `controller` ⇒ - testConductor.enter("before-shutdown") - testConductor.shutdown(leader, 0) - testConductor.removeNode(leader) - testConductor.enter("after-shutdown", "after-down", "completed") - - case `leader` ⇒ - testConductor.enter("before-shutdown") - // this node will be shutdown by the controller and doesn't participate in more barriers - - case `aUser` ⇒ - val leaderAddress = node(leader).address - testConductor.enter("before-shutdown", "after-shutdown") - // user marks the shutdown leader as DOWN - cluster.down(leaderAddress) - testConductor.enter("after-down", "completed") - - case _ if currentRoles.tail.contains(mySelf) ⇒ - // remaining cluster nodes, not shutdown - testConductor.enter("before-shutdown", "after-shutdown", "after-down") - - awaitUpConvergence(currentRoles.size - 1) - val nextExpectedLeader = currentRoles.tail.head - cluster.isLeader must be(mySelf == nextExpectedLeader) - - testConductor.enter("completed") - - } - - } - - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { - shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) - } - - "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { - shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) - } - } - -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 886556de54..54f744a6c8 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -10,6 +10,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ object LeaderElectionMultiJvmSpec extends MultiNodeConfig { + val controller = role("controller") val first = role("first") val second = role("second") val third = role("third") @@ -27,11 +28,12 @@ class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { import LeaderElectionMultiJvmSpec._ - override def initialParticipants = 4 + override def initialParticipants = 5 lazy val firstAddress = node(first).address @@ -47,47 +49,51 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp } testConductor.enter("first-started") - cluster.join(firstAddress) - awaitUpConvergence(numberOfMembers = roles.size) - cluster.isLeader must be(mySelf == roles.head) + if (mySelf != controller) { + cluster.join(firstAddress) + awaitUpConvergence(numberOfMembers = roles.size) + cluster.isLeader must be(mySelf == roles.head) + } testConductor.enter("after") } def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { val currentRoles = roles.drop(alreadyShutdown) currentRoles.size must be >= (2) + val leader = currentRoles.head + val aUser = currentRoles.last - runOn(currentRoles.head) { - cluster.shutdown() - testConductor.enter("after-shutdown") - testConductor.enter("after-down") - } + mySelf match { - // runOn previously shutdown cluster nodes - if ((roles diff currentRoles).contains(mySelf)) { - testConductor.enter("after-shutdown") - testConductor.enter("after-down") - } + case `controller` ⇒ + testConductor.enter("before-shutdown") + testConductor.shutdown(leader, 0) + testConductor.removeNode(leader) + testConductor.enter("after-shutdown", "after-down", "completed") - // runOn remaining cluster nodes - if (currentRoles.tail.contains(mySelf)) { + case `leader` ⇒ + testConductor.enter("before-shutdown") + // this node will be shutdown by the controller and doesn't participate in more barriers - testConductor.enter("after-shutdown") - - runOn(currentRoles.last) { + case `aUser` ⇒ + val leaderAddress = node(leader).address + testConductor.enter("before-shutdown", "after-shutdown") // user marks the shutdown leader as DOWN - val leaderAddress = node(currentRoles.head).address cluster.down(leaderAddress) - } + testConductor.enter("after-down", "completed") - testConductor.enter("after-down") + case _ if currentRoles.tail.contains(mySelf) ⇒ + // remaining cluster nodes, not shutdown + testConductor.enter("before-shutdown", "after-shutdown", "after-down") + + awaitUpConvergence(currentRoles.size - 1) + val nextExpectedLeader = currentRoles.tail.head + cluster.isLeader must be(mySelf == nextExpectedLeader) + + testConductor.enter("completed") - awaitUpConvergence(currentRoles.size - 1) - val nextExpectedLeader = currentRoles.tail.head - cluster.isLeader must be(mySelf == nextExpectedLeader) } - testConductor.enter("after") } "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {