From d1bdddd588a3b3ccdda0d097effe960298cb5e98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 18 Apr 2011 13:18:47 +0200 Subject: [PATCH] mid address refactoring --- .gitignore | 3 +- .../actor/supervisor/SupervisorTreeSpec.scala | 10 +-- ...BasedEventDrivenDispatcherActorsSpec.scala | 4 +- ...ventDrivenWorkStealingDispatcherSpec.scala | 2 +- .../scala/akka/misc/ActorRegistrySpec.scala | 12 +-- .../src/main/scala/akka/actor/ActorRef.scala | 9 +- .../scala/akka/actor/AddressRegistry.scala | 21 +++++ .../remoteinterface/RemoteInterface.scala | 1 + .../serialization/SerializationProtocol.scala | 15 +--- .../RemoteErrorHandlingNetworkTest.scala | 2 +- .../actor/typed-actor/TypedActorSpec.scala | 6 +- config/akka-reference.conf | 89 ++++++++++--------- 12 files changed, 100 insertions(+), 74 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/actor/AddressRegistry.scala diff --git a/.gitignore b/.gitignore index 28bd0c884d..c59c81a806 100755 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,5 @@ multiverse.log .*.swp akka-docs/_build/ akka-tutorials/akka-tutorial-first/project/boot/ -akka-tutorials/akka-tutorial-first/project/plugins/project/ \ No newline at end of file +akka-tutorials/akka-tutorial-first/project/plugins/project/ +akka-docs/exts/ \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala index cb694d7408..530d51a47d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -11,21 +11,21 @@ import akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, import Actor._ class SupervisorTreeSpec extends WordSpec with MustMatchers { - + var log = "" case object Die class Chainer(myId: String, a: Option[ActorRef] = None) extends Actor { - self.id = myId + self.address = myId self.lifeCycle = Permanent self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000) a.foreach(self.link(_)) def receive = { - case Die => throw new Exception(self.id + " is dying...") + case Die => throw new Exception(self.address + " is dying...") } override def preRestart(reason: Throwable) { - log += self.id + log += self.address } } @@ -37,7 +37,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { val lastActor = actorOf(new Chainer("lastActor")).start val middleActor = actorOf(new Chainer("middleActor", Some(lastActor))).start val headActor = actorOf(new Chainer("headActor", Some(middleActor))).start - + middleActor ! Die Thread.sleep(100) log must equal ("INITmiddleActorlastActor") diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala index 66a02e0d33..1326b954a0 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala @@ -14,7 +14,7 @@ import Actor._ */ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers { class SlowActor(finishedCounter: CountDownLatch) extends Actor { - self.id = "SlowActor" + self.address = "SlowActor" def receive = { case x: Int => { @@ -25,7 +25,7 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM } class FastActor(finishedCounter: CountDownLatch) extends Actor { - self.id = "FastActor" + self.address = "FastActor" def receive = { case x: Int => { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index 2085ed66a0..6389e5a6a9 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -19,7 +19,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { self.dispatcher = delayableActorDispatcher @volatile var invocationCount = 0 - self.id = name + self.address = name def receive = { case x: Int => { diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 4030b953db..002fdc7c65 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -8,7 +8,7 @@ import java.util.concurrent.{CyclicBarrier, TimeUnit, CountDownLatch} object ActorRegistrySpec { var record = "" class TestActor extends Actor { - self.id = "MyID" + self.address = "MyID" def receive = { case "ping" => record = "pong" + record @@ -17,7 +17,7 @@ object ActorRegistrySpec { } class TestActor2 extends Actor { - self.id = "MyID2" + self.address = "MyID2" def receive = { case "ping" => record = "pong" + record @@ -60,7 +60,7 @@ class ActorRegistrySpec extends JUnitSuite { val found = Actor.registry.local.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a }) assert(found.isDefined) assert(found.get.actor.isInstanceOf[TestActor]) - assert(found.get.id === "MyID") + assert(found.get.address === "MyID") actor.stop } @@ -73,9 +73,9 @@ class ActorRegistrySpec extends JUnitSuite { val actors = Actor.registry.local.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") + assert(actors.head.address === "MyID") assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") + assert(actors.last.address === "MyID") actor1.stop actor2.stop } @@ -121,7 +121,7 @@ class ActorRegistrySpec extends JUnitSuite { Actor.registry.local.shutdownAll def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor { - self.id = i.toString + self.address = i.toString def receive = { case _ => } }) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 45c5704f29..d70d1cd351 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -1000,10 +1000,15 @@ private[akka] case class RemoteActorRef private[akka] ( ensureRemotingEnabled timeout = _timeout address = _address + + // FIXME BAD, we should not have different ActorRefs + val remoteAddress: InetSocketAddress = AddressRegistry.lookupRemoteAddress(address).getOrElse( + throw new IllegalStateException("Actor [" + actorClassName + "] is not configured as being a remote actor.")) + start def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - Actor.remote.send[Any](message, senderOption, None, timeout, true, this, None, actorType, loader) + Actor.remote.send[Any](message, senderOption, None, remoteAddress, timeout, true, this, None, actorType, loader) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -1012,7 +1017,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = Actor.remote.send[T]( message, senderOption, senderFuture, - timeout, false, this, None, + remoteAddress, timeout, false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) diff --git a/akka-actor/src/main/scala/akka/actor/AddressRegistry.scala b/akka-actor/src/main/scala/akka/actor/AddressRegistry.scala new file mode 100644 index 0000000000..cf5f5f536d --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/AddressRegistry.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor + +import java.net.InetSocketAddress + +/** + * @author Jonas Bonér + */ +object AddressRegistry { + + def isLocal(address: String): Boolean = { + true + } + + def lookupRemoteAddress(address: String): Option[InetSocketAddress] = { + None + } +} diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index df266f99f0..8727f110ac 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -393,6 +393,7 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]], + remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 42fb72cf3f..fa83d8f3b2 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -93,15 +93,6 @@ object ActorSerialization { def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = toBinary(a, srlMailBox)(format) - private[akka] def toAddressProtocol(actorRef: ActorRef) = { - val address = actorRef.homeAddress.getOrElse(Actor.remote.address) - AddressProtocol.newBuilder - .setHostname(address.getAddress.getHostAddress) - .setPort(address.getPort) - .build - } - - private[akka] def toSerializedActorRefProtocol[T <: Actor]( actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { @@ -207,8 +198,7 @@ object ActorSerialization { lifeCycle, supervisor, hotswap, - factory, - "address") // FIXME grab real address and use that + factory) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) @@ -253,9 +243,8 @@ object RemoteActorSerialization { Actor.remote.registerByUuid(ar) RemoteActorRefProtocol.newBuilder - .setClassOrServiceName("uuid:"+uuid.toString) + .setAddress("uuid:" + uuid.toString) .setActorClassname(actorClassName) - .setHomeAddress(ActorSerialization.toAddressProtocol(ar)) .setTimeout(timeout) .build } diff --git a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala index eee7e5e690..ebca985949 100644 --- a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala +++ b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala @@ -11,7 +11,7 @@ object RemoteErrorHandlingNetworkTest { case class Send(actor: ActorRef) class RemoteActorSpecActorUnidirectional extends Actor { - self.id = "network-drop:unidirectional" + self.address = "network-drop:unidirectional" def receive = { case "Ping" => self.reply_?("Pong") } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala index 897e809a96..edce3c12e4 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala @@ -23,7 +23,7 @@ object TypedActorSpec { } class MyTypedActorImpl extends TypedActor with MyTypedActor { - self.id = "my-custom-id" + self.address = "my-custom-id" def sendOneWay(msg: String) { println("got " + msg ) } @@ -33,7 +33,7 @@ object TypedActorSpec { } class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor { - self.id = "my-custom-id" + self.address = "my-custom-id" def sendOneWay(msg: String) { println("got " + msg + " " + aString + " " + aLong) } @@ -44,7 +44,7 @@ object TypedActorSpec { } class MyActor extends Actor { - self.id = "my-custom-id" + self.address = "my-custom-id" def receive = { case msg: String => println("got " + msg) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index fd83525584..1f331c9010 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -2,25 +2,18 @@ # Akka Config File # #################### - -# spawn-mapping { -# address1 { replication-factor: 2.0, deep-copy = on } -# address2 { replication-factor: 5.0, router = "MEM" } -# ... -# } - # This file has all the default settings, so all these could be removed with no visible effect. # Modify as needed. akka { - version = "1.1-SNAPSHOT" # Akka version, checked against the runtime version of Akka. + version = "1.1-SNAPSHOT" # Akka version, checked against the runtime version of Akka. - enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] + enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] - time-unit = "seconds" # Time unit for all timeout properties throughout the config + time-unit = "seconds" # Time unit for all timeout properties throughout the config - event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) - event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG + event-handlers = ["akka.event.EventHandler$DefaultListener"] # Event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) + event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up # Can be used to bootstrap your application(s) @@ -32,6 +25,25 @@ akka { boot = [] actor { + deployment { + pi { + clustered = on # makes the actor available in the cluster registry; default is off + stateless { # if not defined then stateful which means replicated through transaction log + replication-factor = 3 # default is 1; -1 means auto-scaling + router = "round-robin" # default is "round-robin"; available "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages" + } + } + ping { } # local actor + pong { + clustered = on + stateless { + replication-factor = -1 # auto-scaling + router = "cpu" + } + } + session-registry { clustered = on } # stateful, replicated actor + } + timeout = 5 # Default timeout for Future based invocations # - Actor: !! && !!! # - UntypedActor: sendRequestReply && sendRequestReplyFuture @@ -46,23 +58,20 @@ akka { # - ExecutorBasedEventDriven # - ExecutorBasedEventDrivenWorkStealing # - GlobalExecutorBasedEventDriven - keep-alive-time = 60 # Keep alive time for threads - core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) - max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) - executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded - allow-core-timeout = on # Allow core threads to time out - rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness - throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline - mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) - # If positive then a bounded mailbox is used and the capacity is set using the property - # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, - # could lead to deadlock, use with care - # - # The following are only used for ExecutorBasedEventDriven - # and only if mailbox-capacity > 0 - mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout - # (in unit defined by the time-unit property) + keep-alive-time = 60 # Keep alive time for threads + core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) + max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) + executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + allow-core-timeout = on # Allow core threads to time out + rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard + throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness + throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline + mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) + # If positive then a bounded mailbox is used and the capacity is set using the property + # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care + # The following are only used for ExecutorBasedEventDriven and only if mailbox-capacity > 0 + mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout + # (in unit defined by the time-unit property) } } @@ -116,7 +125,7 @@ akka { realm = "" } - #If you are using akka.http.AkkaMistServlet + # If you are using akka.http.AkkaMistServlet mist-dispatcher { #type = "GlobalExecutorBasedEventDriven" # Uncomment if you want to use a different dispatcher than the default one for Comet } @@ -139,17 +148,17 @@ akka { layer = "akka.remote.netty.NettyRemoteSupport" server { - hostname = "localhost" # The hostname or IP that clients should connect to - port = 2552 # The port clients should connect to. Default is 2552 (AKKA) - message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads + hostname = "localhost" # The hostname or IP that clients should connect to + port = 2552 # The port clients should connect to. Default is 2552 (AKKA) + message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads connection-timeout = 1 - require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? - untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. - backlog = 4096 # Sets the size of the connection backlog - execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling - execution-pool-size = 16# Size of the core pool of the remote execution unit - max-channel-memory-size = 0 # Maximum channel size, 0 for off - max-total-memory-size = 0 # Maximum total size of all channels, 0 for off + require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? + untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. + backlog = 4096 # Sets the size of the connection backlog + execution-pool-keepalive = 60 # Length in akka.time-unit how long core threads will be kept alive if idling + execution-pool-size = 16 # Size of the core pool of the remote execution unit + max-channel-memory-size = 0 # Maximum channel size, 0 for off + max-total-memory-size = 0 # Maximum total size of all channels, 0 for off } client {