mid address refactoring

This commit is contained in:
Jonas Bonér 2011-04-18 13:18:47 +02:00
parent 3374eef6ce
commit d1bdddd588
12 changed files with 100 additions and 74 deletions

1
.gitignore vendored
View file

@ -48,3 +48,4 @@ multiverse.log
akka-docs/_build/ akka-docs/_build/
akka-tutorials/akka-tutorial-first/project/boot/ akka-tutorials/akka-tutorial-first/project/boot/
akka-tutorials/akka-tutorial-first/project/plugins/project/ akka-tutorials/akka-tutorial-first/project/plugins/project/
akka-docs/exts/

View file

@ -15,17 +15,17 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
var log = "" var log = ""
case object Die case object Die
class Chainer(myId: String, a: Option[ActorRef] = None) extends Actor { class Chainer(myId: String, a: Option[ActorRef] = None) extends Actor {
self.id = myId self.address = myId
self.lifeCycle = Permanent self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000) self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000)
a.foreach(self.link(_)) a.foreach(self.link(_))
def receive = { def receive = {
case Die => throw new Exception(self.id + " is dying...") case Die => throw new Exception(self.address + " is dying...")
} }
override def preRestart(reason: Throwable) { override def preRestart(reason: Throwable) {
log += self.id log += self.address
} }
} }

View file

@ -14,7 +14,7 @@ import Actor._
*/ */
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers { class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
class SlowActor(finishedCounter: CountDownLatch) extends Actor { class SlowActor(finishedCounter: CountDownLatch) extends Actor {
self.id = "SlowActor" self.address = "SlowActor"
def receive = { def receive = {
case x: Int => { case x: Int => {
@ -25,7 +25,7 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
} }
class FastActor(finishedCounter: CountDownLatch) extends Actor { class FastActor(finishedCounter: CountDownLatch) extends Actor {
self.id = "FastActor" self.address = "FastActor"
def receive = { def receive = {
case x: Int => { case x: Int => {

View file

@ -19,7 +19,7 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
self.dispatcher = delayableActorDispatcher self.dispatcher = delayableActorDispatcher
@volatile var invocationCount = 0 @volatile var invocationCount = 0
self.id = name self.address = name
def receive = { def receive = {
case x: Int => { case x: Int => {

View file

@ -8,7 +8,7 @@ import java.util.concurrent.{CyclicBarrier, TimeUnit, CountDownLatch}
object ActorRegistrySpec { object ActorRegistrySpec {
var record = "" var record = ""
class TestActor extends Actor { class TestActor extends Actor {
self.id = "MyID" self.address = "MyID"
def receive = { def receive = {
case "ping" => case "ping" =>
record = "pong" + record record = "pong" + record
@ -17,7 +17,7 @@ object ActorRegistrySpec {
} }
class TestActor2 extends Actor { class TestActor2 extends Actor {
self.id = "MyID2" self.address = "MyID2"
def receive = { def receive = {
case "ping" => case "ping" =>
record = "pong" + record record = "pong" + record
@ -60,7 +60,7 @@ class ActorRegistrySpec extends JUnitSuite {
val found = Actor.registry.local.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a }) val found = Actor.registry.local.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a })
assert(found.isDefined) assert(found.isDefined)
assert(found.get.actor.isInstanceOf[TestActor]) assert(found.get.actor.isInstanceOf[TestActor])
assert(found.get.id === "MyID") assert(found.get.address === "MyID")
actor.stop actor.stop
} }
@ -73,9 +73,9 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = Actor.registry.local.actors val actors = Actor.registry.local.actors
assert(actors.size === 2) assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.id === "MyID") assert(actors.head.address === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor]) assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.id === "MyID") assert(actors.last.address === "MyID")
actor1.stop actor1.stop
actor2.stop actor2.stop
} }
@ -121,7 +121,7 @@ class ActorRegistrySpec extends JUnitSuite {
Actor.registry.local.shutdownAll Actor.registry.local.shutdownAll
def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor { def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor {
self.id = i.toString self.address = i.toString
def receive = { case _ => } def receive = { case _ => }
}) })

View file

@ -1000,10 +1000,15 @@ private[akka] case class RemoteActorRef private[akka] (
ensureRemotingEnabled ensureRemotingEnabled
timeout = _timeout timeout = _timeout
address = _address address = _address
// FIXME BAD, we should not have different ActorRefs
val remoteAddress: InetSocketAddress = AddressRegistry.lookupRemoteAddress(address).getOrElse(
throw new IllegalStateException("Actor [" + actorClassName + "] is not configured as being a remote actor."))
start start
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
Actor.remote.send[Any](message, senderOption, None, timeout, true, this, None, actorType, loader) Actor.remote.send[Any](message, senderOption, None, remoteAddress, timeout, true, this, None, actorType, loader)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
@ -1012,7 +1017,7 @@ private[akka] case class RemoteActorRef private[akka] (
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = Actor.remote.send[T]( val future = Actor.remote.send[T](
message, senderOption, senderFuture, message, senderOption, senderFuture,
timeout, false, this, None, remoteAddress, timeout, false, this, None,
actorType, loader) actorType, loader)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import java.net.InetSocketAddress
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object AddressRegistry {
def isLocal(address: String): Boolean = {
true
}
def lookupRemoteAddress(address: String): Option[InetSocketAddress] = {
None
}
}

View file

@ -393,6 +393,7 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
protected[akka] def send[T](message: Any, protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]], senderFuture: Option[CompletableFuture[T]],
remoteAddress: InetSocketAddress,
timeout: Long, timeout: Long,
isOneWay: Boolean, isOneWay: Boolean,
actorRef: ActorRef, actorRef: ActorRef,

View file

@ -93,15 +93,6 @@ object ActorSerialization {
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format) toBinary(a, srlMailBox)(format)
private[akka] def toAddressProtocol(actorRef: ActorRef) = {
val address = actorRef.homeAddress.getOrElse(Actor.remote.address)
AddressProtocol.newBuilder
.setHostname(address.getAddress.getHostAddress)
.setPort(address.getPort)
.build
}
private[akka] def toSerializedActorRefProtocol[T <: Actor]( private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = { val lifeCycleProtocol: Option[LifeCycleProtocol] = {
@ -207,8 +198,7 @@ object ActorSerialization {
lifeCycle, lifeCycle,
supervisor, supervisor,
hotswap, hotswap,
factory, factory)
"address") // FIXME grab real address and use that
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
@ -253,9 +243,8 @@ object RemoteActorSerialization {
Actor.remote.registerByUuid(ar) Actor.remote.registerByUuid(ar)
RemoteActorRefProtocol.newBuilder RemoteActorRefProtocol.newBuilder
.setClassOrServiceName("uuid:"+uuid.toString) .setAddress("uuid:" + uuid.toString)
.setActorClassname(actorClassName) .setActorClassname(actorClassName)
.setHomeAddress(ActorSerialization.toAddressProtocol(ar))
.setTimeout(timeout) .setTimeout(timeout)
.build .build
} }

View file

@ -11,7 +11,7 @@ object RemoteErrorHandlingNetworkTest {
case class Send(actor: ActorRef) case class Send(actor: ActorRef)
class RemoteActorSpecActorUnidirectional extends Actor { class RemoteActorSpecActorUnidirectional extends Actor {
self.id = "network-drop:unidirectional" self.address = "network-drop:unidirectional"
def receive = { def receive = {
case "Ping" => self.reply_?("Pong") case "Ping" => self.reply_?("Pong")
} }

View file

@ -23,7 +23,7 @@ object TypedActorSpec {
} }
class MyTypedActorImpl extends TypedActor with MyTypedActor { class MyTypedActorImpl extends TypedActor with MyTypedActor {
self.id = "my-custom-id" self.address = "my-custom-id"
def sendOneWay(msg: String) { def sendOneWay(msg: String) {
println("got " + msg ) println("got " + msg )
} }
@ -33,7 +33,7 @@ object TypedActorSpec {
} }
class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor { class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor {
self.id = "my-custom-id" self.address = "my-custom-id"
def sendOneWay(msg: String) { def sendOneWay(msg: String) {
println("got " + msg + " " + aString + " " + aLong) println("got " + msg + " " + aString + " " + aLong)
} }
@ -44,7 +44,7 @@ object TypedActorSpec {
} }
class MyActor extends Actor { class MyActor extends Actor {
self.id = "my-custom-id" self.address = "my-custom-id"
def receive = { def receive = {
case msg: String => println("got " + msg) case msg: String => println("got " + msg)
} }

View file

@ -2,13 +2,6 @@
# Akka Config File # # Akka Config File #
#################### ####################
# spawn-mapping {
# address1 { replication-factor: 2.0, deep-copy = on }
# address2 { replication-factor: 5.0, router = "MEM" }
# ...
# }
# This file has all the default settings, so all these could be removed with no visible effect. # This file has all the default settings, so all these could be removed with no visible effect.
# Modify as needed. # Modify as needed.
@ -19,7 +12,7 @@ akka {
time-unit = "seconds" # Time unit for all timeout properties throughout the config time-unit = "seconds" # Time unit for all timeout properties throughout the config
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handlers = ["akka.event.EventHandler$DefaultListener"] # Event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
@ -32,6 +25,25 @@ akka {
boot = [] boot = []
actor { actor {
deployment {
pi {
clustered = on # makes the actor available in the cluster registry; default is off
stateless { # if not defined then stateful which means replicated through transaction log
replication-factor = 3 # default is 1; -1 means auto-scaling
router = "round-robin" # default is "round-robin"; available "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages"
}
}
ping { } # local actor
pong {
clustered = on
stateless {
replication-factor = -1 # auto-scaling
router = "cpu"
}
}
session-registry { clustered = on } # stateful, replicated actor
}
timeout = 5 # Default timeout for Future based invocations timeout = 5 # Default timeout for Future based invocations
# - Actor: !! && !!! # - Actor: !! && !!!
# - UntypedActor: sendRequestReply && sendRequestReplyFuture # - UntypedActor: sendRequestReply && sendRequestReplyFuture
@ -56,11 +68,8 @@ akka {
throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property # If positive then a bounded mailbox is used and the capacity is set using the property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
# could lead to deadlock, use with care # The following are only used for ExecutorBasedEventDriven and only if mailbox-capacity > 0
#
# The following are only used for ExecutorBasedEventDriven
# and only if mailbox-capacity > 0
mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
# (in unit defined by the time-unit property) # (in unit defined by the time-unit property)
} }