Switching to immutable.Seq instead of Seq

This commit is contained in:
Viktor Klang 2012-10-30 15:08:41 +01:00
parent 2866ecfa85
commit 8f131c680f
65 changed files with 375 additions and 350 deletions

View file

@ -6,24 +6,22 @@ package akka.actor
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import java.net.URLEncoder import java.net.URLEncoder
import scala.collection.immutable
class RelativeActorPathSpec extends WordSpec with MustMatchers { class RelativeActorPathSpec extends WordSpec with MustMatchers {
def elements(path: String): Seq[String] = path match { def elements(path: String): immutable.Seq[String] = RelativeActorPath.unapply(path).getOrElse(Nil)
case RelativeActorPath(elem) elem.toSeq
case _ Nil
}
"RelativeActorPath" must { "RelativeActorPath" must {
"match single name" in { "match single name" in {
elements("foo") must be(Seq("foo")) elements("foo") must be(List("foo"))
} }
"match path separated names" in { "match path separated names" in {
elements("foo/bar/baz") must be(Seq("foo", "bar", "baz")) elements("foo/bar/baz") must be(List("foo", "bar", "baz"))
} }
"match url encoded name" in { "match url encoded name" in {
val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8") val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8")
elements(name) must be(Seq(name)) elements(name) must be(List(name))
} }
} }
} }

View file

@ -5,18 +5,19 @@ package akka.actor
import language.postfixOps import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Timeout import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
import akka.util.Timeout
import akka.japi.{ Option JOption } import akka.japi.{ Option JOption }
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.{ Dispatchers } import akka.dispatch.Dispatchers
import akka.pattern.ask import akka.pattern.ask
import akka.serialization.JavaSerializer import akka.serialization.JavaSerializer
import akka.actor.TypedActor._ import akka.actor.TypedActor._
import java.util.concurrent.atomic.AtomicReference
import java.lang.IllegalStateException import java.lang.IllegalStateException
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
@ -35,9 +36,9 @@ object TypedActorSpec {
} }
""" """
class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] { class CyclicIterator[T](val items: immutable.Seq[T]) extends Iterator[T] {
private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items) private[this] val current = new AtomicReference(items)
def hasNext = items != Nil def hasNext = items != Nil

View file

@ -12,17 +12,18 @@ import java.io.PrintWriter
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.Date import java.util.Date
import scala.collection.mutable.{ Map MutableMap } import scala.collection.mutable.{ Map MutableMap }
import scala.collection.immutable
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.event.Logging import akka.event.Logging
trait BenchResultRepository { trait BenchResultRepository {
def add(stats: Stats) def add(stats: Stats)
def get(name: String): Seq[Stats] def get(name: String): immutable.Seq[Stats]
def get(name: String, load: Int): Option[Stats] def get(name: String, load: Int): Option[Stats]
def getWithHistorical(name: String, load: Int): Seq[Stats] def getWithHistorical(name: String, load: Int): immutable.Seq[Stats]
def isBaseline(stats: Stats): Boolean def isBaseline(stats: Stats): Boolean
@ -38,9 +39,9 @@ object BenchResultRepository {
} }
class FileBenchResultRepository extends BenchResultRepository { class FileBenchResultRepository extends BenchResultRepository {
private val statsByName = MutableMap[String, Seq[Stats]]() private val statsByName = MutableMap[String, immutable.Seq[Stats]]()
private val baselineStats = MutableMap[Key, Stats]() private val baselineStats = MutableMap[Key, Stats]()
private val historicalStats = MutableMap[Key, Seq[Stats]]() private val historicalStats = MutableMap[Key, immutable.Seq[Stats]]()
private def resultDir = BenchmarkConfig.config.getString("benchmark.resultDir") private def resultDir = BenchmarkConfig.config.getString("benchmark.resultDir")
private val serDir = resultDir + "/ser" private val serDir = resultDir + "/ser"
private def serDirExists: Boolean = new File(serDir).exists private def serDirExists: Boolean = new File(serDir).exists
@ -51,13 +52,13 @@ class FileBenchResultRepository extends BenchResultRepository {
case class Key(name: String, load: Int) case class Key(name: String, load: Int)
def add(stats: Stats): Unit = synchronized { def add(stats: Stats): Unit = synchronized {
val values = statsByName.getOrElseUpdate(stats.name, IndexedSeq.empty) val values = statsByName.getOrElseUpdate(stats.name, Vector.empty)
statsByName(stats.name) = values :+ stats statsByName(stats.name) = values :+ stats
save(stats) save(stats)
} }
def get(name: String): Seq[Stats] = synchronized { def get(name: String): immutable.Seq[Stats] = synchronized {
statsByName.getOrElse(name, IndexedSeq.empty) statsByName.getOrElse(name, Vector.empty)
} }
def get(name: String, load: Int): Option[Stats] = synchronized { def get(name: String, load: Int): Option[Stats] = synchronized {
@ -68,13 +69,13 @@ class FileBenchResultRepository extends BenchResultRepository {
baselineStats.get(Key(stats.name, stats.load)) == Some(stats) baselineStats.get(Key(stats.name, stats.load)) == Some(stats)
} }
def getWithHistorical(name: String, load: Int): Seq[Stats] = synchronized { def getWithHistorical(name: String, load: Int): immutable.Seq[Stats] = synchronized {
val key = Key(name, load) val key = Key(name, load)
val historical = historicalStats.getOrElse(key, IndexedSeq.empty) val historical = historicalStats.getOrElse(key, Vector.empty)
val baseline = baselineStats.get(key) val baseline = baselineStats.get(key)
val current = get(name, load) val current = get(name, load)
val limited = (IndexedSeq.empty ++ historical ++ baseline ++ current).takeRight(maxHistorical) val limited = (Vector.empty ++ historical ++ baseline ++ current).takeRight(maxHistorical)
limited.sortBy(_.timestamp) limited.sortBy(_.timestamp)
} }
@ -94,7 +95,7 @@ class FileBenchResultRepository extends BenchResultRepository {
} }
val historical = load(historicalFiles) val historical = load(historicalFiles)
for (h historical) { for (h historical) {
val values = historicalStats.getOrElseUpdate(Key(h.name, h.load), IndexedSeq.empty) val values = historicalStats.getOrElseUpdate(Key(h.name, h.load), Vector.empty)
historicalStats(Key(h.name, h.load)) = values :+ h historicalStats(Key(h.name, h.load)) = values :+ h
} }
} }
@ -120,7 +121,7 @@ class FileBenchResultRepository extends BenchResultRepository {
} }
} }
private def load(files: Iterable[File]): Seq[Stats] = { private def load(files: Iterable[File]): immutable.Seq[Stats] = {
val result = val result =
for (f files) yield { for (f files) yield {
var in: ObjectInputStream = null var in: ObjectInputStream = null
@ -132,11 +133,11 @@ class FileBenchResultRepository extends BenchResultRepository {
case e: Throwable case e: Throwable
None None
} finally { } finally {
if (in ne null) try { in.close() } catch { case ignore: Exception } if (in ne null) try in.close() catch { case ignore: Exception }
} }
} }
result.flatten.toSeq.sortBy(_.timestamp) result.flatten.toVector.sortBy(_.timestamp)
} }
loadFiles() loadFiles()

View file

@ -3,7 +3,7 @@ package akka.performance.workbench
import java.io.UnsupportedEncodingException import java.io.UnsupportedEncodingException
import java.net.URLEncoder import java.net.URLEncoder
import scala.collection.immutable.TreeMap import scala.collection.immutable
/** /**
* Generates URLs to Google Chart API http://code.google.com/apis/chart/ * Generates URLs to Google Chart API http://code.google.com/apis/chart/
@ -16,7 +16,7 @@ object GoogleChartBuilder {
/** /**
* Builds a bar chart for tps in the statistics. * Builds a bar chart for tps in the statistics.
*/ */
def tpsChartUrl(statsByTimestamp: TreeMap[Long, Seq[Stats]], title: String, legend: Stats String): String = { def tpsChartUrl(statsByTimestamp: immutable.TreeMap[Long, Seq[Stats]], title: String, legend: Stats String): String = {
if (statsByTimestamp.isEmpty) "" if (statsByTimestamp.isEmpty) ""
else { else {
val loads = statsByTimestamp.values.head.map(_.load) val loads = statsByTimestamp.values.head.map(_.load)
@ -46,7 +46,7 @@ object GoogleChartBuilder {
//sb.append("&") //sb.append("&")
// legend // legend
val legendStats = statsByTimestamp.values.map(_.head).toSeq val legendStats = statsByTimestamp.values.toVector.map(_.head)
appendLegend(legendStats, sb, legend) appendLegend(legendStats, sb, legend)
sb.append("&") sb.append("&")
// bar spacing // bar spacing
@ -60,10 +60,7 @@ object GoogleChartBuilder {
val loadStr = loads.mkString(",") val loadStr = loads.mkString(",")
sb.append("chd=t:") sb.append("chd=t:")
val maxValue = allStats.map(_.tps).max val maxValue = allStats.map(_.tps).max
val tpsSeries: Iterable[String] = val tpsSeries: Iterable[String] = for (statsSeq statsByTimestamp.values) yield statsSeq.map(_.tps).mkString(",")
for (statsSeq statsByTimestamp.values) yield {
statsSeq.map(_.tps).mkString(",")
}
sb.append(tpsSeries.mkString("|")) sb.append(tpsSeries.mkString("|"))
// y range // y range
@ -83,7 +80,7 @@ object GoogleChartBuilder {
/** /**
* Builds a bar chart for all percentiles and the mean in the statistics. * Builds a bar chart for all percentiles and the mean in the statistics.
*/ */
def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats String): String = { def percentilesAndMeanChartUrl(statistics: immutable.Seq[Stats], title: String, legend: Stats String): String = {
if (statistics.isEmpty) "" if (statistics.isEmpty) ""
else { else {
val current = statistics.last val current = statistics.last
@ -146,13 +143,13 @@ object GoogleChartBuilder {
} }
} }
private def percentileLabels(percentiles: TreeMap[Int, Long], sb: StringBuilder) { private def percentileLabels(percentiles: immutable.TreeMap[Int, Long], sb: StringBuilder) {
sb.append("chxl=1:|") sb.append("chxl=1:|")
val s = percentiles.keys.toList.map(_ + "%").mkString("|") val s = percentiles.keys.toList.map(_ + "%").mkString("|")
sb.append(s) sb.append(s)
} }
private def appendLegend(statistics: Seq[Stats], sb: StringBuilder, legend: Stats String) { private def appendLegend(statistics: immutable.Seq[Stats], sb: StringBuilder, legend: Stats String) {
val legends = statistics.map(legend(_)) val legends = statistics.map(legend(_))
sb.append("chdl=") sb.append("chdl=")
val s = legends.map(urlEncode(_)).mkString("|") val s = legends.map(urlEncode(_)).mkString("|")
@ -166,7 +163,7 @@ object GoogleChartBuilder {
sb.append(s) sb.append(s)
} }
private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], meanValues: Seq[Double], sb: StringBuilder) { private def dataSeries(allPercentiles: immutable.Seq[immutable.TreeMap[Int, Long]], meanValues: immutable.Seq[Double], sb: StringBuilder) {
val percentileSeries = val percentileSeries =
for { for {
percentiles allPercentiles percentiles allPercentiles
@ -181,7 +178,7 @@ object GoogleChartBuilder {
sb.append(series.mkString("|")) sb.append(series.mkString("|"))
} }
private def dataSeries(values: Seq[Double], sb: StringBuilder) { private def dataSeries(values: immutable.Seq[Double], sb: StringBuilder) {
val series = values.map(formatDouble(_)) val series = values.map(formatDouble(_))
sb.append(series.mkString("|")) sb.append(series.mkString("|"))
} }
@ -198,7 +195,7 @@ object GoogleChartBuilder {
} }
} }
def latencyAndThroughputChartUrl(statistics: Seq[Stats], title: String): String = { def latencyAndThroughputChartUrl(statistics: immutable.Seq[Stats], title: String): String = {
if (statistics.isEmpty) "" if (statistics.isEmpty) ""
else { else {
val sb = new StringBuilder val sb = new StringBuilder

View file

@ -5,7 +5,7 @@ import java.text.SimpleDateFormat
import java.util.Date import java.util.Date
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.event.Logging import akka.event.Logging
import scala.collection.immutable.TreeMap import scala.collection.immutable
class Report( class Report(
system: ActorSystem, system: ActorSystem,
@ -19,7 +19,7 @@ class Report(
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]) { def html(statistics: immutable.Seq[Stats]) {
val current = statistics.last val current = statistics.last
val sb = new StringBuilder val sb = new StringBuilder
@ -80,13 +80,13 @@ class Report(
chartUrl chartUrl
} }
def comparePercentilesAndMeanChart(stats: Stats): Seq[String] = { def comparePercentilesAndMeanChart(stats: Stats): immutable.Seq[String] = {
for { for {
compareName compareResultWith.toSeq compareName compareResultWith.to[immutable.Seq]
compareStats resultRepository.get(compareName, stats.load) compareStats resultRepository.get(compareName, stats.load)
} yield { } yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)" val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(Seq(compareStats, stats), chartTitle, _.name) val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(List(compareStats, stats), chartTitle, _.name)
chartUrl chartUrl
} }
} }
@ -102,17 +102,17 @@ class Report(
} }
} }
def compareWithHistoricalTpsChart(statistics: Seq[Stats]): Option[String] = { def compareWithHistoricalTpsChart(statistics: immutable.Seq[Stats]): Option[String] = {
if (statistics.isEmpty) { if (statistics.isEmpty) {
None None
} else { } else {
val histTimestamps = resultRepository.getWithHistorical(statistics.head.name, statistics.head.load).map(_.timestamp) val histTimestamps = resultRepository.getWithHistorical(statistics.head.name, statistics.head.load).map(_.timestamp)
val statsByTimestamp = TreeMap[Long, Seq[Stats]]() ++ val statsByTimestamp = immutable.TreeMap[Long, Seq[Stats]]() ++
(for (ts histTimestamps) yield { (for (ts histTimestamps) yield {
val seq = val seq =
for (stats statistics) yield { for (stats statistics) yield {
val withHistorical: Seq[Stats] = resultRepository.getWithHistorical(stats.name, stats.load) val withHistorical: immutable.Seq[Stats] = resultRepository.getWithHistorical(stats.name, stats.load)
val cell = withHistorical.find(_.timestamp == ts) val cell = withHistorical.find(_.timestamp == ts)
cell.getOrElse(Stats(stats.name, stats.load, ts)) cell.getOrElse(Stats(stats.name, stats.load, ts))
} }
@ -131,7 +131,7 @@ class Report(
chartUrl chartUrl
} }
def formatResultsTable(statsSeq: Seq[Stats]): String = { def formatResultsTable(statsSeq: immutable.Seq[Stats]): String = {
val name = statsSeq.head.name val name = statsSeq.head.name

View file

@ -121,7 +121,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act
else addr + name else addr + name
override def compareTo(other: ActorPath): Int = other match { override def compareTo(other: ActorPath): Int = other match {
case r: RootActorPath toString compareTo r.toString case r: RootActorPath toString compareTo r.toString // FIXME make this cheaper by comparing address and name in isolation
case c: ChildActorPath 1 case c: ChildActorPath 1
} }
} }

View file

@ -480,7 +480,7 @@ class LocalActorRefProvider(
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
private def guardianSupervisorStrategyConfigurator = private def guardianSupervisorStrategyConfigurator =
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).get dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Nil).get
/** /**
* Overridable supervision strategy to be used by the /user guardian. * Overridable supervision strategy to be used by the /user guardian.

View file

@ -9,17 +9,17 @@ import akka.dispatch._
import akka.pattern.ask import akka.pattern.ask
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.duration.Duration import scala.collection.immutable
import java.io.Closeable import scala.concurrent.duration.{ FiniteDuration, Duration }
import scala.concurrent.{ Await, Awaitable, CanAwait, Future } import scala.concurrent.{ Await, Awaitable, CanAwait, Future }
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.util._ import akka.util._
import java.io.Closeable
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.dungeon.ChildrenContainer import akka.actor.dungeon.ChildrenContainer
import scala.concurrent.duration.FiniteDuration
import util.{ Failure, Success }
object ActorSystem { object ActorSystem {
@ -144,7 +144,7 @@ object ActorSystem {
final val LogLevel: String = getString("akka.loglevel") final val LogLevel: String = getString("akka.loglevel")
final val StdoutLogLevel: String = getString("akka.stdout-loglevel") final val StdoutLogLevel: String = getString("akka.stdout-loglevel")
final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala final val EventHandlers: immutable.Seq[String] = getStringList("akka.event-handlers").asScala.to[Vector]
final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS)) final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start") final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
@ -273,10 +273,8 @@ abstract class ActorSystem extends ActorRefFactory {
/** /**
* ''Java API'': Recursively create a descendants path by appending all child names. * ''Java API'': Recursively create a descendants path by appending all child names.
*/ */
def descendant(names: java.lang.Iterable[String]): ActorPath = { def descendant(names: java.lang.Iterable[String]): ActorPath =
import scala.collection.JavaConverters._ /(scala.collection.JavaConverters.iterableAsScalaIterableConverter(names).asScala)
/(names.asScala)
}
/** /**
* Start-up time in milliseconds since the epoch. * Start-up time in milliseconds since the epoch.
@ -536,7 +534,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
val scheduler: Scheduler = createScheduler() val scheduler: Scheduler = createScheduler()
val provider: ActorRefProvider = { val provider: ActorRefProvider = {
val arguments = Seq( val arguments = Vector(
classOf[String] -> name, classOf[String] -> name,
classOf[Settings] -> settings, classOf[Settings] -> settings,
classOf[EventStream] -> eventStream, classOf[EventStream] -> eventStream,
@ -676,15 +674,15 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null
private def loadExtensions() { private def loadExtensions() {
import scala.collection.JavaConverters.collectionAsScalaIterableConverter scala.collection.JavaConverters.collectionAsScalaIterableConverter(
settings.config.getStringList("akka.extensions").asScala foreach { fqcn settings.config.getStringList("akka.extensions")).asScala foreach { fqcn
dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()) } match { dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match {
case Success(p: ExtensionIdProvider) registerExtension(p.lookup()) case Success(p: ExtensionIdProvider) registerExtension(p.lookup())
case Success(p: ExtensionId[_]) registerExtension(p) case Success(p: ExtensionId[_]) registerExtension(p)
case Success(other) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) case Success(other) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
case Failure(problem) log.error(problem, "While trying to load extension [{}], skipping...", fqcn) case Failure(problem) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
}
} }
}
} }
override def toString: String = lookupRoot.path.root.address.toString override def toString: String = lookupRoot.path.root.address.toString

View file

@ -5,7 +5,8 @@ package akka.actor
import java.net.URI import java.net.URI
import java.net.URISyntaxException import java.net.URISyntaxException
import java.net.MalformedURLException import java.net.MalformedURLException
import annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable
/** /**
* The address specifies the physical location under which an Actor can be * The address specifies the physical location under which an Actor can be
@ -71,7 +72,7 @@ private[akka] trait PathUtils {
} }
object RelativeActorPath extends PathUtils { object RelativeActorPath extends PathUtils {
def unapply(addr: String): Option[Iterable[String]] = { def unapply(addr: String): Option[immutable.Seq[String]] = {
try { try {
val uri = new URI(addr) val uri = new URI(addr)
if (uri.isAbsolute) None if (uri.isAbsolute) None

View file

@ -160,7 +160,7 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
val vnodes = deployment.getInt("virtual-nodes-factor") val vnodes = deployment.getInt("virtual-nodes-factor")
ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes) ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes)
case fqn case fqn
val args = Seq(classOf[Config] -> deployment) val args = List(classOf[Config] -> deployment)
dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({
case exception throw new IllegalArgumentException( case exception throw new IllegalArgumentException(
("Cannot instantiate router [%s], defined in [%s], " + ("Cannot instantiate router [%s], defined in [%s], " +

View file

@ -3,7 +3,7 @@
*/ */
package akka.actor package akka.actor
import scala.util.control.NonFatal import scala.collection.immutable
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util.Try import scala.util.Try
@ -25,7 +25,7 @@ abstract class DynamicAccess {
* val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name)) * val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name))
* }}} * }}}
*/ */
def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T]
/** /**
* Obtain a `Class[_]` object loaded with the right class loader (i.e. the one * Obtain a `Class[_]` object loaded with the right class loader (i.e. the one
@ -40,7 +40,7 @@ abstract class DynamicAccess {
* `args` argument. The exact usage of args depends on which type is requested, * `args` argument. The exact usage of args depends on which type is requested,
* see the relevant requesting code for details. * see the relevant requesting code for details.
*/ */
def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] def createInstanceFor[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T]
/** /**
* Obtain the Scala object instance for the given fully-qualified class name, if there is one. * Obtain the Scala object instance for the given fully-qualified class name, if there is one.
@ -70,7 +70,7 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces
if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c) if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c)
}) })
override def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] = override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
Try { Try {
val types = args.map(_._1).toArray val types = args.map(_._1).toArray
val values = args.map(_._2).toArray val values = args.map(_._2).toArray
@ -81,7 +81,7 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces
if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t) if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
} recover { case i: InvocationTargetException if i.getTargetException ne null throw i.getTargetException } } recover { case i: InvocationTargetException if i.getTargetException ne null throw i.getTargetException }
override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] = override def createInstanceFor[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
getClassFor(fqcn) flatMap { c createInstanceFor(c, args) } getClassFor(fqcn) flatMap { c createInstanceFor(c, args) }
override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = { override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {

View file

@ -98,5 +98,5 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassTag[T]) extends Ext
def this(clazz: Class[T]) = this()(ClassTag(clazz)) def this(clazz: Class[T]) = this()(ClassTag(clazz))
override def lookup(): ExtensionId[T] = this override def lookup(): ExtensionId[T] = this
def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)).get def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, List(classOf[ExtendedActorSystem] -> system)).get
} }

View file

@ -5,9 +5,10 @@ package akka.actor
import language.implicitConversions import language.implicitConversions
import java.lang.{ Iterable JIterable }
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import java.lang.{ Iterable JIterable } import scala.collection.immutable
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
/** /**
* INTERNAL API * INTERNAL API
@ -170,7 +171,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
* Implicit conversion from `Seq` of Throwables to a `Decider`. * Implicit conversion from `Seq` of Throwables to a `Decider`.
* This maps the given Throwables to restarts, otherwise escalates. * This maps the given Throwables to restarts, otherwise escalates.
*/ */
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit) implicit def seqThrowable2Decider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeImmutableDecider(trapExit)
type Decider = PartialFunction[Throwable, Directive] type Decider = PartialFunction[Throwable, Directive]
type JDecider = akka.japi.Function[Throwable, Directive] type JDecider = akka.japi.Function[Throwable, Directive]
@ -180,23 +181,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
* Decider builder which just checks whether one of * Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: Array[Class[_]]): Decider = def makeDecider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeImmutableDecider(trapExit)
{ case x if (trapExit exists (_ isInstance x)) Restart else Escalate }
/** /**
* Decider builder which just checks whether one of * Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider = def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider =
{ case x if (trapExit exists (_ isInstance x)) Restart else Escalate } makeImmutableDecider(scala.collection.JavaConverters.iterableAsScalaIterableConverter(trapExit).asScala)
/** private[this] def makeImmutableDecider(trapExit: Iterable[Class[_]]): Decider = {
* Decider builder which just checks whether one of val traps = trapExit match { // This is the sad, awkward, truth
* the given Throwables matches the cause and restarts, otherwise escalates. case s: immutable.Seq[_] s.asInstanceOf[immutable.Seq[Class[_]]]
*/ case other other.to[immutable.Seq]
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = { }
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
makeDecider(trapExit.asScala.toSeq) { case x if (traps exists (_ isInstance x)) Restart else Escalate }
} }
/** /**
@ -222,14 +221,14 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
* *
* INTERNAL API * INTERNAL API
*/ */
private[akka] def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] = private[akka] def sort(in: Iterable[CauseDirective]): immutable.Seq[CauseDirective] =
(new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca) (new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca)
buf.indexWhere(_._1 isAssignableFrom ca._1) match { buf.indexWhere(_._1 isAssignableFrom ca._1) match {
case -1 buf append ca case -1 buf append ca
case x buf insert (x, ca) case x buf insert (x, ca)
} }
buf buf
} }.to[immutable.Seq]
private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] = private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] =
if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None
@ -338,10 +337,6 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/* /*
* this is a performance optimization to avoid re-allocating the pairs upon * this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared * every call to requestRestartPermission, assuming that strategies are shared
@ -380,9 +375,6 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/* /*
* this is a performance optimization to avoid re-allocating the pairs upon * this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared * every call to requestRestartPermission, assuming that strategies are shared

View file

@ -6,6 +6,7 @@ package akka.actor
import language.higherKinds import language.higherKinds
import language.postfixOps import language.postfixOps
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.util.control.NonFatal import scala.util.control.NonFatal
@ -122,7 +123,7 @@ object IO {
* @return a new SocketHandle that can be used to perform actions on the * @return a new SocketHandle that can be used to perform actions on the
* new connection's SocketChannel. * new connection's SocketChannel.
*/ */
def accept(options: Seq[SocketOption] = Seq.empty)(implicit socketOwner: ActorRef): SocketHandle = { def accept(options: immutable.Seq[SocketOption] = Nil)(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager) val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this, options) ioManager ! Accept(socket, this, options)
socket socket
@ -250,7 +251,7 @@ object IO {
* *
* Normally sent using IOManager.listen() * Normally sent using IOManager.listen()
*/ */
case class Listen(server: ServerHandle, address: SocketAddress, options: Seq[ServerSocketOption] = Seq.empty) extends IOMessage case class Listen(server: ServerHandle, address: SocketAddress, options: immutable.Seq[ServerSocketOption] = Nil) extends IOMessage
/** /**
* Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is * Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is
@ -272,7 +273,7 @@ object IO {
* *
* Normally sent using [[akka.actor.IO.ServerHandle]].accept() * Normally sent using [[akka.actor.IO.ServerHandle]].accept()
*/ */
case class Accept(socket: SocketHandle, server: ServerHandle, options: Seq[SocketOption] = Seq.empty) extends IOMessage case class Accept(socket: SocketHandle, server: ServerHandle, options: immutable.Seq[SocketOption] = Nil) extends IOMessage
/** /**
* Message to an [[akka.actor.IOManager]] to create a SocketChannel connected * Message to an [[akka.actor.IOManager]] to create a SocketChannel connected
@ -280,7 +281,7 @@ object IO {
* *
* Normally sent using IOManager.connect() * Normally sent using IOManager.connect()
*/ */
case class Connect(socket: SocketHandle, address: SocketAddress, options: Seq[SocketOption] = Seq.empty) extends IOMessage case class Connect(socket: SocketHandle, address: SocketAddress, options: immutable.Seq[SocketOption] = Nil) extends IOMessage
/** /**
* Message from an [[akka.actor.IOManager]] that the SocketChannel has * Message from an [[akka.actor.IOManager]] that the SocketChannel has
@ -832,7 +833,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension {
* @param option Seq of [[akka.actor.IO.ServerSocketOptions]] to setup on socket * @param option Seq of [[akka.actor.IO.ServerSocketOptions]] to setup on socket
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/ */
def listen(address: SocketAddress, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = { def listen(address: SocketAddress, options: immutable.Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = {
val server = IO.ServerHandle(owner, actor) val server = IO.ServerHandle(owner, actor)
actor ! IO.Listen(server, address, options) actor ! IO.Listen(server, address, options)
server server
@ -847,7 +848,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor * @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/ */
def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = listen(address, Seq.empty) def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = listen(address, Nil)
/** /**
* Create a ServerSocketChannel listening on a host and port. Messages will * Create a ServerSocketChannel listening on a host and port. Messages will
@ -860,7 +861,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor * @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/ */
def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption] = Seq.empty)(implicit owner: ActorRef): IO.ServerHandle = def listen(host: String, port: Int, options: immutable.Seq[IO.ServerSocketOption] = Nil)(implicit owner: ActorRef): IO.ServerHandle =
listen(new InetSocketAddress(host, port), options)(owner) listen(new InetSocketAddress(host, port), options)(owner)
/** /**
@ -873,7 +874,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor * @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
*/ */
def connect(address: SocketAddress, options: Seq[IO.SocketOption] = Seq.empty)(implicit owner: ActorRef): IO.SocketHandle = { def connect(address: SocketAddress, options: immutable.Seq[IO.SocketOption] = Nil)(implicit owner: ActorRef): IO.SocketHandle = {
val socket = IO.SocketHandle(owner, actor) val socket = IO.SocketHandle(owner, actor)
actor ! IO.Connect(socket, address, options) actor ! IO.Connect(socket, address, options)
socket socket
@ -991,7 +992,7 @@ final class IOManagerActor(val settings: Settings) extends Actor with ActorLoggi
private def forwardFailure(f: Unit): Unit = try f catch { case NonFatal(e) sender ! Status.Failure(e) } private def forwardFailure(f: Unit): Unit = try f catch { case NonFatal(e) sender ! Status.Failure(e) }
private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) { private def setSocketOptions(socket: java.net.Socket, options: immutable.Seq[IO.SocketOption]) {
options foreach { options foreach {
case IO.KeepAlive(on) forwardFailure(socket.setKeepAlive(on)) case IO.KeepAlive(on) forwardFailure(socket.setKeepAlive(on))
case IO.OOBInline(on) forwardFailure(socket.setOOBInline(on)) case IO.OOBInline(on) forwardFailure(socket.setOOBInline(on))

View file

@ -4,22 +4,24 @@
package akka.actor package akka.actor
import language.existentials import language.existentials
import akka.japi.{ Creator, Option JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.Timeout
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.{ Try, Success, Failure }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.concurrent.{ Await, Future } import scala.concurrent.{ Await, Future }
import akka.japi.{ Creator, Option JOption }
import akka.util.Timeout
import akka.util.Reflect.instantiator import akka.util.Reflect.instantiator
import akka.serialization.{ JavaSerializer, SerializationExtension }
import akka.dispatch._ import akka.dispatch._
import java.util.concurrent.atomic.{ AtomicReference AtomVar } import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.reflect.ClassTag
import akka.serialization.{ JavaSerializer, SerializationExtension }
import java.io.ObjectStreamException import java.io.ObjectStreamException
import scala.util.{ Try, Success, Failure } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import scala.concurrent.duration.FiniteDuration
/** /**
* A TypedActorFactory is something that can created TypedActor instances. * A TypedActorFactory is something that can created TypedActor instances.
@ -439,8 +441,8 @@ object TypedProps {
* @return a sequence of interfaces that the specified class implements, * @return a sequence of interfaces that the specified class implements,
* or a sequence containing only itself, if itself is an interface. * or a sequence containing only itself, if itself is an interface.
*/ */
def extractInterfaces(clazz: Class[_]): Seq[Class[_]] = def extractInterfaces(clazz: Class[_]): immutable.Seq[Class[_]] =
if (clazz.isInterface) Seq[Class[_]](clazz) else clazz.getInterfaces.toList if (clazz.isInterface) List[Class[_]](clazz) else clazz.getInterfaces.to[List]
/** /**
* Uses the supplied class as the factory for the TypedActor implementation, * Uses the supplied class as the factory for the TypedActor implementation,
@ -489,7 +491,7 @@ object TypedProps {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class TypedProps[T <: AnyRef] protected[TypedProps] ( case class TypedProps[T <: AnyRef] protected[TypedProps] (
interfaces: Seq[Class[_]], interfaces: immutable.Seq[Class[_]],
creator: () T, creator: () T,
dispatcher: String = TypedProps.defaultDispatcherId, dispatcher: String = TypedProps.defaultDispatcherId,
deploy: Deploy = Props.defaultDeploy, deploy: Deploy = Props.defaultDeploy,

View file

@ -420,7 +420,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
case "unbounded" UnboundedMailbox() case "unbounded" UnboundedMailbox()
case "bounded" new BoundedMailbox(prerequisites.settings, config) case "bounded" new BoundedMailbox(prerequisites.settings, config)
case fqcn case fqcn
val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) val args = List(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config)
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception case exception
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -436,7 +436,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn case fqcn
val args = Seq( val args = List(
classOf[Config] -> config, classOf[Config] -> config,
classOf[DispatcherPrerequisites] -> prerequisites) classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({ prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({

View file

@ -147,7 +147,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
case "BalancingDispatcher" new BalancingDispatcherConfigurator(cfg, prerequisites) case "BalancingDispatcher" new BalancingDispatcherConfigurator(cfg, prerequisites)
case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites) case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn case fqn
val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
case exception case exception
throw new IllegalArgumentException( throw new IllegalArgumentException(

View file

@ -10,6 +10,7 @@ import java.util.concurrent.ConcurrentSkipListSet
import java.util.Comparator import java.util.Comparator
import akka.util.{ Subclassification, SubclassifiedIndex } import akka.util.{ Subclassification, SubclassifiedIndex }
import scala.collection.immutable.TreeSet import scala.collection.immutable.TreeSet
import scala.collection.immutable
/** /**
* Represents the base type for EventBuses * Represents the base type for EventBuses
@ -167,12 +168,12 @@ trait SubchannelClassification { this: EventBus ⇒
recv foreach (publish(event, _)) recv foreach (publish(event, _))
} }
private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) { cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs)
} }
private def addToCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) { cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs)
} }

View file

@ -9,12 +9,13 @@ import akka.actor._
import akka.{ ConfigurationException, AkkaException } import akka.{ ConfigurationException, AkkaException }
import akka.actor.ActorSystem.Settings import akka.actor.ActorSystem.Settings
import akka.util.{ Timeout, ReentrantGuard } import akka.util.{ Timeout, ReentrantGuard }
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import scala.annotation.implicitNotFound
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await
import annotation.implicitNotFound import scala.util.control.NoStackTrace
/** /**
* This trait brings log level handling to the EventStream: it reads the log * This trait brings log level handling to the EventStream: it reads the log
@ -448,7 +449,7 @@ object Logging {
} }
// these type ascriptions/casts are necessary to avoid CCEs during construction while retaining correct type // these type ascriptions/casts are necessary to avoid CCEs during construction while retaining correct type
val AllLogLevels: Seq[LogLevel] = Seq(ErrorLevel, WarningLevel, InfoLevel, DebugLevel) val AllLogLevels: immutable.Seq[LogLevel] = Vector(ErrorLevel, WarningLevel, InfoLevel, DebugLevel)
/** /**
* Obtain LoggingAdapter for the given actor system and source object. This * Obtain LoggingAdapter for the given actor system and source object. This

View file

@ -5,7 +5,8 @@
package akka.japi package akka.japi
import language.implicitConversions import language.implicitConversions
import scala.Some
import scala.collection.immutable
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.runtime.AbstractPartialFunction import scala.runtime.AbstractPartialFunction
@ -176,7 +177,7 @@ object Option {
object Util { object Util {
def classTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz) def classTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
def arrayToSeq[T](arr: Array[T]): Seq[T] = arr.toSeq def arrayToSeq[T](arr: Array[T]): immutable.Seq[T] = arr.to[immutable.Seq]
def arrayToSeq(classes: Array[Class[_]]): Seq[Class[_]] = classes.toSeq def arrayToSeq(classes: Array[Class[_]]): immutable.Seq[Class[_]] = classes.to[immutable.Seq]
} }

View file

@ -4,7 +4,7 @@
package akka.routing package akka.routing
import scala.collection.immutable.SortedMap import scala.collection.immutable
import scala.reflect.ClassTag import scala.reflect.ClassTag
import java.util.Arrays import java.util.Arrays
@ -18,7 +18,7 @@ import java.util.Arrays
* hash, i.e. make sure it is different for different nodes. * hash, i.e. make sure it is different for different nodes.
* *
*/ */
class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) { class ConsistentHash[T: ClassTag] private (nodes: immutable.SortedMap[Int, T], val virtualNodesFactor: Int) {
import ConsistentHash._ import ConsistentHash._
@ -106,7 +106,7 @@ class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtual
object ConsistentHash { object ConsistentHash {
def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = {
new ConsistentHash(SortedMap.empty[Int, T] ++ new ConsistentHash(immutable.SortedMap.empty[Int, T] ++
(for (node nodes; vnode 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), (for (node nodes; vnode 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)),
virtualNodesFactor) virtualNodesFactor)
} }

View file

@ -4,14 +4,15 @@
package akka.serialization package akka.serialization
import akka.AkkaException
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.AkkaException
import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess } import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess }
import akka.event.Logging import akka.event.Logging
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import java.io.NotSerializableException import java.io.NotSerializableException
import util.{ Try, DynamicVariable } import scala.util.{ Try, DynamicVariable }
import scala.collection.immutable
object Serialization { object Serialization {
@ -97,7 +98,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
serializerMap.get(clazz) match { serializerMap.get(clazz) match {
case null case null
// bindings are ordered from most specific to least specific // bindings are ordered from most specific to least specific
def unique(possibilities: Seq[(Class[_], Serializer)]): Boolean = def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
possibilities.size == 1 || possibilities.size == 1 ||
(possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) || (possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
(possibilities forall (_._2 == possibilities(0)._2)) (possibilities forall (_._2 == possibilities(0)._2))
@ -122,8 +123,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* loading is performed by the systems [[akka.actor.DynamicAccess]]. * loading is performed by the systems [[akka.actor.DynamicAccess]].
*/ */
def serializerOf(serializerFQN: String): Try[Serializer] = def serializerOf(serializerFQN: String): Try[Serializer] =
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) recoverWith { system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, List(classOf[ExtendedActorSystem] -> system)) recoverWith {
case _ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()) case _ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Nil)
} }
/** /**
@ -137,21 +138,21 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* bindings is a Seq of tuple representing the mapping from Class to Serializer. * bindings is a Seq of tuple representing the mapping from Class to Serializer.
* It is primarily ordered by the most specific classes first, and secondly in the configured order. * It is primarily ordered by the most specific classes first, and secondly in the configured order.
*/ */
private[akka] val bindings: Seq[ClassSerializer] = private[akka] val bindings: immutable.Seq[ClassSerializer] =
sort(for ((k: String, v: String) settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))) sort(for ((k: String, v: String) settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))).to[immutable.Seq]
/** /**
* Sort so that subtypes always precede their supertypes, but without * Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insert sort). * obeying any order between unrelated subtypes (insert sort).
*/ */
private def sort(in: Iterable[ClassSerializer]): Seq[ClassSerializer] = private def sort(in: Iterable[ClassSerializer]): immutable.Seq[ClassSerializer] =
(new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca) ((new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca)
buf.indexWhere(_._1 isAssignableFrom ca._1) match { buf.indexWhere(_._1 isAssignableFrom ca._1) match {
case -1 buf append ca case -1 buf append ca
case x buf insert (x, ca) case x buf insert (x, ca)
} }
buf buf
} }).to[immutable.Seq]
/** /**
* serializerMap is a Map whose keys is the class that is serializable and values is the serializer * serializerMap is a Map whose keys is the class that is serializable and values is the serializer

View file

@ -3,6 +3,8 @@
*/ */
package akka.util package akka.util
import scala.collection.immutable
/** /**
* Typeclass which describes a classification hierarchy. Observe the contract between `isEqual` and `isSubclass`! * Typeclass which describes a classification hierarchy. Observe the contract between `isEqual` and `isSubclass`!
*/ */
@ -74,7 +76,7 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V]
import SubclassifiedIndex._ import SubclassifiedIndex._
type Changes = Seq[(K, Set[V])] type Changes = immutable.Seq[(K, Set[V])]
protected var subkeys = Vector.empty[Nonroot[K, V]] protected var subkeys = Vector.empty[Nonroot[K, V]]
@ -208,5 +210,5 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V]
private def mergeChangesByKey(changes: Changes): Changes = private def mergeChangesByKey(changes: Changes): Changes =
(emptyMergeMap[K, V] /: changes) { (emptyMergeMap[K, V] /: changes) {
case (m, (k, s)) m.updated(k, m(k) ++ s) case (m, (k, s)) m.updated(k, m(k) ++ s)
}.toSeq }.to[immutable.Seq]
} }

View file

@ -1,13 +1,18 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel package akka.camel
import language.postfixOps
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import scala.concurrent.{ Promise, Await, Future }
import scala.collection.immutable
import akka.camel.TestSupport.NonSharedCamelSystem import akka.camel.TestSupport.NonSharedCamelSystem
import akka.actor.{ ActorRef, Props, Actor } import akka.actor.{ ActorRef, Props, Actor }
import akka.routing.BroadcastRouter import akka.routing.BroadcastRouter
import concurrent.{ Promise, Await, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._
import language.postfixOps
import akka.testkit._ import akka.testkit._
import akka.util.Timeout import akka.util.Timeout
import org.apache.camel.model.RouteDefinition import org.apache.camel.model.RouteDefinition
@ -58,7 +63,7 @@ class ConcurrentActivationTest extends WordSpec with MustMatchers with NonShared
activations.size must be(2 * number * number) activations.size must be(2 * number * number)
// must be the size of the activated activated producers and consumers // must be the size of the activated activated producers and consumers
deactivations.size must be(2 * number * number) deactivations.size must be(2 * number * number)
def partitionNames(refs: Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer")) def partitionNames(refs: immutable.Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer"))
def assertContainsSameElements(lists: (Seq[_], Seq[_])) { def assertContainsSameElements(lists: (Seq[_], Seq[_])) {
val (a, b) = lists val (a, b) = lists
a.intersect(b).size must be(a.size) a.intersect(b).size must be(a.size)

View file

@ -75,7 +75,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val failureDetector: FailureDetector = { val failureDetector: FailureDetector = {
import settings.{ FailureDetectorImplementationClass fqcn } import settings.{ FailureDetectorImplementationClass fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector]( system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ fqcn, List(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
case e throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) case e throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
}).get }).get
} }

View file

@ -557,7 +557,7 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
*/ */
private[cluster] object MetricsCollector { private[cluster] object MetricsCollector {
def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector = def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector =
dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match { dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil) match {
case Success(identity) new MetricsCollector(Some(identity), address) case Success(identity) new MetricsCollector(Some(identity), address)
case Failure(e) case Failure(e)
log.debug(e.toString) log.debug(e.toString)

View file

@ -8,6 +8,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.actor.Address import akka.actor.Address
import scala.collection.immutable
case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -51,7 +52,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
cluster.down(thirdAddress) cluster.down(thirdAddress)
enterBarrier("down-third-node") enterBarrier("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
clusterView.members.exists(_.address == thirdAddress) must be(false) clusterView.members.exists(_.address == thirdAddress) must be(false)
} }
@ -62,7 +63,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
runOn(second, fourth) { runOn(second, fourth) {
enterBarrier("down-third-node") enterBarrier("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
} }
enterBarrier("await-completion") enterBarrier("await-completion")

View file

@ -8,6 +8,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.actor.Address import akka.actor.Address
import scala.collection.immutable
case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -49,7 +50,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT
markNodeAsUnavailable(thirdAddress) markNodeAsUnavailable(thirdAddress)
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
clusterView.members.exists(_.address == thirdAddress) must be(false) clusterView.members.exists(_.address == thirdAddress) must be(false)
} }
@ -60,7 +61,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT
runOn(second, fourth) { runOn(second, fourth) {
enterBarrier("down-third-node") enterBarrier("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
} }
enterBarrier("await-completion") enterBarrier("await-completion")

View file

@ -11,6 +11,7 @@ import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.actor._ import akka.actor._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -59,7 +60,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds)
} }
runOn(fourth) { runOn(fourth) {
@ -69,7 +70,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
runOn(second, third) { runOn(second, third) {
enterBarrier("down-fourth-node") enterBarrier("down-fourth-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds)
} }
enterBarrier("await-completion-1") enterBarrier("await-completion-1")
@ -89,7 +90,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30.seconds)
} }
runOn(second) { runOn(second) {
@ -99,7 +100,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
runOn(third) { runOn(third) {
enterBarrier("down-second-node") enterBarrier("down-second-node")
awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30 seconds) awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30 seconds)
} }
enterBarrier("await-completion-2") enterBarrier("await-completion-2")

View file

@ -10,6 +10,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val controller = role("controller") val controller = role("controller")
@ -42,7 +43,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
import multiNodeConfig._ import multiNodeConfig._
// sorted in the order used by the cluster // sorted in the order used by the cluster
lazy val sortedRoles = Seq(first, second, third, fourth).sorted lazy val sortedRoles = List(first, second, third, fourth).sorted
"A cluster of four nodes" must { "A cluster of four nodes" must {

View file

@ -4,21 +4,21 @@
package akka.cluster package akka.cluster
import language.implicitConversions import language.implicitConversions
import org.scalatest.Suite
import org.scalatest.exceptions.TestFailedException
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.{ Address, ExtendedActorSystem }
import akka.remote.testconductor.RoleName import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec } import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
import akka.testkit._ import akka.testkit._
import akka.testkit.TestEvent._ import akka.testkit.TestEvent._
import scala.concurrent.duration._ import akka.actor.{ ActorSystem, Address }
import org.scalatest.Suite
import org.scalatest.exceptions.TestFailedException
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorPath
import akka.actor.RootActorPath
import akka.event.Logging.ErrorLevel import akka.event.Logging.ErrorLevel
import akka.actor.ActorSystem import scala.concurrent.duration._
import scala.collection.immutable
import java.util.concurrent.ConcurrentHashMap
object MultiNodeClusterSpec { object MultiNodeClusterSpec {
@ -158,7 +158,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
* nodes (roles). First node will be started first * nodes (roles). First node will be started first
* and others will join the first. * and others will join the first.
*/ */
def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.toSeq) def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.to[immutable.Seq])
/** /**
* Initialize the cluster of the specified member * Initialize the cluster of the specified member
@ -166,11 +166,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
* First node will be started first and others will join * First node will be started first and others will join
* the first. * the first.
*/ */
def awaitClusterUp(roles: RoleName*): Unit = { def awaitClusterUp(roles: RoleName*): Unit = awaitStartCluster(true, roles.to[immutable.Seq])
awaitStartCluster(true, roles.toSeq)
}
private def awaitStartCluster(upConvergence: Boolean = true, roles: Seq[RoleName]): Unit = { private def awaitStartCluster(upConvergence: Boolean = true, roles: immutable.Seq[RoleName]): Unit = {
runOn(roles.head) { runOn(roles.head) {
// make sure that the node-to-join is started before other join // make sure that the node-to-join is started before other join
startClusterNode() startClusterNode()
@ -196,16 +194,15 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) members(i).address must be(a) } expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) members(i).address must be(a) }
} }
def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(myself)) { def assertLeader(nodesInCluster: RoleName*): Unit =
assertLeaderIn(nodesInCluster) if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq])
}
/** /**
* Assert that the cluster has elected the correct leader * Assert that the cluster has elected the correct leader
* out of all nodes in the cluster. First * out of all nodes in the cluster. First
* member in the cluster ring is expected leader. * member in the cluster ring is expected leader.
*/ */
def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) {
nodesInCluster.length must not be (0) nodesInCluster.length must not be (0)
val expectedLeader = roleOfLeader(nodesInCluster) val expectedLeader = roleOfLeader(nodesInCluster)
val leader = clusterView.leader val leader = clusterView.leader
@ -221,7 +218,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
*/ */
def awaitUpConvergence( def awaitUpConvergence(
numberOfMembers: Int, numberOfMembers: Int,
canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], canNotBePartOfMemberRing: immutable.Seq[Address] = Nil,
timeout: FiniteDuration = 20.seconds): Unit = { timeout: FiniteDuration = 20.seconds): Unit = {
within(timeout) { within(timeout) {
awaitCond(clusterView.members.size == numberOfMembers) awaitCond(clusterView.members.size == numberOfMembers)
@ -239,7 +236,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
def awaitSeenSameState(addresses: Address*): Unit = def awaitSeenSameState(addresses: Address*): Unit =
awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty) awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty)
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0) nodesInCluster.length must not be (0)
nodesInCluster.sorted.head nodesInCluster.sorted.head
} }

View file

@ -8,6 +8,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -65,7 +66,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo
markNodeAsUnavailable(secondAddress) markNodeAsUnavailable(secondAddress)
awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = List(secondAddress), 30.seconds)
clusterView.isSingletonCluster must be(true) clusterView.isSingletonCluster must be(true)
awaitCond(clusterView.isLeader) awaitCond(clusterView.isLeader)
} }

View file

@ -9,9 +9,10 @@ import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.remote.testconductor.Direction import akka.remote.testconductor.Direction
import scala.concurrent.duration._
import scala.collection.immutable
case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -53,8 +54,8 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
muteMarkingAsUnreachable() muteMarkingAsUnreachable()
val side1 = IndexedSeq(first, second) val side1 = Vector(first, second)
val side2 = IndexedSeq(third, fourth, fifth) val side2 = Vector(third, fourth, fifth)
"A cluster of 5 members" must { "A cluster of 5 members" must {

View file

@ -6,13 +6,14 @@ package akka.cluster
import language.postfixOps import language.postfixOps
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.actor.Address import akka.actor.Address
import akka.remote.testconductor.{ RoleName, Direction } import akka.remote.testconductor.{ RoleName, Direction }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -45,7 +46,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
muteMarkingAsUnreachable() muteMarkingAsUnreachable()
def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { def allBut(role: RoleName, roles: immutable.Seq[RoleName] = roles): immutable.Seq[RoleName] = {
roles.filterNot(_ == role) roles.filterNot(_ == role)
} }
@ -125,7 +126,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
} }
runOn(allBut(victim): _*) { runOn(allBut(victim): _*) {
awaitUpConvergence(roles.size - 1, Seq(victim)) awaitUpConvergence(roles.size - 1, List(victim))
} }
endBarrier endBarrier

View file

@ -7,7 +7,7 @@ package akka.cluster
import akka.actor.Address import akka.actor.Address
import akka.testkit._ import akka.testkit._
import akka.testkit.TestEvent._ import akka.testkit.TestEvent._
import scala.collection.immutable.TreeMap import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -27,7 +27,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
val conn = Address("akka", "", "localhost", 2552) val conn = Address("akka", "", "localhost", 2552)
val conn2 = Address("akka", "", "localhost", 2553) val conn2 = Address("akka", "", "localhost", 2553)
def fakeTimeGenerator(timeIntervals: Seq[Long]): () Long = { def fakeTimeGenerator(timeIntervals: immutable.Seq[Long]): () Long = {
var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) acc ::: List[Long](acc.last + c)) var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) acc ::: List[Long](acc.last + c))
def timeGenerator(): Long = { def timeGenerator(): Long = {
val currentTime = times.head val currentTime = times.head
@ -73,7 +73,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"return realistic phi values" in { "return realistic phi values" in {
val fd = createFailureDetector() val fd = createFailureDetector()
val test = TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3) val test = immutable.TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3)
for ((timeDiff, expectedPhi) test) { for ((timeDiff, expectedPhi) test) {
fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1)) fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1))
} }

View file

@ -5,14 +5,16 @@
package akka.cluster package akka.cluster
import scala.language.postfixOps import scala.language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await
import scala.util.{ Success, Try, Failure }
import akka.actor._ import akka.actor._
import akka.testkit._ import akka.testkit._
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import util.{ Success, Try, Failure }
object MetricsEnabledSpec { object MetricsEnabledSpec {
val config = """ val config = """
@ -207,11 +209,10 @@ trait MetricSpec extends WordSpec with MustMatchers {
if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) m }.foreach(_.average.isDefined must be(true)) if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) m }.foreach(_.average.isDefined must be(true))
} }
def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = { def collectNodeMetrics(nodes: Set[NodeMetrics]): immutable.Seq[Metric] =
var r: Seq[Metric] = Seq.empty nodes.foldLeft(Vector[Metric]()) {
nodes.foreach(n r ++= n.metrics.filter(_.isDefined)) case (r, n) r ++ n.metrics.filter(_.isDefined)
r }
}
} }
trait AbstractClusterMetricsSpec extends DefaultTimeout { trait AbstractClusterMetricsSpec extends DefaultTimeout {

View file

@ -30,7 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import akka.japi.Function; import akka.japi.Function;
import scala.Option; import scala.Option;
import scala.collection.JavaConverters; import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.immutable.Seq;
import org.junit.Test; import org.junit.Test;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -220,7 +220,7 @@ public class FaultHandlingTestBase {
//#testkit //#testkit
public <A> Seq<A> seq(A... args) { public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter( return JavaConverters.collectionAsScalaIterableConverter(
java.util.Arrays.asList(args)).asScala().toSeq(); java.util.Arrays.asList(args)).asScala().toList();
} }
//#testkit //#testkit
} }

View file

@ -11,6 +11,7 @@ import static docs.jrouting.CustomRouterDocTestBase.Message.RepublicanVote;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.junit.After; import org.junit.After;
@ -69,7 +70,7 @@ public class CustomRouterDocTestBase {
//#supervision //#supervision
final SupervisorStrategy strategy = final SupervisorStrategy strategy =
new OneForOneStrategy(5, Duration.create("1 minute"), new OneForOneStrategy(5, Duration.create("1 minute"),
new Class<?>[] { Exception.class }); Collections.<Class<? extends Throwable>>singletonList(Exception.class));
final ActorRef router = system.actorOf(new Props(MyActor.class) final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy))); .withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy)));
//#supervision //#supervision

View file

@ -8,6 +8,7 @@ import language.postfixOps
import akka.testkit.{ AkkaSpec MyFavoriteTestFrameWorkPlusAkkaTestKit } import akka.testkit.{ AkkaSpec MyFavoriteTestFrameWorkPlusAkkaTestKit }
//#test-code //#test-code
import akka.actor.Props import akka.actor.Props
import scala.collection.immutable
class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
@ -24,7 +25,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
case object Flush case object Flush
// sent events // sent events
case class Batch(obj: Seq[Any]) case class Batch(obj: immutable.Seq[Any])
//#simple-events //#simple-events
//#simple-state //#simple-state
// states // states
@ -34,7 +35,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
sealed trait Data sealed trait Data
case object Uninitialized extends Data case object Uninitialized extends Data
case class Todo(target: ActorRef, queue: Seq[Any]) extends Data case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data
//#simple-state //#simple-state
//#simple-fsm //#simple-fsm
class Buncher extends Actor with FSM[State, Data] { class Buncher extends Actor with FSM[State, Data] {
@ -193,12 +194,12 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
buncher ! SetTarget(testActor) buncher ! SetTarget(testActor)
buncher ! Queue(42) buncher ! Queue(42)
buncher ! Queue(43) buncher ! Queue(43)
expectMsg(Batch(Seq(42, 43))) expectMsg(Batch(immutable.Seq(42, 43)))
buncher ! Queue(44) buncher ! Queue(44)
buncher ! Flush buncher ! Flush
buncher ! Queue(45) buncher ! Queue(45)
expectMsg(Batch(Seq(44))) expectMsg(Batch(immutable.Seq(44)))
expectMsg(Batch(Seq(45))) expectMsg(Batch(immutable.Seq(45)))
} }
"batch not if uninitialized" in { "batch not if uninitialized" in {

View file

@ -22,6 +22,7 @@ import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.TestKit import akka.testkit.TestKit
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
/** /**
* a Test to show some TestKit examples * a Test to show some TestKit examples
@ -38,8 +39,8 @@ class TestKitUsageSpec
val filterRef = system.actorOf(Props(new FilteringActor(testActor))) val filterRef = system.actorOf(Props(new FilteringActor(testActor)))
val randomHead = Random.nextInt(6) val randomHead = Random.nextInt(6)
val randomTail = Random.nextInt(10) val randomTail = Random.nextInt(10)
val headList = Seq().padTo(randomHead, "0") val headList = immutable.Seq().padTo(randomHead, "0")
val tailList = Seq().padTo(randomTail, "1") val tailList = immutable.Seq().padTo(randomTail, "1")
val seqRef = val seqRef =
system.actorOf(Props(new SequencingActor(testActor, headList, tailList))) system.actorOf(Props(new SequencingActor(testActor, headList, tailList)))
@ -145,7 +146,7 @@ object TestKitUsageSpec {
* like to test that the interesting value is received and that you cant * like to test that the interesting value is received and that you cant
* be bothered with the rest * be bothered with the rest
*/ */
class SequencingActor(next: ActorRef, head: Seq[String], tail: Seq[String]) class SequencingActor(next: ActorRef, head: immutable.Seq[String], tail: immutable.Seq[String])
extends Actor { extends Actor {
def receive = { def receive = {
case msg { case msg {

View file

@ -5,13 +5,13 @@ package docs.zeromq
import language.postfixOps import language.postfixOps
import akka.actor.{ Actor, Props }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
import akka.actor.{ Actor, Props }
import akka.testkit._ import akka.testkit._
import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension } import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind }
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.Date import java.util.Date
import akka.zeromq.{ SocketType, Bind }
object ZeromqDocSpec { object ZeromqDocSpec {
@ -52,12 +52,12 @@ object ZeromqDocSpec {
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed,
currentHeap.getMax)).get currentHeap.getMax)).get
// the first frame is the topic, second is the message // the first frame is the topic, second is the message
pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload))) pubSocket ! ZMQMessage(immutable.Seq(Frame("health.heap"), Frame(heapPayload)))
// use akka SerializationExtension to convert to bytes // use akka SerializationExtension to convert to bytes
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get
// the first frame is the topic, second is the message // the first frame is the topic, second is the message
pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload))) pubSocket ! ZMQMessage(immutable.Seq(Frame("health.load"), Frame(loadPayload)))
} }
} }
//#health //#health
@ -146,7 +146,7 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
val payload = Array.empty[Byte] val payload = Array.empty[Byte]
//#pub-topic //#pub-topic
pubSocket ! ZMQMessage(Seq(Frame("foo.bar"), Frame(payload))) pubSocket ! ZMQMessage(Frame("foo.bar"), Frame(payload))
//#pub-topic //#pub-topic
system.stop(subSocket) system.stop(subSocket)

View file

@ -9,6 +9,7 @@ import java.io.File
import java.lang.Boolean.getBoolean import java.lang.Boolean.getBoolean
import java.net.URLClassLoader import java.net.URLClassLoader
import java.util.jar.JarFile import java.util.jar.JarFile
import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
/** /**
@ -77,8 +78,8 @@ object Main {
Thread.currentThread.setContextClassLoader(classLoader) Thread.currentThread.setContextClassLoader(classLoader)
val bootClasses: Seq[String] = args.toSeq val bootClasses: immutable.Seq[String] = args.to[immutable.Seq]
val bootables: Seq[Bootable] = bootClasses map { c classLoader.loadClass(c).newInstance.asInstanceOf[Bootable] } val bootables: immutable.Seq[Bootable] = bootClasses map { c classLoader.loadClass(c).newInstance.asInstanceOf[Bootable] }
for (bootable bootables) { for (bootable bootables) {
log("Starting up " + bootable.getClass.getName) log("Starting up " + bootable.getClass.getName)
@ -122,7 +123,7 @@ object Main {
new URLClassLoader(urls, Thread.currentThread.getContextClassLoader) new URLClassLoader(urls, Thread.currentThread.getContextClassLoader)
} }
private def addShutdownHook(bootables: Seq[Bootable]): Unit = { private def addShutdownHook(bootables: immutable.Seq[Bootable]): Unit = {
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
def run = { def run = {
log("") log("")

View file

@ -32,7 +32,7 @@ class SimpleNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT
import NamespaceHandlerTest._ import NamespaceHandlerTest._
val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( val testBundles = buildTestBundles(List(
AKKA_OSGI_BLUEPRINT, AKKA_OSGI_BLUEPRINT,
bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("simple.xml")))) bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("simple.xml"))))
@ -62,7 +62,7 @@ class ConfigNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT
import NamespaceHandlerTest._ import NamespaceHandlerTest._
val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( val testBundles = buildTestBundles(List(
AKKA_OSGI_BLUEPRINT, AKKA_OSGI_BLUEPRINT,
bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("config.xml")))) bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("config.xml"))))
@ -94,7 +94,7 @@ class DependencyInjectionNamespaceHandlerTest extends WordSpec with MustMatchers
import NamespaceHandlerTest._ import NamespaceHandlerTest._
val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( val testBundles = buildTestBundles(List(
AKKA_OSGI_BLUEPRINT, AKKA_OSGI_BLUEPRINT,
bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("injection.xml")))) bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("injection.xml"))))

View file

@ -10,6 +10,7 @@ import akka.actor.ActorSystem
import akka.pattern.ask import akka.pattern.ask
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
import akka.util.Timeout import akka.util.Timeout
import de.kalpatec.pojosr.framework.launch.BundleDescriptor import de.kalpatec.pojosr.framework.launch.BundleDescriptor
import test.{ RuntimeNameActorSystemActivator, TestActivators, PingPongActorSystemActivator } import test.{ RuntimeNameActorSystemActivator, TestActivators, PingPongActorSystemActivator }
@ -32,7 +33,7 @@ class PingPongActorSystemActivatorTest extends WordSpec with MustMatchers with P
import ActorSystemActivatorTest._ import ActorSystemActivatorTest._
val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( val testBundles: immutable.Seq[BundleDescriptor] = buildTestBundles(List(
bundle(TEST_BUNDLE_NAME).withActivator(classOf[PingPongActorSystemActivator]))) bundle(TEST_BUNDLE_NAME).withActivator(classOf[PingPongActorSystemActivator])))
"PingPongActorSystemActivator" must { "PingPongActorSystemActivator" must {
@ -65,7 +66,8 @@ class RuntimeNameActorSystemActivatorTest extends WordSpec with MustMatchers wit
import ActorSystemActivatorTest._ import ActorSystemActivatorTest._
val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq(bundle(TEST_BUNDLE_NAME).withActivator(classOf[RuntimeNameActorSystemActivator]))) val testBundles: immutable.Seq[BundleDescriptor] =
buildTestBundles(List(bundle(TEST_BUNDLE_NAME).withActivator(classOf[RuntimeNameActorSystemActivator])))
"RuntimeNameActorSystemActivator" must { "RuntimeNameActorSystemActivator" must {

View file

@ -17,7 +17,7 @@ import java.io._
import org.scalatest.{ BeforeAndAfterAll, Suite } import org.scalatest.{ BeforeAndAfterAll, Suite }
import java.util.{ UUID, Date, ServiceLoader, HashMap } import java.util.{ UUID, Date, ServiceLoader, HashMap }
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.Some import scala.collection.immutable
/** /**
* Trait that provides support for building akka-osgi tests using PojoSR * Trait that provides support for building akka-osgi tests using PojoSR
@ -31,7 +31,7 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll {
* All bundles being found on the test classpath are automatically installed and started in the PojoSR runtime. * All bundles being found on the test classpath are automatically installed and started in the PojoSR runtime.
* Implement this to define the extra bundles that should be available for testing. * Implement this to define the extra bundles that should be available for testing.
*/ */
def testBundles: Seq[BundleDescriptor] def testBundles: immutable.Seq[BundleDescriptor]
val bufferedLoadingErrors = new ByteArrayOutputStream() val bufferedLoadingErrors = new ByteArrayOutputStream()
@ -82,15 +82,11 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll {
} }
} }
protected def buildTestBundles(builders: Seq[BundleDescriptorBuilder]): Seq[BundleDescriptor] = builders map (_.build) protected def buildTestBundles(builders: immutable.Seq[BundleDescriptorBuilder]): immutable.Seq[BundleDescriptor] =
builders map (_.build)
def filterErrors()(block: Unit): Unit = { def filterErrors()(block: Unit): Unit =
try { try block catch { case e: Throwable System.err.write(bufferedLoadingErrors.toByteArray); throw e }
block
} catch {
case e: Throwable System.err.write(bufferedLoadingErrors.toByteArray); throw e
}
}
} }
object PojoSRTestSupport { object PojoSRTestSupport {

View file

@ -6,13 +6,13 @@ package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import java.net.InetSocketAddress import java.net.InetSocketAddress
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable.Queue import scala.collection.immutable
import scala.concurrent.duration._
import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{ SimpleChannelHandler, MessageEvent, Channels, ChannelStateEvent, ChannelHandlerContext, ChannelFutureListener, ChannelFuture } import org.jboss.netty.channel.{ SimpleChannelHandler, MessageEvent, Channels, ChannelStateEvent, ChannelHandlerContext, ChannelFutureListener, ChannelFuture }
import akka.actor.{ Props, LoggingFSM, Address, ActorSystem, ActorRef, ActorLogging, Actor, FSM } import akka.actor.{ Props, LoggingFSM, Address, ActorSystem, ActorRef, ActorLogging, Actor, FSM }
import akka.event.Logging import akka.event.Logging
import akka.remote.netty.ChannelAddress import akka.remote.netty.ChannelAddress
import scala.concurrent.duration._
/** /**
* INTERNAL API. * INTERNAL API.
@ -230,7 +230,7 @@ private[akka] object ThrottleActor {
case object Throttle extends State case object Throttle extends State
case object Blackhole extends State case object Blackhole extends State
case class Data(lastSent: Long, rateMBit: Float, queue: Queue[Send]) case class Data(lastSent: Long, rateMBit: Float, queue: immutable.Queue[Send])
case class Send(ctx: ChannelHandlerContext, direction: Direction, future: Option[ChannelFuture], msg: AnyRef) case class Send(ctx: ChannelHandlerContext, direction: Direction, future: Option[ChannelFuture], msg: AnyRef)
case class SetRate(rateMBit: Float) case class SetRate(rateMBit: Float)
@ -248,7 +248,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
private val packetSplitThreshold = TestConductor(context.system).Settings.PacketSplitThreshold private val packetSplitThreshold = TestConductor(context.system).Settings.PacketSplitThreshold
startWith(PassThrough, Data(0, -1, Queue())) startWith(PassThrough, Data(0, -1, immutable.Queue()))
when(PassThrough) { when(PassThrough) {
case Event(s @ Send(_, _, _, msg), _) case Event(s @ Send(_, _, _, msg), _)
@ -258,8 +258,8 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
} }
when(Throttle) { when(Throttle) {
case Event(s: Send, data @ Data(_, _, Queue())) case Event(s: Send, data @ Data(_, _, immutable.Queue()))
stay using sendThrottled(data.copy(lastSent = System.nanoTime, queue = Queue(s))) stay using sendThrottled(data.copy(lastSent = System.nanoTime, queue = immutable.Queue(s)))
case Event(s: Send, data) case Event(s: Send, data)
stay using sendThrottled(data.copy(queue = data.queue.enqueue(s))) stay using sendThrottled(data.copy(queue = data.queue.enqueue(s)))
case Event(Tick, data) case Event(Tick, data)
@ -286,7 +286,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
whenUnhandled { whenUnhandled {
case Event(SetRate(rate), d) case Event(SetRate(rate), d)
if (rate > 0) { if (rate > 0) {
goto(Throttle) using d.copy(lastSent = System.nanoTime, rateMBit = rate, queue = Queue()) goto(Throttle) using d.copy(lastSent = System.nanoTime, rateMBit = rate, queue = immutable.Queue())
} else if (rate == 0) { } else if (rate == 0) {
goto(Blackhole) goto(Blackhole)
} else { } else {
@ -328,7 +328,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
*/ */
private def schedule(d: Data): (Data, Seq[Send], Option[FiniteDuration]) = { private def schedule(d: Data): (Data, Seq[Send], Option[FiniteDuration]) = {
val now = System.nanoTime val now = System.nanoTime
@tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[FiniteDuration]) = { @tailrec def rec(d: Data, toSend: immutable.Seq[Send]): (Data, immutable.Seq[Send], Option[FiniteDuration]) = {
if (d.queue.isEmpty) (d, toSend, None) if (d.queue.isEmpty) (d, toSend, None)
else { else {
val timeForPacket = d.lastSent + (1000 * size(d.queue.head.msg) / d.rateMBit).toLong val timeForPacket = d.lastSent + (1000 * size(d.queue.head.msg) / d.rateMBit).toLong
@ -344,7 +344,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
} }
} }
} }
rec(d, Seq()) rec(d, Nil)
} }
/** /**

View file

@ -4,20 +4,20 @@
package akka.remote.testconductor package akka.remote.testconductor
import language.postfixOps import language.postfixOps
import java.util.concurrent.TimeoutException
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler } import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler }
import RemoteConnection.getAddrString import akka.remote.testconductor.RemoteConnection.getAddrString
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Await, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import scala.reflect.classTag
import akka.util.Timeout import akka.util.Timeout
import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent, WriteCompletionEvent, ExceptionEvent } import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent, WriteCompletionEvent, ExceptionEvent }
import com.typesafe.config.ConfigFactory
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeoutException
import akka.pattern.{ ask, pipe, AskTimeoutException } import akka.pattern.{ ask, pipe, AskTimeoutException }
import scala.util.control.NoStackTrace
import akka.event.{ LoggingAdapter, Logging } import akka.event.{ LoggingAdapter, Logging }
import java.net.{ InetSocketAddress, ConnectException } import java.net.{ InetSocketAddress, ConnectException }
import scala.reflect.classTag
import concurrent.{ ExecutionContext, Await, Future }
/** /**
* The Player is the client component of the * The Player is the client component of the
@ -67,15 +67,13 @@ trait Player { this: TestConductorExt ⇒
* Enter the named barriers, one after the other, in the order given. Will * Enter the named barriers, one after the other, in the order given. Will
* throw an exception in case of timeouts or other errors. * throw an exception in case of timeouts or other errors.
*/ */
def enter(name: String*) { def enter(name: String*): Unit = enter(Settings.BarrierTimeout, name.to[immutable.Seq])
enter(Settings.BarrierTimeout, name)
}
/** /**
* Enter the named barriers, one after the other, in the order given. Will * Enter the named barriers, one after the other, in the order given. Will
* throw an exception in case of timeouts or other errors. * throw an exception in case of timeouts or other errors.
*/ */
def enter(timeout: Timeout, name: Seq[String]) { def enter(timeout: Timeout, name: immutable.Seq[String]) {
system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
val stop = Deadline.now + timeout.duration val stop = Deadline.now + timeout.duration
name foreach { b name foreach { b

View file

@ -7,16 +7,17 @@ import language.implicitConversions
import language.postfixOps import language.postfixOps
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
import scala.concurrent.{ Await, Awaitable }
import scala.util.control.NonFatal
import scala.collection.immutable
import akka.actor._ import akka.actor._
import akka.util.Timeout import akka.util.Timeout
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import akka.testkit._ import akka.testkit._
import scala.concurrent.{ Await, Awaitable }
import scala.util.control.NonFatal
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
import akka.remote.testconductor.RoleName import akka.remote.testconductor.RoleName
import akka.remote.testconductor.TestConductorTransport import akka.remote.testconductor.TestConductorTransport
import akka.actor.RootActorPath import akka.actor.RootActorPath
@ -30,7 +31,7 @@ abstract class MultiNodeConfig {
private var _commonConf: Option[Config] = None private var _commonConf: Option[Config] = None
private var _nodeConf = Map[RoleName, Config]() private var _nodeConf = Map[RoleName, Config]()
private var _roles = Vector[RoleName]() private var _roles = Vector[RoleName]()
private var _deployments = Map[RoleName, Seq[String]]() private var _deployments = Map[RoleName, immutable.Seq[String]]()
private var _allDeploy = Vector[String]() private var _allDeploy = Vector[String]()
private var _testTransport = false private var _testTransport = false
@ -106,9 +107,9 @@ abstract class MultiNodeConfig {
configs reduce (_ withFallback _) configs reduce (_ withFallback _)
} }
private[testkit] def deployments(node: RoleName): Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy private[testkit] def deployments(node: RoleName): immutable.Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy
private[testkit] def roles: Seq[RoleName] = _roles private[testkit] def roles: immutable.Seq[RoleName] = _roles
} }
@ -234,7 +235,7 @@ object MultiNodeSpec {
* `AskTimeoutException: sending to terminated ref breaks promises`. Using lazy * `AskTimeoutException: sending to terminated ref breaks promises`. Using lazy
* val is fine. * val is fine.
*/ */
abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: Seq[RoleName], deployments: RoleName Seq[String]) abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: immutable.Seq[RoleName], deployments: RoleName Seq[String])
extends TestKit(_system) with MultiNodeSpecCallbacks { extends TestKit(_system) with MultiNodeSpecCallbacks {
import MultiNodeSpec._ import MultiNodeSpec._
@ -294,7 +295,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
/** /**
* All registered roles * All registered roles
*/ */
def roles: Seq[RoleName] = _roles def roles: immutable.Seq[RoleName] = _roles
/** /**
* TO BE DEFINED BY USER: Defines the number of participants required for starting the test. This * TO BE DEFINED BY USER: Defines the number of participants required for starting the test. This
@ -335,9 +336,10 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
* Enter the named barriers in the order given. Use the remaining duration from * Enter the named barriers in the order given. Use the remaining duration from
* the innermost enclosing `within` block or the default `BarrierTimeout` * the innermost enclosing `within` block or the default `BarrierTimeout`
*/ */
def enterBarrier(name: String*) { def enterBarrier(name: String*): Unit =
testConductor.enter(Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), name) testConductor.enter(
} Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)),
name.to[immutable.Seq])
/** /**
* Query the controller for the transport address of the given node (by role name) and * Query the controller for the transport address of the given node (by role name) and

View file

@ -70,7 +70,7 @@ class RemoteActorRefProvider(
_transport = { _transport = {
val fqn = remoteSettings.RemoteTransport val fqn = remoteSettings.RemoteTransport
val args = Seq( val args = List(
classOf[ExtendedActorSystem] -> system, classOf[ExtendedActorSystem] -> system,
classOf[RemoteActorRefProvider] -> this) classOf[RemoteActorRefProvider] -> this)

View file

@ -8,7 +8,9 @@ import java.net.InetSocketAddress
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.concurrent.Executors import java.util.concurrent.Executors
import scala.collection.mutable.HashMap import scala.collection.mutable
import scala.collection.immutable
import scala.util.control.NonFatal
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture } import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, DefaultChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal } import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, DefaultChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal }
@ -20,7 +22,6 @@ import org.jboss.netty.util.{ DefaultObjectSizeEstimator, HashedWheelTimer }
import akka.event.Logging import akka.event.Logging
import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.RemoteProtocol.AkkaRemoteProtocol
import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted }
import scala.util.control.NonFatal
import akka.actor.{ ExtendedActorSystem, Address, ActorRef } import akka.actor.{ ExtendedActorSystem, Address, ActorRef }
import com.google.protobuf.MessageLite import com.google.protobuf.MessageLite
@ -53,7 +54,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* Construct a DefaultChannelPipeline from a sequence of handlers; to be used * Construct a DefaultChannelPipeline from a sequence of handlers; to be used
* in implementations of ChannelPipelineFactory. * in implementations of ChannelPipelineFactory.
*/ */
def apply(handlers: Seq[ChannelHandler]): DefaultChannelPipeline = def apply(handlers: immutable.Seq[ChannelHandler]): DefaultChannelPipeline =
(new DefaultChannelPipeline /: handlers) { (p, h) p.addLast(Logging.simpleName(h.getClass), h); p } (new DefaultChannelPipeline /: handlers) { (p, h) p.addLast(Logging.simpleName(h.getClass), h); p }
/** /**
@ -69,7 +70,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* Construct a default protocol stack, excluding the head handler (i.e. the one which * Construct a default protocol stack, excluding the head handler (i.e. the one which
* actually dispatches the received messages to the local target actors). * actually dispatches the received messages to the local target actors).
*/ */
def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = def defaultStack(withTimeout: Boolean, isClient: Boolean): immutable.Seq[ChannelHandler] =
(if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) :::
(if (withTimeout) List(timeout) else Nil) ::: (if (withTimeout) List(timeout) else Nil) :::
msgFormat ::: msgFormat :::
@ -138,7 +139,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
PipelineFactory(Seq(endpoint), withTimeout, isClient) PipelineFactory(Seq(endpoint), withTimeout, isClient)
private val remoteClients = new HashMap[Address, RemoteClient] private val remoteClients = new mutable.HashMap[Address, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock private val clientsLock = new ReentrantReadWriteLock
override protected def useUntrustedMode = remoteSettings.UntrustedMode override protected def useUntrustedMode = remoteSettings.UntrustedMode

View file

@ -16,6 +16,7 @@
package akka.remote.security.provider package akka.remote.security.provider
import org.uncommons.maths.random.{ SeedGenerator, SeedException, SecureRandomSeedGenerator, RandomDotOrgSeedGenerator, DevRandomSeedGenerator } import org.uncommons.maths.random.{ SeedGenerator, SeedException, SecureRandomSeedGenerator, RandomDotOrgSeedGenerator, DevRandomSeedGenerator }
import scala.collection.immutable
/** /**
* Internal API * Internal API
@ -33,8 +34,8 @@ object InternetSeedGenerator {
/**Singleton instance. */ /**Singleton instance. */
private final val Instance: InternetSeedGenerator = new InternetSeedGenerator private final val Instance: InternetSeedGenerator = new InternetSeedGenerator
/**Delegate generators. */ /**Delegate generators. */
private final val Generators: Seq[SeedGenerator] = private final val Generators: immutable.Seq[SeedGenerator] =
Seq(new RandomDotOrgSeedGenerator, // first try the Internet seed generator List(new RandomDotOrgSeedGenerator, // first try the Internet seed generator
new SecureRandomSeedGenerator) // this is last because it always works new SecureRandomSeedGenerator) // this is last because it always works
} }

