Merge branch 'master' of github.com:jboner/akka

Conflicts:
	akka-actor/src/main/scala/akka/actor/ActorRef.scala
This commit is contained in:
Garrick Evans 2011-07-15 14:39:18 -07:00
commit 7f62717a8c
294 changed files with 2222 additions and 1042 deletions

View file

@ -1,6 +1,6 @@
This software is licensed under the Apache 2 license, quoted below.
Copyright 2009-2011 Scalable Solutions AB [http://scalablesolutions.se]
Copyright 2009-2011 Typesafe Inc. [http://www.typesafe.com]
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testing

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util;

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,7 +1,7 @@
package akka.actor
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
import org.scalatest.matchers.MustMatchers

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
@ -8,7 +8,6 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import akka.testkit.Testing.sleepFor
import akka.util.duration._
import akka.config.Supervision._
@ -205,7 +204,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
def kill(pingPongActor: ActorRef) = {
intercept[RuntimeException] { pingPongActor !! (Die, TimeoutMillis) }
intercept[RuntimeException] { (pingPongActor ? (Die, TimeoutMillis)).as[Any] }
messageLogPoll must be === ExceptionMessage
}

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.supervisor
@ -14,10 +14,13 @@ import org.scalatest.matchers.MustMatchers
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
import Ticket669Spec._
override def beforeAll = Thread.interrupted() //remove interrupted status.
override def afterAll = Actor.registry.local.shutdownAll
"A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.config

View file

@ -1,36 +1,52 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.Assertions._
import akka.testkit.Testing
import akka.dispatch._
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{ Duration, Switch }
import org.multiverse.api.latches.StandardLatch
import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor }
import akka.util.Switch
import akka.actor.{ActorKilledException, PoisonPill, ActorRef, Actor}
import java.rmi.RemoteException
import org.junit.{After, Test}
object ActorModelSpec {
sealed trait ActorModelMessage
case class Reply_?(expect: Any) extends ActorModelMessage
case class Reply(expect: Any) extends ActorModelMessage
case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage
case class CountDown(latch: CountDownLatch) extends ActorModelMessage
case class Increment(counter: AtomicLong) extends ActorModelMessage
case class Await(latch: CountDownLatch) extends ActorModelMessage
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
case class Wait(time: Long) extends ActorModelMessage
case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Interrupt extends ActorModelMessage
case object Restart extends ActorModelMessage
case class ThrowException(e: Throwable) extends ActorModelMessage
val Ping = "Ping"
val Pong = "Pong"
@ -52,17 +68,19 @@ object ActorModelSpec {
}
def receive = {
case Await(latch) ack; latch.await(); busy.switchOff()
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) ack; self.reply_?(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case Await(latch) ack; latch.await(); busy.switchOff()
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) ack; self.reply_?(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l) ack; l.countDown(); self.stop(); busy.switchOff()
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!")
case ThrowException(e: Throwable) => ack; busy.switchOff(); throw e
}
}
@ -183,7 +201,9 @@ object ActorModelSpec {
if (condition) return true
Thread.sleep(intervalMs)
} catch { case e: InterruptedException }
} catch {
case e: InterruptedException
}
}
false
}
@ -192,6 +212,7 @@ object ActorModelSpec {
}
abstract class ActorModelSpec extends JUnitSuite {
import ActorModelSpec._
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
@ -215,13 +236,17 @@ abstract class ActorModelSpec extends JUnitSuite {
msgsProcessed = 0,
restarts = 0)
val futures = for (i 1 to 10) yield Future { i }
val futures = for (i 1 to 10) yield Future {
i
}
await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 2, stops = 2)
val a2 = newTestActor
a2.start
val futures2 = for (i 1 to 10) yield Future { i }
val futures2 = for (i 1 to 10) yield Future {
i
}
await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 3, stops = 2)
@ -259,7 +284,13 @@ abstract class ActorModelSpec extends JUnitSuite {
val counter = new CountDownLatch(200)
a.start()
for (i 1 to 10) { spawn { for (i 1 to 20) { a ! WaitAck(1, counter) } } }
for (i 1 to 10) {
spawn {
for (i 1 to 20) {
a ! WaitAck(1, counter)
}
}
}
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
@ -267,7 +298,15 @@ abstract class ActorModelSpec extends JUnitSuite {
}
def spawn(f: Unit) {
val thread = new Thread { override def run { try { f } catch { case e e.printStackTrace } } }
val thread = new Thread {
override def run {
try {
f
} catch {
case e e.printStackTrace
}
}
}
thread.start()
}
@ -329,8 +368,9 @@ abstract class ActorModelSpec extends JUnitSuite {
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
(1 to num) foreach { _
newTestActor.start() ! cachedMessage
(1 to num) foreach {
_
newTestActor.start() ! cachedMessage
}
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
}
@ -356,6 +396,52 @@ abstract class ActorModelSpec extends JUnitSuite {
assert(each.exception.get.isInstanceOf[ActorKilledException])
a.stop()
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
}
}
class DispatcherModelTest extends ActorModelSpec {

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.dispatch

View file

@ -2,16 +2,23 @@ package akka.actor
import org.scalatest.junit.JUnitSuite
import Actor._
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.config.Supervision._
import org.multiverse.api.latches.StandardLatch
import org.junit.Test
import java.util.concurrent.{ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
class SchedulerSpec extends JUnitSuite {
private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]()
def collectFuture(f: => ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = {
val future = f
futures.add(future)
future
}
def withCleanEndState(action: Unit) {
action
Scheduler.restart
while(futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) }
Actor.registry.local.shutdownAll
}
@ -24,14 +31,14 @@ class SchedulerSpec extends JUnitSuite {
def receive = { case Tick countDownLatch.countDown() }
}).start()
// run every 50 millisec
Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3)
Scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(1, TimeUnit.SECONDS))
@ -45,8 +52,8 @@ class SchedulerSpec extends JUnitSuite {
def receive = { case Tick countDownLatch.countDown() }
}).start()
// run every 50 millisec
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
Scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
collectFuture(Scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
// after 1 second the wait should fail
assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)
@ -65,7 +72,7 @@ class SchedulerSpec extends JUnitSuite {
def receive = { case Ping ticks.countDown }
}).start
val numActors = Actor.registry.local.actors.length
(1 to 1000).foreach(_ Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))
(1 to 1000).foreach(_ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS)))
assert(ticks.await(10, TimeUnit.SECONDS))
assert(Actor.registry.local.actors.length === numActors)
}
@ -83,7 +90,7 @@ class SchedulerSpec extends JUnitSuite {
}).start()
(1 to 10).foreach { i
val future = Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)
val future = collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
future.cancel(true)
}
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
@ -120,9 +127,9 @@ class SchedulerSpec extends JUnitSuite {
Permanent)
:: Nil)).start
Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
// appx 2 pings before crash
Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
// should be enough time for the ping countdown to recover and reach 6 pings

View file

@ -86,6 +86,7 @@ class FileBenchResultRepository extends BenchResultRepository {
}
private def save(stats: Stats) {
new File(dir).mkdirs
if (!dirExists) return
val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp))
val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser"
@ -96,9 +97,9 @@ class FileBenchResultRepository extends BenchResultRepository {
out.writeObject(stats)
} catch {
case e: Exception
EventHandler.error(this, "Failed to save [%s] to [%s]".format(stats, f.getAbsolutePath))
}
finally {
EventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]".
format(stats, f.getAbsolutePath, e.getMessage))
} finally {
if (out ne null) try { out.close() } catch { case ignore: Exception }
}
}
@ -112,11 +113,11 @@ class FileBenchResultRepository extends BenchResultRepository {
val stats = in.readObject.asInstanceOf[Stats]
Some(stats)
} catch {
case e: Exception
EventHandler.error(this, "Failed to load from [%s]".format(f.getAbsolutePath))
case e: Throwable
EventHandler.error(this, "Failed to load from [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
None
}
finally {
} finally {
if (in ne null) try { in.close() } catch { case ignore: Exception }
}
}

View file

@ -2,8 +2,9 @@ package akka.performance.trading.common
import java.io.UnsupportedEncodingException
import java.net.URLEncoder
import scala.collection.immutable.TreeMap
import java.util.Locale
import java.util.Formatter
/**
* Generates URLs to Google Chart API http://code.google.com/apis/chart/
@ -21,7 +22,7 @@ object GoogleChartBuilder {
val current = statistics.last
val sb = new StringBuilder()
val sb = new StringBuilder
sb.append(BaseUrl)
// bar chart
sb.append("cht=bvg")
@ -74,7 +75,7 @@ object GoogleChartBuilder {
// grid lines
appendGridSpacing(maxValue, sb)
return sb.toString()
return sb.toString
}
private def percentileLabels(percentiles: TreeMap[Int, Long], sb: StringBuilder) {
@ -119,4 +120,104 @@ object GoogleChartBuilder {
}
}
def latencyAndThroughputChartUrl(statistics: Seq[Stats], title: String): String = {
if (statistics.isEmpty) return ""
val sb = new StringBuilder
sb.append(BaseUrl)
// line chart
sb.append("cht=lxy")
sb.append("&")
// size
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
sb.append("&")
// title
sb.append("chtt=").append(urlEncode(title))
sb.append("&")
// axis locations
sb.append("chxt=x,y,r,x,y,r")
sb.append("&")
// labels
sb.append("chxl=3:|clients|4:|Latency+(us)|5:|Throughput+(tps)")
sb.append("&")
// label color and font
sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767")
sb.append("&")
sb.append("chco=")
val seriesColors = List("25B33B", "3072F3", "FF0000", "FF9900")
sb.append(seriesColors.mkString(","))
sb.append("&")
// legend
sb.append("chdl=5th Percentile|Median|95th Percentile|Throughput")
sb.append("&")
sb.append("chdlp=b")
sb.append("&")
sb.append("chls=1|1|1")
sb.append("&")
sb.append("chls=1|1|1")
sb.append("&")
sb.append("chma=5,5,5,25")
sb.append("&")
// data points
sb.append("chm=")
val chmStr = seriesColors.zipWithIndex.map(each "o," + each._1 + "," + each._2 + ",-1,7").mkString("|")
sb.append(chmStr)
sb.append("&")
// data series
val loadStr = statistics.map(_.load).mkString(",")
sb.append("chd=t:")
val maxP = 95
val percentiles = List(5, 50, maxP)
val maxValue = statistics.map(_.percentiles(maxP)).max
val percentileSeries: List[String] =
for (p percentiles) yield {
loadStr + "|" + statistics.map(_.percentiles(p)).mkString(",")
}
sb.append(percentileSeries.mkString("|"))
sb.append("|")
val maxTps: Double = statistics.map(_.tps).max
sb.append(loadStr).append("|")
val tpsSeries = statistics.map(s formatDouble(s.tps)).mkString(",")
sb.append(tpsSeries)
val minLoad = statistics.head.load
val maxLoad = statistics.last.load
// y range
sb.append("&")
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append("|1,0,").append(maxValue).append("|2,0,")
.append(formatDouble(maxTps))
sb.append("&")
sb.append("chds=")
for (p percentiles) {
sb.append(minLoad).append(",").append(maxLoad)
sb.append(",0,").append(maxValue)
sb.append(",")
}
sb.append(minLoad).append(",").append(maxLoad)
sb.append(",0,").append(formatDouble(maxTps))
sb.append("&")
// label positions
sb.append("chxp=3,").append("50").append("|4,").append("100").append("|5,").append("100")
sb.append("&")
// grid lines
appendGridSpacing(maxValue, sb)
return sb.toString
}
def formatDouble(value: Double): String = {
new java.math.BigDecimal(value).setScale(2, java.math.RoundingMode.HALF_EVEN).toString
}
}

View file

@ -52,8 +52,7 @@ trait PerformanceTest extends JUnitSuite {
var stat: DescriptiveStatistics = _
val resultRepository = BenchResultRepository()
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
lazy val report = new Report(resultRepository, compareResultWith)
type TS <: TradingSystem
@ -128,78 +127,7 @@ trait PerformanceTest extends JUnitSuite {
resultRepository.add(stats)
EventHandler.info(this, formatResultsTable(resultRepository.get(name)))
val chartTitle = name + " Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(name), chartTitle, _.load + " clients")
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
for {
compareName compareResultWith
compareStats resultRepository.get(compareName, numberOfClients)
} {
val chartTitle = name + " vs. " + compareName + ", " + numberOfClients + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
}
val withHistorical = resultRepository.getWithHistorical(name, numberOfClients)
if (withHistorical.size > 1) {
val chartTitle = name + " vs. historical, " + numberOfClients + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
}
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
report.html(resultRepository.get(name))
}
def delay(delayMs: Int) {

View file

@ -0,0 +1,179 @@
package akka.performance.trading.common
import java.io.File
import java.text.SimpleDateFormat
import java.io.PrintWriter
import java.io.FileWriter
import akka.event.EventHandler
import java.util.Date
class Report(
resultRepository: BenchResultRepository,
compareResultWith: Option[String] = None) {
private val dir = System.getProperty("benchmark.resultDir", "target/benchmark")
private def dirExists: Boolean = new File(dir).exists
private def log = System.getProperty("benchmark.logResult", "false").toBoolean
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]): Unit = if (dirExists) {
val current = statistics.last
val sb = new StringBuilder
val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp))
sb.append(header(title))
sb.append("<h1>%s</h1>\n".format(title))
sb.append("<pre>\n")
sb.append(formatResultsTable(statistics))
sb.append("\n</pre>\n")
sb.append(img(percentilesChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
for (stats statistics) {
compareWithHistoricalPercentiliesChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesChart(stats).foreach(url sb.append(img(url)))
}
if (dirExists) {
val timestamp = fileTimestampFormat.format(new Date(current.timestamp))
val name = current.name + "--" + timestamp + ".html"
write(sb.toString, name)
}
}
private def img(url: String): String = {
"""<img src="%s" border="0" width="%s" height="%s" />""".format(
url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n"
}
def percentilesChart(stats: Stats): String = {
val chartTitle = stats.name + " Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def comparePercentilesChart(stats: Stats): Seq[String] = {
for {
compareName compareResultWith.toSeq
compareStats resultRepository.get(compareName, stats.load)
} yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
}
def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
Some(chartUrl)
} else {
None
}
}
def latencyAndThroughputChart(stats: Stats): String = {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
if (log) EventHandler.info(this, formattedStats)
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
}
def write(content: String, fileName: String) {
val f = new File(dir, fileName)
var writer: PrintWriter = null
try {
writer = new PrintWriter(new FileWriter(f))
writer.print(content)
writer.flush()
} catch {
case e: Exception
EventHandler.error(this, "Failed to save report to [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
} finally {
if (writer ne null) try { writer.close() } catch { case ignore: Exception }
}
}
def header(title: String) =
"""|<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|<html>
|<head>
|
|<title>%s</title>
|</head>
|<body>
|""".stripMargin.format(title)
def footer =
"""|</body>"
|</html>""".stripMargin
}

View file

@ -95,37 +95,45 @@ class RoutingSpec extends WordSpec with MustMatchers {
"dispatch to smallest mailbox" in {
val t1Count = new AtomicInteger(0)
val t2Count = new AtomicInteger(0)
val latch = TestLatch(500)
val latch1 = TestLatch(2501)
val latch2 = TestLatch(2499)
val t1 = actorOf(new Actor {
def receive = {
case x
sleepFor(50 millis) // slow actor
t1Count.incrementAndGet
latch.countDown()
latch1.countDown()
}
}).start()
t1.dispatcher.suspend(t1)
for (i <- 1 to 2501) t1 ! i
val t2 = actorOf(new Actor {
def receive = {
case x
t2Count.incrementAndGet
latch.countDown()
latch2.countDown()
}
}).start()
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last
for (i 1 to 500) d ! i
for (i 1 to 2499 ) d ! i
latch2.await(20 seconds)
t1.dispatcher.resume(t1)
try {
latch.await(20 seconds)
latch1.await(20 seconds)
} finally {
// because t1 is much slower and thus has a bigger mailbox all the time
t1Count.get must be < (t2Count.get)
t1Count.get must be === 2501
t2Count.get must be === 2499
for (a List(t1, t2, d)) a.stop()
}
for (a List(t1, t2, d)) a.stop()
}
"listen" in {

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization

View file

@ -1,10 +1,11 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import akka.actor.dispatch.ActorModelSpec
import java.util.concurrent.CountDownLatch
import org.junit.{After, Test}
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
@ -42,6 +43,13 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec {
//Can't handle this...
}
@After
def after {
//remove the interrupted status since we are messing with interrupted exceptions.
Thread.interrupted()
}
}
// vim: set ts=2 sw=2 et:

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,12 +1,12 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import akka.actor.newUuid
import java.net.{ InetAddress, UnknownHostException }
/**
* Akka base Exception. Each Exception gets:
* <ul>

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
@ -9,15 +9,15 @@ import akka.dispatch._
import akka.config._
import akka.config.Supervision._
import akka.util._
import akka.serialization.{ Format, Serializer, Serialization }
import akka.serialization.{Serializer, Serialization}
import ReflectiveAccess._
import ClusterModule._
import DeploymentConfig.{ TransactionLog TransactionLogConfig, _ }
import DeploymentConfig.{TransactionLog TransactionLogConfig, _}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map JMap }
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
import java.util.{Map JMap}
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
@ -30,10 +30,15 @@ private[akka] object ActorRefInternals {
* LifeCycles for ActorRefs.
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
@ -68,7 +73,8 @@ private[akka] object ActorRefInternals {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
protected[akka] var _uuid = newUuid
@ -105,6 +111,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
def setReceiveTimeout(timeout: Long) {
this.receiveTimeout = Some(timeout)
}
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
@ -121,6 +128,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* </pre>
*/
def setFaultHandler(handler: FaultHandlingStrategy)
def getFaultHandler: FaultHandlingStrategy
/**
@ -139,6 +147,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* </pre>
*/
def setLifeCycle(lifeCycle: LifeCycle)
def getLifeCycle: LifeCycle
/**
@ -153,7 +162,10 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
def setDispatcher(dispatcher: MessageDispatcher) { this.dispatcher = dispatcher }
def setDispatcher(dispatcher: MessageDispatcher) {
this.dispatcher = dispatcher
}
def getDispatcher: MessageDispatcher = dispatcher
/**
@ -177,6 +189,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* Returns the uuid for the actor.
*/
def getUuid = _uuid
def uuid = _uuid
/**
@ -366,9 +379,13 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*/
def sendException(ex: Throwable) {}
def isUsableOnlyOnce = false
def isUsable = true
def isReplyable = true
def canSendException = false
/**
@ -382,9 +399,9 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any]
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any]
protected[akka] def actorInstance: AtomicReference[Actor]
@ -393,6 +410,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
protected[akka] def supervisor_=(sup: Option[ActorRef])
protected[akka] def mailbox: AnyRef
protected[akka] def mailbox_=(value: AnyRef): AnyRef
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable)
@ -416,7 +434,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka] (private[this] val actorFactory: () Actor, val address: String)
class LocalActorRef private[akka](private[this] val actorFactory: () Actor, val address: String)
extends ActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard
@ -442,7 +460,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
protected[akka] val actorInstance = guard.withGuard {
new AtomicReference[Actor](newActor)
}
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
@ -480,16 +500,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
// used only for deserialization
private[akka] def this(
__uuid: Uuid,
__address: String,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () Actor) = {
__uuid: Uuid,
__address: String,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () Actor) = {
this(__factory, __address)
this (__factory, __address)
_uuid = __uuid
timeout = __timeout
@ -627,7 +647,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
*/
def mailbox: AnyRef = _mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value }
protected[akka] def mailbox_=(value: AnyRef): AnyRef = {
_mailbox = value; value
}
/**
* Returns the supervisor, if there is one.
@ -651,12 +673,12 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
dispatcher dispatchMessage new MessageInvocation(this, message, channel)
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
val future = channel match {
case f: ActorPromise f
case _ new ActorPromise(timeout)
case _ new ActorPromise(timeout)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
future
@ -677,7 +699,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException
currentMessage = null // received message while actor is shutting down, ignore
handleExceptionInDispatch(e, messageHandle.message)
throw e
case e
handleExceptionInDispatch(e, messageHandle.message)
}
@ -716,13 +739,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) {
//Immortal
false
} else if (withinTimeRange.isEmpty) { // restrict number of restarts
} else if (withinTimeRange.isEmpty) {
// restrict number of restarts
val retries = maxNrOfRetriesCount + 1
maxNrOfRetriesCount = retries //Increment number of retries
retries > maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
} else {
// cannot restart more than N within M timerange
val retries = maxNrOfRetriesCount + 1
val windowStart = restartTimeWindowStartNanos
@ -826,7 +852,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
// ========= PRIVATE FUNCTIONS =========
private[this] def newActor: Actor = {
import Actor.{ actorRefInCreation refStack }
import Actor.{actorRefInCreation refStack}
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
@ -837,7 +863,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self
}
} match {
case null throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
case null throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
case valid valid
}
@ -862,27 +888,34 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, reason))
else {
lifeCycle match {
<<<<<<< HEAD
case Temporary shutDownTemporaryActor(this, reason)
case _ dispatcher.resume(this) //Resume processing for this actor
=======
case Temporary shutDownTemporaryActor(this)
case _ dispatcher.resume(this) //Resume processing for this actor
>>>>>>> 2cf64bccae0afcfa2ed9062e1590cd9e4f187aeb
}
}
}
private def notifySupervisorWithMessage(notification: LifeCycleMessage) {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
_supervisor.foreach { sup
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
//Scoped stop all linked actors, to avoid leaking the 'i' val
{
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove
_supervisor.foreach {
sup
if (sup.isShutdown) {
// if supervisor is shut down, game over for all linked actors
//Scoped stop all linked actors, to avoid leaking the 'i' val
{
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove
}
}
}
//Stop the actor itself
stop
} else sup ! notification // else notify supervisor
//Stop the actor itself
stop
} else sup ! notification // else notify supervisor
}
}
@ -923,7 +956,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
protected[akka] def checkReceiveTimeout() {
cancelReceiveTimeout()
if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed
if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
}
}
@ -951,11 +985,11 @@ object RemoteActorSystemMessage {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
loader: Option[ClassLoader])
private[akka] case class RemoteActorRef private[akka](
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
ClusterModule.ensureEnabled()
@ -967,22 +1001,22 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val chSender = channel match {
case ref: ActorRef Some(ref)
case _ None
case _ None
}
Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader)
}
def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
val chSender = channel match {
case ref: ActorRef Some(ref)
case _ None
case _ None
}
val chFuture = channel match {
case f: Promise[Any] Some(f)
case _ None
case _ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader)
if (future.isDefined) ActorPromise(future.get)
@ -1013,34 +1047,49 @@ private[akka] case class RemoteActorRef private[akka] (
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
@ -1072,7 +1121,8 @@ trait ActorRefShared {
* There are implicit conversions in ../actor/Implicits.scala
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorRef
trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
ref: ActorRef
/**
* Address for actor, must be a unique one.
@ -1116,7 +1166,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
if (msg eq null) None
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
case _ None
}
}
@ -1130,7 +1180,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
if (msg eq null) None
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
case _ None
}
}
@ -1154,28 +1204,6 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
"Actor has not been started, you need to invoke 'actor.start()' before using it")
}
/**
* Sends a message asynchronously and waits on a future for a reply message.
* <p/>
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
@deprecated("use `(actor ? msg).as[T]` instead", "1.2")
def !!(message: Any, timeout: Long = this.timeout)(implicit channel: UntypedChannel = NullChannel): Option[Any] = {
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel)
try { future.await.resultOrException } catch { case e: FutureTimeoutException None }
} else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start()' before using it")
}
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
*/

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -18,7 +18,6 @@ package akka.actor
import akka.event.EventHandler
import akka.AkkaException
import java.util.concurrent.atomic.AtomicLong
import java.lang.ref.WeakReference
import java.util.concurrent._
import java.lang.RuntimeException
@ -27,20 +26,19 @@ object Scheduler {
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e)
@volatile
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = {
receiver match {
case local: LocalActorRef =>
val ref = new WeakReference[ActorRef](local)
case local: LocalActorRef
val uuid = local.uuid
new Runnable {
def run = ref.get match {
case null => if(throwWhenReceiverExpired) throw new RuntimeException("Receiver not found: GC:ed")
case actor => actor ! message
def run = Actor.registry.local.actorFor(uuid) match {
case None if (throwWhenReceiverExpired) throw new RuntimeException("Receiver not found, unregistered")
case Some(actor) actor ! message
}
}
case other => new Runnable { def run = other ! message }
case other new Runnable { def run = other ! message }
}
}
@ -128,18 +126,7 @@ object Scheduler {
}
}
def shutdown() {
synchronized {
service.shutdown()
}
}
def restart() {
synchronized {
shutdown()
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
}
}
private[akka] def shutdown() { service.shutdown() }
}
private object SchedulerThreadFactory extends ThreadFactory {

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,7 +1,7 @@
package akka.actor
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.japi.{ Creator, Option JOption }
@ -10,6 +10,8 @@ import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutExce
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.Serialization
import com.sun.xml.internal.ws.developer.MemberSubmissionAddressing.Validation
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
@ -87,16 +89,35 @@ object TypedActor {
}
} catch { case i: InvocationTargetException throw i.getTargetException }
private def writeReplace(): AnyRef = new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, parameters)
private def writeReplace(): AnyRef = {
val serializedParameters: Array[(Array[Byte],String)] = parameters match {
case null => null
case a if a.length == 0 => Array[(Array[Byte],String)]()
case a => a.map( {
case null => null
case value => Serialization.serializerFor(value.getClass).fold(throw _, s => (s.toBinary(value), s.getClass.getName))
})
}
new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializedParameters)
}
}
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], parameterValues: Array[AnyRef]) {
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Array[Byte],String)]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues)
private def readResolve(): AnyRef = {
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null => null
case a if a.length == 0 => Array[AnyRef]()
case a => a.map( {
case null => null
case (bytes, serializerFQN) => Serialization.serializerOf(serializerFQN).fold(throw _, _.fromBinary(bytes))
})
})
}
}
/**

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
@ -122,8 +122,6 @@ object NodeAddress {
trait ClusterNode {
import ChangeListener._
val isConnected = new AtomicBoolean(false)
private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
def membershipNodes: Array[String]
@ -136,7 +134,7 @@ trait ClusterNode {
def remoteServerAddress: InetSocketAddress
def isRunning: Boolean = isConnected.get
def isRunning: Boolean
def start(): ClusterNode

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.config

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*
* Based on Configgy by Robey Pointer.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*
* Based on Configgy by Robey Pointer.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.config

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*
* Based on Configgy by Robey Pointer.
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.config

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
@ -160,8 +160,6 @@ class Dispatcher(
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = ()
protected override def cleanUpMailboxFor(actorRef: ActorRef) {
val m = getMailbox(actorRef)
if (!m.isEmpty) {
@ -195,19 +193,13 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
def dispatcher: Dispatcher
final def run = {
try {
processMailbox()
} catch {
case ie: InterruptedException
}
finally {
try { processMailbox() } catch {
case ie: InterruptedException => Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
if (!self.isEmpty)
dispatcher.reRegisterForExecution(this)
}
if (!self.isEmpty)
dispatcher.reRegisterForExecution(this)
dispatcher.doneProcessingMailbox(this)
}
/**

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
@ -127,6 +127,10 @@ trait MessageDispatcher {
}
}
/**
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "attach" for the only invocation
*/
private[akka] def register(actorRef: ActorRef) {
if (actorRef.mailbox eq null)
actorRef.mailbox = createMailbox(actorRef)
@ -139,6 +143,10 @@ trait MessageDispatcher {
}
}
/**
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "detach" for the only invocation
*/
private[akka] def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef)

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
@ -34,14 +34,15 @@ class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxTyp
private[akka] val owner = new AtomicReference[ActorRef](_actor)
override def register(actorRef: ActorRef) = {
//Relies on an external lock provided by MessageDispatcher.attach
private[akka] override def register(actorRef: ActorRef) = {
val actor = owner.get()
if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
owner.compareAndSet(null, actorRef) //Register if unregistered
super.register(actorRef)
}
override def unregister(actorRef: ActorRef) = {
//Relies on an external lock provided by MessageDispatcher.detach
private[akka] override def unregister(actorRef: ActorRef) = {
super.unregister(actorRef)
owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak)
}

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.dispatch.{ FutureTimeoutException, Future }

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remoteinterface

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remoteinterface

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing

View file

@ -1,7 +1,7 @@
package akka.serialization
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.actor.Actor

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
@ -9,77 +9,84 @@ import akka.config.Config
import akka.config.Config._
import akka.actor.{ ActorRef, Actor }
import akka.AkkaException
import akka.util.ReflectiveAccess
case class NoSerializerFoundException(m: String) extends AkkaException(m)
/**
* Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/
object Serialization {
case class NoSerializerFoundException(m: String) extends AkkaException(m)
//TODO document me
def serialize(o: AnyRef): Either[Exception, Array[Byte]] = serializerFor(o.getClass) match {
case Left(ex) Left(ex)
case Right(serializer) Right(serializer.toBinary(o))
}
//TODO document me
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
serializerFor(clazz) match {
case Left(ex) Left(ex)
case Left(e) Left(e)
case Right(serializer) Right(serializer.fromBinary(bytes, Some(clazz), classLoader))
}
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = {
serializerMap.get(clazz.getName) match {
case Some(serializerName: String)
getClassFor(serializerName) match {
case Right(serializer) Right(serializer.newInstance.asInstanceOf[Serializer])
case Left(exception) Left(exception)
}
case _
defaultSerializer match {
case Some(s: Serializer) Right(s)
case None Left(NoSerializerFoundException("No default serializer found for " + clazz))
}
//TODO document me
//TODO memoize the lookups
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = //TODO fall back on BestMatchClass THEN default
getClassFor(serializerMap.get(clazz.getName).getOrElse(serializers("default"))) match {
case Right(serializer) Right(serializer.newInstance.asInstanceOf[Serializer])
case Left(e) => Left(e)
}
}
private def defaultSerializer = serializers.get("default") match {
case Some(ser: String)
getClassFor(ser) match {
case Right(serializer) Some(serializer.newInstance.asInstanceOf[Serializer])
case Left(exception) None
}
case None None
}
/**
* Tries to load the specified Serializer by the FQN
*/
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments)
private def getSerializerInstanceForBestMatchClass(cl: Class[_]) = bindings match {
case Some(mappings) mappings find {
case (clazzName, ser)
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
if (bindings.isEmpty)
Left(NoSerializerFoundException("No mapping serializer found for " + cl))
else {
bindings find {
case (clazzName, _)
getClassFor(clazzName) match {
case Right(clazz) clazz.isAssignableFrom(cl)
case _ false
}
} map {
case (_, ser)
getClassFor(ser) match {
case Right(s) Right(s.newInstance.asInstanceOf[Serializer])
case _ Left(new Exception("Error instantiating " + ser))
}
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
case None Left(NoSerializerFoundException("No mapping serializer found for " + cl))
} map {
case (_, ser) serializerOf(ser)
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
}
}
//TODO: Add type and docs
val serializers = config.getSection("akka.actor.serializers").map(_.map).getOrElse(Map("default" -> "akka.serialization.JavaSerializer"))
/**
* A Map of serializer from alias to implementation (FQN of a class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> "akka.serialization.JavaSerializer"
* But "default" can be overridden in config
*/
val serializers: Map[String, String] = config.getSection("akka.actor.serializers") map {
_.map.foldLeft(Map("default" -> "akka.serialization.JavaSerializer")) {
case (result, (k: String, v: String)) => result + (k -> v)
case (result, _) => result
}
} getOrElse Map("default" -> "akka.serialization.JavaSerializer")
//TODO: Add type and docs
val bindings = config.getSection("akka.actor.serialization-bindings")
.map(_.map)
.map(m Map() ++ m.map { case (k, v: List[String]) Map() ++ v.map((_, k)) }.flatten)
/**
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
*/
val bindings: Map[String, String] = config.getSection("akka.actor.serialization-bindings") map {
_.map.foldLeft(Map[String,String]()) {
case (result, (k: String, vs: List[_])) => result ++ (vs collect { case v: String => (v, k) }) //All keys which are lists, take the Strings from them and Map them
case (result, _) => result //For any other values, just skip them, TODO: print out warnings?
}
} getOrElse Map()
//TODO: Add type and docs
val serializerMap = bindings.map(m m.map { case (k, v: String) (k, serializers(v)) }).getOrElse(Map())
/**
* serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class
*/
val serializerMap: Map[String, String] = bindings mapValues serializers
}

View file

@ -1,7 +1,7 @@
package akka.serialization
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
@ -125,22 +125,49 @@ class Switch(startAsOn: Boolean = false) {
} else false
}
/**
* Executes the provided action if the lock is on under a lock, so be _very_ careful with longrunning/blocking operations in it
* Only executes the action if the switch is on, and switches it off immediately after obtaining the lock
* Will switch it back on if the provided action throws an exception
*/
def switchOff(action: Unit): Boolean = transcend(from = true, action)
/**
* Executes the provided action if the lock is off under a lock, so be _very_ careful with longrunning/blocking operations in it
* Only executes the action if the switch is off, and switches it on immediately after obtaining the lock
* Will switch it back off if the provided action throws an exception
*/
def switchOn(action: Unit): Boolean = transcend(from = false, action)
/**
* Switches the switch off (if on), uses locking
*/
def switchOff: Boolean = synchronized { switch.compareAndSet(true, false) }
/**
* Switches the switch on (if off), uses locking
*/
def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) }
/**
* Executes the provided action and returns its value if the switch is IMMEDIATELY on (i.e. no lock involved)
*/
def ifOnYield[T](action: T): Option[T] = {
if (switch.get) Some(action)
else None
}
/**
* Executes the provided action and returns its value if the switch is IMMEDIATELY off (i.e. no lock involved)
*/
def ifOffYield[T](action: T): Option[T] = {
if (!switch.get) Some(action)
else None
}
/**
* Executes the provided action and returns if the action was executed or not, if the switch is IMMEDIATELY on (i.e. no lock involved)
*/
def ifOn(action: Unit): Boolean = {
if (switch.get) {
action
@ -148,6 +175,9 @@ class Switch(startAsOn: Boolean = false) {
} else false
}
/**
* Executes the provided action and returns if the action was executed or not, if the switch is IMMEDIATELY off (i.e. no lock involved)
*/
def ifOff(action: Unit): Boolean = {
if (!switch.get) {
action
@ -155,16 +185,28 @@ class Switch(startAsOn: Boolean = false) {
} else false
}
/**
* Executes the provided action and returns its value if the switch is on, waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOnYield[T](action: T): Option[T] = synchronized {
if (switch.get) Some(action)
else None
}
/**
* Executes the provided action and returns its value if the switch is off, waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOffYield[T](action: T): Option[T] = synchronized {
if (!switch.get) Some(action)
else None
}
/**
* Executes the provided action and returns if the action was executed or not, if the switch is on, waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOn(action: Unit): Boolean = synchronized {
if (switch.get) {
action
@ -172,6 +214,10 @@ class Switch(startAsOn: Boolean = false) {
} else false
}
/**
* Executes the provided action and returns if the action was executed or not, if the switch is off, waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOff(action: Unit): Boolean = synchronized {
if (switch.get) {
action
@ -179,10 +225,21 @@ class Switch(startAsOn: Boolean = false) {
} else false
}
def ifElseYield[T](on: T)(off: T) = synchronized {
/**
* Executes the provided callbacks depending on if the switch is either on or off waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def fold[T](on: T)(off: T) = synchronized {
if (switch.get) on else off
}
/**
* Returns whether the switch is IMMEDIATELY on (no locking)
*/
def isOn = switch.get
/**
* Returns whether the switch is IMMEDDIATELY off (no locking)
*/
def isOff = !isOn
}

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
@ -23,6 +23,8 @@ import java.net.InetSocketAddress
object ReflectiveAccess {
val loader = getClass.getClassLoader
val emptyParams: Array[Class[_]] = Array()
val emptyArguments: Array[AnyRef] = Array()
/**
* Reflective access to the Cluster module.

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel;

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel.component

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel

Some files were not shown because too many files have changed in this diff Show more