From b41c7bc1cb0d78db5a840639dced6050e05bed18 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 14 Jul 2011 22:42:01 +0200 Subject: [PATCH 01/13] Ticket 981: Generate html report with results and charts --- .../common/BenchResultRepository.scala | 7 +- .../trading/common/PerformanceTest.scala | 93 +-------- .../performance/trading/common/Report.scala | 179 ++++++++++++++++++ 3 files changed, 184 insertions(+), 95 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala index 6e1739b0bd..2f9ea89dd8 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala @@ -86,6 +86,7 @@ class FileBenchResultRepository extends BenchResultRepository { } private def save(stats: Stats) { + new File(dir).mkdirs if (!dirExists) return val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp)) val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser" @@ -98,8 +99,7 @@ class FileBenchResultRepository extends BenchResultRepository { case e: Exception ⇒ EventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]". format(stats, f.getAbsolutePath, e.getMessage)) - } - finally { + } finally { if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ } } } @@ -117,8 +117,7 @@ class FileBenchResultRepository extends BenchResultRepository { EventHandler.error(this, "Failed to load from [%s], due to [%s]". format(f.getAbsolutePath, e.getMessage)) None - } - finally { + } finally { if (in ne null) try { in.close() } catch { case ignore: Exception ⇒ } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala index ee06c33b5a..69a7b4bd08 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala @@ -52,8 +52,7 @@ trait PerformanceTest extends JUnitSuite { var stat: DescriptiveStatistics = _ val resultRepository = BenchResultRepository() - - val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + lazy val report = new Report(resultRepository, compareResultWith) type TS <: TradingSystem @@ -128,95 +127,7 @@ trait PerformanceTest extends JUnitSuite { resultRepository.add(stats) - EventHandler.info(this, formatResultsTable(resultRepository.get(name))) - - percentilesChart(stats) - latencyAndThroughputChart(stats) - comparePercentilesChart(stats) - compareWithHistoricalPercentiliesChart(stats) - - } - - def percentilesChart(stats: Stats) { - val chartTitle = stats.name + " Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - - def comparePercentilesChart(stats: Stats) { - for { - compareName ← compareResultWith - compareStats ← resultRepository.get(compareName, stats.load) - } { - val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - } - - def compareWithHistoricalPercentiliesChart(stats: Stats) { - val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) - if (withHistorical.size > 1) { - val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, - stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - } - - def latencyAndThroughputChart(stats: Stats) { - val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" - val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - - def formatResultsTable(statsSeq: Seq[Stats]): String = { - - val name = statsSeq.head.name - - val spaces = " " - val headerScenarioCol = ("Scenario" + spaces).take(name.length) - - val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil) - .mkString("\t") - val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil) - .mkString("\t") - val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString - val formattedStats = "\n" + - line.replace('-', '=') + "\n" + - headerLine + "\n" + - headerLine2 + "\n" + - line + "\n" + - statsSeq.map(formatStats(_)).mkString("\n") + "\n" + - line + "\n" - - formattedStats - - } - - def formatStats(stats: Stats): String = { - val durationS = stats.durationNanos.toDouble / 1000000000.0 - val duration = durationS.formatted("%.0f") - - val tpsStr = stats.tps.formatted("%.0f") - val meanStr = stats.mean.formatted("%.0f") - - val summaryLine = - stats.name :: - stats.load.toString :: - tpsStr :: - meanStr :: - stats.percentiles(5).toString :: - stats.percentiles(25).toString :: - stats.percentiles(50).toString :: - stats.percentiles(75).toString :: - stats.percentiles(95).toString :: - duration :: - stats.n.toString :: - Nil - - summaryLine.mkString("\t") - + report.html(resultRepository.get(name)) } def delay(delayMs: Int) { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala new file mode 100644 index 0000000000..9160fa631e --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala @@ -0,0 +1,179 @@ +package akka.performance.trading.common +import java.io.File +import java.text.SimpleDateFormat +import java.io.PrintWriter +import java.io.FileWriter +import akka.event.EventHandler +import java.util.Date + +class Report( + resultRepository: BenchResultRepository, + compareResultWith: Option[String] = None) { + + private val dir = System.getProperty("benchmark.resultDir", "target/benchmark") + + private def dirExists: Boolean = new File(dir).exists + private def log = System.getProperty("benchmark.logResult", "false").toBoolean + + val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") + + def html(statistics: Seq[Stats]): Unit = if (dirExists) { + + val current = statistics.last + val sb = new StringBuilder + + val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp)) + sb.append(header(title)) + sb.append("

%s

\n".format(title)) + + sb.append("
\n")
+    sb.append(formatResultsTable(statistics))
+    sb.append("\n
\n") + + sb.append(img(percentilesChart(current))) + sb.append(img(latencyAndThroughputChart(current))) + + for (stats ← statistics) { + compareWithHistoricalPercentiliesChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + for (stats ← statistics) { + comparePercentilesChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + if (dirExists) { + val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) + val name = current.name + "--" + timestamp + ".html" + write(sb.toString, name) + } + + } + + private def img(url: String): String = { + """""".format( + url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n" + } + + def percentilesChart(stats: Stats): String = { + val chartTitle = stats.name + " Percentiles (microseconds)" + val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + chartUrl + } + + def comparePercentilesChart(stats: Stats): Seq[String] = { + for { + compareName ← compareResultWith.toSeq + compareStats ← resultRepository.get(compareName, stats.load) + } yield { + val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" + val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + chartUrl + } + } + + def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = { + val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) + if (withHistorical.size > 1) { + val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" + val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, + stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + Some(chartUrl) + } else { + None + } + } + + def latencyAndThroughputChart(stats: Stats): String = { + val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" + val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + chartUrl + } + + def formatResultsTable(statsSeq: Seq[Stats]): String = { + + val name = statsSeq.head.name + + val spaces = " " + val headerScenarioCol = ("Scenario" + spaces).take(name.length) + + val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil) + .mkString("\t") + val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil) + .mkString("\t") + val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString + val formattedStats = "\n" + + line.replace('-', '=') + "\n" + + headerLine + "\n" + + headerLine2 + "\n" + + line + "\n" + + statsSeq.map(formatStats(_)).mkString("\n") + "\n" + + line + "\n" + + if (log) EventHandler.info(this, formattedStats) + + formattedStats + + } + + def formatStats(stats: Stats): String = { + val durationS = stats.durationNanos.toDouble / 1000000000.0 + val duration = durationS.formatted("%.0f") + + val tpsStr = stats.tps.formatted("%.0f") + val meanStr = stats.mean.formatted("%.0f") + + val summaryLine = + stats.name :: + stats.load.toString :: + tpsStr :: + meanStr :: + stats.percentiles(5).toString :: + stats.percentiles(25).toString :: + stats.percentiles(50).toString :: + stats.percentiles(75).toString :: + stats.percentiles(95).toString :: + duration :: + stats.n.toString :: + Nil + + summaryLine.mkString("\t") + + } + + def write(content: String, fileName: String) { + val f = new File(dir, fileName) + var writer: PrintWriter = null + try { + writer = new PrintWriter(new FileWriter(f)) + writer.print(content) + writer.flush() + } catch { + case e: Exception ⇒ + EventHandler.error(this, "Failed to save report to [%s], due to [%s]". + format(f.getAbsolutePath, e.getMessage)) + } finally { + if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } + } + } + + def header(title: String) = + """| + | + | + | + |%s + | + | + |""".stripMargin.format(title) + + def footer = + """|" + |""".stripMargin + +} \ No newline at end of file From f93624e7e0ebb38c8ed351fbbd045ef1c4fbd207 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 15 Jul 2011 08:12:15 +0300 Subject: [PATCH 02/13] ticket 972 --- .../akka/actor/supervisor/Ticket669Spec.scala | 3 + .../scala/akka/dispatch/ActorModelSpec.scala | 136 ++++++++++++--- .../src/main/scala/akka/actor/ActorRef.scala | 165 +++++++++++------- .../main/scala/akka/dispatch/Dispatcher.scala | 13 +- .../NewLeaderChangeListenerMultiJvmNode1.conf | 2 +- .../NewLeaderChangeListenerMultiJvmNode2.conf | 2 +- ...eConnectedChangeListenerMultiJvmNode1.conf | 2 +- ...eConnectedChangeListenerMultiJvmNode2.conf | 2 +- ...sconnectedChangeListenerMultiJvmNode1.conf | 2 +- ...sconnectedChangeListenerMultiJvmNode2.conf | 2 +- .../ConfigurationStorageMultiJvmNode1.conf | 2 +- .../ConfigurationStorageMultiJvmNode2.conf | 2 +- .../election/LeaderElectionMultiJvmNode1.conf | 2 +- .../election/LeaderElectionMultiJvmNode2.conf | 2 +- .../registry/RegistryStoreMultiJvmNode1.conf | 2 +- .../registry/RegistryStoreMultiJvmNode2.conf | 2 +- .../deployment/DeploymentMultiJvmNode1.conf | 2 +- .../deployment/DeploymentMultiJvmNode2.conf | 2 +- .../MigrationAutomaticMultiJvmNode1.conf | 2 +- .../MigrationAutomaticMultiJvmNode2.conf | 2 +- .../MigrationAutomaticMultiJvmNode3.conf | 2 +- .../MigrationExplicitMultiJvmNode1.conf | 2 +- .../MigrationExplicitMultiJvmNode2.conf | 2 +- ...LogWriteBehindNoSnapshotMultiJvmNode1.conf | 2 +- ...LogWriteBehindNoSnapshotMultiJvmNode2.conf | 2 +- ...onLogWriteBehindSnapshotMultiJvmNode1.conf | 2 +- ...onLogWriteBehindSnapshotMultiJvmNode2.conf | 2 +- ...LogWriteBehindNoSnapshotMultiJvmNode1.conf | 2 +- ...LogWriteBehindNoSnapshotMultiJvmNode2.conf | 2 +- ...ogWriteThroughNoSnapshotMultiJvmNode1.conf | 2 +- ...ogWriteThroughNoSnapshotMultiJvmNode2.conf | 2 +- ...nLogWriteThroughSnapshotMultiJvmNode1.conf | 2 +- ...nLogWriteThroughSnapshotMultiJvmNode2.conf | 2 +- .../BadAddressDirectRoutingMultiJvmNode1.conf | 2 +- ...ultiReplicaDirectRoutingMultiJvmNode1.conf | 2 +- ...ultiReplicaDirectRoutingMultiJvmNode2.conf | 2 +- ...ngleReplicaDirectRoutingMultiJvmNode1.conf | 2 +- ...ngleReplicaDirectRoutingMultiJvmNode2.conf | 2 +- .../homenode/HomeNodeMultiJvmNode1.conf | 2 +- .../homenode/HomeNodeMultiJvmNode2.conf | 2 +- .../RoundRobin1ReplicaMultiJvmNode1.conf | 2 +- .../RoundRobin2ReplicasMultiJvmNode1.conf | 2 +- .../RoundRobin2ReplicasMultiJvmNode2.conf | 2 +- .../RoundRobin3ReplicasMultiJvmNode1.conf | 2 +- .../RoundRobin3ReplicasMultiJvmNode2.conf | 2 +- .../RoundRobin3ReplicasMultiJvmNode3.conf | 2 +- .../RoundRobinFailoverMultiJvmNode1.conf | 2 +- .../RoundRobinFailoverMultiJvmNode2.conf | 2 +- .../RoundRobinFailoverMultiJvmNode3.conf | 2 +- .../RoundRobinFailoverMultiJvmNode4.conf | 2 +- 50 files changed, 269 insertions(+), 140 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 05626854eb..bddad26176 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -14,10 +14,13 @@ import org.scalatest.matchers.MustMatchers class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { import Ticket669Spec._ + override def beforeAll = Thread.interrupted() //remove interrupted status. + override def afterAll = Actor.registry.local.shutdownAll "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { + val latch = new CountDownLatch(1) val sender = Actor.actorOf(new Sender(latch)).start() diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 156726ca0b..8ef6e1c930 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -4,33 +4,49 @@ package akka.actor.dispatch import org.scalatest.junit.JUnitSuite -import org.junit.Test import org.scalatest.Assertions._ import akka.testkit.Testing import akka.dispatch._ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit } +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor -import akka.util.{ Duration, Switch } -import org.multiverse.api.latches.StandardLatch -import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor } +import akka.util.Switch +import akka.actor.{ActorKilledException, PoisonPill, ActorRef, Actor} +import java.rmi.RemoteException +import org.junit.{After, Test} object ActorModelSpec { sealed trait ActorModelMessage + case class Reply_?(expect: Any) extends ActorModelMessage + case class Reply(expect: Any) extends ActorModelMessage + case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage + case class CountDown(latch: CountDownLatch) extends ActorModelMessage + case class Increment(counter: AtomicLong) extends ActorModelMessage + case class Await(latch: CountDownLatch) extends ActorModelMessage + case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage + case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage + case class Wait(time: Long) extends ActorModelMessage + case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage + + case object Interrupt extends ActorModelMessage + case object Restart extends ActorModelMessage + case class ThrowException(e: Throwable) extends ActorModelMessage + + val Ping = "Ping" val Pong = "Pong" @@ -52,17 +68,19 @@ object ActorModelSpec { } def receive = { - case Await(latch) ⇒ ack; latch.await(); busy.switchOff() - case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() - case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() - case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() - case Reply(msg) ⇒ ack; self.reply(msg); busy.switchOff() - case Reply_?(msg) ⇒ ack; self.reply_?(msg); busy.switchOff() - case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() - case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() - case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() + case Await(latch) ⇒ ack; latch.await(); busy.switchOff() + case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() + case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() + case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() + case Reply(msg) ⇒ ack; self.reply(msg); busy.switchOff() + case Reply_?(msg) ⇒ ack; self.reply_?(msg); busy.switchOff() + case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() + case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() + case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff() - case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") + case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") + case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!") + case ThrowException(e: Throwable) => ack; busy.switchOff(); throw e } } @@ -183,7 +201,9 @@ object ActorModelSpec { if (condition) return true Thread.sleep(intervalMs) - } catch { case e: InterruptedException ⇒ } + } catch { + case e: InterruptedException ⇒ + } } false } @@ -192,10 +212,17 @@ object ActorModelSpec { } abstract class ActorModelSpec extends JUnitSuite { + import ActorModelSpec._ protected def newInterceptedDispatcher: MessageDispatcherInterceptor + @After + def after { + //remove the interrupted status since we are messing with interrupted exceptions. + Thread.interrupted() + } + @Test def dispatcherShouldDynamicallyHandleItsOwnLifeCycle { implicit val dispatcher = newInterceptedDispatcher @@ -215,13 +242,17 @@ abstract class ActorModelSpec extends JUnitSuite { msgsProcessed = 0, restarts = 0) - val futures = for (i ← 1 to 10) yield Future { i } + val futures = for (i ← 1 to 10) yield Future { + i + } await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 2, stops = 2) val a2 = newTestActor a2.start - val futures2 = for (i ← 1 to 10) yield Future { i } + val futures2 = for (i ← 1 to 10) yield Future { + i + } await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 3, stops = 2) @@ -259,7 +290,13 @@ abstract class ActorModelSpec extends JUnitSuite { val counter = new CountDownLatch(200) a.start() - for (i ← 1 to 10) { spawn { for (i ← 1 to 20) { a ! WaitAck(1, counter) } } } + for (i ← 1 to 10) { + spawn { + for (i ← 1 to 20) { + a ! WaitAck(1, counter) + } + } + } assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) @@ -267,7 +304,15 @@ abstract class ActorModelSpec extends JUnitSuite { } def spawn(f: ⇒ Unit) { - val thread = new Thread { override def run { try { f } catch { case e ⇒ e.printStackTrace } } } + val thread = new Thread { + override def run { + try { + f + } catch { + case e ⇒ e.printStackTrace + } + } + } thread.start() } @@ -329,8 +374,9 @@ abstract class ActorModelSpec extends JUnitSuite { def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) - (1 to num) foreach { _ ⇒ - newTestActor.start() ! cachedMessage + (1 to num) foreach { + _ ⇒ + newTestActor.start() ! cachedMessage } assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") } @@ -356,6 +402,52 @@ abstract class ActorModelSpec extends JUnitSuite { assert(each.exception.get.isInstanceOf[ActorKilledException]) a.stop() } + + @Test + def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val f1 = a ? Reply("foo") + val f2 = a ? Reply("bar") + val f3 = a ? Interrupt + val f4 = a ? Reply("foo2") + val f5 = a ? Interrupt + val f6 = a ? Reply("bar2") + + assert(f1.get === "foo") + assert(f2.get === "bar") + assert((intercept[InterruptedException] { + f3.get + }).getMessage === "Ping!") + assert(f4.get === "foo2") + assert((intercept[InterruptedException] { + f5.get + }).getMessage === "Ping!") + assert(f6.get === "bar2") + } + + @Test + def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val f1 = a ? Reply("foo") + val f2 = a ? Reply("bar") + val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException")) + val f4 = a ? Reply("foo2") + val f5 = a ? new ThrowException(new RemoteException("RemoteException")) + val f6 = a ? Reply("bar2") + + assert(f1.get === "foo") + assert(f2.get === "bar") + assert((intercept[IndexOutOfBoundsException] { + f3.get + }).getMessage === "IndexOutOfBoundsException") + assert(f4.get === "foo2") + assert((intercept[RemoteException] { + f5.get + }).getMessage === "RemoteException") + assert(f6.get === "bar2") + } } class DispatcherModelTest extends ActorModelSpec { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 361f989a9c..3ceadf08d1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -9,15 +9,15 @@ import akka.dispatch._ import akka.config._ import akka.config.Supervision._ import akka.util._ -import akka.serialization.{ Format, Serializer, Serialization } +import akka.serialization.{Serializer, Serialization} import ReflectiveAccess._ import ClusterModule._ -import DeploymentConfig.{ TransactionLog ⇒ TransactionLogConfig, _ } +import DeploymentConfig.{TransactionLog ⇒ TransactionLogConfig, _} import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } -import java.util.{ Map ⇒ JMap } +import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit} +import java.util.{Map ⇒ JMap} import scala.reflect.BeanProperty import scala.collection.immutable.Stack @@ -30,10 +30,15 @@ private[akka] object ActorRefInternals { * LifeCycles for ActorRefs. */ private[akka] sealed trait StatusType + object UNSTARTED extends StatusType + object RUNNING extends StatusType + object BEING_RESTARTED extends StatusType + object SHUTDOWN extends StatusType + } /** @@ -68,7 +73,8 @@ private[akka] object ActorRefInternals { * * @author Jonas Bonér */ -trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef ⇒ +trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { + scalaRef: ScalaActorRef ⇒ // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = newUuid @@ -105,6 +111,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com def setReceiveTimeout(timeout: Long) { this.receiveTimeout = Some(timeout) } + def getReceiveTimeout: Option[Long] = receiveTimeout /** @@ -121,6 +128,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * */ def setFaultHandler(handler: FaultHandlingStrategy) + def getFaultHandler: FaultHandlingStrategy /** @@ -139,6 +147,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * */ def setLifeCycle(lifeCycle: LifeCycle) + def getLifeCycle: LifeCycle /** @@ -153,7 +162,10 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. */ - def setDispatcher(dispatcher: MessageDispatcher) { this.dispatcher = dispatcher } + def setDispatcher(dispatcher: MessageDispatcher) { + this.dispatcher = dispatcher + } + def getDispatcher: MessageDispatcher = dispatcher /** @@ -177,6 +189,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * Returns the uuid for the actor. */ def getUuid = _uuid + def uuid = _uuid /** @@ -366,9 +379,13 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com */ def sendException(ex: Throwable) {} + def isUsableOnlyOnce = false + def isUsable = true + def isReplyable = true + def canSendException = false /** @@ -382,9 +399,9 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Long, - channel: UntypedChannel): Future[Any] + message: Any, + timeout: Long, + channel: UntypedChannel): Future[Any] protected[akka] def actorInstance: AtomicReference[Actor] @@ -393,6 +410,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com protected[akka] def supervisor_=(sup: Option[ActorRef]) protected[akka] def mailbox: AnyRef + protected[akka] def mailbox_=(value: AnyRef): AnyRef protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) @@ -416,7 +434,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * * @author Jonas Bonér */ -class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, val address: String) +class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, val address: String) extends ActorRef with ScalaActorRef { protected[akka] val guard = new ReentrantGuard @@ -442,7 +460,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, @volatile private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher - protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } + protected[akka] val actorInstance = guard.withGuard { + new AtomicReference[Actor](newActor) + } def serializerErrorDueTo(reason: String) = throw new akka.config.ConfigurationException( @@ -480,16 +500,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, // used only for deserialization private[akka] def this( - __uuid: Uuid, - __address: String, - __timeout: Long, - __receiveTimeout: Option[Long], - __lifeCycle: LifeCycle, - __supervisor: Option[ActorRef], - __hotswap: Stack[PartialFunction[Any, Unit]], - __factory: () ⇒ Actor) = { + __uuid: Uuid, + __address: String, + __timeout: Long, + __receiveTimeout: Option[Long], + __lifeCycle: LifeCycle, + __supervisor: Option[ActorRef], + __hotswap: Stack[PartialFunction[Any, Unit]], + __factory: () ⇒ Actor) = { - this(__factory, __address) + this (__factory, __address) _uuid = __uuid timeout = __timeout @@ -627,7 +647,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, */ def mailbox: AnyRef = _mailbox - protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value } + protected[akka] def mailbox_=(value: AnyRef): AnyRef = { + _mailbox = value; value + } /** * Returns the supervisor, if there is one. @@ -651,12 +673,12 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, dispatcher dispatchMessage new MessageInvocation(this, message, channel) protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Long, - channel: UntypedChannel): Future[Any] = { + message: Any, + timeout: Long, + channel: UntypedChannel): Future[Any] = { val future = channel match { case f: ActorPromise ⇒ f - case _ ⇒ new ActorPromise(timeout) + case _ ⇒ new ActorPromise(timeout) } dispatcher dispatchMessage new MessageInvocation(this, message, future) future @@ -677,7 +699,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, currentMessage = null // reset current message after successful invocation } catch { case e: InterruptedException ⇒ - currentMessage = null // received message while actor is shutting down, ignore + handleExceptionInDispatch(e, messageHandle.message) + Thread.currentThread().interrupt() //Restore interrupt case e ⇒ handleExceptionInDispatch(e, messageHandle.message) } @@ -716,13 +739,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal + val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { + //Immortal false - } else if (withinTimeRange.isEmpty) { // restrict number of restarts + } else if (withinTimeRange.isEmpty) { + // restrict number of restarts val retries = maxNrOfRetriesCount + 1 maxNrOfRetriesCount = retries //Increment number of retries retries > maxNrOfRetries.get - } else { // cannot restart more than N within M timerange + } else { + // cannot restart more than N within M timerange val retries = maxNrOfRetriesCount + 1 val windowStart = restartTimeWindowStartNanos @@ -816,7 +842,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary ⇒ shutDownTemporaryActor(actorRef) - case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange) + case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange) } } } @@ -826,7 +852,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, // ========= PRIVATE FUNCTIONS ========= private[this] def newActor: Actor = { - import Actor.{ actorRefInCreation ⇒ refStack } + import Actor.{actorRefInCreation ⇒ refStack} val stackBefore = refStack.get refStack.set(stackBefore.push(this)) try { @@ -837,7 +863,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self } } match { - case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") + case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") case valid ⇒ valid } @@ -861,26 +887,28 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, else { lifeCycle match { case Temporary ⇒ shutDownTemporaryActor(this) - case _ ⇒ dispatcher.resume(this) //Resume processing for this actor + case _ ⇒ dispatcher.resume(this) //Resume processing for this actor } } } private def notifySupervisorWithMessage(notification: LifeCycleMessage) { // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - _supervisor.foreach { sup ⇒ - if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors - //Scoped stop all linked actors, to avoid leaking the 'i' val - { - val i = _linkedActors.values.iterator - while (i.hasNext) { - i.next.stop() - i.remove + _supervisor.foreach { + sup ⇒ + if (sup.isShutdown) { + // if supervisor is shut down, game over for all linked actors + //Scoped stop all linked actors, to avoid leaking the 'i' val + { + val i = _linkedActors.values.iterator + while (i.hasNext) { + i.next.stop() + i.remove + } } - } - //Stop the actor itself - stop - } else sup ! notification // else notify supervisor + //Stop the actor itself + stop + } else sup ! notification // else notify supervisor } } @@ -921,7 +949,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, protected[akka] def checkReceiveTimeout() { cancelReceiveTimeout() - if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed + if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { + //Only reschedule if desired and there are currently no more messages to be processed _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) } } @@ -949,11 +978,11 @@ object RemoteActorSystemMessage { * * @author Jonas Bonér */ -private[akka] case class RemoteActorRef private[akka] ( - val remoteAddress: InetSocketAddress, - val address: String, - _timeout: Long, - loader: Option[ClassLoader]) +private[akka] case class RemoteActorRef private[akka]( + val remoteAddress: InetSocketAddress, + val address: String, + _timeout: Long, + loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { ClusterModule.ensureEnabled() @@ -965,22 +994,22 @@ private[akka] case class RemoteActorRef private[akka] ( def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { val chSender = channel match { case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None + case _ ⇒ None } Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader) } def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Long, - channel: UntypedChannel): Future[Any] = { + message: Any, + timeout: Long, + channel: UntypedChannel): Future[Any] = { val chSender = channel match { case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None + case _ ⇒ None } val chFuture = channel match { case f: Promise[Any] ⇒ Some(f) - case _ ⇒ None + case _ ⇒ None } val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader) if (future.isDefined) ActorPromise(future.get) @@ -1011,34 +1040,49 @@ private[akka] case class RemoteActorRef private[akka] ( def dispatcher_=(md: MessageDispatcher) { unsupported } + def dispatcher: MessageDispatcher = unsupported + def link(actorRef: ActorRef) { unsupported } + def unlink(actorRef: ActorRef) { unsupported } + def startLink(actorRef: ActorRef): ActorRef = unsupported + def supervisor: Option[ActorRef] = unsupported + def linkedActors: JMap[Uuid, ActorRef] = unsupported + protected[akka] def mailbox: AnyRef = unsupported + protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { unsupported } + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { unsupported } + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { unsupported } + protected[akka] def invoke(messageHandle: MessageInvocation) { unsupported } + protected[akka] def supervisor_=(sup: Option[ActorRef]) { unsupported } + protected[akka] def actorInstance: AtomicReference[Actor] = unsupported + private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } @@ -1070,7 +1114,8 @@ trait ActorRefShared { * There are implicit conversions in ../actor/Implicits.scala * from ActorRef -> ScalaActorRef and back */ -trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorRef ⇒ +trait ScalaActorRef extends ActorRefShared with ForwardableChannel { + ref: ActorRef ⇒ /** * Address for actor, must be a unique one. @@ -1114,7 +1159,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR if (msg eq null) None else msg.channel match { case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None + case _ ⇒ None } } @@ -1128,7 +1173,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR if (msg eq null) None else msg.channel match { case f: ActorPromise ⇒ Some(f) - case _ ⇒ None + case _ ⇒ None } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 4f9a82986a..e6f2d3128f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -160,8 +160,6 @@ class Dispatcher( private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = registerForExecution(mbox) - private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = () - protected override def cleanUpMailboxFor(actorRef: ActorRef) { val m = getMailbox(actorRef) if (!m.isEmpty) { @@ -195,19 +193,10 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ def dispatcher: Dispatcher final def run = { - try { - processMailbox() - } catch { - case ie: InterruptedException ⇒ - } - finally { - dispatcherLock.unlock() - } + try { processMailbox()} finally {dispatcherLock.unlock()} if (!self.isEmpty) dispatcher.reRegisterForExecution(this) - - dispatcher.doneProcessingMailbox(this) } /** diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode1.conf index 6f117d6ce2..e23553c931 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode1.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.replicas = 1 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode2.conf index 6f117d6ce2..e23553c931 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmNode2.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.replicas = 1 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.conf index 7d8a1476ad..a17a4d98ab 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.conf index 7d8a1476ad..a17a4d98ab 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.conf index 7d8a1476ad..a17a4d98ab 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.conf index 762f32d92a..2f642a20f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.conf @@ -1,2 +1,2 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf index d8bee0cb07..8a5bd70eec 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf index d8bee0cb07..8a5bd70eec 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf index 8aeaf3135f..84969a04e5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf index 8aeaf3135f..84969a04e5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf index 470c4c7a33..211cdbd6ee 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf index 5fb92ab01f..567b03b9cb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf index 470c4c7a33..211cdbd6ee 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf index 5fb92ab01f..567b03b9cb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf index 1d332847b6..58c66d3e42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf index 1d332847b6..58c66d3e42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.hello-world.router = "direct" akka.actor.deployment.hello-world.clustered.replicas = 1 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmNode1.conf index 4d95c03296..1345a2287c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.home = "node:node1" akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf index 612e01723c..40fcfa5d51 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf @@ -1,3 +1,3 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "direct" \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf index 70c94d3252..b60f6a3b5c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf @@ -1,3 +1,3 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "direct" diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf index fa86d85d6d..2a3d9ba765 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "direct" akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf index 612e01723c..40fcfa5d51 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf @@ -1,3 +1,3 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "direct" \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf index 0a5f18c2b9..d7e17c84d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"] akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf index 0a5f18c2b9..d7e17c84d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"] akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf index 221ccd25ae..dcbd276918 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode1.conf index 401a5bd8e4..09f4cfc93a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"] akka.actor.deployment.service-hello.clustered.replicas = 2 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode2.conf index 401a5bd8e4..09f4cfc93a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"] akka.actor.deployment.service-hello.clustered.replicas = 2 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode1.conf index 851d7a98e8..75249c7713 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode1.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.replicas = 3 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode2.conf index 851d7a98e8..75249c7713 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode2.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.replicas = 3 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode3.conf index 851d7a98e8..75249c7713 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode3.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode3.conf @@ -1,4 +1,4 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.replicas = 3 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode1.conf index 0a5f18c2b9..d7e17c84d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"] akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode2.conf index 0a5f18c2b9..d7e17c84d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"] akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode3.conf index 0a5f18c2b9..d7e17c84d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode3.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode3.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"] akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf index 0a5f18c2b9..d7e17c84d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf @@ -1,5 +1,5 @@ akka.enabled-modules = ["cluster"] -akka.event-handler-level = "DEBUG" +akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"] akka.actor.deployment.service-hello.clustered.replicas = 1 \ No newline at end of file From 8bbb9b056679ef180adc726b4610bec90b7fa77b Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 15 Jul 2011 08:41:42 +0300 Subject: [PATCH 03/13] issue 956 --- akka-docs/scala/actors.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index eb25cb2d1a..609f2e3f60 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -561,6 +561,15 @@ The actor has a well-defined non-circular life-cycle. => STARTED (when 'start' is invoked) - can receive messages => SHUT DOWN (when 'exit' or 'stop' is invoked) - can't do anything +What happens with a message when exception is thrown while processing +--------------------------------------------------------------------- + +If an exception is thrown while a message is being processed (so taken of his mailbox and handed over the the receive), +then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to +retry processing of a message, you need to deal with it yourself by catching the exception and retry your flow. Make +sure that you put a bound on the number of retries since you don't want a system to livelock (so consuming a lot of +cpu cycles without making progress). + Extending Actors using PartialFunction chaining ----------------------------------------------- From 964c53203af9f7fea7581ed2801a03260ec26ce7 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 15 Jul 2011 08:54:44 +0300 Subject: [PATCH 04/13] issue 956 --- akka-docs/scala/actors.rst | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 609f2e3f60..6ac13da9d9 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -561,8 +561,13 @@ The actor has a well-defined non-circular life-cycle. => STARTED (when 'start' is invoked) - can receive messages => SHUT DOWN (when 'exit' or 'stop' is invoked) - can't do anything -What happens with a message when exception is thrown while processing ---------------------------------------------------------------------- +Actors and exceptions +--------------------- +It can happen that while a message is being processed by an actor, that some kind of exception is thrown, e.g. a +database exception. + +What happens to the Message +^^^^^^^^^^^^^^^^^^^^^^^^^^^ If an exception is thrown while a message is being processed (so taken of his mailbox and handed over the the receive), then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to @@ -570,6 +575,17 @@ retry processing of a message, you need to deal with it yourself by catching the sure that you put a bound on the number of retries since you don't want a system to livelock (so consuming a lot of cpu cycles without making progress). +What happens to the mailbox +^^^^^^^^^^^^^^^^^^^^^^^^^^^ +If an exception is thrown while a message is being processed, nothing happens to the mailbox. If the actor is restarted, +the same mailbox will be there. So all messages on that mailbox, will be there as well. + +What happens to the actor +^^^^^^^^^^^^^^^^^^^^^^^^^ +If an exception is thrownn and the actor is not supervised, the actor object itself is discarded and a new instance is +created. This new instance will now be used in the actor references to this actor. + + Extending Actors using PartialFunction chaining ----------------------------------------------- From 5678692e66d2256e3232cc9a6c425a87081182d7 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 15 Jul 2011 08:56:12 +0300 Subject: [PATCH 05/13] issue 956 --- akka-docs/scala/actors.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 6ac13da9d9..2996520ca1 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -583,7 +583,8 @@ the same mailbox will be there. So all messages on that mailbox, will be there a What happens to the actor ^^^^^^^^^^^^^^^^^^^^^^^^^ If an exception is thrownn and the actor is not supervised, the actor object itself is discarded and a new instance is -created. This new instance will now be used in the actor references to this actor. +created. This new instance will now be used in the actor references to this actor (so this is done invisible +to the developer). Extending Actors using PartialFunction chaining From 966f7d92979ea544d8ac245bbf22c4aea7c33fa4 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 15 Jul 2011 09:55:45 +0300 Subject: [PATCH 06/13] ticket 1025 --- .../scala/akka/cluster/ClusterInterface.scala | 4 +- .../src/main/scala/akka/cluster/Cluster.scala | 595 ++++++++++-------- 2 files changed, 319 insertions(+), 280 deletions(-) diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 3963042f22..690a69841f 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -122,8 +122,6 @@ object NodeAddress { trait ClusterNode { import ChangeListener._ - val isConnected = new AtomicBoolean(false) - private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() def membershipNodes: Array[String] @@ -136,7 +134,7 @@ trait ClusterNode { def remoteServerAddress: InetSocketAddress - def isRunning: Boolean = isConnected.get + def isRunning: Boolean def start(): ClusterNode diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 57253b2572..152de0368f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -6,15 +6,14 @@ package akka.cluster import org.apache.zookeeper._ import org.apache.zookeeper.Watcher.Event._ import org.apache.zookeeper.data.Stat -import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener } +import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener} import org.I0Itec.zkclient._ import org.I0Itec.zkclient.serialize._ import org.I0Itec.zkclient.exception._ -import java.util.{ List ⇒ JList } -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } -import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap } +import java.util.{List ⇒ JList} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import java.net.InetSocketAddress import javax.management.StandardMBean @@ -30,17 +29,17 @@ import Status._ import DeploymentConfig._ import akka.event.EventHandler -import akka.dispatch.{ Dispatchers, Future } +import akka.dispatch.{Dispatchers, Future} import akka.remoteinterface._ import akka.routing.RouterType -import akka.config.{ Config, Supervision } +import akka.config.{Config, Supervision} import Supervision._ import Config._ -import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization } +import akka.serialization.{Serialization, Serializer, ActorSerialization} import ActorSerialization._ -import Compression.LZF +import akka.serialization.Compression.LZF import akka.cluster.zookeeper._ import ChangeListener._ @@ -50,6 +49,7 @@ import RemoteDaemonMessageType._ import com.eaio.uuid.UUID import com.google.protobuf.ByteString +import java.util.concurrent.{CopyOnWriteArrayList, Callable, ConcurrentHashMap} // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down @@ -84,7 +84,7 @@ trait ClusterNodeMBean { def getMemberNodes: Array[String] - def getNodeAddres():NodeAddress + def getNodeAddres(): NodeAddress def getLeaderLockName: String @@ -112,31 +112,31 @@ trait ClusterNodeMBean { def getConfigElementKeys: Array[String] - def getMemberShipPathFor(node:String):String + def getMemberShipPathFor(node: String): String - def getConfigurationPathFor(key:String):String + def getConfigurationPathFor(key: String): String - def getActorAddresstoNodesPathFor(actorAddress:String):String + def getActorAddresstoNodesPathFor(actorAddress: String): String - def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String):String + def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String): String - def getNodeToUuidsPathFor(node:String):String + def getNodeToUuidsPathFor(node: String): String - def getNodeToUuidsPathFor(node:String, uuid:UUID):String + def getNodeToUuidsPathFor(node: String, uuid: UUID): String - def getActorAddressRegistryPathFor(actorAddress:String):String + def getActorAddressRegistryPathFor(actorAddress: String): String - def getActorAddressRegistrySerializerPathFor(actorAddress:String):String + def getActorAddressRegistrySerializerPathFor(actorAddress: String): String - def getActorAddressRegistryUuidPathFor(actorAddress:String):String + def getActorAddressRegistryUuidPathFor(actorAddress: String): String - def getActorUuidRegistryNodePathFor(uuid: UUID):String + def getActorUuidRegistryNodePathFor(uuid: UUID): String - def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID):String + def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID): String - def getActorAddressToUuidsPathFor(actorAddress: String):String + def getActorAddressToUuidsPathFor(actorAddress: String): String - def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID):String + def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID): String } /** @@ -181,17 +181,17 @@ object Cluster { private def nodename: String = properties.get("akka.cluster.nodename") match { case Some(uberride) ⇒ uberride - case None ⇒ Config.nodename + case None ⇒ Config.nodename } private def hostname: String = properties.get("akka.cluster.hostname") match { case Some(uberride) ⇒ uberride - case None ⇒ Config.hostname + case None ⇒ Config.hostname } private def port: Int = properties.get("akka.cluster.port") match { case Some(uberride) ⇒ uberride.toInt - case None ⇒ Config.remoteServerPort + case None ⇒ Config.remoteServerPort } val defaultZooKeeperSerializer = new SerializableSerializer @@ -329,12 +329,12 @@ object Cluster { * * @author Jonas Bonér */ -class DefaultClusterNode private[akka] ( - val nodeAddress: NodeAddress, - val hostname: String = Config.hostname, - val port: Int = Config.remoteServerPort, - val zkServerAddresses: String, - val serializer: ZkSerializer) extends ErrorHandler with ClusterNode { +class DefaultClusterNode private[akka]( + val nodeAddress: NodeAddress, + val hostname: String = Config.hostname, + val port: Int = Config.remoteServerPort, + val zkServerAddresses: String, + val serializer: ZkSerializer) extends ErrorHandler with ClusterNode { self ⇒ if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string") @@ -349,7 +349,7 @@ class DefaultClusterNode private[akka] ( def receive = { case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule() case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule() - case _ ⇒ //ignore other + case _ ⇒ //ignore other } }, "akka.cluster.RemoteClientLifeCycleListener").start() @@ -373,6 +373,8 @@ class DefaultClusterNode private[akka] ( lazy val remoteServerAddress: InetSocketAddress = remoteService.address + val isConnected = new Switch(false) + // static nodes val CLUSTER_PATH = "/" + nodeAddress.clusterName val MEMBERSHIP_PATH = CLUSTER_PATH + "/members" @@ -445,39 +447,65 @@ class DefaultClusterNode private[akka] ( // Node // ======================================= + def isRunning: Boolean = isConnected.isOn + def start(): ClusterNode = { - if (isConnected.compareAndSet(false, true)) { + isConnected.switchOn { initializeNode() } + this } + private[cluster] def initializeNode() { + EventHandler.info(this, + ("\nCreating cluster node with" + + "\n\tcluster name = [%s]" + + "\n\tnode name = [%s]" + + "\n\tport = [%s]" + + "\n\tzookeeper server addresses = [%s]" + + "\n\tserializer = [%s]") + .format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer)) + EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString)) + createZooKeeperPathStructureIfNeeded() + registerListeners() + joinCluster() + joinLeaderElection() + fetchMembershipNodes() + EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) + } + + def shutdown() { - if (isConnected.compareAndSet(true, false)) { - ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) - - locallyCachedMembershipNodes.clear() - - nodeConnections.toList.foreach({ - case (_, (address, _)) ⇒ - Actor.remote.shutdownClientConnection(address) // shut down client connections - }) - - remoteService.shutdown() // shutdown server - - remoteClientLifeCycleListener.stop() - remoteDaemon.stop() - - // for monitoring remote listener - registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) - - nodeConnections.clear() - - disconnect() - EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + isConnected.switchOff { + shutdownNode() } } + private def shutdownNode() { + ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) + + locallyCachedMembershipNodes.clear() + + nodeConnections.toList.foreach({ + case (_, (address, _)) ⇒ + Actor.remote.shutdownClientConnection(address) // shut down client connections + }) + + remoteService.shutdown() // shutdown server + + remoteClientLifeCycleListener.stop() + remoteDaemon.stop() + + // for monitoring remote listener + registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) + + nodeConnections.clear() + + disconnect() + EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + } + def disconnect(): ClusterNode = { zkClient.unsubscribeAll() zkClient.close() @@ -668,12 +696,12 @@ class DefaultClusterNode private[akka] ( * available durable store. */ def store( - actorAddress: String, - actorFactory: () ⇒ ActorRef, - replicationFactor: Int, - replicationScheme: ReplicationScheme, - serializeMailbox: Boolean, - serializer: Serializer): ClusterNode = if (isConnected.get) { + actorAddress: String, + actorFactory: () ⇒ ActorRef, + replicationFactor: Int, + replicationScheme: ReplicationScheme, + serializeMailbox: Boolean, + serializer: Serializer): ClusterNode = if (isConnected.isOn) { val serializerClassName = serializer.getClass.getName @@ -704,7 +732,7 @@ class DefaultClusterNode private[akka] ( } } }) match { - case Left(path) ⇒ path + case Left(path) ⇒ path case Right(exception) ⇒ actorAddressRegistryPath } } @@ -749,7 +777,7 @@ class DefaultClusterNode private[akka] ( /** * Is the actor with uuid clustered or not? */ - def isClustered(actorAddress: String): Boolean = if (isConnected.get) { + def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) { zkClient.exists(actorAddressRegistryPathFor(actorAddress)) } else false @@ -761,7 +789,7 @@ class DefaultClusterNode private[akka] ( /** * Is the actor with uuid in use or not? */ - def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.get) { + def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) { zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName)) } else false @@ -775,7 +803,7 @@ class DefaultClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) { + def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.isOn) { val nodeName = nodeAddress.nodeName ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName))) @@ -791,7 +819,7 @@ class DefaultClusterNode private[akka] ( val actorFactory = Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ LocalActorRef], None) match { - case Left(error) ⇒ throw error + case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[() ⇒ LocalActorRef] } @@ -860,7 +888,7 @@ class DefaultClusterNode private[akka] ( EventHandler.debug(this, "Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress)) - if (isConnected.get) { + if (isConnected.isOn) { val builder = RemoteDaemonMessageProtocol.newBuilder .setMessageType(USE) @@ -871,11 +899,12 @@ class DefaultClusterNode private[akka] ( val command = builder.build - nodes foreach { node ⇒ - nodeConnections.get(node) foreach { - case (_, connection) ⇒ - sendCommandToNode(connection, command, async = false) - } + nodes foreach { + node ⇒ + nodeConnections.get(node) foreach { + case (_, connection) ⇒ + sendCommandToNode(connection, command, async = false) + } } } } @@ -908,15 +937,16 @@ class DefaultClusterNode private[akka] ( // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method? - if (isConnected.get) { + if (isConnected.isOn) { ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName))) - uuidsForActorAddress(actorAddress) foreach { uuid ⇒ - EventHandler.debug(this, - "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid)) + uuidsForActorAddress(actorAddress) foreach { + uuid ⇒ + EventHandler.debug(this, + "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid)) - ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid))) - ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid))) + ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid))) + ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid))) } } } @@ -925,7 +955,7 @@ class DefaultClusterNode private[akka] ( * Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'. */ private[akka] def releaseActorOnAllNodes(actorAddress: String) { - if (isConnected.get) { + if (isConnected.isOn) { EventHandler.debug(this, "Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress)) @@ -934,10 +964,11 @@ class DefaultClusterNode private[akka] ( .setActorAddress(actorAddress) .build - nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒ - nodeConnections.get(node) foreach { - case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) - } + nodesForActorsInUseWithAddress(actorAddress) foreach { + node ⇒ + nodeConnections.get(node) foreach { + case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) + } } } } @@ -945,14 +976,16 @@ class DefaultClusterNode private[akka] ( /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.get) { + def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) { val addresses = addressesForActor(actorAddress) EventHandler.debug(this, "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]" .format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t"))) val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT) - addresses foreach { case (_, address) ⇒ clusterActorRefs.put(address, actorRef) } + addresses foreach { + case (_, address) ⇒ clusterActorRefs.put(address, actorRef) + } actorRef.start() } else throw new ClusterException("Not connected to cluster") @@ -970,7 +1003,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors registered in this cluster. */ - private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.get) { + private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) { zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] } else Array.empty[UUID] @@ -982,7 +1015,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor id for the actor with a specific UUID. */ - private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.get) { + private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) { try { Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String]) } catch { @@ -999,7 +1032,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor UUIDs for actor ID. */ - private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.get) { + private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) { try { zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map { case c: CharSequence ⇒ new UUID(c) @@ -1012,7 +1045,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the node names of all actors in use with UUID. */ - private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.get) { + private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) { try { zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]] } catch { @@ -1023,7 +1056,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors in use registered on a specific node. */ - private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.get) { + private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) { try { zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map { case c: CharSequence ⇒ new UUID(c) @@ -1036,7 +1069,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the addresses of all actors in use registered on a specific node. */ - def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.get) { + def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) { val uuids = try { zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map { @@ -1059,7 +1092,8 @@ class DefaultClusterNode private[akka] ( case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress)) } - ReflectiveAccess.getClassFor(serializerClassName) match { // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess. + ReflectiveAccess.getClassFor(serializerClassName) match { + // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess. case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer] case Left(error) ⇒ EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString)) @@ -1183,7 +1217,7 @@ class DefaultClusterNode private[akka] ( } } }) match { - case Left(_) ⇒ /* do nothing */ + case Left(_) ⇒ /* do nothing */ case Right(exception) ⇒ throw exception } } @@ -1242,44 +1276,35 @@ class DefaultClusterNode private[akka] ( } private[cluster] def membershipPathFor(node: String): String = "%s/%s".format(MEMBERSHIP_PATH, node) + private[cluster] def configurationPathFor(key: String): String = "%s/%s".format(CONFIGURATION_PATH, key) private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_NODES_TO_PATH, actorAddress) + private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String = "%s/%s".format(actorAddressToNodesPathFor(actorAddress), nodeName) private[cluster] def nodeToUuidsPathFor(node: String): String = "%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node) + private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String = "%s/%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node, uuid) private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_REGISTRY_PATH, actorAddress) + private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "serializer") + private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "uuid") private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String = "%s/%s".format(ACTOR_UUID_REGISTRY_PATH, uuid) + private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "node") + private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "address") private[cluster] def actorUuidRegistryRemoteAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "remote-address") private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_PATH, actorAddress.replace('.', '_')) + private[cluster] def actorAddressToUuidsPathFor(actorAddress: String, uuid: UUID): String = "%s/%s".format(actorAddressToUuidsPathFor(actorAddress), uuid) - private[cluster] def initializeNode() { - EventHandler.info(this, - ("\nCreating cluster node with" + - "\n\tcluster name = [%s]" + - "\n\tnode name = [%s]" + - "\n\tport = [%s]" + - "\n\tzookeeper server addresses = [%s]" + - "\n\tserializer = [%s]") - .format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer)) - EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString)) - createZooKeeperPathStructureIfNeeded() - registerListeners() - joinCluster() - joinLeaderElection() - fetchMembershipNodes() - EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) - } /** * Returns a random set with node names of size 'replicationFactor'. @@ -1295,7 +1320,8 @@ class DefaultClusterNode private[akka] ( "] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]") val preferredNodes = - if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor + if (actorAddress.isDefined) { + // use 'preferred-nodes' in deployment config for the actor Deployer.deploymentFor(actorAddress.get) match { case Deploy(_, _, Clustered(nodes, _, _)) ⇒ nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor @@ -1350,13 +1376,16 @@ class DefaultClusterNode private[akka] ( * @returns a Map with the remote socket addresses to of disconnected node connections */ private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster( - newlyConnectedMembershipNodes: Traversable[String], - newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = { + newlyConnectedMembershipNodes: Traversable[String], + newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = { // cache the disconnected connections in a map, needed for fail-over of these connections later var disconnectedConnections = Map.empty[String, InetSocketAddress] - newlyDisconnectedMembershipNodes foreach { node ⇒ - disconnectedConnections += (node -> (nodeConnections(node) match { case (address, _) ⇒ address })) + newlyDisconnectedMembershipNodes foreach { + node ⇒ + disconnectedConnections += (node -> (nodeConnections(node) match { + case (address, _) ⇒ address + })) } if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) { @@ -1365,17 +1394,20 @@ class DefaultClusterNode private[akka] ( newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_)) // add connections newly arrived nodes - newlyConnectedMembershipNodes foreach { node ⇒ - if (!nodeConnections.contains(node)) { // only connect to each replica once + newlyConnectedMembershipNodes foreach { + node ⇒ + if (!nodeConnections.contains(node)) { + // only connect to each replica once - remoteSocketAddressForNode(node) foreach { address ⇒ - EventHandler.debug(this, - "Setting up connection to node with nodename [%s] and address [%s]".format(node, address)) + remoteSocketAddressForNode(node) foreach { + address ⇒ + EventHandler.debug(this, + "Setting up connection to node with nodename [%s] and address [%s]".format(node, address)) - val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start() - nodeConnections.put(node, (address, clusterDaemon)) + val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start() + nodeConnections.put(node, (address, clusterDaemon)) + } } - } } } finally { connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false) @@ -1422,84 +1454,87 @@ class DefaultClusterNode private[akka] ( } private[cluster] def migrateActorsOnFailedNodes( - failedNodes: List[String], - currentClusterNodes: List[String], - oldClusterNodes: List[String], - disconnectedConnections: Map[String, InetSocketAddress]) { + failedNodes: List[String], + currentClusterNodes: List[String], + oldClusterNodes: List[String], + disconnectedConnections: Map[String, InetSocketAddress]) { - failedNodes.foreach { failedNodeName ⇒ + failedNodes.foreach { + failedNodeName ⇒ - val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName) + val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName) - val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName)) - val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName) + val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName)) + val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName) - // Migrate to the successor of the failed node (using a sorted circular list of the node names) - if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail - (failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor? + // Migrate to the successor of the failed node (using a sorted circular list of the node names) + if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail + (failedNodeIndex == myIndex + 1)) { + // Am I the leftmost successor? - // Takes the lead of migrating the actors. Not all to this node. - // All to this node except if the actor already resides here, then pick another node it is not already on. + // Takes the lead of migrating the actors. Not all to this node. + // All to this node except if the actor already resides here, then pick another node it is not already on. - // Yes I am the node to migrate the actor to (can only be one in the cluster) - val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList + // Yes I am the node to migrate the actor to (can only be one in the cluster) + val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList - actorUuidsForFailedNode.foreach { uuidAsString ⇒ - EventHandler.debug(this, - "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]" - .format(failedNodeName, uuidAsString, nodeAddress.nodeName)) + actorUuidsForFailedNode.foreach { + uuidAsString ⇒ + EventHandler.debug(this, + "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]" + .format(failedNodeName, uuidAsString, nodeAddress.nodeName)) - val uuid = uuidFrom(uuidAsString) - val actorAddress = actorAddressForUuid(uuid).getOrElse( - throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]")) + val uuid = uuidFrom(uuidAsString) + val actorAddress = actorAddressForUuid(uuid).getOrElse( + throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]")) - val migrateToNodeAddress = - if (isInUseOnNode(actorAddress)) { - // already in use on this node, pick another node to instantiate the actor on - val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress) - val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet + val migrateToNodeAddress = + if (isInUseOnNode(actorAddress)) { + // already in use on this node, pick another node to instantiate the actor on + val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress) + val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet - if (nodesAvailableForMigration.isEmpty) throw new ClusterException( - "Can not migrate actor to new node since there are not any available nodes left. " + - "(However, the actor already has >1 replica in cluster, so we are ok)") + if (nodesAvailableForMigration.isEmpty) throw new ClusterException( + "Can not migrate actor to new node since there are not any available nodes left. " + + "(However, the actor already has >1 replica in cluster, so we are ok)") - NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head) - } else { - // actor is not in use on this node, migrate it here - nodeAddress - } + NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head) + } else { + // actor is not in use on this node, migrate it here + nodeAddress + } - // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.) - val replicateFromUuid = - if (isReplicated(actorAddress)) Some(uuid) - else None + // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.) + val replicateFromUuid = + if (isReplicated(actorAddress)) Some(uuid) + else None - migrateWithoutCheckingThatActorResidesOnItsHomeNode( - failedNodeAddress, - migrateToNodeAddress, - actorAddress, - replicateFromUuid) + migrateWithoutCheckingThatActorResidesOnItsHomeNode( + failedNodeAddress, + migrateToNodeAddress, + actorAddress, + replicateFromUuid) + } + + // notify all available nodes that they should fail-over all connections from 'from' to 'to' + val from = disconnectedConnections(failedNodeName) + val to = remoteServerAddress + + Serialization.serialize((from, to)) match { + case Left(error) ⇒ throw error + case Right(bytes) ⇒ + + val command = RemoteDaemonMessageProtocol.newBuilder + .setMessageType(FAIL_OVER_CONNECTIONS) + .setPayload(ByteString.copyFrom(bytes)) + .build + + // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed? + nodeConnections.values foreach { + case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) + } + } } - - // notify all available nodes that they should fail-over all connections from 'from' to 'to' - val from = disconnectedConnections(failedNodeName) - val to = remoteServerAddress - - Serialization.serialize((from, to)) match { - case Left(error) ⇒ throw error - case Right(bytes) ⇒ - - val command = RemoteDaemonMessageProtocol.newBuilder - .setMessageType(FAIL_OVER_CONNECTIONS) - .setPayload(ByteString.copyFrom(bytes)) - .build - - // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed? - nodeConnections.values foreach { - case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) - } - } - } } } @@ -1507,7 +1542,7 @@ class DefaultClusterNode private[akka] ( * Used when the ephemeral "home" node is already gone, so we can't check if it is available. */ private def migrateWithoutCheckingThatActorResidesOnItsHomeNode( - from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) { + from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) { EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to)) if (!isInUseOnNode(actorAddress, to)) { @@ -1533,16 +1568,17 @@ class DefaultClusterNode private[akka] ( EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH)) } - basePaths.foreach { path ⇒ - try { - ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) - EventHandler.debug(this, "Created node [%s]".format(path)) - } catch { - case e ⇒ - val error = new ClusterException(e.toString) - EventHandler.error(error, this) - throw error - } + basePaths.foreach { + path ⇒ + try { + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) + EventHandler.debug(this, "Created node [%s]".format(path)) + } catch { + case e ⇒ + val error = new ClusterException(e.toString) + EventHandler.error(error, this) + throw error + } } } @@ -1578,7 +1614,7 @@ class DefaultClusterNode private[akka] ( override def resign() = self.resign() - override def isConnected = self.isConnected.get + override def isConnected = self.isConnected.isOn override def getNodeAddres = self.nodeAddress @@ -1620,27 +1656,27 @@ class DefaultClusterNode private[akka] ( override def getConfigElementKeys = self.getConfigElementKeys.toArray - override def getMemberShipPathFor(node:String) = self.membershipPathFor(node) + override def getMemberShipPathFor(node: String) = self.membershipPathFor(node) - override def getConfigurationPathFor(key:String) = self.configurationPathFor(key) + override def getConfigurationPathFor(key: String) = self.configurationPathFor(key) - override def getActorAddresstoNodesPathFor(actorAddress:String) = self.actorAddressToNodesPathFor(actorAddress) + override def getActorAddresstoNodesPathFor(actorAddress: String) = self.actorAddressToNodesPathFor(actorAddress) - override def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String) = self.actorAddressToNodesPathFor(actorAddress, nodeName) + override def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String) = self.actorAddressToNodesPathFor(actorAddress, nodeName) - override def getNodeToUuidsPathFor(node:String) = self.nodeToUuidsPathFor(node) + override def getNodeToUuidsPathFor(node: String) = self.nodeToUuidsPathFor(node) - override def getNodeToUuidsPathFor(node:String, uuid:UUID) = self.nodeToUuidsPathFor(node, uuid) + override def getNodeToUuidsPathFor(node: String, uuid: UUID) = self.nodeToUuidsPathFor(node, uuid) - override def getActorAddressRegistryPathFor(actorAddress:String) = self.actorAddressRegistryPathFor(actorAddress) + override def getActorAddressRegistryPathFor(actorAddress: String) = self.actorAddressRegistryPathFor(actorAddress) - override def getActorAddressRegistrySerializerPathFor(actorAddress:String) = self.actorAddressRegistrySerializerPathFor(actorAddress) + override def getActorAddressRegistrySerializerPathFor(actorAddress: String) = self.actorAddressRegistrySerializerPathFor(actorAddress) - override def getActorAddressRegistryUuidPathFor(actorAddress:String) = self.actorAddressRegistryUuidPathFor(actorAddress) + override def getActorAddressRegistryUuidPathFor(actorAddress: String) = self.actorAddressRegistryUuidPathFor(actorAddress) override def getActorUuidRegistryNodePathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid) - override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID)= self.actorUuidRegistryNodePathFor(uuid) + override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid) override def getActorAddressToUuidsPathFor(actorAddress: String) = self.actorAddressToUuidsPathFor(actorAddress) @@ -1770,81 +1806,85 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { try { if (message.hasActorAddress) { val actorAddress = message.getActorAddress - cluster.serializerForActor(actorAddress) foreach { serializer ⇒ - cluster.use(actorAddress, serializer) foreach { newActorRef ⇒ - cluster.remoteService.register(actorAddress, newActorRef) + cluster.serializerForActor(actorAddress) foreach { + serializer ⇒ + cluster.use(actorAddress, serializer) foreach { + newActorRef ⇒ + cluster.remoteService.register(actorAddress, newActorRef) - if (message.hasReplicateActorFromUuid) { - // replication is used - fetch the messages and replay them - import akka.remote.protocol.RemoteProtocol._ - import akka.remote.MessageSerializer + if (message.hasReplicateActorFromUuid) { + // replication is used - fetch the messages and replay them + import akka.remote.protocol.RemoteProtocol._ + import akka.remote.MessageSerializer - val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid) - val deployment = Deployer.deploymentFor(actorAddress) - val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse( - throw new IllegalStateException( - "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme")) - val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme) + val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid) + val deployment = Deployer.deploymentFor(actorAddress) + val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse( + throw new IllegalStateException( + "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme")) + val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme) - try { - // get the transaction log for the actor UUID - val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) + try { + // get the transaction log for the actor UUID + val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) - // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) - val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries + // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) + val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries - // deserialize and restore actor snapshot - val actorRefToUseForReplay = - snapshotAsBytes match { + // deserialize and restore actor snapshot + val actorRefToUseForReplay = + snapshotAsBytes match { - // we have a new actor ref - the snapshot - case Some(bytes) ⇒ - // stop the new actor ref and use the snapshot instead - cluster.remoteService.unregister(actorAddress) + // we have a new actor ref - the snapshot + case Some(bytes) ⇒ + // stop the new actor ref and use the snapshot instead + cluster.remoteService.unregister(actorAddress) - // deserialize the snapshot actor ref and register it as remote actor - val uncompressedBytes = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) - else bytes + // deserialize the snapshot actor ref and register it as remote actor + val uncompressedBytes = + if (Cluster.shouldCompressData) LZF.uncompress(bytes) + else bytes - val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start() - cluster.remoteService.register(actorAddress, snapshotActorRef) + val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start() + cluster.remoteService.register(actorAddress, snapshotActorRef) - // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should) - //newActorRef.stop() + // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should) + //newActorRef.stop() - snapshotActorRef + snapshotActorRef - // we have no snapshot - use the new actor ref - case None ⇒ - newActorRef + // we have no snapshot - use the new actor ref + case None ⇒ + newActorRef + } + + // deserialize the messages + val messages: Vector[AnyRef] = entriesAsBytes map { + bytes ⇒ + val messageBytes = + if (Cluster.shouldCompressData) LZF.uncompress(bytes) + else bytes + MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) + } + + EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) + + // replay all messages + messages foreach { + message ⇒ + EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) + + // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other? + actorRefToUseForReplay ! message + } + + } catch { + case e: Throwable ⇒ + EventHandler.error(e, this, e.toString) + throw e } - - // deserialize the messages - val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒ - val messageBytes = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) - else bytes - MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) } - - EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) - - // replay all messages - messages foreach { message ⇒ - EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) - - // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other? - actorRefToUseForReplay ! message - } - - } catch { - case e: Throwable ⇒ - EventHandler.error(e, this, e.toString) - throw e - } } - } } } else { EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message)) @@ -1859,8 +1899,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { case RELEASE ⇒ if (message.hasActorUuid) { - cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒ - cluster.release(address) + cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { + address ⇒ + cluster.release(address) } } else if (message.hasActorAddress) { cluster release message.getActorAddress @@ -1870,15 +1911,15 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { .format(message)) } - case START ⇒ cluster.start() + case START ⇒ cluster.start() - case STOP ⇒ cluster.shutdown() + case STOP ⇒ cluster.shutdown() case DISCONNECT ⇒ cluster.disconnect() - case RECONNECT ⇒ cluster.reconnect() + case RECONNECT ⇒ cluster.reconnect() - case RESIGN ⇒ cluster.resign() + case RESIGN ⇒ cluster.resign() case FAIL_OVER_CONNECTIONS ⇒ val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)]) @@ -1942,7 +1983,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { - case Left(error) ⇒ throw error + case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[T] } } From 6ce8be6e59b571af9c09ec5a90dbb9b6ce6f886f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Jul 2011 09:39:04 +0200 Subject: [PATCH 07/13] Adding warning docs for register/unregister --- .../src/main/scala/akka/dispatch/MessageHandling.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index ae9d3f1ff9..42caa4fca8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -127,6 +127,10 @@ trait MessageDispatcher { } } + /** + * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, + * and only call it under the dispatcher-guard, see "attach" for the only invocation + */ private[akka] def register(actorRef: ActorRef) { if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) @@ -139,6 +143,10 @@ trait MessageDispatcher { } } + /** + * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, + * and only call it under the dispatcher-guard, see "detach" for the only invocation + */ private[akka] def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { cleanUpMailboxFor(actorRef) From 4017a86b4c110bb6ab02afa87629aaf10f2e49ba Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 15 Jul 2011 10:59:29 +0300 Subject: [PATCH 08/13] 1025: some cleanup --- .../src/main/scala/akka/AkkaException.scala | 2 +- .../src/main/scala/akka/cluster/Cluster.scala | 48 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 542bece2ea..f87453db10 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -6,7 +6,7 @@ package akka import akka.actor.newUuid import java.net.{ InetAddress, UnknownHostException } - + /** * Akka base Exception. Each Exception gets: *
    diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 152de0368f..9a26ad985b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -477,35 +477,35 @@ class DefaultClusterNode private[akka]( def shutdown() { + def shutdownNode() { + ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) + + locallyCachedMembershipNodes.clear() + + nodeConnections.toList.foreach({ + case (_, (address, _)) ⇒ + Actor.remote.shutdownClientConnection(address) // shut down client connections + }) + + remoteService.shutdown() // shutdown server + + remoteClientLifeCycleListener.stop() + remoteDaemon.stop() + + // for monitoring remote listener + registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) + + nodeConnections.clear() + + disconnect() + EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + } + isConnected.switchOff { shutdownNode() } } - private def shutdownNode() { - ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) - - locallyCachedMembershipNodes.clear() - - nodeConnections.toList.foreach({ - case (_, (address, _)) ⇒ - Actor.remote.shutdownClientConnection(address) // shut down client connections - }) - - remoteService.shutdown() // shutdown server - - remoteClientLifeCycleListener.stop() - remoteDaemon.stop() - - // for monitoring remote listener - registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) - - nodeConnections.clear() - - disconnect() - EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) - } - def disconnect(): ClusterNode = { zkClient.unsubscribeAll() zkClient.close() From f3c019df8ca10824c0d681ad831498f19f07cec2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Jul 2011 11:17:02 +0200 Subject: [PATCH 09/13] Tweaking the interrupt restore it and breaking out of throughput --- .../src/test/scala/akka/dispatch/ActorModelSpec.scala | 6 ------ .../testkit/CallingThreadDispatcherModelSpec.scala | 8 ++++++++ akka-actor/src/main/scala/akka/actor/ActorRef.scala | 2 +- .../src/main/scala/akka/dispatch/Dispatcher.scala | 11 +++++++---- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 8ef6e1c930..62297ca495 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -217,12 +217,6 @@ abstract class ActorModelSpec extends JUnitSuite { protected def newInterceptedDispatcher: MessageDispatcherInterceptor - @After - def after { - //remove the interrupted status since we are messing with interrupted exceptions. - Thread.interrupted() - } - @Test def dispatcherShouldDynamicallyHandleItsOwnLifeCycle { implicit val dispatcher = newInterceptedDispatcher diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index f6d3b9de6f..9fbc5fd7ac 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -5,6 +5,7 @@ package akka.testkit import akka.actor.dispatch.ActorModelSpec import java.util.concurrent.CountDownLatch +import org.junit.{After, Test} class CallingThreadDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ @@ -42,6 +43,13 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec { //Can't handle this... } + + @After + def after { + //remove the interrupted status since we are messing with interrupted exceptions. + Thread.interrupted() + } + } // vim: set ts=2 sw=2 et: diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3ceadf08d1..e97f86f515 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -700,7 +700,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, } catch { case e: InterruptedException ⇒ handleExceptionInDispatch(e, messageHandle.message) - Thread.currentThread().interrupt() //Restore interrupt + throw e case e ⇒ handleExceptionInDispatch(e, messageHandle.message) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index e6f2d3128f..5097f69aa0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -193,10 +193,13 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ def dispatcher: Dispatcher final def run = { - try { processMailbox()} finally {dispatcherLock.unlock()} - - if (!self.isEmpty) - dispatcher.reRegisterForExecution(this) + try { processMailbox() } catch { + case ie: InterruptedException => Thread.currentThread().interrupt() //Restore interrupt + } finally { + dispatcherLock.unlock() + if (!self.isEmpty) + dispatcher.reRegisterForExecution(this) + } } /** From 0bfe21a44ca82d9169776f8322e055245a397d2b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Jul 2011 11:34:09 +0200 Subject: [PATCH 10/13] Fixing a type and adding clarification to ticket #956 --- akka-docs/scala/actors.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 2996520ca1..20ceda4285 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -582,9 +582,11 @@ the same mailbox will be there. So all messages on that mailbox, will be there a What happens to the actor ^^^^^^^^^^^^^^^^^^^^^^^^^ -If an exception is thrownn and the actor is not supervised, the actor object itself is discarded and a new instance is +If an exception is thrown and the actor is supervised, the actor object itself is discarded and a new instance is created. This new instance will now be used in the actor references to this actor (so this is done invisible to the developer). +If the actor is _not_ supervised, but its lifeCycle is set to Permanent (default), it will just keep on processing messages as if nothing had happened. +If the actor is _not_ supervised, but its lifeCycle is set to Temporary, it will be stopped immediately. Extending Actors using PartialFunction chaining From 2871685b6b2ec4e434750cf5ed4735623d858843 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Jul 2011 12:38:05 +0200 Subject: [PATCH 11/13] Adding TODO declarations in Serialization --- .../main/scala/akka/serialization/Serialization.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 643a96dc5f..8581a9abba 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -18,12 +18,12 @@ case class NoSerializerFoundException(m: String) extends AkkaException(m) * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. */ object Serialization { - + //TODO document me def serialize(o: AnyRef): Either[Exception, Array[Byte]] = serializerFor(o.getClass) match { case Left(ex) ⇒ Left(ex) case Right(serializer) ⇒ Right(serializer.toBinary(o)) } - + //TODO document me def deserialize( bytes: Array[Byte], clazz: Class[_], @@ -32,14 +32,15 @@ object Serialization { case Left(e) ⇒ Left(e) case Right(serializer) ⇒ Right(serializer.fromBinary(bytes, Some(clazz), classLoader)) } - - def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = + //TODO document me + //TODO memoize the lookups + def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = //TODO fall back on BestMatchClass THEN default getClassFor(serializerMap.get(clazz.getName).getOrElse(serializers("default"))) match { case Right(serializer) ⇒ Right(serializer.newInstance.asInstanceOf[Serializer]) case Left(e) => Left(e) } - private def getSerializerInstanceForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = { + private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = { if (bindings.isEmpty) Left(NoSerializerFoundException("No mapping serializer found for " + cl)) else { From c6297faa6f3d13545839d7e3d1830e48648ee731 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Jul 2011 15:58:11 +0200 Subject: [PATCH 12/13] Increasing timeouts for the RoundRobin2Replicas multijvm test --- .../RoundRobin2ReplicasMultiJvmSpec.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala index 08cd38a280..b16addfe3d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala @@ -13,6 +13,9 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ import akka.config.Config +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.ConcurrentHashMap +import akka.util.Duration /** * When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible @@ -106,12 +109,14 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers { //todo: is there a reason to check for null again since it already has been done in the previous block. hello must not equal (null) - val replies = collection.mutable.Map.empty[String, Int] + val replies = new ConcurrentHashMap[String,AtomicInteger]() def count(reply: String) = { - if (replies.get(reply).isEmpty) replies.put(reply, 1) - else replies.put(reply, replies(reply) + 1) + val counter = new AtomicInteger(0) + Option(replies.putIfAbsent(reply, counter)).getOrElse(counter).incrementAndGet() } + implicit val timeout = Timeout(Duration(20, "seconds")) + count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) @@ -121,8 +126,8 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers { count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2"))) - replies("World from node [node1]") must equal(4) - replies("World from node [node2]") must equal(4) + replies.get("World from node [node1]").get must equal(4) + replies.get("World from node [node2]").get must equal(4) } node.shutdown() From 2cf64bccae0afcfa2ed9062e1590cd9e4f187aeb Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Jul 2011 16:21:45 +0200 Subject: [PATCH 13/13] Adding support for having method parameters individually serialized and deserialized using its own serializer, closing ticket #765 --- .../main/scala/akka/actor/TypedActor.scala | 27 ++++++++++++++++--- .../akka/serialization/Serialization.scala | 13 +++++---- .../scala/akka/util/ReflectiveAccess.scala | 2 ++ 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index ffd7184e6d..1511419184 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -10,6 +10,8 @@ import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutExce import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Duration } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } +import akka.serialization.Serialization +import com.sun.xml.internal.ws.developer.MemberSubmissionAddressing.Validation //TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala /** @@ -87,16 +89,35 @@ object TypedActor { } } catch { case i: InvocationTargetException ⇒ throw i.getTargetException } - private def writeReplace(): AnyRef = new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, parameters) + private def writeReplace(): AnyRef = { + val serializedParameters: Array[(Array[Byte],String)] = parameters match { + case null => null + case a if a.length == 0 => Array[(Array[Byte],String)]() + case a => a.map( { + case null => null + case value => Serialization.serializerFor(value.getClass).fold(throw _, s => (s.toBinary(value), s.getClass.getName)) + }) + } + new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializedParameters) + } } /** * Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call */ - case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], parameterValues: Array[AnyRef]) { + case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Array[Byte],String)]) { //TODO implement writeObject and readObject to serialize //TODO Possible optimization is to special encode the parameter-types to conserve space - private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues) + private def readResolve(): AnyRef = { + MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { + case null => null + case a if a.length == 0 => Array[AnyRef]() + case a => a.map( { + case null => null + case (bytes, serializerFQN) => Serialization.serializerOf(serializerFQN).fold(throw _, _.fromBinary(bytes)) + }) + }) + } } /** diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 8581a9abba..49dd527be6 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -9,6 +9,7 @@ import akka.config.Config import akka.config.Config._ import akka.actor.{ ActorRef, Actor } import akka.AkkaException +import akka.util.ReflectiveAccess case class NoSerializerFoundException(m: String) extends AkkaException(m) @@ -40,6 +41,12 @@ object Serialization { case Left(e) => Left(e) } + /** + * Tries to load the specified Serializer by the FQN + */ + def serializerOf(serializerFQN: String): Either[Exception, Serializer] = + createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments) + private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = { if (bindings.isEmpty) Left(NoSerializerFoundException("No mapping serializer found for " + cl)) @@ -51,11 +58,7 @@ object Serialization { case _ ⇒ false } } map { - case (_, ser) ⇒ - getClassFor(ser) match { - case Right(s) ⇒ Right(s.newInstance.asInstanceOf[Serializer]) - case _ ⇒ Left(new Exception("Error instantiating " + ser)) - } + case (_, ser) ⇒ serializerOf(ser) } getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl)) } } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 9d439a8876..3f0f33f01c 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -23,6 +23,8 @@ import java.net.InetSocketAddress object ReflectiveAccess { val loader = getClass.getClassLoader + val emptyParams: Array[Class[_]] = Array() + val emptyArguments: Array[AnyRef] = Array() /** * Reflective access to the Cluster module.