View file

@ -138,7 +138,7 @@ object TestActorRef {
def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)
def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({ def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()).recover({ system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Nil).recover({
case exception throw ActorInitializationException(null, case exception throw ActorInitializationException(null,
"Could not instantiate Actor" + "Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," + "\nMake sure Actor is NOT defined inside a class/trait," +

View file

@ -6,14 +6,15 @@ package akka.testkit
import language.existentials import language.existentials
import scala.util.matching.Regex import scala.util.matching.Regex
import scala.collection.JavaConverters
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage } import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage }
import akka.dispatch.{ SystemMessage, Terminate } import akka.dispatch.{ SystemMessage, Terminate }
import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized }
import akka.event.Logging import akka.event.Logging
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import scala.collection.JavaConverters
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import akka.actor.NoSerializationVerificationNeeded import akka.actor.NoSerializationVerificationNeeded
/** /**
@ -38,22 +39,22 @@ sealed trait TestEvent
*/ */
object TestEvent { object TestEvent {
object Mute { object Mute {
def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq) def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.to[immutable.Seq])
} }
case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { case class Mute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
/** /**
* Java API * Java API
*/ */
def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.toSeq) def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.to[immutable.Seq])
} }
object UnMute { object UnMute {
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to[immutable.Seq])
} }
case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { case class UnMute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
/** /**
* Java API * Java API
*/ */
def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.toSeq) def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.to[immutable.Seq])
} }
} }

View file

@ -5,15 +5,15 @@ package akka.testkit
import language.postfixOps import language.postfixOps
import scala.annotation.{ varargs, tailrec }
import scala.collection.immutable
import scala.concurrent.duration._
import scala.reflect.ClassTag
import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic }
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import scala.concurrent.duration._
import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic }
import atomic.AtomicInteger
import scala.annotation.tailrec
import akka.util.{ Timeout, BoxedType } import akka.util.{ Timeout, BoxedType }
import scala.annotation.varargs
import scala.reflect.ClassTag
object TestActor { object TestActor {
type Ignore = Option[PartialFunction[Any, Boolean]] type Ignore = Option[PartialFunction[Any, Boolean]]
@ -415,7 +415,7 @@ trait TestKitBase {
/** /**
* Same as `expectMsgAllOf(remaining, obj...)`, but correctly treating the timeFactor. * Same as `expectMsgAllOf(remaining, obj...)`, but correctly treating the timeFactor.
*/ */
def expectMsgAllOf[T](obj: T*): Seq[T] = expectMsgAllOf_internal(remaining, obj: _*) def expectMsgAllOf[T](obj: T*): immutable.Seq[T] = expectMsgAllOf_internal(remaining, obj: _*)
/** /**
* Receive a number of messages from the test actor matching the given * Receive a number of messages from the test actor matching the given
@ -430,19 +430,19 @@ trait TestKitBase {
* expectMsgAllOf(1 second, Result1(), Result2()) * expectMsgAllOf(1 second, Result1(), Result2())
* </pre> * </pre>
*/ */
def expectMsgAllOf[T](max: FiniteDuration, obj: T*): Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*) def expectMsgAllOf[T](max: FiniteDuration, obj: T*): immutable.Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*)
private def expectMsgAllOf_internal[T](max: FiniteDuration, obj: T*): Seq[T] = { private def expectMsgAllOf_internal[T](max: FiniteDuration, obj: T*): immutable.Seq[T] = {
val recv = receiveN_internal(obj.size, max) val recv = receiveN_internal(obj.size, max)
obj foreach (x assert(recv exists (x == _), "not found " + x)) obj foreach (x assert(recv exists (x == _), "not found " + x))
recv foreach (x assert(obj exists (x == _), "found unexpected " + x)) recv foreach (x assert(obj exists (x == _), "found unexpected " + x))
recv.asInstanceOf[Seq[T]] recv.asInstanceOf[immutable.Seq[T]]
} }
/** /**
* Same as `expectMsgAllClassOf(remaining, obj...)`, but correctly treating the timeFactor. * Same as `expectMsgAllClassOf(remaining, obj...)`, but correctly treating the timeFactor.
*/ */
def expectMsgAllClassOf[T](obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(remaining, obj: _*) def expectMsgAllClassOf[T](obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllClassOf(remaining, obj: _*)
/** /**
* Receive a number of messages from the test actor matching the given * Receive a number of messages from the test actor matching the given
@ -452,19 +452,19 @@ trait TestKitBase {
* Wait time is bounded by the given duration, with an AssertionFailure * Wait time is bounded by the given duration, with an AssertionFailure
* being thrown in case of timeout. * being thrown in case of timeout.
*/ */
def expectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*) def expectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*)
private def internalExpectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = { private def internalExpectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = {
val recv = receiveN_internal(obj.size, max) val recv = receiveN_internal(obj.size, max)
obj foreach (x assert(recv exists (_.getClass eq BoxedType(x)), "not found " + x)) obj foreach (x assert(recv exists (_.getClass eq BoxedType(x)), "not found " + x))
recv foreach (x assert(obj exists (c BoxedType(c) eq x.getClass), "found non-matching object " + x)) recv foreach (x assert(obj exists (c BoxedType(c) eq x.getClass), "found non-matching object " + x))
recv.asInstanceOf[Seq[T]] recv.asInstanceOf[immutable.Seq[T]]
} }
/** /**
* Same as `expectMsgAllConformingOf(remaining, obj...)`, but correctly treating the timeFactor. * Same as `expectMsgAllConformingOf(remaining, obj...)`, but correctly treating the timeFactor.
*/ */
def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(remaining, obj: _*) def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllConformingOf(remaining, obj: _*)
/** /**
* Receive a number of messages from the test actor matching the given * Receive a number of messages from the test actor matching the given
@ -477,13 +477,13 @@ trait TestKitBase {
* Beware that one object may satisfy all given class constraints, which * Beware that one object may satisfy all given class constraints, which
* may be counter-intuitive. * may be counter-intuitive.
*/ */
def expectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*) def expectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*)
private def internalExpectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = { private def internalExpectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = {
val recv = receiveN_internal(obj.size, max) val recv = receiveN_internal(obj.size, max)
obj foreach (x assert(recv exists (BoxedType(x) isInstance _), "not found " + x)) obj foreach (x assert(recv exists (BoxedType(x) isInstance _), "not found " + x))
recv foreach (x assert(obj exists (c BoxedType(c) isInstance x), "found non-matching object " + x)) recv foreach (x assert(obj exists (c BoxedType(c) isInstance x), "found non-matching object " + x))
recv.asInstanceOf[Seq[T]] recv.asInstanceOf[immutable.Seq[T]]
} }
/** /**
@ -520,7 +520,7 @@ trait TestKitBase {
* assert(series == (1 to 7).toList) * assert(series == (1 to 7).toList)
* </pre> * </pre>
*/ */
def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = { def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): immutable.Seq[T] = {
val stop = now + remainingOrDilated(max) val stop = now + remainingOrDilated(max)
var msg: Message = NullMessage var msg: Message = NullMessage
@ -553,14 +553,14 @@ trait TestKitBase {
* Same as `receiveN(n, remaining)` but correctly taking into account * Same as `receiveN(n, remaining)` but correctly taking into account
* Duration.timeFactor. * Duration.timeFactor.
*/ */
def receiveN(n: Int): Seq[AnyRef] = receiveN_internal(n, remaining) def receiveN(n: Int): immutable.Seq[AnyRef] = receiveN_internal(n, remaining)
/** /**
* Receive N messages in a row before the given deadline. * Receive N messages in a row before the given deadline.
*/ */
def receiveN(n: Int, max: FiniteDuration): Seq[AnyRef] = receiveN_internal(n, max.dilated) def receiveN(n: Int, max: FiniteDuration): immutable.Seq[AnyRef] = receiveN_internal(n, max.dilated)
private def receiveN_internal(n: Int, max: Duration): Seq[AnyRef] = { private def receiveN_internal(n: Int, max: Duration): immutable.Seq[AnyRef] = {
val stop = max + now val stop = max + now
for { x 1 to n } yield { for { x 1 to n } yield {
val timeout = stop - now val timeout = stop - now

View file

@ -4,14 +4,16 @@ import language.implicitConversions
import akka.actor.ActorSystem import akka.actor.ActorSystem
import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.duration.{ Duration, FiniteDuration }
import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.collection.immutable
import java.util.concurrent.TimeUnit.MILLISECONDS
package object testkit { package object testkit {
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: T)(implicit system: ActorSystem): T = { def filterEvents[T](eventFilters: Iterable[EventFilter])(block: T)(implicit system: ActorSystem): T = {
def now = System.currentTimeMillis def now = System.currentTimeMillis
system.eventStream.publish(TestEvent.Mute(eventFilters.toSeq)) system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq]))
try { try {
val result = block val result = block
@ -23,7 +25,7 @@ package object testkit {
result result
} finally { } finally {
system.eventStream.publish(TestEvent.UnMute(eventFilters.toSeq)) system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq]))
} }
} }

View file

@ -32,7 +32,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import scala.collection.JavaConverters; import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.immutable.Seq;
public class UntypedCoordinatedIncrementTest { public class UntypedCoordinatedIncrementTest {
private static ActorSystem system; private static ActorSystem system;
@ -110,6 +110,6 @@ public class UntypedCoordinatedIncrementTest {
} }
public <A> Seq<A> seq(A... args) { public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq(); return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toList();
} }
} }

View file

@ -32,7 +32,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import scala.collection.JavaConverters; import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.immutable.Seq;
public class UntypedTransactorTest { public class UntypedTransactorTest {
@ -120,6 +120,6 @@ public class UntypedTransactorTest {
public <A> Seq<A> seq(A... args) { public <A> Seq<A> seq(A... args) {
return JavaConverters return JavaConverters
.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala() .collectionAsScalaIterableConverter(Arrays.asList(args)).asScala()
.toSeq(); .toList();
} }
} }

View file

@ -6,12 +6,13 @@ package akka.transactor
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import akka.actor._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.collection.immutable
import akka.actor._
import akka.util.Timeout import akka.util.Timeout
import akka.testkit._ import akka.testkit._
import scala.concurrent.stm._
import akka.pattern.{ AskTimeoutException, ask } import akka.pattern.{ AskTimeoutException, ask }
object CoordinatedIncrement { object CoordinatedIncrement {
@ -30,7 +31,7 @@ object CoordinatedIncrement {
} }
""" """
case class Increment(friends: Seq[ActorRef]) case class Increment(friends: immutable.Seq[ActorRef])
case object GetCount case object GetCount
class Counter(name: String) extends Actor { class Counter(name: String) extends Actor {

View file

@ -8,21 +8,22 @@ import language.postfixOps
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import akka.actor._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.collection.immutable
import scala.util.Random.{ nextInt random }
import scala.util.control.NonFatal
import akka.actor._
import akka.testkit._ import akka.testkit._
import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.Mute
import scala.concurrent.stm._
import scala.util.Random.{ nextInt random }
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import akka.pattern.{ AskTimeoutException, ask } import akka.pattern.{ AskTimeoutException, ask }
import akka.util.Timeout import akka.util.Timeout
import scala.util.control.NonFatal
object FickleFriends { object FickleFriends {
case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) case class FriendlyIncrement(friends: immutable.Seq[ActorRef], timeout: Timeout, latch: CountDownLatch)
case class Increment(friends: Seq[ActorRef]) case class Increment(friends: immutable.Seq[ActorRef])
case object GetCount case object GetCount
/** /**
@ -120,7 +121,7 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
"Coordinated fickle friends" should { "Coordinated fickle friends" should {
"eventually succeed to increment all counters by one" in { "eventually succeed to increment all counters by one" in {
val ignoreExceptions = Seq( val ignoreExceptions = immutable.Seq(
EventFilter[ExpectedFailureException](), EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](), EventFilter[CoordinatedTransactionException](),
EventFilter[AskTimeoutException]()) EventFilter[AskTimeoutException]())

View file

@ -7,15 +7,16 @@ package akka.transactor
import language.postfixOps import language.postfixOps
import akka.actor._ import akka.actor._
import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.stm._
import akka.util.Timeout import akka.util.Timeout
import akka.testkit._ import akka.testkit._
import scala.concurrent.stm._
import akka.pattern.{ AskTimeoutException, ask } import akka.pattern.{ AskTimeoutException, ask }
object TransactorIncrement { object TransactorIncrement {
case class Increment(friends: Seq[ActorRef], latch: TestLatch) case class Increment(friends: immutable.Seq[ActorRef], latch: TestLatch)
case object GetCount case object GetCount
class Counter(name: String) extends Transactor { class Counter(name: String) extends Transactor {

View file

@ -6,13 +6,14 @@ package akka.zeromq
import org.zeromq.ZMQ.{ Socket, Poller } import org.zeromq.ZMQ.{ Socket, Poller }
import org.zeromq.{ ZMQ JZMQ } import org.zeromq.{ ZMQ JZMQ }
import akka.actor._ import akka.actor._
import scala.collection.immutable
import scala.annotation.tailrec
import scala.concurrent.{ Promise, Future } import scala.concurrent.{ Promise, Future }
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import akka.event.Logging import akka.event.Logging
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal
private[zeromq] object ConcurrentSocketActor { private[zeromq] object ConcurrentSocketActor {
private sealed trait PollMsg private sealed trait PollMsg
@ -25,7 +26,7 @@ private[zeromq] object ConcurrentSocketActor {
private val DefaultContext = Context() private val DefaultContext = Context()
} }
private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends Actor { private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) extends Actor {
import ConcurrentSocketActor._ import ConcurrentSocketActor._
private val noBytes = Array[Byte]() private val noBytes = Array[Byte]()
@ -40,7 +41,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
private val socket: Socket = zmqContext.socket(socketType) private val socket: Socket = zmqContext.socket(socketType)
private val poller: Poller = zmqContext.poller private val poller: Poller = zmqContext.poller
private val pendingSends = new ListBuffer[Seq[Frame]] private val pendingSends = new ListBuffer[immutable.Seq[Frame]]
def receive = { def receive = {
case m: PollMsg doPoll(m) case m: PollMsg doPoll(m)
@ -151,7 +152,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
} }
} finally notifyListener(Closed) } finally notifyListener(Closed)
@tailrec private def flushMessage(i: Seq[Frame]): Boolean = @tailrec private def flushMessage(i: immutable.Seq[Frame]): Boolean =
if (i.isEmpty) if (i.isEmpty)
true true
else { else {
@ -198,7 +199,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
case frames notifyListener(deserializer(frames)); doPoll(mode, togo - 1) case frames notifyListener(deserializer(frames)); doPoll(mode, togo - 1)
} }
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] = @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): immutable.Seq[Frame] =
if (mode == PollCareful && (poller.poll(0) <= 0)) { if (mode == PollCareful && (poller.poll(0) <= 0)) {
if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!") if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!")
} else { } else {

View file

@ -4,9 +4,10 @@
package akka.zeromq package akka.zeromq
import com.google.protobuf.Message import com.google.protobuf.Message
import org.zeromq.{ ZMQ JZMQ }
import akka.actor.ActorRef import akka.actor.ActorRef
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable
import org.zeromq.{ ZMQ JZMQ }
import org.zeromq.ZMQ.{ Poller, Socket } import org.zeromq.ZMQ.{ Poller, Socket }
/** /**
@ -36,7 +37,7 @@ sealed trait SocketConnectOption extends SocketOption {
* A base trait for pubsub options for the ZeroMQ socket * A base trait for pubsub options for the ZeroMQ socket
*/ */
sealed trait PubSubOption extends SocketOption { sealed trait PubSubOption extends SocketOption {
def payload: Seq[Byte] def payload: immutable.Seq[Byte]
} }
/** /**
@ -79,7 +80,7 @@ class Context(numIoThreads: Int) extends SocketMeta {
* A base trait for message deserializers * A base trait for message deserializers
*/ */
trait Deserializer extends SocketOption { trait Deserializer extends SocketOption {
def apply(frames: Seq[Frame]): Any def apply(frames: immutable.Seq[Frame]): Any
} }
/** /**
@ -172,12 +173,12 @@ case class Bind(endpoint: String) extends SocketConnectOption
* *
* @param payload the topic to subscribe to * @param payload the topic to subscribe to
*/ */
case class Subscribe(payload: Seq[Byte]) extends PubSubOption { case class Subscribe(payload: immutable.Seq[Byte]) extends PubSubOption {
def this(topic: String) = this(topic.getBytes("UTF-8")) def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq])
} }
object Subscribe { object Subscribe {
def apply(topic: String): Subscribe = new Subscribe(topic) def apply(topic: String): Subscribe = new Subscribe(topic)
val all = Subscribe(Seq.empty) val all = Subscribe("")
} }
/** /**
@ -189,8 +190,8 @@ object Subscribe {
* *
* @param payload * @param payload
*/ */
case class Unsubscribe(payload: Seq[Byte]) extends PubSubOption { case class Unsubscribe(payload: immutable.Seq[Byte]) extends PubSubOption {
def this(topic: String) = this(topic.getBytes("UTF-8")) def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq])
} }
object Unsubscribe { object Unsubscribe {
def apply(topic: String): Unsubscribe = new Unsubscribe(topic) def apply(topic: String): Unsubscribe = new Unsubscribe(topic)
@ -200,17 +201,17 @@ object Unsubscribe {
* Send a message over the zeromq socket * Send a message over the zeromq socket
* @param frames * @param frames
*/ */
case class Send(frames: Seq[Frame]) extends Request case class Send(frames: immutable.Seq[Frame]) extends Request
/** /**
* A message received over the zeromq socket * A message received over the zeromq socket
* @param frames * @param frames
*/ */
case class ZMQMessage(frames: Seq[Frame]) { case class ZMQMessage(frames: immutable.Seq[Frame]) {
def this(frame: Frame) = this(Seq(frame)) def this(frame: Frame) = this(List(frame))
def this(frame1: Frame, frame2: Frame) = this(Seq(frame1, frame2)) def this(frame1: Frame, frame2: Frame) = this(List(frame1, frame2))
def this(frameArray: Array[Frame]) = this(frameArray.toSeq) def this(frameArray: Array[Frame]) = this(frameArray.to[immutable.Seq])
/** /**
* Convert the bytes in the first frame to a String, using specified charset. * Convert the bytes in the first frame to a String, using specified charset.
@ -224,8 +225,9 @@ case class ZMQMessage(frames: Seq[Frame]) {
def payload(frameIndex: Int): Array[Byte] = frames(frameIndex).payload.toArray def payload(frameIndex: Int): Array[Byte] = frames(frameIndex).payload.toArray
} }
object ZMQMessage { object ZMQMessage {
def apply(bytes: Array[Byte]): ZMQMessage = ZMQMessage(Seq(Frame(bytes))) def apply(bytes: Array[Byte]): ZMQMessage = new ZMQMessage(List(Frame(bytes)))
def apply(message: Message): ZMQMessage = ZMQMessage(message.toByteArray) def apply(frames: Frame*): ZMQMessage = new ZMQMessage(frames.to[immutable.Seq])
def apply(message: Message): ZMQMessage = apply(message.toByteArray)
} }
/** /**

View file

@ -3,7 +3,10 @@
*/ */
package akka.zeromq package akka.zeromq
import scala.collection.immutable
object Frame { object Frame {
def apply(bytes: Array[Byte]): Frame = new Frame(bytes)
def apply(text: String): Frame = new Frame(text) def apply(text: String): Frame = new Frame(text)
} }
@ -11,8 +14,8 @@ object Frame {
* A single message frame of a zeromq message * A single message frame of a zeromq message
* @param payload * @param payload
*/ */
case class Frame(payload: Seq[Byte]) { case class Frame(payload: immutable.Seq[Byte]) {
def this(bytes: Array[Byte]) = this(bytes.toSeq) def this(bytes: Array[Byte]) = this(bytes.to[immutable.Seq])
def this(text: String) = this(text.getBytes("UTF-8")) def this(text: String) = this(text.getBytes("UTF-8"))
} }
@ -20,5 +23,5 @@ case class Frame(payload: Seq[Byte]) {
* Deserializes ZeroMQ messages into an immutable sequence of frames * Deserializes ZeroMQ messages into an immutable sequence of frames
*/ */
class ZMQMessageDeserializer extends Deserializer { class ZMQMessageDeserializer extends Deserializer {
def apply(frames: Seq[Frame]): ZMQMessage = ZMQMessage(frames) def apply(frames: immutable.Seq[Frame]): ZMQMessage = ZMQMessage(frames)
} }

View file

@ -7,6 +7,7 @@ import org.zeromq.{ ZMQ ⇒ JZMQ }
import org.zeromq.ZMQ.Poller import org.zeromq.ZMQ.Poller
import akka.actor._ import akka.actor._
import akka.pattern.ask import akka.pattern.ask
import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -66,7 +67,8 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
case s: SocketType.ZMQSocketType true case s: SocketType.ZMQSocketType true
case _ false case _ false
}, "A socket type is required") }, "A socket type is required")
Props(new ConcurrentSocketActor(socketParameters)).withDispatcher("akka.zeromq.socket-dispatcher") val params = socketParameters.to[immutable.Seq]
Props(new ConcurrentSocketActor(params)).withDispatcher("akka.zeromq.socket-dispatcher")
} }
/** /**

View file

@ -51,7 +51,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable { val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable {
var number = 0 var number = 0
def run() { def run() {
publisher ! ZMQMessage(Seq(Frame(number.toString.getBytes), Frame(Seq()))) publisher ! ZMQMessage(Frame(number.toString), Frame(Nil))
number += 1 number += 1
} }
}) })
@ -88,8 +88,8 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
try { try {
replierProbe.expectMsg(Connecting) replierProbe.expectMsg(Connecting)
val request = ZMQMessage(Seq(Frame("Request"))) val request = ZMQMessage(Frame("Request"))
val reply = ZMQMessage(Seq(Frame("Reply"))) val reply = ZMQMessage(Frame("Reply"))
requester ! request requester ! request
replierProbe.expectMsg(request) replierProbe.expectMsg(request)
@ -112,7 +112,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
try { try {
pullerProbe.expectMsg(Connecting) pullerProbe.expectMsg(Connecting)
val message = ZMQMessage(Seq(Frame("Pushed message"))) val message = ZMQMessage(Frame("Pushed message"))
pusher ! message pusher ! message
pullerProbe.expectMsg(message) pullerProbe.expectMsg(message)