Merge branch 'master' into wip-derekjw
Conflicts: akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala
This commit is contained in:
commit
50dcdd411c
253 changed files with 1465 additions and 660 deletions
2
LICENSE
2
LICENSE
|
|
@ -1,6 +1,6 @@
|
|||
This software is licensed under the Apache 2 license, quoted below.
|
||||
|
||||
Copyright 2009-2011 Scalable Solutions AB [http://scalablesolutions.se]
|
||||
Copyright 2009-2011 Typesafe Inc. [http://www.typesafe.com]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
use this file except in compliance with the License. You may obtain a copy of
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.testing
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package akka.actor
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -9,7 +9,6 @@ import org.scalatest.matchers.MustMatchers
|
|||
import org.scalatest.BeforeAndAfterEach
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import akka.testkit._
|
||||
import akka.testkit.Testing.sleepFor
|
||||
import akka.util.duration._
|
||||
import akka.config.Supervision._
|
||||
|
|
@ -17,6 +16,7 @@ import akka.{ Die, Ping }
|
|||
import Actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.testkit.EventFilter
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
|
|
@ -217,7 +217,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
}
|
||||
|
||||
def kill(pingPongActor: ActorRef) = {
|
||||
intercept[RuntimeException] { pingPongActor !! (Die, TimeoutMillis) }
|
||||
intercept[RuntimeException] { (pingPongActor ? (Die, TimeoutMillis)).as[Any] }
|
||||
messageLogPoll must be === ExceptionMessage
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.supervisor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.dispatch
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.dispatch
|
||||
|
||||
|
|
|
|||
|
|
@ -6,12 +6,19 @@ import akka.event.EventHandler
|
|||
import akka.testkit.TestEvent._
|
||||
import akka.testkit.EventFilter
|
||||
import Actor._
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
import akka.config.Supervision._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import org.junit.{ Test, Before, After }
|
||||
import java.util.concurrent.{ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
|
||||
|
||||
class SchedulerSpec extends JUnitSuite {
|
||||
private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]()
|
||||
|
||||
def collectFuture(f: => ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = {
|
||||
val future = f
|
||||
futures.add(future)
|
||||
future
|
||||
}
|
||||
|
||||
@Before
|
||||
def beforeEach {
|
||||
|
|
@ -20,7 +27,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
|
||||
@After
|
||||
def afterEach {
|
||||
Scheduler.restart
|
||||
while(futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) }
|
||||
Actor.registry.local.shutdownAll
|
||||
EventHandler.start()
|
||||
}
|
||||
|
|
@ -34,14 +41,14 @@ class SchedulerSpec extends JUnitSuite {
|
|||
def receive = { case Tick ⇒ countDownLatch.countDown() }
|
||||
}).start()
|
||||
// run every 50 millisec
|
||||
Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
|
||||
collectFuture(Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
|
||||
|
||||
// after max 1 second it should be executed at least the 3 times already
|
||||
assert(countDownLatch.await(1, TimeUnit.SECONDS))
|
||||
|
||||
val countDownLatch2 = new CountDownLatch(3)
|
||||
|
||||
Scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)
|
||||
collectFuture(Scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
|
||||
|
||||
// after max 1 second it should be executed at least the 3 times already
|
||||
assert(countDownLatch2.await(1, TimeUnit.SECONDS))
|
||||
|
|
@ -55,8 +62,8 @@ class SchedulerSpec extends JUnitSuite {
|
|||
def receive = { case Tick ⇒ countDownLatch.countDown() }
|
||||
}).start()
|
||||
// run every 50 millisec
|
||||
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
|
||||
Scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)
|
||||
collectFuture(Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
|
||||
collectFuture(Scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
|
||||
|
||||
// after 1 second the wait should fail
|
||||
assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)
|
||||
|
|
@ -75,7 +82,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
def receive = { case Ping ⇒ ticks.countDown }
|
||||
}).start
|
||||
val numActors = Actor.registry.local.actors.length
|
||||
(1 to 1000).foreach(_ ⇒ Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))
|
||||
(1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS)))
|
||||
assert(ticks.await(10, TimeUnit.SECONDS))
|
||||
assert(Actor.registry.local.actors.length === numActors)
|
||||
}
|
||||
|
|
@ -93,7 +100,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
}).start()
|
||||
|
||||
(1 to 10).foreach { i ⇒
|
||||
val future = Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)
|
||||
val future = collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
|
||||
future.cancel(true)
|
||||
}
|
||||
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
|
||||
|
|
@ -130,9 +137,9 @@ class SchedulerSpec extends JUnitSuite {
|
|||
Permanent)
|
||||
:: Nil)).start
|
||||
|
||||
Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)
|
||||
collectFuture(Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
|
||||
// appx 2 pings before crash
|
||||
Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)
|
||||
collectFuture(Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
|
||||
|
||||
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
|
||||
// should be enough time for the ping countdown to recover and reach 6 pings
|
||||
|
|
|
|||
|
|
@ -96,7 +96,8 @@ class FileBenchResultRepository extends BenchResultRepository {
|
|||
out.writeObject(stats)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(this, "Failed to save [%s] to [%s]".format(stats, f.getAbsolutePath))
|
||||
EventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]".
|
||||
format(stats, f.getAbsolutePath, e.getMessage))
|
||||
}
|
||||
finally {
|
||||
if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ }
|
||||
|
|
@ -112,8 +113,9 @@ class FileBenchResultRepository extends BenchResultRepository {
|
|||
val stats = in.readObject.asInstanceOf[Stats]
|
||||
Some(stats)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(this, "Failed to load from [%s]".format(f.getAbsolutePath))
|
||||
case e: Throwable ⇒
|
||||
EventHandler.error(this, "Failed to load from [%s], due to [%s]".
|
||||
format(f.getAbsolutePath, e.getMessage))
|
||||
None
|
||||
}
|
||||
finally {
|
||||
|
|
|
|||
|
|
@ -2,8 +2,9 @@ package akka.performance.trading.common
|
|||
|
||||
import java.io.UnsupportedEncodingException
|
||||
import java.net.URLEncoder
|
||||
|
||||
import scala.collection.immutable.TreeMap
|
||||
import java.util.Locale
|
||||
import java.util.Formatter
|
||||
|
||||
/**
|
||||
* Generates URLs to Google Chart API http://code.google.com/apis/chart/
|
||||
|
|
@ -21,7 +22,7 @@ object GoogleChartBuilder {
|
|||
|
||||
val current = statistics.last
|
||||
|
||||
val sb = new StringBuilder()
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// bar chart
|
||||
sb.append("cht=bvg")
|
||||
|
|
@ -74,7 +75,7 @@ object GoogleChartBuilder {
|
|||
// grid lines
|
||||
appendGridSpacing(maxValue, sb)
|
||||
|
||||
return sb.toString()
|
||||
return sb.toString
|
||||
}
|
||||
|
||||
private def percentileLabels(percentiles: TreeMap[Int, Long], sb: StringBuilder) {
|
||||
|
|
@ -119,4 +120,104 @@ object GoogleChartBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
def latencyAndThroughputChartUrl(statistics: Seq[Stats], title: String): String = {
|
||||
if (statistics.isEmpty) return ""
|
||||
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// line chart
|
||||
sb.append("cht=lxy")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=x,y,r,x,y,r")
|
||||
sb.append("&")
|
||||
// labels
|
||||
sb.append("chxl=3:|clients|4:|Latency+(us)|5:|Throughput+(tps)")
|
||||
sb.append("&")
|
||||
// label color and font
|
||||
sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767")
|
||||
sb.append("&")
|
||||
sb.append("chco=")
|
||||
val seriesColors = List("25B33B", "3072F3", "FF0000", "FF9900")
|
||||
sb.append(seriesColors.mkString(","))
|
||||
sb.append("&")
|
||||
// legend
|
||||
sb.append("chdl=5th Percentile|Median|95th Percentile|Throughput")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chdlp=b")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chls=1|1|1")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chls=1|1|1")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chma=5,5,5,25")
|
||||
sb.append("&")
|
||||
|
||||
// data points
|
||||
sb.append("chm=")
|
||||
val chmStr = seriesColors.zipWithIndex.map(each ⇒ "o," + each._1 + "," + each._2 + ",-1,7").mkString("|")
|
||||
sb.append(chmStr)
|
||||
sb.append("&")
|
||||
|
||||
// data series
|
||||
val loadStr = statistics.map(_.load).mkString(",")
|
||||
sb.append("chd=t:")
|
||||
val maxP = 95
|
||||
val percentiles = List(5, 50, maxP)
|
||||
val maxValue = statistics.map(_.percentiles(maxP)).max
|
||||
val percentileSeries: List[String] =
|
||||
for (p ← percentiles) yield {
|
||||
loadStr + "|" + statistics.map(_.percentiles(p)).mkString(",")
|
||||
}
|
||||
sb.append(percentileSeries.mkString("|"))
|
||||
|
||||
sb.append("|")
|
||||
val maxTps: Double = statistics.map(_.tps).max
|
||||
sb.append(loadStr).append("|")
|
||||
val tpsSeries = statistics.map(s ⇒ formatDouble(s.tps)).mkString(",")
|
||||
sb.append(tpsSeries)
|
||||
|
||||
val minLoad = statistics.head.load
|
||||
val maxLoad = statistics.last.load
|
||||
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append("|1,0,").append(maxValue).append("|2,0,")
|
||||
.append(formatDouble(maxTps))
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chds=")
|
||||
for (p ← percentiles) {
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(maxValue)
|
||||
sb.append(",")
|
||||
}
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(formatDouble(maxTps))
|
||||
sb.append("&")
|
||||
|
||||
// label positions
|
||||
sb.append("chxp=3,").append("50").append("|4,").append("100").append("|5,").append("100")
|
||||
sb.append("&")
|
||||
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue, sb)
|
||||
|
||||
return sb.toString
|
||||
}
|
||||
|
||||
def formatDouble(value: Double): String = {
|
||||
new java.math.BigDecimal(value).setScale(2, java.math.RoundingMode.HALF_EVEN).toString
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -130,27 +130,44 @@ trait PerformanceTest extends JUnitSuite {
|
|||
|
||||
EventHandler.info(this, formatResultsTable(resultRepository.get(name)))
|
||||
|
||||
val chartTitle = name + " Percentiles (microseconds)"
|
||||
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(name), chartTitle, _.load + " clients")
|
||||
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
|
||||
percentilesChart(stats)
|
||||
latencyAndThroughputChart(stats)
|
||||
comparePercentilesChart(stats)
|
||||
compareWithHistoricalPercentiliesChart(stats)
|
||||
|
||||
for {
|
||||
compareName ← compareResultWith
|
||||
compareStats ← resultRepository.get(compareName, numberOfClients)
|
||||
} {
|
||||
val chartTitle = name + " vs. " + compareName + ", " + numberOfClients + " clients" + ", Percentiles (microseconds)"
|
||||
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
|
||||
}
|
||||
|
||||
def percentilesChart(stats: Stats) {
|
||||
val chartTitle = stats.name + " Percentiles (microseconds)"
|
||||
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
|
||||
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
|
||||
}
|
||||
|
||||
val withHistorical = resultRepository.getWithHistorical(name, numberOfClients)
|
||||
def comparePercentilesChart(stats: Stats) {
|
||||
for {
|
||||
compareName ← compareResultWith
|
||||
compareStats ← resultRepository.get(compareName, stats.load)
|
||||
} {
|
||||
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)"
|
||||
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
|
||||
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
|
||||
}
|
||||
}
|
||||
|
||||
def compareWithHistoricalPercentiliesChart(stats: Stats) {
|
||||
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
|
||||
if (withHistorical.size > 1) {
|
||||
val chartTitle = name + " vs. historical, " + numberOfClients + " clients" + ", Percentiles (microseconds)"
|
||||
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)"
|
||||
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
|
||||
stats ⇒ legendTimeFormat.format(new Date(stats.timestamp)))
|
||||
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
|
||||
}
|
||||
}
|
||||
|
||||
def latencyAndThroughputChart(stats: Stats) {
|
||||
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
|
||||
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
|
||||
EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
|
||||
}
|
||||
|
||||
def formatResultsTable(statsSeq: Seq[Stats]): String = {
|
||||
|
|
|
|||
|
|
@ -95,38 +95,46 @@ class RoutingSpec extends WordSpec with MustMatchers {
|
|||
"dispatch to smallest mailbox" in {
|
||||
val t1Count = new AtomicInteger(0)
|
||||
val t2Count = new AtomicInteger(0)
|
||||
val latch = TestLatch(500)
|
||||
val latch1 = TestLatch(2501)
|
||||
val latch2 = TestLatch(2499)
|
||||
|
||||
val t1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case x ⇒
|
||||
sleepFor(50 millis) // slow actor
|
||||
t1Count.incrementAndGet
|
||||
latch.countDown()
|
||||
latch1.countDown()
|
||||
}
|
||||
}).start()
|
||||
|
||||
t1.dispatcher.suspend(t1)
|
||||
|
||||
for (i <- 1 to 2501) t1 ! i
|
||||
|
||||
val t2 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case x ⇒
|
||||
t2Count.incrementAndGet
|
||||
latch.countDown()
|
||||
latch2.countDown()
|
||||
}
|
||||
}).start()
|
||||
|
||||
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
|
||||
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last
|
||||
|
||||
for (i ← 1 to 500) d ! i
|
||||
for (i ← 1 to 2499 ) d ! i
|
||||
|
||||
latch2.await(20 seconds)
|
||||
|
||||
t1.dispatcher.resume(t1)
|
||||
|
||||
try {
|
||||
latch.await(20 seconds)
|
||||
latch1.await(20 seconds)
|
||||
} finally {
|
||||
// because t1 is much slower and thus has a bigger mailbox all the time
|
||||
t1Count.get must be < (t2Count.get)
|
||||
}
|
||||
|
||||
t1Count.get must be === 2501
|
||||
t2Count.get must be === 2499
|
||||
for (a ← List(t1, t2, d)) a.stop()
|
||||
}
|
||||
}
|
||||
|
||||
"listen" in {
|
||||
val fooLatch = TestLatch(2)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.testkit
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -1152,28 +1152,6 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
|
|||
"Actor has not been started, you need to invoke 'actor.start()' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
|
||||
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
@deprecated("use `(actor ? msg).as[T]` instead", "1.2")
|
||||
def !!(message: Any, timeout: Long = this.timeout)(implicit channel: UntypedChannel = NullChannel): Option[Any] = {
|
||||
if (isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel)
|
||||
|
||||
try { future.await.resultOrException } catch { case e: FutureTimeoutException ⇒ None }
|
||||
} else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start()' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously, returning a future which may eventually hold the reply.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package akka.actor
|
|||
import akka.event.EventHandler
|
||||
import akka.AkkaException
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.lang.ref.WeakReference
|
||||
import java.util.concurrent._
|
||||
import java.lang.RuntimeException
|
||||
|
||||
|
|
@ -27,20 +26,19 @@ object Scheduler {
|
|||
|
||||
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e)
|
||||
|
||||
@volatile
|
||||
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
|
||||
private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
|
||||
|
||||
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = {
|
||||
receiver match {
|
||||
case local: LocalActorRef =>
|
||||
val ref = new WeakReference[ActorRef](local)
|
||||
case local: LocalActorRef ⇒
|
||||
val uuid = local.uuid
|
||||
new Runnable {
|
||||
def run = ref.get match {
|
||||
case null => if(throwWhenReceiverExpired) throw new RuntimeException("Receiver not found: GC:ed")
|
||||
case actor => actor ! message
|
||||
def run = Actor.registry.local.actorFor(uuid) match {
|
||||
case None ⇒ if (throwWhenReceiverExpired) throw new RuntimeException("Receiver not found, unregistered")
|
||||
case Some(actor) ⇒ actor ! message
|
||||
}
|
||||
}
|
||||
case other => new Runnable { def run = other ! message }
|
||||
case other ⇒ new Runnable { def run = other ! message }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -128,18 +126,7 @@ object Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
def shutdown() {
|
||||
synchronized {
|
||||
service.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
def restart() {
|
||||
synchronized {
|
||||
shutdown()
|
||||
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
|
||||
}
|
||||
}
|
||||
private[akka] def shutdown() { service.shutdown() }
|
||||
}
|
||||
|
||||
private object SchedulerThreadFactory extends ThreadFactory {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package akka.actor
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import akka.japi.{ Creator, Option ⇒ JOption }
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*
|
||||
* Based on Configgy by Robey Pointer.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*
|
||||
* Based on Configgy by Robey Pointer.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*
|
||||
* Based on Configgy by Robey Pointer.
|
||||
* Copyright 2009 Robey Pointer <robeypointer@gmail.com>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
@ -34,14 +34,15 @@ class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxTyp
|
|||
|
||||
private[akka] val owner = new AtomicReference[ActorRef](_actor)
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
//Relies on an external lock provided by MessageDispatcher.attach
|
||||
private[akka] override def register(actorRef: ActorRef) = {
|
||||
val actor = owner.get()
|
||||
if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
|
||||
owner.compareAndSet(null, actorRef) //Register if unregistered
|
||||
super.register(actorRef)
|
||||
}
|
||||
|
||||
override def unregister(actorRef: ActorRef) = {
|
||||
//Relies on an external lock provided by MessageDispatcher.detach
|
||||
private[akka] override def unregister(actorRef: ActorRef) = {
|
||||
super.unregister(actorRef)
|
||||
owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.event
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import akka.dispatch.{ FutureTimeoutException, Future }
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remoteinterface
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remoteinterface
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package akka.serialization
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import akka.actor.Actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
|
@ -10,12 +10,14 @@ import akka.config.Config._
|
|||
import akka.actor.{ ActorRef, Actor }
|
||||
import akka.AkkaException
|
||||
|
||||
|
||||
case class NoSerializerFoundException(m: String) extends AkkaException(m)
|
||||
|
||||
/**
|
||||
* Serialization module. Contains methods for serialization and deserialization as well as
|
||||
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
|
||||
*/
|
||||
object Serialization {
|
||||
case class NoSerializerFoundException(m: String) extends AkkaException(m)
|
||||
|
||||
def serialize(o: AnyRef): Either[Exception, Array[Byte]] = serializerFor(o.getClass) match {
|
||||
case Left(ex) ⇒ Left(ex)
|
||||
|
|
@ -27,37 +29,22 @@ object Serialization {
|
|||
clazz: Class[_],
|
||||
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
|
||||
serializerFor(clazz) match {
|
||||
case Left(ex) ⇒ Left(ex)
|
||||
case Left(e) ⇒ Left(e)
|
||||
case Right(serializer) ⇒ Right(serializer.fromBinary(bytes, Some(clazz), classLoader))
|
||||
}
|
||||
|
||||
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = {
|
||||
serializerMap.get(clazz.getName) match {
|
||||
case Some(serializerName: String) ⇒
|
||||
getClassFor(serializerName) match {
|
||||
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] =
|
||||
getClassFor(serializerMap.get(clazz.getName).getOrElse(serializers("default"))) match {
|
||||
case Right(serializer) ⇒ Right(serializer.newInstance.asInstanceOf[Serializer])
|
||||
case Left(exception) ⇒ Left(exception)
|
||||
}
|
||||
case _ ⇒
|
||||
defaultSerializer match {
|
||||
case Some(s: Serializer) ⇒ Right(s)
|
||||
case None ⇒ Left(NoSerializerFoundException("No default serializer found for " + clazz))
|
||||
}
|
||||
}
|
||||
case Left(e) => Left(e)
|
||||
}
|
||||
|
||||
private def defaultSerializer = serializers.get("default") match {
|
||||
case Some(ser: String) ⇒
|
||||
getClassFor(ser) match {
|
||||
case Right(serializer) ⇒ Some(serializer.newInstance.asInstanceOf[Serializer])
|
||||
case Left(exception) ⇒ None
|
||||
}
|
||||
case None ⇒ None
|
||||
}
|
||||
|
||||
private def getSerializerInstanceForBestMatchClass(cl: Class[_]) = bindings match {
|
||||
case Some(mappings) ⇒ mappings find {
|
||||
case (clazzName, ser) ⇒
|
||||
private def getSerializerInstanceForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
|
||||
if (bindings.isEmpty)
|
||||
Left(NoSerializerFoundException("No mapping serializer found for " + cl))
|
||||
else {
|
||||
bindings find {
|
||||
case (clazzName, _) ⇒
|
||||
getClassFor(clazzName) match {
|
||||
case Right(clazz) ⇒ clazz.isAssignableFrom(cl)
|
||||
case _ ⇒ false
|
||||
|
|
@ -69,17 +56,33 @@ object Serialization {
|
|||
case _ ⇒ Left(new Exception("Error instantiating " + ser))
|
||||
}
|
||||
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
|
||||
case None ⇒ Left(NoSerializerFoundException("No mapping serializer found for " + cl))
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: Add type and docs
|
||||
val serializers = config.getSection("akka.actor.serializers").map(_.map).getOrElse(Map("default" -> "akka.serialization.JavaSerializer"))
|
||||
|
||||
//TODO: Add type and docs
|
||||
val bindings = config.getSection("akka.actor.serialization-bindings")
|
||||
.map(_.map)
|
||||
.map(m ⇒ Map() ++ m.map { case (k, v: List[String]) ⇒ Map() ++ v.map((_, k)) }.flatten)
|
||||
|
||||
//TODO: Add type and docs
|
||||
val serializerMap = bindings.map(m ⇒ m.map { case (k, v: String) ⇒ (k, serializers(v)) }).getOrElse(Map())
|
||||
/**
|
||||
* A Map of serializer from alias to implementation (FQN of a class implementing akka.serialization.Serializer)
|
||||
* By default always contains the following mapping: "default" -> "akka.serialization.JavaSerializer"
|
||||
* But "default" can be overridden in config
|
||||
*/
|
||||
val serializers: Map[String, String] = config.getSection("akka.actor.serializers") map {
|
||||
_.map.foldLeft(Map("default" -> "akka.serialization.JavaSerializer")) {
|
||||
case (result, (k: String, v: String)) => result + (k -> v)
|
||||
case (result, _) => result
|
||||
}
|
||||
} getOrElse Map("default" -> "akka.serialization.JavaSerializer")
|
||||
|
||||
/**
|
||||
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
|
||||
*/
|
||||
val bindings: Map[String, String] = config.getSection("akka.actor.serialization-bindings") map {
|
||||
_.map.foldLeft(Map[String,String]()) {
|
||||
case (result, (k: String, vs: List[_])) => result ++ (vs collect { case v: String => (v, k) }) //All keys which are lists, take the Strings from them and Map them
|
||||
case (result, _) => result //For any other values, just skip them, TODO: print out warnings?
|
||||
}
|
||||
} getOrElse Map()
|
||||
|
||||
/**
|
||||
* serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class
|
||||
*/
|
||||
val serializerMap: Map[String, String] = bindings mapValues serializers
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package akka.serialization
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
@ -125,22 +125,49 @@ class Switch(startAsOn: Boolean = false) {
|
|||
} else false
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action if the lock is on under a lock, so be _very_ careful with longrunning/blocking operations in it
|
||||
* Only executes the action if the switch is on, and switches it off immediately after obtaining the lock
|
||||
* Will switch it back on if the provided action throws an exception
|
||||
*/
|
||||
def switchOff(action: ⇒ Unit): Boolean = transcend(from = true, action)
|
||||
|
||||
/**
|
||||
* Executes the provided action if the lock is off under a lock, so be _very_ careful with longrunning/blocking operations in it
|
||||
* Only executes the action if the switch is off, and switches it on immediately after obtaining the lock
|
||||
* Will switch it back off if the provided action throws an exception
|
||||
*/
|
||||
def switchOn(action: ⇒ Unit): Boolean = transcend(from = false, action)
|
||||
|
||||
/**
|
||||
* Switches the switch off (if on), uses locking
|
||||
*/
|
||||
def switchOff: Boolean = synchronized { switch.compareAndSet(true, false) }
|
||||
|
||||
/**
|
||||
* Switches the switch on (if off), uses locking
|
||||
*/
|
||||
def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) }
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns its value if the switch is IMMEDIATELY on (i.e. no lock involved)
|
||||
*/
|
||||
def ifOnYield[T](action: ⇒ T): Option[T] = {
|
||||
if (switch.get) Some(action)
|
||||
else None
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns its value if the switch is IMMEDIATELY off (i.e. no lock involved)
|
||||
*/
|
||||
def ifOffYield[T](action: ⇒ T): Option[T] = {
|
||||
if (!switch.get) Some(action)
|
||||
else None
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns if the action was executed or not, if the switch is IMMEDIATELY on (i.e. no lock involved)
|
||||
*/
|
||||
def ifOn(action: ⇒ Unit): Boolean = {
|
||||
if (switch.get) {
|
||||
action
|
||||
|
|
@ -148,6 +175,9 @@ class Switch(startAsOn: Boolean = false) {
|
|||
} else false
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns if the action was executed or not, if the switch is IMMEDIATELY off (i.e. no lock involved)
|
||||
*/
|
||||
def ifOff(action: ⇒ Unit): Boolean = {
|
||||
if (!switch.get) {
|
||||
action
|
||||
|
|
@ -155,16 +185,28 @@ class Switch(startAsOn: Boolean = false) {
|
|||
} else false
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns its value if the switch is on, waiting for any pending changes to happen before (locking)
|
||||
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
|
||||
*/
|
||||
def whileOnYield[T](action: ⇒ T): Option[T] = synchronized {
|
||||
if (switch.get) Some(action)
|
||||
else None
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns its value if the switch is off, waiting for any pending changes to happen before (locking)
|
||||
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
|
||||
*/
|
||||
def whileOffYield[T](action: ⇒ T): Option[T] = synchronized {
|
||||
if (!switch.get) Some(action)
|
||||
else None
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns if the action was executed or not, if the switch is on, waiting for any pending changes to happen before (locking)
|
||||
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
|
||||
*/
|
||||
def whileOn(action: ⇒ Unit): Boolean = synchronized {
|
||||
if (switch.get) {
|
||||
action
|
||||
|
|
@ -172,6 +214,10 @@ class Switch(startAsOn: Boolean = false) {
|
|||
} else false
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the provided action and returns if the action was executed or not, if the switch is off, waiting for any pending changes to happen before (locking)
|
||||
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
|
||||
*/
|
||||
def whileOff(action: ⇒ Unit): Boolean = synchronized {
|
||||
if (switch.get) {
|
||||
action
|
||||
|
|
@ -179,10 +225,21 @@ class Switch(startAsOn: Boolean = false) {
|
|||
} else false
|
||||
}
|
||||
|
||||
def ifElseYield[T](on: ⇒ T)(off: ⇒ T) = synchronized {
|
||||
/**
|
||||
* Executes the provided callbacks depending on if the switch is either on or off waiting for any pending changes to happen before (locking)
|
||||
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
|
||||
*/
|
||||
def fold[T](on: ⇒ T)(off: ⇒ T) = synchronized {
|
||||
if (switch.get) on else off
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the switch is IMMEDIATELY on (no locking)
|
||||
*/
|
||||
def isOn = switch.get
|
||||
|
||||
/**
|
||||
* Returns whether the switch is IMMEDDIATELY off (no locking)
|
||||
*/
|
||||
def isOff = !isOn
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel.component
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.camel
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.camel
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue