Merge branch 'master' into wip-derekjw

Conflicts:
	akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala
	akka-actor/src/main/scala/akka/actor/ActorRef.scala
This commit is contained in:
Derek Williams 2011-07-15 11:44:48 -06:00
commit fb321854a8
63 changed files with 832 additions and 502 deletions

View file

@ -14,6 +14,8 @@ import org.scalatest.matchers.MustMatchers
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
import Ticket669Spec._
override def beforeAll = Thread.interrupted() //remove interrupted status.
override def afterAll = {
Actor.registry.local.shutdownAll
akka.event.EventHandler.start
@ -21,6 +23,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
"A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()

View file

@ -4,33 +4,49 @@
package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.Assertions._
import akka.testkit.Testing
import akka.dispatch._
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{ Duration, Switch }
import org.multiverse.api.latches.StandardLatch
import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor }
import akka.util.Switch
import akka.actor.{ActorKilledException, PoisonPill, ActorRef, Actor}
import java.rmi.RemoteException
import org.junit.{After, Test}
object ActorModelSpec {
sealed trait ActorModelMessage
case class Reply_?(expect: Any) extends ActorModelMessage
case class Reply(expect: Any) extends ActorModelMessage
case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage
case class CountDown(latch: CountDownLatch) extends ActorModelMessage
case class Increment(counter: AtomicLong) extends ActorModelMessage
case class Await(latch: CountDownLatch) extends ActorModelMessage
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
case class Wait(time: Long) extends ActorModelMessage
case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Interrupt extends ActorModelMessage
case object Restart extends ActorModelMessage
case class ThrowException(e: Throwable) extends ActorModelMessage
val Ping = "Ping"
val Pong = "Pong"
@ -63,6 +79,8 @@ object ActorModelSpec {
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l) ack; l.countDown(); self.stop(); busy.switchOff()
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!")
case ThrowException(e: Throwable) => ack; busy.switchOff(); throw e
}
}
@ -183,7 +201,9 @@ object ActorModelSpec {
if (condition) return true
Thread.sleep(intervalMs)
} catch { case e: InterruptedException }
} catch {
case e: InterruptedException
}
}
false
}
@ -192,6 +212,7 @@ object ActorModelSpec {
}
abstract class ActorModelSpec extends JUnitSuite {
import ActorModelSpec._
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
@ -215,13 +236,17 @@ abstract class ActorModelSpec extends JUnitSuite {
msgsProcessed = 0,
restarts = 0)
val futures = for (i 1 to 10) yield Future { i }
val futures = for (i 1 to 10) yield Future {
i
}
await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 2, stops = 2)
val a2 = newTestActor
a2.start
val futures2 = for (i 1 to 10) yield Future { i }
val futures2 = for (i 1 to 10) yield Future {
i
}
await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 3, stops = 2)
@ -259,7 +284,13 @@ abstract class ActorModelSpec extends JUnitSuite {
val counter = new CountDownLatch(200)
a.start()
for (i 1 to 10) { spawn { for (i 1 to 20) { a ! WaitAck(1, counter) } } }
for (i 1 to 10) {
spawn {
for (i 1 to 20) {
a ! WaitAck(1, counter)
}
}
}
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
@ -267,7 +298,15 @@ abstract class ActorModelSpec extends JUnitSuite {
}
def spawn(f: Unit) {
val thread = new Thread { override def run { try { f } catch { case e e.printStackTrace } } }
val thread = new Thread {
override def run {
try {
f
} catch {
case e e.printStackTrace
}
}
}
thread.start()
}
@ -329,7 +368,8 @@ abstract class ActorModelSpec extends JUnitSuite {
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
(1 to num) foreach { _
(1 to num) foreach {
_
newTestActor.start() ! cachedMessage
}
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
@ -356,6 +396,52 @@ abstract class ActorModelSpec extends JUnitSuite {
assert(each.exception.get.isInstanceOf[ActorKilledException])
a.stop()
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
}
}
class DispatcherModelTest extends ActorModelSpec {

View file

@ -86,6 +86,7 @@ class FileBenchResultRepository extends BenchResultRepository {
}
private def save(stats: Stats) {
new File(dir).mkdirs
if (!dirExists) return
val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp))
val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser"
@ -98,8 +99,7 @@ class FileBenchResultRepository extends BenchResultRepository {
case e: Exception
EventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]".
format(stats, f.getAbsolutePath, e.getMessage))
}
finally {
} finally {
if (out ne null) try { out.close() } catch { case ignore: Exception }
}
}
@ -117,8 +117,7 @@ class FileBenchResultRepository extends BenchResultRepository {
EventHandler.error(this, "Failed to load from [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
None
}
finally {
} finally {
if (in ne null) try { in.close() } catch { case ignore: Exception }
}
}

View file

@ -52,8 +52,7 @@ trait PerformanceTest extends JUnitSuite {
var stat: DescriptiveStatistics = _
val resultRepository = BenchResultRepository()
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
lazy val report = new Report(resultRepository, compareResultWith)
type TS <: TradingSystem
@ -128,95 +127,7 @@ trait PerformanceTest extends JUnitSuite {
resultRepository.add(stats)
EventHandler.info(this, formatResultsTable(resultRepository.get(name)))
percentilesChart(stats)
latencyAndThroughputChart(stats)
comparePercentilesChart(stats)
compareWithHistoricalPercentiliesChart(stats)
}
def percentilesChart(stats: Stats) {
val chartTitle = stats.name + " Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
}
def comparePercentilesChart(stats: Stats) {
for {
compareName compareResultWith
compareStats resultRepository.get(compareName, stats.load)
} {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
}
}
def compareWithHistoricalPercentiliesChart(stats: Stats) {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
}
}
def latencyAndThroughputChart(stats: Stats) {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
report.html(resultRepository.get(name))
}
def delay(delayMs: Int) {

View file

@ -0,0 +1,179 @@
package akka.performance.trading.common
import java.io.File
import java.text.SimpleDateFormat
import java.io.PrintWriter
import java.io.FileWriter
import akka.event.EventHandler
import java.util.Date
class Report(
resultRepository: BenchResultRepository,
compareResultWith: Option[String] = None) {
private val dir = System.getProperty("benchmark.resultDir", "target/benchmark")
private def dirExists: Boolean = new File(dir).exists
private def log = System.getProperty("benchmark.logResult", "false").toBoolean
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]): Unit = if (dirExists) {
val current = statistics.last
val sb = new StringBuilder
val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp))
sb.append(header(title))
sb.append("<h1>%s</h1>\n".format(title))
sb.append("<pre>\n")
sb.append(formatResultsTable(statistics))
sb.append("\n</pre>\n")
sb.append(img(percentilesChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
for (stats statistics) {
compareWithHistoricalPercentiliesChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesChart(stats).foreach(url sb.append(img(url)))
}
if (dirExists) {
val timestamp = fileTimestampFormat.format(new Date(current.timestamp))
val name = current.name + "--" + timestamp + ".html"
write(sb.toString, name)
}
}
private def img(url: String): String = {
"""<img src="%s" border="0" width="%s" height="%s" />""".format(
url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n"
}
def percentilesChart(stats: Stats): String = {
val chartTitle = stats.name + " Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def comparePercentilesChart(stats: Stats): Seq[String] = {
for {
compareName compareResultWith.toSeq
compareStats resultRepository.get(compareName, stats.load)
} yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
}
def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
Some(chartUrl)
} else {
None
}
}
def latencyAndThroughputChart(stats: Stats): String = {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
if (log) EventHandler.info(this, formattedStats)
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
}
def write(content: String, fileName: String) {
val f = new File(dir, fileName)
var writer: PrintWriter = null
try {
writer = new PrintWriter(new FileWriter(f))
writer.print(content)
writer.flush()
} catch {
case e: Exception
EventHandler.error(this, "Failed to save report to [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
} finally {
if (writer ne null) try { writer.close() } catch { case ignore: Exception }
}
}
def header(title: String) =
"""|<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|<html>
|<head>
|
|<title>%s</title>
|</head>
|<body>
|""".stripMargin.format(title)
def footer =
"""|</body>"
|</html>""".stripMargin
}

View file

@ -5,6 +5,7 @@ package akka.testkit
import akka.actor.dispatch.ActorModelSpec
import java.util.concurrent.CountDownLatch
import org.junit.{After, Test}
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
@ -42,6 +43,13 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec {
//Can't handle this...
}
@After
def after {
//remove the interrupted status since we are messing with interrupted exceptions.
Thread.interrupted()
}
}
// vim: set ts=2 sw=2 et:

View file

@ -9,15 +9,15 @@ import akka.dispatch._
import akka.config._
import akka.config.Supervision._
import akka.util._
import akka.serialization.{ Format, Serializer, Serialization }
import akka.serialization.{Serializer, Serialization}
import ReflectiveAccess._
import ClusterModule._
import DeploymentConfig.{ TransactionLog TransactionLogConfig, _ }
import DeploymentConfig.{TransactionLog TransactionLogConfig, _}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map JMap }
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
import java.util.{Map JMap}
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
@ -30,10 +30,15 @@ private[akka] object ActorRefInternals {
* LifeCycles for ActorRefs.
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
@ -68,7 +73,8 @@ private[akka] object ActorRefInternals {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
protected[akka] var _uuid = newUuid
@ -105,6 +111,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
def setReceiveTimeout(timeout: Long) {
this.receiveTimeout = Some(timeout)
}
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
@ -121,6 +128,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* </pre>
*/
def setFaultHandler(handler: FaultHandlingStrategy)
def getFaultHandler: FaultHandlingStrategy
/**
@ -139,6 +147,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* </pre>
*/
def setLifeCycle(lifeCycle: LifeCycle)
def getLifeCycle: LifeCycle
/**
@ -153,7 +162,10 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
def setDispatcher(dispatcher: MessageDispatcher) { this.dispatcher = dispatcher }
def setDispatcher(dispatcher: MessageDispatcher) {
this.dispatcher = dispatcher
}
def getDispatcher: MessageDispatcher = dispatcher
/**
@ -177,6 +189,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* Returns the uuid for the actor.
*/
def getUuid = _uuid
def uuid = _uuid
/**
@ -366,9 +379,13 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*/
def sendException(ex: Throwable) {}
def isUsableOnlyOnce = false
def isUsable = true
def isReplyable = true
def canSendException = false
/**
@ -393,6 +410,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
protected[akka] def supervisor_=(sup: Option[ActorRef])
protected[akka] def mailbox: AnyRef
protected[akka] def mailbox_=(value: AnyRef): AnyRef
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable)
@ -416,7 +434,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka] (private[this] val actorFactory: () Actor, val address: String)
class LocalActorRef private[akka](private[this] val actorFactory: () Actor, val address: String)
extends ActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard
@ -442,7 +460,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
protected[akka] val actorInstance = guard.withGuard {
new AtomicReference[Actor](newActor)
}
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
@ -489,7 +509,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () Actor) = {
this(__factory, __address)
this (__factory, __address)
_uuid = __uuid
timeout = __timeout
@ -627,7 +647,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
*/
def mailbox: AnyRef = _mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value }
protected[akka] def mailbox_=(value: AnyRef): AnyRef = {
_mailbox = value; value
}
/**
* Returns the supervisor, if there is one.
@ -677,7 +699,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException
currentMessage = null // received message while actor is shutting down, ignore
handleExceptionInDispatch(e, messageHandle.message)
throw e
case e
handleExceptionInDispatch(e, messageHandle.message)
}
@ -716,13 +739,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) {
//Immortal
false
} else if (withinTimeRange.isEmpty) { // restrict number of restarts
} else if (withinTimeRange.isEmpty) {
// restrict number of restarts
val retries = maxNrOfRetriesCount + 1
maxNrOfRetriesCount = retries //Increment number of retries
retries > maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
} else {
// cannot restart more than N within M timerange
val retries = maxNrOfRetriesCount + 1
val windowStart = restartTimeWindowStartNanos
@ -826,7 +852,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
// ========= PRIVATE FUNCTIONS =========
private[this] def newActor: Actor = {
import Actor.{ actorRefInCreation refStack }
import Actor.{actorRefInCreation refStack}
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
@ -868,8 +894,10 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
private def notifySupervisorWithMessage(notification: LifeCycleMessage) {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
_supervisor.foreach { sup
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
_supervisor.foreach {
sup
if (sup.isShutdown) {
// if supervisor is shut down, game over for all linked actors
//Scoped stop all linked actors, to avoid leaking the 'i' val
{
val i = _linkedActors.values.iterator
@ -921,7 +949,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
protected[akka] def checkReceiveTimeout() {
cancelReceiveTimeout()
if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed
if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
}
}
@ -949,7 +978,7 @@ object RemoteActorSystemMessage {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
private[akka] case class RemoteActorRef private[akka](
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
@ -1011,34 +1040,49 @@ private[akka] case class RemoteActorRef private[akka] (
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
@ -1070,7 +1114,8 @@ trait ActorRefShared {
* There are implicit conversions in ../actor/Implicits.scala
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorRef
trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
ref: ActorRef
/**
* Address for actor, must be a unique one.

View file

@ -10,6 +10,8 @@ import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutExce
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.Serialization
import com.sun.xml.internal.ws.developer.MemberSubmissionAddressing.Validation
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
@ -87,16 +89,35 @@ object TypedActor {
}
} catch { case i: InvocationTargetException throw i.getTargetException }
private def writeReplace(): AnyRef = new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, parameters)
private def writeReplace(): AnyRef = {
val serializedParameters: Array[(Array[Byte],String)] = parameters match {
case null => null
case a if a.length == 0 => Array[(Array[Byte],String)]()
case a => a.map( {
case null => null
case value => Serialization.serializerFor(value.getClass).fold(throw _, s => (s.toBinary(value), s.getClass.getName))
})
}
new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializedParameters)
}
}
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], parameterValues: Array[AnyRef]) {
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Array[Byte],String)]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues)
private def readResolve(): AnyRef = {
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null => null
case a if a.length == 0 => Array[AnyRef]()
case a => a.map( {
case null => null
case (bytes, serializerFQN) => Serialization.serializerOf(serializerFQN).fold(throw _, _.fromBinary(bytes))
})
})
}
}
/**

View file

@ -122,8 +122,6 @@ object NodeAddress {
trait ClusterNode {
import ChangeListener._
val isConnected = new AtomicBoolean(false)
private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
def membershipNodes: Array[String]
@ -136,7 +134,7 @@ trait ClusterNode {
def remoteServerAddress: InetSocketAddress
def isRunning: Boolean = isConnected.get
def isRunning: Boolean
def start(): ClusterNode

View file

@ -160,8 +160,6 @@ class Dispatcher(
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = ()
protected override def cleanUpMailboxFor(actorRef: ActorRef) {
val m = getMailbox(actorRef)
if (!m.isEmpty) {
@ -195,19 +193,13 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
def dispatcher: Dispatcher
final def run = {
try {
processMailbox()
} catch {
case ie: InterruptedException
}
finally {
try { processMailbox() } catch {
case ie: InterruptedException => Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
}
if (!self.isEmpty)
dispatcher.reRegisterForExecution(this)
dispatcher.doneProcessingMailbox(this)
}
}
/**

View file

@ -127,6 +127,10 @@ trait MessageDispatcher {
}
}
/**
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "attach" for the only invocation
*/
private[akka] def register(actorRef: ActorRef) {
if (actorRef.mailbox eq null)
actorRef.mailbox = createMailbox(actorRef)
@ -139,6 +143,10 @@ trait MessageDispatcher {
}
}
/**
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "detach" for the only invocation
*/
private[akka] def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef)

View file

@ -9,6 +9,7 @@ import akka.config.Config
import akka.config.Config._
import akka.actor.{ ActorRef, Actor }
import akka.AkkaException
import akka.util.ReflectiveAccess
case class NoSerializerFoundException(m: String) extends AkkaException(m)
@ -18,12 +19,12 @@ case class NoSerializerFoundException(m: String) extends AkkaException(m)
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/
object Serialization {
//TODO document me
def serialize(o: AnyRef): Either[Exception, Array[Byte]] = serializerFor(o.getClass) match {
case Left(ex) Left(ex)
case Right(serializer) Right(serializer.toBinary(o))
}
//TODO document me
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
@ -32,14 +33,21 @@ object Serialization {
case Left(e) Left(e)
case Right(serializer) Right(serializer.fromBinary(bytes, Some(clazz), classLoader))
}
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] =
//TODO document me
//TODO memoize the lookups
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = //TODO fall back on BestMatchClass THEN default
getClassFor(serializerMap.get(clazz.getName).getOrElse(serializers("default"))) match {
case Right(serializer) Right(serializer.newInstance.asInstanceOf[Serializer])
case Left(e) => Left(e)
}
private def getSerializerInstanceForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
/**
* Tries to load the specified Serializer by the FQN
*/
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments)
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
if (bindings.isEmpty)
Left(NoSerializerFoundException("No mapping serializer found for " + cl))
else {
@ -50,11 +58,7 @@ object Serialization {
case _ false
}
} map {
case (_, ser)
getClassFor(ser) match {
case Right(s) Right(s.newInstance.asInstanceOf[Serializer])
case _ Left(new Exception("Error instantiating " + ser))
}
case (_, ser) serializerOf(ser)
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
}
}

View file

@ -23,6 +23,8 @@ import java.net.InetSocketAddress
object ReflectiveAccess {
val loader = getClass.getClassLoader
val emptyParams: Array[Class[_]] = Array()
val emptyArguments: Array[AnyRef] = Array()
/**
* Reflective access to the Cluster module.

View file

@ -6,15 +6,14 @@ package akka.cluster
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event._
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener }
import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener}
import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
import java.util.{ List JList }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap }
import java.util.{List JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.net.InetSocketAddress
import javax.management.StandardMBean
@ -30,17 +29,17 @@ import Status._
import DeploymentConfig._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.dispatch.{Dispatchers, Future}
import akka.remoteinterface._
import akka.routing.RouterType
import akka.config.{ Config, Supervision }
import akka.config.{Config, Supervision}
import Supervision._
import Config._
import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization }
import akka.serialization.{Serialization, Serializer, ActorSerialization}
import ActorSerialization._
import Compression.LZF
import akka.serialization.Compression.LZF
import akka.cluster.zookeeper._
import ChangeListener._
@ -50,6 +49,7 @@ import RemoteDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
import java.util.concurrent.{CopyOnWriteArrayList, Callable, ConcurrentHashMap}
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
@ -84,7 +84,7 @@ trait ClusterNodeMBean {
def getMemberNodes: Array[String]
def getNodeAddres():NodeAddress
def getNodeAddres(): NodeAddress
def getLeaderLockName: String
@ -112,31 +112,31 @@ trait ClusterNodeMBean {
def getConfigElementKeys: Array[String]
def getMemberShipPathFor(node:String):String
def getMemberShipPathFor(node: String): String
def getConfigurationPathFor(key:String):String
def getConfigurationPathFor(key: String): String
def getActorAddresstoNodesPathFor(actorAddress:String):String
def getActorAddresstoNodesPathFor(actorAddress: String): String
def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String):String
def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String): String
def getNodeToUuidsPathFor(node:String):String
def getNodeToUuidsPathFor(node: String): String
def getNodeToUuidsPathFor(node:String, uuid:UUID):String
def getNodeToUuidsPathFor(node: String, uuid: UUID): String
def getActorAddressRegistryPathFor(actorAddress:String):String
def getActorAddressRegistryPathFor(actorAddress: String): String
def getActorAddressRegistrySerializerPathFor(actorAddress:String):String
def getActorAddressRegistrySerializerPathFor(actorAddress: String): String
def getActorAddressRegistryUuidPathFor(actorAddress:String):String
def getActorAddressRegistryUuidPathFor(actorAddress: String): String
def getActorUuidRegistryNodePathFor(uuid: UUID):String
def getActorUuidRegistryNodePathFor(uuid: UUID): String
def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID):String
def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID): String
def getActorAddressToUuidsPathFor(actorAddress: String):String
def getActorAddressToUuidsPathFor(actorAddress: String): String
def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID):String
def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID): String
}
/**
@ -329,7 +329,7 @@ object Cluster {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DefaultClusterNode private[akka] (
class DefaultClusterNode private[akka](
val nodeAddress: NodeAddress,
val hostname: String = Config.hostname,
val port: Int = Config.remoteServerPort,
@ -373,6 +373,8 @@ class DefaultClusterNode private[akka] (
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
val isConnected = new Switch(false)
// static nodes
val CLUSTER_PATH = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
@ -445,15 +447,37 @@ class DefaultClusterNode private[akka] (
// Node
// =======================================
def isRunning: Boolean = isConnected.isOn
def start(): ClusterNode = {
if (isConnected.compareAndSet(false, true)) {
isConnected.switchOn {
initializeNode()
}
this
}
private[cluster] def initializeNode() {
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createZooKeeperPathStructureIfNeeded()
registerListeners()
joinCluster()
joinLeaderElection()
fetchMembershipNodes()
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
def shutdown() {
if (isConnected.compareAndSet(true, false)) {
def shutdownNode() {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
locallyCachedMembershipNodes.clear()
@ -476,6 +500,10 @@ class DefaultClusterNode private[akka] (
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
}
isConnected.switchOff {
shutdownNode()
}
}
def disconnect(): ClusterNode = {
@ -673,7 +701,7 @@ class DefaultClusterNode private[akka] (
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = if (isConnected.get) {
serializer: Serializer): ClusterNode = if (isConnected.isOn) {
val serializerClassName = serializer.getClass.getName
@ -749,7 +777,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid clustered or not?
*/
def isClustered(actorAddress: String): Boolean = if (isConnected.get) {
def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
zkClient.exists(actorAddressRegistryPathFor(actorAddress))
} else false
@ -761,7 +789,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid in use or not?
*/
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.get) {
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
} else false
@ -775,7 +803,7 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) {
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
@ -860,7 +888,7 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
if (isConnected.get) {
if (isConnected.isOn) {
val builder = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
@ -871,7 +899,8 @@ class DefaultClusterNode private[akka] (
val command = builder.build
nodes foreach { node
nodes foreach {
node
nodeConnections.get(node) foreach {
case (_, connection)
sendCommandToNode(connection, command, async = false)
@ -908,10 +937,11 @@ class DefaultClusterNode private[akka] (
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
if (isConnected.get) {
if (isConnected.isOn) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
uuidsForActorAddress(actorAddress) foreach { uuid
uuidsForActorAddress(actorAddress) foreach {
uuid
EventHandler.debug(this,
"Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
@ -925,7 +955,7 @@ class DefaultClusterNode private[akka] (
* Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'.
*/
private[akka] def releaseActorOnAllNodes(actorAddress: String) {
if (isConnected.get) {
if (isConnected.isOn) {
EventHandler.debug(this,
"Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
@ -934,7 +964,8 @@ class DefaultClusterNode private[akka] (
.setActorAddress(actorAddress)
.build
nodesForActorsInUseWithAddress(actorAddress) foreach { node
nodesForActorsInUseWithAddress(actorAddress) foreach {
node
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
@ -945,14 +976,16 @@ class DefaultClusterNode private[akka] (
/**
* Creates an ActorRef with a Router to a set of clustered actors.
*/
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.get) {
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
.format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach { case (_, address) clusterActorRefs.put(address, actorRef) }
addresses foreach {
case (_, address) clusterActorRefs.put(address, actorRef)
}
actorRef.start()
} else throw new ClusterException("Not connected to cluster")
@ -970,7 +1003,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors registered in this cluster.
*/
private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.get) {
private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
} else Array.empty[UUID]
@ -982,7 +1015,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor id for the actor with a specific UUID.
*/
private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.get) {
private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
try {
Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String])
} catch {
@ -999,7 +1032,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor UUIDs for actor ID.
*/
private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.get) {
private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
try {
zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map {
case c: CharSequence new UUID(c)
@ -1012,7 +1045,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the node names of all actors in use with UUID.
*/
private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.get) {
private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) {
try {
zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]]
} catch {
@ -1023,7 +1056,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors in use registered on a specific node.
*/
private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.get) {
private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
case c: CharSequence new UUID(c)
@ -1036,7 +1069,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the addresses of all actors in use registered on a specific node.
*/
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.get) {
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
val uuids =
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
@ -1059,7 +1092,8 @@ class DefaultClusterNode private[akka] (
case e: ZkNoNodeException throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))
}
ReflectiveAccess.getClassFor(serializerClassName) match { // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
ReflectiveAccess.getClassFor(serializerClassName) match {
// FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
case Right(clazz) clazz.newInstance.asInstanceOf[Serializer]
case Left(error)
EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString))
@ -1242,44 +1276,35 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def membershipPathFor(node: String): String = "%s/%s".format(MEMBERSHIP_PATH, node)
private[cluster] def configurationPathFor(key: String): String = "%s/%s".format(CONFIGURATION_PATH, key)
private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_NODES_TO_PATH, actorAddress)
private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String = "%s/%s".format(actorAddressToNodesPathFor(actorAddress), nodeName)
private[cluster] def nodeToUuidsPathFor(node: String): String = "%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node)
private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String = "%s/%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node, uuid)
private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_REGISTRY_PATH, actorAddress)
private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "serializer")
private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "uuid")
private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String = "%s/%s".format(ACTOR_UUID_REGISTRY_PATH, uuid)
private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "node")
private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "address")
private[cluster] def actorUuidRegistryRemoteAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "remote-address")
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_PATH, actorAddress.replace('.', '_'))
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String, uuid: UUID): String = "%s/%s".format(actorAddressToUuidsPathFor(actorAddress), uuid)
private[cluster] def initializeNode() {
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createZooKeeperPathStructureIfNeeded()
registerListeners()
joinCluster()
joinLeaderElection()
fetchMembershipNodes()
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
/**
* Returns a random set with node names of size 'replicationFactor'.
@ -1295,7 +1320,8 @@ class DefaultClusterNode private[akka] (
"] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]")
val preferredNodes =
if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor
if (actorAddress.isDefined) {
// use 'preferred-nodes' in deployment config for the actor
Deployer.deploymentFor(actorAddress.get) match {
case Deploy(_, _, Clustered(nodes, _, _))
nodes map (node DeploymentConfig.nodeNameFor(node)) take replicationFactor
@ -1355,8 +1381,11 @@ class DefaultClusterNode private[akka] (
// cache the disconnected connections in a map, needed for fail-over of these connections later
var disconnectedConnections = Map.empty[String, InetSocketAddress]
newlyDisconnectedMembershipNodes foreach { node
disconnectedConnections += (node -> (nodeConnections(node) match { case (address, _) address }))
newlyDisconnectedMembershipNodes foreach {
node
disconnectedConnections += (node -> (nodeConnections(node) match {
case (address, _) address
}))
}
if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
@ -1365,10 +1394,13 @@ class DefaultClusterNode private[akka] (
newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
// add connections newly arrived nodes
newlyConnectedMembershipNodes foreach { node
if (!nodeConnections.contains(node)) { // only connect to each replica once
newlyConnectedMembershipNodes foreach {
node
if (!nodeConnections.contains(node)) {
// only connect to each replica once
remoteSocketAddressForNode(node) foreach { address
remoteSocketAddressForNode(node) foreach {
address
EventHandler.debug(this,
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
@ -1427,7 +1459,8 @@ class DefaultClusterNode private[akka] (
oldClusterNodes: List[String],
disconnectedConnections: Map[String, InetSocketAddress]) {
failedNodes.foreach { failedNodeName
failedNodes.foreach {
failedNodeName
val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName)
@ -1436,7 +1469,8 @@ class DefaultClusterNode private[akka] (
// Migrate to the successor of the failed node (using a sorted circular list of the node names)
if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail
(failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor?
(failedNodeIndex == myIndex + 1)) {
// Am I the leftmost successor?
// Takes the lead of migrating the actors. Not all to this node.
// All to this node except if the actor already resides here, then pick another node it is not already on.
@ -1444,7 +1478,8 @@ class DefaultClusterNode private[akka] (
// Yes I am the node to migrate the actor to (can only be one in the cluster)
val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList
actorUuidsForFailedNode.foreach { uuidAsString
actorUuidsForFailedNode.foreach {
uuidAsString
EventHandler.debug(this,
"Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
.format(failedNodeName, uuidAsString, nodeAddress.nodeName))
@ -1533,7 +1568,8 @@ class DefaultClusterNode private[akka] (
EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH))
}
basePaths.foreach { path
basePaths.foreach {
path
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
@ -1578,7 +1614,7 @@ class DefaultClusterNode private[akka] (
override def resign() = self.resign()
override def isConnected = self.isConnected.get
override def isConnected = self.isConnected.isOn
override def getNodeAddres = self.nodeAddress
@ -1620,27 +1656,27 @@ class DefaultClusterNode private[akka] (
override def getConfigElementKeys = self.getConfigElementKeys.toArray
override def getMemberShipPathFor(node:String) = self.membershipPathFor(node)
override def getMemberShipPathFor(node: String) = self.membershipPathFor(node)
override def getConfigurationPathFor(key:String) = self.configurationPathFor(key)
override def getConfigurationPathFor(key: String) = self.configurationPathFor(key)
override def getActorAddresstoNodesPathFor(actorAddress:String) = self.actorAddressToNodesPathFor(actorAddress)
override def getActorAddresstoNodesPathFor(actorAddress: String) = self.actorAddressToNodesPathFor(actorAddress)
override def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String) = self.actorAddressToNodesPathFor(actorAddress, nodeName)
override def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String) = self.actorAddressToNodesPathFor(actorAddress, nodeName)
override def getNodeToUuidsPathFor(node:String) = self.nodeToUuidsPathFor(node)
override def getNodeToUuidsPathFor(node: String) = self.nodeToUuidsPathFor(node)
override def getNodeToUuidsPathFor(node:String, uuid:UUID) = self.nodeToUuidsPathFor(node, uuid)
override def getNodeToUuidsPathFor(node: String, uuid: UUID) = self.nodeToUuidsPathFor(node, uuid)
override def getActorAddressRegistryPathFor(actorAddress:String) = self.actorAddressRegistryPathFor(actorAddress)
override def getActorAddressRegistryPathFor(actorAddress: String) = self.actorAddressRegistryPathFor(actorAddress)
override def getActorAddressRegistrySerializerPathFor(actorAddress:String) = self.actorAddressRegistrySerializerPathFor(actorAddress)
override def getActorAddressRegistrySerializerPathFor(actorAddress: String) = self.actorAddressRegistrySerializerPathFor(actorAddress)
override def getActorAddressRegistryUuidPathFor(actorAddress:String) = self.actorAddressRegistryUuidPathFor(actorAddress)
override def getActorAddressRegistryUuidPathFor(actorAddress: String) = self.actorAddressRegistryUuidPathFor(actorAddress)
override def getActorUuidRegistryNodePathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid)
override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID)= self.actorUuidRegistryNodePathFor(uuid)
override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid)
override def getActorAddressToUuidsPathFor(actorAddress: String) = self.actorAddressToUuidsPathFor(actorAddress)
@ -1770,8 +1806,10 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
try {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
cluster.serializerForActor(actorAddress) foreach { serializer
cluster.use(actorAddress, serializer) foreach { newActorRef
cluster.serializerForActor(actorAddress) foreach {
serializer
cluster.use(actorAddress, serializer) foreach {
newActorRef
cluster.remoteService.register(actorAddress, newActorRef)
if (message.hasReplicateActorFromUuid) {
@ -1821,7 +1859,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}
// deserialize the messages
val messages: Vector[AnyRef] = entriesAsBytes map { bytes
val messages: Vector[AnyRef] = entriesAsBytes map {
bytes
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
@ -1831,7 +1870,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
// replay all messages
messages foreach { message
messages foreach {
message
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
// FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
@ -1859,7 +1899,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case RELEASE
if (message.hasActorUuid) {
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach {
address
cluster.release(address)
}
} else if (message.hasActorAddress) {

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,3 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,3 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,3 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
akka.actor.deployment.service-hello.clustered.replicas = 2

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
akka.actor.deployment.service-hello.clustered.replicas = 2

View file

@ -13,6 +13,9 @@ import Cluster._
import akka.actor._
import akka.actor.Actor._
import akka.config.Config
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ConcurrentHashMap
import akka.util.Duration
/**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
@ -106,12 +109,14 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
//todo: is there a reason to check for null again since it already has been done in the previous block.
hello must not equal (null)
val replies = collection.mutable.Map.empty[String, Int]
val replies = new ConcurrentHashMap[String,AtomicInteger]()
def count(reply: String) = {
if (replies.get(reply).isEmpty) replies.put(reply, 1)
else replies.put(reply, replies(reply) + 1)
val counter = new AtomicInteger(0)
Option(replies.putIfAbsent(reply, counter)).getOrElse(counter).incrementAndGet()
}
implicit val timeout = Timeout(Duration(20, "seconds"))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
@ -121,8 +126,8 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
replies("World from node [node1]") must equal(4)
replies("World from node [node2]") must equal(4)
replies.get("World from node [node1]").get must equal(4)
replies.get("World from node [node2]").get must equal(4)
}
node.shutdown()

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -561,6 +561,34 @@ The actor has a well-defined non-circular life-cycle.
=> STARTED (when 'start' is invoked) - can receive messages
=> SHUT DOWN (when 'exit' or 'stop' is invoked) - can't do anything
Actors and exceptions
---------------------
It can happen that while a message is being processed by an actor, that some kind of exception is thrown, e.g. a
database exception.
What happens to the Message
^^^^^^^^^^^^^^^^^^^^^^^^^^^
If an exception is thrown while a message is being processed (so taken of his mailbox and handed over the the receive),
then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to
retry processing of a message, you need to deal with it yourself by catching the exception and retry your flow. Make
sure that you put a bound on the number of retries since you don't want a system to livelock (so consuming a lot of
cpu cycles without making progress).
What happens to the mailbox
^^^^^^^^^^^^^^^^^^^^^^^^^^^
If an exception is thrown while a message is being processed, nothing happens to the mailbox. If the actor is restarted,
the same mailbox will be there. So all messages on that mailbox, will be there as well.
What happens to the actor
^^^^^^^^^^^^^^^^^^^^^^^^^
If an exception is thrown and the actor is supervised, the actor object itself is discarded and a new instance is
created. This new instance will now be used in the actor references to this actor (so this is done invisible
to the developer).
If the actor is _not_ supervised, but its lifeCycle is set to Permanent (default), it will just keep on processing messages as if nothing had happened.
If the actor is _not_ supervised, but its lifeCycle is set to Temporary, it will be stopped immediately.
Extending Actors using PartialFunction chaining
-----------------------------------------------