Removing ActorCell.ref (use ActorCell.self instead), introducing Props.randomAddress which will use the toString of the uuid of the actor ref as address, bypassing deployer for actors with 'randomAddress' since it isn't possible to know what the address will be anyway, removing Address.validate since it serves no useful purpose, removing guard.withGuard in MessageDispatcher in favor of the less costly lock try-finally unlock strategy

This commit is contained in:
Viktor Klang 2011-10-18 19:14:42 +02:00
parent 474787a81d
commit 01efcd7b50
19 changed files with 69 additions and 59 deletions

View file

@ -108,27 +108,27 @@ object ActorModelSpec {
}
abstract override def suspend(actor: ActorCell) {
getStats(actor.ref).suspensions.incrementAndGet()
getStats(actor.self).suspensions.incrementAndGet()
super.suspend(actor)
}
abstract override def resume(actor: ActorCell) {
super.resume(actor)
getStats(actor.ref).resumes.incrementAndGet()
getStats(actor.self).resumes.incrementAndGet()
}
protected[akka] abstract override def register(actor: ActorCell) {
getStats(actor.ref).registers.incrementAndGet()
getStats(actor.self).registers.incrementAndGet()
super.register(actor)
}
protected[akka] abstract override def unregister(actor: ActorCell) {
getStats(actor.ref).unregisters.incrementAndGet()
getStats(actor.self).unregisters.incrementAndGet()
super.unregister(actor)
}
protected[akka] abstract override def dispatch(invocation: Envelope) {
val stats = getStats(invocation.receiver.ref)
val stats = getStats(invocation.receiver.self)
stats.msgsReceived.incrementAndGet()
super.dispatch(invocation)
}

View file

@ -227,8 +227,6 @@ private[akka] class ActorCell(
var actor: Actor = _ //FIXME We can most probably make this just a regular reference to Actor
def ref: ActorRef with ScalaActorRef = self
def uuid: Uuid = self.uuid
@inline
@ -245,7 +243,7 @@ private[akka] class ActorCell(
if (props.supervisor.isDefined) {
props.supervisor.get match {
case l: LocalActorRef
l.underlying.dispatcher.systemDispatch(l.underlying, akka.dispatch.Supervise(self))
l.underlying.dispatcher.systemDispatch(l.underlying, akka.dispatch.Supervise(self)) //FIXME TODO Support all ActorRefs?
case other throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can")
}
}

View file

@ -148,21 +148,21 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
class LocalActorRef private[akka] (
app: AkkaApplication,
props: Props,
givenAddress: String,
givenAddress: String, //Never refer to this internally instead use "address"
val systemService: Boolean = false,
private[akka] val uuid: Uuid = newUuid,
receiveTimeout: Option[Long] = None,
hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
extends ActorRef with ScalaActorRef {
private[this] val actorCell = new ActorCell(app, this, props, receiveTimeout, hotswap)
actorCell.start()
final def address: String = givenAddress match {
case null | "" uuid.toString
final val address: String = givenAddress match {
case null | Props.randomAddress uuid.toString
case other other
}
private[this] val actorCell = new ActorCell(app, this, props, receiveTimeout, hotswap)
actorCell.start()
/**
* Is the actor shut down?
* If this method returns true, it will never return false again, but if it returns false, you cannot be sure if it's alive still (race condition)

View file

@ -43,7 +43,7 @@ trait ActorRefFactory {
def dispatcher: MessageDispatcher
def actorOf(props: Props): ActorRef = actorOf(props, new UUID().toString)
def actorOf(props: Props): ActorRef = actorOf(props, Props.randomAddress)
/*
* TODO this will have to go at some point, because creating two actors with
@ -63,7 +63,7 @@ trait ActorRefFactory {
def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() creator.create()))
def actorOf(props: RoutedProps): ActorRef = actorOf(props, new UUID().toString)
def actorOf(props: RoutedProps): ActorRef = actorOf(props, Props.randomAddress)
def actorOf(props: RoutedProps, address: String): ActorRef = provider.actorOf(props, address)
@ -94,15 +94,19 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
private[akka] def evict(address: String): Boolean = actors.remove(address) ne null
private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = {
if (systemService) new LocalActorRef(app, props, address, systemService = true)
else {
if ((address eq null) || address == Props.randomAddress) {
val actor = new LocalActorRef(app, props, address, systemService = true)
actors.putIfAbsent(actor.address, actor) match {
case null actor
case other throw new IllegalStateException("Same uuid generated twice for: " + actor + " and " + other)
}
} else {
val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(address, newFuture) match {
case null
val actor: ActorRef = try {
app.deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor
(if (systemService) None else app.deployer.lookupDeploymentFor(address)) match { // see if the deployment already exists, if so use it, if not create actor
// create a local actor
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope))

View file

@ -19,6 +19,10 @@ trait ActorDeployer {
private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only?
private[akka] def deploy(deployment: Deploy): Unit
private[akka] def lookupDeploymentFor(address: String): Option[Deploy]
def lookupDeployment(address: String): Option[Deploy] = address match {
case null | Props.`randomAddress` None
case some lookupDeploymentFor(some)
}
private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
}
@ -256,9 +260,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
// akka.actor.deployment.<address>.cluster
// --------------------------------
addressConfig.getSection("cluster") match {
case None
Some(Deploy(address, recipe, router, nrOfInstances, NoOpFailureDetector, LocalScope)) // deploy locally
case None None
case Some(clusterConfig)
// --------------------------------

View file

@ -20,9 +20,7 @@ object DeploymentConfig {
routing: Routing = Direct,
nrOfInstances: NrOfInstances = ZeroNrOfInstances,
failureDetector: FailureDetector = NoOpFailureDetector,
scope: Scope = LocalScope) {
Address.validate(address)
}
scope: Scope = LocalScope)
// --------------------------------
// --- Actor Recipe

View file

@ -22,6 +22,7 @@ object Props {
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(classOf[Exception] :: Nil, None, None)
final val defaultSupervisor: Option[ActorRef] = None
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
final val randomAddress: String = ""
/**
* The default Props instance, uses the settings from the Props object starting with default*

View file

@ -113,7 +113,8 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
* Detaches the specified actor instance from this dispatcher
*/
final def detach(actor: ActorCell) {
guard withGuard {
guard.lock.lock()
try {
unregister(actor)
if (_tasks.get == 0 && _actors.get == 0) {
shutdownSchedule match {
@ -125,11 +126,15 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
case RESCHEDULED //Already marked for reschedule
}
}
}
} finally { guard.lock.unlock() }
}
protected final def startIfUnstarted() {
if (active.isOff) guard withGuard { active.switchOn { start() } }
if (active.isOff) {
guard.lock.lock()
try { active.switchOn { start() } }
finally { guard.lock.unlock() }
}
}
protected[akka] final def dispatchTask(block: () Unit) {
@ -146,7 +151,8 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
private val taskCleanup: () Unit =
() if (_tasks.decrementAndGet() == 0) {
guard withGuard {
guard.lock.lock()
try {
if (_tasks.get == 0 && _actors.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
@ -157,7 +163,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
case RESCHEDULED //Already marked for reschedule
}
}
}
} finally { guard.lock.unlock() }
}
/**
@ -206,7 +212,8 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
private val shutdownAction = new Runnable {
def run() {
guard withGuard {
guard.lock.lock()
try {
shutdownSchedule match {
case RESCHEDULED
shutdownSchedule = SCHEDULED
@ -220,7 +227,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
shutdownSchedule = UNSCHEDULED
case UNSCHEDULED //Do nothing
}
}
} finally { guard.lock.unlock() }
}
}

View file

@ -204,7 +204,7 @@ class EventHandler(app: AkkaApplication) extends ListenerManagement {
defaultListeners foreach { listenerName
try {
ReflectiveAccess.getClassFor[Actor](listenerName) match {
case Right(actorClass) addListener(new LocalActorRef(app, Props(actorClass).withDispatcher(EventHandlerDispatcher), newUuid.toString, systemService = true))
case Right(actorClass) addListener(new LocalActorRef(app, Props(actorClass).withDispatcher(EventHandlerDispatcher), Props.randomAddress, systemService = true))
case Left(exception) throw exception
}
} catch {

View file

@ -32,8 +32,8 @@ private[camel] object TypedCamel {
* and re-uses the <code>activationTracker</code> of <code>service</code>.
*/
def onCamelServiceStart(service: CamelService) {
consumerPublisher = new LocalActorRef(Props(new TypedConsumerPublisher(service.activationTracker)), newUuid.toString, true)
publishRequestor = new LocalActorRef(Props(new TypedConsumerPublishRequestor), newUuid.toString, true)
consumerPublisher = new LocalActorRef(Props(new TypedConsumerPublisher(service.activationTracker)), Props.randomAddress, true)
publishRequestor = new LocalActorRef(Props(new TypedConsumerPublishRequestor), Props.randomAddress, true)
registerPublishRequestor

View file

@ -26,9 +26,9 @@ import TypedCamelAccess._
* @author Martin Krasser
*/
trait CamelService extends Bootable {
private[camel] val activationTracker = new LocalActorRef(Props[ActivationTracker], newUuid.toString, true)
private[camel] val consumerPublisher = new LocalActorRef(Props(new ConsumerPublisher(activationTracker)), newUuid.toString, true)
private[camel] val publishRequestor = new LocalActorRef(Props(new ConsumerPublishRequestor), newUuid.toString, true)
private[camel] val activationTracker = new LocalActorRef(Props[ActivationTracker], Props.randomAddress, true)
private[camel] val consumerPublisher = new LocalActorRef(Props(new ConsumerPublisher(activationTracker)), Props.randomAddress, true)
private[camel] val publishRequestor = new LocalActorRef(Props(new ConsumerPublishRequestor), Props.randomAddress, true)
private val serviceEnabled = config.getList("akka.enabled-modules").exists(_ == "camel")

View file

@ -1860,7 +1860,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
Props(
self {
case f: Function0[_] try { f() } finally { self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}
def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
@ -1868,7 +1868,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
Props(
self {
case f: Function0[_] try { self.reply(f()) } finally { self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
@ -1876,7 +1876,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
Props(
self {
case (fun: Function[_, _], param: Any) try { fun.asInstanceOf[Any Unit].apply(param) } finally { self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
@ -1884,7 +1884,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
Props(
self {
case (fun: Function[_, _], param: Any) try { self.reply(fun.asInstanceOf[Any Any](param)) } finally { self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}
def handleFailover(message: RemoteProtocol.RemoteDaemonMessageProtocol) {

View file

@ -31,7 +31,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM
"should handle reply to ! for 1 message" in {
val latch = new CountDownLatch(1)
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
val sender = new LocalActorRef(Props(self { case "sum" latch.countDown }), newUuid.toString, true)
val sender = new LocalActorRef(Props(self { case "sum" latch.countDown }), Props.randomAddress, true)
queueActor.!("sum")(Some(sender))
latch.await(10, TimeUnit.SECONDS) must be(true)
@ -40,7 +40,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM
"should handle reply to ! for multiple messages" in {
val latch = new CountDownLatch(5)
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
val sender = new LocalActorRef(Props(self { case "sum" latch.countDown }), newUuid.toString, true)
val sender = new LocalActorRef(Props(self { case "sum" latch.countDown }), Props.randomAddress, true)
for (i 1 to 5) queueActor.!("sum")(Some(sender))

View file

@ -65,7 +65,7 @@ class NetworkEventStream(val app: AkkaApplication) {
import NetworkEventStream._
private[akka] val channel = app.provider.actorOf(
Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), newUuid.toString, systemService = true)
Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), Props.randomAddress, systemService = true)
/**
* Registers a network event stream listener (asyncronously).

View file

@ -177,7 +177,7 @@ class RemoteDaemon(val remote: Remote) extends Actor {
Props(
context {
case f: Function0[_] try { f() } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}
def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
@ -185,7 +185,7 @@ class RemoteDaemon(val remote: Remote) extends Actor {
Props(
context {
case f: Function0[_] try { reply(f()) } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
@ -193,7 +193,7 @@ class RemoteDaemon(val remote: Remote) extends Actor {
Props(
context {
case (fun: Function[_, _], param: Any) try { fun.asInstanceOf[Any Unit].apply(param) } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) {
@ -201,7 +201,7 @@ class RemoteDaemon(val remote: Remote) extends Actor {
Props(
context {
case (fun: Function[_, _], param: Any) try { reply(fun.asInstanceOf[Any Any](param)) } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), newUuid.toString, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}
def handleFailover(message: RemoteProtocol.RemoteDaemonMessageProtocol) {

View file

@ -101,7 +101,7 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) {
l map { m
remoteActorSerialization.createRemoteMessageProtocolBuilder(
Option(m.receiver.ref),
Option(m.receiver.self),
Left(actorRef.uuid),
actorRef.address,
app.AkkaConfig.ActorTimeoutMillis,

View file

@ -23,7 +23,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
"Serializable actor" must {
"must be able to serialize and de-serialize a stateful actor with a given serializer" ignore {
val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], newUuid.toString, systemService = true)
val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], Props.randomAddress, systemService = true)
(actor1 ? "hello").get must equal("world 1")
(actor1 ? "hello").get must equal("world 2")
@ -39,7 +39,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
"must be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox" ignore {
val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true)
val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], Props.randomAddress, systemService = true)
for (i 1 to 10) actor1 ! "hello"
actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0)
@ -57,7 +57,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
"must be able to serialize and deserialize a PersonActorWithMessagesInMailbox" ignore {
val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050"))
val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], newUuid.toString, systemService = true)
val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], Props.randomAddress, systemService = true)
(actor1 ! p1)
(actor1 ! p1)
(actor1 ! p1)
@ -103,7 +103,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
"serialize actor that accepts protobuf message" ignore {
"must serialize" ignore {
val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], newUuid.toString, systemService = true)
val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], Props.randomAddress, systemService = true)
val msg = MyMessage(123, "debasish ghosh", true)
val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build
for (i 1 to 10) actor1 ! b

View file

@ -42,15 +42,15 @@ class TestActorRef[T <: Actor](_app: AkkaApplication, props: Props, address: Str
object TestActorRef {
def apply[T <: Actor](factory: T)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), new UUID().toString)
def apply[T <: Actor](factory: T)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), Props.randomAddress)
def apply[T <: Actor](factory: T, address: String)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), address)
def apply[T <: Actor](props: Props)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, new UUID().toString)
def apply[T <: Actor](props: Props)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, Props.randomAddress)
def apply[T <: Actor](props: Props, address: String)(implicit app: AkkaApplication): TestActorRef[T] = new TestActorRef(app, props, address)
def apply[T <: Actor](implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](new UUID().toString)
def apply[T <: Actor](implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props.randomAddress)
def apply[T <: Actor](address: String)(implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props({
import ReflectiveAccess.{ createInstance, noParams, noArgs }

View file

@ -81,7 +81,7 @@ class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, address:
object TestFSMRef {
def apply[S, D, T <: Actor](factory: T)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] =
new TestFSMRef(app, Props(creator = () factory), new UUID().toString)
new TestFSMRef(app, Props(creator = () factory), Props.randomAddress)
def apply[S, D, T <: Actor](factory: T, address: String)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] =
new TestFSMRef(app, Props(creator = () factory), address)