From 8f131c680fb089f4e1a8be78cfe4ad72b0a1123a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 30 Oct 2012 15:08:41 +0100 Subject: [PATCH] Switching to immutable.Seq instead of Seq --- .../akka/actor/RelativeActorPathSpec.scala | 12 +++-- .../scala/akka/actor/TypedActorSpec.scala | 13 +++--- .../workbench/BenchResultRepository.scala | 29 ++++++------ .../workbench/GoogleChartBuilder.scala | 23 +++++----- .../akka/performance/workbench/Report.scala | 18 ++++---- .../src/main/scala/akka/actor/ActorPath.scala | 2 +- .../scala/akka/actor/ActorRefProvider.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 34 +++++++------- .../src/main/scala/akka/actor/Address.scala | 5 ++- .../src/main/scala/akka/actor/Deployer.scala | 2 +- .../main/scala/akka/actor/DynamicAccess.scala | 10 ++--- .../src/main/scala/akka/actor/Extension.scala | 2 +- .../main/scala/akka/actor/FaultHandling.scala | 38 +++++++--------- akka-actor/src/main/scala/akka/actor/IO.scala | 19 ++++---- .../main/scala/akka/actor/TypedActor.scala | 22 +++++----- .../akka/dispatch/AbstractDispatcher.scala | 4 +- .../scala/akka/dispatch/Dispatchers.scala | 2 +- .../src/main/scala/akka/event/EventBus.scala | 5 ++- .../src/main/scala/akka/event/Logging.scala | 9 ++-- .../src/main/scala/akka/japi/JavaAPI.scala | 7 +-- .../scala/akka/routing/ConsistentHash.scala | 6 +-- .../akka/serialization/Serialization.scala | 21 ++++----- .../scala/akka/util/SubclassifiedIndex.scala | 6 ++- .../akka/camel/ConcurrentActivationTest.scala | 11 +++-- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../cluster/ClusterMetricsCollector.scala | 2 +- ...ientDowningNodeThatIsUnreachableSpec.scala | 5 ++- .../ClientDowningNodeThatIsUpSpec.scala | 5 ++- ...aderDowningNodeThatIsUnreachableSpec.scala | 9 ++-- .../akka/cluster/LeaderElectionSpec.scala | 3 +- .../akka/cluster/MultiNodeClusterSpec.scala | 35 +++++++-------- .../akka/cluster/SingletonClusterSpec.scala | 3 +- .../scala/akka/cluster/SplitBrainSpec.scala | 7 +-- .../UnreachableNodeRejoinsClusterSpec.scala | 7 +-- .../cluster/AccrualFailureDetectorSpec.scala | 6 +-- .../akka/cluster/MetricsCollectorSpec.scala | 13 +++--- .../docs/actor/FaultHandlingTestBase.java | 4 +- .../jrouting/CustomRouterDocTestBase.java | 3 +- .../scala/code/docs/actor/FSMDocSpec.scala | 11 ++--- .../code/docs/testkit/TestKitUsageSpec.scala | 7 +-- .../code/docs/zeromq/ZeromqDocSpec.scala | 12 ++--- .../src/main/scala/akka/kernel/Main.scala | 7 +-- .../blueprint/NamespaceHandlerTest.scala | 6 +-- .../akka/osgi/ActorSystemActivatorTest.scala | 6 ++- .../scala/akka/osgi/PojoSRTestSupport.scala | 16 +++---- .../NetworkFailureInjector.scala | 18 ++++---- .../akka/remote/testconductor/Player.scala | 20 ++++----- .../akka/remote/testkit/MultiNodeSpec.scala | 24 +++++----- .../akka/remote/RemoteActorRefProvider.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 11 ++--- .../provider/InternetSeedGenerator.scala | 5 ++- .../scala/akka/testkit/TestActorRef.scala | 2 +- .../akka/testkit/TestEventListener.scala | 19 ++++---- .../src/main/scala/akka/testkit/TestKit.scala | 44 +++++++++---------- .../src/main/scala/akka/testkit/package.scala | 8 ++-- .../UntypedCoordinatedIncrementTest.java | 4 +- .../transactor/UntypedTransactorTest.java | 4 +- .../transactor/CoordinatedIncrementSpec.scala | 7 +-- .../akka/transactor/FickleFriendsSpec.scala | 15 ++++--- .../akka/transactor/TransactorSpec.scala | 5 ++- .../akka/zeromq/ConcurrentSocketActor.scala | 13 +++--- .../main/scala/akka/zeromq/SocketOption.scala | 32 +++++++------- .../akka/zeromq/ZMQMessageDeserializer.scala | 9 ++-- .../scala/akka/zeromq/ZeroMQExtension.scala | 4 +- .../zeromq/ConcurrentSocketActorSpec.scala | 8 ++-- 65 files changed, 375 insertions(+), 350 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala index 179f4aa92c..6870a36125 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala @@ -6,24 +6,22 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import java.net.URLEncoder +import scala.collection.immutable class RelativeActorPathSpec extends WordSpec with MustMatchers { - def elements(path: String): Seq[String] = path match { - case RelativeActorPath(elem) ⇒ elem.toSeq - case _ ⇒ Nil - } + def elements(path: String): immutable.Seq[String] = RelativeActorPath.unapply(path).getOrElse(Nil) "RelativeActorPath" must { "match single name" in { - elements("foo") must be(Seq("foo")) + elements("foo") must be(List("foo")) } "match path separated names" in { - elements("foo/bar/baz") must be(Seq("foo", "bar", "baz")) + elements("foo/bar/baz") must be(List("foo", "bar", "baz")) } "match url encoded name" in { val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8") - elements(name) must be(Seq(name)) + elements(name) must be(List(name)) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 040c8e6211..201b6c6949 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -5,18 +5,19 @@ package akka.actor import language.postfixOps import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import akka.util.Timeout +import scala.annotation.tailrec +import scala.collection.immutable import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ -import java.util.concurrent.atomic.AtomicReference -import annotation.tailrec import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } +import akka.util.Timeout import akka.japi.{ Option ⇒ JOption } import akka.testkit.DefaultTimeout -import akka.dispatch.{ Dispatchers } +import akka.dispatch.Dispatchers import akka.pattern.ask import akka.serialization.JavaSerializer import akka.actor.TypedActor._ +import java.util.concurrent.atomic.AtomicReference import java.lang.IllegalStateException import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } @@ -35,9 +36,9 @@ object TypedActorSpec { } """ - class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] { + class CyclicIterator[T](val items: immutable.Seq[T]) extends Iterator[T] { - private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items) + private[this] val current = new AtomicReference(items) def hasNext = items != Nil diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala index 1cccd19417..7bc3fec9d1 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala @@ -12,17 +12,18 @@ import java.io.PrintWriter import java.text.SimpleDateFormat import java.util.Date import scala.collection.mutable.{ Map ⇒ MutableMap } +import scala.collection.immutable import akka.actor.ActorSystem import akka.event.Logging trait BenchResultRepository { def add(stats: Stats) - def get(name: String): Seq[Stats] + def get(name: String): immutable.Seq[Stats] def get(name: String, load: Int): Option[Stats] - def getWithHistorical(name: String, load: Int): Seq[Stats] + def getWithHistorical(name: String, load: Int): immutable.Seq[Stats] def isBaseline(stats: Stats): Boolean @@ -38,9 +39,9 @@ object BenchResultRepository { } class FileBenchResultRepository extends BenchResultRepository { - private val statsByName = MutableMap[String, Seq[Stats]]() + private val statsByName = MutableMap[String, immutable.Seq[Stats]]() private val baselineStats = MutableMap[Key, Stats]() - private val historicalStats = MutableMap[Key, Seq[Stats]]() + private val historicalStats = MutableMap[Key, immutable.Seq[Stats]]() private def resultDir = BenchmarkConfig.config.getString("benchmark.resultDir") private val serDir = resultDir + "/ser" private def serDirExists: Boolean = new File(serDir).exists @@ -51,13 +52,13 @@ class FileBenchResultRepository extends BenchResultRepository { case class Key(name: String, load: Int) def add(stats: Stats): Unit = synchronized { - val values = statsByName.getOrElseUpdate(stats.name, IndexedSeq.empty) + val values = statsByName.getOrElseUpdate(stats.name, Vector.empty) statsByName(stats.name) = values :+ stats save(stats) } - def get(name: String): Seq[Stats] = synchronized { - statsByName.getOrElse(name, IndexedSeq.empty) + def get(name: String): immutable.Seq[Stats] = synchronized { + statsByName.getOrElse(name, Vector.empty) } def get(name: String, load: Int): Option[Stats] = synchronized { @@ -68,13 +69,13 @@ class FileBenchResultRepository extends BenchResultRepository { baselineStats.get(Key(stats.name, stats.load)) == Some(stats) } - def getWithHistorical(name: String, load: Int): Seq[Stats] = synchronized { + def getWithHistorical(name: String, load: Int): immutable.Seq[Stats] = synchronized { val key = Key(name, load) - val historical = historicalStats.getOrElse(key, IndexedSeq.empty) + val historical = historicalStats.getOrElse(key, Vector.empty) val baseline = baselineStats.get(key) val current = get(name, load) - val limited = (IndexedSeq.empty ++ historical ++ baseline ++ current).takeRight(maxHistorical) + val limited = (Vector.empty ++ historical ++ baseline ++ current).takeRight(maxHistorical) limited.sortBy(_.timestamp) } @@ -94,7 +95,7 @@ class FileBenchResultRepository extends BenchResultRepository { } val historical = load(historicalFiles) for (h ← historical) { - val values = historicalStats.getOrElseUpdate(Key(h.name, h.load), IndexedSeq.empty) + val values = historicalStats.getOrElseUpdate(Key(h.name, h.load), Vector.empty) historicalStats(Key(h.name, h.load)) = values :+ h } } @@ -120,7 +121,7 @@ class FileBenchResultRepository extends BenchResultRepository { } } - private def load(files: Iterable[File]): Seq[Stats] = { + private def load(files: Iterable[File]): immutable.Seq[Stats] = { val result = for (f ← files) yield { var in: ObjectInputStream = null @@ -132,11 +133,11 @@ class FileBenchResultRepository extends BenchResultRepository { case e: Throwable ⇒ None } finally { - if (in ne null) try { in.close() } catch { case ignore: Exception ⇒ } + if (in ne null) try in.close() catch { case ignore: Exception ⇒ } } } - result.flatten.toSeq.sortBy(_.timestamp) + result.flatten.toVector.sortBy(_.timestamp) } loadFiles() diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala index 52b30ceee7..66b634d47f 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala @@ -3,7 +3,7 @@ package akka.performance.workbench import java.io.UnsupportedEncodingException import java.net.URLEncoder -import scala.collection.immutable.TreeMap +import scala.collection.immutable /** * Generates URLs to Google Chart API http://code.google.com/apis/chart/ @@ -16,7 +16,7 @@ object GoogleChartBuilder { /** * Builds a bar chart for tps in the statistics. */ - def tpsChartUrl(statsByTimestamp: TreeMap[Long, Seq[Stats]], title: String, legend: Stats ⇒ String): String = { + def tpsChartUrl(statsByTimestamp: immutable.TreeMap[Long, Seq[Stats]], title: String, legend: Stats ⇒ String): String = { if (statsByTimestamp.isEmpty) "" else { val loads = statsByTimestamp.values.head.map(_.load) @@ -46,7 +46,7 @@ object GoogleChartBuilder { //sb.append("&") // legend - val legendStats = statsByTimestamp.values.map(_.head).toSeq + val legendStats = statsByTimestamp.values.toVector.map(_.head) appendLegend(legendStats, sb, legend) sb.append("&") // bar spacing @@ -60,10 +60,7 @@ object GoogleChartBuilder { val loadStr = loads.mkString(",") sb.append("chd=t:") val maxValue = allStats.map(_.tps).max - val tpsSeries: Iterable[String] = - for (statsSeq ← statsByTimestamp.values) yield { - statsSeq.map(_.tps).mkString(",") - } + val tpsSeries: Iterable[String] = for (statsSeq ← statsByTimestamp.values) yield statsSeq.map(_.tps).mkString(",") sb.append(tpsSeries.mkString("|")) // y range @@ -83,7 +80,7 @@ object GoogleChartBuilder { /** * Builds a bar chart for all percentiles and the mean in the statistics. */ - def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = { + def percentilesAndMeanChartUrl(statistics: immutable.Seq[Stats], title: String, legend: Stats ⇒ String): String = { if (statistics.isEmpty) "" else { val current = statistics.last @@ -146,13 +143,13 @@ object GoogleChartBuilder { } } - private def percentileLabels(percentiles: TreeMap[Int, Long], sb: StringBuilder) { + private def percentileLabels(percentiles: immutable.TreeMap[Int, Long], sb: StringBuilder) { sb.append("chxl=1:|") val s = percentiles.keys.toList.map(_ + "%").mkString("|") sb.append(s) } - private def appendLegend(statistics: Seq[Stats], sb: StringBuilder, legend: Stats ⇒ String) { + private def appendLegend(statistics: immutable.Seq[Stats], sb: StringBuilder, legend: Stats ⇒ String) { val legends = statistics.map(legend(_)) sb.append("chdl=") val s = legends.map(urlEncode(_)).mkString("|") @@ -166,7 +163,7 @@ object GoogleChartBuilder { sb.append(s) } - private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], meanValues: Seq[Double], sb: StringBuilder) { + private def dataSeries(allPercentiles: immutable.Seq[immutable.TreeMap[Int, Long]], meanValues: immutable.Seq[Double], sb: StringBuilder) { val percentileSeries = for { percentiles ← allPercentiles @@ -181,7 +178,7 @@ object GoogleChartBuilder { sb.append(series.mkString("|")) } - private def dataSeries(values: Seq[Double], sb: StringBuilder) { + private def dataSeries(values: immutable.Seq[Double], sb: StringBuilder) { val series = values.map(formatDouble(_)) sb.append(series.mkString("|")) } @@ -198,7 +195,7 @@ object GoogleChartBuilder { } } - def latencyAndThroughputChartUrl(statistics: Seq[Stats], title: String): String = { + def latencyAndThroughputChartUrl(statistics: immutable.Seq[Stats], title: String): String = { if (statistics.isEmpty) "" else { val sb = new StringBuilder diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index 18f87702f3..f7974e6784 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -5,7 +5,7 @@ import java.text.SimpleDateFormat import java.util.Date import akka.actor.ActorSystem import akka.event.Logging -import scala.collection.immutable.TreeMap +import scala.collection.immutable class Report( system: ActorSystem, @@ -19,7 +19,7 @@ class Report( val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") - def html(statistics: Seq[Stats]) { + def html(statistics: immutable.Seq[Stats]) { val current = statistics.last val sb = new StringBuilder @@ -80,13 +80,13 @@ class Report( chartUrl } - def comparePercentilesAndMeanChart(stats: Stats): Seq[String] = { + def comparePercentilesAndMeanChart(stats: Stats): immutable.Seq[String] = { for { - compareName ← compareResultWith.toSeq + compareName ← compareResultWith.to[immutable.Seq] compareStats ← resultRepository.get(compareName, stats.load) } yield { val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)" - val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(Seq(compareStats, stats), chartTitle, _.name) + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(List(compareStats, stats), chartTitle, _.name) chartUrl } } @@ -102,17 +102,17 @@ class Report( } } - def compareWithHistoricalTpsChart(statistics: Seq[Stats]): Option[String] = { + def compareWithHistoricalTpsChart(statistics: immutable.Seq[Stats]): Option[String] = { if (statistics.isEmpty) { None } else { val histTimestamps = resultRepository.getWithHistorical(statistics.head.name, statistics.head.load).map(_.timestamp) - val statsByTimestamp = TreeMap[Long, Seq[Stats]]() ++ + val statsByTimestamp = immutable.TreeMap[Long, Seq[Stats]]() ++ (for (ts ← histTimestamps) yield { val seq = for (stats ← statistics) yield { - val withHistorical: Seq[Stats] = resultRepository.getWithHistorical(stats.name, stats.load) + val withHistorical: immutable.Seq[Stats] = resultRepository.getWithHistorical(stats.name, stats.load) val cell = withHistorical.find(_.timestamp == ts) cell.getOrElse(Stats(stats.name, stats.load, ts)) } @@ -131,7 +131,7 @@ class Report( chartUrl } - def formatResultsTable(statsSeq: Seq[Stats]): String = { + def formatResultsTable(statsSeq: immutable.Seq[Stats]): String = { val name = statsSeq.head.name diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index cc21e0de16..a20e8220b1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -121,7 +121,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act else addr + name override def compareTo(other: ActorPath): Int = other match { - case r: RootActorPath ⇒ toString compareTo r.toString + case r: RootActorPath ⇒ toString compareTo r.toString // FIXME make this cheaper by comparing address and name in isolation case c: ChildActorPath ⇒ 1 } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8fa84b2978..a11da0c150 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -480,7 +480,7 @@ class LocalActorRefProvider( def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras private def guardianSupervisorStrategyConfigurator = - dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).get + dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Nil).get /** * Overridable supervision strategy to be used by the “/user” guardian. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 212ee9372d..b4309bcb5c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -9,17 +9,17 @@ import akka.dispatch._ import akka.pattern.ask import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec -import scala.concurrent.duration.Duration -import java.io.Closeable +import scala.collection.immutable +import scala.concurrent.duration.{ FiniteDuration, Duration } import scala.concurrent.{ Await, Awaitable, CanAwait, Future } +import scala.util.{ Failure, Success } import scala.util.control.NonFatal import akka.util._ +import java.io.Closeable import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.dungeon.ChildrenContainer -import scala.concurrent.duration.FiniteDuration -import util.{ Failure, Success } object ActorSystem { @@ -144,7 +144,7 @@ object ActorSystem { final val LogLevel: String = getString("akka.loglevel") final val StdoutLogLevel: String = getString("akka.stdout-loglevel") - final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala + final val EventHandlers: immutable.Seq[String] = getStringList("akka.event-handlers").asScala.to[Vector] final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS)) final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start") @@ -273,10 +273,8 @@ abstract class ActorSystem extends ActorRefFactory { /** * ''Java API'': Recursively create a descendant’s path by appending all child names. */ - def descendant(names: java.lang.Iterable[String]): ActorPath = { - import scala.collection.JavaConverters._ - /(names.asScala) - } + def descendant(names: java.lang.Iterable[String]): ActorPath = + /(scala.collection.JavaConverters.iterableAsScalaIterableConverter(names).asScala) /** * Start-up time in milliseconds since the epoch. @@ -536,7 +534,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, val scheduler: Scheduler = createScheduler() val provider: ActorRefProvider = { - val arguments = Seq( + val arguments = Vector( classOf[String] -> name, classOf[Settings] -> settings, classOf[EventStream] -> eventStream, @@ -676,15 +674,15 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null private def loadExtensions() { - import scala.collection.JavaConverters.collectionAsScalaIterableConverter - settings.config.getStringList("akka.extensions").asScala foreach { fqcn ⇒ - dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()) } match { - case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()) - case Success(p: ExtensionId[_]) ⇒ registerExtension(p) - case Success(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) - case Failure(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) + scala.collection.JavaConverters.collectionAsScalaIterableConverter( + settings.config.getStringList("akka.extensions")).asScala foreach { fqcn ⇒ + dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match { + case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()) + case Success(p: ExtensionId[_]) ⇒ registerExtension(p) + case Success(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) + case Failure(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) + } } - } } override def toString: String = lookupRoot.path.root.address.toString diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index 7c87d696a4..b8f8a52e45 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -5,7 +5,8 @@ package akka.actor import java.net.URI import java.net.URISyntaxException import java.net.MalformedURLException -import annotation.tailrec +import scala.annotation.tailrec +import scala.collection.immutable /** * The address specifies the physical location under which an Actor can be @@ -71,7 +72,7 @@ private[akka] trait PathUtils { } object RelativeActorPath extends PathUtils { - def unapply(addr: String): Option[Iterable[String]] = { + def unapply(addr: String): Option[immutable.Seq[String]] = { try { val uri = new URI(addr) if (uri.isAbsolute) None diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 06d3b01a1b..e74b54c320 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -160,7 +160,7 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val vnodes = deployment.getInt("virtual-nodes-factor") ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes) case fqn ⇒ - val args = Seq(classOf[Config] -> deployment) + val args = List(classOf[Config] -> deployment) dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ case exception ⇒ throw new IllegalArgumentException( ("Cannot instantiate router [%s], defined in [%s], " + diff --git a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala index 7a73eb3b15..af891bc483 100644 --- a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala +++ b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala @@ -3,7 +3,7 @@ */ package akka.actor -import scala.util.control.NonFatal +import scala.collection.immutable import java.lang.reflect.InvocationTargetException import scala.reflect.ClassTag import scala.util.Try @@ -25,7 +25,7 @@ abstract class DynamicAccess { * val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name)) * }}} */ - def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] + def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] /** * Obtain a `Class[_]` object loaded with the right class loader (i.e. the one @@ -40,7 +40,7 @@ abstract class DynamicAccess { * `args` argument. The exact usage of args depends on which type is requested, * see the relevant requesting code for details. */ - def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] + def createInstanceFor[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] /** * Obtain the Scala “object” instance for the given fully-qualified class name, if there is one. @@ -70,7 +70,7 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c) }) - override def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] = + override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] = Try { val types = args.map(_._1).toArray val values = args.map(_._2).toArray @@ -81,7 +81,7 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t) } recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException } - override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] = + override def createInstanceFor[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] = getClassFor(fqcn) flatMap { c ⇒ createInstanceFor(c, args) } override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = { diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index 6fab4ceb07..707c07982a 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -98,5 +98,5 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassTag[T]) extends Ext def this(clazz: Class[T]) = this()(ClassTag(clazz)) override def lookup(): ExtensionId[T] = this - def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)).get + def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, List(classOf[ExtendedActorSystem] -> system)).get } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index d72389ae5e..444618df00 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -5,9 +5,10 @@ package akka.actor import language.implicitConversions +import java.lang.{ Iterable ⇒ JIterable } import java.util.concurrent.TimeUnit import scala.collection.mutable.ArrayBuffer -import java.lang.{ Iterable ⇒ JIterable } +import scala.collection.immutable import scala.concurrent.duration.Duration /** * INTERNAL API @@ -170,7 +171,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { * Implicit conversion from `Seq` of Throwables to a `Decider`. * This maps the given Throwables to restarts, otherwise escalates. */ - implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit) + implicit def seqThrowable2Decider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeImmutableDecider(trapExit) type Decider = PartialFunction[Throwable, Directive] type JDecider = akka.japi.Function[Throwable, Directive] @@ -180,23 +181,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { * Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ - def makeDecider(trapExit: Array[Class[_]]): Decider = - { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } - + def makeDecider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeImmutableDecider(trapExit) /** * Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ - def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider = - { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } + def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = + makeImmutableDecider(scala.collection.JavaConverters.iterableAsScalaIterableConverter(trapExit).asScala) - /** - * Decider builder which just checks whether one of - * the given Throwables matches the cause and restarts, otherwise escalates. - */ - def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = { - import scala.collection.JavaConverters.iterableAsScalaIterableConverter - makeDecider(trapExit.asScala.toSeq) + private[this] def makeImmutableDecider(trapExit: Iterable[Class[_]]): Decider = { + val traps = trapExit match { // This is the sad, awkward, truth + case s: immutable.Seq[_] ⇒ s.asInstanceOf[immutable.Seq[Class[_]]] + case other ⇒ other.to[immutable.Seq] + } + + { case x ⇒ if (traps exists (_ isInstance x)) Restart else Escalate } } /** @@ -222,14 +221,14 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { * * INTERNAL API */ - private[akka] def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] = + private[akka] def sort(in: Iterable[CauseDirective]): immutable.Seq[CauseDirective] = (new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca) ⇒ buf.indexWhere(_._1 isAssignableFrom ca._1) match { case -1 ⇒ buf append ca case x ⇒ buf insert (x, ca) } buf - } + }.to[immutable.Seq] private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] = if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None @@ -338,10 +337,6 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) - - def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) = - this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) - /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared @@ -380,9 +375,6 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) - def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_]]) = - this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) - /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 635afe2a58..e1dedb3ba2 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -6,6 +6,7 @@ package akka.actor import language.higherKinds import language.postfixOps +import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration.Duration import scala.util.control.NonFatal @@ -122,7 +123,7 @@ object IO { * @return a new SocketHandle that can be used to perform actions on the * new connection's SocketChannel. */ - def accept(options: Seq[SocketOption] = Seq.empty)(implicit socketOwner: ActorRef): SocketHandle = { + def accept(options: immutable.Seq[SocketOption] = Nil)(implicit socketOwner: ActorRef): SocketHandle = { val socket = SocketHandle(socketOwner, ioManager) ioManager ! Accept(socket, this, options) socket @@ -250,7 +251,7 @@ object IO { * * Normally sent using IOManager.listen() */ - case class Listen(server: ServerHandle, address: SocketAddress, options: Seq[ServerSocketOption] = Seq.empty) extends IOMessage + case class Listen(server: ServerHandle, address: SocketAddress, options: immutable.Seq[ServerSocketOption] = Nil) extends IOMessage /** * Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is @@ -272,7 +273,7 @@ object IO { * * Normally sent using [[akka.actor.IO.ServerHandle]].accept() */ - case class Accept(socket: SocketHandle, server: ServerHandle, options: Seq[SocketOption] = Seq.empty) extends IOMessage + case class Accept(socket: SocketHandle, server: ServerHandle, options: immutable.Seq[SocketOption] = Nil) extends IOMessage /** * Message to an [[akka.actor.IOManager]] to create a SocketChannel connected @@ -280,7 +281,7 @@ object IO { * * Normally sent using IOManager.connect() */ - case class Connect(socket: SocketHandle, address: SocketAddress, options: Seq[SocketOption] = Seq.empty) extends IOMessage + case class Connect(socket: SocketHandle, address: SocketAddress, options: immutable.Seq[SocketOption] = Nil) extends IOMessage /** * Message from an [[akka.actor.IOManager]] that the SocketChannel has @@ -832,7 +833,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension { * @param option Seq of [[akka.actor.IO.ServerSocketOptions]] to setup on socket * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket */ - def listen(address: SocketAddress, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = { + def listen(address: SocketAddress, options: immutable.Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = { val server = IO.ServerHandle(owner, actor) actor ! IO.Listen(server, address, options) server @@ -847,7 +848,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension { * @param owner the ActorRef that will receive messages from the IOManagerActor * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket */ - def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = listen(address, Seq.empty) + def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = listen(address, Nil) /** * Create a ServerSocketChannel listening on a host and port. Messages will @@ -860,7 +861,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension { * @param owner the ActorRef that will receive messages from the IOManagerActor * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket */ - def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption] = Seq.empty)(implicit owner: ActorRef): IO.ServerHandle = + def listen(host: String, port: Int, options: immutable.Seq[IO.ServerSocketOption] = Nil)(implicit owner: ActorRef): IO.ServerHandle = listen(new InetSocketAddress(host, port), options)(owner) /** @@ -873,7 +874,7 @@ final class IOManager private (system: ExtendedActorSystem) extends Extension { * @param owner the ActorRef that will receive messages from the IOManagerActor * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket */ - def connect(address: SocketAddress, options: Seq[IO.SocketOption] = Seq.empty)(implicit owner: ActorRef): IO.SocketHandle = { + def connect(address: SocketAddress, options: immutable.Seq[IO.SocketOption] = Nil)(implicit owner: ActorRef): IO.SocketHandle = { val socket = IO.SocketHandle(owner, actor) actor ! IO.Connect(socket, address, options) socket @@ -991,7 +992,7 @@ final class IOManagerActor(val settings: Settings) extends Actor with ActorLoggi private def forwardFailure(f: ⇒ Unit): Unit = try f catch { case NonFatal(e) ⇒ sender ! Status.Failure(e) } - private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) { + private def setSocketOptions(socket: java.net.Socket, options: immutable.Seq[IO.SocketOption]) { options foreach { case IO.KeepAlive(on) ⇒ forwardFailure(socket.setKeepAlive(on)) case IO.OOBInline(on) ⇒ forwardFailure(socket.setOOBInline(on)) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 6dbe48ba40..f7b0e853ef 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -4,22 +4,24 @@ package akka.actor import language.existentials -import akka.japi.{ Creator, Option ⇒ JOption } -import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } -import akka.util.Timeout + import scala.util.control.NonFatal +import scala.util.{ Try, Success, Failure } +import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.Duration +import scala.reflect.ClassTag import scala.concurrent.{ Await, Future } +import akka.japi.{ Creator, Option ⇒ JOption } +import akka.util.Timeout import akka.util.Reflect.instantiator +import akka.serialization.{ JavaSerializer, SerializationExtension } import akka.dispatch._ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit.MILLISECONDS -import scala.reflect.ClassTag -import akka.serialization.{ JavaSerializer, SerializationExtension } import java.io.ObjectStreamException -import scala.util.{ Try, Success, Failure } -import scala.concurrent.duration.FiniteDuration +import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } /** * A TypedActorFactory is something that can created TypedActor instances. @@ -439,8 +441,8 @@ object TypedProps { * @return a sequence of interfaces that the specified class implements, * or a sequence containing only itself, if itself is an interface. */ - def extractInterfaces(clazz: Class[_]): Seq[Class[_]] = - if (clazz.isInterface) Seq[Class[_]](clazz) else clazz.getInterfaces.toList + def extractInterfaces(clazz: Class[_]): immutable.Seq[Class[_]] = + if (clazz.isInterface) List[Class[_]](clazz) else clazz.getInterfaces.to[List] /** * Uses the supplied class as the factory for the TypedActor implementation, @@ -489,7 +491,7 @@ object TypedProps { */ @SerialVersionUID(1L) case class TypedProps[T <: AnyRef] protected[TypedProps] ( - interfaces: Seq[Class[_]], + interfaces: immutable.Seq[Class[_]], creator: () ⇒ T, dispatcher: String = TypedProps.defaultDispatcherId, deploy: Deploy = Props.defaultDeploy, diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 36afc8a24c..1b9de36e77 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -420,7 +420,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit case "unbounded" ⇒ UnboundedMailbox() case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config) case fqcn ⇒ - val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) + val args = List(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ case exception ⇒ throw new IllegalArgumentException( @@ -436,7 +436,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case fqcn ⇒ - val args = Seq( + val args = List( classOf[Config] -> config, classOf[DispatcherPrerequisites] -> prerequisites) prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({ diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index e1ae0ae50b..910a5ceed5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -147,7 +147,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator(cfg, prerequisites) case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) case fqn ⇒ - val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) + val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ case exception ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index cb83fbe806..403f7a0dfd 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -10,6 +10,7 @@ import java.util.concurrent.ConcurrentSkipListSet import java.util.Comparator import akka.util.{ Subclassification, SubclassifiedIndex } import scala.collection.immutable.TreeSet +import scala.collection.immutable /** * Represents the base type for EventBuses @@ -167,12 +168,12 @@ trait SubchannelClassification { this: EventBus ⇒ recv foreach (publish(event, _)) } - private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = + private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = cache = (cache /: changes) { case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) } - private def addToCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = + private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = cache = (cache /: changes) { case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index c9286cf2c9..dbd561514d 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -9,12 +9,13 @@ import akka.actor._ import akka.{ ConfigurationException, AkkaException } import akka.actor.ActorSystem.Settings import akka.util.{ Timeout, ReentrantGuard } -import scala.concurrent.duration._ import java.util.concurrent.atomic.AtomicInteger -import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException +import scala.annotation.implicitNotFound +import scala.collection.immutable +import scala.concurrent.duration._ import scala.concurrent.Await -import annotation.implicitNotFound +import scala.util.control.NoStackTrace /** * This trait brings log level handling to the EventStream: it reads the log @@ -448,7 +449,7 @@ object Logging { } // these type ascriptions/casts are necessary to avoid CCEs during construction while retaining correct type - val AllLogLevels: Seq[LogLevel] = Seq(ErrorLevel, WarningLevel, InfoLevel, DebugLevel) + val AllLogLevels: immutable.Seq[LogLevel] = Vector(ErrorLevel, WarningLevel, InfoLevel, DebugLevel) /** * Obtain LoggingAdapter for the given actor system and source object. This diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index ee2a688345..fda674a02c 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -5,7 +5,8 @@ package akka.japi import language.implicitConversions -import scala.Some + +import scala.collection.immutable import scala.reflect.ClassTag import scala.util.control.NoStackTrace import scala.runtime.AbstractPartialFunction @@ -176,7 +177,7 @@ object Option { object Util { def classTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz) - def arrayToSeq[T](arr: Array[T]): Seq[T] = arr.toSeq + def arrayToSeq[T](arr: Array[T]): immutable.Seq[T] = arr.to[immutable.Seq] - def arrayToSeq(classes: Array[Class[_]]): Seq[Class[_]] = classes.toSeq + def arrayToSeq(classes: Array[Class[_]]): immutable.Seq[Class[_]] = classes.to[immutable.Seq] } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 819fea2586..84100f0f21 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -4,7 +4,7 @@ package akka.routing -import scala.collection.immutable.SortedMap +import scala.collection.immutable import scala.reflect.ClassTag import java.util.Arrays @@ -18,7 +18,7 @@ import java.util.Arrays * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) { +class ConsistentHash[T: ClassTag] private (nodes: immutable.SortedMap[Int, T], val virtualNodesFactor: Int) { import ConsistentHash._ @@ -106,7 +106,7 @@ class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtual object ConsistentHash { def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { - new ConsistentHash(SortedMap.empty[Int, T] ++ + new ConsistentHash(immutable.SortedMap.empty[Int, T] ++ (for (node ← nodes; vnode ← 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), virtualNodesFactor) } diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 003c9de2b1..1f78fbd3a7 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -4,14 +4,15 @@ package akka.serialization -import akka.AkkaException import com.typesafe.config.Config +import akka.AkkaException import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess } import akka.event.Logging import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer import java.io.NotSerializableException -import util.{ Try, DynamicVariable } +import scala.util.{ Try, DynamicVariable } +import scala.collection.immutable object Serialization { @@ -97,7 +98,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { serializerMap.get(clazz) match { case null ⇒ // bindings are ordered from most specific to least specific - def unique(possibilities: Seq[(Class[_], Serializer)]): Boolean = + def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean = possibilities.size == 1 || (possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) || (possibilities forall (_._2 == possibilities(0)._2)) @@ -122,8 +123,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * loading is performed by the system’s [[akka.actor.DynamicAccess]]. */ def serializerOf(serializerFQN: String): Try[Serializer] = - system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) recoverWith { - case _ ⇒ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()) + system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, List(classOf[ExtendedActorSystem] -> system)) recoverWith { + case _ ⇒ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Nil) } /** @@ -137,21 +138,21 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * bindings is a Seq of tuple representing the mapping from Class to Serializer. * It is primarily ordered by the most specific classes first, and secondly in the configured order. */ - private[akka] val bindings: Seq[ClassSerializer] = - sort(for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))) + private[akka] val bindings: immutable.Seq[ClassSerializer] = + sort(for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))).to[immutable.Seq] /** * Sort so that subtypes always precede their supertypes, but without * obeying any order between unrelated subtypes (insert sort). */ - private def sort(in: Iterable[ClassSerializer]): Seq[ClassSerializer] = - (new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca) ⇒ + private def sort(in: Iterable[ClassSerializer]): immutable.Seq[ClassSerializer] = + ((new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca) ⇒ buf.indexWhere(_._1 isAssignableFrom ca._1) match { case -1 ⇒ buf append ca case x ⇒ buf insert (x, ca) } buf - } + }).to[immutable.Seq] /** * serializerMap is a Map whose keys is the class that is serializable and values is the serializer diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index 565d50e636..d0ee67c1fb 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -3,6 +3,8 @@ */ package akka.util +import scala.collection.immutable + /** * Typeclass which describes a classification hierarchy. Observe the contract between `isEqual` and `isSubclass`! */ @@ -74,7 +76,7 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V] import SubclassifiedIndex._ - type Changes = Seq[(K, Set[V])] + type Changes = immutable.Seq[(K, Set[V])] protected var subkeys = Vector.empty[Nonroot[K, V]] @@ -208,5 +210,5 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V] private def mergeChangesByKey(changes: Changes): Changes = (emptyMergeMap[K, V] /: changes) { case (m, (k, s)) ⇒ m.updated(k, m(k) ++ s) - }.toSeq + }.to[immutable.Seq] } diff --git a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala index a4ad1564c2..ff5524ad6c 100644 --- a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala @@ -1,13 +1,18 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.camel +import language.postfixOps + import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers +import scala.concurrent.{ Promise, Await, Future } +import scala.collection.immutable import akka.camel.TestSupport.NonSharedCamelSystem import akka.actor.{ ActorRef, Props, Actor } import akka.routing.BroadcastRouter -import concurrent.{ Promise, Await, Future } import scala.concurrent.duration._ -import language.postfixOps import akka.testkit._ import akka.util.Timeout import org.apache.camel.model.RouteDefinition @@ -58,7 +63,7 @@ class ConcurrentActivationTest extends WordSpec with MustMatchers with NonShared activations.size must be(2 * number * number) // must be the size of the activated activated producers and consumers deactivations.size must be(2 * number * number) - def partitionNames(refs: Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer")) + def partitionNames(refs: immutable.Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer")) def assertContainsSameElements(lists: (Seq[_], Seq[_])) { val (a, b) = lists a.intersect(b).size must be(a.size) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 13f93d0482..80fdc69fed 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -75,7 +75,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { val failureDetector: FailureDetector = { import settings.{ FailureDetectorImplementationClass ⇒ fqcn } system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ + fqcn, List(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) }).get } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 76c01a6381..7ed3699035 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -557,7 +557,7 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe */ private[cluster] object MetricsCollector { def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector = - dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match { + dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil) match { case Success(identity) ⇒ new MetricsCollector(Some(identity), address) case Failure(e) ⇒ log.debug(e.toString) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 3efb891a3b..bf1009b472 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -8,6 +8,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address +import scala.collection.immutable case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -51,7 +52,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow cluster.down(thirdAddress) enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) clusterView.members.exists(_.address == thirdAddress) must be(false) } @@ -62,7 +63,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow runOn(second, fourth) { enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) } enterBarrier("await-completion") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 5a7308ec92..2a0af15997 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -8,6 +8,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address +import scala.collection.immutable case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -49,7 +50,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT markNodeAsUnavailable(thirdAddress) - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) clusterView.members.exists(_.address == thirdAddress) must be(false) } @@ -60,7 +61,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT runOn(second, fourth) { enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) } enterBarrier("await-completion") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 134ed4d0d6..279e32ab66 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -11,6 +11,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor._ import scala.concurrent.duration._ +import scala.collection.immutable case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -59,7 +60,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds) } runOn(fourth) { @@ -69,7 +70,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow runOn(second, third) { enterBarrier("down-fourth-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds) } enterBarrier("await-completion-1") @@ -89,7 +90,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30.seconds) } runOn(second) { @@ -99,7 +100,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow runOn(third) { enterBarrier("down-second-node") - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30 seconds) + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30 seconds) } enterBarrier("await-completion-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index fc8c4d2619..dfe1553369 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -10,6 +10,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.duration._ +import scala.collection.immutable case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val controller = role("controller") @@ -42,7 +43,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig import multiNodeConfig._ // sorted in the order used by the cluster - lazy val sortedRoles = Seq(first, second, third, fourth).sorted + lazy val sortedRoles = List(first, second, third, fourth).sorted "A cluster of four nodes" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 12fc8ebbc6..d696f9b62b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -4,21 +4,21 @@ package akka.cluster import language.implicitConversions + +import org.scalatest.Suite +import org.scalatest.exceptions.TestFailedException + import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import akka.actor.{ Address, ExtendedActorSystem } import akka.remote.testconductor.RoleName import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec } import akka.testkit._ import akka.testkit.TestEvent._ -import scala.concurrent.duration._ -import org.scalatest.Suite -import org.scalatest.exceptions.TestFailedException -import java.util.concurrent.ConcurrentHashMap -import akka.actor.ActorPath -import akka.actor.RootActorPath +import akka.actor.{ ActorSystem, Address } import akka.event.Logging.ErrorLevel -import akka.actor.ActorSystem +import scala.concurrent.duration._ +import scala.collection.immutable +import java.util.concurrent.ConcurrentHashMap object MultiNodeClusterSpec { @@ -158,7 +158,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS * nodes (roles). First node will be started first * and others will join the first. */ - def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.toSeq) + def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.to[immutable.Seq]) /** * Initialize the cluster of the specified member @@ -166,11 +166,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS * First node will be started first and others will join * the first. */ - def awaitClusterUp(roles: RoleName*): Unit = { - awaitStartCluster(true, roles.toSeq) - } + def awaitClusterUp(roles: RoleName*): Unit = awaitStartCluster(true, roles.to[immutable.Seq]) - private def awaitStartCluster(upConvergence: Boolean = true, roles: Seq[RoleName]): Unit = { + private def awaitStartCluster(upConvergence: Boolean = true, roles: immutable.Seq[RoleName]): Unit = { runOn(roles.head) { // make sure that the node-to-join is started before other join startClusterNode() @@ -196,16 +194,15 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address must be(a) } } - def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(myself)) { - assertLeaderIn(nodesInCluster) - } + def assertLeader(nodesInCluster: RoleName*): Unit = + if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq]) /** * Assert that the cluster has elected the correct leader * out of all nodes in the cluster. First * member in the cluster ring is expected leader. */ - def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { + def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { nodesInCluster.length must not be (0) val expectedLeader = roleOfLeader(nodesInCluster) val leader = clusterView.leader @@ -221,7 +218,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS */ def awaitUpConvergence( numberOfMembers: Int, - canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], + canNotBePartOfMemberRing: immutable.Seq[Address] = Nil, timeout: FiniteDuration = 20.seconds): Unit = { within(timeout) { awaitCond(clusterView.members.size == numberOfMembers) @@ -239,7 +236,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS def awaitSeenSameState(addresses: Address*): Unit = awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty) - def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { + def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) nodesInCluster.sorted.head } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 291a59a44f..33ce67ecb5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -8,6 +8,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.duration._ +import scala.collection.immutable case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -65,7 +66,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo markNodeAsUnavailable(secondAddress) - awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) + awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = List(secondAddress), 30.seconds) clusterView.isSingletonCluster must be(true) awaitCond(clusterView.isLeader) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index e1b1a4af96..a7a7c6f4ba 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -9,9 +9,10 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import scala.concurrent.duration._ import akka.actor.Address import akka.remote.testconductor.Direction +import scala.concurrent.duration._ +import scala.collection.immutable case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -53,8 +54,8 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) muteMarkingAsUnreachable() - val side1 = IndexedSeq(first, second) - val side2 = IndexedSeq(third, fourth, fifth) + val side1 = Vector(first, second) + val side2 = Vector(third, fourth, fifth) "A cluster of 5 members" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index db2f9fc930..395d803865 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -6,13 +6,14 @@ package akka.cluster import language.postfixOps import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import com.typesafe.config.ConfigFactory import akka.actor.Address import akka.remote.testconductor.{ RoleName, Direction } import scala.concurrent.duration._ +import scala.collection.immutable case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -45,7 +46,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod muteMarkingAsUnreachable() - def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { + def allBut(role: RoleName, roles: immutable.Seq[RoleName] = roles): immutable.Seq[RoleName] = { roles.filterNot(_ == role) } @@ -125,7 +126,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod } runOn(allBut(victim): _*) { - awaitUpConvergence(roles.size - 1, Seq(victim)) + awaitUpConvergence(roles.size - 1, List(victim)) } endBarrier diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 45476864db..8a9d6eb6fc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -7,7 +7,7 @@ package akka.cluster import akka.actor.Address import akka.testkit._ import akka.testkit.TestEvent._ -import scala.collection.immutable.TreeMap +import scala.collection.immutable import scala.concurrent.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -27,7 +27,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" val conn = Address("akka", "", "localhost", 2552) val conn2 = Address("akka", "", "localhost", 2553) - def fakeTimeGenerator(timeIntervals: Seq[Long]): () ⇒ Long = { + def fakeTimeGenerator(timeIntervals: immutable.Seq[Long]): () ⇒ Long = { var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c)) def timeGenerator(): Long = { val currentTime = times.head @@ -73,7 +73,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return realistic phi values" in { val fd = createFailureDetector() - val test = TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3) + val test = immutable.TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3) for ((timeDiff, expectedPhi) ← test) { fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) must be(expectedPhi plusOrMinus (0.1)) } diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala index 6589ba0efc..609975db6a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala @@ -5,14 +5,16 @@ package akka.cluster import scala.language.postfixOps + +import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.Await +import scala.util.{ Success, Try, Failure } import akka.actor._ import akka.testkit._ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import util.{ Success, Try, Failure } object MetricsEnabledSpec { val config = """ @@ -207,11 +209,10 @@ trait MetricSpec extends WordSpec with MustMatchers { if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) ⇒ m }.foreach(_.average.isDefined must be(true)) } - def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = { - var r: Seq[Metric] = Seq.empty - nodes.foreach(n ⇒ r ++= n.metrics.filter(_.isDefined)) - r - } + def collectNodeMetrics(nodes: Set[NodeMetrics]): immutable.Seq[Metric] = + nodes.foldLeft(Vector[Metric]()) { + case (r, n) ⇒ r ++ n.metrics.filter(_.isDefined) + } } trait AbstractClusterMetricsSpec extends DefaultTimeout { diff --git a/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java b/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java index c78da61fb1..9ea2a332b1 100644 --- a/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java @@ -30,7 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import akka.japi.Function; import scala.Option; import scala.collection.JavaConverters; -import scala.collection.Seq; +import scala.collection.immutable.Seq; import org.junit.Test; import org.junit.BeforeClass; @@ -220,7 +220,7 @@ public class FaultHandlingTestBase { //#testkit public Seq seq(A... args) { return JavaConverters.collectionAsScalaIterableConverter( - java.util.Arrays.asList(args)).asScala().toSeq(); + java.util.Arrays.asList(args)).asScala().toList(); } //#testkit } diff --git a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java index c4e7414ce1..73b8c5c639 100644 --- a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java @@ -11,6 +11,7 @@ import static docs.jrouting.CustomRouterDocTestBase.Message.RepublicanVote; import static org.junit.Assert.assertEquals; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.junit.After; @@ -69,7 +70,7 @@ public class CustomRouterDocTestBase { //#supervision final SupervisorStrategy strategy = new OneForOneStrategy(5, Duration.create("1 minute"), - new Class[] { Exception.class }); + Collections.>singletonList(Exception.class)); final ActorRef router = system.actorOf(new Props(MyActor.class) .withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy))); //#supervision diff --git a/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala index bcc908990f..cc88416b0e 100644 --- a/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala @@ -8,6 +8,7 @@ import language.postfixOps import akka.testkit.{ AkkaSpec ⇒ MyFavoriteTestFrameWorkPlusAkkaTestKit } //#test-code import akka.actor.Props +import scala.collection.immutable class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { @@ -24,7 +25,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { case object Flush // sent events - case class Batch(obj: Seq[Any]) + case class Batch(obj: immutable.Seq[Any]) //#simple-events //#simple-state // states @@ -34,7 +35,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { sealed trait Data case object Uninitialized extends Data - case class Todo(target: ActorRef, queue: Seq[Any]) extends Data + case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data //#simple-state //#simple-fsm class Buncher extends Actor with FSM[State, Data] { @@ -193,12 +194,12 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { buncher ! SetTarget(testActor) buncher ! Queue(42) buncher ! Queue(43) - expectMsg(Batch(Seq(42, 43))) + expectMsg(Batch(immutable.Seq(42, 43))) buncher ! Queue(44) buncher ! Flush buncher ! Queue(45) - expectMsg(Batch(Seq(44))) - expectMsg(Batch(Seq(45))) + expectMsg(Batch(immutable.Seq(44))) + expectMsg(Batch(immutable.Seq(45))) } "batch not if uninitialized" in { diff --git a/akka-docs/rst/scala/code/docs/testkit/TestKitUsageSpec.scala b/akka-docs/rst/scala/code/docs/testkit/TestKitUsageSpec.scala index 48e6b6664b..8b153c5944 100644 --- a/akka-docs/rst/scala/code/docs/testkit/TestKitUsageSpec.scala +++ b/akka-docs/rst/scala/code/docs/testkit/TestKitUsageSpec.scala @@ -22,6 +22,7 @@ import akka.testkit.DefaultTimeout import akka.testkit.ImplicitSender import akka.testkit.TestKit import scala.concurrent.duration._ +import scala.collection.immutable /** * a Test to show some TestKit examples @@ -38,8 +39,8 @@ class TestKitUsageSpec val filterRef = system.actorOf(Props(new FilteringActor(testActor))) val randomHead = Random.nextInt(6) val randomTail = Random.nextInt(10) - val headList = Seq().padTo(randomHead, "0") - val tailList = Seq().padTo(randomTail, "1") + val headList = immutable.Seq().padTo(randomHead, "0") + val tailList = immutable.Seq().padTo(randomTail, "1") val seqRef = system.actorOf(Props(new SequencingActor(testActor, headList, tailList))) @@ -145,7 +146,7 @@ object TestKitUsageSpec { * like to test that the interesting value is received and that you cant * be bothered with the rest */ - class SequencingActor(next: ActorRef, head: Seq[String], tail: Seq[String]) + class SequencingActor(next: ActorRef, head: immutable.Seq[String], tail: immutable.Seq[String]) extends Actor { def receive = { case msg ⇒ { diff --git a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala index 0e3b4df262..28ff2e3d34 100644 --- a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala @@ -5,13 +5,13 @@ package docs.zeromq import language.postfixOps -import akka.actor.{ Actor, Props } import scala.concurrent.duration._ +import scala.collection.immutable +import akka.actor.{ Actor, Props } import akka.testkit._ -import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension } +import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind } import java.text.SimpleDateFormat import java.util.Date -import akka.zeromq.{ SocketType, Bind } object ZeromqDocSpec { @@ -52,12 +52,12 @@ object ZeromqDocSpec { val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get // the first frame is the topic, second is the message - pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload))) + pubSocket ! ZMQMessage(immutable.Seq(Frame("health.heap"), Frame(heapPayload))) // use akka SerializationExtension to convert to bytes val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get // the first frame is the topic, second is the message - pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload))) + pubSocket ! ZMQMessage(immutable.Seq(Frame("health.load"), Frame(loadPayload))) } } //#health @@ -146,7 +146,7 @@ class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") { val payload = Array.empty[Byte] //#pub-topic - pubSocket ! ZMQMessage(Seq(Frame("foo.bar"), Frame(payload))) + pubSocket ! ZMQMessage(Frame("foo.bar"), Frame(payload)) //#pub-topic system.stop(subSocket) diff --git a/akka-kernel/src/main/scala/akka/kernel/Main.scala b/akka-kernel/src/main/scala/akka/kernel/Main.scala index 97ff625ab8..3fe3cac403 100644 --- a/akka-kernel/src/main/scala/akka/kernel/Main.scala +++ b/akka-kernel/src/main/scala/akka/kernel/Main.scala @@ -9,6 +9,7 @@ import java.io.File import java.lang.Boolean.getBoolean import java.net.URLClassLoader import java.util.jar.JarFile +import scala.collection.immutable import scala.collection.JavaConverters._ /** @@ -77,8 +78,8 @@ object Main { Thread.currentThread.setContextClassLoader(classLoader) - val bootClasses: Seq[String] = args.toSeq - val bootables: Seq[Bootable] = bootClasses map { c ⇒ classLoader.loadClass(c).newInstance.asInstanceOf[Bootable] } + val bootClasses: immutable.Seq[String] = args.to[immutable.Seq] + val bootables: immutable.Seq[Bootable] = bootClasses map { c ⇒ classLoader.loadClass(c).newInstance.asInstanceOf[Bootable] } for (bootable ← bootables) { log("Starting up " + bootable.getClass.getName) @@ -122,7 +123,7 @@ object Main { new URLClassLoader(urls, Thread.currentThread.getContextClassLoader) } - private def addShutdownHook(bootables: Seq[Bootable]): Unit = { + private def addShutdownHook(bootables: immutable.Seq[Bootable]): Unit = { Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { def run = { log("") diff --git a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala index 2728a80894..79d07c65a3 100644 --- a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala +++ b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala @@ -32,7 +32,7 @@ class SimpleNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT import NamespaceHandlerTest._ - val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + val testBundles = buildTestBundles(List( AKKA_OSGI_BLUEPRINT, bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("simple.xml")))) @@ -62,7 +62,7 @@ class ConfigNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT import NamespaceHandlerTest._ - val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + val testBundles = buildTestBundles(List( AKKA_OSGI_BLUEPRINT, bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("config.xml")))) @@ -94,7 +94,7 @@ class DependencyInjectionNamespaceHandlerTest extends WordSpec with MustMatchers import NamespaceHandlerTest._ - val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + val testBundles = buildTestBundles(List( AKKA_OSGI_BLUEPRINT, bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("injection.xml")))) diff --git a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala index d582209d76..27455be75e 100644 --- a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala +++ b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala @@ -10,6 +10,7 @@ import akka.actor.ActorSystem import akka.pattern.ask import scala.concurrent.Await import scala.concurrent.duration._ +import scala.collection.immutable import akka.util.Timeout import de.kalpatec.pojosr.framework.launch.BundleDescriptor import test.{ RuntimeNameActorSystemActivator, TestActivators, PingPongActorSystemActivator } @@ -32,7 +33,7 @@ class PingPongActorSystemActivatorTest extends WordSpec with MustMatchers with P import ActorSystemActivatorTest._ - val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + val testBundles: immutable.Seq[BundleDescriptor] = buildTestBundles(List( bundle(TEST_BUNDLE_NAME).withActivator(classOf[PingPongActorSystemActivator]))) "PingPongActorSystemActivator" must { @@ -65,7 +66,8 @@ class RuntimeNameActorSystemActivatorTest extends WordSpec with MustMatchers wit import ActorSystemActivatorTest._ - val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq(bundle(TEST_BUNDLE_NAME).withActivator(classOf[RuntimeNameActorSystemActivator]))) + val testBundles: immutable.Seq[BundleDescriptor] = + buildTestBundles(List(bundle(TEST_BUNDLE_NAME).withActivator(classOf[RuntimeNameActorSystemActivator]))) "RuntimeNameActorSystemActivator" must { diff --git a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala index e993d04f01..b8c7ea24e5 100644 --- a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala +++ b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala @@ -17,7 +17,7 @@ import java.io._ import org.scalatest.{ BeforeAndAfterAll, Suite } import java.util.{ UUID, Date, ServiceLoader, HashMap } import scala.reflect.ClassTag -import scala.Some +import scala.collection.immutable /** * Trait that provides support for building akka-osgi tests using PojoSR @@ -31,7 +31,7 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { * All bundles being found on the test classpath are automatically installed and started in the PojoSR runtime. * Implement this to define the extra bundles that should be available for testing. */ - def testBundles: Seq[BundleDescriptor] + def testBundles: immutable.Seq[BundleDescriptor] val bufferedLoadingErrors = new ByteArrayOutputStream() @@ -82,15 +82,11 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { } } - protected def buildTestBundles(builders: Seq[BundleDescriptorBuilder]): Seq[BundleDescriptor] = builders map (_.build) + protected def buildTestBundles(builders: immutable.Seq[BundleDescriptorBuilder]): immutable.Seq[BundleDescriptor] = + builders map (_.build) - def filterErrors()(block: ⇒ Unit): Unit = { - try { - block - } catch { - case e: Throwable ⇒ System.err.write(bufferedLoadingErrors.toByteArray); throw e - } - } + def filterErrors()(block: ⇒ Unit): Unit = + try block catch { case e: Throwable ⇒ System.err.write(bufferedLoadingErrors.toByteArray); throw e } } object PojoSRTestSupport { diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala index 3f2e739308..26d6f6f243 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala @@ -6,13 +6,13 @@ package akka.remote.testconductor import language.postfixOps import java.net.InetSocketAddress import scala.annotation.tailrec -import scala.collection.immutable.Queue +import scala.collection.immutable +import scala.concurrent.duration._ import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{ SimpleChannelHandler, MessageEvent, Channels, ChannelStateEvent, ChannelHandlerContext, ChannelFutureListener, ChannelFuture } import akka.actor.{ Props, LoggingFSM, Address, ActorSystem, ActorRef, ActorLogging, Actor, FSM } import akka.event.Logging import akka.remote.netty.ChannelAddress -import scala.concurrent.duration._ /** * INTERNAL API. @@ -230,7 +230,7 @@ private[akka] object ThrottleActor { case object Throttle extends State case object Blackhole extends State - case class Data(lastSent: Long, rateMBit: Float, queue: Queue[Send]) + case class Data(lastSent: Long, rateMBit: Float, queue: immutable.Queue[Send]) case class Send(ctx: ChannelHandlerContext, direction: Direction, future: Option[ChannelFuture], msg: AnyRef) case class SetRate(rateMBit: Float) @@ -248,7 +248,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) private val packetSplitThreshold = TestConductor(context.system).Settings.PacketSplitThreshold - startWith(PassThrough, Data(0, -1, Queue())) + startWith(PassThrough, Data(0, -1, immutable.Queue())) when(PassThrough) { case Event(s @ Send(_, _, _, msg), _) ⇒ @@ -258,8 +258,8 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) } when(Throttle) { - case Event(s: Send, data @ Data(_, _, Queue())) ⇒ - stay using sendThrottled(data.copy(lastSent = System.nanoTime, queue = Queue(s))) + case Event(s: Send, data @ Data(_, _, immutable.Queue())) ⇒ + stay using sendThrottled(data.copy(lastSent = System.nanoTime, queue = immutable.Queue(s))) case Event(s: Send, data) ⇒ stay using sendThrottled(data.copy(queue = data.queue.enqueue(s))) case Event(Tick, data) ⇒ @@ -286,7 +286,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) whenUnhandled { case Event(SetRate(rate), d) ⇒ if (rate > 0) { - goto(Throttle) using d.copy(lastSent = System.nanoTime, rateMBit = rate, queue = Queue()) + goto(Throttle) using d.copy(lastSent = System.nanoTime, rateMBit = rate, queue = immutable.Queue()) } else if (rate == 0) { goto(Blackhole) } else { @@ -328,7 +328,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) */ private def schedule(d: Data): (Data, Seq[Send], Option[FiniteDuration]) = { val now = System.nanoTime - @tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[FiniteDuration]) = { + @tailrec def rec(d: Data, toSend: immutable.Seq[Send]): (Data, immutable.Seq[Send], Option[FiniteDuration]) = { if (d.queue.isEmpty) (d, toSend, None) else { val timeForPacket = d.lastSent + (1000 * size(d.queue.head.msg) / d.rateMBit).toLong @@ -344,7 +344,7 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) } } } - rec(d, Seq()) + rec(d, Nil) } /** diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 2a3957d146..95bfab1ee5 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -4,20 +4,20 @@ package akka.remote.testconductor import language.postfixOps + +import java.util.concurrent.TimeoutException import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler } -import RemoteConnection.getAddrString +import akka.remote.testconductor.RemoteConnection.getAddrString +import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Await, Future } import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import scala.reflect.classTag import akka.util.Timeout import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent, WriteCompletionEvent, ExceptionEvent } -import com.typesafe.config.ConfigFactory -import java.util.concurrent.TimeUnit.MILLISECONDS -import java.util.concurrent.TimeoutException import akka.pattern.{ ask, pipe, AskTimeoutException } -import scala.util.control.NoStackTrace import akka.event.{ LoggingAdapter, Logging } import java.net.{ InetSocketAddress, ConnectException } -import scala.reflect.classTag -import concurrent.{ ExecutionContext, Await, Future } /** * The Player is the client component of the @@ -67,15 +67,13 @@ trait Player { this: TestConductorExt ⇒ * Enter the named barriers, one after the other, in the order given. Will * throw an exception in case of timeouts or other errors. */ - def enter(name: String*) { - enter(Settings.BarrierTimeout, name) - } + def enter(name: String*): Unit = enter(Settings.BarrierTimeout, name.to[immutable.Seq]) /** * Enter the named barriers, one after the other, in the order given. Will * throw an exception in case of timeouts or other errors. */ - def enter(timeout: Timeout, name: Seq[String]) { + def enter(timeout: Timeout, name: immutable.Seq[String]) { system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) val stop = Deadline.now + timeout.duration name foreach { b ⇒ diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index c40a4e5a7b..e79ab4a1ee 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -7,16 +7,17 @@ import language.implicitConversions import language.postfixOps import java.net.InetSocketAddress +import java.util.concurrent.TimeoutException import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } +import scala.concurrent.{ Await, Awaitable } +import scala.util.control.NonFatal +import scala.collection.immutable import akka.actor._ import akka.util.Timeout import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.remote.RemoteActorRefProvider import akka.testkit._ -import scala.concurrent.{ Await, Awaitable } -import scala.util.control.NonFatal import scala.concurrent.duration._ -import java.util.concurrent.TimeoutException import akka.remote.testconductor.RoleName import akka.remote.testconductor.TestConductorTransport import akka.actor.RootActorPath @@ -30,7 +31,7 @@ abstract class MultiNodeConfig { private var _commonConf: Option[Config] = None private var _nodeConf = Map[RoleName, Config]() private var _roles = Vector[RoleName]() - private var _deployments = Map[RoleName, Seq[String]]() + private var _deployments = Map[RoleName, immutable.Seq[String]]() private var _allDeploy = Vector[String]() private var _testTransport = false @@ -106,9 +107,9 @@ abstract class MultiNodeConfig { configs reduce (_ withFallback _) } - private[testkit] def deployments(node: RoleName): Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy + private[testkit] def deployments(node: RoleName): immutable.Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy - private[testkit] def roles: Seq[RoleName] = _roles + private[testkit] def roles: immutable.Seq[RoleName] = _roles } @@ -234,7 +235,7 @@ object MultiNodeSpec { * `AskTimeoutException: sending to terminated ref breaks promises`. Using lazy * val is fine. */ -abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: Seq[RoleName], deployments: RoleName ⇒ Seq[String]) +abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: immutable.Seq[RoleName], deployments: RoleName ⇒ Seq[String]) extends TestKit(_system) with MultiNodeSpecCallbacks { import MultiNodeSpec._ @@ -294,7 +295,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: /** * All registered roles */ - def roles: Seq[RoleName] = _roles + def roles: immutable.Seq[RoleName] = _roles /** * TO BE DEFINED BY USER: Defines the number of participants required for starting the test. This @@ -335,9 +336,10 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: * Enter the named barriers in the order given. Use the remaining duration from * the innermost enclosing `within` block or the default `BarrierTimeout` */ - def enterBarrier(name: String*) { - testConductor.enter(Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), name) - } + def enterBarrier(name: String*): Unit = + testConductor.enter( + Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), + name.to[immutable.Seq]) /** * Query the controller for the transport address of the given node (by role name) and diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4967978582..48fca0af08 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -70,7 +70,7 @@ class RemoteActorRefProvider( _transport = { val fqn = remoteSettings.RemoteTransport - val args = Seq( + val args = List( classOf[ExtendedActorSystem] -> system, classOf[RemoteActorRefProvider] -> this) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a65bac1a66..2a067237a7 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -8,7 +8,9 @@ import java.net.InetSocketAddress import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.Executors -import scala.collection.mutable.HashMap +import scala.collection.mutable +import scala.collection.immutable +import scala.util.control.NonFatal import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture } import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, DefaultChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal } @@ -20,7 +22,6 @@ import org.jboss.netty.util.{ DefaultObjectSizeEstimator, HashedWheelTimer } import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } -import scala.util.control.NonFatal import akka.actor.{ ExtendedActorSystem, Address, ActorRef } import com.google.protobuf.MessageLite @@ -53,7 +54,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * Construct a DefaultChannelPipeline from a sequence of handlers; to be used * in implementations of ChannelPipelineFactory. */ - def apply(handlers: Seq[ChannelHandler]): DefaultChannelPipeline = + def apply(handlers: immutable.Seq[ChannelHandler]): DefaultChannelPipeline = (new DefaultChannelPipeline /: handlers) { (p, h) ⇒ p.addLast(Logging.simpleName(h.getClass), h); p } /** @@ -69,7 +70,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * Construct a default protocol stack, excluding the “head” handler (i.e. the one which * actually dispatches the received messages to the local target actors). */ - def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = + def defaultStack(withTimeout: Boolean, isClient: Boolean): immutable.Seq[ChannelHandler] = (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: @@ -138,7 +139,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = PipelineFactory(Seq(endpoint), withTimeout, isClient) - private val remoteClients = new HashMap[Address, RemoteClient] + private val remoteClients = new mutable.HashMap[Address, RemoteClient] private val clientsLock = new ReentrantReadWriteLock override protected def useUntrustedMode = remoteSettings.UntrustedMode diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala b/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala index f049a4e678..b274c4c0b6 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala @@ -16,6 +16,7 @@ package akka.remote.security.provider import org.uncommons.maths.random.{ SeedGenerator, SeedException, SecureRandomSeedGenerator, RandomDotOrgSeedGenerator, DevRandomSeedGenerator } +import scala.collection.immutable /** * Internal API @@ -33,8 +34,8 @@ object InternetSeedGenerator { /**Singleton instance. */ private final val Instance: InternetSeedGenerator = new InternetSeedGenerator /**Delegate generators. */ - private final val Generators: Seq[SeedGenerator] = - Seq(new RandomDotOrgSeedGenerator, // first try the Internet seed generator + private final val Generators: immutable.Seq[SeedGenerator] = + List(new RandomDotOrgSeedGenerator, // first try the Internet seed generator new SecureRandomSeedGenerator) // this is last because it always works } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 520f2557c7..132e3f5e78 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -138,7 +138,7 @@ object TestActorRef { def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({ - system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()).recover({ + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Nil).recover({ case exception ⇒ throw ActorInitializationException(null, "Could not instantiate Actor" + "\nMake sure Actor is NOT defined inside a class/trait," + diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 1fdd339ca5..c46f15a26a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -6,14 +6,15 @@ package akka.testkit import language.existentials import scala.util.matching.Regex +import scala.collection.JavaConverters +import scala.collection.immutable +import scala.concurrent.duration.Duration +import scala.reflect.ClassTag import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage } import akka.dispatch.{ SystemMessage, Terminate } import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging import java.lang.{ Iterable ⇒ JIterable } -import scala.collection.JavaConverters -import scala.concurrent.duration.Duration -import scala.reflect.ClassTag import akka.actor.NoSerializationVerificationNeeded /** @@ -38,22 +39,22 @@ sealed trait TestEvent */ object TestEvent { object Mute { - def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq) + def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.to[immutable.Seq]) } - case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { + case class Mute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */ - def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.toSeq) + def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.to[immutable.Seq]) } object UnMute { - def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) + def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to[immutable.Seq]) } - case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { + case class UnMute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */ - def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.toSeq) + def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.to[immutable.Seq]) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 4f6744b74b..e81acb23a3 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -5,15 +5,15 @@ package akka.testkit import language.postfixOps +import scala.annotation.{ varargs, tailrec } +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.reflect.ClassTag +import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic } +import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import akka.actor.Actor._ -import scala.concurrent.duration._ -import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic } -import atomic.AtomicInteger -import scala.annotation.tailrec import akka.util.{ Timeout, BoxedType } -import scala.annotation.varargs -import scala.reflect.ClassTag object TestActor { type Ignore = Option[PartialFunction[Any, Boolean]] @@ -415,7 +415,7 @@ trait TestKitBase { /** * Same as `expectMsgAllOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllOf[T](obj: T*): Seq[T] = expectMsgAllOf_internal(remaining, obj: _*) + def expectMsgAllOf[T](obj: T*): immutable.Seq[T] = expectMsgAllOf_internal(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given @@ -430,19 +430,19 @@ trait TestKitBase { * expectMsgAllOf(1 second, Result1(), Result2()) * */ - def expectMsgAllOf[T](max: FiniteDuration, obj: T*): Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*) + def expectMsgAllOf[T](max: FiniteDuration, obj: T*): immutable.Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*) - private def expectMsgAllOf_internal[T](max: FiniteDuration, obj: T*): Seq[T] = { + private def expectMsgAllOf_internal[T](max: FiniteDuration, obj: T*): immutable.Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (x == _), "not found " + x)) recv foreach (x ⇒ assert(obj exists (x == _), "found unexpected " + x)) - recv.asInstanceOf[Seq[T]] + recv.asInstanceOf[immutable.Seq[T]] } /** * Same as `expectMsgAllClassOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllClassOf[T](obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(remaining, obj: _*) + def expectMsgAllClassOf[T](obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllClassOf(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given @@ -452,19 +452,19 @@ trait TestKitBase { * Wait time is bounded by the given duration, with an AssertionFailure * being thrown in case of timeout. */ - def expectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*) + def expectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*) - private def internalExpectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = { + private def internalExpectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (_.getClass eq BoxedType(x)), "not found " + x)) recv foreach (x ⇒ assert(obj exists (c ⇒ BoxedType(c) eq x.getClass), "found non-matching object " + x)) - recv.asInstanceOf[Seq[T]] + recv.asInstanceOf[immutable.Seq[T]] } /** * Same as `expectMsgAllConformingOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(remaining, obj: _*) + def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllConformingOf(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given @@ -477,13 +477,13 @@ trait TestKitBase { * Beware that one object may satisfy all given class constraints, which * may be counter-intuitive. */ - def expectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*) + def expectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*) - private def internalExpectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = { + private def internalExpectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): immutable.Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (BoxedType(x) isInstance _), "not found " + x)) recv foreach (x ⇒ assert(obj exists (c ⇒ BoxedType(c) isInstance x), "found non-matching object " + x)) - recv.asInstanceOf[Seq[T]] + recv.asInstanceOf[immutable.Seq[T]] } /** @@ -520,7 +520,7 @@ trait TestKitBase { * assert(series == (1 to 7).toList) * */ - def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = { + def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): immutable.Seq[T] = { val stop = now + remainingOrDilated(max) var msg: Message = NullMessage @@ -553,14 +553,14 @@ trait TestKitBase { * Same as `receiveN(n, remaining)` but correctly taking into account * Duration.timeFactor. */ - def receiveN(n: Int): Seq[AnyRef] = receiveN_internal(n, remaining) + def receiveN(n: Int): immutable.Seq[AnyRef] = receiveN_internal(n, remaining) /** * Receive N messages in a row before the given deadline. */ - def receiveN(n: Int, max: FiniteDuration): Seq[AnyRef] = receiveN_internal(n, max.dilated) + def receiveN(n: Int, max: FiniteDuration): immutable.Seq[AnyRef] = receiveN_internal(n, max.dilated) - private def receiveN_internal(n: Int, max: Duration): Seq[AnyRef] = { + private def receiveN_internal(n: Int, max: Duration): immutable.Seq[AnyRef] = { val stop = max + now for { x ← 1 to n } yield { val timeout = stop - now diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index 38d6d853af..ff8926154e 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -4,14 +4,16 @@ import language.implicitConversions import akka.actor.ActorSystem import scala.concurrent.duration.{ Duration, FiniteDuration } -import java.util.concurrent.TimeUnit.MILLISECONDS import scala.reflect.ClassTag +import scala.collection.immutable +import java.util.concurrent.TimeUnit.MILLISECONDS package object testkit { def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit system: ActorSystem): T = { def now = System.currentTimeMillis - system.eventStream.publish(TestEvent.Mute(eventFilters.toSeq)) + system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq])) + try { val result = block @@ -23,7 +25,7 @@ package object testkit { result } finally { - system.eventStream.publish(TestEvent.UnMute(eventFilters.toSeq)) + system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq])) } } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java index 5aecd341e0..27323787aa 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import scala.collection.JavaConverters; -import scala.collection.Seq; +import scala.collection.immutable.Seq; public class UntypedCoordinatedIncrementTest { private static ActorSystem system; @@ -110,6 +110,6 @@ public class UntypedCoordinatedIncrementTest { } public Seq seq(A... args) { - return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq(); + return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toList(); } } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java index 5aae61d9c1..3b841e300b 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import scala.collection.JavaConverters; -import scala.collection.Seq; +import scala.collection.immutable.Seq; public class UntypedTransactorTest { @@ -120,6 +120,6 @@ public class UntypedTransactorTest { public Seq seq(A... args) { return JavaConverters .collectionAsScalaIterableConverter(Arrays.asList(args)).asScala() - .toSeq(); + .toList(); } } diff --git a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala index 1aa9fb555c..6dad1079a3 100644 --- a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala @@ -6,12 +6,13 @@ package akka.transactor import org.scalatest.BeforeAndAfterAll -import akka.actor._ import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.stm._ +import scala.collection.immutable +import akka.actor._ import akka.util.Timeout import akka.testkit._ -import scala.concurrent.stm._ import akka.pattern.{ AskTimeoutException, ask } object CoordinatedIncrement { @@ -30,7 +31,7 @@ object CoordinatedIncrement { } """ - case class Increment(friends: Seq[ActorRef]) + case class Increment(friends: immutable.Seq[ActorRef]) case object GetCount class Counter(name: String) extends Actor { diff --git a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala index a087ab5b6d..eb75247164 100644 --- a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala @@ -8,21 +8,22 @@ import language.postfixOps import org.scalatest.BeforeAndAfterAll -import akka.actor._ import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.stm._ +import scala.collection.immutable +import scala.util.Random.{ nextInt ⇒ random } +import scala.util.control.NonFatal +import akka.actor._ import akka.testkit._ import akka.testkit.TestEvent.Mute -import scala.concurrent.stm._ -import scala.util.Random.{ nextInt ⇒ random } import java.util.concurrent.CountDownLatch import akka.pattern.{ AskTimeoutException, ask } import akka.util.Timeout -import scala.util.control.NonFatal object FickleFriends { - case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) - case class Increment(friends: Seq[ActorRef]) + case class FriendlyIncrement(friends: immutable.Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) + case class Increment(friends: immutable.Seq[ActorRef]) case object GetCount /** @@ -120,7 +121,7 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { "Coordinated fickle friends" should { "eventually succeed to increment all counters by one" in { - val ignoreExceptions = Seq( + val ignoreExceptions = immutable.Seq( EventFilter[ExpectedFailureException](), EventFilter[CoordinatedTransactionException](), EventFilter[AskTimeoutException]()) diff --git a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala index 4fd05a0e1e..0de8a13d97 100644 --- a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala @@ -7,15 +7,16 @@ package akka.transactor import language.postfixOps import akka.actor._ +import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.stm._ import akka.util.Timeout import akka.testkit._ -import scala.concurrent.stm._ import akka.pattern.{ AskTimeoutException, ask } object TransactorIncrement { - case class Increment(friends: Seq[ActorRef], latch: TestLatch) + case class Increment(friends: immutable.Seq[ActorRef], latch: TestLatch) case object GetCount class Counter(name: String) extends Transactor { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 20c8b2d723..a9efa56c1e 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -6,13 +6,14 @@ package akka.zeromq import org.zeromq.ZMQ.{ Socket, Poller } import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor._ +import scala.collection.immutable +import scala.annotation.tailrec import scala.concurrent.{ Promise, Future } import scala.concurrent.duration.Duration -import scala.annotation.tailrec import scala.collection.mutable.ListBuffer +import scala.util.control.NonFatal import akka.event.Logging import java.util.concurrent.TimeUnit -import scala.util.control.NonFatal private[zeromq] object ConcurrentSocketActor { private sealed trait PollMsg @@ -25,7 +26,7 @@ private[zeromq] object ConcurrentSocketActor { private val DefaultContext = Context() } -private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends Actor { +private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) extends Actor { import ConcurrentSocketActor._ private val noBytes = Array[Byte]() @@ -40,7 +41,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A private val socket: Socket = zmqContext.socket(socketType) private val poller: Poller = zmqContext.poller - private val pendingSends = new ListBuffer[Seq[Frame]] + private val pendingSends = new ListBuffer[immutable.Seq[Frame]] def receive = { case m: PollMsg ⇒ doPoll(m) @@ -151,7 +152,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A } } finally notifyListener(Closed) - @tailrec private def flushMessage(i: Seq[Frame]): Boolean = + @tailrec private def flushMessage(i: immutable.Seq[Frame]): Boolean = if (i.isEmpty) true else { @@ -198,7 +199,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A case frames ⇒ notifyListener(deserializer(frames)); doPoll(mode, togo - 1) } - @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] = + @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): immutable.Seq[Frame] = if (mode == PollCareful && (poller.poll(0) <= 0)) { if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!") } else { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala index f935a9c31d..b70c245327 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala @@ -4,9 +4,10 @@ package akka.zeromq import com.google.protobuf.Message -import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor.ActorRef import scala.concurrent.duration._ +import scala.collection.immutable +import org.zeromq.{ ZMQ ⇒ JZMQ } import org.zeromq.ZMQ.{ Poller, Socket } /** @@ -36,7 +37,7 @@ sealed trait SocketConnectOption extends SocketOption { * A base trait for pubsub options for the ZeroMQ socket */ sealed trait PubSubOption extends SocketOption { - def payload: Seq[Byte] + def payload: immutable.Seq[Byte] } /** @@ -79,7 +80,7 @@ class Context(numIoThreads: Int) extends SocketMeta { * A base trait for message deserializers */ trait Deserializer extends SocketOption { - def apply(frames: Seq[Frame]): Any + def apply(frames: immutable.Seq[Frame]): Any } /** @@ -172,12 +173,12 @@ case class Bind(endpoint: String) extends SocketConnectOption * * @param payload the topic to subscribe to */ -case class Subscribe(payload: Seq[Byte]) extends PubSubOption { - def this(topic: String) = this(topic.getBytes("UTF-8")) +case class Subscribe(payload: immutable.Seq[Byte]) extends PubSubOption { + def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq]) } object Subscribe { def apply(topic: String): Subscribe = new Subscribe(topic) - val all = Subscribe(Seq.empty) + val all = Subscribe("") } /** @@ -189,8 +190,8 @@ object Subscribe { * * @param payload */ -case class Unsubscribe(payload: Seq[Byte]) extends PubSubOption { - def this(topic: String) = this(topic.getBytes("UTF-8")) +case class Unsubscribe(payload: immutable.Seq[Byte]) extends PubSubOption { + def this(topic: String) = this(topic.getBytes("UTF-8").to[immutable.Seq]) } object Unsubscribe { def apply(topic: String): Unsubscribe = new Unsubscribe(topic) @@ -200,17 +201,17 @@ object Unsubscribe { * Send a message over the zeromq socket * @param frames */ -case class Send(frames: Seq[Frame]) extends Request +case class Send(frames: immutable.Seq[Frame]) extends Request /** * A message received over the zeromq socket * @param frames */ -case class ZMQMessage(frames: Seq[Frame]) { +case class ZMQMessage(frames: immutable.Seq[Frame]) { - def this(frame: Frame) = this(Seq(frame)) - def this(frame1: Frame, frame2: Frame) = this(Seq(frame1, frame2)) - def this(frameArray: Array[Frame]) = this(frameArray.toSeq) + def this(frame: Frame) = this(List(frame)) + def this(frame1: Frame, frame2: Frame) = this(List(frame1, frame2)) + def this(frameArray: Array[Frame]) = this(frameArray.to[immutable.Seq]) /** * Convert the bytes in the first frame to a String, using specified charset. @@ -224,8 +225,9 @@ case class ZMQMessage(frames: Seq[Frame]) { def payload(frameIndex: Int): Array[Byte] = frames(frameIndex).payload.toArray } object ZMQMessage { - def apply(bytes: Array[Byte]): ZMQMessage = ZMQMessage(Seq(Frame(bytes))) - def apply(message: Message): ZMQMessage = ZMQMessage(message.toByteArray) + def apply(bytes: Array[Byte]): ZMQMessage = new ZMQMessage(List(Frame(bytes))) + def apply(frames: Frame*): ZMQMessage = new ZMQMessage(frames.to[immutable.Seq]) + def apply(message: Message): ZMQMessage = apply(message.toByteArray) } /** diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala index 2d41424e88..d0141bf515 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala @@ -3,7 +3,10 @@ */ package akka.zeromq +import scala.collection.immutable + object Frame { + def apply(bytes: Array[Byte]): Frame = new Frame(bytes) def apply(text: String): Frame = new Frame(text) } @@ -11,8 +14,8 @@ object Frame { * A single message frame of a zeromq message * @param payload */ -case class Frame(payload: Seq[Byte]) { - def this(bytes: Array[Byte]) = this(bytes.toSeq) +case class Frame(payload: immutable.Seq[Byte]) { + def this(bytes: Array[Byte]) = this(bytes.to[immutable.Seq]) def this(text: String) = this(text.getBytes("UTF-8")) } @@ -20,5 +23,5 @@ case class Frame(payload: Seq[Byte]) { * Deserializes ZeroMQ messages into an immutable sequence of frames */ class ZMQMessageDeserializer extends Deserializer { - def apply(frames: Seq[Frame]): ZMQMessage = ZMQMessage(frames) + def apply(frames: immutable.Seq[Frame]): ZMQMessage = ZMQMessage(frames) } diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 85e05e54f3..bc40ea580b 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -7,6 +7,7 @@ import org.zeromq.{ ZMQ ⇒ JZMQ } import org.zeromq.ZMQ.Poller import akka.actor._ import akka.pattern.ask +import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit @@ -66,7 +67,8 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { case s: SocketType.ZMQSocketType ⇒ true case _ ⇒ false }, "A socket type is required") - Props(new ConcurrentSocketActor(socketParameters)).withDispatcher("akka.zeromq.socket-dispatcher") + val params = socketParameters.to[immutable.Seq] + Props(new ConcurrentSocketActor(params)).withDispatcher("akka.zeromq.socket-dispatcher") } /** diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index 68123761c5..6feaffd6d6 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -51,7 +51,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable { var number = 0 def run() { - publisher ! ZMQMessage(Seq(Frame(number.toString.getBytes), Frame(Seq()))) + publisher ! ZMQMessage(Frame(number.toString), Frame(Nil)) number += 1 } }) @@ -88,8 +88,8 @@ class ConcurrentSocketActorSpec extends AkkaSpec { try { replierProbe.expectMsg(Connecting) - val request = ZMQMessage(Seq(Frame("Request"))) - val reply = ZMQMessage(Seq(Frame("Reply"))) + val request = ZMQMessage(Frame("Request")) + val reply = ZMQMessage(Frame("Reply")) requester ! request replierProbe.expectMsg(request) @@ -112,7 +112,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { try { pullerProbe.expectMsg(Connecting) - val message = ZMQMessage(Seq(Frame("Pushed message"))) + val message = ZMQMessage(Frame("Pushed message")) pusher ! message pullerProbe.expectMsg(message)