Merge branch 'master' into wip-derekjw

Conflicts:
	akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
	akka-actor/src/main/scala/akka/actor/ActorRef.scala
This commit is contained in:
Derek Williams 2011-07-26 12:50:09 -06:00
commit 6d343b01f0
130 changed files with 1957 additions and 869 deletions

View file

@ -14,6 +14,8 @@ import sjson.json._
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def identifier = 2:Byte
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
@ -31,6 +33,8 @@ object ProtobufSerializer extends ProtobufSerializer
class JavaJSONSerializer extends Serializer {
private val mapper = new ObjectMapper
def identifier = 3:Byte
def toBinary(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
@ -54,6 +58,8 @@ object JavaJSONSerializer extends JavaJSONSerializer
class SJSONSerializer extends Serializer {
def identifier = 4:Byte
def toBinary(obj: AnyRef): Array[Byte] =
sjson.json.Serializer.SJSON.out(obj)

View file

@ -2,6 +2,6 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) {
getContext().replySafe("got it!");
getContext().tryReply("got it!");
}
}

View file

@ -0,0 +1,28 @@
package akka;
import akka.actor._
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec;
/**
* A spec that verified that the AkkaException has at least a single argument constructor of type String.
*
* This is required to make Akka Exceptions be friends with serialization/deserialization.
*/
class AkkaExceptionSpec extends WordSpec with MustMatchers {
"AkkaException" must {
"have a AkkaException(String msg) constructor to be serialization friendly" in {
//if the call to this method completes, we know what there is at least a single constructor which has
//the expected argument type.
verify(classOf[AkkaException])
//lets also try it for the exception that triggered this bug to be discovered.
verify(classOf[ActorKilledException])
}
}
def verify(clazz:java.lang.Class[_]):Unit = {
clazz.getConstructor(Array(classOf[String]): _*)
}
}

View file

@ -305,8 +305,8 @@ class ActorRefSpec extends WordSpec with MustMatchers {
val ref = Actor.actorOf(
new Actor {
def receive = {
case 5 self reply_? "five"
case null self reply_? "null"
case 5 self tryReply "five"
case null self tryReply "null"
}
}).start()
@ -334,7 +334,7 @@ class ActorRefSpec extends WordSpec with MustMatchers {
val ref = Actor.actorOf(
new Actor {
def receive = { case _ }
override def preRestart(reason: Throwable) = latch.countDown()
override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
override def postRestart(reason: Throwable) = latch.countDown()
}).start()

View file

@ -0,0 +1,161 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.matchers.MustMatchers
import Actor.actorOf
import akka.testkit._
import akka.util.duration._
import akka.config.Supervision.OneForOneStrategy
import java.util.concurrent.atomic._
object ActorRestartSpec {
private var _gen = new AtomicInteger(0)
def generation = _gen.incrementAndGet
def generation_=(x: Int) { _gen.set(x) }
sealed trait RestartType
case object Normal extends RestartType
case object Nested extends RestartType
case object Handover extends RestartType
case object Fail extends RestartType
class Restarter(val testActor: ActorRef) extends Actor {
val gen = generation
var xx = 0
var restart: RestartType = Normal
def receive = {
case x: Int xx = x
case t: RestartType restart = t
case "get" self reply xx
}
override def preStart { testActor ! (("preStart", gen)) }
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) }
override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) }
override def freshInstance() = {
restart match {
case Normal None
case Nested
val ref = TestActorRef(new Actor {
def receive = { case _ }
override def preStart { testActor ! ((this, self)) }
}).start()
testActor ! ((ref.underlyingActor, ref))
None
case Handover
val fresh = new Restarter(testActor)
fresh.xx = xx
Some(fresh)
case Fail
throw new IllegalActorStateException("expected")
}
}
}
class Supervisor extends Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
def receive = {
case _
}
}
}
class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
import ActorRestartSpec._
override def beforeEach { generation = 0 }
override def afterEach {
val it = toStop.iterator
while (it.hasNext) {
it.next.stop()
it.remove
}
}
private var toStop = new java.util.concurrent.ConcurrentSkipListSet[ActorRef]
private def newActor(f: Actor): ActorRef = {
val ref = actorOf(f)
toStop add ref
ref.start()
}
"An Actor restart" must {
"invoke preRestart, preStart, postRestart" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
"support creation of nested actors in freshInstance()" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Nested
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
tRef.underlyingActor must be(tActor)
expectMsg((tActor, tRef))
tRef.stop()
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
"use freshInstance() if available" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Handover
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 42)
}
"fall back to default factory if freshInstance() fails" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Fail
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 0)
}
}
}

View file

@ -0,0 +1,59 @@
package akka.actor.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.config.Config
class ClusterSpec extends WordSpec with MustMatchers {
"A Deployer" must {
"be able to parse 'akka.actor.cluster._' config elements" in {
import Config.config._
//akka.cluster
getString("akka.cluster.name") must equal(Some("test-cluster"))
getString("akka.cluster.zookeeper-server-addresses") must equal(Some("localhost:2181"))
getInt("akka.cluster.server.port") must equal(Some(2552))
getInt("akka.cluster.max-time-to-wait-until-connected") must equal(Some(30))
getInt("akka.cluster.session-timeout") must equal(Some(60))
getInt("akka.cluster.connection-timeout") must equal(Some(60))
getBool("akka.cluster.use-compression") must equal(Some(false))
getInt("akka.cluster.connection-timeout") must equal(Some(60))
getInt("akka.cluster.remote-daemon-ack-timeout") must equal(Some(30))
getBool("akka.cluster.include-ref-node-in-replica-set") must equal(Some(true))
getString("akka.cluster.compression-scheme") must equal(Some("zlib"))
getInt("akka.cluster.zlib-compression-level") must equal(Some(6))
getString("akka.cluster.layer") must equal(Some("akka.remote.netty.NettyRemoteSupport"))
getString("akka.cluster.secure-cookie") must equal(Some(""))
getString("akka.cluster.log-directory") must equal(Some("_akka_cluster"))
//akka.cluster.replication
getString("akka.cluster.replication.digest-type") must equal(Some("MAC"))
getString("akka.cluster.replication.password") must equal(Some("secret"))
getInt("akka.cluster.replication.ensemble-size") must equal(Some(3))
getInt("akka.cluster.replication.quorum-size") must equal(Some(2))
getInt("akka.cluster.replication.snapshot-frequency") must equal(Some(1000))
getInt("akka.cluster.replication.timeout") must equal(Some(30))
//akka.cluster.server
getInt("akka.cluster.server.port") must equal(Some(2552))
getInt("akka.cluster.server.message-frame-size") must equal(Some(1048576))
getInt("akka.cluster.server.connection-timeout") must equal(Some(1))
getBool("akka.cluster.server.require-cookie") must equal(Some(false))
getBool("akka.cluster.server.untrusted-mode") must equal(Some(false))
getInt("akka.cluster.server.backlog") must equal(Some(4096))
getInt("akka.cluster.server.execution-pool-keepalive") must equal(Some(60))
getInt("akka.cluster.server.execution-pool-size") must equal(Some(16))
getInt("akka.cluster.server.max-channel-memory-size") must equal(Some(0))
getInt("akka.cluster.server.max-total-memory-size") must equal(Some(0))
//akka.cluster.client
getBool("akka.cluster.client.buffering.retry-message-send-on-failure") must equal(Some(true))
getInt("akka.cluster.client.buffering.capacity") must equal(Some(-1))
getInt("akka.cluster.client.reconnect-delay") must equal(Some(5))
getInt("akka.cluster.client.read-timeout") must equal(Some(10))
getInt("akka.cluster.client.reap-futures-delay") must equal(Some(5))
getInt("akka.cluster.client.reconnection-time-window") must equal(Some(600))
}
}
}

View file

@ -14,13 +14,14 @@ class DeployerSpec extends WordSpec with MustMatchers {
"be able to parse 'akka.actor.deployment._' config elements" in {
val deployment = Deployer.lookupInConfig("service-ping")
deployment must be('defined)
deployment must equal(Some(
Deploy(
"service-ping",
LeastCPU,
Clustered(
Vector(Node("node1")),
Replicate(3),
ReplicationFactor(3),
Replication(
TransactionLog,
WriteThrough)))))

View file

@ -32,7 +32,7 @@ object FSMTransitionSpec {
case Ev("reply") stay replying "reply"
}
initialize
override def preRestart(reason: Throwable) { target ! "restarted" }
override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" }
}
class Forwarder(target: ActorRef) extends Actor {

View file

@ -106,9 +106,9 @@ object IOActorSpec {
case msg: NewClient self startLink createWorker forward msg
case ('set, key: String, value: ByteString)
kvs += (key -> value)
self reply_? (())
case ('get, key: String) self reply_? kvs.get(key)
case 'getall self reply_? kvs
self tryReply (())
case ('get, key: String) self tryReply kvs.get(key)
case 'getall self tryReply kvs
}
}
@ -124,15 +124,15 @@ object IOActorSpec {
def receiveIO = {
case ('set, key: String, value: ByteString)
socket write (ByteString("SET " + key + " " + value.length + "\r\n") ++ value)
self reply_? readResult
self tryReply readResult
case ('get, key: String)
socket write ByteString("GET " + key + "\r\n")
self reply_? readResult
self tryReply readResult
case 'getall
socket write ByteString("GETALL\r\n")
self reply_? readResult
self tryReply readResult
}
def readResult = {

View file

@ -39,6 +39,8 @@ object TypedActorSpec {
def incr()
def read(): Int
def testMethodCallSerialization(foo: Foo, s: String, i: Int): Unit = throw new IllegalStateException("expected")
}
class Bar extends Foo with Serializable {
@ -307,7 +309,7 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
"be able to serialize and deserialize invocations' parameters" in {
import java.io._
val someFoo: Foo = new Bar
val m = MethodCall(classOf[Foo].getDeclaredMethod("futureComposePigdogFrom", Array[Class[_]](classOf[Foo]): _*), Array[AnyRef](someFoo))
val m = MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
@ -319,9 +321,12 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
val mNew = in.readObject().asInstanceOf[MethodCall]
mNew.method must be(m.method)
mNew.parameters must have size 1
mNew.parameters must have size 3
mNew.parameters(0) must not be null
mNew.parameters(0).getClass must be === classOf[Bar]
mNew.parameters(1) must be(null)
mNew.parameters(2) must not be null
mNew.parameters(2).asInstanceOf[Int] must be === 1
}
}
}

View file

@ -45,7 +45,7 @@ object SupervisorSpec {
def receive = {
case Ping
messageLog.put(PingMessage)
self.reply_?(PongMessage)
self.tryReply(PongMessage)
case Die
throw new RuntimeException(ExceptionMessage)
}
@ -374,7 +374,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!")
def receive = {
case Ping self.reply_?(PongMessage)
case Ping self.tryReply(PongMessage)
case Die throw new Exception("expected")
}
})

View file

@ -25,7 +25,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
case Die throw new Exception(self.address + " is dying...")
}
override def preRestart(reason: Throwable) {
override def preRestart(reason: Throwable, msg: Option[Any]) {
log += self.address
}
}

View file

@ -65,12 +65,12 @@ object Ticket669Spec {
case msg throw new Exception("test")
}
override def preRestart(reason: scala.Throwable) {
self.reply_?("failure1")
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
self.tryReply("failure1")
}
override def postStop() {
self.reply_?("failure2")
self.tryReply("failure2")
}
}
}

View file

@ -20,7 +20,7 @@ object ActorModelSpec {
sealed trait ActorModelMessage
case class Reply_?(expect: Any) extends ActorModelMessage
case class TryReply(expect: Any) extends ActorModelMessage
case class Reply(expect: Any) extends ActorModelMessage
@ -73,7 +73,7 @@ object ActorModelSpec {
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) ack; self.reply_?(msg); busy.switchOff()
case TryReply(msg) ack; self.tryReply(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()

View file

@ -275,7 +275,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"fold" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self reply_? add }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add }
}).start()
}
val timeout = 10000
@ -286,7 +286,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"fold by composing" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self reply_? add }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add }
}).start()
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] }
@ -300,7 +300,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self reply_? add
self tryReply add
}
}).start()
}
@ -332,7 +332,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"shouldReduceResults" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self reply_? add }
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); self tryReply add }
}).start()
}
val timeout = 10000
@ -347,7 +347,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self reply_? add
self tryReply add
}
}).start()
}

View file

@ -33,7 +33,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
def receive = {
case i: Int acc = i :: acc
case 'Result self reply_? acc
case 'Result self tryReply acc
}
}).start()

View file

@ -29,12 +29,14 @@ trait BenchmarkScenarios extends PerformanceTest {
def complexScenario80 = complexScenario(80)
@Test
def complexScenario100 = complexScenario(100)
/*
@Test
def complexScenario200 = complexScenario(200)
@Test
def complexScenario300 = complexScenario(300)
@Test
def complexScenario400 = complexScenario(400)
*/
def complexScenario(numberOfClients: Int) {
Assume.assumeTrue(numberOfClients >= minClients)

View file

@ -7,11 +7,13 @@ import akka.event.EventHandler
trait OrderReceiver {
type ME
val matchingEngines: List[ME]
var matchingEnginePartitionsIsStale = true
var matchingEngineForOrderbook: Map[String, ME] = Map()
def refreshMatchingEnginePartitions() {
def refreshMatchingEnginePartitions(routing: MatchingEngineRouting[ME]) {
val matchingEngines: List[ME] = routing.mapping.keys.toList
def supportedOrderbooks(me: ME): List[String] = routing.mapping(me)
val m = Map() ++
(for {
me matchingEngines
@ -19,14 +21,11 @@ trait OrderReceiver {
} yield (orderbookSymbol, me))
matchingEngineForOrderbook = m
matchingEnginePartitionsIsStale = false
}
def supportedOrderbooks(me: ME): List[String]
}
class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher])
class AkkaOrderReceiver(disp: Option[MessageDispatcher])
extends Actor with OrderReceiver {
type ME = ActorRef
@ -34,21 +33,13 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp
self.dispatcher = d
}
override val matchingEngines: List[ActorRef] = matchingEngineRouting.keys.toList
override def preStart() {
refreshMatchingEnginePartitions()
}
def receive = {
case routing@MatchingEngineRouting(mapping)
refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]])
case order: Order placeOrder(order)
case unknown EventHandler.warning(this, "Received unknown message: " + unknown)
}
override def supportedOrderbooks(me: ActorRef): List[String] = {
matchingEngineRouting(me)
}
def placeOrder(order: Order) = {
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)
matchingEngine match {
@ -60,3 +51,5 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp
}
}
}
case class MatchingEngineRouting[ME](mapping: Map[ME, List[String]])

View file

@ -1,7 +1,5 @@
package akka.performance.trading.common
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Random
import scala.collection.immutable.TreeMap
@ -12,11 +10,13 @@ import org.junit.After
import org.junit.Before
import org.scalatest.junit.JUnitSuite
import akka.event.EventHandler
import akka.performance.trading.domain.Ask
import akka.performance.trading.domain.Bid
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.BenchResultRepository
import akka.performance.workbench.Report
import akka.performance.workbench.Stats
trait PerformanceTest extends JUnitSuite {

View file

@ -1,179 +0,0 @@
package akka.performance.trading.common
import java.io.File
import java.text.SimpleDateFormat
import java.io.PrintWriter
import java.io.FileWriter
import akka.event.EventHandler
import java.util.Date
class Report(
resultRepository: BenchResultRepository,
compareResultWith: Option[String] = None) {
private val dir = System.getProperty("benchmark.resultDir", "target/benchmark")
private def dirExists: Boolean = new File(dir).exists
private def log = System.getProperty("benchmark.logResult", "false").toBoolean
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]): Unit = if (dirExists) {
val current = statistics.last
val sb = new StringBuilder
val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp))
sb.append(header(title))
sb.append("<h1>%s</h1>\n".format(title))
sb.append("<pre>\n")
sb.append(formatResultsTable(statistics))
sb.append("\n</pre>\n")
sb.append(img(percentilesChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
for (stats statistics) {
compareWithHistoricalPercentiliesChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesChart(stats).foreach(url sb.append(img(url)))
}
if (dirExists) {
val timestamp = fileTimestampFormat.format(new Date(current.timestamp))
val name = current.name + "--" + timestamp + ".html"
write(sb.toString, name)
}
}
private def img(url: String): String = {
"""<img src="%s" border="0" width="%s" height="%s" />""".format(
url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n"
}
def percentilesChart(stats: Stats): String = {
val chartTitle = stats.name + " Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def comparePercentilesChart(stats: Stats): Seq[String] = {
for {
compareName compareResultWith.toSeq
compareStats resultRepository.get(compareName, stats.load)
} yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
}
def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
Some(chartUrl)
} else {
None
}
}
def latencyAndThroughputChart(stats: Stats): String = {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
if (log) EventHandler.info(this, formattedStats)
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
}
def write(content: String, fileName: String) {
val f = new File(dir, fileName)
var writer: PrintWriter = null
try {
writer = new PrintWriter(new FileWriter(f))
writer.print(content)
writer.flush()
} catch {
case e: Exception
EventHandler.error(this, "Failed to save report to [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
} finally {
if (writer ne null) try { writer.close() } catch { case ignore: Exception }
}
}
def header(title: String) =
"""|<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|<html>
|<head>
|
|<title>%s</title>
|</head>
|<body>
|""".stripMargin.format(title)
def footer =
"""|</body>"
|</html>""".stripMargin
}

View file

@ -75,7 +75,7 @@ class AkkaTradingSystem extends TradingSystem {
(1 to 10).toList map (i createOrderReceiver())
}
def matchingEngineRouting: Map[ActorRef, List[String]] = {
def matchingEngineRouting: MatchingEngineRouting[ActorRef] = {
val rules =
for {
info matchingEngines
@ -84,11 +84,11 @@ class AkkaTradingSystem extends TradingSystem {
(info.primary, orderbookSymbols)
}
Map() ++ rules
MatchingEngineRouting(Map() ++ rules)
}
def createOrderReceiver() =
actorOf(new AkkaOrderReceiver(matchingEngineRouting, orDispatcher))
actorOf(new AkkaOrderReceiver(orDispatcher))
override def start() {
for (MatchingEngineInfo(p, s, o) matchingEngines) {
@ -97,7 +97,11 @@ class AkkaTradingSystem extends TradingSystem {
s.foreach(_.start())
s.foreach(p ! _)
}
orderReceivers.foreach(_.start())
val routing = matchingEngineRouting
for (or orderReceivers) {
or.start()
or ! routing
}
}
override def shutdown() {

View file

@ -6,8 +6,8 @@ import akka.event.EventHandler
import akka.performance.trading.domain._
import akka.performance.trading.common.AkkaOrderReceiver
class OneWayOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher])
extends AkkaOrderReceiver(matchingEngineRouting, disp) {
class OneWayOrderReceiver(disp: Option[MessageDispatcher])
extends AkkaOrderReceiver(disp) {
override def placeOrder(order: Order) = {
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)

View file

@ -11,6 +11,6 @@ class OneWayTradingSystem extends AkkaTradingSystem {
actorOf(new OneWayMatchingEngine(meId, orderbooks, meDispatcher))
override def createOrderReceiver() =
actorOf(new OneWayOrderReceiver(matchingEngineRouting, orDispatcher))
actorOf(new OneWayOrderReceiver(orDispatcher))
}

View file

@ -1,16 +1,18 @@
package akka.performance.trading.common
package akka.performance.workbench
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.FileWriter
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.PrintWriter
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{ Map MutableMap }
import scala.collection.mutable.{Map => MutableMap}
import akka.event.EventHandler
@ -23,6 +25,10 @@ trait BenchResultRepository {
def getWithHistorical(name: String, load: Int): Seq[Stats]
def saveHtmlReport(content: String, name: String): Unit
def htmlReportUrl(name: String): String
}
object BenchResultRepository {
@ -34,8 +40,10 @@ class FileBenchResultRepository extends BenchResultRepository {
private val statsByName = MutableMap[String, Seq[Stats]]()
private val baselineStats = MutableMap[Key, Stats]()
private val historicalStats = MutableMap[Key, Seq[Stats]]()
private val dir = System.getProperty("benchmark.resultDir", "target/benchmark")
private def dirExists: Boolean = new File(dir).exists
private val serDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/ser"
private def serDirExists: Boolean = new File(serDir).exists
private val htmlDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/html"
private def htmlDirExists: Boolean = new File(htmlDir).exists
protected val maxHistorical = 7
case class Key(name: String, load: Int)
@ -64,10 +72,10 @@ class FileBenchResultRepository extends BenchResultRepository {
}
private def loadFiles() {
if (dirExists) {
if (serDirExists) {
val files =
for {
f new File(dir).listFiles
f new File(serDir).listFiles
if f.isFile
if f.getName.endsWith(".ser")
} yield f
@ -86,11 +94,11 @@ class FileBenchResultRepository extends BenchResultRepository {
}
private def save(stats: Stats) {
new File(dir).mkdirs
if (!dirExists) return
new File(serDir).mkdirs
if (!serDirExists) return
val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp))
val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser"
val f = new File(dir, name)
val f = new File(serDir, name)
var out: ObjectOutputStream = null
try {
out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
@ -127,5 +135,24 @@ class FileBenchResultRepository extends BenchResultRepository {
loadFiles()
def saveHtmlReport(content: String, fileName: String) {
new File(htmlDir).mkdirs
if (!htmlDirExists) return
val f = new File(htmlDir, fileName)
var writer: PrintWriter = null
try {
writer = new PrintWriter(new FileWriter(f))
writer.print(content)
writer.flush()
} catch {
case e: Exception
EventHandler.error(this, "Failed to save report to [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
} finally {
if (writer ne null) try { writer.close() } catch { case ignore: Exception }
}
}
def htmlReportUrl(fileName: String): String = new File(htmlDir, fileName).getAbsolutePath
}

View file

@ -1,10 +1,9 @@
package akka.performance.trading.common
package akka.performance.workbench
import java.io.UnsupportedEncodingException
import java.net.URLEncoder
import scala.collection.immutable.TreeMap
import java.util.Locale
import java.util.Formatter
/**
* Generates URLs to Google Chart API http://code.google.com/apis/chart/
@ -15,9 +14,9 @@ object GoogleChartBuilder {
val ChartHeight = 400
/**
* Builds a bar chart for all percentiles in the statistics.
* Builds a bar chart for all percentiles and the mean in the statistics.
*/
def percentilChartUrl(statistics: Seq[Stats], title: String, legend: Stats String): String = {
def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats String): String = {
if (statistics.isEmpty) return ""
val current = statistics.last
@ -38,6 +37,7 @@ object GoogleChartBuilder {
sb.append("&")
// labels
percentileLabels(current.percentiles, sb)
sb.append("|mean")
sb.append("|2:|min|mean|median")
sb.append("&")
// label positions
@ -63,7 +63,7 @@ object GoogleChartBuilder {
// data series
val maxValue = statistics.map(_.percentiles.last._2).max
sb.append("chd=t:")
dataSeries(statistics.map(_.percentiles), sb)
dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb)
// y range
sb.append("&")
@ -98,13 +98,18 @@ object GoogleChartBuilder {
sb.append(s)
}
private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], sb: StringBuilder) {
val series =
private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], meanValues: Seq[Double], sb: StringBuilder) {
val percentileSeries =
for {
percentiles allPercentiles
} yield {
percentiles.values.mkString(",")
}
val series =
for ((s, m) percentileSeries.zip(meanValues))
yield s + "," + formatDouble(m)
sb.append(series.mkString("|"))
}
@ -144,11 +149,11 @@ object GoogleChartBuilder {
sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767")
sb.append("&")
sb.append("chco=")
val seriesColors = List("25B33B", "3072F3", "FF0000", "FF9900")
val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900")
sb.append(seriesColors.mkString(","))
sb.append("&")
// legend
sb.append("chdl=5th Percentile|Median|95th Percentile|Throughput")
sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput")
sb.append("&")
sb.append("chdlp=b")
@ -160,6 +165,7 @@ object GoogleChartBuilder {
sb.append("chls=1|1|1")
sb.append("&")
// margins
sb.append("chma=5,5,5,25")
sb.append("&")
@ -181,6 +187,11 @@ object GoogleChartBuilder {
}
sb.append(percentileSeries.mkString("|"))
sb.append("|")
sb.append(loadStr).append("|")
val meanSeries = statistics.map(s formatDouble(s.mean)).mkString(",")
sb.append(meanSeries)
sb.append("|")
val maxTps: Double = statistics.map(_.tps).max
sb.append(loadStr).append("|")
@ -192,7 +203,7 @@ object GoogleChartBuilder {
// y range
sb.append("&")
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append("|1,0,").append(maxValue).append("|2,0,")
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,")
.append(formatDouble(maxTps))
sb.append("&")
@ -203,6 +214,9 @@ object GoogleChartBuilder {
sb.append(",")
}
sb.append(minLoad).append(",").append(maxLoad)
sb.append(",0,").append(formatDouble(maxValue))
sb.append(",")
sb.append(minLoad).append(",").append(maxLoad)
sb.append(",0,").append(formatDouble(maxTps))
sb.append("&")

View file

@ -0,0 +1,220 @@
package akka.performance.workbench
import java.lang.management.ManagementFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.enumerationAsScalaIterator
import akka.event.EventHandler
import akka.config.Config
import akka.config.Config.config
class Report(
resultRepository: BenchResultRepository,
compareResultWith: Option[String] = None) {
private def log = System.getProperty("benchmark.logResult", "true").toBoolean
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]): Unit = {
val current = statistics.last
val sb = new StringBuilder
val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp))
sb.append(header(title))
sb.append("<h1>%s</h1>\n".format(title))
sb.append("<pre>\n")
val resultTable = formatResultsTable(statistics)
sb.append(resultTable)
sb.append("\n</pre>\n")
sb.append(img(percentilesAndMeanChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
for (stats statistics) {
compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
sb.append("<hr/>\n")
sb.append("<pre>\n")
sb.append(systemInformation)
sb.append("\n</pre>\n")
val timestamp = fileTimestampFormat.format(new Date(current.timestamp))
val reportName = current.name + "--" + timestamp + ".html"
resultRepository.saveHtmlReport(sb.toString, reportName)
if (log) {
EventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName))
}
}
def img(url: String): String = {
"""<img src="%s" border="0" width="%s" height="%s" />""".format(
url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n"
}
def percentilesAndMeanChart(stats: Stats): String = {
val chartTitle = stats.name + " Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
chartUrl
}
def comparePercentilesAndMeanChart(stats: Stats): Seq[String] = {
for {
compareName compareResultWith.toSeq
compareStats resultRepository.get(compareName, stats.load)
} yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(Seq(compareStats, stats), chartTitle, _.name)
chartUrl
}
}
def compareWithHistoricalPercentiliesAndMeanChart(stats: Stats): Option[String] = {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
Some(chartUrl)
} else {
None
}
}
def latencyAndThroughputChart(stats: Stats): String = {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
chartUrl
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
}
def systemInformation: String = {
val runtime = ManagementFactory.getRuntimeMXBean
val os = ManagementFactory.getOperatingSystemMXBean
val threads = ManagementFactory.getThreadMXBean
val mem = ManagementFactory.getMemoryMXBean
val heap = mem.getHeapMemoryUsage
val sb = new StringBuilder
sb.append("Benchmark properties:")
import scala.collection.JavaConversions._
val propNames: Seq[String] = System.getProperties.propertyNames.toSeq.map(_.toString)
for (name propNames if name.startsWith("benchmark")) {
sb.append("\n ").append(name).append("=").append(System.getProperty(name))
}
sb.append("\n")
sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion)
sb.append("\n")
sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor).
append(" ").append(runtime.getVmVersion)
sb.append("\n")
sb.append("Processors: ").append(os.getAvailableProcessors)
sb.append("\n")
sb.append("Load average: ").append(os.getSystemLoadAverage)
sb.append("\n")
sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")")
sb.append("\n")
sb.append("Heap: ").append(formatDouble(heap.getUsed.toDouble / 1024 / 1024)).
append(" (").append(formatDouble(heap.getInit.toDouble / 1024 / 1024)).
append(" - ").
append(formatDouble(heap.getMax.toDouble / 1024 / 1024)).
append(")").append(" MB")
sb.append("\n")
val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString("\n ")
sb.append("Args:\n ").append(args)
sb.append("\n")
sb.append("Akka version: ").append(Config.CONFIG_VERSION)
sb.append("\n")
sb.append("Akka config:")
for (key config.keys) {
sb.append("\n ").append(key).append("=").append(config(key))
}
sb.toString
}
def formatDouble(value: Double): String = {
new java.math.BigDecimal(value).setScale(2, java.math.RoundingMode.HALF_EVEN).toString
}
def header(title: String) =
"""|<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|<html>
|<head>
|
|<title>%s</title>
|</head>
|<body>
|""".stripMargin.format(title)
def footer =
"""|</body>"
|</html>""".stripMargin
}

View file

@ -1,7 +1,8 @@
package akka.performance.trading.common
package akka.performance.workbench
import scala.collection.immutable.TreeMap
@SerialVersionUID(1L)
case class Stats(
name: String,
load: Int,

View file

@ -10,6 +10,7 @@ import akka.util.duration._
import akka.actor._
import akka.actor.Actor._
import akka.routing._
import akka.event.EventHandler
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.{ KeptPromise, Future }
@ -186,13 +187,13 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with FixedCapacityStrategy with SmallestMailboxSelector {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
def factory = actorOf(new Actor {
def receive = {
case _
count.incrementAndGet
latch.countDown()
self reply_? "success"
self tryReply "success"
}
}).start()
@ -226,7 +227,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
"pass ticket #705" in {
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
@ -241,7 +242,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
def receive = {
case req: String {
sleepFor(10 millis)
self.reply_?("Response")
self.tryReply("Response")
}
}
})
@ -264,7 +265,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int
@ -329,7 +330,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int
@ -383,7 +384,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
val pool1 = actorOf(
new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _
@ -412,7 +413,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
delegates.clear()
val pool2 = actorOf(
new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _
@ -442,7 +443,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val latch = TestLatch(10)
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def factory = actorOf(new Actor {
def receive = {
case n: Int
@ -488,7 +489,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
"support typed actors" in {
import RoutingSpec._
import TypedActor._
def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
@ -507,6 +508,214 @@ class RoutingSpec extends WordSpec with MustMatchers {
for ((i, r) results) r.get must equal(i * i)
}
"provide default supervision of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (1)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (2)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (4)
}
"support customizable supervision config of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
object BadState
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die =>
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (1)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
}
}
}

View file

@ -11,7 +11,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
"A ? call to an actor pool" should {
"reuse the proper timeout" in {
val actorPool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
@ -24,7 +24,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
def receive = {
case req: String
Thread.sleep(6000L)
self.reply_?("Response")
self.tryReply("Response")
}
})
}).start()

View file

@ -26,6 +26,8 @@ class AkkaException(message: String = "", cause: Throwable = null) extends Runti
lazy val toLongString =
"%s: %s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString)
def this(msg:String) = this(msg, null);
def stackTraceToString = {
val trace = getStackTrace
val sb = new StringBuffer

View file

@ -5,6 +5,7 @@
package akka.actor
import DeploymentConfig._
import akka.experimental
import akka.dispatch._
import akka.config._
import Config._
@ -79,12 +80,29 @@ case class MaximumNumberOfRestartsWithinTimeRangeReached(
@BeanProperty lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
// Exceptions for Actors
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause){
def this(msg:String) = this(msg, null);
}
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
class ActorKilledException private[akka] (message: String, cause: Throwable) extends AkkaException(message, cause){
def this(msg: String) = this(msg, null);
}
class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
/**
* This message is thrown by default when an Actors behavior doesn't match a message
@ -477,13 +495,10 @@ object Actor extends ListenerManagement {
"Remote server is not running")
val isHomeNode = DeploymentConfig.isHomeNode(preferredHomeNodes)
val nrOfReplicas = DeploymentConfig.replicaValueFor(replicas)
def serializerErrorDueTo(reason: String) = throw new akka.config.ConfigurationException(
"Could not create Serializer for actor [" + address + "] due to: " + reason)
val nrOfReplicas = replicas.factor
val serializer: Serializer =
Serialization.serializerFor(this.getClass).fold(x serializerErrorDueTo(x.toString), s s)
Serialization.serializerFor(this.getClass)
def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = {
// add actor to cluster registry (if not already added)
@ -680,9 +695,23 @@ trait Actor {
/**
* User overridable callback.
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
* Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated.
*/
def preRestart(reason: Throwable) {}
def preRestart(reason: Throwable, message: Option[Any]) {}
/**
* User overridable callback.
* <p/>
* Is called on the crashed Actor to give it the option of producing the
* Actor's reincarnation. If it returns None, which is the default, the
* initially provided actor factory is used.
* <p/>
* <b>Warning:</b> <i>Propagating state from a crashed actor carries the risk
* of proliferating the cause of the error. Consider let-it-crash first.</i>
*/
@experimental("1.2")
def freshInstance(): Option[Actor] = None
/**
* User overridable callback.

View file

@ -12,8 +12,6 @@ import akka.util._
import akka.serialization.{Serializer, Serialization}
import ReflectiveAccess._
import ClusterModule._
import DeploymentConfig.{TransactionLog TransactionLogConfig, _}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
@ -243,6 +241,13 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*/
def ask(message: AnyRef): Future[AnyRef] = ask(message, timeout, null)
/**
* Akka Java API. <p/>
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
* Uses the specified timeout (milliseconds)
*/
def ask(message: AnyRef, timeout: Long): Future[Any] = ask(message, timeout, null)
/**
* Akka Java API. <p/>
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
@ -274,24 +279,29 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
}
/**
* Akka Java API. <p/>
* Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* Akka Scala & Java API
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed. This method fails if the original sender of the message could not be determined with an
* IllegalStateException.
*
* If you don't want deal with this IllegalStateException, but just a boolean, just use the <code>tryReply(...)</code>
* version.
*
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def replyUnsafe(message: AnyRef) {
reply(message)
}
def reply(message: Any) = channel.!(message)(this)
/**
* Akka Java API. <p/>
* Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* Akka Scala & Java API
* Use <code>tryReply(..)</code> to try reply with a message to the original sender of the message currently
* being processed. This method
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*
* If you would rather have an exception, check the <code>reply(..)</code> version.
*/
def replySafe(message: AnyRef): Boolean = reply_?(message)
def tryReply(message: Any): Boolean = channel.safe_!(message)(this)
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
@ -470,29 +480,22 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
"]")
private val serializer: Serializer =
Serialization.serializerFor(this.getClass).fold(x serializerErrorDueTo(x.toString), s s)
try { Serialization.serializerFor(this.getClass) } catch { case e: Exception => serializerErrorDueTo(e.toString)}
private lazy val replicationScheme: ReplicationScheme =
DeploymentConfig.replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)
private lazy val replicationStorage: Option[TransactionLog] = {
import DeploymentConfig._
val replicationScheme = replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)
if(isReplicated(replicationScheme)) {
if (isReplicatedWithTransactionLog(replicationScheme)) {
EventHandler.debug(this, "Creating a transaction log for Actor [%s] with replication strategy [%s]".format(address, replicationScheme))
private lazy val isReplicated: Boolean = DeploymentConfig.isReplicated(replicationScheme)
private lazy val isWriteBehindReplication: Boolean = DeploymentConfig.isWriteBehindReplication(replicationScheme)
private lazy val replicationStorage: Either[TransactionLog, AnyRef] = {
if (DeploymentConfig.isReplicatedWithTransactionLog(replicationScheme)) {
EventHandler.debug(this,
"Creating a transaction log for Actor [%s] with replication strategy [%s]"
.format(address, replicationScheme))
Left(transactionLog.newLogFor(_uuid.toString, isWriteBehindReplication, replicationScheme))
} else if (DeploymentConfig.isReplicatedWithDataGrid(replicationScheme)) {
Some(transactionLog.newLogFor(_uuid.toString, isWriteBehindReplication(replicationScheme), replicationScheme)) //TODO FIXME @jboner shouldn't this be address?
} else if (isReplicatedWithDataGrid(replicationScheme)) {
throw new ConfigurationException("Replication storage type \"data-grid\" is not yet supported")
} else {
throw new ConfigurationException("Unknown replication storage type [" + replicationScheme + "]")
}
} else None
}
// If it was started inside "newActor", initialize it
@ -586,9 +589,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
}
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
if (isReplicated) {
if (replicationStorage.isLeft) replicationStorage.left.get.delete()
}
if (replicationStorage.isDefined) replicationStorage.get.delete()
}
}
@ -717,9 +718,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
}
} finally {
guard.lock.unlock()
if (isReplicated) {
if (replicationStorage.isLeft) replicationStorage.left.get.recordEntry(messageHandle, this)
}
if (replicationStorage.isDefined) replicationStorage.get.recordEntry(messageHandle, this)
}
}
@ -775,7 +774,8 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
def performRestart() {
val failedActor = actorInstance.get
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
failedActor.preRestart(reason)
val message = if (currentMessage ne null) Some(currentMessage.message) else None
failedActor.preRestart(reason, message)
val freshActor = newActor
setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
@ -798,7 +798,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
lifeCycle match {
case Temporary
shutDownTemporaryActor(this)
shutDownTemporaryActor(this, reason)
true
case _ // either permanent or none where default is permanent
@ -841,7 +841,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
val actorRef = i.next
actorRef.lifeCycle match {
// either permanent or none where default is permanent
case Temporary shutDownTemporaryActor(actorRef)
case Temporary shutDownTemporaryActor(actorRef, reason)
case _ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
}
@ -856,7 +856,20 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
if (_status == ActorRefInternals.BEING_RESTARTED) {
val a = actor
val fresh = try a.freshInstance catch {
case e
EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory")
None
}
fresh match {
case Some(ref) ref
case None actorFactory()
}
} else {
actorFactory()
}
} finally {
val stackAfter = refStack.get
if (stackAfter.nonEmpty)
@ -867,9 +880,11 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
case valid valid
}
private def shutDownTemporaryActor(temporaryActor: ActorRef) {
private def shutDownTemporaryActor(temporaryActor: ActorRef, reason: Throwable) {
temporaryActor.stop()
_linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
// when this comes down through the handleTrapExit path, we get here when the temp actor is restarted
notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(temporaryActor, Some(0), None, reason))
// if last temporary actor is gone, then unlink me from supervisor
if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this))
true
@ -886,7 +901,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, reason))
else {
lifeCycle match {
case Temporary shutDownTemporaryActor(this)
case Temporary shutDownTemporaryActor(this, reason)
case _ dispatcher.resume(this) //Resume processing for this actor
}
}
@ -1008,7 +1023,7 @@ private[akka] case class RemoteActorRef private[akka](
case _ None
}
val chFuture = channel match {
case f: Promise[Any] Some(f)
case f: Promise[_] Some(f.asInstanceOf[Promise[Any]])
case _ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout.duration.toMillis, false, this, loader)
@ -1226,22 +1241,6 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
} else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start()' before using it")
}
/**
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = channel.!(message)(this)
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
def reply_?(message: Any): Boolean = channel.safe_!(message)(this)
}
case class SerializedActorRef(uuid: Uuid,

View file

@ -11,9 +11,7 @@ import java.util.concurrent.ConcurrentHashMap
import akka.event.EventHandler
import akka.actor.DeploymentConfig._
import akka.config.{ ConfigurationException, Config }
import akka.routing.RouterType
import akka.util.ReflectiveAccess._
import akka.serialization._
import akka.AkkaException
/**
@ -204,15 +202,15 @@ object Deployer {
// --------------------------------
// akka.actor.deployment.<address>.clustered.replicas
// --------------------------------
val replicas = {
if (router == Direct) Replicate(1)
val replicationFactor = {
if (router == Direct) ReplicationFactor(1)
else {
clusteredConfig.getAny("replicas", "0") match {
case "auto" AutoReplicate
case "0" NoReplicas
clusteredConfig.getAny("replication-factor", "0") match {
case "auto" AutoReplicationFactor
case "0" ZeroReplicationFactor
case nrOfReplicas: String
try {
Replicate(nrOfReplicas.toInt)
ReplicationFactor(nrOfReplicas.toInt)
} catch {
case e: NumberFormatException
throw new ConfigurationException(
@ -229,7 +227,7 @@ object Deployer {
// --------------------------------
clusteredConfig.getSection("replication") match {
case None
Some(Deploy(address, router, Clustered(preferredNodes, replicas, Transient)))
Some(Deploy(address, router, Clustered(preferredNodes, replicationFactor, Transient)))
case Some(replicationConfig)
val storage = replicationConfig.getString("storage", "transaction-log") match {
@ -248,7 +246,7 @@ object Deployer {
".clustered.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]")
}
Some(Deploy(address, router, Clustered(preferredNodes, replicas, Replication(storage, strategy))))
Some(Deploy(address, router, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy))))
}
}
}

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.config.Config
import akka.routing.RouterType
import akka.serialization.Serializer
/**
* Module holding the programmatic deployment configuration classes.
@ -53,7 +52,7 @@ object DeploymentConfig {
sealed trait Scope
case class Clustered(
preferredNodes: Iterable[Home] = Vector(Host("localhost")),
replicas: Replicas = NoReplicas,
replicas: ReplicationFactor = ZeroReplicationFactor,
replication: ReplicationScheme = Transient) extends Scope
// For Java API
@ -73,18 +72,17 @@ object DeploymentConfig {
// --------------------------------
// --- Replicas
// --------------------------------
sealed trait Replicas
case class Replicate(factor: Int) extends Replicas {
if (factor < 1) throw new IllegalArgumentException("Replicas factor can not be negative or zero")
sealed case class ReplicationFactor(val factor: Int) {
if (factor < 0) throw new IllegalArgumentException("replication-factor can not be negative")
}
// For Java API
case class AutoReplicate() extends Replicas
case class NoReplicas() extends Replicas
case class AutoReplicationFactor() extends ReplicationFactor(-1)
case class ZeroReplicationFactor() extends ReplicationFactor(0)
// For Scala API
case object AutoReplicate extends Replicas
case object NoReplicas extends Replicas
case object AutoReplicationFactor extends ReplicationFactor(-1)
case object ZeroReplicationFactor extends ReplicationFactor(0)
// --------------------------------
// --- Replication
@ -141,14 +139,6 @@ object DeploymentConfig {
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home nodeNameFor(home) == Config.nodename)
def replicaValueFor(replicas: Replicas): Int = replicas match {
case Replicate(replicas) replicas
case AutoReplicate -1
case AutoReplicate() -1
case NoReplicas 0
case NoReplicas() 0
}
def routerTypeFor(routing: Routing): RouterType = routing match {
case Direct RouterType.Direct
case Direct() RouterType.Direct

View file

@ -22,7 +22,6 @@ import java.util.concurrent._
import java.lang.RuntimeException
object Scheduler {
import Actor._
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e)

View file

@ -10,10 +10,11 @@ import ReflectiveAccess._
import Actor._
import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap }
import java.net.InetSocketAddress
import akka.config.Supervision._
class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
/**
* Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class.

View file

@ -10,8 +10,8 @@ import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutExce
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.Serialization
import com.sun.xml.internal.ws.developer.MemberSubmissionAddressing.Validation
import akka.serialization.{Serializer, Serialization}
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
@ -89,33 +89,35 @@ object TypedActor {
}
} catch { case i: InvocationTargetException throw i.getTargetException }
private def writeReplace(): AnyRef = {
val serializedParameters: Array[(Array[Byte],String)] = parameters match {
case null => null
case a if a.length == 0 => Array[(Array[Byte],String)]()
case a => a.map( {
case null => null
case value => Serialization.serializerFor(value.getClass).fold(throw _, s => (s.toBinary(value), s.getClass.getName))
})
}
new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializedParameters)
private def writeReplace(): AnyRef = parameters match {
case null => SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null)
case ps if ps.length == 0 => SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]())
case ps =>
val serializers: Array[Serializer] = ps map Serialization.findSerializerFor
val serializedParameters: Array[Array[Byte]] = Array.ofDim[Array[Byte]](serializers.length)
for(i <- 0 until serializers.length)
serializedParameters(i) = serializers(i) toBinary parameters(i) //Mutable for the sake of sanity
SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializers.map(_.identifier), serializedParameters)
}
}
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Array[Byte],String)]) {
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializerIdentifiers: Array[Serializer.Identifier], serializedParameters: Array[Array[Byte]]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = {
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null => null
case a if a.length == 0 => Array[AnyRef]()
case a => a.map( {
case null => null
case (bytes, serializerFQN) => Serialization.serializerOf(serializerFQN).fold(throw _, _.fromBinary(bytes))
})
case a =>
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
for(i <- 0 until a.length)
deserializedParameters(i) = Serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
deserializedParameters
})
}
}

View file

@ -19,8 +19,8 @@ import akka.japi.{ Creator, Procedure }
* String msg = (String)message;
*
* if (msg.equals("UseReply")) {
* // Reply to original sender of message using the 'replyUnsafe' method
* getContext().replyUnsafe(msg + ":" + getContext().getUuid());
* // Reply to original sender of message using the 'reply' method
* getContext().reply(msg + ":" + getContext().getUuid());
*
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
@ -102,7 +102,7 @@ abstract class UntypedActor extends Actor {
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
*/
override def preRestart(reason: Throwable) {}
override def preRestart(reason: Throwable, lastMessage: Option[Any]) {}
/**
* User overridable callback.

View file

@ -10,18 +10,14 @@ import akka.actor._
import DeploymentConfig._
import akka.dispatch.Future
import akka.config.Config
import akka.util._
import akka.routing.RouterType
import akka.AkkaException
import com.eaio.uuid.UUID
import java.net.InetSocketAddress
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
import java.util.concurrent.{ ConcurrentSkipListSet}
import scala.collection.mutable.ConcurrentMap
import scala.collection.JavaConversions._
class ClusterException(message: String) extends AkkaException(message)

View file

@ -10,8 +10,13 @@ import java.net.InetAddress
import com.eaio.uuid.UUID
class ConfigurationException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ModuleNotAvailableException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ConfigurationException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
class ModuleNotAvailableException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg:String) = this(msg, null);
}
/**
* Loads up the configuration (from the akka.conf file).

View file

@ -64,7 +64,6 @@ class Configuration(val map: Map[String, Any]) {
private def outputIfDesiredAndReturnInput[T](key: String, t: T): T = {
if (Configuration.outputConfigSources)
println("Akka config is using default value for: " + key)
t
}
@ -149,7 +148,8 @@ class Configuration(val map: Map[String, Any]) {
getDouble(key).getOrElse(outputIfDesiredAndReturnInput(key, defaultValue))
def getBoolean(key: String): Option[Boolean] = {
getString(key) flatMap { s
getString(key) flatMap {
s
val isTrue = trueValues.contains(s)
if (!isTrue && !falseValues.contains(s)) None
else Some(isTrue)
@ -170,13 +170,19 @@ class Configuration(val map: Map[String, Any]) {
}
def apply(key: String, defaultValue: String) = getString(key, defaultValue)
def apply(key: String, defaultValue: Int) = getInt(key, defaultValue)
def apply(key: String, defaultValue: Long) = getLong(key, defaultValue)
def apply(key: String, defaultValue: Boolean) = getBool(key, defaultValue)
def getSection(name: String): Option[Configuration] = {
val l = name.length + 1
val m = map.collect { case (k, v) if k.startsWith(name) (k.substring(l), v) }
val pattern = name+"."
val m = map.collect {
case (k, v) if k.startsWith(pattern) (k.substring(l), v)
}
if (m.isEmpty) None
else Some(new Configuration(m))
}

View file

@ -197,14 +197,15 @@ object Dispatchers {
case "GlobalDispatcher" GlobalDispatcherConfigurator
case fqn
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
case r: Right[_, Class[MessageDispatcherConfigurator]]
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match {
case r: Right[Exception, MessageDispatcherConfigurator] r.b
case l: Left[Exception, MessageDispatcherConfigurator]
throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a)
case Right(clazz)
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
case Right(configurator) configurator
case Left(exception)
throw new IllegalArgumentException(
"Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception)
}
case l: Left[Exception, _]
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a)
case Left(exception)
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, exception)
}
} map {
_ configure cfg

View file

@ -103,8 +103,8 @@ object Futures {
val aggregate: Future[T] Unit = f if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
f.value.get match {
case r: Right[Throwable, T]
val added = results add r.b
case Right(value)
val added = results add value
if (added && results.size == allDone) { //Only one thread can get here
if (done.switchOn) {
try {
@ -122,9 +122,9 @@ object Futures {
}
}
}
case l: Left[Throwable, T]
case Left(exception)
if (done.switchOn) {
result completeWithException l.a
result completeWithException exception
results.clear
}
}
@ -165,10 +165,8 @@ object Futures {
val seedFold: Future[T] Unit = f {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case r: Right[Throwable, T]
result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op))
case l: Left[Throwable, T]
result.completeWithException(l.a)
case Right(value) result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
case Left(exception) result.completeWithException(exception)
}
}
}

View file

@ -68,6 +68,11 @@ trait MessageDispatcher {
*/
private[akka] def createMailbox(actorRef: ActorRef): AnyRef
/**
* Name of this dispatcher.
*/
def name: String
/**
* Attaches the specified actorRef to this dispatcher
*/

View file

@ -109,11 +109,12 @@ object EventHandler extends ListenerManagement {
case Nil "akka.event.EventHandler$DefaultListener" :: Nil
case listeners listeners
}
defaultListeners foreach { listenerName
defaultListeners foreach {
listenerName
try {
ReflectiveAccess.getClassFor[Actor](listenerName) match {
case r: Right[_, Class[Actor]] addListener(Actor.localActorOf(r.b).start())
case l: Left[Exception, _] throw l.a
case Right(actorClass) addListener(Actor.localActorOf(actorClass).start())
case Left(exception) throw exception
}
} catch {
case e: Exception
@ -181,6 +182,7 @@ object EventHandler extends ListenerManagement {
if (level >= InfoLevel) notifyListeners(Info(instance, message))
}
def debug(instance: AnyRef, message: String) {
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
}
@ -210,6 +212,7 @@ object EventHandler extends ListenerManagement {
}
class DefaultListener extends Actor {
import java.text.SimpleDateFormat
import java.util.Date

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka
import annotation.target._
/**
* This annotation marks a feature which is not yet considered stable and may
* change or be removed in a future release.
*
* @author Roland Kuhn
* @since 1.2
*/
@getter
@setter
@beanGetter
@beanSetter
final class experimental(since: String) extends annotation.StaticAnnotation

View file

@ -4,8 +4,9 @@
package akka.routing
import akka.actor.{ Actor, ActorRef, PoisonPill }
import akka.actor.{ Actor, ActorRef, PoisonPill, Death, MaximumNumberOfRestartsWithinTimeRangeReached }
import akka.dispatch.{ Promise }
import akka.config.Supervision._
/**
* Actor pooling
@ -35,44 +36,102 @@ object ActorPool {
* Defines the nature of an actor pool.
*/
trait ActorPool {
def instance(): ActorRef //Question, Instance of what?
def capacity(delegates: Seq[ActorRef]): Int //Question, What is the semantics of this return value?
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] //Question, Why does select return this instead of an ordered Set?
/**
* Adds a new actor to the pool. The DefaultActorPool implementation will start and link (supervise) this actor.
* This method is invoked whenever the pool determines it must boost capacity.
* @return A new actor for the pool
*/
def instance(): ActorRef
/**
* Returns the overall desired change in pool capacity. This method is used by non-static pools as the means
* for the capacity strategy to influence the pool.
* @param _delegates The current sequence of pooled actors
* @return the number of delegates by which the pool should be adjusted (positive, negative or zero)
*/
def capacity(delegates: Seq[ActorRef]): Int
/**
* Provides the results of the selector, one or more actors, to which an incoming message is forwarded.
* This method returns an iterator since a selector might return more than one actor to handle the message.
* You might want to do this to perform redundant processing of particularly error-prone messages.
* @param _delegates The current sequence of pooled actors
* @return a list of actors to which the message will be delivered
*/
def select(delegates: Seq[ActorRef]): Seq[ActorRef]
}
/**
* A default implementation of a pool, on each message to route,
* - checks the current capacity and adjusts accordingly if needed
* - routes the incoming message to a selection set of delegate actors
* Defines the configuration options for how the pool supervises the actors.
*/
trait DefaultActorPool extends ActorPool { this: Actor
trait ActorPoolSupervisionConfig {
/**
* Defines the default fault handling strategy to be employed by the pool.
*/
def poolFaultHandler: FaultHandlingStrategy
}
/**
* Provides a default implementation of the supervision configuration by
* defining a One-for-One fault handling strategy, trapping exceptions,
* limited to 5 retries within 1 second.
*
* This is just a basic strategy and implementors are encouraged to define
* something more appropriate for their needs.
*/
trait DefaultActorPoolSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
}
/**
* A default implementation of a pool that:
* First, invokes the pool's capacitor that tells it, based on the current delegate count
* and it's own heuristic by how many delegates the pool should be resized. Resizing can
* can be incremental, decremental or flat. If there is a change to capacity, new delegates
* are added or existing ones are removed. Removed actors are sent the PoisonPill message.
* New actors are automatically started and linked. The pool supervises the actors and will
* use the fault handling strategy specified by the mixed-in ActorPoolSupervisionConfig.
* Pooled actors may be any lifecycle. If you're testing pool sizes during runtime, take a
* look at the unit tests... Any delegate with a <b>Permanent</b> lifecycle will be
* restarted and the pool size will be level with what it was prior to the fault. In just
* about every other case, e.g. the delegates are <b>Temporary</b> or the delegate cannot be
* restarted within the time interval specified in the fault handling strategy, the pool will
* be temporarily shy by that actor (it will have been removed by not back-filled). The
* back-fill if any is required, will occur on the next message [as usual].
*
* Second, invokes the pool's selector that returns a list of delegates that are to receive
* the incoming message. Selectors may return more than one actor. If <i>partialFill</i>
* is true then it might also the case that fewer than number of desired actors will be
* returned.
*
* Lastly, routes by forwarding, the incoming message to each delegate in the selected set.
*/
trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervisionConfig
import ActorPool._
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
protected var _delegates = Vector[ActorRef]()
private var _lastCapacityChange = 0
private var _lastSelectorCount = 0
protected[akka] var _delegates = Vector[ActorRef]()
override def postStop() = _delegates foreach { delegate
override def preStart() {
self.faultHandler = poolFaultHandler
}
override def postStop() {
_delegates foreach { delegate
try {
delegate ! PoisonPill
} catch { case e: Exception } //Ignore any exceptions here
}
}
protected def _route(): Receive = {
// for testing...
case Stat
self reply_? Stats(_delegates length)
case max: MaximumNumberOfRestartsWithinTimeRangeReached
_delegates = _delegates filterNot { _.uuid == max.victim.uuid }
self tryReply Stats(_delegates length)
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _)
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case Death(victim, _) =>
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case msg
resizeIfAppropriate()
select(_delegates) match {
case (selectedDelegates, count)
_lastSelectorCount = count
selectedDelegates foreach { _ forward msg } //Should we really send the same message to several actors?
}
select(_delegates) foreach { _ forward msg }
}
private def resizeIfAppropriate() {
@ -95,14 +154,15 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
case _ _delegates //No change
}
_lastCapacityChange = requestedCapacity
_delegates = newDelegates
}
}
/**
* Selectors
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
*
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool.
* Note that it's acceptable to return more than one actor to handle a given message.
*/
/**
@ -112,7 +172,7 @@ trait SmallestMailboxSelector {
def selectionCount: Int
def partialFill: Boolean
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
var set: Seq[ActorRef] = Nil
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
@ -121,7 +181,7 @@ trait SmallestMailboxSelector {
take -= set.size
}
(set.iterator, set.size)
set
}
}
@ -134,7 +194,7 @@ trait RoundRobinSelector {
def selectionCount: Int
def partialFill: Boolean
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
val length = delegates.length
val take = if (partialFill) math.min(selectionCount, length)
else selectionCount
@ -145,13 +205,16 @@ trait RoundRobinSelector {
delegates(_last)
}
(set.iterator, set.size)
set
}
}
/**
* Capacitors
* These traits define how to alter the size of the pool
*
* These traits define how to alter the size of the pool according to some desired behavior.
* Capacitors are required (minimally) by the pool to establish bounds on the number of delegates
* that may exist in the pool.
*/
/**
@ -163,7 +226,13 @@ trait FixedSizeCapacitor {
}
/**
* Constrains the pool capacity to a bounded range
* Constrains the pool capacity to a bounded range.
* This capacitor employs 'pressure capacitors' (sorry for the unforunate confusing naming)
* to feed a 'pressure' delta into the capacity function. This measure is
* basically the difference between the current pressure level and a pre-established threshhold.
* When using this capacitor you must provide a method called 'pressure' or mix-in
* one of the PressureCapacitor traits below.
*
*/
trait BoundedCapacitor {
def lowerBound: Int
@ -196,21 +265,43 @@ trait MailboxPressureCapacitor {
*/
trait ActiveFuturesPressureCapacitor {
def pressure(delegates: Seq[ActorRef]): Int =
delegates count { _.channel.isInstanceOf[Promise[Any]] }
delegates count { _.channel.isInstanceOf[Promise[_]] }
}
/**
*
*/
trait CapacityStrategy {
import ActorPool._
/**
* This method returns a 'pressure level' that will be fed into the capacitor and
* evaluated against the established threshhold. For instance, in general, if
* the current pressure level exceeds the capacity of the pool, new delegates will
* be added.
*/
def pressure(delegates: Seq[ActorRef]): Int
/**
* This method can be used to smooth the response of the capacitor by considering
* the current pressure and current capacity.
*/
def filter(pressure: Int, capacity: Int): Int
protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size)
}
/**
* Use this trait to setup a pool that uses a fixed delegate count.
*/
trait FixedCapacityStrategy extends FixedSizeCapacitor
/**
* Use this trait to setup a pool that may have a variable number of
* delegates but always within an established upper and lower limit.
*
* If mix this into your pool implementation, you must also provide a
* PressureCapacitor and a Filter.
*/
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
/**

View file

@ -43,6 +43,9 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B
* in.close()
* obj
* }
*
* def identifier: Byte = 111 //Pick a number and hope no one has chosen the same :-) 0 - 16 is reserved for Akka internals
*
* }
*
* val defaultSerializerName = Default.getClass.getName

View file

@ -19,27 +19,26 @@ case class NoSerializerFoundException(m: String) extends AkkaException(m)
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/
object Serialization {
//TODO document me
def serialize(o: AnyRef): Either[Exception, Array[Byte]] = serializerFor(o.getClass) match {
case Left(ex) Left(ex)
case Right(serializer) Right(serializer.toBinary(o))
}
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception => Left(e) }
//TODO document me
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
serializerFor(clazz) match {
case Left(e) Left(e)
case Right(serializer) Right(serializer.fromBinary(bytes, Some(clazz), classLoader))
try { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } catch { case e: Exception => Left(e) }
def findSerializerFor(o: AnyRef): Serializer = o match {
case null => NullSerializer
case other => serializerFor(other.getClass)
}
//TODO document me
//TODO memoize the lookups
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = //TODO fall back on BestMatchClass THEN default
getClassFor(serializerMap.get(clazz.getName).getOrElse(serializers("default"))) match {
case Right(serializer) Right(serializer.newInstance.asInstanceOf[Serializer])
case Left(e) => Left(e)
}
def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups
serializerMap.get(clazz.getName).getOrElse(serializers("default"))
/**
* Tries to load the specified Serializer by the FQN
@ -64,16 +63,18 @@ object Serialization {
}
/**
* A Map of serializer from alias to implementation (FQN of a class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> "akka.serialization.JavaSerializer"
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer
* But "default" can be overridden in config
*/
val serializers: Map[String, String] = config.getSection("akka.actor.serializers") map {
_.map.foldLeft(Map("default" -> "akka.serialization.JavaSerializer")) {
case (result, (k: String, v: String)) => result + (k -> v)
val serializers: Map[String, Serializer] =
config.getSection("akka.actor.serializers")
.map(_.map)
.getOrElse(Map())
.foldLeft(Map[String, Serializer]("default" -> akka.serialization.JavaSerializer)) {
case (result, (k: String, v: String)) => result + (k -> serializerOf(v).fold(throw _, identity))
case (result, _) => result
}
} getOrElse Map("default" -> "akka.serialization.JavaSerializer")
/**
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
@ -88,5 +89,11 @@ object Serialization {
/**
* serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class
*/
val serializerMap: Map[String, String] = bindings mapValues serializers
val serializerMap: Map[String, Serializer] = bindings mapValues serializers
/**
* Maps from a Serializer.Identifier (Byte) to a Serializer instance (optimization)
*/
val serializerByIdentity: Map[Serializer.Identifier, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) => (v.identifier,v) }
}

View file

@ -6,14 +6,40 @@ package akka.serialization
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
import akka.util.ClassLoaderObjectInputStream
import akka.actor.ActorRef
object Serializer {
val defaultSerializerName = classOf[JavaSerializer].getName
type Identifier = Byte
}
/**
* A Serializer represents a bimap between an object and an array of bytes representing that object
*/
trait Serializer extends scala.Serializable {
/**
* Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic
* Values from 0 to 16 is reserved for Akka internal usage
*/
def identifier: Serializer.Identifier
/**
* Serializes the given object into an Array of Byte
*/
def toBinary(o: AnyRef): Array[Byte]
/**
* Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef
}
object JavaSerializer extends JavaSerializer
object NullSerializer extends NullSerializer
class JavaSerializer extends Serializer {
def identifier = 1:Byte
def toBinary(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
@ -33,7 +59,11 @@ class JavaSerializer extends Serializer {
}
}
object JavaSerializer extends JavaSerializer
object Serializer {
val defaultSerializerName = JavaSerializer.getClass.getName
class NullSerializer extends Serializer {
val nullAsBytes = Array[Byte]()
def identifier = 0:Byte
def toBinary(o: AnyRef) = nullAsBytes
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null
}

View file

@ -58,7 +58,7 @@ trait ProducerSupport { this: Actor ⇒
* Default implementation of <code>Actor.preRestart</code> for freeing resources needed
* to actually send messages to <code>endpointUri</code>.
*/
override def preRestart(reason: Throwable) {
override def preRestart(reason: Throwable, msg: Option[Any]) {
try { preRestartProducer(reason) } finally { processor.stop }
}

View file

@ -15,7 +15,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
getContext().replySafe(String.format("%s %s", body, header));
getContext().tryReply(String.format("%s %s", body, header));
}
}

View file

@ -17,7 +17,7 @@ public class SampleUntypedConsumerBlocking extends UntypedConsumerActor {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
String header = msg.getHeaderAs("test", String.class);
getContext().replySafe(String.format("%s %s", body, header));
getContext().tryReply(String.format("%s %s", body, header));
}
}

View file

@ -251,12 +251,12 @@ object ConsumerScalaTest {
case "succeed" self.reply("ok")
}
override def preRestart(reason: scala.Throwable) {
self.reply_?("pr")
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
self.tryReply("pr")
}
override def postStop {
self.reply_?("ps")
self.tryReply("ps")
}
}

View file

@ -703,8 +703,6 @@ class DefaultClusterNode private[akka](
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = if (isConnected.isOn) {
val serializerClassName = serializer.getClass.getName
EventHandler.debug(this,
"Storing actor with address [%s] in cluster".format(actorAddress))
@ -739,9 +737,9 @@ class DefaultClusterNode private[akka](
// create ADDRESS -> SERIALIZER CLASS NAME mapping
try {
zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString)
} catch {
case e: ZkNodeExistsException zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
case e: ZkNodeExistsException zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString)
}
// create ADDRESS -> NODE mapping
@ -1084,23 +1082,12 @@ class DefaultClusterNode private[akka](
/**
* Returns Serializer for actor with specific address.
*/
def serializerForActor(actorAddress: String): Serializer = {
val serializerClassName =
try {
zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String]
def serializerForActor(actorAddress: String): Serializer = try {
Serialization.serializerByIdentity(zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String].toByte)
} catch {
case e: ZkNoNodeException throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))
}
ReflectiveAccess.getClassFor(serializerClassName) match {
// FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
case Right(clazz) clazz.newInstance.asInstanceOf[Serializer]
case Left(error)
EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString))
throw error
}
}
/**
* Returns addresses for nodes that the clustered actor is in use on.
*/
@ -1790,7 +1777,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def preRestart(reason: Throwable) {
override def preRestart(reason: Throwable, msg: Option[Any]) {
EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason))
}
@ -1930,7 +1917,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[Unit] try {
case f: Function0[_] try {
f()
} finally {
self.stop()
@ -1943,7 +1930,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[Any] try {
case f: Function0[_] try {
self.reply(f())
} finally {
self.stop()
@ -1956,8 +1943,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[Any, Unit], param: Any) try {
fun(param)
case (fun: Function[_, _], param: Any) try {
fun.asInstanceOf[Any => Unit].apply(param)
} finally {
self.stop()
}
@ -1969,8 +1956,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[Any, Unit], param: Any) try {
self.reply(fun(param))
case (fun: Function[_, _], param: Any) try {
self.reply(fun.asInstanceOf[Any => Any](param))
} finally {
self.stop()
}

View file

@ -14,17 +14,16 @@ import akka.config._
import Config._
import akka.util._
import akka.actor._
import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind }
import DeploymentConfig.{ ReplicationScheme}
import akka.event.EventHandler
import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation }
import akka.remote.MessageSerializer
import akka.cluster.zookeeper._
import akka.serialization.{ Serializer, Serialization, Compression }
import akka.serialization.Compression
import Compression.LZF
import akka.serialization.ActorSerialization._
import java.util.Enumeration
import java.util.concurrent.atomic.AtomicLong
// FIXME allow user to choose dynamically between 'async' and 'sync' tx logging (asyncAddEntry(byte[] data, AddCallback cb, Object ctx))
// FIXME clean up old entries in log after doing a snapshot

View file

@ -118,23 +118,31 @@ class VersionedData(val data: Array[Byte], val version: Long) {}
/**
* An AkkaException thrown by the Storage module.
*/
class StorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause)
class StorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause){
def this(msg:String) = this(msg, null);
}
/**
* *
* A StorageException thrown when an operation is done on a non existing node.
*/
class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) {
def this(msg:String) = this(msg, null);
}
/**
* A StorageException thrown when an operation is done on an existing node, but no node was expected.
*/
class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause){
def this(msg:String) = this(msg, null);
}
/**
* A StorageException thrown when an operation causes an optimistic locking failure.
*/
class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) {
def this(msg:String) = this(msg, null);
}
/**
* A Storage implementation based on ZooKeeper.

View file

@ -6,6 +6,7 @@ package akka.remote
import akka.actor.{ Actor, BootableActorLoaderService }
import akka.util.{ ReflectiveAccess, Bootable }
import akka.event.EventHandler
/**
* This bundle/service is responsible for booting up and shutting down the remote actors facility.
@ -23,14 +24,19 @@ trait BootableRemoteActorService extends Bootable {
abstract override def onLoad() {
if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) {
EventHandler.info(this, "Initializing Remote Actors Service...")
startRemoteService()
EventHandler.info(this, "Remote Actors Service initialized")
}
super.onLoad()
}
abstract override def onUnload() {
EventHandler.info(this, "Shutting down Remote Actors Service")
Actor.remote.shutdown()
if (remoteServerThread.isAlive) remoteServerThread.join(1000)
EventHandler.info(this, "Remote Actors Service has been shut down")
super.onUnload()
}
}

View file

@ -4,7 +4,7 @@
package akka.remote.netty
import akka.dispatch.{ ActorPromise, DefaultPromise, Promise, Future }
import akka.dispatch.{ActorPromise, DefaultPromise, Promise}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remote.protocol.RemoteProtocol._
import akka.serialization.RemoteActorSerialization
@ -12,7 +12,6 @@ import akka.serialization.RemoteActorSerialization._
import akka.remoteinterface._
import akka.actor.{
PoisonPill,
LocalActorRef,
Actor,
RemoteActorRef,
ActorRef,
@ -20,7 +19,6 @@ import akka.actor.{
RemoteActorSystemMessage,
uuidFrom,
Uuid,
Death,
LifeCycleMessage
}
import akka.actor.Actor._
@ -45,12 +43,13 @@ import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent._
import akka.AkkaException
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause){
def this(msg:String) = this(msg, null);
}
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -66,7 +65,8 @@ object RemoteEncoder {
}
}
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement
trait NettyRemoteClientModule extends RemoteClientModule {
self: ListenerManagement
private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
@ -88,37 +88,44 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
lock.readLock.lock
try {
val c = remoteClients.get(key) match {
case s: Some[RemoteClient] s.get
case Some(client) client
case None
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
case s: Some[RemoteClient] s.get //If already populated by other writer
remoteClients.get(key) match {
//Recheck for addition, race between upgrades
case Some(client) client //If already populated by other writer
case None //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
remoteClients += key -> client
client
}
} finally { lock.readLock.lock } //downgrade
} finally { lock.writeLock.unlock }
} finally {
lock.readLock.lock
} //downgrade
} finally {
lock.writeLock.unlock
}
}
fun(c)
} finally { lock.readLock.unlock }
} finally {
lock.readLock.unlock
}
}
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match {
case s: Some[RemoteClient] s.get.shutdown()
case Some(client) client.shutdown()
case None false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(Address(address)) match {
case s: Some[RemoteClient] s.get.connect(reconnectIfAlreadyConnected = true)
case Some(client) client.connect(reconnectIfAlreadyConnected = true)
case None false
}
}
@ -133,7 +140,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
def shutdownRemoteClients() = lock withWriteGuard {
remoteClients.foreach({ case (addr, client) client.shutdown() })
remoteClients.foreach({
case (addr, client) client.shutdown()
})
remoteClients.clear()
}
}
@ -266,20 +275,23 @@ abstract class RemoteClient private[akka] (
}
}
private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log
private[remote] def sendPendingRequests() = pendingRequests synchronized {
// ensure only one thread at a time can flush the log
val nrOfMessages = pendingRequests.size
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
var pendingRequest = pendingRequests.peek
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) { // sendOneWay
if (isOneWay) {
// sendOneWay
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
throw future.getCause
}
} else { // sendRequestReply
} else {
// sendRequestReply
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
@ -303,6 +315,7 @@ abstract class RemoteClient private[akka] (
class ActiveRemoteClient private[akka](
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: ( Any) Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@ -318,6 +331,7 @@ class ActiveRemoteClient private[akka] (
private var reconnectionTimeWindowStart = 0L
def notifyListeners(msg: Any): Unit = notifyListenersFun(msg)
def currentChannel = connection.getChannel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
@ -330,15 +344,18 @@ class ActiveRemoteClient private[akka] (
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress))
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
EventHandler.error(connection.getCause, "Remote client connection to [%s] has failed".format(remoteAddress), this)
false
} else {
//Send cookie
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty)
@ -365,12 +382,16 @@ class ActiveRemoteClient private[akka] (
} match {
case true true
case false if reconnectIfAlreadyConnected
EventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress))
openChannels.remove(connection.getChannel)
connection.getChannel.close
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress),this)
false
} else {
//Send cookie
@ -387,6 +408,8 @@ class ActiveRemoteClient private[akka] (
//Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
def shutdown() = runSwitch switchOff {
EventHandler.info(this, "Shutting down [%s]".format(name))
notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop()
timer = null
@ -396,6 +419,8 @@ class ActiveRemoteClient private[akka] (
bootstrap = null
connection = null
pendingRequests.clear()
EventHandler.info(this, "[%s] has been shut down".format(name))
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -403,7 +428,11 @@ class ActiveRemoteClient private[akka] (
reconnectionTimeWindowStart = System.currentTimeMillis
true
} else {
/*Time left > 0*/ (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
if (timeLeft) {
EventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft))
}
timeLeft
}
}
@ -457,20 +486,28 @@ class ActiveRemoteClientHandler(
case arp: AkkaRemoteProtocol if arp.hasInstruction
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN spawn { client.module.shutdownClientConnection(remoteAddress) }
case CommandType.SHUTDOWN spawn {
client.module.shutdownClientConnection(remoteAddress)
}
}
case arp: AkkaRemoteProtocol if arp.hasMessage
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]]
EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]".format(reply))
EventHandler.debug(this, "Trying to map back to future: %s".format(replyUuid))
futures.remove(replyUuid).asInstanceOf[Promise[Any]] match {
case null =>
client.notifyListeners(RemoteClientError(new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist"), client.module, client.remoteAddress))
case future =>
if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
future.completeWithException(parseException(reply, client.loader))
}
}
case other
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
}
@ -492,13 +529,16 @@ class ActiveRemoteClientHandler(
}
}
}, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.module.shutdownClientConnection(remoteAddress) }
} else spawn {
client.module.shutdownClientConnection(remoteAddress)
}
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
try {
if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress))
client.resetReconnectionTimeWindow
} catch {
case e: Throwable
@ -510,12 +550,20 @@ class ActiveRemoteClientHandler(
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress))
EventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress))
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
if (event.getCause ne null)
EventHandler.error(event.getCause, "Unexpected exception from downstream in remote client", this)
else
EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event))
event.getCause match {
case e: ReadTimeoutException
spawn { client.module.shutdownClientConnection(remoteAddress) }
spawn {
client.module.shutdownClientConnection(remoteAddress)
}
case e
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
event.getChannel.close //FIXME Is this the correct behavior?
@ -560,7 +608,8 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
if (optimizeLocalScoped_?) {
if ((host == homeInetSocketAddress.getAddress.getHostAddress ||
host == homeInetSocketAddress.getHostName) &&
port == homeInetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals?
port == homeInetSocketAddress.getPort) {
//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(actorAddress, actorAddress)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
@ -575,7 +624,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
}
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
import RemoteServerSettings._
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host, port)
@ -626,18 +677,18 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
}
}
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
import RemoteServerSettings._
trait NettyRemoteServerModule extends RemoteServerModule {
self: RemoteModule
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match {
case s: Some[NettyRemoteServer] s.get.address
case Some(server) server.address
case None ReflectiveAccess.RemoteModule.configDefaultAddress
}
def name = currentServer.get match {
case s: Some[NettyRemoteServer] s.get.name
case Some(server) server.name
case None
val a = ReflectiveAccess.RemoteModule.configDefaultAddress
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort
@ -650,6 +701,8 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port))
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
@ -662,7 +715,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
def shutdownServerModule() = guard withGuard {
_isRunning switchOff {
currentServer.getAndSet(None) foreach { instance
currentServer.getAndSet(None) foreach {
instance
EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port))
instance.shutdown()
}
}
@ -707,7 +763,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid))
actors.remove(actorRef.address, actorRef)
actorsByUuid.remove(actorRef.uuid.toString, actorRef)
}
@ -719,7 +778,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id))
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
val actorRef = actors get id
@ -735,7 +797,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(id: String): Unit = {
if (_isRunning.isOn) {
EventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id))
actorsFactories.remove(id)
}
}
@ -750,6 +815,7 @@ class RemoteServerPipelineFactory(
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
def getPipeline: ChannelPipeline = {
@ -803,6 +869,7 @@ class RemoteServerHandler(
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
// applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
@ -835,6 +902,8 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
EventHandler.debug(this,"Remote client [%s] connected to [%s]".format(clientAddress, server.name))
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
}
@ -842,12 +911,18 @@ class RemoteServerHandler(
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
EventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name))
// stop all session actors
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try { actor ! PoisonPill } catch { case e: Exception }
try {
actor ! PoisonPill
} catch {
case e: Exception EventHandler.error(e, "Couldn't stop %s".format(actor),this)
}
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
@ -855,6 +930,8 @@ class RemoteServerHandler(
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name),this)
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
}
@ -870,6 +947,8 @@ class RemoteServerHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
EventHandler.error(event.getCause, "Unexpected exception from remote downstream", this)
event.getChannel.close
server.notifyListeners(RemoteServerError(event.getCause, server))
}
@ -891,8 +970,13 @@ class RemoteServerHandler(
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
EventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid))
val actorRef =
try { createActor(actorInfo, channel) } catch {
try {
createActor(actorInfo, channel)
} catch {
case e: SecurityException
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request))
@ -905,7 +989,8 @@ class RemoteServerHandler(
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
message match { // first match on system messages
message match {
// first match on system messages
case RemoteActorSystemMessage.Stop
if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor")
else actorRef.stop()
@ -920,15 +1005,15 @@ class RemoteServerHandler(
request.getActorInfo.getTimeout,
new ActorPromise(request.getActorInfo.getTimeout).
onComplete(_.value.get match {
case l: Left[Throwable, Any] write(channel, createErrorReplyMessage(l.a, request))
case r: Right[Throwable, Any]
case Left(exception) write(channel, createErrorReplyMessage(exception, request))
case r: Right[_, _]
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getAddress,
actorInfo.getTimeout,
r,
true,
r.asInstanceOf[Either[Throwable, Any]],
isOneWay = true,
Some(actorRef))
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method

View file

@ -21,6 +21,7 @@ import java.net.InetSocketAddress
import com.google.protobuf.ByteString
import com.eaio.uuid.UUID
import akka.event.EventHandler
/**
* Module for local actor serialization.
@ -95,10 +96,10 @@ object ActorSerialization {
if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
val messages =
actorRef.mailbox match {
case q: java.util.Queue[MessageInvocation]
case q: java.util.Queue[_]
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
val it = q.iterator
while (it.hasNext == true) l += it.next
while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation]
l
}
@ -142,6 +143,8 @@ object ActorSerialization {
overriddenUuid: Option[UUID],
loader: Option[ClassLoader]): ActorRef = {
EventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol))
val lifeCycle =
if (protocol.hasLifeCycle) {
protocol.getLifeCycle.getLifeCycle match {
@ -243,11 +246,17 @@ object RemoteActorSerialization {
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
RemoteActorRef(
EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol))
val ref = RemoteActorRef(
JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress],
protocol.getAddress,
protocol.getTimeout,
loader)
EventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid))
ref
}
/**
@ -263,6 +272,9 @@ object RemoteActorSerialization {
case _
ReflectiveAccess.RemoteModule.configDefaultAddress
}
EventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress))
RemoteActorRefProtocol.newBuilder
.setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress)))
.setAddress(actor.address)

View file

@ -57,16 +57,14 @@ class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode {
}
barrier("store-1-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x fail("No serializer found"), s s)
node.store("hello-world-1", classOf[HelloWorld1], serializer)
node.store("hello-world-1", classOf[HelloWorld1], Serialization.serializerFor(classOf[HelloWorld1]))
}
barrier("use-1-in-node-2", NrOfNodes) {
}
barrier("store-2-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x fail("No serializer found"), s s)
node.store("hello-world-2", classOf[HelloWorld1], false, serializer)
node.store("hello-world-2", classOf[HelloWorld1], false, Serialization.serializerFor(classOf[HelloWorld1]))
}
barrier("use-2-in-node-2", NrOfNodes) {

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
akka.cluster.replication.snapshot-frequency = 1000

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
akka.cluster.replication.snapshot-frequency = 1000

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
akka.cluster.replication.snapshot-frequency = 7

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
akka.cluster.replication.snapshot-frequency = 7

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 7

View file

@ -1,7 +1,7 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 7

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
akka.actor.deployment.service-hello.clustered.replicas = 2
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
akka.actor.deployment.service-hello.clustered.replicas = 2
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3
akka.actor.deployment.service-hello.clustered.replication-factor = 3

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3
akka.actor.deployment.service-hello.clustered.repliction-factor = 3

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3
akka.actor.deployment.service-hello.clustered.replication-factor = 3

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -2,4 +2,4 @@ akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -291,13 +291,13 @@ Now we can create the worker actor. This is done by extending in the ``UntypedA
double result = calculatePiFor(work.getStart(), work.getNrOfElements());
// reply with the result
getContext().replyUnsafe(new Result(result));
getContext().reply(new Result(result));
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
}
As you can see we have now created an ``UntypedActor`` with a ``onReceive`` method as a handler for the ``Work`` message. In this handler we invoke the ``calculatePiFor(..)`` method, wrap the result in a ``Result`` message and send it back to the original sender using ``getContext().replyUnsafe(..)``. In Akka the sender reference is implicitly passed along with the message so that the receiver can always reply or store away the sender reference for future use.
As you can see we have now created an ``UntypedActor`` with a ``onReceive`` method as a handler for the ``Work`` message. In this handler we invoke the ``calculatePiFor(..)`` method, wrap the result in a ``Result`` message and send it back to the original sender using ``getContext().reply(..)``. In Akka the sender reference is implicitly passed along with the message so that the receiver can always reply or store away the sender reference for future use.
The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method::
@ -587,7 +587,7 @@ Before we package it up and run it, let's take a look at the full code now, with
double result = calculatePiFor(work.getStart(), work.getNrOfElements())
// reply with the result
getContext().replyUnsafe(new Result(result));
getContext().reply(new Result(result));
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}

View file

@ -342,22 +342,22 @@ Supervised actors have the option to reply to the initial sender within preResta
// do something that may throw an exception
// ...
getContext().replySafe("ok");
getContext().tryReply("ok");
}
@Override
public void preRestart(Throwable reason) {
getContext().replySafe(reason.getMessage());
getContext().tryReply(reason.getMessage());
}
@Override
public void postStop() {
getContext().replySafe("stopped by supervisor");
getContext().tryReply("stopped by supervisor");
}
}
- A reply within preRestart or postRestart must be a safe reply via getContext().replySafe() because a getContext().replyUnsafe() will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies.
- A reply within postStop must be a safe reply via getContext().replySafe() because a getContext().replyUnsafe() will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all).
- A reply within preRestart or postRestart must be a safe reply via getContext().tryReply() because a getContext().reply() will throw an exception when the actor is restarted without having failed. This can be the case in context of AllForOne restart strategies.
- A reply within postStop must be a safe reply via getContext().tryReply() because a getContext().reply() will throw an exception when the actor has been stopped by the application (and not by a supervisor) after successful execution of receive (or no execution at all).
Handling too many actor restarts within a specific time limit
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View file

@ -20,7 +20,7 @@ Step 1: Define the Actor
public class SerializationTestActor extends UntypedActor {
public void onReceive(Object msg) {
getContext().replySafe("got it!");
getContext().tryReply("got it!");
}
}
@ -101,10 +101,10 @@ Step 1: Define the Actor
public void onReceive(Object msg) {
if (msg.equals("hello")) {
count = count + 1;
getContext().replyUnsafe("world " + count);
getContext().reply("world " + count);
} else if (msg instanceof String) {
count = count + 1;
getContext().replyUnsafe("hello " + msg + " " + count);
getContext().reply("hello " + msg + " " + count);
} else {
throw new IllegalArgumentException("invalid message type");
}

View file

@ -95,7 +95,7 @@ Here is an example of coordinating two simple counter UntypedActors so that they
});
}
} else if (incoming.equals("GetCount")) {
getContext().replyUnsafe(count.get());
getContext().reply(count.get());
}
}
}

View file

@ -247,10 +247,10 @@ which you do by Channel.sendOneWay(msg)
We recommend that you as first choice use the channel abstraction instead of the other ways described in the following sections.
Reply using the 'replySafe' and 'replyUnsafe' methods
Reply using the 'tryReply' and 'reply' methods
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you want to send a message back to the original sender of the message you just received then you can use the 'getContext().replyUnsafe(..)' method.
If you want to send a message back to the original sender of the message you just received then you can use the 'getContext().reply(..)' method.
.. code-block:: java
@ -258,15 +258,15 @@ If you want to send a message back to the original sender of the message you jus
if (message instanceof String) {
String msg = (String)message;
if (msg.equals("Hello")) {
// Reply to original sender of message using the 'replyUnsafe' method
getContext().replyUnsafe(msg + " from " + getContext().getUuid());
// Reply to original sender of message using the 'reply' method
getContext().reply(msg + " from " + getContext().getUuid());
}
}
}
In this case we will a reply back to the Actor that sent the message.
The 'replyUnsafe' method throws an 'IllegalStateException' if unable to determine what to reply to, e.g. the sender has not been passed along with the message when invoking one of 'send*' methods. You can also use the more forgiving 'replySafe' method which returns 'true' if reply was sent, and 'false' if unable to determine what to reply to.
The 'reply' method throws an 'IllegalStateException' if unable to determine what to reply to, e.g. the sender has not been passed along with the message when invoking one of 'send*' methods. You can also use the more forgiving 'tryReply' method which returns 'true' if reply was sent, and 'false' if unable to determine what to reply to.
.. code-block:: java
@ -274,8 +274,8 @@ The 'replyUnsafe' method throws an 'IllegalStateException' if unable to determin
if (message instanceof String) {
String msg = (String)message;
if (msg.equals("Hello")) {
// Reply to original sender of message using the 'replyUnsafe' method
if (getContext().replySafe(msg + " from " + getContext().getUuid())) ... // success
// Reply to original sender of message using the 'reply' method
if (getContext().tryReply(msg + " from " + getContext().getUuid())) ... // success
else ... // handle failure
}
}

View file

@ -246,7 +246,7 @@ from localhost on port 8877.
public void onReceive(Object message) {
Message msg = (Message)message;
String body = msg.getBodyAs(String.class);
getContext().replySafe(String.format("Hello %s", body));
getContext().tryReply(String.format("Hello %s", body));
}
}
@ -659,10 +659,10 @@ acknowledgement).
public void onReceive(Object message) {
// ...
getContext().replyUnsafe(ack()) // on success
getContext().reply(ack()) // on success
// ...
val e: Exception = ...
getContext().replyUnsafe(new Failure(e)) // on failure
getContext().reply(new Failure(e)) // on failure
}
}
@ -855,7 +855,7 @@ following consumer actor class.
}
public void onReceive(Object message) {
getContext().replySafe("response from remote actor 1");
getContext().tryReply("response from remote actor 1");
}
}
@ -1423,7 +1423,7 @@ For initiating a a two-way message exchange, one of the
public class SampleUntypedActor extends UntypedActor {
public void onReceive(Object msg) {
getContext().replySafe(CamelContextManager.getMandatoryTemplate().requestBody("direct:news", msg));
getContext().tryReply(CamelContextManager.getMandatoryTemplate().requestBody("direct:news", msg));
}
}
@ -1995,7 +1995,7 @@ ends at the target actor.
public void onReceive(Object message) {
Message msg = (Message) message;
String body = msg.getBodyAs(String.class);
getContext().replySafe(String.format("Hello %s", body));
getContext().tryReply(String.format("Hello %s", body));
}
}

View file

@ -0,0 +1,6 @@
.. _migration-1.2:
################################
Migration Guide 1.1.x to 1.2.x
################################

View file

@ -0,0 +1,20 @@
.. _migration-2.0:
################################
Migration Guide 1.2.x to 2.0.x
################################
Actors
======
The 2.0 release contains several new features which require source-level
changes in client code. This API cleanup is planned to be the last one for a
significant amount of time.
Lifecycle Callbacks
-------------------
The :meth:`preRestart(cause: Throwable)` method has been replaced by
:meth:`preRestart(cause: Throwable, lastMessage: Any)`, hence you must insert
the second argument in all overriding methods. The good news is that any missed
actor will not compile without error.

View file

@ -6,6 +6,8 @@ Migration Guides
.. toctree::
:maxdepth: 1
migration-guide-1.2.x-2.0.x
migration-guide-1.1.x-1.2.x
migration-guide-1.0.x-1.1.x
migration-guide-0.10.x-1.0.x
migration-guide-0.9.x-0.10.x

View file

@ -92,6 +92,90 @@ Here we create a light-weight actor-based thread, that can be used to spawn off
... // do stuff
}
Actor Internal API
------------------
The :class:`Actor` trait defines only one abstract method, the abovementioned
:meth:`receive`. In addition, it offers two convenience methods
:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as
described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actors
:class:`ActorRef` object. If the current actor behavior does not match a
received message, :meth:`unhandled` is called, which by default throws an
:class:`UnhandledMessageException`.
The remaining visible methods are user-overridable life-cycle hooks which are
described in the following::
def preStart() {}
def preRestart(cause: Throwable, message: Option[Any]) {}
def freshInstance(): Option[Actor] = None
def postRestart(cause: Throwable) {}
def postStop() {}
The implementations shown above are the defaults provided by the :class:`Actor`
trait.
Start Hook
^^^^^^^^^^
Right after starting the actor, its :meth:`preStart` method is invoked. This is
guaranteed to happen before the first message from external sources is queued
to the actors mailbox.
::
override def preStart {
// e.g. send initial message to self
self ! GetMeStarted
// or do any other stuff, e.g. registering with other actors
someService ! Register(self)
}
Restart Hooks
^^^^^^^^^^^^^
A supervised actor, i.e. one which is linked to another actor with a fault
handling strategy, will be restarted in case an exception is thrown while
processing a message. This restart involves four of the hooks mentioned above:
1. The old actor is informed by calling :meth:`preRestart` with the exception
which caused the restart and the message which triggered that exception; the
latter may be ``None`` if the restart was not caused by processing a
message, e.g. when a supervisor does not trap the exception and is restarted
in turn by its supervisor. This method is the best place for cleaning up,
preparing hand-over to the fresh actor instance, etc.
2. The old actors :meth:`freshInstance` factory method is invoked, which may
optionally produce the new actor instance which will replace this actor. If
this method returns :obj:`None` or throws an exception, the initial factory
from the ``Actor.actorOf`` call is used to produce the fresh instance.
3. The new actors :meth:`preStart` method is invoked, just as in the normal
start-up case.
4. The new actors :meth:`postRestart` method is called with the exception
which caused the restart.
.. warning::
The :meth:`freshInstance` hook may be used to propagate (part of) the failed
actors state to the fresh instance. This carries the risk of proliferating
the cause for the crash which triggered the restart. If you are tempted to
take this route, it is strongly advised to step back and consider other
possible approaches, e.g. distributing the state in question using other
means or spawning short-lived worker actors for carrying out “risky” tasks.
An actor restart replaces only the actual actor object; the contents of the
mailbox and the hotswap stack are unaffected by the restart, so processing of
messages will resume after the :meth:`postRestart` hook returns. Any message
sent to an actor while it is being restarted will be queued to its mailbox as
usual.
Stop Hook
^^^^^^^^^
After stopping an actor, its :meth:`postStop` hook is called, which may be used
e.g. for deregistering this actor from other services. This hook is guaranteed
to run after message queuing has been disabled for this actor, i.e. sending
messages would fail with an :class:`IllegalActorStateException`.
Identifying Actors
------------------
@ -252,43 +336,6 @@ This method should return a ``PartialFunction``, e.g. a match/case clause
}
}
Actor internal API
------------------
The Actor trait contains almost no member fields or methods to invoke, you just use the Actor trait to implement the:
#. ``receive`` message handler
#. life-cycle callbacks:
#. preStart
#. postStop
#. preRestart
#. postRestart
The ``Actor`` trait has one single member field:
.. code-block:: scala
val self: ActorRef
This ``self`` field holds a reference to its ``ActorRef`` and it is this reference you want to access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts, fault tolerance etc., start and stop etc.
However, for convenience you can import these functions and fields like below, which will allow you do drop the ``self`` prefix:
.. code-block:: scala
class MyActor extends Actor {
import self._
id = ...
dispatcher = ...
start
...
}
But in this documentation we will always prefix the calls with ``self`` for clarity.
Let's start by looking how we can reply to messages in a convenient way using this ``ActorRef`` API.
Reply to messages
-----------------
@ -335,13 +382,13 @@ If you want to send a message back to the original sender of the message you jus
In this case the ``result`` will be send back to the Actor that sent the ``request``.
The ``reply`` method throws an ``IllegalStateException`` if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving ``reply_?`` method which returns ``true`` if reply was sent, and ``false`` if unable to determine what to reply to.
The ``reply`` method throws an ``IllegalStateException`` if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving ``tryReply`` method which returns ``true`` if reply was sent, and ``false`` if unable to determine what to reply to.
.. code-block:: scala
case request =>
val result = process(request)
if (self.reply_?(result)) ...// success
if (self.tryReply(result)) ...// success
else ... // handle failure
Summary of reply semantics and options
@ -441,6 +488,8 @@ You can also send an actor the ``akka.actor.PoisonPill`` message, which will sto
If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
.. _Actor.HotSwap:
HotSwap
-------

Some files were not shown because too many files have changed in this diff Show more