Adding even more future proofing
This commit is contained in:
parent
c6d60e1089
commit
d092d17aed
4 changed files with 70 additions and 36 deletions
|
|
@ -305,11 +305,10 @@ private[akka] class LocalActorRef private[akka] (
|
|||
def rec(ref: InternalActorRef, name: Iterator[String]): InternalActorRef =
|
||||
ref match {
|
||||
case l: LocalActorRef ⇒
|
||||
val n = name.next()
|
||||
val next = n match {
|
||||
val next = name.next() match {
|
||||
case ".." ⇒ l.getParent
|
||||
case "" ⇒ l
|
||||
case _ ⇒ l.getSingleChild(n)
|
||||
case any ⇒ l.getSingleChild(any)
|
||||
}
|
||||
if (next == Nobody || name.isEmpty) next else rec(next, name)
|
||||
case _ ⇒
|
||||
|
|
@ -324,7 +323,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
|
||||
protected[akka] def underlying: ActorCell = actorCell
|
||||
|
||||
override def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) }
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = underlying.dispatcher.systemDispatch(underlying, message)
|
||||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,10 @@ package akka.actor
|
|||
import java.util.regex.Pattern
|
||||
import akka.util.Helpers
|
||||
|
||||
/**
|
||||
* An ActorSelection is a logical view of a section of an ActorSystem's tree of Actors,
|
||||
* allowing for broadcasting of messages to that section.
|
||||
*/
|
||||
abstract class ActorSelection {
|
||||
this: ScalaActorSelection ⇒
|
||||
|
||||
|
|
@ -12,11 +16,11 @@ abstract class ActorSelection {
|
|||
|
||||
protected def path: Array[AnyRef]
|
||||
|
||||
def tell(msg: Any) { target ! toMessage(msg, path) }
|
||||
def tell(msg: Any): Unit = target ! toMessage(msg, path)
|
||||
|
||||
def tell(msg: Any, sender: ActorRef) { target.tell(toMessage(msg, path), sender) }
|
||||
def tell(msg: Any, sender: ActorRef): Unit = target.tell(toMessage(msg, path), sender)
|
||||
|
||||
// this may want to be fast ...
|
||||
// FIXME make this so that "next" instead is the remaining path
|
||||
private def toMessage(msg: Any, path: Array[AnyRef]): Any = {
|
||||
var acc = msg
|
||||
var index = path.length - 1
|
||||
|
|
@ -32,7 +36,12 @@ abstract class ActorSelection {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An ActorSelection is a logical view of a section of an ActorSystem's tree of Actors,
|
||||
* allowing for broadcasting of messages to that section.
|
||||
*/
|
||||
object ActorSelection {
|
||||
//This cast is safe because the self-type of ActorSelection requires that it mixes in ScalaActorSelection
|
||||
implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection]
|
||||
|
||||
/**
|
||||
|
|
@ -43,7 +52,7 @@ object ActorSelection {
|
|||
*/
|
||||
def apply(anchor: ActorRef, path: String): ActorSelection = {
|
||||
val elems = path.split("/+").dropWhile(_.isEmpty)
|
||||
val compiled: Array[AnyRef] = elems map (x ⇒ if (x.contains("?") || x.contains("*")) Helpers.makePattern(x) else x)
|
||||
val compiled: Array[AnyRef] = elems map (x ⇒ if (x.contains('?') || x.contains('*')) Helpers.makePattern(x) else x)
|
||||
new ActorSelection with ScalaActorSelection {
|
||||
def target = anchor
|
||||
def path = compiled
|
||||
|
|
@ -51,6 +60,10 @@ object ActorSelection {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Contains the Scala API (!-method) for ActorSelections) which provides automatic tracking of the sender,
|
||||
* as per the usual implicit ActorRef pattern.
|
||||
*/
|
||||
trait ScalaActorSelection {
|
||||
this: ActorSelection ⇒
|
||||
|
||||
|
|
|
|||
|
|
@ -10,32 +10,30 @@ import akka.dispatch._
|
|||
import akka.pattern.ask
|
||||
import org.jboss.netty.akka.util.HashedWheelTimer
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import scala.annotation.tailrec
|
||||
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
|
||||
import java.io.Closeable
|
||||
import akka.dispatch.Await.Awaitable
|
||||
import akka.dispatch.Await.CanAwait
|
||||
import akka.dispatch.Await.{ Awaitable, CanAwait }
|
||||
import akka.util._
|
||||
import collection.immutable.Stack
|
||||
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
|
||||
|
||||
object ActorSystem {
|
||||
|
||||
val Version = "2.1-SNAPSHOT"
|
||||
val Version: String = "2.1-SNAPSHOT"
|
||||
|
||||
val EnvHome = System.getenv("AKKA_HOME") match {
|
||||
val EnvHome: Option[String] = System.getenv("AKKA_HOME") match {
|
||||
case null | "" | "." ⇒ None
|
||||
case value ⇒ Some(value)
|
||||
}
|
||||
|
||||
val SystemHome = System.getProperty("akka.home") match {
|
||||
val SystemHome: Option[String] = System.getProperty("akka.home") match {
|
||||
case null | "" ⇒ None
|
||||
case value ⇒ Some(value)
|
||||
}
|
||||
|
||||
val GlobalHome = SystemHome orElse EnvHome
|
||||
val GlobalHome: Option[String] = SystemHome orElse EnvHome
|
||||
|
||||
/**
|
||||
* Creates a new ActorSystem with the name "default",
|
||||
|
|
@ -102,8 +100,16 @@ object ActorSystem {
|
|||
*/
|
||||
def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = new ActorSystemImpl(name, config, classLoader).start()
|
||||
|
||||
/**
|
||||
* Settings are the overall ActorSystem Settings which also provides a convenient access to the Config object.
|
||||
*
|
||||
* For more detailed information about the different possible configuration options, look in the Akka Documentation under "Configuration"
|
||||
*/
|
||||
class Settings(classLoader: ClassLoader, cfg: Config, final val name: String) {
|
||||
|
||||
/**
|
||||
* The backing Config of this ActorSystem's Settings
|
||||
*/
|
||||
final val config: Config = {
|
||||
val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader))
|
||||
config.checkValid(ConfigFactory.defaultReference(classLoader), "akka")
|
||||
|
|
@ -114,11 +120,9 @@ object ActorSystem {
|
|||
import config._
|
||||
|
||||
final val ConfigVersion = getString("akka.version")
|
||||
|
||||
final val ProviderClass = getString("akka.actor.provider")
|
||||
|
||||
final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
|
||||
final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
|
||||
|
||||
final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
|
||||
final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators")
|
||||
|
||||
|
|
@ -148,11 +152,14 @@ object ActorSystem {
|
|||
if (ConfigVersion != Version)
|
||||
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
|
||||
|
||||
/**
|
||||
* Returns the String representation of the Config that this Settings is backed by
|
||||
*/
|
||||
override def toString: String = config.root.render
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL
|
||||
* INTERNAL USE ONLY
|
||||
*/
|
||||
private[akka] def findClassLoader(): ClassLoader = {
|
||||
def findCaller(get: Int ⇒ Class[_]): ClassLoader =
|
||||
|
|
@ -422,6 +429,8 @@ abstract class ExtendedActorSystem extends ActorSystem {
|
|||
def dynamicAccess: DynamicAccess
|
||||
}
|
||||
|
||||
//FIXME This should most probably not be protected[akka] right? - √
|
||||
//FIXME We also need to decide whether this should be supported API or not - √
|
||||
class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem {
|
||||
|
||||
if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$"""))
|
||||
|
|
@ -475,7 +484,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
|
|||
|
||||
def logConfiguration(): Unit = log.info(settings.toString)
|
||||
|
||||
protected def systemImpl = this
|
||||
protected def systemImpl: ActorSystemImpl = this
|
||||
|
||||
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
|
||||
implicit val timeout = settings.CreationTimeout
|
||||
|
|
@ -539,6 +548,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
|
|||
|
||||
def deadLetters: ActorRef = provider.deadLetters
|
||||
|
||||
//FIXME Why do we need this at all?
|
||||
val deadLetterQueue: MessageQueue = new MessageQueue {
|
||||
def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
|
||||
def dequeue() = null
|
||||
|
|
@ -546,7 +556,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
|
|||
def numberOfMessages = 0
|
||||
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
|
||||
}
|
||||
|
||||
//FIXME Why do we need this at all?
|
||||
val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) {
|
||||
becomeClosed()
|
||||
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver)
|
||||
|
|
|
|||
|
|
@ -21,32 +21,43 @@ final case class Address private (protocol: String, system: String, host: Option
|
|||
def this(protocol: String, system: String) = this(protocol, system, None, None)
|
||||
def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
|
||||
|
||||
/**
|
||||
* Returns the canonical String representation of this Address formatted as:
|
||||
*
|
||||
* <protocol>://<system>@<host>:<port>
|
||||
*/
|
||||
@transient
|
||||
override lazy val toString: String = {
|
||||
val sb = new StringBuilder(protocol)
|
||||
sb.append("://")
|
||||
sb.append(system)
|
||||
if (host.isDefined) {
|
||||
sb.append('@')
|
||||
sb.append(host.get)
|
||||
}
|
||||
if (port.isDefined) {
|
||||
sb.append(':')
|
||||
sb.append(port.get)
|
||||
}
|
||||
val sb = (new StringBuilder(protocol)).append("://").append(system)
|
||||
|
||||
if (host.isDefined) sb.append('@').append(host.get)
|
||||
if (port.isDefined) sb.append(':').append(port.get)
|
||||
|
||||
sb.toString
|
||||
}
|
||||
|
||||
def hostPort: String = toString.substring(protocol.length() + 3)
|
||||
/**
|
||||
* Returns a String representation formatted as:
|
||||
*
|
||||
* <system>@<host>:<port>
|
||||
*/
|
||||
def hostPort: String = toString.substring(protocol.length + 3)
|
||||
}
|
||||
|
||||
object Address {
|
||||
/**
|
||||
* Constructs a new Address with the specified protocol and system name
|
||||
*/
|
||||
def apply(protocol: String, system: String) = new Address(protocol, system)
|
||||
|
||||
/**
|
||||
* Constructs a new Address with the specified protocol, system name, host and port
|
||||
*/
|
||||
def apply(protocol: String, system: String, host: String, port: Int) = new Address(protocol, system, Some(host), Some(port))
|
||||
}
|
||||
|
||||
private[akka] trait PathUtils {
|
||||
def split(s: String): List[String] = {
|
||||
protected def split(s: String): List[String] = {
|
||||
@tailrec
|
||||
def rec(pos: Int, acc: List[String]): List[String] = {
|
||||
val from = s.lastIndexOf('/', pos - 1)
|
||||
|
|
@ -94,7 +105,7 @@ object AddressFromURIString {
|
|||
*/
|
||||
def apply(addr: String): Address = addr match {
|
||||
case AddressFromURIString(address) ⇒ address
|
||||
case _ ⇒ throw new MalformedURLException
|
||||
case _ ⇒ throw new MalformedURLException(addr)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -103,6 +114,7 @@ object AddressFromURIString {
|
|||
def parse(addr: String): Address = apply(addr)
|
||||
}
|
||||
|
||||
//FIXME is this public API? - √
|
||||
object ActorPathExtractor extends PathUtils {
|
||||
def unapply(addr: String): Option[(Address, Iterable[String])] =
|
||||
try {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue