mid-address-refactoring: all tests except remote and serialization tests passes
This commit is contained in:
parent
d775ec8438
commit
896e174d63
13 changed files with 38 additions and 60 deletions
|
|
@ -14,8 +14,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
|
|||
|
||||
var log = ""
|
||||
case object Die
|
||||
class Chainer(myId: String, a: Option[ActorRef] = None) extends Actor {
|
||||
self.address = myId
|
||||
class Chainer(a: Option[ActorRef] = None) extends Actor {
|
||||
self.lifeCycle = Permanent
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000)
|
||||
a.foreach(self.link(_))
|
||||
|
|
@ -34,9 +33,9 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
|
|||
"be able to kill the middle actor and see itself and its child restarted" in {
|
||||
log = "INIT"
|
||||
|
||||
val lastActor = actorOf(new Chainer("lastActor")).start
|
||||
val middleActor = actorOf(new Chainer("middleActor", Some(lastActor))).start
|
||||
val headActor = actorOf(new Chainer("headActor", Some(middleActor))).start
|
||||
val lastActor = actorOf(new Chainer, "lastActor").start
|
||||
val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start
|
||||
val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start
|
||||
|
||||
middleActor ! Die
|
||||
Thread.sleep(100)
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import Actor._
|
|||
*/
|
||||
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
|
||||
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
|
||||
self.address = "SlowActor"
|
||||
|
||||
def receive = {
|
||||
case x: Int => {
|
||||
|
|
@ -25,8 +24,6 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
|
|||
}
|
||||
|
||||
class FastActor(finishedCounter: CountDownLatch) extends Actor {
|
||||
self.address = "FastActor"
|
||||
|
||||
def receive = {
|
||||
case x: Int => {
|
||||
finishedCounter.countDown
|
||||
|
|
@ -37,8 +34,8 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
|
|||
@Test def slowActorShouldntBlockFastActor {
|
||||
val sFinished = new CountDownLatch(50)
|
||||
val fFinished = new CountDownLatch(10)
|
||||
val s = actorOf(new SlowActor(sFinished)).start
|
||||
val f = actorOf(new FastActor(fFinished)).start
|
||||
val s = actorOf(new SlowActor(sFinished), "SlowActor").start
|
||||
val f = actorOf(new FastActor(fFinished), "FastActor").start
|
||||
|
||||
// send a lot of stuff to s
|
||||
for (i <- 1 to 50) {
|
||||
|
|
|
|||
|
|
@ -16,10 +16,9 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
|
|||
|
||||
val delayableActorDispatcher, sharedActorDispatcher, parentActorDispatcher = newWorkStealer()
|
||||
|
||||
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
|
||||
class DelayableActor(delay: Int, finishedCounter: CountDownLatch) extends Actor {
|
||||
self.dispatcher = delayableActorDispatcher
|
||||
@volatile var invocationCount = 0
|
||||
self.address = name
|
||||
|
||||
def receive = {
|
||||
case x: Int => {
|
||||
|
|
@ -58,8 +57,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
|
|||
@Test def fastActorShouldStealWorkFromSlowActor {
|
||||
val finishedCounter = new CountDownLatch(110)
|
||||
|
||||
val slow = actorOf(new DelayableActor("slow", 50, finishedCounter)).start
|
||||
val fast = actorOf(new DelayableActor("fast", 10, finishedCounter)).start
|
||||
val slow = actorOf(new DelayableActor(50, finishedCounter), "slow").start
|
||||
val fast = actorOf(new DelayableActor(10, finishedCounter), "fast").start
|
||||
|
||||
var sentToFast = 0
|
||||
|
||||
|
|
|
|||
|
|
@ -4,11 +4,11 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
import Actor._
|
||||
import java.util.concurrent.{CyclicBarrier, TimeUnit, CountDownLatch}
|
||||
import org.scalatest.Assertions._
|
||||
|
||||
object ActorRegistrySpec {
|
||||
var record = ""
|
||||
class TestActor extends Actor {
|
||||
self.address = "MyID"
|
||||
def receive = {
|
||||
case "ping" =>
|
||||
record = "pong" + record
|
||||
|
|
@ -17,7 +17,6 @@ object ActorRegistrySpec {
|
|||
}
|
||||
|
||||
class TestActor2 extends Actor {
|
||||
self.address = "MyID2"
|
||||
def receive = {
|
||||
case "ping" =>
|
||||
record = "pong" + record
|
||||
|
|
@ -34,57 +33,59 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
|
||||
@Test def shouldGetActorByAddressFromActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = Actor.registry.actorFor(actor1.address)
|
||||
assert(actor2.isDefined)
|
||||
assert(actor2.get.address === actor1.address)
|
||||
assert(actor2.get.address === "test-actor-1")
|
||||
actor2.get.stop
|
||||
}
|
||||
|
||||
@Test def shouldGetActorByUUIDFromLocalActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor = actorOf[TestActor]
|
||||
val actor = actorOf[TestActor]("test-actor-1")
|
||||
val uuid = actor.uuid
|
||||
actor.start
|
||||
val actorOrNone = Actor.registry.local.actorFor(uuid)
|
||||
assert(actorOrNone.isDefined)
|
||||
assert(actorOrNone.get.uuid === uuid)
|
||||
assert(actorOrNone.get.address === "test-actor-1")
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldFindThingsFromLocalActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor = actorOf[TestActor]
|
||||
val actor = actorOf[TestActor]("test-actor-1")
|
||||
actor.start
|
||||
val found = Actor.registry.local.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a })
|
||||
assert(found.isDefined)
|
||||
assert(found.get.actor.isInstanceOf[TestActor])
|
||||
assert(found.get.address === "MyID")
|
||||
assert(found.get.address === "test-actor-1")
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldGetAllActorsFromLocalActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = actorOf[TestActor]
|
||||
val actor2 = actorOf[TestActor]("test-actor-2")
|
||||
actor2.start
|
||||
val actors = Actor.registry.local.actors
|
||||
assert(actors.size === 2)
|
||||
assert(actors.head.actor.isInstanceOf[TestActor])
|
||||
assert(actors.head.address === "MyID")
|
||||
assert(actors.head.address === "test-actor-2")
|
||||
assert(actors.last.actor.isInstanceOf[TestActor])
|
||||
assert(actors.last.address === "MyID")
|
||||
assert(actors.last.address === "test-actor-1")
|
||||
actor1.stop
|
||||
actor2.stop
|
||||
}
|
||||
|
||||
@Test def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = actorOf[TestActor]
|
||||
val actor2 = actorOf[TestActor]("test-actor-2")
|
||||
actor2.start
|
||||
record = ""
|
||||
Actor.registry.local.foreach(actor => actor !! "ping")
|
||||
|
|
@ -95,9 +96,9 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
|
||||
@Test def shouldShutdownAllActorsInLocalActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = actorOf[TestActor]
|
||||
val actor2 = actorOf[TestActor]("test-actor-2")
|
||||
actor2.start
|
||||
Actor.registry.local.shutdownAll
|
||||
assert(Actor.registry.local.actors.size === 0)
|
||||
|
|
@ -105,9 +106,9 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
|
||||
@Test def shouldRemoveUnregisterActorInLocalActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = actorOf[TestActor]
|
||||
val actor2 = actorOf[TestActor]("test-actor-2")
|
||||
actor2.start
|
||||
assert(Actor.registry.local.actors.size === 2)
|
||||
Actor.registry.unregister(actor1)
|
||||
|
|
|
|||
|
|
@ -92,12 +92,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
|
||||
protected[akka] val guard = new ReentrantGuard
|
||||
|
||||
/**
|
||||
* FIXME Document
|
||||
*/
|
||||
@BeanProperty
|
||||
@volatile
|
||||
var address: String = _uuid.toString // FIXME set 'address' in ActorRef and make 'val'
|
||||
val address: String
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -542,9 +537,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, _address: String)
|
||||
class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, val address: String)
|
||||
extends ActorRef with ScalaActorRef {
|
||||
this.address = _address
|
||||
|
||||
@volatile
|
||||
private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
|
|
@ -990,7 +984,7 @@ object RemoteActorSystemMessage {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] case class RemoteActorRef private[akka] (
|
||||
_address: String,
|
||||
val address: String,
|
||||
val actorClassName: String,
|
||||
_timeout: Long,
|
||||
loader: Option[ClassLoader],
|
||||
|
|
@ -999,10 +993,9 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
ensureRemotingEnabled
|
||||
timeout = _timeout
|
||||
address = _address
|
||||
|
||||
// FIXME BAD, we should not have different ActorRefs
|
||||
val remoteAddress: InetSocketAddress = AddressRegistry.lookupRemoteAddress(address).getOrElse(
|
||||
val remoteAddress: InetSocketAddress = AddressRegistry.remoteAddressFor(address).getOrElse(
|
||||
throw new IllegalStateException("Actor [" + actorClassName + "] is not configured as being a remote actor."))
|
||||
|
||||
start
|
||||
|
|
@ -1086,12 +1079,10 @@ trait ActorRefShared {
|
|||
trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
|
||||
|
||||
/**
|
||||
* Address for actor, must be a unique one. Default is the 'uuid'.
|
||||
* Address for actor, must be a unique one.
|
||||
*/
|
||||
def address: String
|
||||
|
||||
def address_=(address: String): Unit
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -79,11 +79,10 @@ object EventHandler extends ListenerManagement {
|
|||
val info = "[INFO] [%s] [%s] [%s] %s".intern
|
||||
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
|
||||
val generic = "[GENERIC] [%s] [%s]".intern
|
||||
val ADDRESS = "event:handler".intern
|
||||
|
||||
class EventHandlerException extends AkkaException
|
||||
|
||||
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ADDRESS).build
|
||||
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("event:handler").build
|
||||
|
||||
val level: Int = config.getString("akka.event-handler-level", "DEBUG") match {
|
||||
case "ERROR" => ErrorLevel
|
||||
|
|
@ -91,7 +90,7 @@ object EventHandler extends ListenerManagement {
|
|||
case "INFO" => InfoLevel
|
||||
case "DEBUG" => DebugLevel
|
||||
case unknown => throw new ConfigurationException(
|
||||
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
|
||||
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
|
||||
}
|
||||
|
||||
def notify(event: Any) { notifyListeners(event) }
|
||||
|
|
@ -150,7 +149,6 @@ object EventHandler extends ListenerManagement {
|
|||
}
|
||||
|
||||
class DefaultListener extends Actor {
|
||||
self.address = ADDRESS
|
||||
self.dispatcher = EventHandlerDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
@ -191,7 +189,7 @@ object EventHandler extends ListenerManagement {
|
|||
defaultListeners foreach { listenerName =>
|
||||
try {
|
||||
ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz =>
|
||||
addListener(Actor.actorOf(clazz).start)
|
||||
addListener(Actor.actorOf(clazz, listenerName).start)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import akka.event.EventHandler
|
|||
class RemoteEventHandler extends Actor {
|
||||
import EventHandler._
|
||||
|
||||
self.address = ADDRESS
|
||||
self.dispatcher = EventHandlerDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -268,9 +268,6 @@ class RootEndpoint extends Actor with Endpoint {
|
|||
// use the configurable dispatcher
|
||||
self.dispatcher = Endpoint.Dispatcher
|
||||
|
||||
// adopt the configured id
|
||||
if (RootActorBuiltin) self.address = RootActorID
|
||||
|
||||
override def preStart =
|
||||
_attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ object RemoteErrorHandlingNetworkTest {
|
|||
case class Send(actor: ActorRef)
|
||||
|
||||
class RemoteActorSpecActorUnidirectional extends Actor {
|
||||
self.address = "network-drop:unidirectional"
|
||||
|
||||
def receive = {
|
||||
case "Ping" => self.reply_?("Pong")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ object Logger {
|
|||
class Slf4jEventHandler extends Actor with Logging {
|
||||
import EventHandler._
|
||||
|
||||
self.address = ADDRESS
|
||||
self.dispatcher = EventHandlerDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -868,6 +868,7 @@ private[akka] abstract class ActorAspect {
|
|||
actorRef = init.actorRef
|
||||
uuid = actorRef.uuid
|
||||
timeout = init.timeout
|
||||
remoteAddress = None//actorRef.remoteAddress
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,11 +9,11 @@ import akka.actor.TypedActor._
|
|||
|
||||
import akka.config.Supervision._
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.config.TypedActorConfigurator
|
||||
|
||||
import akka.testing._
|
||||
import akka.util.duration._
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -49,7 +49,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
fail("expected exception not thrown")
|
||||
} catch {
|
||||
case e: RuntimeException => {
|
||||
cdl.await
|
||||
cdl.await(10, TimeUnit.SECONDS)
|
||||
assert(SamplePojoImpl._pre)
|
||||
assert(SamplePojoImpl._post)
|
||||
assert(!SamplePojoImpl._down)
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ object TypedActorSpec {
|
|||
}
|
||||
|
||||
class MyTypedActorImpl extends TypedActor with MyTypedActor {
|
||||
self.address = "my-custom-id"
|
||||
def sendOneWay(msg: String) {
|
||||
println("got " + msg )
|
||||
}
|
||||
|
|
@ -33,7 +32,6 @@ object TypedActorSpec {
|
|||
}
|
||||
|
||||
class MyTypedActorWithConstructorArgsImpl(aString: String, aLong: Long) extends TypedActor with MyTypedActor {
|
||||
self.address = "my-custom-id"
|
||||
def sendOneWay(msg: String) {
|
||||
println("got " + msg + " " + aString + " " + aLong)
|
||||
}
|
||||
|
|
@ -44,7 +42,6 @@ object TypedActorSpec {
|
|||
}
|
||||
|
||||
class MyActor extends Actor {
|
||||
self.address = "my-custom-id"
|
||||
def receive = {
|
||||
case msg: String => println("got " + msg)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue