Merge branch 'wip-1190-remote-DeathWatch-∂π'

This commit is contained in:
Roland 2011-12-30 00:22:33 +01:00
commit fa2440aeca
26 changed files with 296 additions and 1599 deletions

View file

@ -43,6 +43,10 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
val root = system.asInstanceOf[ActorSystemImpl].lookupRoot
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match {
case RelativeActorPath(elems) system.actorFor("/").path / elems
})
"An ActorSystem" must {
"find actors by looking up their path" in {
@ -101,14 +105,18 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
system.actorFor("system/") must be === syst
}
"return deadLetters for non-existing paths" in {
system.actorFor("a/b/c") must be === system.deadLetters
system.actorFor("") must be === system.deadLetters
system.actorFor("akka://all-systems/Nobody") must be === system.deadLetters
system.actorFor("akka://all-systems/user") must be === system.deadLetters
system.actorFor(system / "hallo") must be === system.deadLetters
system.actorFor(Seq()) must be === system.deadLetters
system.actorFor(Seq("a")) must be === system.deadLetters
"return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in {
def check(lookup: ActorRef, result: ActorRef) = {
lookup.getClass must be === result.getClass
lookup must be === result
}
check(system.actorFor("a/b/c"), empty("a/b/c"))
check(system.actorFor(""), system.deadLetters)
check(system.actorFor("akka://all-systems/Nobody"), system.deadLetters)
check(system.actorFor("akka://all-systems/user"), system.deadLetters)
check(system.actorFor(system / "hallo"), empty("user/hallo"))
check(system.actorFor(Seq()), system.deadLetters)
check(system.actorFor(Seq("a")), empty("a"))
}
"find temporary actors" in {
@ -119,13 +127,14 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
system.actorFor(a.path.toString) must be === a
system.actorFor(a.path.elements) must be === a
system.actorFor(a.path.toString + "/") must be === a
system.actorFor(a.path.toString + "/hallo") must be === system.deadLetters
system.actorFor(a.path.toString + "/hallo").isTerminated must be === true
f.isCompleted must be === false
a.isTerminated must be === false
a ! 42
f.isCompleted must be === true
Await.result(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second)
awaitCond(system.actorFor(a.path).isTerminated, 1 second)
}
}
@ -195,21 +204,27 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
for (target Seq(root, syst, user, system.deadLetters)) check(target)
}
"return deadLetters for non-existing paths" in {
"return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in {
import scala.collection.JavaConverters._
def checkOne(looker: ActorRef, query: Query) {
Await.result(looker ? query, timeout.duration) must be === system.deadLetters
def checkOne(looker: ActorRef, query: Query, result: ActorRef) {
val lookup = Await.result(looker ? query, timeout.duration)
lookup.getClass must be === result.getClass
lookup must be === result
}
def check(looker: ActorRef) {
Seq(LookupString("a/b/c"),
LookupString(""),
LookupString("akka://all-systems/Nobody"),
LookupPath(system / "hallo"),
LookupPath(looker.path child "hallo"), // test Java API
LookupPath(looker.path descendant Seq("a", "b").asJava), // test Java API
LookupElems(Seq()),
LookupElems(Seq("a"))) foreach (checkOne(looker, _))
val lookname = looker.path.elements.mkString("", "/", "/")
for (
(l, r) Seq(
LookupString("a/b/c") -> empty(lookname + "a/b/c"),
LookupString("") -> system.deadLetters,
LookupString("akka://all-systems/Nobody") -> system.deadLetters,
LookupPath(system / "hallo") -> empty("user/hallo"),
LookupPath(looker.path child "hallo") -> empty(lookname + "hallo"), // test Java API
LookupPath(looker.path descendant Seq("a", "b").asJava) -> empty(lookname + "a/b"), // test Java API
LookupElems(Seq()) -> system.deadLetters,
LookupElems(Seq("a")) -> empty(lookname + "a"))
) checkOne(looker, l, r)
}
for (looker all) check(looker)
}
@ -228,11 +243,12 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a
Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a
f.isCompleted must be === false
a.isTerminated must be === false
a ! 42
f.isCompleted must be === true
Await.result(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second)
awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration).asInstanceOf[ActorRef].isTerminated, 1 second)
}
}

View file

@ -281,7 +281,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
" Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'"
}
"must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
"must return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
import java.io._
val baos = new ByteArrayOutputStream(8192 * 32)
@ -297,7 +297,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
in.readObject must be === system.deadLetters
in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing")
}
}

View file

@ -4,18 +4,28 @@
package akka.actor
import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic._
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
def startWatching(target: ActorRef) = system.actorOf(Props(new Actor {
class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec
object DeathWatchSpec {
def props(target: ActorRef, testActor: ActorRef) = Props(new Actor {
context.watch(target)
def receive = { case x testActor forward x }
}))
})
}
trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
import DeathWatchSpec._
lazy val supervisor = system.actorOf(Props[Supervisor], "watchers")
def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds)
"The Death Watch" must {
def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {

View file

@ -54,5 +54,14 @@ class LocalActorRefProviderSpec extends AkkaSpec {
}
}
"throw suitable exceptions for malformed actor names" in {
intercept[InvalidActorNameException](system.actorOf(Props.empty, null)).getMessage.contains("null") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "")).getMessage.contains("empty") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "$hallo")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "a%")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "a?")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "üß")).getMessage.contains("conform") must be(true)
}
}
}

View file

@ -10,6 +10,9 @@ import akka.testkit._
import akka.util.duration._
import akka.dispatch.Await
import akka.util.Duration
import akka.config.ConfigurationException
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory
object RoutingSpec {
@ -371,6 +374,25 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
}), "Actor:" + id)
}
"router FromConfig" must {
"throw suitable exception when not configured" in {
intercept[ConfigurationException] {
system.actorOf(Props.empty.withRouter(FromConfig))
}.getMessage.contains("application.conf") must be(true)
}
"allow external configuration" in {
val sys = ActorSystem("FromConfig", ConfigFactory
.parseString("akka.actor.deployment./routed.router=round-robin")
.withFallback(system.settings.config))
try {
sys.actorOf(Props.empty.withRouter(FromConfig), "routed")
} finally {
sys.shutdown()
}
}
}
"custom router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(VoteCountRouter))

View file

@ -233,8 +233,13 @@ private[akka] class ActorCell(
def actorOf(props: Props): ActorRef = _actorOf(props, randomName())
def actorOf(props: Props, name: String): ActorRef = {
if (name == null || name == "" || name.charAt(0) == '$')
throw new InvalidActorNameException("actor name must not be null, empty or start with $")
import ActorPath.ElementRegex
name match {
case null throw new InvalidActorNameException("actor name must not be null")
case "" throw new InvalidActorNameException("actor name must not be empty")
case ElementRegex() // this is fine
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
}
if (childrenRefs contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
_actorOf(props, name)

View file

@ -15,6 +15,8 @@ object ActorPath {
}
rec(s.length, Nil)
}
val ElementRegex = """[-\w:@&=+,.!~*'_;][-\w:@&=+,.!~*'$_;]*""".r
}
/**
@ -57,7 +59,7 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
/**
* Recursively create a descendants path by appending all child names.
*/
def /(child: Iterable[String]): ActorPath = (this /: child)(_ / _)
def /(child: Iterable[String]): ActorPath = (this /: child)((path, elem) if (elem.isEmpty) path else path / elem)
/**
* ''Java API'': Recursively create a descendants path by appending all child names.

View file

@ -208,13 +208,26 @@ trait ScalaActorRef { ref: ActorRef ⇒
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
/**
* All ActorRefs have a scope which describes where they live. Since it is
* often necessary to distinguish between local and non-local references, this
* is the only method provided on the scope.
*/
trait ActorRefScope {
def isLocal: Boolean
}
trait LocalRef extends ActorRefScope {
final def isLocal = true
}
/**
* Internal trait for assembling all the functionality needed internally on
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
*
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
*/
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef {
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope
def resume(): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
@ -230,6 +243,11 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
* exist, return Nobody.
*/
def getChild(name: Iterator[String]): InternalActorRef
/**
* Scope: if this ref points to an actor which resides within the same JVM,
* i.e. whose mailbox is directly reachable etc.
*/
def isLocal: Boolean
}
private[akka] case object Nobody extends MinimalActorRef {
@ -247,7 +265,7 @@ private[akka] class LocalActorRef private[akka] (
val systemService: Boolean = false,
_receiveTimeout: Option[Duration] = None,
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
extends InternalActorRef {
extends InternalActorRef with LocalRef {
/*
* actorCell.start() publishes actorCell & this to the dispatcher, which
@ -359,7 +377,7 @@ private[akka] class LocalActorRef private[akka] (
def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
/**
@ -382,7 +400,7 @@ case class SerializedActorRef(path: String) {
/**
* Trait for ActorRef implementations where all methods contain default stubs.
*/
trait MinimalActorRef extends InternalActorRef {
trait MinimalActorRef extends InternalActorRef with LocalRef {
def getParent: InternalActorRef = Nobody
def getChild(names: Iterator[String]): InternalActorRef = {
@ -405,6 +423,9 @@ trait MinimalActorRef extends InternalActorRef {
def sendSystemMessage(message: SystemMessage): Unit = ()
def restart(cause: Throwable): Unit = ()
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
object MinimalActorRef {
@ -436,8 +457,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
_path
}
private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) {
_path = rootPath / "deadLetters"
private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) {
_path = path
brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher)
}
@ -456,7 +477,20 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
}
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
/**
* This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful.
*/
class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath)
extends DeadLetterActorRef(_eventStream) {
init(_dispatcher, _path)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops
case _ eventStream.publish(DeadLetter(message, sender, this))
}
}
class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef {
@ -530,7 +564,4 @@ class AskActorRef(
override def stop(): Unit = if (running.getAndSet(false)) {
deathWatch.publish(Terminated(this))
}
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}

View file

@ -451,21 +451,32 @@ class LocalActorRefProvider(
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case RelativeActorPath(elems)
if (elems.isEmpty) deadLetters
else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail)
if (elems.isEmpty) {
log.debug("look-up of empty path string '{}' fails (per definition)", path)
deadLetters
} else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail)
else actorFor(ref, elems)
case LocalActorPath(address, elems) if address == rootPath.address actorFor(rootGuardian, elems)
case _ deadLetters
case _
log.debug("look-up of unknown path '{}' failed", path)
deadLetters
}
def actorFor(path: ActorPath): InternalActorRef =
if (path.root == rootPath) actorFor(rootGuardian, path.elements)
else deadLetters
else {
log.debug("look-up of foreign ActorPath '{}' failed", path)
deadLetters
}
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef =
if (path.isEmpty) deadLetters
else ref.getChild(path.iterator) match {
case Nobody deadLetters
if (path.isEmpty) {
log.debug("look-up of empty path sequence fails (per definition)")
deadLetters
} else ref.getChild(path.iterator) match {
case Nobody
log.debug("look-up of path sequence '{}' failed", path)
new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path)
case x x
}

View file

@ -381,7 +381,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
private lazy val _start: this.type = {
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
deadLetters.init(dispatcher, provider.rootPath)
deadLetters.init(dispatcher, lookupRoot.path / "deadLetters")
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
registerOnTermination(stopScheduler())
_locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch)

View file

@ -18,7 +18,7 @@ class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val de
val soul = iter.next()
deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere
soul.getKey match {
case _: LocalActorRef // nothing to do, they know what they signed up for
case _: LocalRef // nothing to do, they know what they signed up for
case nonlocal nonlocal.stop() // try again in case it was due to a communications failure
}
}

View file

@ -6,7 +6,6 @@ package akka.dispatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.ConcurrentHashMap
import akka.actor.LocalActorRef
import akka.actor.newUuid
import akka.util.{ Duration, ReflectiveAccess }
import akka.actor.ActorSystem

View file

@ -7,6 +7,7 @@ import akka.actor._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import akka.util.{ Duration, Timeout }
import akka.config.ConfigurationException
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -170,6 +171,22 @@ case object NoRouter extends RouterConfig {
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null
}
/**
* Router configuration which has no default, i.e. external configuration is required.
*/
case object FromConfig extends RouterConfig {
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route =
throw new ConfigurationException("router " + ref + " needs external configuration from file (e.g. application.conf)")
}
/**
* Java API: Router configuration which has no default, i.e. external configuration is required.
*/
case class FromConfig() extends RouterConfig {
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route =
throw new ConfigurationException("router " + ref + " needs external configuration from file (e.g. application.conf)")
}
object RoundRobinRouter {
def apply(routees: Iterable[ActorRef]) = new RoundRobinRouter(routees = routees map (_.path.toString))
}

View file

@ -26,7 +26,7 @@ object Helpers {
def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b)
}
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+%"
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~"
@tailrec
def base64(l: Long, sb: StringBuilder = new StringBuilder("$")): String = {

View file

@ -6,7 +6,6 @@ package akka.actor.mailbox
import com.surftools.BeanstalkClient._
import com.surftools.BeanstalkClientImpl._
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import akka.actor.ActorContext

View file

@ -10,8 +10,8 @@ import org.bson.io.OutputBuffer
import org.bson.types.ObjectId
import java.io.InputStream
import org.bson.collection._
import akka.actor.{ ActorRef, ActorSystem }
import akka.dispatch.Envelope
import akka.actor.{ ActorSystem, LocalActorRef, ActorRef }
/**
* A container message for durable mailbox messages, which can be easily stuffed into

View file

@ -4,7 +4,6 @@
package akka.actor.mailbox
import com.redis._
import akka.actor.LocalActorRef
import akka.AkkaException
import akka.actor.ActorContext
import akka.dispatch.Envelope

View file

@ -4,7 +4,6 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import org.I0Itec.zkclient.serialize._

View file

@ -1,6 +1,6 @@
package akka.actor.mailbox
import akka.actor.{ Actor, LocalActorRef }
import akka.actor.Actor
import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher

File diff suppressed because it is too large Load diff

View file

@ -76,14 +76,6 @@ message MessageProtocol {
optional bytes messageManifest = 2;
}
/**
* Defines a UUID.
*/
message UuidProtocol {
required uint64 high = 1;
required uint64 low = 2;
}
/**
* Defines a meta data entry.
*/
@ -109,37 +101,6 @@ message ExceptionProtocol {
required string message = 2;
}
/**
* Defines the remote system daemon message.
*/
message RemoteSystemDaemonMessageProtocol {
required RemoteSystemDaemonMessageType messageType = 1;
optional string actorPath = 2;
optional bytes payload = 3;
optional UuidProtocol replicateActorFromUuid = 4;
optional string supervisor = 5;
}
/**
* Defines the remote system daemon message type.
*/
enum RemoteSystemDaemonMessageType {
STOP = 1;
USE = 2;
RELEASE = 3;
MAKE_AVAILABLE = 4;
MAKE_UNAVAILABLE = 5;
DISCONNECT = 6;
RECONNECT = 7;
RESIGN = 8;
GOSSIP = 9;
FAIL_OVER_CONNECTIONS = 20;
FUNCTION_FUN0_UNIT = 21;
FUNCTION_FUN0_ANY = 22;
FUNCTION_FUN1_ARG_UNIT = 23;
FUNCTION_FUN1_ARG_ANY = 24;
}
/**
* Defines the durable mailbox message.
*/

View file

@ -8,8 +8,6 @@ import akka.actor._
import akka.actor.Status._
import akka.event.Logging
import akka.util.Duration
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import akka.config.ConfigurationException
import akka.serialization.SerializationExtension
@ -253,7 +251,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) {
try {
val t = remoteSettings.RemoteSystemDaemonAckTimeout
Await.result(connection ? (toRemoteMessage(newGossip), t), t) match {
Await.result(connection ? (newGossip, t), t) match {
case Success(receiver) log.debug("Gossip sent to [{}] was successfully received", receiver)
case Failure(cause) log.error(cause, cause.toString)
}
@ -308,19 +306,6 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) {
from copy (version = newVersion)
}
private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = {
val gossipAsBytes = serialization.serialize(gossip) match {
case Left(error) throw error
case Right(bytes) bytes
}
RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(GOSSIP)
.setActorPath(remote.remoteDaemon.path.toString)
.setPayload(ByteString.copyFrom(gossipAsBytes))
.build()
}
private def latestVersionOf(newGossip: Gossip, oldGossip: Gossip): Gossip = {
(newGossip.version compare oldGossip.version) match {
case VectorClock.After newGossip // gossiped version is newer, use new version

View file

@ -11,8 +11,6 @@ import akka.util._
import akka.util.duration._
import akka.util.Helpers._
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import com.eaio.uuid.UUID
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension }
@ -21,6 +19,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.dispatch.SystemMessage
import scala.annotation.tailrec
import akka.remote.RemoteProtocol.{ ActorRefProtocol, AkkaRemoteProtocol, RemoteControlProtocol, RemoteMessageProtocol }
/**
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
@ -115,6 +114,10 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
}
}
sealed trait DaemonMsg
case class DaemonMsgCreate(factory: () Actor, path: String, supervisor: ActorRef) extends DaemonMsg
case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg
/**
* Internal system "daemon" actor for remote internal communication.
*
@ -149,140 +152,39 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa
}
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case message: RemoteSystemDaemonMessageProtocol
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.remoteSettings.NodeName)
message.getMessageType match {
case USE handleUse(message)
case RELEASE handleRelease(message)
// case STOP cluster.shutdown()
// case DISCONNECT cluster.disconnect()
// case RECONNECT cluster.reconnect()
// case RESIGN cluster.resign()
// case FAIL_OVER_CONNECTIONS handleFailover(message)
case GOSSIP handleGossip(message)
// case FUNCTION_FUN0_UNIT handle_fun0_unit(message)
// case FUNCTION_FUN0_ANY handle_fun0_any(message, sender)
// case FUNCTION_FUN1_ARG_UNIT handle_fun1_arg_unit(message)
// case FUNCTION_FUN1_ARG_ANY handle_fun1_arg_any(message, sender)
case unknown log.warning("Unknown message type {} received by {}", unknown, this)
}
case Terminated(child) removeChild(child.path.elements.drop(1).mkString("/"))
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}
def handleUse(message: RemoteSystemDaemonMessageProtocol) {
if (!message.hasActorPath || !message.hasSupervisor) log.error("Ignoring incomplete USE command [{}]", message)
else {
val actorFactoryBytes =
if (remote.remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray)
else message.getPayload.toByteArray
val actorFactory =
remote.serialization.deserialize(actorFactoryBytes, classOf[() Actor], None) match {
case Left(error) throw error
case Right(instance) instance.asInstanceOf[() Actor]
}
case message: DaemonMsg
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName)
message match {
case DaemonMsgCreate(factory, path, supervisor)
import remote.remoteAddress
implicit val t = remote.transports
message.getActorPath match {
path match {
case ParsedActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote"
// TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1)
val path = remote.remoteDaemon.path / subpath
val supervisor = system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef]
val actor = system.provider.actorOf(system, Props(creator = actorFactory), supervisor, path, true, None)
val actor = system.provider.actorOf(system,
Props(creator = factory),
supervisor.asInstanceOf[InternalActorRef],
path, true, None)
addChild(subpath.mkString("/"), actor)
system.deathWatch.subscribe(this, actor)
case _
log.error("remote path does not match path from message [{}]", message)
}
}
case DaemonMsgWatch(watcher, watched)
val other = system.actorFor(watcher.path.root / "remote")
system.deathWatch.subscribe(other, watched)
}
// FIXME implement handleRelease
def handleRelease(message: RemoteSystemDaemonMessageProtocol) {
case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/"))
case t: Terminated system.deathWatch.publish(t)
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}
def handleGossip(message: RemoteSystemDaemonMessageProtocol) {
// try {
// val gossip = serialization.deserialize(message.getPayload.toByteArray, classOf[Gossip], None) match {
// case Left(error) throw error
// case Right(instance) instance.asInstanceOf[Gossip]
// }
// gossiper tell gossip
// sender ! Success(address.toString)
// } catch {
// case error: Throwable
// sender ! Failure(error)
// throw error
// }
}
/*
* generate name for temporary actor refs
*/
// private val tempNumber = new AtomicLong
// def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
// def tempPath = remote.remoteDaemon.path / tempName
//
// // FIXME: handle real remote supervision, ticket #1408
// def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
// new LocalActorRef(remote.system,
// Props(
// context {
// case f: Function0[_] try { f() } finally { context.self.stop() }
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
// }
//
// // FIXME: handle real remote supervision, ticket #1408
// def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) {
// implicit val s = sender
// new LocalActorRef(remote.system,
// Props(
// context {
// case f: Function0[_] try { context.sender ! f() } finally { context.self.stop() }
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Any]])
// }
//
// // FIXME: handle real remote supervision, ticket #1408
// def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
// new LocalActorRef(remote.system,
// Props(
// context {
// case (fun: Function[_, _], param: Any) try { fun.asInstanceOf[Any Unit].apply(param) } finally { context.self.stop() }
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
// }
//
// // FIXME: handle real remote supervision, ticket #1408
// def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) {
// implicit val s = sender
// new LocalActorRef(remote.system,
// Props(
// context {
// case (fun: Function[_, _], param: Any) try { context.sender ! fun.asInstanceOf[Any Any](param) } finally { context.self.stop() }
// }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
// }
def handleFailover(message: RemoteSystemDaemonMessageProtocol) {
// val (from, to) = payloadFor(message, classOf[(InetSocketremoteDaemonServiceName, InetSocketremoteDaemonServiceName)])
// cluster.failOverClusterActorRefConnections(from, to)
}
private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
remote.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) throw error
case Right(instance) instance.asInstanceOf[T]
}
}
}
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader] = None) {
@ -350,14 +252,13 @@ trait RemoteMarshallingOps {
remoteMessage.recipient match {
case `remoteDaemon`
remoteMessage.payload match {
case m: RemoteSystemDaemonMessageProtocol
implicit val timeout = system.settings.ActorTimeout
case m @ (_: DaemonMsg | _: Terminated)
try remoteDaemon ! m catch {
case e: Exception log.error(e, "exception while processing remote command {} from {}", m.getMessageType(), remoteMessage.sender)
case e: Exception log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender)
}
case x log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
}
case l @ (_: LocalActorRef | _: MinimalActorRef)
case l: LocalRef
remoteMessage.payload match {
case msg: SystemMessage
if (useUntrustedMode)

View file

@ -11,8 +11,6 @@ import akka.util.Timeout
import akka.config.ConfigurationException
import akka.event.{ DeathWatch, Logging }
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import com.google.protobuf.ByteString
import akka.event.EventStream
import akka.dispatch.Promise
@ -33,7 +31,6 @@ class RemoteActorRefProvider(
val remoteSettings = new RemoteSettings(settings.config, systemName)
def deathWatch = local.deathWatch
def rootGuardian = local.rootGuardian
def guardian = local.guardian
def systemGuardian = local.systemGuardian
@ -51,6 +48,8 @@ class RemoteActorRefProvider(
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)
val deathWatch = new RemoteDeathWatch(local.deathWatch, this)
def init(system: ActorSystemImpl) {
local.init(system)
remote.init(system, this)
@ -147,24 +146,15 @@ class RemoteActorRefProvider(
def useActorOnNode(path: ActorPath, actorFactory: () Actor, supervisor: ActorRef) {
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path)
val actorFactoryBytes =
remote.serialization.serialize(actorFactory) match {
case Left(error) throw error
case Right(bytes) if (remoteSettings.ShouldCompressData) LZF.compress(bytes) else bytes
}
val command = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorPath(path.toString)
.setPayload(ByteString.copyFrom(actorFactoryBytes))
.setSupervisor(supervisor.path.toString)
.build()
// we dont wait for the ACK, because the remote end will process this command before any other message to the new actor
actorFor(RootActorPath(path.address) / "remote") ! command
actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(actorFactory, path.toString, supervisor)
}
}
trait RemoteRef extends ActorRefScope {
final def isLocal = false
}
/**
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
* This reference is network-aware (remembers its origin) and immutable.
@ -175,7 +165,7 @@ private[akka] class RemoteActorRef private[akka] (
val path: ActorPath,
val getParent: InternalActorRef,
loader: Option[ClassLoader])
extends InternalActorRef {
extends InternalActorRef with RemoteRef {
def getChild(name: Iterator[String]): InternalActorRef = {
val s = name.toStream
@ -217,3 +207,25 @@ private[akka] class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
case r: RemoteRef
val ret = local.subscribe(watcher, watched)
provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched)
ret
case l: LocalRef
local.subscribe(watcher, watched)
case _
provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass)
false
}
def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)
def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)
def publish(event: Terminated): Unit = local.publish(event)
}

View file

@ -60,6 +60,7 @@ abstract class RemoteClient private[akka] (
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
log.debug("Sending message: {}", message)
send(remoteSupport.createRemoteMessageProtocolBuilder(recipient, message, senderOption).build)
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
@ -70,9 +71,7 @@ abstract class RemoteClient private[akka] (
/**
* Sends the message across the wire
*/
def send(request: RemoteMessageProtocol): Unit = {
log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport.system))
private def send(request: RemoteMessageProtocol): Unit = {
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.remote
import akka.testkit._
import akka.actor.{ ActorSystem, DeathWatchSpec }
import com.typesafe.config.ConfigFactory
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString("""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/watchers.remote = "akka://other@127.0.0.1:2666"
}
}
cluster.nodename = buh
remote.server {
hostname = "127.0.0.1"
port = 2665
}
}
""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.server.port=2666").withFallback(system.settings.config))
override def atTermination() {
other.shutdown()
}
}