Switching over to immutable.Iterable where possible
This commit is contained in:
parent
8f131c680f
commit
b00c47854b
44 changed files with 385 additions and 369 deletions
|
|
@ -15,7 +15,8 @@ public class CustomRouteTest {
|
||||||
// only to test compilability
|
// only to test compilability
|
||||||
public void testRoute() {
|
public void testRoute() {
|
||||||
final ActorRef ref = system.actorOf(new Props().withRouter(new RoundRobinRouter(1)));
|
final ActorRef ref = system.actorOf(new Props().withRouter(new RoundRobinRouter(1)));
|
||||||
final scala.Function1<scala.Tuple2<ActorRef, Object>, scala.collection.Iterable<Destination>> route = ExtractRoute.apply(ref);
|
final scala.Function1<scala.Tuple2<ActorRef, Object>,
|
||||||
|
scala.collection.immutable.Iterable<Destination>> route = ExtractRoute.apply(ref);
|
||||||
route.apply(null);
|
route.apply(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,14 +4,15 @@
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
|
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.collection.JavaConverters
|
import java.util.concurrent.{ RejectedExecutionException, ConcurrentLinkedQueue }
|
||||||
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
|
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
|
||||||
|
|
@ -102,8 +103,6 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
||||||
}
|
}
|
||||||
|
|
||||||
"run termination callbacks in order" in {
|
"run termination callbacks in order" in {
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
|
|
||||||
val system2 = ActorSystem("TerminationCallbacks", AkkaSpec.testConf)
|
val system2 = ActorSystem("TerminationCallbacks", AkkaSpec.testConf)
|
||||||
val result = new ConcurrentLinkedQueue[Int]
|
val result = new ConcurrentLinkedQueue[Int]
|
||||||
val count = 10
|
val count = 10
|
||||||
|
|
@ -121,13 +120,11 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
||||||
Await.ready(latch, 5 seconds)
|
Await.ready(latch, 5 seconds)
|
||||||
|
|
||||||
val expected = (for (i ← 1 to count) yield i).reverse
|
val expected = (for (i ← 1 to count) yield i).reverse
|
||||||
result.asScala.toSeq must be(expected)
|
|
||||||
|
|
||||||
|
immutableSeq(result) must be(expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
"awaitTermination after termination callbacks" in {
|
"awaitTermination after termination callbacks" in {
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
|
|
||||||
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
|
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
|
||||||
@volatile
|
@volatile
|
||||||
var callbackWasRun = false
|
var callbackWasRun = false
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ class CustomRouteSpec extends AkkaSpec {
|
||||||
provider.createRoutees(1)
|
provider.createRoutees(1)
|
||||||
|
|
||||||
{
|
{
|
||||||
case (sender, message: String) ⇒ Seq(Destination(sender, target))
|
case (sender, message: String) ⇒ List(Destination(sender, target))
|
||||||
case (sender, message) ⇒ toAll(sender, provider.routees)
|
case (sender, message) ⇒ toAll(sender, provider.routees)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -43,8 +43,8 @@ class CustomRouteSpec extends AkkaSpec {
|
||||||
val r = Await.result(router.ask(CurrentRoutees)(1 second).
|
val r = Await.result(router.ask(CurrentRoutees)(1 second).
|
||||||
mapTo[RouterRoutees], 1 second)
|
mapTo[RouterRoutees], 1 second)
|
||||||
r.routees.size must be(1)
|
r.routees.size must be(1)
|
||||||
route(testActor -> "hallo") must be(Seq(Destination(testActor, target)))
|
route(testActor -> "hallo") must be(List(Destination(testActor, target)))
|
||||||
route(testActor -> 12) must be(Seq(Destination(testActor, r.routees.head)))
|
route(testActor -> 12) must be(List(Destination(testActor, r.routees.head)))
|
||||||
//#test-route
|
//#test-route
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,9 @@ import akka.testkit.TestEvent._
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
import scala.collection.immutable
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import java.util.concurrent.TimeoutException
|
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
object ResizerSpec {
|
object ResizerSpec {
|
||||||
|
|
@ -61,10 +60,10 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
||||||
lowerBound = 2,
|
lowerBound = 2,
|
||||||
upperBound = 3)
|
upperBound = 3)
|
||||||
|
|
||||||
val c1 = resizer.capacity(IndexedSeq.empty[ActorRef])
|
val c1 = resizer.capacity(immutable.IndexedSeq.empty[ActorRef])
|
||||||
c1 must be(2)
|
c1 must be(2)
|
||||||
|
|
||||||
val current = IndexedSeq(system.actorOf(Props[TestActor]), system.actorOf(Props[TestActor]))
|
val current = immutable.IndexedSeq(system.actorOf(Props[TestActor]), system.actorOf(Props[TestActor]))
|
||||||
val c2 = resizer.capacity(current)
|
val c2 = resizer.capacity(current)
|
||||||
c2 must be(0)
|
c2 must be(0)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import language.postfixOps
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import scala.collection.mutable.LinkedList
|
import scala.collection.immutable
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
|
|
@ -17,7 +17,7 @@ import akka.pattern.{ ask, pipe }
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.dispatch.Dispatchers
|
import akka.dispatch.Dispatchers
|
||||||
import akka.util.Timeout
|
import akka.util.Collections.EmptyImmutableSeq
|
||||||
|
|
||||||
object RoutingSpec {
|
object RoutingSpec {
|
||||||
|
|
||||||
|
|
@ -54,11 +54,10 @@ object RoutingSpec {
|
||||||
class MyRouter(config: Config) extends RouterConfig {
|
class MyRouter(config: Config) extends RouterConfig {
|
||||||
val foo = config.getString("foo")
|
val foo = config.getString("foo")
|
||||||
def createRoute(routeeProvider: RouteeProvider): Route = {
|
def createRoute(routeeProvider: RouteeProvider): Route = {
|
||||||
val routees = IndexedSeq(routeeProvider.context.actorOf(Props[Echo]))
|
routeeProvider.registerRoutees(List(routeeProvider.context.actorOf(Props[Echo])))
|
||||||
routeeProvider.registerRoutees(routees)
|
|
||||||
|
|
||||||
{
|
{
|
||||||
case (sender, message) ⇒ Nil
|
case (sender, message) ⇒ EmptyImmutableSeq
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
|
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
|
||||||
|
|
@ -251,15 +250,15 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
||||||
val doneLatch = new TestLatch(connectionCount)
|
val doneLatch = new TestLatch(connectionCount)
|
||||||
|
|
||||||
//lets create some connections.
|
//lets create some connections.
|
||||||
var actors = new LinkedList[ActorRef]
|
@volatile var actors = immutable.IndexedSeq[ActorRef]()
|
||||||
var counters = new LinkedList[AtomicInteger]
|
@volatile var counters = immutable.IndexedSeq[AtomicInteger]()
|
||||||
for (i ← 0 until connectionCount) {
|
for (i ← 0 until connectionCount) {
|
||||||
counters = counters :+ new AtomicInteger()
|
counters = counters :+ new AtomicInteger()
|
||||||
|
|
||||||
val actor = system.actorOf(Props(new Actor {
|
val actor = system.actorOf(Props(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
|
case msg: Int ⇒ counters(i).addAndGet(msg)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
actors = actors :+ actor
|
actors = actors :+ actor
|
||||||
|
|
@ -278,10 +277,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
||||||
//now wait some and do validations.
|
//now wait some and do validations.
|
||||||
Await.ready(doneLatch, remaining)
|
Await.ready(doneLatch, remaining)
|
||||||
|
|
||||||
for (i ← 0 until connectionCount) {
|
for (i ← 0 until connectionCount)
|
||||||
val counter = counters.get(i).get
|
counters(i).get must be((iterationCount * (i + 1)))
|
||||||
counter.get must be((iterationCount * (i + 1)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"deliver a broadcast message using the !" in {
|
"deliver a broadcast message using the !" in {
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ import scala.collection.Seq;
|
||||||
public class JAPI {
|
public class JAPI {
|
||||||
|
|
||||||
public static <T> Seq<T> seq(T... ts) {
|
public static <T> Seq<T> seq(T... ts) {
|
||||||
return Util.arrayToSeq(ts);
|
return Util.immutableSeq(ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.actor
|
||||||
|
|
||||||
import java.io.{ ObjectOutputStream, NotSerializableException }
|
import java.io.{ ObjectOutputStream, NotSerializableException }
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable.TreeSet
|
import scala.collection.immutable
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.actor.dungeon.ChildrenContainer
|
import akka.actor.dungeon.ChildrenContainer
|
||||||
|
|
@ -108,7 +108,7 @@ trait ActorContext extends ActorRefFactory {
|
||||||
* val goodLookup = context.actorFor("kid")
|
* val goodLookup = context.actorFor("kid")
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
def children: Iterable[ActorRef]
|
def children: immutable.Iterable[ActorRef]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the child with the given name if it exists.
|
* Get the child with the given name if it exists.
|
||||||
|
|
@ -287,7 +287,7 @@ private[akka] object ActorCell {
|
||||||
|
|
||||||
final val emptyBehaviorStack: List[Actor.Receive] = Nil
|
final val emptyBehaviorStack: List[Actor.Receive] = Nil
|
||||||
|
|
||||||
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
|
final val emptyActorRefSet: Set[ActorRef] = immutable.TreeSet.empty
|
||||||
}
|
}
|
||||||
|
|
||||||
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
|
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,8 @@
|
||||||
*/
|
*/
|
||||||
package akka.actor
|
package akka.actor
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
import scala.collection.immutable
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
import java.net.MalformedURLException
|
import java.net.MalformedURLException
|
||||||
|
|
||||||
object ActorPath {
|
object ActorPath {
|
||||||
|
|
@ -20,6 +22,8 @@ object ActorPath {
|
||||||
* http://www.ietf.org/rfc/rfc2396.txt
|
* http://www.ietf.org/rfc/rfc2396.txt
|
||||||
*/
|
*/
|
||||||
val ElementRegex = """(?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*""".r
|
val ElementRegex = """(?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*""".r
|
||||||
|
|
||||||
|
private[akka] final val emptyActorPath: immutable.Iterable[String] = List("")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -68,23 +72,18 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
|
||||||
/**
|
/**
|
||||||
* ''Java API'': Recursively create a descendant’s path by appending all child names.
|
* ''Java API'': Recursively create a descendant’s path by appending all child names.
|
||||||
*/
|
*/
|
||||||
def descendant(names: java.lang.Iterable[String]): ActorPath = {
|
def descendant(names: java.lang.Iterable[String]): ActorPath = /(immutableSeq(names))
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
/(names.asScala)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sequence of names for this path from root to this. Performance implication: has to allocate a list.
|
* Sequence of names for this path from root to this. Performance implication: has to allocate a list.
|
||||||
*/
|
*/
|
||||||
def elements: Iterable[String]
|
def elements: immutable.Iterable[String]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ''Java API'': Sequence of names for this path from root to this. Performance implication: has to allocate a list.
|
* ''Java API'': Sequence of names for this path from root to this. Performance implication: has to allocate a list.
|
||||||
*/
|
*/
|
||||||
def getElements: java.lang.Iterable[String] = {
|
def getElements: java.lang.Iterable[String] =
|
||||||
import scala.collection.JavaConverters._
|
scala.collection.JavaConverters.asJavaIterableConverter(elements).asJava
|
||||||
elements.asJava
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Walk up the tree to obtain and return the RootActorPath.
|
* Walk up the tree to obtain and return the RootActorPath.
|
||||||
|
|
@ -112,7 +111,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act
|
||||||
|
|
||||||
override def /(child: String): ActorPath = new ChildActorPath(this, child)
|
override def /(child: String): ActorPath = new ChildActorPath(this, child)
|
||||||
|
|
||||||
override val elements: Iterable[String] = List("")
|
override def elements: immutable.Iterable[String] = ActorPath.emptyActorPath
|
||||||
|
|
||||||
override val toString: String = address + name
|
override val toString: String = address + name
|
||||||
|
|
||||||
|
|
@ -134,9 +133,9 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto
|
||||||
|
|
||||||
override def /(child: String): ActorPath = new ChildActorPath(this, child)
|
override def /(child: String): ActorPath = new ChildActorPath(this, child)
|
||||||
|
|
||||||
override def elements: Iterable[String] = {
|
override def elements: immutable.Iterable[String] = {
|
||||||
@tailrec
|
@tailrec
|
||||||
def rec(p: ActorPath, acc: List[String]): Iterable[String] = p match {
|
def rec(p: ActorPath, acc: List[String]): immutable.Iterable[String] = p match {
|
||||||
case r: RootActorPath ⇒ acc
|
case r: RootActorPath ⇒ acc
|
||||||
case _ ⇒ rec(p.parent, p.name :: acc)
|
case _ ⇒ rec(p.parent, p.name :: acc)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,9 @@ import akka.dispatch._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
import akka.event._
|
import akka.event._
|
||||||
import akka.util.{ Switch, Helpers }
|
import akka.util.{ Switch, Helpers }
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
|
import akka.util.Collections.EmptyImmutableSeq
|
||||||
import scala.util.{ Success, Failure }
|
import scala.util.{ Success, Failure }
|
||||||
import scala.util.control.NonFatal
|
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
|
|
@ -271,10 +272,7 @@ trait ActorRefFactory {
|
||||||
*
|
*
|
||||||
* For maximum performance use a collection with efficient head & tail operations.
|
* For maximum performance use a collection with efficient head & tail operations.
|
||||||
*/
|
*/
|
||||||
def actorFor(path: java.lang.Iterable[String]): ActorRef = {
|
def actorFor(path: java.lang.Iterable[String]): ActorRef = provider.actorFor(lookupRoot, immutableSeq(path))
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
provider.actorFor(lookupRoot, path.asScala)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct an [[akka.actor.ActorSelection]] from the given path, which is
|
* Construct an [[akka.actor.ActorSelection]] from the given path, which is
|
||||||
|
|
@ -480,7 +478,7 @@ class LocalActorRefProvider(
|
||||||
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
|
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
|
||||||
|
|
||||||
private def guardianSupervisorStrategyConfigurator =
|
private def guardianSupervisorStrategyConfigurator =
|
||||||
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Nil).get
|
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, EmptyImmutableSeq).get
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Overridable supervision strategy to be used by the “/user” guardian.
|
* Overridable supervision strategy to be used by the “/user” guardian.
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.actor
|
||||||
|
|
||||||
import akka.event._
|
import akka.event._
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
import akka.pattern.ask
|
import akka.japi.Util.immutableSeq
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
|
@ -144,7 +144,7 @@ object ActorSystem {
|
||||||
|
|
||||||
final val LogLevel: String = getString("akka.loglevel")
|
final val LogLevel: String = getString("akka.loglevel")
|
||||||
final val StdoutLogLevel: String = getString("akka.stdout-loglevel")
|
final val StdoutLogLevel: String = getString("akka.stdout-loglevel")
|
||||||
final val EventHandlers: immutable.Seq[String] = getStringList("akka.event-handlers").asScala.to[Vector]
|
final val EventHandlers: immutable.Seq[String] = immutableSeq(getStringList("akka.event-handlers"))
|
||||||
final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
|
final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
|
||||||
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
|
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
|
||||||
|
|
||||||
|
|
@ -273,8 +273,7 @@ abstract class ActorSystem extends ActorRefFactory {
|
||||||
/**
|
/**
|
||||||
* ''Java API'': Recursively create a descendant’s path by appending all child names.
|
* ''Java API'': Recursively create a descendant’s path by appending all child names.
|
||||||
*/
|
*/
|
||||||
def descendant(names: java.lang.Iterable[String]): ActorPath =
|
def descendant(names: java.lang.Iterable[String]): ActorPath = /(immutableSeq(names))
|
||||||
/(scala.collection.JavaConverters.iterableAsScalaIterableConverter(names).asScala)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start-up time in milliseconds since the epoch.
|
* Start-up time in milliseconds since the epoch.
|
||||||
|
|
@ -674,8 +673,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null
|
def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null
|
||||||
|
|
||||||
private def loadExtensions() {
|
private def loadExtensions() {
|
||||||
scala.collection.JavaConverters.collectionAsScalaIterableConverter(
|
immutableSeq(settings.config.getStringList("akka.extensions")) foreach { fqcn ⇒
|
||||||
settings.config.getStringList("akka.extensions")).asScala foreach { fqcn ⇒
|
|
||||||
dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match {
|
dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match {
|
||||||
case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup())
|
case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup())
|
||||||
case Success(p: ExtensionId[_]) ⇒ registerExtension(p)
|
case Success(p: ExtensionId[_]) ⇒ registerExtension(p)
|
||||||
|
|
|
||||||
|
|
@ -120,13 +120,12 @@ object AddressFromURIString {
|
||||||
* Given an ActorPath it returns the Address and the path elements if the path is well-formed
|
* Given an ActorPath it returns the Address and the path elements if the path is well-formed
|
||||||
*/
|
*/
|
||||||
object ActorPathExtractor extends PathUtils {
|
object ActorPathExtractor extends PathUtils {
|
||||||
def unapply(addr: String): Option[(Address, Iterable[String])] =
|
def unapply(addr: String): Option[(Address, immutable.Iterable[String])] =
|
||||||
try {
|
try {
|
||||||
val uri = new URI(addr)
|
val uri = new URI(addr)
|
||||||
if (uri.getRawPath == null) None
|
uri.getRawPath match {
|
||||||
else AddressFromURIString.unapply(uri) match {
|
case null ⇒ None
|
||||||
case None ⇒ None
|
case path ⇒ AddressFromURIString.unapply(uri).map((_, split(path).drop(1)))
|
||||||
case Some(addr) ⇒ Some((addr, split(uri.getRawPath).drop(1)))
|
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case _: URISyntaxException ⇒ None
|
case _: URISyntaxException ⇒ None
|
||||||
|
|
|
||||||
|
|
@ -7,10 +7,11 @@ package akka.actor
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
import com.typesafe.config._
|
import com.typesafe.config._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
import java.util.concurrent.{ TimeUnit }
|
import java.util.concurrent.{ TimeUnit }
|
||||||
import akka.util.WildcardTree
|
import akka.util.WildcardTree
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class represents deployment configuration for a given actor path. It is
|
* This class represents deployment configuration for a given actor path. It is
|
||||||
|
|
@ -141,7 +142,7 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
|
||||||
|
|
||||||
val deployment = config.withFallback(default)
|
val deployment = config.withFallback(default)
|
||||||
|
|
||||||
val routees = Vector() ++ deployment.getStringList("routees.paths").asScala
|
val routees = immutableSeq(deployment.getStringList("routees.paths"))
|
||||||
|
|
||||||
val nrOfInstances = deployment.getInt("nr-of-instances")
|
val nrOfInstances = deployment.getInt("nr-of-instances")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,9 +7,11 @@ import language.implicitConversions
|
||||||
|
|
||||||
import java.lang.{ Iterable ⇒ JIterable }
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
|
|
@ -171,7 +173,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
* Implicit conversion from `Seq` of Throwables to a `Decider`.
|
* Implicit conversion from `Seq` of Throwables to a `Decider`.
|
||||||
* This maps the given Throwables to restarts, otherwise escalates.
|
* This maps the given Throwables to restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
implicit def seqThrowable2Decider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeImmutableDecider(trapExit)
|
implicit def seqThrowable2Decider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
|
||||||
|
|
||||||
type Decider = PartialFunction[Throwable, Directive]
|
type Decider = PartialFunction[Throwable, Directive]
|
||||||
type JDecider = akka.japi.Function[Throwable, Directive]
|
type JDecider = akka.japi.Function[Throwable, Directive]
|
||||||
|
|
@ -181,22 +183,15 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
* Decider builder which just checks whether one of
|
* Decider builder which just checks whether one of
|
||||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
def makeDecider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = makeImmutableDecider(trapExit)
|
def makeDecider(trapExit: immutable.Seq[Class[_ <: Throwable]]): Decider = {
|
||||||
|
case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decider builder which just checks whether one of
|
* Decider builder which just checks whether one of
|
||||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider =
|
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(immutableSeq(trapExit))
|
||||||
makeImmutableDecider(scala.collection.JavaConverters.iterableAsScalaIterableConverter(trapExit).asScala)
|
|
||||||
|
|
||||||
private[this] def makeImmutableDecider(trapExit: Iterable[Class[_]]): Decider = {
|
|
||||||
val traps = trapExit match { // This is the sad, awkward, truth
|
|
||||||
case s: immutable.Seq[_] ⇒ s.asInstanceOf[immutable.Seq[Class[_]]]
|
|
||||||
case other ⇒ other.to[immutable.Seq]
|
|
||||||
}
|
|
||||||
|
|
||||||
{ case x ⇒ if (traps exists (_ isInstance x)) Restart else Escalate }
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decider builder for Iterables of cause-directive pairs, e.g. a map obtained
|
* Decider builder for Iterables of cause-directive pairs, e.g. a map obtained
|
||||||
|
|
@ -228,7 +223,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
case x ⇒ buf insert (x, ca)
|
case x ⇒ buf insert (x, ca)
|
||||||
}
|
}
|
||||||
buf
|
buf
|
||||||
}.to[immutable.Seq]
|
}.to[immutable.IndexedSeq]
|
||||||
|
|
||||||
private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] =
|
private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] =
|
||||||
if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None
|
if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None
|
||||||
|
|
|
||||||
|
|
@ -203,8 +203,8 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def close(): Unit = {
|
override def close(): Unit = {
|
||||||
import scala.collection.JavaConverters._
|
val i = hashedWheelTimer.stop().iterator()
|
||||||
hashedWheelTimer.stop().asScala foreach execDirectly
|
while (i.hasNext) execDirectly(i.next())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import scala.concurrent.duration.Duration
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.concurrent.{ Await, Future }
|
import scala.concurrent.{ Await, Future }
|
||||||
import akka.japi.{ Creator, Option ⇒ JOption }
|
import akka.japi.{ Creator, Option ⇒ JOption }
|
||||||
|
import akka.japi.Util.{ immutableSeq, immutableSingletonSeq }
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.util.Reflect.instantiator
|
import akka.util.Reflect.instantiator
|
||||||
import akka.serialization.{ JavaSerializer, SerializationExtension }
|
import akka.serialization.{ JavaSerializer, SerializationExtension }
|
||||||
|
|
@ -442,7 +443,7 @@ object TypedProps {
|
||||||
* or a sequence containing only itself, if itself is an interface.
|
* or a sequence containing only itself, if itself is an interface.
|
||||||
*/
|
*/
|
||||||
def extractInterfaces(clazz: Class[_]): immutable.Seq[Class[_]] =
|
def extractInterfaces(clazz: Class[_]): immutable.Seq[Class[_]] =
|
||||||
if (clazz.isInterface) List[Class[_]](clazz) else clazz.getInterfaces.to[List]
|
if (clazz.isInterface) immutableSingletonSeq(clazz) else immutableSeq(clazz.getInterfaces)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Uses the supplied class as the factory for the TypedActor implementation,
|
* Uses the supplied class as the factory for the TypedActor implementation,
|
||||||
|
|
|
||||||
|
|
@ -5,14 +5,12 @@
|
||||||
package akka.actor.dungeon
|
package akka.actor.dungeon
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.JavaConverters.asJavaIterableConverter
|
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
import scala.collection.immutable
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.actor.ActorCell
|
|
||||||
import akka.actor.ActorPath.ElementRegex
|
import akka.actor.ActorPath.ElementRegex
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import akka.util.{ Unsafe, Helpers }
|
import akka.util.{ Unsafe, Helpers }
|
||||||
import akka.actor.ChildNameReserved
|
|
||||||
|
|
||||||
private[akka] trait Children { this: ActorCell ⇒
|
private[akka] trait Children { this: ActorCell ⇒
|
||||||
|
|
||||||
|
|
@ -24,8 +22,9 @@ private[akka] trait Children { this: ActorCell ⇒
|
||||||
def childrenRefs: ChildrenContainer =
|
def childrenRefs: ChildrenContainer =
|
||||||
Unsafe.instance.getObjectVolatile(this, AbstractActorCell.childrenOffset).asInstanceOf[ChildrenContainer]
|
Unsafe.instance.getObjectVolatile(this, AbstractActorCell.childrenOffset).asInstanceOf[ChildrenContainer]
|
||||||
|
|
||||||
final def children: Iterable[ActorRef] = childrenRefs.children
|
final def children: immutable.Iterable[ActorRef] = childrenRefs.children
|
||||||
final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava
|
final def getChildren(): java.lang.Iterable[ActorRef] =
|
||||||
|
scala.collection.JavaConverters.asJavaIterableConverter(children).asJava
|
||||||
|
|
||||||
final def child(name: String): Option[ActorRef] = Option(getChild(name))
|
final def child(name: String): Option[ActorRef] = Option(getChild(name))
|
||||||
final def getChild(name: String): ActorRef = childrenRefs.getByName(name) match {
|
final def getChild(name: String): ActorRef = childrenRefs.getByName(name) match {
|
||||||
|
|
@ -141,7 +140,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
||||||
|
|
||||||
protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref)
|
protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref)
|
||||||
|
|
||||||
protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats
|
protected def getAllChildStats: immutable.Iterable[ChildRestartStats] = childrenRefs.stats
|
||||||
|
|
||||||
protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = {
|
protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = {
|
||||||
childrenRefs match {
|
childrenRefs match {
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,11 @@
|
||||||
|
|
||||||
package akka.actor.dungeon
|
package akka.actor.dungeon
|
||||||
|
|
||||||
import scala.collection.immutable.TreeMap
|
import scala.collection.immutable
|
||||||
|
|
||||||
import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef }
|
import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef }
|
||||||
import akka.dispatch.SystemMessage
|
import akka.dispatch.SystemMessage
|
||||||
|
import akka.util.Collections.{ EmptyImmutableSeq, PartialImmutableValuesIterable }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -20,8 +21,8 @@ private[akka] trait ChildrenContainer {
|
||||||
def getByName(name: String): Option[ChildStats]
|
def getByName(name: String): Option[ChildStats]
|
||||||
def getByRef(actor: ActorRef): Option[ChildRestartStats]
|
def getByRef(actor: ActorRef): Option[ChildRestartStats]
|
||||||
|
|
||||||
def children: Iterable[ActorRef]
|
def children: immutable.Iterable[ActorRef]
|
||||||
def stats: Iterable[ChildRestartStats]
|
def stats: immutable.Iterable[ChildRestartStats]
|
||||||
|
|
||||||
def shallDie(actor: ActorRef): ChildrenContainer
|
def shallDie(actor: ActorRef): ChildrenContainer
|
||||||
|
|
||||||
|
|
@ -49,6 +50,18 @@ private[akka] object ChildrenContainer {
|
||||||
case class Creation() extends SuspendReason with WaitingForChildren
|
case class Creation() extends SuspendReason with WaitingForChildren
|
||||||
case object Termination extends SuspendReason
|
case object Termination extends SuspendReason
|
||||||
|
|
||||||
|
class ChildRestartsIterable(stats: immutable.MapLike[_, ChildStats, _]) extends PartialImmutableValuesIterable[ChildStats, ChildRestartStats] {
|
||||||
|
override final def apply(c: ChildStats) = c.asInstanceOf[ChildRestartStats]
|
||||||
|
override final def isDefinedAt(c: ChildStats) = c.isInstanceOf[ChildRestartStats]
|
||||||
|
override final def valuesIterator = stats.valuesIterator
|
||||||
|
}
|
||||||
|
|
||||||
|
class ChildrenIterable(stats: immutable.MapLike[_, ChildStats, _]) extends PartialImmutableValuesIterable[ChildStats, ActorRef] {
|
||||||
|
override final def apply(c: ChildStats) = c.asInstanceOf[ChildRestartStats].child
|
||||||
|
override final def isDefinedAt(c: ChildStats) = c.isInstanceOf[ChildRestartStats]
|
||||||
|
override final def valuesIterator = stats.valuesIterator
|
||||||
|
}
|
||||||
|
|
||||||
trait WaitingForChildren {
|
trait WaitingForChildren {
|
||||||
private var todo: SystemMessage = null
|
private var todo: SystemMessage = null
|
||||||
def enqueue(message: SystemMessage) = { message.next = todo; todo = message }
|
def enqueue(message: SystemMessage) = { message.next = todo; todo = message }
|
||||||
|
|
@ -56,13 +69,13 @@ private[akka] object ChildrenContainer {
|
||||||
}
|
}
|
||||||
|
|
||||||
trait EmptyChildrenContainer extends ChildrenContainer {
|
trait EmptyChildrenContainer extends ChildrenContainer {
|
||||||
val emptyStats = TreeMap.empty[String, ChildStats]
|
val emptyStats = immutable.TreeMap.empty[String, ChildStats]
|
||||||
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats))
|
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats))
|
||||||
override def remove(child: ActorRef): ChildrenContainer = this
|
override def remove(child: ActorRef): ChildrenContainer = this
|
||||||
override def getByName(name: String): Option[ChildRestartStats] = None
|
override def getByName(name: String): Option[ChildRestartStats] = None
|
||||||
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
|
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
|
||||||
override def children: Iterable[ActorRef] = Nil
|
override def children: immutable.Iterable[ActorRef] = EmptyImmutableSeq
|
||||||
override def stats: Iterable[ChildRestartStats] = Nil
|
override def stats: immutable.Iterable[ChildRestartStats] = EmptyImmutableSeq
|
||||||
override def shallDie(actor: ActorRef): ChildrenContainer = this
|
override def shallDie(actor: ActorRef): ChildrenContainer = this
|
||||||
override def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved))
|
override def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved))
|
||||||
override def unreserve(name: String): ChildrenContainer = this
|
override def unreserve(name: String): ChildrenContainer = this
|
||||||
|
|
@ -95,7 +108,7 @@ private[akka] object ChildrenContainer {
|
||||||
* calling context.stop(child) and processing the ChildTerminated() system
|
* calling context.stop(child) and processing the ChildTerminated() system
|
||||||
* message).
|
* message).
|
||||||
*/
|
*/
|
||||||
class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer {
|
class NormalChildrenContainer(val c: immutable.TreeMap[String, ChildStats]) extends ChildrenContainer {
|
||||||
|
|
||||||
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats))
|
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats))
|
||||||
|
|
||||||
|
|
@ -108,9 +121,11 @@ private[akka] object ChildrenContainer {
|
||||||
case _ ⇒ None
|
case _ ⇒ None
|
||||||
}
|
}
|
||||||
|
|
||||||
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child }
|
override def children: immutable.Iterable[ActorRef] =
|
||||||
|
if (c.isEmpty) EmptyImmutableSeq else new ChildrenIterable(c)
|
||||||
|
|
||||||
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c }
|
override def stats: immutable.Iterable[ChildRestartStats] =
|
||||||
|
if (c.isEmpty) EmptyImmutableSeq else new ChildRestartsIterable(c)
|
||||||
|
|
||||||
override def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)
|
override def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)
|
||||||
|
|
||||||
|
|
@ -130,7 +145,7 @@ private[akka] object ChildrenContainer {
|
||||||
}
|
}
|
||||||
|
|
||||||
object NormalChildrenContainer {
|
object NormalChildrenContainer {
|
||||||
def apply(c: TreeMap[String, ChildStats]): ChildrenContainer =
|
def apply(c: immutable.TreeMap[String, ChildStats]): ChildrenContainer =
|
||||||
if (c.isEmpty) EmptyChildrenContainer
|
if (c.isEmpty) EmptyChildrenContainer
|
||||||
else new NormalChildrenContainer(c)
|
else new NormalChildrenContainer(c)
|
||||||
}
|
}
|
||||||
|
|
@ -145,7 +160,7 @@ private[akka] object ChildrenContainer {
|
||||||
* type of container, depending on whether or not children are left and whether or not
|
* type of container, depending on whether or not children are left and whether or not
|
||||||
* the reason was “Terminating”.
|
* the reason was “Terminating”.
|
||||||
*/
|
*/
|
||||||
case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
|
case class TerminatingChildrenContainer(c: immutable.TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
|
||||||
extends ChildrenContainer {
|
extends ChildrenContainer {
|
||||||
|
|
||||||
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats))
|
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats))
|
||||||
|
|
@ -166,9 +181,11 @@ private[akka] object ChildrenContainer {
|
||||||
case _ ⇒ None
|
case _ ⇒ None
|
||||||
}
|
}
|
||||||
|
|
||||||
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child }
|
override def children: immutable.Iterable[ActorRef] =
|
||||||
|
if (c.isEmpty) EmptyImmutableSeq else new ChildrenIterable(c)
|
||||||
|
|
||||||
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c }
|
override def stats: immutable.Iterable[ChildRestartStats] =
|
||||||
|
if (c.isEmpty) EmptyImmutableSeq else new ChildRestartsIterable(c)
|
||||||
|
|
||||||
override def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
|
override def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import akka.dispatch._
|
||||||
import akka.event.Logging.{ Warning, Error, Debug }
|
import akka.event.Logging.{ Warning, Error, Debug }
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import scala.Some
|
import scala.collection.immutable
|
||||||
import akka.dispatch.ChildTerminated
|
import akka.dispatch.ChildTerminated
|
||||||
import akka.actor.PreRestartException
|
import akka.actor.PreRestartException
|
||||||
import akka.actor.Failed
|
import akka.actor.Failed
|
||||||
|
|
@ -160,7 +160,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final def handleInvokeFailure(childrenNotToSuspend: Iterable[ActorRef], t: Throwable, message: String): Unit = {
|
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable, message: String): Unit = {
|
||||||
publish(Error(t, self.path.toString, clazz(actor), message))
|
publish(Error(t, self.path.toString, clazz(actor), message))
|
||||||
// prevent any further messages to be processed until the actor has been restarted
|
// prevent any further messages to be processed until the actor has been restarted
|
||||||
if (!isFailed) try {
|
if (!isFailed) try {
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,7 @@ object Futures {
|
||||||
*/
|
*/
|
||||||
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = {
|
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = {
|
||||||
implicit val ec = executor
|
implicit val ec = executor
|
||||||
Future.find[T](futures.asScala)(predicate.apply(_))(executor).map(JOption.fromScalaOption(_))
|
Future.find[T](futures.asScala)(predicate.apply(_))(executor) map JOption.fromScalaOption
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -266,9 +266,9 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final def dissociate(monitored: ActorRef): Iterable[ActorRef] = {
|
protected final def dissociate(monitored: ActorRef): immutable.Iterable[ActorRef] = {
|
||||||
@tailrec
|
@tailrec
|
||||||
def dissociateAsMonitored(monitored: ActorRef): Iterable[ActorRef] = {
|
def dissociateAsMonitored(monitored: ActorRef): immutable.Iterable[ActorRef] = {
|
||||||
val current = mappings get monitored
|
val current = mappings get monitored
|
||||||
current match {
|
current match {
|
||||||
case null ⇒ empty
|
case null ⇒ empty
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import scala.collection.immutable
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import scala.runtime.AbstractPartialFunction
|
import scala.runtime.AbstractPartialFunction
|
||||||
|
import akka.util.Collections.EmptyImmutableSeq
|
||||||
import java.util.Collections.{ emptyList, singletonList }
|
import java.util.Collections.{ emptyList, singletonList }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -175,9 +176,40 @@ object Option {
|
||||||
* This class hold common utilities for Java
|
* This class hold common utilities for Java
|
||||||
*/
|
*/
|
||||||
object Util {
|
object Util {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a ClassTag describing the provided Class.
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
def classTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
|
def classTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
|
||||||
|
|
||||||
def arrayToSeq[T](arr: Array[T]): immutable.Seq[T] = arr.to[immutable.Seq]
|
/**
|
||||||
|
* Returns an immutable.Seq representing the provided array of Classes,
|
||||||
|
* an overloading of the generic immutableSeq in Util, to accommodate for erasure.
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def immutableSeq(arr: Array[Class[_]]): immutable.Seq[Class[_]] = immutableSeq[Class[_]](arr)
|
||||||
|
|
||||||
def arrayToSeq(classes: Array[Class[_]]): immutable.Seq[Class[_]] = classes.to[immutable.Seq]
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def immutableSeq[T](arr: Array[T]): immutable.Seq[T] = if ((arr ne null) && arr.length > 0) Vector(arr: _*) else Nil
|
||||||
|
|
||||||
|
def immutableSeq[T](iterable: java.lang.Iterable[T]): immutable.Seq[T] =
|
||||||
|
iterable match {
|
||||||
|
case imm: immutable.Seq[_] ⇒ imm.asInstanceOf[immutable.Seq[T]]
|
||||||
|
case other ⇒
|
||||||
|
val i = other.iterator()
|
||||||
|
if (i.hasNext) {
|
||||||
|
val builder = new immutable.VectorBuilder[T]
|
||||||
|
|
||||||
|
do { builder += i.next() } while (i.hasNext)
|
||||||
|
|
||||||
|
builder.result()
|
||||||
|
} else EmptyImmutableSeq
|
||||||
|
}
|
||||||
|
|
||||||
|
def immutableSingletonSeq[T](value: T): immutable.Seq[T] = value :: Nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,11 @@
|
||||||
*/
|
*/
|
||||||
package akka.routing
|
package akka.routing
|
||||||
|
|
||||||
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
|
import scala.collection.immutable
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.actor.SupervisorStrategy
|
import akka.actor.SupervisorStrategy
|
||||||
import akka.actor.Props
|
|
||||||
import akka.dispatch.Dispatchers
|
import akka.dispatch.Dispatchers
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
|
|
@ -19,16 +19,13 @@ object ConsistentHashingRouter {
|
||||||
/**
|
/**
|
||||||
* Creates a new ConsistentHashingRouter, routing to the specified routees
|
* Creates a new ConsistentHashingRouter, routing to the specified routees
|
||||||
*/
|
*/
|
||||||
def apply(routees: Iterable[ActorRef]): ConsistentHashingRouter =
|
def apply(routees: immutable.Iterable[ActorRef]): ConsistentHashingRouter =
|
||||||
new ConsistentHashingRouter(routees = routees map (_.path.toString))
|
new ConsistentHashingRouter(routees = routees map (_.path.toString))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API to create router with the supplied 'routees' actors.
|
* Java API to create router with the supplied 'routees' actors.
|
||||||
*/
|
*/
|
||||||
def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = {
|
def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = apply(immutableSeq(routees))
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
apply(routees.asScala)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If you don't define the `hashMapping` when
|
* If you don't define the `hashMapping` when
|
||||||
|
|
@ -146,7 +143,7 @@ object ConsistentHashingRouter {
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class ConsistentHashingRouter(
|
case class ConsistentHashingRouter(
|
||||||
nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
||||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
|
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
|
||||||
val virtualNodesFactor: Int = 0,
|
val virtualNodesFactor: Int = 0,
|
||||||
|
|
@ -165,7 +162,7 @@ case class ConsistentHashingRouter(
|
||||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
def this(routeePaths: java.lang.Iterable[String]) = this(routees = routeePaths.asScala)
|
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that sets the resizer to be used.
|
* Constructor that sets the resizer to be used.
|
||||||
|
|
@ -227,7 +224,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
|
||||||
|
|
||||||
def nrOfInstances: Int
|
def nrOfInstances: Int
|
||||||
|
|
||||||
def routees: Iterable[String]
|
def routees: immutable.Iterable[String]
|
||||||
|
|
||||||
def virtualNodesFactor: Int
|
def virtualNodesFactor: Int
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,18 +5,20 @@ package akka.routing
|
||||||
|
|
||||||
import language.implicitConversions
|
import language.implicitConversions
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
import akka.actor._
|
|
||||||
import scala.concurrent.duration._
|
import scala.collection.immutable
|
||||||
import akka.ConfigurationException
|
|
||||||
import akka.pattern.pipe
|
|
||||||
import com.typesafe.config.Config
|
|
||||||
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
|
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import akka.actor._
|
||||||
|
import akka.ConfigurationException
|
||||||
|
import akka.dispatch.Dispatchers
|
||||||
|
import akka.pattern.pipe
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
|
import com.typesafe.config.Config
|
||||||
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
|
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||||
import akka.dispatch.Dispatchers
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import concurrent.ExecutionContext
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
|
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
|
||||||
|
|
@ -50,7 +52,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
||||||
private val resizeCounter = new AtomicLong
|
private val resizeCounter = new AtomicLong
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute
|
private var _routees: immutable.IndexedSeq[ActorRef] = immutable.IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute
|
||||||
def routees = _routees
|
def routees = _routees
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
|
|
@ -75,14 +77,11 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
||||||
* end of construction
|
* end of construction
|
||||||
*/
|
*/
|
||||||
|
|
||||||
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
|
def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match {
|
||||||
case _: AutoReceivedMessage ⇒ Destination(self, self) :: Nil
|
case _: AutoReceivedMessage ⇒ Destination(self, self) :: Nil
|
||||||
case CurrentRoutees ⇒
|
case CurrentRoutees ⇒ sender ! RouterRoutees(_routees); Nil
|
||||||
sender ! RouterRoutees(_routees)
|
case msg if route.isDefinedAt(sender, msg) ⇒ route(sender, message)
|
||||||
Nil
|
case _ ⇒ Nil
|
||||||
case _ ⇒
|
|
||||||
if (route.isDefinedAt(sender, message)) route(sender, message)
|
|
||||||
else Nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -91,7 +90,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
||||||
* Not thread safe, but intended to be called from protected points, such as
|
* Not thread safe, but intended to be called from protected points, such as
|
||||||
* `RouterConfig.createRoute` and `Resizer.resize`
|
* `RouterConfig.createRoute` and `Resizer.resize`
|
||||||
*/
|
*/
|
||||||
private[akka] def addRoutees(newRoutees: Iterable[ActorRef]): Unit = {
|
private[akka] def addRoutees(newRoutees: immutable.Iterable[ActorRef]): Unit = {
|
||||||
_routees = _routees ++ newRoutees
|
_routees = _routees ++ newRoutees
|
||||||
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
|
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
|
||||||
newRoutees foreach watch
|
newRoutees foreach watch
|
||||||
|
|
@ -103,7 +102,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
||||||
* Not thread safe, but intended to be called from protected points, such as
|
* Not thread safe, but intended to be called from protected points, such as
|
||||||
* `Resizer.resize`
|
* `Resizer.resize`
|
||||||
*/
|
*/
|
||||||
private[akka] def removeRoutees(abandonedRoutees: Iterable[ActorRef]): Unit = {
|
private[akka] def removeRoutees(abandonedRoutees: immutable.Iterable[ActorRef]): Unit = {
|
||||||
_routees = abandonedRoutees.foldLeft(_routees) { (xs, x) ⇒ unwatch(x); xs.filterNot(_ == x) }
|
_routees = abandonedRoutees.foldLeft(_routees) { (xs, x) ⇒ unwatch(x); xs.filterNot(_ == x) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -195,7 +194,7 @@ trait RouterConfig {
|
||||||
*/
|
*/
|
||||||
def withFallback(other: RouterConfig): RouterConfig = this
|
def withFallback(other: RouterConfig): RouterConfig = this
|
||||||
|
|
||||||
protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] =
|
protected def toAll(sender: ActorRef, routees: immutable.Iterable[ActorRef]): immutable.Iterable[Destination] =
|
||||||
routees.map(Destination(sender, _))
|
routees.map(Destination(sender, _))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -207,7 +206,7 @@ trait RouterConfig {
|
||||||
/**
|
/**
|
||||||
* Check that everything is there which is needed. Called in constructor of RoutedActorRef to fail early.
|
* Check that everything is there which is needed. Called in constructor of RoutedActorRef to fail early.
|
||||||
*/
|
*/
|
||||||
def verifyConfig(): Unit = {}
|
def verifyConfig(): Unit = ()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -226,7 +225,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
|
||||||
* Not thread safe, but intended to be called from protected points, such as
|
* Not thread safe, but intended to be called from protected points, such as
|
||||||
* `RouterConfig.createRoute` and `Resizer.resize`.
|
* `RouterConfig.createRoute` and `Resizer.resize`.
|
||||||
*/
|
*/
|
||||||
def registerRoutees(routees: Iterable[ActorRef]): Unit = routedCell.addRoutees(routees)
|
def registerRoutees(routees: immutable.Iterable[ActorRef]): Unit = routedCell.addRoutees(routees)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds the routees to the router.
|
* Adds the routees to the router.
|
||||||
|
|
@ -235,7 +234,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
|
||||||
* `RouterConfig.createRoute` and `Resizer.resize`.
|
* `RouterConfig.createRoute` and `Resizer.resize`.
|
||||||
* Java API.
|
* Java API.
|
||||||
*/
|
*/
|
||||||
def registerRoutees(routees: java.lang.Iterable[ActorRef]): Unit = registerRoutees(routees.asScala)
|
def registerRoutees(routees: java.lang.Iterable[ActorRef]): Unit = registerRoutees(immutableSeq(routees))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes routees from the router. This method doesn't stop the routees.
|
* Removes routees from the router. This method doesn't stop the routees.
|
||||||
|
|
@ -243,7 +242,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
|
||||||
* Not thread safe, but intended to be called from protected points, such as
|
* Not thread safe, but intended to be called from protected points, such as
|
||||||
* `Resizer.resize`.
|
* `Resizer.resize`.
|
||||||
*/
|
*/
|
||||||
def unregisterRoutees(routees: Iterable[ActorRef]): Unit = routedCell.removeRoutees(routees)
|
def unregisterRoutees(routees: immutable.Iterable[ActorRef]): Unit = routedCell.removeRoutees(routees)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes routees from the router. This method doesn't stop the routees.
|
* Removes routees from the router. This method doesn't stop the routees.
|
||||||
|
|
@ -252,28 +251,25 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
|
||||||
* `Resizer.resize`.
|
* `Resizer.resize`.
|
||||||
* JAVA API
|
* JAVA API
|
||||||
*/
|
*/
|
||||||
def unregisterRoutees(routees: java.lang.Iterable[ActorRef]): Unit = unregisterRoutees(routees.asScala)
|
def unregisterRoutees(routees: java.lang.Iterable[ActorRef]): Unit = unregisterRoutees(immutableSeq(routees))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Looks up routes with specified paths and registers them.
|
* Looks up routes with specified paths and registers them.
|
||||||
*/
|
*/
|
||||||
def registerRouteesFor(paths: Iterable[String]): Unit = registerRoutees(paths.map(context.actorFor(_)))
|
def registerRouteesFor(paths: immutable.Iterable[String]): Unit = registerRoutees(paths.map(context.actorFor(_)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Looks up routes with specified paths and registers them.
|
* Looks up routes with specified paths and registers them.
|
||||||
* JAVA API
|
* JAVA API
|
||||||
*/
|
*/
|
||||||
def registerRouteesFor(paths: java.lang.Iterable[String]): Unit = registerRouteesFor(paths.asScala)
|
def registerRouteesFor(paths: java.lang.Iterable[String]): Unit = registerRouteesFor(immutableSeq(paths))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates new routees from specified `Props` and registers them.
|
* Creates new routees from specified `Props` and registers them.
|
||||||
*/
|
*/
|
||||||
def createRoutees(nrOfInstances: Int): Unit = {
|
def createRoutees(nrOfInstances: Int): Unit =
|
||||||
if (nrOfInstances <= 0) throw new IllegalArgumentException(
|
if (nrOfInstances <= 0) throw new IllegalArgumentException("Must specify nrOfInstances or routees for [%s]" format context.self.path.toString)
|
||||||
"Must specify nrOfInstances or routees for [%s]" format context.self.path.toString)
|
else registerRoutees(immutable.IndexedSeq.fill(nrOfInstances)(context.actorOf(routeeProps)))
|
||||||
else
|
|
||||||
registerRoutees(IndexedSeq.fill(nrOfInstances)(context.actorOf(routeeProps)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove specified number of routees by unregister them
|
* Remove specified number of routees by unregister them
|
||||||
|
|
@ -296,7 +292,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
|
||||||
* Give concurrent messages a chance to be placed in mailbox before
|
* Give concurrent messages a chance to be placed in mailbox before
|
||||||
* sending PoisonPill.
|
* sending PoisonPill.
|
||||||
*/
|
*/
|
||||||
protected def delayedStop(scheduler: Scheduler, abandon: Iterable[ActorRef], stopDelay: FiniteDuration): Unit = {
|
protected def delayedStop(scheduler: Scheduler, abandon: immutable.Iterable[ActorRef], stopDelay: FiniteDuration): Unit = {
|
||||||
if (abandon.nonEmpty) {
|
if (abandon.nonEmpty) {
|
||||||
if (stopDelay <= Duration.Zero) {
|
if (stopDelay <= Duration.Zero) {
|
||||||
abandon foreach (_ ! PoisonPill)
|
abandon foreach (_ ! PoisonPill)
|
||||||
|
|
@ -314,7 +310,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
|
||||||
/**
|
/**
|
||||||
* All routees of the router
|
* All routees of the router
|
||||||
*/
|
*/
|
||||||
def routees: IndexedSeq[ActorRef] = routedCell.routees
|
def routees: immutable.IndexedSeq[ActorRef] = routedCell.routees
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All routees of the router
|
* All routees of the router
|
||||||
|
|
@ -335,7 +331,7 @@ abstract class CustomRouterConfig extends RouterConfig {
|
||||||
val customRoute = createCustomRoute(routeeProvider)
|
val customRoute = createCustomRoute(routeeProvider)
|
||||||
|
|
||||||
{
|
{
|
||||||
case (sender, message) ⇒ customRoute.destinationsFor(sender, message).asScala
|
case (sender, message) ⇒ customRoute.destinationsFor(sender, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -344,7 +340,13 @@ abstract class CustomRouterConfig extends RouterConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
trait CustomRoute {
|
trait CustomRoute {
|
||||||
def destinationsFor(sender: ActorRef, message: Any): java.lang.Iterable[Destination]
|
/**
|
||||||
|
* use akka.japi.Util.immutableSeq to convert a java.lang.Iterable to the return type needed for destinationsFor,
|
||||||
|
* or if you just want to return a single Destination, use akka.japi.Util.immutableSingletonSeq
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def destinationsFor(sender: ActorRef, message: Any): immutable.Seq[Destination]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -366,7 +368,7 @@ trait Router extends Actor {
|
||||||
if (ab.get) try ref.routerConfig.resizer foreach (_.resize(ref.routeeProvider)) finally ab.set(false)
|
if (ab.get) try ref.routerConfig.resizer foreach (_.resize(ref.routeeProvider)) finally ab.set(false)
|
||||||
|
|
||||||
case Terminated(child) ⇒
|
case Terminated(child) ⇒
|
||||||
ref.removeRoutees(IndexedSeq(child))
|
ref.removeRoutees(child :: Nil)
|
||||||
if (ref.routees.isEmpty) context.stop(self)
|
if (ref.routees.isEmpty) context.stop(self)
|
||||||
|
|
||||||
}: Receive) orElse routerReceive
|
}: Receive) orElse routerReceive
|
||||||
|
|
@ -426,7 +428,7 @@ case object CurrentRoutees extends CurrentRoutees {
|
||||||
* Message used to carry information about what routees the router is currently using.
|
* Message used to carry information about what routees the router is currently using.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class RouterRoutees(routees: Iterable[ActorRef])
|
case class RouterRoutees(routees: immutable.Iterable[ActorRef])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For every message sent to a router, its route determines a set of destinations,
|
* For every message sent to a router, its route determines a set of destinations,
|
||||||
|
|
@ -494,16 +496,14 @@ object RoundRobinRouter {
|
||||||
/**
|
/**
|
||||||
* Creates a new RoundRobinRouter, routing to the specified routees
|
* Creates a new RoundRobinRouter, routing to the specified routees
|
||||||
*/
|
*/
|
||||||
def apply(routees: Iterable[ActorRef]): RoundRobinRouter =
|
def apply(routees: immutable.Iterable[ActorRef]): RoundRobinRouter =
|
||||||
new RoundRobinRouter(routees = routees map (_.path.toString))
|
new RoundRobinRouter(routees = routees map (_.path.toString))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API to create router with the supplied 'routees' actors.
|
* Java API to create router with the supplied 'routees' actors.
|
||||||
*/
|
*/
|
||||||
def create(routees: java.lang.Iterable[ActorRef]): RoundRobinRouter = {
|
def create(routees: java.lang.Iterable[ActorRef]): RoundRobinRouter =
|
||||||
import scala.collection.JavaConverters._
|
apply(immutableSeq(routees))
|
||||||
apply(routees.asScala)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
|
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
|
||||||
|
|
@ -547,7 +547,7 @@ object RoundRobinRouter {
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
||||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
||||||
extends RouterConfig with RoundRobinLike {
|
extends RouterConfig with RoundRobinLike {
|
||||||
|
|
@ -564,7 +564,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
|
||||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
def this(routeePaths: java.lang.Iterable[String]) = this(routees = routeePaths.asScala)
|
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that sets the resizer to be used.
|
* Constructor that sets the resizer to be used.
|
||||||
|
|
@ -602,7 +602,7 @@ trait RoundRobinLike { this: RouterConfig ⇒
|
||||||
|
|
||||||
def nrOfInstances: Int
|
def nrOfInstances: Int
|
||||||
|
|
||||||
def routees: Iterable[String]
|
def routees: immutable.Iterable[String]
|
||||||
|
|
||||||
def createRoute(routeeProvider: RouteeProvider): Route = {
|
def createRoute(routeeProvider: RouteeProvider): Route = {
|
||||||
if (resizer.isEmpty) {
|
if (resizer.isEmpty) {
|
||||||
|
|
@ -622,7 +622,7 @@ trait RoundRobinLike { this: RouterConfig ⇒
|
||||||
case (sender, message) ⇒
|
case (sender, message) ⇒
|
||||||
message match {
|
message match {
|
||||||
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
||||||
case msg ⇒ List(Destination(sender, getNext()))
|
case msg ⇒ Destination(sender, getNext()) :: Nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -632,15 +632,13 @@ object RandomRouter {
|
||||||
/**
|
/**
|
||||||
* Creates a new RandomRouter, routing to the specified routees
|
* Creates a new RandomRouter, routing to the specified routees
|
||||||
*/
|
*/
|
||||||
def apply(routees: Iterable[ActorRef]): RandomRouter = new RandomRouter(routees = routees map (_.path.toString))
|
def apply(routees: immutable.Iterable[ActorRef]): RandomRouter = new RandomRouter(routees = routees map (_.path.toString))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API to create router with the supplied 'routees' actors.
|
* Java API to create router with the supplied 'routees' actors.
|
||||||
*/
|
*/
|
||||||
def create(routees: java.lang.Iterable[ActorRef]): RandomRouter = {
|
def create(routees: java.lang.Iterable[ActorRef]): RandomRouter =
|
||||||
import scala.collection.JavaConverters._
|
apply(immutableSeq(routees))
|
||||||
apply(routees.asScala)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* A Router that randomly selects one of the target connections to send a message to.
|
* A Router that randomly selects one of the target connections to send a message to.
|
||||||
|
|
@ -684,7 +682,7 @@ object RandomRouter {
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
case class RandomRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
||||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
||||||
extends RouterConfig with RandomLike {
|
extends RouterConfig with RandomLike {
|
||||||
|
|
@ -701,7 +699,7 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
|
||||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
def this(routeePaths: java.lang.Iterable[String]) = this(routees = routeePaths.asScala)
|
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that sets the resizer to be used.
|
* Constructor that sets the resizer to be used.
|
||||||
|
|
@ -738,7 +736,7 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
|
||||||
trait RandomLike { this: RouterConfig ⇒
|
trait RandomLike { this: RouterConfig ⇒
|
||||||
def nrOfInstances: Int
|
def nrOfInstances: Int
|
||||||
|
|
||||||
def routees: Iterable[String]
|
def routees: immutable.Iterable[String]
|
||||||
|
|
||||||
def createRoute(routeeProvider: RouteeProvider): Route = {
|
def createRoute(routeeProvider: RouteeProvider): Route = {
|
||||||
if (resizer.isEmpty) {
|
if (resizer.isEmpty) {
|
||||||
|
|
@ -756,7 +754,7 @@ trait RandomLike { this: RouterConfig ⇒
|
||||||
case (sender, message) ⇒
|
case (sender, message) ⇒
|
||||||
message match {
|
message match {
|
||||||
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
||||||
case msg ⇒ List(Destination(sender, getNext()))
|
case msg ⇒ Destination(sender, getNext()) :: Nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -766,16 +764,14 @@ object SmallestMailboxRouter {
|
||||||
/**
|
/**
|
||||||
* Creates a new SmallestMailboxRouter, routing to the specified routees
|
* Creates a new SmallestMailboxRouter, routing to the specified routees
|
||||||
*/
|
*/
|
||||||
def apply(routees: Iterable[ActorRef]): SmallestMailboxRouter =
|
def apply(routees: immutable.Iterable[ActorRef]): SmallestMailboxRouter =
|
||||||
new SmallestMailboxRouter(routees = routees map (_.path.toString))
|
new SmallestMailboxRouter(routees = routees map (_.path.toString))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API to create router with the supplied 'routees' actors.
|
* Java API to create router with the supplied 'routees' actors.
|
||||||
*/
|
*/
|
||||||
def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = {
|
def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter =
|
||||||
import scala.collection.JavaConverters._
|
apply(immutableSeq(routees))
|
||||||
apply(routees.asScala)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
|
* A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
|
||||||
|
|
@ -828,7 +824,7 @@ object SmallestMailboxRouter {
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
||||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
||||||
extends RouterConfig with SmallestMailboxLike {
|
extends RouterConfig with SmallestMailboxLike {
|
||||||
|
|
@ -845,7 +841,7 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
|
||||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
def this(routeePaths: java.lang.Iterable[String]) = this(routees = routeePaths.asScala)
|
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that sets the resizer to be used.
|
* Constructor that sets the resizer to be used.
|
||||||
|
|
@ -882,7 +878,7 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
|
||||||
trait SmallestMailboxLike { this: RouterConfig ⇒
|
trait SmallestMailboxLike { this: RouterConfig ⇒
|
||||||
def nrOfInstances: Int
|
def nrOfInstances: Int
|
||||||
|
|
||||||
def routees: Iterable[String]
|
def routees: immutable.Iterable[String]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the actor is currently processing a message.
|
* Returns true if the actor is currently processing a message.
|
||||||
|
|
@ -954,7 +950,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
|
||||||
// 4. An ActorRef with unknown mailbox size that isn't processing anything
|
// 4. An ActorRef with unknown mailbox size that isn't processing anything
|
||||||
// 5. An ActorRef with a known mailbox size
|
// 5. An ActorRef with a known mailbox size
|
||||||
// 6. An ActorRef without any messages
|
// 6. An ActorRef without any messages
|
||||||
@tailrec def getNext(targets: IndexedSeq[ActorRef] = routeeProvider.routees,
|
@tailrec def getNext(targets: immutable.IndexedSeq[ActorRef] = routeeProvider.routees,
|
||||||
proposedTarget: ActorRef = routeeProvider.context.system.deadLetters,
|
proposedTarget: ActorRef = routeeProvider.context.system.deadLetters,
|
||||||
currentScore: Long = Long.MaxValue,
|
currentScore: Long = Long.MaxValue,
|
||||||
at: Int = 0,
|
at: Int = 0,
|
||||||
|
|
@ -985,7 +981,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
|
||||||
case (sender, message) ⇒
|
case (sender, message) ⇒
|
||||||
message match {
|
message match {
|
||||||
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
||||||
case msg ⇒ List(Destination(sender, getNext()))
|
case msg ⇒ Destination(sender, getNext()) :: Nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -995,15 +991,13 @@ object BroadcastRouter {
|
||||||
/**
|
/**
|
||||||
* Creates a new BroadcastRouter, routing to the specified routees
|
* Creates a new BroadcastRouter, routing to the specified routees
|
||||||
*/
|
*/
|
||||||
def apply(routees: Iterable[ActorRef]): BroadcastRouter = new BroadcastRouter(routees = routees map (_.path.toString))
|
def apply(routees: immutable.Iterable[ActorRef]): BroadcastRouter = new BroadcastRouter(routees = routees map (_.path.toString))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API to create router with the supplied 'routees' actors.
|
* Java API to create router with the supplied 'routees' actors.
|
||||||
*/
|
*/
|
||||||
def create(routees: java.lang.Iterable[ActorRef]): BroadcastRouter = {
|
def create(routees: java.lang.Iterable[ActorRef]): BroadcastRouter =
|
||||||
import scala.collection.JavaConverters._
|
apply(immutableSeq(routees))
|
||||||
apply(routees.asScala)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* A Router that uses broadcasts a message to all its connections.
|
* A Router that uses broadcasts a message to all its connections.
|
||||||
|
|
@ -1047,7 +1041,7 @@ object BroadcastRouter {
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
case class BroadcastRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
||||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
||||||
extends RouterConfig with BroadcastLike {
|
extends RouterConfig with BroadcastLike {
|
||||||
|
|
@ -1064,7 +1058,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
|
||||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
def this(routeePaths: java.lang.Iterable[String]) = this(routees = routeePaths.asScala)
|
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that sets the resizer to be used.
|
* Constructor that sets the resizer to be used.
|
||||||
|
|
@ -1102,7 +1096,7 @@ trait BroadcastLike { this: RouterConfig ⇒
|
||||||
|
|
||||||
def nrOfInstances: Int
|
def nrOfInstances: Int
|
||||||
|
|
||||||
def routees: Iterable[String]
|
def routees: immutable.Iterable[String]
|
||||||
|
|
||||||
def createRoute(routeeProvider: RouteeProvider): Route = {
|
def createRoute(routeeProvider: RouteeProvider): Route = {
|
||||||
if (resizer.isEmpty) {
|
if (resizer.isEmpty) {
|
||||||
|
|
@ -1120,16 +1114,14 @@ object ScatterGatherFirstCompletedRouter {
|
||||||
/**
|
/**
|
||||||
* Creates a new ScatterGatherFirstCompletedRouter, routing to the specified routees, timing out after the specified Duration
|
* Creates a new ScatterGatherFirstCompletedRouter, routing to the specified routees, timing out after the specified Duration
|
||||||
*/
|
*/
|
||||||
def apply(routees: Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
|
def apply(routees: immutable.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
|
||||||
new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within)
|
new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API to create router with the supplied 'routees' actors.
|
* Java API to create router with the supplied 'routees' actors.
|
||||||
*/
|
*/
|
||||||
def create(routees: java.lang.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter = {
|
def create(routees: java.lang.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
|
||||||
import scala.collection.JavaConverters._
|
apply(immutableSeq(routees), within)
|
||||||
apply(routees.asScala, within)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Simple router that broadcasts the message to all routees, and replies with the first response.
|
* Simple router that broadcasts the message to all routees, and replies with the first response.
|
||||||
|
|
@ -1175,7 +1167,7 @@ object ScatterGatherFirstCompletedRouter {
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: FiniteDuration,
|
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, within: FiniteDuration,
|
||||||
override val resizer: Option[Resizer] = None,
|
override val resizer: Option[Resizer] = None,
|
||||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
|
||||||
|
|
@ -1196,7 +1188,7 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
|
||||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||||
*/
|
*/
|
||||||
def this(routeePaths: java.lang.Iterable[String], w: FiniteDuration) = this(routees = routeePaths.asScala, within = w)
|
def this(routeePaths: java.lang.Iterable[String], w: FiniteDuration) = this(routees = immutableSeq(routeePaths), within = w)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that sets the resizer to be used.
|
* Constructor that sets the resizer to be used.
|
||||||
|
|
@ -1234,7 +1226,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
|
||||||
|
|
||||||
def nrOfInstances: Int
|
def nrOfInstances: Int
|
||||||
|
|
||||||
def routees: Iterable[String]
|
def routees: immutable.Iterable[String]
|
||||||
|
|
||||||
def within: FiniteDuration
|
def within: FiniteDuration
|
||||||
|
|
||||||
|
|
@ -1394,7 +1386,7 @@ case class DefaultResizer(
|
||||||
* @param routees The current actor in the resizer
|
* @param routees The current actor in the resizer
|
||||||
* @return the number of routees by which the resizer should be adjusted (positive, negative or zero)
|
* @return the number of routees by which the resizer should be adjusted (positive, negative or zero)
|
||||||
*/
|
*/
|
||||||
def capacity(routees: IndexedSeq[ActorRef]): Int = {
|
def capacity(routees: immutable.IndexedSeq[ActorRef]): Int = {
|
||||||
val currentSize = routees.size
|
val currentSize = routees.size
|
||||||
val press = pressure(routees)
|
val press = pressure(routees)
|
||||||
val delta = filter(press, currentSize)
|
val delta = filter(press, currentSize)
|
||||||
|
|
@ -1422,7 +1414,7 @@ case class DefaultResizer(
|
||||||
* @param routees the current resizer of routees
|
* @param routees the current resizer of routees
|
||||||
* @return number of busy routees, between 0 and routees.size
|
* @return number of busy routees, between 0 and routees.size
|
||||||
*/
|
*/
|
||||||
def pressure(routees: IndexedSeq[ActorRef]): Int = {
|
def pressure(routees: immutable.IndexedSeq[ActorRef]): Int = {
|
||||||
routees count {
|
routees count {
|
||||||
case a: ActorRefWithCell ⇒
|
case a: ActorRefWithCell ⇒
|
||||||
a.underlying match {
|
a.underlying match {
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,12 @@
|
||||||
|
|
||||||
package akka
|
package akka
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
|
||||||
package object routing {
|
package object routing {
|
||||||
/**
|
/**
|
||||||
* Routing logic, partial function from (sender, message) to a
|
* Routing logic, partial function from (sender, message) to a
|
||||||
* set of destinations.
|
* set of destinations.
|
||||||
*/
|
*/
|
||||||
type Route = PartialFunction[(akka.actor.ActorRef, Any), Iterable[Destination]]
|
type Route = PartialFunction[(akka.actor.ActorRef, Any), immutable.Iterable[Destination]]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,7 @@
|
||||||
package akka.serialization
|
package akka.serialization
|
||||||
|
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.AkkaException
|
import akka.actor.{ Extension, ExtendedActorSystem, Address }
|
||||||
import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess }
|
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
@ -28,17 +27,13 @@ object Serialization {
|
||||||
val currentTransportAddress = new DynamicVariable[Address](null)
|
val currentTransportAddress = new DynamicVariable[Address](null)
|
||||||
|
|
||||||
class Settings(val config: Config) {
|
class Settings(val config: Config) {
|
||||||
|
val Serializers: Map[String, String] = configToMap("akka.actor.serializers")
|
||||||
|
val SerializationBindings: Map[String, String] = configToMap("akka.actor.serialization-bindings")
|
||||||
|
|
||||||
|
private final def configToMap(path: String): Map[String, String] = {
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import config._
|
config.getConfig(path).root.unwrapped.asScala.mapValues(_.toString).toMap
|
||||||
|
}
|
||||||
val Serializers: Map[String, String] = configToMap(getConfig("akka.actor.serializers"))
|
|
||||||
|
|
||||||
val SerializationBindings: Map[String, String] = configToMap(getConfig("akka.actor.serialization-bindings"))
|
|
||||||
|
|
||||||
private def configToMap(cfg: Config): Map[String, String] =
|
|
||||||
cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) }
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,16 +58,16 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
||||||
* using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
|
* using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
|
||||||
* Returns either the resulting object or an Exception if one was thrown.
|
* Returns either the resulting object or an Exception if one was thrown.
|
||||||
*/
|
*/
|
||||||
def deserialize(bytes: Array[Byte],
|
def deserialize(bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_]]): Try[AnyRef] =
|
||||||
serializerId: Int,
|
Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
|
||||||
clazz: Option[Class[_]]): Try[AnyRef] = Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
|
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
|
||||||
* You can specify an optional ClassLoader to load the object into.
|
* You can specify an optional ClassLoader to load the object into.
|
||||||
* Returns either the resulting object or an Exception if one was thrown.
|
* Returns either the resulting object or an Exception if one was thrown.
|
||||||
*/
|
*/
|
||||||
def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] = Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
|
def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] =
|
||||||
|
Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
|
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
|
||||||
|
|
@ -96,8 +91,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
||||||
*/
|
*/
|
||||||
def serializerFor(clazz: Class[_]): Serializer =
|
def serializerFor(clazz: Class[_]): Serializer =
|
||||||
serializerMap.get(clazz) match {
|
serializerMap.get(clazz) match {
|
||||||
case null ⇒
|
case null ⇒ // bindings are ordered from most specific to least specific
|
||||||
// bindings are ordered from most specific to least specific
|
|
||||||
def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
|
def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
|
||||||
possibilities.size == 1 ||
|
possibilities.size == 1 ||
|
||||||
(possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
|
(possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
|
||||||
|
|
|
||||||
54
akka-actor/src/main/scala/akka/util/Collections.scala
Normal file
54
akka-actor/src/main/scala/akka/util/Collections.scala
Normal file
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.util
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] object Collections {
|
||||||
|
|
||||||
|
case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
|
||||||
|
override final def iterator = Iterator.empty
|
||||||
|
override final def apply(idx: Int): Nothing = throw new java.lang.IndexOutOfBoundsException(idx.toString)
|
||||||
|
override final def length: Int = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class PartialImmutableValuesIterable[From, To] extends immutable.Iterable[To] {
|
||||||
|
def isDefinedAt(from: From): Boolean
|
||||||
|
def apply(from: From): To
|
||||||
|
def valuesIterator: Iterator[From]
|
||||||
|
final def iterator: Iterator[To] = {
|
||||||
|
val superIterator = valuesIterator
|
||||||
|
new Iterator[To] {
|
||||||
|
private[this] var _next: To = _
|
||||||
|
private[this] var _hasNext = false
|
||||||
|
|
||||||
|
@tailrec override final def hasNext: Boolean =
|
||||||
|
if (!_hasNext && superIterator.hasNext) { // If we need and are able to look for the next value
|
||||||
|
val potentiallyNext = superIterator.next()
|
||||||
|
if (isDefinedAt(potentiallyNext)) {
|
||||||
|
_next = apply(potentiallyNext)
|
||||||
|
_hasNext = true
|
||||||
|
true
|
||||||
|
} else hasNext //Attempt to find the next
|
||||||
|
} else _hasNext // Return if we found one
|
||||||
|
|
||||||
|
override final def next(): To = if (hasNext) {
|
||||||
|
val ret = _next
|
||||||
|
_next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
|
||||||
|
_hasNext = false // Mark as consumed (we need to look for the next value)
|
||||||
|
ret
|
||||||
|
} else throw new java.util.NoSuchElementException("next")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override lazy val size: Int = iterator.size
|
||||||
|
override def foreach[C](f: To ⇒ C) = iterator foreach f
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,45 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package akka.util
|
|
||||||
//FIXME DOCS!
|
|
||||||
object Convert {
|
|
||||||
|
|
||||||
def intToBytes(value: Int): Array[Byte] = {
|
|
||||||
val bytes = Array.fill[Byte](4)(0)
|
|
||||||
bytes(0) = (value >>> 24).asInstanceOf[Byte]
|
|
||||||
bytes(1) = (value >>> 16).asInstanceOf[Byte]
|
|
||||||
bytes(2) = (value >>> 8).asInstanceOf[Byte]
|
|
||||||
bytes(3) = value.asInstanceOf[Byte]
|
|
||||||
bytes
|
|
||||||
}
|
|
||||||
|
|
||||||
def bytesToInt(bytes: Array[Byte], offset: Int): Int = {
|
|
||||||
(0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
|
|
||||||
}
|
|
||||||
|
|
||||||
def longToBytes(value: Long): Array[Byte] = {
|
|
||||||
val writeBuffer = Array.fill[Byte](8)(0)
|
|
||||||
writeBuffer(0) = (value >>> 56).asInstanceOf[Byte]
|
|
||||||
writeBuffer(1) = (value >>> 48).asInstanceOf[Byte]
|
|
||||||
writeBuffer(2) = (value >>> 40).asInstanceOf[Byte]
|
|
||||||
writeBuffer(3) = (value >>> 32).asInstanceOf[Byte]
|
|
||||||
writeBuffer(4) = (value >>> 24).asInstanceOf[Byte]
|
|
||||||
writeBuffer(5) = (value >>> 16).asInstanceOf[Byte]
|
|
||||||
writeBuffer(6) = (value >>> 8).asInstanceOf[Byte]
|
|
||||||
writeBuffer(7) = (value >>> 0).asInstanceOf[Byte]
|
|
||||||
writeBuffer
|
|
||||||
}
|
|
||||||
|
|
||||||
def bytesToLong(buf: Array[Byte]): Long = {
|
|
||||||
((buf(0) & 0xFFL) << 56) |
|
|
||||||
((buf(1) & 0xFFL) << 48) |
|
|
||||||
((buf(2) & 0xFFL) << 40) |
|
|
||||||
((buf(3) & 0xFFL) << 32) |
|
|
||||||
((buf(4) & 0xFFL) << 24) |
|
|
||||||
((buf(5) & 0xFFL) << 16) |
|
|
||||||
((buf(6) & 0xFFL) << 8) |
|
|
||||||
((buf(7) & 0xFFL) << 0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -75,7 +75,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {
|
||||||
def findValue(key: K)(f: (V) ⇒ Boolean): Option[V] =
|
def findValue(key: K)(f: (V) ⇒ Boolean): Option[V] =
|
||||||
container get key match {
|
container get key match {
|
||||||
case null ⇒ None
|
case null ⇒ None
|
||||||
case set ⇒ set.iterator.asScala.find(f)
|
case set ⇒ set.iterator.asScala find f
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -4,16 +4,15 @@
|
||||||
|
|
||||||
package akka.camel
|
package akka.camel
|
||||||
|
|
||||||
import internal._
|
import akka.camel.internal._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
|
import akka.ConfigurationException
|
||||||
import org.apache.camel.ProducerTemplate
|
import org.apache.camel.ProducerTemplate
|
||||||
import org.apache.camel.impl.DefaultCamelContext
|
import org.apache.camel.impl.DefaultCamelContext
|
||||||
import org.apache.camel.model.RouteDefinition
|
import org.apache.camel.model.RouteDefinition
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.ConfigurationException
|
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||||
import scala.concurrent.duration.Duration
|
|
||||||
import java.util.concurrent.TimeUnit._
|
import java.util.concurrent.TimeUnit._
|
||||||
import scala.concurrent.duration.FiniteDuration
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Camel trait encapsulates the underlying camel machinery.
|
* Camel trait encapsulates the underlying camel machinery.
|
||||||
|
|
@ -88,8 +87,8 @@ class CamelSettings private[camel] (config: Config, dynamicAccess: DynamicAccess
|
||||||
final val StreamingCache: Boolean = config.getBoolean("akka.camel.streamingCache")
|
final val StreamingCache: Boolean = config.getBoolean("akka.camel.streamingCache")
|
||||||
|
|
||||||
final val Conversions: (String, RouteDefinition) ⇒ RouteDefinition = {
|
final val Conversions: (String, RouteDefinition) ⇒ RouteDefinition = {
|
||||||
import scala.collection.JavaConverters.asScalaSetConverter
|
|
||||||
val specifiedConversions = {
|
val specifiedConversions = {
|
||||||
|
import scala.collection.JavaConverters.asScalaSetConverter
|
||||||
val section = config.getConfig("akka.camel.conversions")
|
val section = config.getConfig("akka.camel.conversions")
|
||||||
section.entrySet.asScala.map(e ⇒ (e.getKey, section.getString(e.getKey)))
|
section.entrySet.asScala.map(e ⇒ (e.getKey, section.getString(e.getKey)))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,9 +45,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
|
require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
|
||||||
}
|
}
|
||||||
|
|
||||||
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
|
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr }.toIndexedSeq
|
||||||
case AddressFromURIString(addr) ⇒ addr
|
|
||||||
}.toIndexedSeq
|
|
||||||
final val SeedNodeTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
|
final val SeedNodeTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
|
||||||
final val PeriodicTasksInitialDelay: FiniteDuration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
|
final val PeriodicTasksInitialDelay: FiniteDuration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
|
||||||
final val GossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
|
final val GossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
|
||||||
|
|
|
||||||
|
|
@ -5,16 +5,13 @@ package akka.cluster.routing
|
||||||
|
|
||||||
import java.lang.IllegalStateException
|
import java.lang.IllegalStateException
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import scala.collection.immutable.SortedSet
|
import scala.collection.immutable
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.ConfigurationException
|
import akka.ConfigurationException
|
||||||
import akka.actor.Actor
|
|
||||||
import akka.actor.ActorContext
|
import akka.actor.ActorContext
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.actor.ActorSystemImpl
|
|
||||||
import akka.actor.Address
|
import akka.actor.Address
|
||||||
import akka.actor.Deploy
|
import akka.actor.Deploy
|
||||||
import akka.actor.InternalActorRef
|
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.actor.SupervisorStrategy
|
import akka.actor.SupervisorStrategy
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
|
|
@ -51,7 +48,7 @@ final case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRoute
|
||||||
|
|
||||||
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor
|
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor
|
||||||
({
|
({
|
||||||
case (sender, message: ClusterDomainEvent) ⇒ Seq(Destination(sender, routeeProvider.context.self))
|
case (sender, message: ClusterDomainEvent) ⇒ List(Destination(sender, routeeProvider.context.self))
|
||||||
}: Route) orElse localRoute
|
}: Route) orElse localRoute
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -156,7 +153,7 @@ private[akka] class ClusterRouteeProvider(
|
||||||
// need this counter as instance variable since Resizer may call createRoutees several times
|
// need this counter as instance variable since Resizer may call createRoutees several times
|
||||||
private val childNameCounter = new AtomicInteger
|
private val childNameCounter = new AtomicInteger
|
||||||
|
|
||||||
override def registerRouteesFor(paths: Iterable[String]): Unit =
|
override def registerRouteesFor(paths: immutable.Iterable[String]): Unit =
|
||||||
throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]"
|
throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]"
|
||||||
format context.self.path.toString)
|
format context.self.path.toString)
|
||||||
|
|
||||||
|
|
@ -183,7 +180,7 @@ private[akka] class ClusterRouteeProvider(
|
||||||
context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
|
context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
|
||||||
}
|
}
|
||||||
// must register each one, since registered routees are used in selectDeploymentTarget
|
// must register each one, since registered routees are used in selectDeploymentTarget
|
||||||
registerRoutees(Some(ref))
|
registerRoutees(List(ref))
|
||||||
|
|
||||||
// recursion until all created
|
// recursion until all created
|
||||||
doCreateRoutees()
|
doCreateRoutees()
|
||||||
|
|
@ -222,27 +219,26 @@ private[akka] class ClusterRouteeProvider(
|
||||||
case a ⇒ a
|
case a ⇒ a
|
||||||
}
|
}
|
||||||
|
|
||||||
private[routing] def availableNodes: SortedSet[Address] = {
|
private[routing] def availableNodes: immutable.SortedSet[Address] = {
|
||||||
import Member.addressOrdering
|
import Member.addressOrdering
|
||||||
val currentNodes = nodes
|
val currentNodes = nodes
|
||||||
if (currentNodes.isEmpty && settings.allowLocalRoutees)
|
if (currentNodes.isEmpty && settings.allowLocalRoutees)
|
||||||
//use my own node, cluster information not updated yet
|
//use my own node, cluster information not updated yet
|
||||||
SortedSet(cluster.selfAddress)
|
immutable.SortedSet(cluster.selfAddress)
|
||||||
else
|
else
|
||||||
currentNodes
|
currentNodes
|
||||||
}
|
}
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
private[routing] var nodes: SortedSet[Address] = {
|
private[routing] var nodes: immutable.SortedSet[Address] = {
|
||||||
import Member.addressOrdering
|
import Member.addressOrdering
|
||||||
cluster.readView.members.collect {
|
cluster.readView.members.collect {
|
||||||
case m if isAvailable(m) ⇒ m.address
|
case m if isAvailable(m) ⇒ m.address
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[routing] def isAvailable(m: Member): Boolean = {
|
private[routing] def isAvailable(m: Member): Boolean =
|
||||||
m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress)
|
m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress)
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import akka.testkit.ErrorFilter;
|
||||||
import akka.testkit.EventFilter;
|
import akka.testkit.EventFilter;
|
||||||
import akka.testkit.TestEvent;
|
import akka.testkit.TestEvent;
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
import static akka.japi.Util.immutableSeq;
|
||||||
import akka.japi.Function;
|
import akka.japi.Function;
|
||||||
import scala.Option;
|
import scala.Option;
|
||||||
import scala.collection.JavaConverters;
|
import scala.collection.JavaConverters;
|
||||||
|
|
@ -219,8 +220,7 @@ public class FaultHandlingTestBase {
|
||||||
|
|
||||||
//#testkit
|
//#testkit
|
||||||
public <A> Seq<A> seq(A... args) {
|
public <A> Seq<A> seq(A... args) {
|
||||||
return JavaConverters.collectionAsScalaIterableConverter(
|
return immutableSeq(args);
|
||||||
java.util.Arrays.asList(args)).asScala().toList();
|
|
||||||
}
|
}
|
||||||
//#testkit
|
//#testkit
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -180,16 +180,14 @@ public class CustomRouterDocTestBase {
|
||||||
//#crRoutingLogic
|
//#crRoutingLogic
|
||||||
return new CustomRoute() {
|
return new CustomRoute() {
|
||||||
@Override
|
@Override
|
||||||
public Iterable<Destination> destinationsFor(ActorRef sender, Object msg) {
|
public scala.collection.immutable.Seq<Destination> destinationsFor(ActorRef sender, Object msg) {
|
||||||
switch ((Message) msg) {
|
switch ((Message) msg) {
|
||||||
case DemocratVote:
|
case DemocratVote:
|
||||||
case DemocratCountResult:
|
case DemocratCountResult:
|
||||||
return Arrays.asList(
|
return akka.japi.Util.immutableSingletonSeq(new Destination(sender, democratActor));
|
||||||
new Destination[] { new Destination(sender, democratActor) });
|
|
||||||
case RepublicanVote:
|
case RepublicanVote:
|
||||||
case RepublicanCountResult:
|
case RepublicanCountResult:
|
||||||
return Arrays.asList(
|
return akka.japi.Util.immutableSingletonSeq(new Destination(sender, republicanActor));
|
||||||
new Destination[] { new Destination(sender, republicanActor) });
|
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Unknown message: " + msg);
|
throw new IllegalArgumentException("Unknown message: " + msg);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
26
akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst
Normal file
26
akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
.. _migration-2.2:
|
||||||
|
|
||||||
|
################################
|
||||||
|
Migration Guide 2.1.x to 2.2.x
|
||||||
|
################################
|
||||||
|
|
||||||
|
The 2.2 release contains several structural changes that require some
|
||||||
|
simple, mechanical source-level changes in client code.
|
||||||
|
|
||||||
|
When migrating from 1.3.x to 2.1.x you should first follow the instructions for
|
||||||
|
migrating `1.3.x to 2.0.x <http://doc.akka.io/docs/akka/2.0.3/project/migration-guide-1.3.x-2.0.x.html>`_ and then :ref:`2.0.x to 2.1.x <migration-2.1>`.
|
||||||
|
|
||||||
|
Immutable everywhere
|
||||||
|
====================
|
||||||
|
|
||||||
|
Akka has in 2.2 been refactored to require ``scala.collection.immutable`` data structures as much as possible,
|
||||||
|
this leads to fewer bugs and more opportunity for sharing data safely.
|
||||||
|
|
||||||
|
==================================== ====================================
|
||||||
|
Search Replace with
|
||||||
|
==================================== ====================================
|
||||||
|
``akka.japi.Util.arrayToSeq`` ``akka.japi.Util.immutableSeq``
|
||||||
|
==================================== ====================================
|
||||||
|
|
||||||
|
If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``,
|
||||||
|
and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.<collection-type>]`` method.
|
||||||
|
|
@ -8,3 +8,4 @@ Migration Guides
|
||||||
|
|
||||||
migration-guide-1.3.x-2.0.x
|
migration-guide-1.3.x-2.0.x
|
||||||
migration-guide-2.0.x-2.1.x
|
migration-guide-2.0.x-2.1.x
|
||||||
|
migration-guide-2.1.x-2.2.x
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ package akka.osgi
|
||||||
import de.kalpatec.pojosr.framework.launch.{ BundleDescriptor, PojoServiceRegistryFactory, ClasspathScanner }
|
import de.kalpatec.pojosr.framework.launch.{ BundleDescriptor, PojoServiceRegistryFactory, ClasspathScanner }
|
||||||
|
|
||||||
import scala.collection.JavaConversions.seqAsJavaList
|
import scala.collection.JavaConversions.seqAsJavaList
|
||||||
import scala.collection.JavaConversions.collectionAsScalaIterable
|
|
||||||
import org.apache.commons.io.IOUtils.copy
|
import org.apache.commons.io.IOUtils.copy
|
||||||
|
|
||||||
import org.osgi.framework._
|
import org.osgi.framework._
|
||||||
|
|
@ -138,12 +137,12 @@ class BundleDescriptorBuilder(name: String) {
|
||||||
}
|
}
|
||||||
|
|
||||||
def extractHeaders(file: File): HashMap[String, String] = {
|
def extractHeaders(file: File): HashMap[String, String] = {
|
||||||
|
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
|
||||||
val headers = new HashMap[String, String]()
|
val headers = new HashMap[String, String]()
|
||||||
|
|
||||||
val jis = new JarInputStream(new FileInputStream(file))
|
val jis = new JarInputStream(new FileInputStream(file))
|
||||||
try {
|
try {
|
||||||
for (entry ← jis.getManifest().getMainAttributes().entrySet())
|
for (entry ← jis.getManifest.getMainAttributes.entrySet.asScala)
|
||||||
headers.put(entry.getKey().toString(), entry.getValue().toString())
|
headers.put(entry.getKey.toString, entry.getValue.toString)
|
||||||
} finally jis.close()
|
} finally jis.close()
|
||||||
|
|
||||||
headers
|
headers
|
||||||
|
|
|
||||||
|
|
@ -402,10 +402,8 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
|
||||||
}
|
}
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
ConfigFactory.parseString(deployString).root.asScala foreach {
|
ConfigFactory.parseString(deployString).root.asScala foreach {
|
||||||
case (key, value: ConfigObject) ⇒
|
case (key, value: ConfigObject) ⇒ deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
|
||||||
deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
|
case (key, x) ⇒ throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
|
||||||
case (key, x) ⇒
|
|
||||||
throw new IllegalArgumentException("key " + key + " must map to deployment section, not simple value " + x)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,9 @@ package akka.remote
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
import akka.remote.routing._
|
import akka.remote.routing._
|
||||||
import com.typesafe.config._
|
|
||||||
import akka.ConfigurationException
|
import akka.ConfigurationException
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
|
import com.typesafe.config._
|
||||||
|
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
case class RemoteScope(node: Address) extends Scope {
|
case class RemoteScope(node: Address) extends Scope {
|
||||||
|
|
@ -22,9 +23,9 @@ private[akka] class RemoteDeployer(_settings: ActorSystem.Settings, _pm: Dynamic
|
||||||
case d @ Some(deploy) ⇒
|
case d @ Some(deploy) ⇒
|
||||||
deploy.config.getString("remote") match {
|
deploy.config.getString("remote") match {
|
||||||
case AddressFromURIString(r) ⇒ Some(deploy.copy(scope = RemoteScope(r)))
|
case AddressFromURIString(r) ⇒ Some(deploy.copy(scope = RemoteScope(r)))
|
||||||
case str ⇒
|
case str if !str.isEmpty ⇒ throw new ConfigurationException("unparseable remote node name " + str)
|
||||||
if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str)
|
case _ ⇒
|
||||||
val nodes = deploy.config.getStringList("target.nodes").asScala.toIndexedSeq map (AddressFromURIString(_))
|
val nodes = immutableSeq(deploy.config.getStringList("target.nodes")).map(AddressFromURIString(_))
|
||||||
if (nodes.isEmpty || deploy.routerConfig == NoRouter) d
|
if (nodes.isEmpty || deploy.routerConfig == NoRouter) d
|
||||||
else Some(deploy.copy(routerConfig = RemoteRouterConfig(deploy.routerConfig, nodes)))
|
else Some(deploy.copy(routerConfig = RemoteRouterConfig(deploy.routerConfig, nodes)))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import scala.concurrent.duration.Duration
|
||||||
import java.util.concurrent.TimeUnit._
|
import java.util.concurrent.TimeUnit._
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
import akka.ConfigurationException
|
import akka.ConfigurationException
|
||||||
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
|
import akka.japi.Util.immutableSeq
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import akka.dispatch.ThreadPoolConfig
|
import akka.dispatch.ThreadPoolConfig
|
||||||
|
|
||||||
|
|
@ -89,42 +89,19 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
|
||||||
case sz ⇒ sz
|
case sz ⇒ sz
|
||||||
}
|
}
|
||||||
|
|
||||||
val SSLKeyStore = getString("ssl.key-store") match {
|
val SSLKeyStore = Option(getString("ssl.key-store")).filter(_.length > 0)
|
||||||
case "" ⇒ None
|
val SSLTrustStore = Option(getString("ssl.trust-store")).filter(_.length > 0)
|
||||||
case keyStore ⇒ Some(keyStore)
|
val SSLKeyStorePassword = Option(getString("ssl.key-store-password")).filter(_.length > 0)
|
||||||
}
|
|
||||||
|
|
||||||
val SSLTrustStore = getString("ssl.trust-store") match {
|
val SSLTrustStorePassword = Option(getString("ssl.trust-store-password")).filter(_.length > 0)
|
||||||
case "" ⇒ None
|
|
||||||
case trustStore ⇒ Some(trustStore)
|
|
||||||
}
|
|
||||||
|
|
||||||
val SSLKeyStorePassword = getString("ssl.key-store-password") match {
|
val SSLEnabledAlgorithms = immutableSeq(getStringList("ssl.enabled-algorithms")).to[Set]
|
||||||
case "" ⇒ None
|
|
||||||
case password ⇒ Some(password)
|
|
||||||
}
|
|
||||||
|
|
||||||
val SSLTrustStorePassword = getString("ssl.trust-store-password") match {
|
val SSLProtocol = Option(getString("ssl.protocol")).filter(_.length > 0)
|
||||||
case "" ⇒ None
|
|
||||||
case password ⇒ Some(password)
|
|
||||||
}
|
|
||||||
|
|
||||||
val SSLEnabledAlgorithms = iterableAsScalaIterableConverter(getStringList("ssl.enabled-algorithms")).asScala.toSet[String]
|
val SSLRandomSource = Option(getString("ssl.sha1prng-random-source")).filter(_.length > 0)
|
||||||
|
|
||||||
val SSLProtocol = getString("ssl.protocol") match {
|
val SSLRandomNumberGenerator = Option(getString("ssl.random-number-generator")).filter(_.length > 0)
|
||||||
case "" ⇒ None
|
|
||||||
case protocol ⇒ Some(protocol)
|
|
||||||
}
|
|
||||||
|
|
||||||
val SSLRandomSource = getString("ssl.sha1prng-random-source") match {
|
|
||||||
case "" ⇒ None
|
|
||||||
case path ⇒ Some(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
val SSLRandomNumberGenerator = getString("ssl.random-number-generator") match {
|
|
||||||
case "" ⇒ None
|
|
||||||
case rng ⇒ Some(rng)
|
|
||||||
}
|
|
||||||
|
|
||||||
val EnableSSL = {
|
val EnableSSL = {
|
||||||
val enableSSL = getBoolean("ssl.enable")
|
val enableSSL = getBoolean("ssl.enable")
|
||||||
|
|
|
||||||
|
|
@ -6,19 +6,17 @@ package akka.remote.routing
|
||||||
import akka.routing.{ Route, Router, RouterConfig, RouteeProvider, Resizer }
|
import akka.routing.{ Route, Router, RouterConfig, RouteeProvider, Resizer }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.ActorContext
|
import akka.actor.ActorContext
|
||||||
import akka.actor.ActorRef
|
|
||||||
import akka.actor.Deploy
|
import akka.actor.Deploy
|
||||||
import akka.actor.InternalActorRef
|
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.ConfigurationException
|
|
||||||
import akka.remote.RemoteScope
|
|
||||||
import akka.actor.AddressFromURIString
|
|
||||||
import akka.actor.SupervisorStrategy
|
import akka.actor.SupervisorStrategy
|
||||||
import akka.actor.Address
|
import akka.actor.Address
|
||||||
import scala.collection.JavaConverters._
|
import akka.actor.ActorCell
|
||||||
|
import akka.ConfigurationException
|
||||||
|
import akka.remote.RemoteScope
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
|
import scala.collection.immutable
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import java.lang.IllegalStateException
|
import java.lang.IllegalStateException
|
||||||
import akka.actor.ActorCell
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined
|
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined
|
||||||
|
|
@ -29,7 +27,7 @@ import akka.actor.ActorCell
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
final case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) extends RouterConfig {
|
final case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) extends RouterConfig {
|
||||||
|
|
||||||
def this(local: RouterConfig, nodes: java.lang.Iterable[Address]) = this(local, nodes.asScala)
|
def this(local: RouterConfig, nodes: java.lang.Iterable[Address]) = this(local, immutableSeq(nodes))
|
||||||
def this(local: RouterConfig, nodes: Array[Address]) = this(local, nodes: Iterable[Address])
|
def this(local: RouterConfig, nodes: Array[Address]) = this(local, nodes: Iterable[Address])
|
||||||
|
|
||||||
override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
|
override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
|
||||||
|
|
@ -64,20 +62,20 @@ final case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address
|
||||||
final class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer])
|
final class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer])
|
||||||
extends RouteeProvider(_context, _routeeProps, _resizer) {
|
extends RouteeProvider(_context, _routeeProps, _resizer) {
|
||||||
|
|
||||||
if (nodes.isEmpty) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]"
|
if (nodes.isEmpty)
|
||||||
format context.self.path.toString)
|
throw new ConfigurationException("Must specify list of remote target.nodes for [%s]" format context.self.path.toString)
|
||||||
|
|
||||||
// need this iterator as instance variable since Resizer may call createRoutees several times
|
// need this iterator as instance variable since Resizer may call createRoutees several times
|
||||||
private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator
|
private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator
|
||||||
// need this counter as instance variable since Resizer may call createRoutees several times
|
// need this counter as instance variable since Resizer may call createRoutees several times
|
||||||
private val childNameCounter = new AtomicInteger
|
private val childNameCounter = new AtomicInteger
|
||||||
|
|
||||||
override def registerRouteesFor(paths: Iterable[String]): Unit =
|
override def registerRouteesFor(paths: immutable.Iterable[String]): Unit =
|
||||||
throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"
|
throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"
|
||||||
format context.self.path.toString)
|
format context.self.path.toString)
|
||||||
|
|
||||||
override def createRoutees(nrOfInstances: Int): Unit = {
|
override def createRoutees(nrOfInstances: Int): Unit = {
|
||||||
val refs = IndexedSeq.fill(nrOfInstances) {
|
val refs = immutable.IndexedSeq.fill(nrOfInstances) {
|
||||||
val name = "c" + childNameCounter.incrementAndGet
|
val name = "c" + childNameCounter.incrementAndGet
|
||||||
val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
|
val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
|
||||||
scope = RemoteScope(nodeAddressIter.next))
|
scope = RemoteScope(nodeAddressIter.next))
|
||||||
|
|
|
||||||
|
|
@ -184,31 +184,31 @@ public class JavaTestKit {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object expectMsgAnyOf(Object... msgs) {
|
public Object expectMsgAnyOf(Object... msgs) {
|
||||||
return p.expectMsgAnyOf(Util.arrayToSeq(msgs));
|
return p.expectMsgAnyOf(Util.immutableSeq(msgs));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object expectMsgAnyOf(FiniteDuration max, Object... msgs) {
|
public Object expectMsgAnyOf(FiniteDuration max, Object... msgs) {
|
||||||
return p.expectMsgAnyOf(max, Util.arrayToSeq(msgs));
|
return p.expectMsgAnyOf(max, Util.immutableSeq(msgs));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object[] expectMsgAllOf(Object... msgs) {
|
public Object[] expectMsgAllOf(Object... msgs) {
|
||||||
return (Object[]) p.expectMsgAllOf(Util.arrayToSeq(msgs)).toArray(
|
return (Object[]) p.expectMsgAllOf(Util.immutableSeq(msgs)).toArray(
|
||||||
Util.classTag(Object.class));
|
Util.classTag(Object.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object[] expectMsgAllOf(FiniteDuration max, Object... msgs) {
|
public Object[] expectMsgAllOf(FiniteDuration max, Object... msgs) {
|
||||||
return (Object[]) p.expectMsgAllOf(max, Util.arrayToSeq(msgs)).toArray(
|
return (Object[]) p.expectMsgAllOf(max, Util.immutableSeq(msgs)).toArray(
|
||||||
Util.classTag(Object.class));
|
Util.classTag(Object.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <T> T expectMsgAnyClassOf(Class<? extends T>... classes) {
|
public <T> T expectMsgAnyClassOf(Class<? extends T>... classes) {
|
||||||
final Object result = p.expectMsgAnyClassOf(Util.arrayToSeq(classes));
|
final Object result = p.expectMsgAnyClassOf(Util.immutableSeq(classes));
|
||||||
return (T) result;
|
return (T) result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object expectMsgAnyClassOf(FiniteDuration max, Class<?>... classes) {
|
public Object expectMsgAnyClassOf(FiniteDuration max, Class<?>... classes) {
|
||||||
return p.expectMsgAnyClassOf(max, Util.arrayToSeq(classes));
|
return p.expectMsgAnyClassOf(max, Util.immutableSeq(classes));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void expectNoMsg() {
|
public void expectNoMsg() {
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ package akka.testkit
|
||||||
import language.existentials
|
import language.existentials
|
||||||
|
|
||||||
import scala.util.matching.Regex
|
import scala.util.matching.Regex
|
||||||
import scala.collection.JavaConverters
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
@ -14,8 +13,9 @@ import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage }
|
||||||
import akka.dispatch.{ SystemMessage, Terminate }
|
import akka.dispatch.{ SystemMessage, Terminate }
|
||||||
import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized }
|
import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized }
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import java.lang.{ Iterable ⇒ JIterable }
|
|
||||||
import akka.actor.NoSerializationVerificationNeeded
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
|
import akka.japi.Util.immutableSeq
|
||||||
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation helpers of the EventFilter facilities: send `Mute`
|
* Implementation helpers of the EventFilter facilities: send `Mute`
|
||||||
|
|
@ -45,7 +45,7 @@ object TestEvent {
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
*/
|
*/
|
||||||
def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.to[immutable.Seq])
|
def this(filters: JIterable[EventFilter]) = this(immutableSeq(filters))
|
||||||
}
|
}
|
||||||
object UnMute {
|
object UnMute {
|
||||||
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to[immutable.Seq])
|
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to[immutable.Seq])
|
||||||
|
|
@ -54,7 +54,7 @@ object TestEvent {
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
*/
|
*/
|
||||||
def this(filters: JIterable[EventFilter]) = this(JavaConverters.iterableAsScalaIterableConverter(filters).asScala.to[immutable.Seq])
|
def this(filters: JIterable[EventFilter]) = this(immutableSeq(filters))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import akka.testkit.ErrorFilter;
|
||||||
import akka.testkit.TestEvent;
|
import akka.testkit.TestEvent;
|
||||||
import akka.util.Timeout;
|
import akka.util.Timeout;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import static akka.japi.Util.immutableSeq;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
@ -110,6 +110,6 @@ public class UntypedCoordinatedIncrementTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
public <A> Seq<A> seq(A... args) {
|
public <A> Seq<A> seq(A... args) {
|
||||||
return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toList();
|
return immutableSeq(args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import akka.testkit.ErrorFilter;
|
||||||
import akka.testkit.TestEvent;
|
import akka.testkit.TestEvent;
|
||||||
import akka.util.Timeout;
|
import akka.util.Timeout;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import static akka.japi.Util.immutableSeq;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
@ -118,8 +118,6 @@ public class UntypedTransactorTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
public <A> Seq<A> seq(A... args) {
|
public <A> Seq<A> seq(A... args) {
|
||||||
return JavaConverters
|
return immutableSeq(args);
|
||||||
.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala()
|
|
||||||
.toList();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue