diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index 131cdeee8f..469b84070f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -60,7 +60,11 @@ object SupervisorSpec { class Master extends Actor { self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt) - val temp = self.spawnLink[TemporaryActor] + val temp = { + val a = actorOf[TemporaryActor] + self link a + a.start + } override def receive = { case Die => temp !! (Die, TimeoutMillis) diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala deleted file mode 100644 index 4e3bebf663..0000000000 --- a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dataflow - -import org.scalatest.Spec -import org.scalatest.Assertions -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import akka.dispatch.DefaultCompletableFuture -import java.util.concurrent.{TimeUnit, CountDownLatch} -import annotation.tailrec -import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger} -import akka.actor.ActorRegistry - -@RunWith(classOf[JUnitRunner]) -class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { - describe("DataflowVariable") { - it("should be able to set the value of one variable from other variables") { - import DataFlow._ - - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - val x, y, z = new DataFlowVariable[Int] - thread { - z << x() + y() - result.set(z()) - latch.countDown - } - thread { x << 40 } - thread { y << 2 } - - latch.await(10,TimeUnit.SECONDS) should equal (true) - result.get should equal (42) - List(x,y,z).foreach(_.shutdown) - } - - it("should be able to sum a sequence of ints") { - import DataFlow._ - - def ints(n: Int, max: Int): List[Int] = - if (n == max) Nil - else n :: ints(n + 1, max) - - def sum(s: Int, stream: List[Int]): List[Int] = stream match { - case Nil => s :: Nil - case h :: t => s :: sum(h + s, t) - } - - val latch = new CountDownLatch(1) - val result = new AtomicReference[List[Int]](Nil) - val x = new DataFlowVariable[List[Int]] - val y = new DataFlowVariable[List[Int]] - val z = new DataFlowVariable[List[Int]] - - thread { x << ints(0, 1000) } - thread { y << sum(0, x()) } - - thread { z << y() - result.set(z()) - latch.countDown - } - - latch.await(10,TimeUnit.SECONDS) should equal (true) - result.get should equal (sum(0,ints(0,1000))) - List(x,y,z).foreach(_.shutdown) - } -/* - it("should be able to join streams") { - import DataFlow._ - Actor.registry.local.shutdownAll - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - - val t1 = thread { ints(0, 1000, producer) } - val t2 = thread { - Thread.sleep(1000) - result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) - latch.countDown - } - - latch.await(3,TimeUnit.SECONDS) should equal (true) - result.get should equal (332833500) - } - - it("should be able to sum streams recursively") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val result = new AtomicLong(0) - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - - @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { - val x = stream() - - if(result.addAndGet(x) == 166666500) - latch.countDown - - recurseSum(stream) - } - - thread { ints(0, 1000, producer) } - thread { sum(0, producer, consumer) } - thread { recurseSum(consumer) } - - latch.await(15,TimeUnit.SECONDS) should equal (true) - } -*/ - /* Test not ready for prime time, causes some sort of deadlock */ - /* it("should be able to conditionally set variables") { - - import DataFlow._ - Actor.registry.local.shutdownAll - - val latch = new CountDownLatch(1) - val x, y, z, v = new DataFlowVariable[Int] - - val main = thread { - x << 1 - z << Math.max(x(),y()) - latch.countDown - } - - val setY = thread { - // Thread.sleep(2000) - y << 2 - } - - val setV = thread { - v << y - } - List(x,y,z,v) foreach (_.shutdown) - latch.await(2,TimeUnit.SECONDS) should equal (true) - }*/ - } -} diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index a946713c3d..552fc5f23c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -134,81 +134,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - @Test def shouldFutureAwaitEitherLeft = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "NoReply" - val result = Futures.awaitEither(future1, future2) - assert(result.isDefined) - assert("World" === result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitEitherRight = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "NoReply" - val future2 = actor2 !!! "Hello" - val result = Futures.awaitEither(future1, future2) - assert(result.isDefined) - assert("World" === result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitOneLeft = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "NoReply" - val future2 = actor2 !!! "Hello" - val result = Futures.awaitOne(List(future1, future2)) - assert(result.result.isDefined) - assert("World" === result.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitOneRight = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "NoReply" - val result = Futures.awaitOne(List(future1, future2)) - assert(result.result.isDefined) - assert("World" === result.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitAll = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "Hello" - Futures.awaitAll(List(future1, future2)) - assert(future1.result.isDefined) - assert("World" === future1.result.get) - assert(future2.result.isDefined) - assert("World" === future2.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFuturesAwaitMapHandleEmptySequence { - assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) - } - - @Test def shouldFuturesAwaitMapHandleNonEmptySequence { - val latches = (1 to 3) map (_ => new StandardLatch) - val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start) - val futures = actors map (actor => (actor.!!![String]("Hello"))) - latches foreach { _.open } - - assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length)) - } - @Test def shouldFoldResults { val actors = (1 to 10).toList map { _ => actorOf(new Actor { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 00a280188a..bb4305aab4 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -19,7 +19,7 @@ import com.eaio.uuid.UUID /** * Life-cycle messages for the Actors */ -@serializable sealed trait LifeCycleMessage +sealed trait LifeCycleMessage extends Serializable /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } @@ -40,7 +40,7 @@ case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) /** * Java API with default non-stacking behavior */ - def this(code: akka.japi.Function[ActorRef,Procedure[Any]]) = this(code, true) + def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true) } case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage @@ -145,6 +145,9 @@ object Actor extends ListenerManagement { def actorOf[T <: Actor : Manifest](address: String): ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) + /** + * FIXME document + */ def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], (new UUID).toString) @@ -170,8 +173,11 @@ object Actor extends ListenerManagement { "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }, None, false, address) + }, address) + /** + * FIXME document + */ def actorOf(clazz: Class[_ <: Actor]): ActorRef = actorOf(clazz, (new UUID).toString) /** @@ -192,8 +198,11 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor, address: String): ActorRef = new LocalActorRef(() => factory, None, false, address) + def actorOf(factory: => Actor, address: String): ActorRef = new LocalActorRef(() => factory, address) + /** + * FIXME document + */ def actorOf(factory: => Actor): ActorRef = actorOf(factory, (new UUID).toString) /** @@ -204,7 +213,7 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor], address: String): ActorRef = new LocalActorRef(() => creator.create, None, false, address) + def actorOf(creator: Creator[Actor], address: String): ActorRef = new LocalActorRef(() => creator.create, address) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) @@ -264,8 +273,7 @@ object Actor extends ListenerManagement { *

* Here you find functions like: * - !, !!, !!! and forward - * - link, unlink, startLink, spawnLink etc - * - makeRemote etc. + * - link, unlink, startLink etc * - start, stop * - etc. * @@ -288,7 +296,6 @@ object Actor extends ListenerManagement { * import self._ * id = ... * dispatcher = ... - * spawnLink[OtherActor] * ... * } * @@ -318,7 +325,6 @@ trait Actor { "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") Actor.actorRefInCreation.value = None - optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? optRef.asInstanceOf[Some[ActorRef]] } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index f17f0dfbe9..45c5704f29 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -92,20 +92,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None protected[akka] val guard = new ReentrantGuard - /** - * User overridable callback/setting. - *

- * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - *

- * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - @BeanProperty - @volatile - var id: String = _uuid.toString - /** * FIXME Document */ @@ -190,16 +176,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher def getDispatcher(): MessageDispatcher = dispatcher - /** - * Returns on which node this actor lives if None it lives in the local ActorRegistry - */ - def homeAddress: Option[InetSocketAddress] - - /** - * Java API.

- */ - def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null - /** * Holds the hot swapped partial function. */ @@ -453,38 +429,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def startLink(actorRef: ActorRef): Unit - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.actorOf instead") - def spawn(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed") - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Atomically create (from actor class), link and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use use Actor.remote.actorOf instead and then link on success") - def spawnLink(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote, link and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed") - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - /** * Returns the mailbox size. */ @@ -569,8 +513,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] - override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) override def equals(that: Any): Boolean = { @@ -578,7 +520,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal that.asInstanceOf[ActorRef].uuid == uuid } - override def toString = "Actor[" + id + ":" + uuid + "]" + override def toString = "Actor[" + address + ":" + uuid + "]" protected[akka] def checkReceiveTimeout = { cancelReceiveTimeout @@ -600,11 +542,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * * @author Jonas Bonér */ -class LocalActorRef private[akka] ( - private[this] val actorFactory: () => Actor, - val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean, - _address: String) +class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, _address: String) extends ActorRef with ScalaActorRef { this.address = _address @@ -629,18 +567,15 @@ class LocalActorRef private[akka] ( // used only for deserialization private[akka] def this( __uuid: Uuid, - __id: String, + __address: String, __timeout: Long, __receiveTimeout: Option[Long], __lifeCycle: LifeCycle, __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], - __factory: () => Actor, - __homeAddress: Option[InetSocketAddress], - __address: String) = { - this(__factory, __homeAddress, false, __address) + __factory: () => Actor) = { + this(__factory, __address) _uuid = __uuid - id = __id timeout = __timeout receiveTimeout = __receiveTimeout lifeCycle = __lifeCycle @@ -650,11 +585,6 @@ class LocalActorRef private[akka] ( start } - /** - * Returns whether this actor ref is client-managed remote or not - */ - private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled - // ========= PUBLIC FUNCTIONS ========= /** @@ -698,9 +628,6 @@ class LocalActorRef private[akka] ( if ((actorInstance ne null) && (actorInstance.get ne null)) initializeActorInstance - if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - checkReceiveTimeout //Schedule the initial Receive timeout } this @@ -720,11 +647,9 @@ class LocalActorRef private[akka] ( } finally { currentMessage = null Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) + if (isRemotingEnabled) Actor.remote.unregister(this) - } + setActorSelfFields(actorInstance.get,null) } } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") @@ -774,52 +699,6 @@ class LocalActorRef private[akka] ( actorRef.start } - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - def spawn(clazz: Class[_ <: Actor]): ActorRef = - Actor.actorOf(clazz).start - - /** - * Atomically create (from actor class), start and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val ref = Actor.remote.actorOf(clazz, hostname, port) - ref.timeout = timeout - ref.start - } - - /** - * Atomically create (from actor class), start and link an actor. - *

- * To be invoked from within the actor itself. - */ - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = { - val actor = spawn(clazz) - link(actor) - actor.start - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val actor = Actor.remote.actorOf(clazz, hostname, port) - actor.timeout = timeout - link(actor) - actor.start - actor - } - /** * Returns the mailbox. */ @@ -837,10 +716,6 @@ class LocalActorRef private[akka] ( protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - if (isClientManaged_?) { - Actor.remote.send[Any]( - message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) - } else dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( @@ -848,17 +723,10 @@ class LocalActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - if (isClientManaged_?) { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } else { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) - dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) - future.get - } + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + dispatcher dispatchMessage new MessageInvocation( + this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + future.get } /** @@ -1015,14 +883,6 @@ class LocalActorRef private[akka] ( } } - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { - ensureRemotingEnabled - if (_supervisor.isDefined) { - if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) - Some(_supervisor.get.uuid) - } else None - } - def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors) // ========= PRIVATE FUNCTIONS ========= @@ -1130,29 +990,20 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - classOrServiceName: String, + _address: String, val actorClassName: String, - val hostname: String, - val port: Int, _timeout: Long, loader: Option[ClassLoader], val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { ensureRemotingEnabled - - val homeAddress = Some(new InetSocketAddress(hostname, port)) - - //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed - id = classOrServiceName - //id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id - timeout = _timeout - + address = _address start def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - Actor.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader) + Actor.remote.send[Any](message, senderOption, None, timeout, true, this, None, actorType, loader) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -1161,8 +1012,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = Actor.remote.send[T]( message, senderOption, senderFuture, - homeAddress.get, timeout, - false, this, None, + timeout, false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) @@ -1180,8 +1030,6 @@ private[akka] case class RemoteActorRef private[akka] ( } } - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None - // ==== NOT SUPPORTED ==== def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported @@ -1189,10 +1037,6 @@ private[akka] case class RemoteActorRef private[akka] ( def link(actorRef: ActorRef): Unit = unsupported def unlink(actorRef: ActorRef): Unit = unsupported def startLink(actorRef: ActorRef): Unit = unsupported - def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported def supervisor: Option[ActorRef] = unsupported def linkedActors: JMap[Uuid, ActorRef] = unsupported protected[akka] def mailbox: AnyRef = unsupported @@ -1217,6 +1061,10 @@ private[akka] case class RemoteActorRef private[akka] ( * //superclass, which ActorRefShared is. */ trait ActorRefShared { + + /** + * Returns the address for the actor. + */ def address: String /** @@ -1233,17 +1081,8 @@ trait ActorRefShared { trait ScalaActorRef extends ActorRefShared { ref: ActorRef => /** - * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - *

- * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. + * Address for actor, must be a unique one. Default is the 'uuid'. */ - def id: String - - def id_=(id: String): Unit - def address: String def address_=(address: String): Unit @@ -1404,32 +1243,4 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => true } else false } - - /** - * Atomically create (from actor class) and start an actor. - */ - def spawn[T <: Actor: Manifest]: ActorRef = - spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start and make an actor remote. - */ - def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } - - /** - * Atomically create (from actor class), start and link an actor. - */ - def spawnLink[T <: Actor: Manifest]: ActorRef = - spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start, link and make an actor remote. - */ - def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } } diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index bb08bcdf80..f877759321 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -93,8 +93,7 @@ case class SupervisorFactory(val config: SupervisorConfig) { *

* The supervisor class is only used for the configuration system when configuring supervisor * hierarchies declaratively. Should not be used as part of the regular programming API. Instead - * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the - * children that should trap error signals and trigger restart. + * wire the children together using 'link', 'startLink' etc. *

* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children. * diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index 9f63c64bc1..c7c0ba40bd 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -101,32 +101,18 @@ object Supervision { val target: Class[_], val lifeCycle: LifeCycle, val timeout: Long, - _dispatcher: MessageDispatcher, // optional - _remoteAddress: RemoteAddress // optional + _dispatcher: MessageDispatcher // optional ) extends Server { val intf: Option[Class[_]] = Option(_intf) val dispatcher: Option[MessageDispatcher] = Option(_dispatcher) - val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) + this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher) def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(intf, target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(intf, target, lifeCycle, timeout, dispatcher, null: RemoteAddress) + this(intf, target, lifeCycle, timeout, null: MessageDispatcher) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(null: Class[_], target, lifeCycle, timeout, dispatcher, null: RemoteAddress) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(intf, target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - this(null: Class[_], target, lifeCycle, timeout, dispatcher, remoteAddress) + this(null: Class[_], target, lifeCycle, timeout, dispatcher) } } diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala deleted file mode 100644 index 72fbbaaeb2..0000000000 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dataflow - -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} - -import akka.event.EventHandler -import akka.actor.{Actor, ActorRef} -import akka.actor.Actor._ -import akka.dispatch.CompletableFuture -import akka.AkkaException -import akka.japi.{ Function, Effect } - -/** - * Implements Oz-style dataflow (single assignment) variables. - * - * @author Jonas Bonér - */ -object DataFlow { - object Start - object Exit - - class DataFlowVariableException(msg: String) extends AkkaException(msg) - - /** - * Executes the supplied thunk in another thread. - */ - def thread(body: => Unit): Unit = spawn(body) - - /** - * JavaAPI. - * Executes the supplied Effect in another thread. - */ - def thread(body: Effect): Unit = spawn(body.apply) - - /** - * Executes the supplied function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: A => R) = - actorOf(new ReactiveEventBasedThread(body)).start - - /** - * JavaAPI. - * Executes the supplied Function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) = - actorOf(new ReactiveEventBasedThread(body.apply)).start - - private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) - extends Actor { - def receive = { - case Exit => self.stop - case message => self.reply(body(message.asInstanceOf[A])) - } - } - - private object DataFlowVariable { - private sealed abstract class DataFlowVariableMessage - private case class Set[T <: Any](value: T) extends DataFlowVariableMessage - private object Get extends DataFlowVariableMessage - } - - /** - * @author Jonas Bonér - */ - @deprecated("Superceeded by Future and CompletableFuture as of 1.1") - sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { - import DataFlowVariable._ - - def this() = this(1000 * 60) - - private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] - - private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - def receive = { - case s@Set(v) => - if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { - while(dataFlow.blockedReaders.peek ne null) - dataFlow.blockedReaders.poll ! s - } else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case Exit => self.stop - } - } - - private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - private var readerFuture: Option[CompletableFuture[Any]] = None - def receive = { - case Get => dataFlow.value.get match { - case Some(value) => self reply value - case None => readerFuture = self.senderFuture - } - case Set(v:T) => readerFuture.map(_ completeWithResult v) - case Exit => self.stop - } - } - - private[this] val in = actorOf(new In(this)).start - - /** - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def <<(ref: DataFlowVariable[T]) { - if (this.value.get.isEmpty) in ! Set(ref()) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(ref: DataFlowVariable[T]) { this << ref } - - /** - * Sets the value of this variable (if unset). - */ - def <<(value: T) { - if (this.value.get.isEmpty) in ! Set(value) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(value: T) { this << value } - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def get(): T = this() - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def apply(): T = { - value.get getOrElse { - val out = actorOf(new Out(this)).start - - val result = try { - blockedReaders offer out - (out !! Get).as[T] - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - out ! Exit - throw e - } - - result.getOrElse(throw new DataFlowVariableException( - "Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) - } - } - - def shutdown = in ! Exit - } -} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index cfe64a8992..632d777184 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -61,7 +61,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout) + firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures),timeout) /** * A non-blocking fold over the specified futures. @@ -87,7 +87,7 @@ object Futures { results add r.b if (results.size == allDone) { //Only one thread can get here try { - result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) + result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun) } catch { case e: Exception => EventHandler.error(e, this, e.getMessage) @@ -115,7 +115,7 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ ) + fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))( fun.apply _ ) /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first @@ -150,7 +150,7 @@ object Futures { * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _) + reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -175,36 +175,6 @@ object Futures { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) - - // ===================================== - // Deprecations - // ===================================== - - /** - * (Blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)") - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await") - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await - - - /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }") - def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException") - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException } object Future { @@ -508,26 +478,26 @@ trait CompletableFuture[T] extends Future[T] { * Completes this Future with the specified result, if not already completed. * @return this */ - def complete(value: Either[Throwable, T]): CompletableFuture[T] + def complete(value: Either[Throwable, T]): Future[T] /** * Completes this Future with the specified result, if not already completed. * @return this */ - final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + final def completeWithResult(result: T): Future[T] = complete(Right(result)) /** * Completes this Future with the specified exception, if not already completed. * @return this */ - final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) + final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception)) /** * Completes this Future with the specified other Future, when that Future is completed, * unless this Future has already been completed. * @return this. */ - final def completeWith(other: Future[T]): CompletableFuture[T] = { + final def completeWith(other: Future[T]): Future[T] = { other onComplete { f => complete(f.value.get) } this } @@ -535,12 +505,12 @@ trait CompletableFuture[T] extends Future[T] { /** * Alias for complete(Right(value)). */ - final def << (value: T): CompletableFuture[T] = complete(Right(value)) + final def << (value: T): Future[T] = complete(Right(value)) /** * Alias for completeWith(other). */ - final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) + final def << (other : Future[T]): Future[T] = completeWith(other) } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 83c30f23e0..130eeb9163 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -88,9 +88,6 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi import ThreadPoolConfig._ def build = dispatcherFactory(config) - //TODO remove this, for backwards compat only - @deprecated("Use .build instead") def buildThreadPool = build - def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index fe75a31dc0..86ea4b338c 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -74,16 +74,16 @@ object EventHandler extends ListenerManagement { override val level = DebugLevel } - val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern - val warning = "[WARN] [%s] [%s] [%s] %s".intern - val info = "[INFO] [%s] [%s] [%s] %s".intern - val debug = "[DEBUG] [%s] [%s] [%s] %s".intern + val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val warning = "[WARN] [%s] [%s] [%s] %s".intern + val info = "[INFO] [%s] [%s] [%s] %s".intern + val debug = "[DEBUG] [%s] [%s] [%s] %s".intern val generic = "[GENERIC] [%s] [%s]".intern - val ID = "event:handler".intern + val ADDRESS = "event:handler".intern class EventHandlerException extends AkkaException - lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build + lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ADDRESS).build val level: Int = config.getString("akka.event-handler-level", "DEBUG") match { case "ERROR" => ErrorLevel @@ -150,7 +150,7 @@ object EventHandler extends ListenerManagement { } class DefaultListener extends Actor { - self.id = ID + self.address = ADDRESS self.dispatcher = EventHandlerDispatcher def receive = { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala index 6534634e08..58b9d41fab 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala @@ -15,7 +15,7 @@ import akka.event.EventHandler class RemoteEventHandler extends Actor { import EventHandler._ - self.id = ID + self.address = ADDRESS self.dispatcher = EventHandlerDispatcher def receive = { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 210db70514..df266f99f0 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -31,28 +31,28 @@ trait RemoteModule { /** Lookup methods **/ - private[akka] def findActorById(id: String) : ActorRef = actors.get(id) + private[akka] def findActorByAddress(address: String) : ActorRef = actors.get(address) private[akka] def findActorByUuid(uuid: String) : ActorRef = actorsByUuid.get(uuid) - private[akka] def findActorFactory(id: String) : () => ActorRef = actorsFactories.get(id) + private[akka] def findActorFactory(address: String) : () => ActorRef = actorsFactories.get(address) - private[akka] def findTypedActorById(id: String) : AnyRef = typedActors.get(id) + private[akka] def findTypedActorByAddress(address: String) : AnyRef = typedActors.get(address) - private[akka] def findTypedActorFactory(id: String) : () => AnyRef = typedActorsFactories.get(id) + private[akka] def findTypedActorFactory(address: String) : () => AnyRef = typedActorsFactories.get(address) private[akka] def findTypedActorByUuid(uuid: String) : AnyRef = typedActorsByUuid.get(uuid) - private[akka] def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { - var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length)) - else findActorById(id) + private[akka] def findActorByAddressOrUuid(address: String, uuid: String) : ActorRef = { + var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length)) + else findActorByAddress(address) if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) actorRefOrNull } - private[akka] def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = { - var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length)) - else findTypedActorById(id) + private[akka] def findTypedActorByAddressOrUuid(address: String, uuid: String) : AnyRef = { + var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findTypedActorByUuid(address.substring(UUID_PREFIX.length)) + else findTypedActorByAddress(address) if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid) actorRefOrNull } @@ -151,81 +151,6 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule clear } - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *

-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io", 2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf(factory: => Actor, host: String, port: Int): ActorRef = - Actor.remote.clientManagedActorOf(() => factory, host, port) - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - clientManagedActorOf(() => - createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - host, port) - } - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf[MyActor]("www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - clientManagedActorOf(() => - createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - host, port) - } - protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: Any): Unit = super.notifyListeners(message) @@ -312,10 +237,10 @@ trait RemoteServerModule extends RemoteModule { /** * Register remote typed actor by a specific id. - * @param id custom actor id + * @param address actor address * @param typedActor typed actor to register */ - def registerTypedActor(id: String, typedActor: AnyRef): Unit + def registerTypedActor(address: String, typedActor: AnyRef): Unit /** * Register typed actor by interface name. @@ -330,23 +255,23 @@ trait RemoteServerModule extends RemoteModule { /** * Register remote typed actor by a specific id. - * @param id custom actor id + * @param address actor address * @param typedActor typed actor to register */ - def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit + def registerTypedPerSessionActor(address: String, factory: => AnyRef): Unit /** * Register remote typed actor by a specific id. - * @param id custom actor id + * @param address actor address * @param typedActor typed actor to register * Java API */ - def registerTypedPerSessionActor(id: String, factory: Creator[AnyRef]): Unit = registerTypedPerSessionActor(id, factory.create) + def registerTypedPerSessionActor(address: String, factory: Creator[AnyRef]): Unit = registerTypedPerSessionActor(address, factory.create) /** * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. */ - def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef) + def register(actorRef: ActorRef): Unit = register(actorRef.address, actorRef) /** * Register Remote Actor by the Actor's uuid field. It starts the Actor if it is not started already. @@ -358,14 +283,14 @@ trait RemoteServerModule extends RemoteModule { *

* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ - def register(id: String, actorRef: ActorRef): Unit + def register(address: String, actorRef: ActorRef): Unit /** * Register Remote Session Actor by a specific 'id' passed as argument. *

* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ - def registerPerSession(id: String, factory: => ActorRef): Unit + def registerPerSession(address: String, factory: => ActorRef): Unit /** * Register Remote Session Actor by a specific 'id' passed as argument. @@ -373,7 +298,7 @@ trait RemoteServerModule extends RemoteModule { * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. * Java API */ - def registerPerSession(id: String, factory: Creator[ActorRef]): Unit = registerPerSession(id, factory.create) + def registerPerSession(address: String, factory: Creator[ActorRef]): Unit = registerPerSession(address, factory.create) /** * Unregister Remote Actor that is registered using its 'id' field (not custom ID). @@ -385,52 +310,52 @@ trait RemoteServerModule extends RemoteModule { *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregister(id: String): Unit + def unregister(address: String): Unit /** * Unregister Remote Actor by specific 'id'. *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterPerSession(id: String): Unit + def unregisterPerSession(address: String): Unit /** * Unregister Remote Typed Actor by specific 'id'. *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterTypedActor(id: String): Unit + def unregisterTypedActor(address: String): Unit /** * Unregister Remote Typed Actor by specific 'id'. *

* NOTE: You need to call this method if you have registered an actor by a custom ID. */ - def unregisterTypedPerSessionActor(id: String): Unit + def unregisterTypedPerSessionActor(address: String): Unit } trait RemoteClientModule extends RemoteModule { self: RemoteModule => - def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, None) + def actorFor(classNameOrServiceAddress: String, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceAddress, classNameOrServiceAddress, Actor.TIMEOUT, hostname, port, None) - def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, Some(loader)) + def actorFor(classNameOrServiceAddress: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceAddress, classNameOrServiceAddress, Actor.TIMEOUT, hostname, port, Some(loader)) - def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, None) + def actorFor(address: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(address, className, Actor.TIMEOUT, hostname, port, None) - def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, Some(loader)) + def actorFor(address: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(address, className, Actor.TIMEOUT, hostname, port, Some(loader)) - def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None) + def actorFor(classNameOrServiceAddress: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceAddress, classNameOrServiceAddress, timeout, hostname, port, None) - def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader)) + def actorFor(classNameOrServiceAddress: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceAddress, classNameOrServiceAddress, timeout, hostname, port, Some(loader)) - def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - actorFor(serviceId, className, timeout, hostname, port, None) + def actorFor(address: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(address, className, timeout, hostname, port, None) def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None) @@ -441,12 +366,8 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) - def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = - typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) - - @deprecated("Will be removed after 1.1") - def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef - + def typedActorFor[T](intfClass: Class[T], address: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = + typedActorFor(intfClass, address, implClassName, timeout, hostname, port, Some(loader)) /** * Clean-up all open connections. @@ -465,28 +386,17 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => /** Methods that needs to be implemented by a transport **/ - protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): T + protected[akka] def typedActorFor[T](intfClass: Class[T], serviceaddress: String, implClassName: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): T - protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef + protected[akka] def actorFor(serviceaddress: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]], - remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType, loader: Option[ClassLoader]): Option[CompletableFuture[T]] - - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef - - @deprecated("Will be removed after 1.1") - private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit - - @deprecated("Will be removed after 1.1") - private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit } diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index fb5673277c..933e3cd9a9 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -37,7 +37,7 @@ object Duration { * Construct a Duration by parsing a String. In case of a format error, a * RuntimeException is thrown. See `unapply(String)` for more information. */ - def apply(s : String) : Duration = unapply(s) getOrElse error("format error") + def apply(s : String) : Duration = unapply(s) getOrElse sys.error("format error") /** * Deconstruct a Duration into length and unit if it is finite. @@ -77,7 +77,7 @@ object Duration { if ( ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if ( ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else - error("made some error in regex (should not be possible)") + sys.error("made some error in regex (should not be possible)") case REinf() => Some(Inf) case REminf() => Some(MinusInf) case _ => None diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index 159b7eec68..37f74eb031 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -269,7 +269,7 @@ class RootEndpoint extends Actor with Endpoint { self.dispatcher = Endpoint.Dispatcher // adopt the configured id - if (RootActorBuiltin) self.id = RootActorID + if (RootActorBuiltin) self.address = RootActorID override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 443b28041f..db99861a92 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -63,7 +63,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private val lock = new ReadWriteGuard protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = - TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor)) + TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, timeout, loader, AkkaActorType.TypedActor)) protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], @@ -119,16 +119,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } } - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - withClientFor(actorRef.homeAddress.get, None)(_.registerSupervisorForActor(actorRef)) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard { - remoteClients.get(Address(actorRef.homeAddress.get)) match { - case s: Some[RemoteClient] => s.get.deregisterSupervisorForActor(actorRef) - case None => actorRef - } - } - /** * Clean-up all open connections. */ @@ -142,15 +132,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem remoteClients.foreach({ case (addr, client) => client.shutdown }) remoteClients.clear } - - def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = { - remoteActors.put(Address(hostname, port), uuid) - } - - private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid) = { - remoteActors.remove(Address(hostname,port), uuid) - //TODO: should the connection be closed when the last actor deregisters? - } } /** @@ -170,7 +151,6 @@ abstract class RemoteClient private[akka] ( remoteAddress.getPort protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) @@ -219,7 +199,7 @@ abstract class RemoteClient private[akka] ( send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), - actorRef.id, + actorRef.address, actorRef.actorClassName, actorRef.timeout, Right(message), @@ -320,16 +300,6 @@ abstract class RemoteClient private[akka] ( pendingRequest = pendingRequests.peek // try to grab next message } } - - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't register supervisor for " + actorRef + " since it is not under supervision") - else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't unregister supervisor for " + actorRef + " since it is not under supervision") - else supervisors.remove(actorRef.supervisor.get.uuid) } /** @@ -358,7 +328,7 @@ class ActiveRemoteClient private[akka] ( timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -433,7 +403,6 @@ class ActiveRemoteClient private[akka] ( class ActiveRemoteClientPipelineFactory( name: String, futures: ConcurrentMap[Uuid, CompletableFuture[_]], - supervisors: ConcurrentMap[Uuid, ActorRef], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -450,7 +419,7 @@ class ActiveRemoteClientPipelineFactory( case _ => (Nil,Nil) } - val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client) val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil new StaticChannelPipeline(stages: _*) } @@ -463,7 +432,6 @@ class ActiveRemoteClientPipelineFactory( class ActiveRemoteClientHandler( val name: String, val futures: ConcurrentMap[Uuid, CompletableFuture[_]], - val supervisors: ConcurrentMap[Uuid, ActorRef], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -488,19 +456,7 @@ class ActiveRemoteClientHandler( val message = MessageSerializer.deserialize(reply.getMessage) future.completeWithResult(message) } else { - val exception = parseException(reply, client.loader) - - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, exception) - } - - future.completeWithException(exception) + future.completeWithException(parseException(reply, client.loader)) } case other => throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) @@ -584,25 +540,12 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with if (optimizeLocalScoped_?) { val home = this.address if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) {//TODO: switch to InetSocketAddress.equals? - val localRef = findActorByIdOrUuid(serviceId,serviceId) + val localRef = findActorByAddressOrUuid(serviceId,serviceId) if (localRef ne null) return localRef //Code significantly simpler with the return statement } } - RemoteActorRef(serviceId, className, host, port, timeout, loader) - } - - def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = { - - if (optimizeLocalScoped_?) { - val home = this.address - if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals? - return new LocalActorRef(factory, None, false, "todo") // Code is much simpler with return - } - - val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), true, "todo") - //ref.timeout = timeout //removed because setting default timeout should be done after construction - ref + RemoteActorRef(serviceId, className, timeout, loader) } } @@ -762,7 +705,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => */ def unregister(actorRef: ActorRef): Unit = guard withGuard { if (_isRunning.isOn) { - actors.remove(actorRef.id, actorRef) + actors.remove(actorRef.address, actorRef) actorsByUuid.remove(actorRef.uuid, actorRef) } } @@ -899,19 +842,18 @@ class RemoteServerHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - import scala.collection.JavaConversions.asScalaIterable val clientAddress = getClientAddress(ctx) // stop all session actors for (map <- Option(sessionActors.remove(event.getChannel)); - actor <- asScalaIterable(map.values)) { + actor <- collectionAsScalaIterable(map.values)) { try { actor ! PoisonPill } catch { case e: Exception => } } //FIXME switch approach or use other thread to execute this // stop all typed session actors for (map <- Option(typedSessionActors.remove(event.getChannel)); - actor <- asScalaIterable(map.values)) { + actor <- collectionAsScalaIterable(map.values)) { try { TypedActor.stop(actor) } catch { case e: Exception => } } @@ -990,7 +932,7 @@ class RemoteServerHandler( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), - actorInfo.getId, + actorInfo.getAddress, actorInfo.getTarget, actorInfo.getTimeout, r, @@ -1013,6 +955,14 @@ class RemoteServerHandler( private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = { val actorInfo = request.getActorInfo val typedActorInfo = actorInfo.getTypedActorInfo + /* TODO Implement sender references for remote TypedActor calls + if (request.hasSender) { + val iface = //TODO extrace the senderProxy interface from the request, load it as a class using the application loader + val ref = RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader) + val senderTA = TypedActor.createProxyForRemoteActorRef[AnyRef](iface, ref) + Some() + } else None + */ val typedActor = createTypedActor(actorInfo, channel) //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo? @@ -1053,7 +1003,8 @@ class RemoteServerHandler( try { val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses) - + //TODO SenderContextInfo.senderActorRef.value = sender + //TODO SenderContextInfo.senderProxy.value = senderProxy if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread else { //Sends the response @@ -1061,7 +1012,7 @@ class RemoteServerHandler( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( None, Right(request.getUuid), - actorInfo.getId, + actorInfo.getAddress, actorInfo.getTarget, actorInfo.getTimeout, result, @@ -1079,21 +1030,23 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(e, server)) } - messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread + messageReceiver.invoke(typedActor, args: _*) match { //TODO execute in non-IO thread //If it's a future, we can lift on that to defer the send to when the future is completed case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) ) case other => sendResponse(Right(other)) } } } catch { - case e: InvocationTargetException => - EventHandler.error(e, this, e.getMessage) - write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) - server.notifyListeners(RemoteServerError(e, server)) case e: Exception => EventHandler.error(e, this, e.getMessage) - write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) + write(channel, createErrorReplyMessage(e match { + case e: InvocationTargetException => e.getCause + case e => e + }, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) + } finally { + //TODO SenderContextInfo.senderActorRef.value = None ? + //TODO SenderContextInfo.senderProxy.value = None ? } } @@ -1114,50 +1067,22 @@ class RemoteServerHandler( */ private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid - val id = actorInfo.getId + val address = actorInfo.getAddress - findSessionActor(id, channel) match { + findSessionActor(address, channel) match { case null => // we dont have it in the session either, see if we have a factory for it - server.findActorFactory(id) match { + server.findActorFactory(address) match { case null => null case factory => val actorRef = factory() actorRef.uuid = parseUuid(uuid) //FIXME is this sensible? - sessionActors.get(channel).put(id, actorRef) + sessionActors.get(channel).put(address, actorRef) actorRef.start //Start it where's it's created } case sessionActor => sessionActor } } - - private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = { - val uuid = actorInfo.getUuid - val id = actorInfo.getId - val timeout = actorInfo.getTimeout - val name = actorInfo.getTarget - - try { - if (UNTRUSTED_MODE) throw new SecurityException( - "RemoteModule server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - - val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) - else Class.forName(name) - val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) - actorRef.uuid = parseUuid(uuid) - actorRef.id = id - actorRef.timeout = timeout - server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid - actorRef.start //Start it where it's created - } catch { - case e: Throwable => - EventHandler.error(e, this, e.getMessage) - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - - } - /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -1167,14 +1092,11 @@ class RemoteServerHandler( */ private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid - val id = actorInfo.getId + val address = actorInfo.getAddress - server.findActorByIdOrUuid(id, parseUuid(uuid).toString) match { - case null => // the actor has not been registered globally. See if we have it in the session - createSessionActor(actorInfo, channel) match { - case null => createClientManagedActor(actorInfo) // maybe it is a client managed actor - case sessionActor => sessionActor - } + server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match { + // the actor has not been registered globally. See if we have it in the session + case null => createSessionActor(actorInfo, channel) case actorRef => actorRef } } @@ -1183,63 +1105,25 @@ class RemoteServerHandler( * gets the actor from the session, or creates one if there is a factory for it */ private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={ - val id = actorInfo.getId - findTypedSessionActor(id, channel) match { + val address = actorInfo.getAddress + findTypedSessionActor(address, channel) match { case null => - server.findTypedActorFactory(id) match { + server.findTypedActorFactory(address) match { case null => null case factory => val newInstance = factory() - typedSessionActors.get(channel).put(id, newInstance) + typedSessionActors.get(channel).put(address, newInstance) newInstance } case sessionActor => sessionActor } } - private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = { - val typedActorInfo = actorInfo.getTypedActorInfo - val interfaceClassname = typedActorInfo.getInterface - val targetClassname = actorInfo.getTarget - val uuid = actorInfo.getUuid - - try { - if (UNTRUSTED_MODE) throw new SecurityException( - "RemoteModule server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - - val (interfaceClass, targetClass) = - if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), - applicationLoader.get.loadClass(targetClassname)) - else (Class.forName(interfaceClassname), Class.forName(targetClassname)) - - val newInstance = TypedActor.newInstance( - interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid - newInstance - } catch { - case e: Throwable => - EventHandler.error(e, this, e.getMessage) - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - } - private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { val uuid = actorInfo.getUuid - - server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match { - case null => // the actor has not been registered globally. See if we have it in the session - createTypedSessionActor(actorInfo, channel) match { - case null => - // FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail - -/* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]") - EventHandler.error(e, this, e.getMessage) - server.notifyListeners(RemoteServerError(e, server)) - throw e -*/ createClientManagedTypedActor(actorInfo) // client-managed actor - case sessionActor => sessionActor - } + server.findTypedActorByAddressOrUuid(actorInfo.getAddress, parseUuid(uuid).toString) match { + // the actor has not been registered globally. See if we have it in the session + case null => createTypedSessionActor(actorInfo, channel) case typedActor => typedActor } } @@ -1249,7 +1133,7 @@ class RemoteServerHandler( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( None, Right(request.getUuid), - actorInfo.getId, + actorInfo.getAddress, actorInfo.getTarget, actorInfo.getTimeout, Left(exception), @@ -1284,7 +1168,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na protected val open = new AtomicBoolean(true) override def add(channel: Channel): Boolean = guard withReadGuard { - if(open.get) { + if (open.get) { super.add(channel) } else { channel.close diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 149dca68b6..42fb72cf3f 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -44,7 +44,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * } * */ -@serializable trait StatelessActorFormat[T <: Actor] extends Format[T] { +trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable{ def fromBinary(bytes: Array[Byte], act: T) = act def toBinary(ac: T) = Array.empty[Byte] @@ -64,7 +64,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * } * */ -@serializable trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { +trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable { val serializer: Serializer def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] @@ -114,12 +114,10 @@ object ActorSerialization { val builder = SerializedActorRefProtocol.newBuilder .setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build) - .setId(actorRef.id) + .setAddress(actorRef.address) .setActorClassname(actorRef.actorClass.getName) - .setOriginalAddress(toAddressProtocol(actorRef)) .setTimeout(actorRef.timeout) - if (serializeMailBox == true) { val messages = actorRef.mailbox match { @@ -135,7 +133,7 @@ object ActorSerialization { RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), - actorRef.id, + actorRef.address, actorRef.actorClassName, actorRef.timeout, Right(m.message), @@ -161,10 +159,6 @@ object ActorSerialization { homeAddress: Option[InetSocketAddress], format: Format[T]): ActorRef = { val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes) - homeAddress.foreach { addr => - val addressProtocol = AddressProtocol.newBuilder.setHostname(addr.getAddress.getHostAddress).setPort(addr.getPort).build - builder.setOriginalAddress(addressProtocol) - } fromProtobufToLocalActorRef(builder.build, format, None) } @@ -181,7 +175,7 @@ object ActorSerialization { protocol.getLifeCycle.getLifeCycle match { case LifeCycleType.PERMANENT => Permanent case LifeCycleType.TEMPORARY => Temporary - case unknown => throw new IllegalActorStateException("LifeCycle type is not valid: " + unknown) + case unknown => throw new IllegalActorStateException("LifeCycle type is not valid [" + unknown + "]") } } else UndefinedLifeCycle @@ -205,21 +199,15 @@ object ActorSerialization { else actorClass.newInstance.asInstanceOf[Actor] } - val homeAddress = { - val address = protocol.getOriginalAddress - Some(new InetSocketAddress(address.getHostname, address.getPort)) - } - val ar = new LocalActorRef( uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), - protocol.getId, + protocol.getAddress, if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None, lifeCycle, supervisor, hotswap, factory, - homeAddress, "address") // FIXME grab real address and use that val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] @@ -249,10 +237,8 @@ object RemoteActorSerialization { */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { val ref = RemoteActorRef( - protocol.getClassOrServiceName, + protocol.getAddress, protocol.getActorClassname, - protocol.getHomeAddress.getHostname, - protocol.getHomeAddress.getPort, protocol.getTimeout, loader) ref @@ -277,7 +263,7 @@ object RemoteActorSerialization { def createRemoteMessageProtocolBuilder( actorRef: Option[ActorRef], replyUuid: Either[Uuid, UuidProtocol], - actorId: String, + actorAddress: String, actorClassName: String, timeout: Long, message: Either[Throwable, Any], @@ -294,7 +280,7 @@ object RemoteActorSerialization { val actorInfoBuilder = ActorInfoProtocol.newBuilder .setUuid(uuidProtocol) - .setId(actorId) + .setAddress(actorAddress) .setTarget(actorClassName) .setTimeout(timeout) @@ -336,7 +322,8 @@ object RemoteActorSerialization { secureCookie.foreach(messageBuilder.setCookie(_)) - actorRef.foreach { ref => + /* TODO invent new supervision strategy + actorRef.foreach { ref => ref.registerSupervisorAsRemoteActor.foreach { id => messageBuilder.setSupervisorUuid( UuidProtocol.newBuilder @@ -344,7 +331,7 @@ object RemoteActorSerialization { .setLow(id.getClockSeqAndNode) .build) } - } + } */ if( senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index 3a292e0de0..3fc661afce 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -17,7 +17,7 @@ import sjson.json.{Serializer => SJSONSerializer} /** * @author Jonas Bonér */ -@serializable trait Serializer { +trait Serializer extends scala.Serializable { @volatile var classLoader: Option[ClassLoader] = None def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala deleted file mode 100644 index 65693636b4..0000000000 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ /dev/null @@ -1,142 +0,0 @@ -package akka.actor.remote - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import akka.dispatch.Dispatchers -import akka.actor.Actor._ -import akka.actor._ - -class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg) - -object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) -} -class RemoteActorSpecActorUnidirectional extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - - def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown - } -} - -class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => throw new ExpectedRemoteProblem("expected") - } -} - -class SendOneWayAndReplyReceiverActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } -} - -class CountDownActor(latch: CountDownLatch) extends Actor { - def receive = { - case "World" => latch.countDown - } -} -/* -object SendOneWayAndReplySenderActor { - val latch = new CountDownLatch(1) -} -class SendOneWayAndReplySenderActor extends Actor { - var state: Option[AnyRef] = None - var sendTo: ActorRef = _ - var latch: CountDownLatch = _ - - def sendOff = sendTo ! "Hello" - - def receive = { - case msg: AnyRef => - state = Some(msg) - SendOneWayAndReplySenderActor.latch.countDown - } -}*/ - -class MyActorCustomConstructor extends Actor { - var prefix = "default-" - var count = 0 - def receive = { - case "incrPrefix" => count += 1; prefix = "" + count + "-" - case msg: String => self.reply(prefix + msg) - } -} - -class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest { - "ClientInitiatedRemoteActor" should { - "shouldSendOneWay" in { - val clientManaged = remote.actorOf[RemoteActorSpecActorUnidirectional](host,port).start - clientManaged must not be null - clientManaged.getClass must be (classOf[LocalActorRef]) - clientManaged ! "OneWay" - RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true) - clientManaged.stop - } - - "shouldSendOneWayAndReceiveReply" in { - val latch = new CountDownLatch(1) - val actor = remote.actorOf[SendOneWayAndReplyReceiverActor](host,port).start - implicit val sender = Some(actorOf(new CountDownActor(latch)).start) - - actor ! "Hello" - - latch.await(3,TimeUnit.SECONDS) must be (true) - } - - "shouldSendBangBangMessageAndReceiveReply" in { - val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start - val result = actor !! ("Hello", 10000) - "World" must equal (result.get.asInstanceOf[String]) - actor.stop - } - - "shouldSendBangBangMessageAndReceiveReplyConcurrently" in { - val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList - actors.map(_ !!! ("Hello", 10000)) foreach { future => - "World" must equal (future.await.result.asInstanceOf[Option[String]].get) - } - actors.foreach(_.stop) - } - - "shouldRegisterActorByUuid" in { - val actor1 = remote.actorOf[MyActorCustomConstructor](host, port).start - val actor2 = remote.actorOf[MyActorCustomConstructor](host, port).start - - actor1 ! "incrPrefix" - - (actor1 !! "test").get must equal ("1-test") - - actor1 ! "incrPrefix" - - (actor1 !! "test").get must equal ("2-test") - - (actor2 !! "test").get must equal ("default-test") - - actor1.stop - actor2.stop - } - - "shouldSendAndReceiveRemoteException" in { - - val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host, port).start - try { - implicit val timeout = 500000000L - val f = (actor !!! "Failure").await.resultOrException - fail("Shouldn't get here!!!") - } catch { - case e: ExpectedRemoteProblem => - } - actor.stop - } - } -} diff --git a/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala b/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala index f6e0c1806f..459f7641f3 100644 --- a/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala +++ b/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala @@ -19,11 +19,5 @@ class OptimizedLocalScopedSpec extends AkkaRemoteTest { remote.actorFor("foo", host, port) must be (fooActor) } - - "Create local actor when client-managed is hosted locally" in { - val localClientManaged = Actor.remote.actorOf[TestActor](host, port) - localClientManaged.homeAddress must be (None) - } - } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index d75b36a628..4e76f4bb6c 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -23,7 +23,7 @@ object Log { } } -@serializable class RemotePingPong1Actor extends Actor { +class RemotePingPong1Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -41,7 +41,7 @@ object Log { } } -@serializable class RemotePingPong2Actor extends Actor { +class RemotePingPong2Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -55,7 +55,7 @@ object Log { } } -@serializable class RemotePingPong3Actor extends Actor { +class RemotePingPong3Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -69,7 +69,7 @@ object Log { } } -class RemoteSupervisorSpec extends AkkaRemoteTest { +/*class RemoteSupervisorSpec extends AkkaRemoteTest { var pingpong1: ActorRef = _ var pingpong2: ActorRef = _ @@ -324,7 +324,6 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { factory.newInstance } - /* // Uncomment when the same test passes in SupervisorSpec - pending bug @Test def shouldKillMultipleActorsOneForOne2 = { clearMessageLogs @@ -338,9 +337,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { messageLog.poll(5, TimeUnit.SECONDS) } } -*/ - /* @Test def shouldOneWayKillSingleActorOneForOne = { clearMessageLogs @@ -435,6 +432,4 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { messageLog.poll(5, TimeUnit.SECONDS) } } - */ - -} +}*/ diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index bd772e65a0..2dc95853f3 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -4,13 +4,9 @@ package akka.actor.remote -import akka.config.Supervision._ -import akka.actor._ - import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.config. {RemoteAddress, Config, TypedActorConfigurator} -import akka.testing._ object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] @@ -24,73 +20,11 @@ object RemoteTypedActorLog { class RemoteTypedActorSpec extends AkkaRemoteTest { - import RemoteTypedActorLog._ - - private var conf: TypedActorConfigurator = _ - - override def beforeEach { - super.beforeEach - Config.config - conf = new TypedActorConfigurator - conf.configure( - new AllForOneStrategy(List(classOf[Exception]), 3, 5000), - List( - new SuperviseTypedActor( - classOf[RemoteTypedActorOne], - classOf[RemoteTypedActorOneImpl], - Permanent, - Testing.testTime(20000), - RemoteAddress(host,port)), - new SuperviseTypedActor( - classOf[RemoteTypedActorTwo], - classOf[RemoteTypedActorTwoImpl], - Permanent, - Testing.testTime(20000), - RemoteAddress(host,port)) - ).toArray).supervise - } - - override def afterEach { - clearMessageLogs - conf.stop - super.afterEach - Thread.sleep(1000) - } "RemoteModule Typed Actor " should { - /*"receives one-way message" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + "have unit tests" in { - ta.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") } - - "responds to request-reply message" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - ta.requestReply("ping") must equal ("pong") - } */ - - "be restarted on failure" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - - try { - ta.requestReply("die") - fail("Shouldn't get here") - } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - } - - /* "restarts linked friends on failure" in { - val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) - val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo]) - - try { - ta1.requestReply("die") - fail("Shouldn't get here") - } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - }*/ } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 0f20cc22ac..6debd4acec 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -191,7 +191,7 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { while(!testDone()) { if (latch.await(200, TimeUnit.MILLISECONDS)) - error("Test didn't complete within 100 cycles") + sys.error("Test didn't complete within 100 cycles") else latch.countDown } diff --git a/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala b/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala index 001a66eae0..151dd4ba0f 100644 --- a/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala +++ b/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala @@ -18,11 +18,5 @@ class UnOptimizedLocalScopedSpec extends AkkaRemoteTest { remote.actorFor("foo", host, port) must not be (fooActor) } - - "Create remote actor when client-managed is hosted locally" in { - val localClientManaged = Actor.remote.actorOf[TestActor](host, port) - localClientManaged.homeAddress must not be (None) - } - } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 2eec948698..7aa0cede07 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -221,7 +221,7 @@ class MyActorWithDualCounter extends Actor { } } -@serializable class MyActor extends Actor { +class MyActor extends Actor with scala.Serializable { var count = 0 def receive = { @@ -249,7 +249,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { } } -@serializable class MyJavaSerializableActor extends Actor { +class MyJavaSerializableActor extends Actor with scala.Serializable { var count = 0 self.receiveTimeout = Some(1000) diff --git a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala deleted file mode 100644 index 6edec702b5..0000000000 --- a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ -package akka.actor.ticket - -import akka.actor._ -import akka.actor.remote.AkkaRemoteTest - - -class Ticket519Spec extends AkkaRemoteTest { - "A remote TypedActor" should { - "should handle remote future replies" in { - val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,host,port) - val r = actor.someFutureString - - r.await.result.get must equal ("foo") - } - } -} diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 5a2c580e5e..1fa5e9ead1 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -22,7 +22,6 @@ object Think extends DiningHakkerMessage * A Chopstick is an actor, it can be taken, and put back */ class Chopstick(name: String) extends Actor { - self.id = name //When a Chopstick is taken by a hakker //It will refuse to be taken by other hakkers @@ -49,7 +48,6 @@ class Chopstick(name: String) extends Actor { * A hakker is an awesome dude or dudett who either thinks about hacking or has to eat ;-) */ class Hakker(name: String,left: ActorRef, right: ActorRef) extends Actor { - self.id = name //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat @@ -78,7 +76,7 @@ class Hakker(name: String,left: ActorRef, right: ActorRef) extends Actor { //back to think about how he should obtain his chopsticks :-) def waiting_for(chopstickToWaitFor: ActorRef, otherChopstick: ActorRef): Receive = { case Taken(`chopstickToWaitFor`) => - println("%s has picked up %s and %s, and starts to eat",name,left.id,right.id) + println("%s has picked up %s and %s, and starts to eat",name,left.address,right.address) become(eating) Scheduler.scheduleOnce(self,Think,5,TimeUnit.SECONDS) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 3273136690..ccc400daaf 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -31,7 +31,6 @@ case class TakenBy(hakker: Option[ActorRef]) * A chopstick is an actor, it can be taken, and put back */ class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { - self.id = name // A chopstick begins its existence as available and taken by no one startWith(Available, TakenBy(None)) @@ -82,7 +81,6 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] { - self.id = name //All hakkers start waiting startWith(Waiting, TakenChopsticks(None, None)) @@ -128,7 +126,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startEating(left: ActorRef, right: ActorRef): State = { - println("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) + println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) goto(Eating) using TakenChopsticks(Some(left), Some(right)) forMax (5 seconds) } diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala deleted file mode 100644 index 3477ff5783..0000000000 --- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package sample.remote - -import akka.actor.Actor._ -import akka.actor. {ActorRegistry, Actor} -import Actor.remote - -class RemoteHelloWorldActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } -} - -object ClientManagedRemoteActorServer { - def run = { - remote.start("localhost", 2552) - } - - def main(args: Array[String]) = run -} - -object ClientManagedRemoteActorClient { - - def run = { - val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start - val result = actor !! "Hello" - } - - def main(args: Array[String]) = run -} - diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index b8fb70f491..13b9c2e816 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -33,7 +33,7 @@ object Logger { class Slf4jEventHandler extends Actor with Logging { import EventHandler._ - self.id = ID + self.address = ADDRESS self.dispatcher = EventHandlerDispatcher def receive = { diff --git a/akka-stm/src/main/scala/akka/stm/Ref.scala b/akka-stm/src/main/scala/akka/stm/Ref.scala index 74b1bf5a9e..5d1aa9dc96 100644 --- a/akka-stm/src/main/scala/akka/stm/Ref.scala +++ b/akka-stm/src/main/scala/akka/stm/Ref.scala @@ -11,7 +11,7 @@ import org.multiverse.transactional.refs.BasicRef /** * Common trait for all the transactional objects. */ -@serializable trait Transactional { +trait Transactional extends Serializable { val uuid: String } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ce198be6bf..e6fd8ebbce 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -206,8 +206,8 @@ class NestingQueue { def pop = q.poll @volatile private var active = false - def enter { if (active) error("already active") else active = true } - def leave { if (!active) error("not active") else active = false } + def enter { if (active) sys.error("already active") else active = true } + def leave { if (!active) sys.error("not active") else active = false } def isActive = active } diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index f74d1598f3..e718ad193a 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -286,39 +286,15 @@ abstract class TypedActor extends Actor with Proxyable { final class TypedActorContext(private[akka] val actorRef: ActorRef) { private[akka] var _sender: AnyRef = _ - /** - * Returns the uuid for the actor. - * @deprecated use 'uuid()' - */ - def getUuid() = actorRef.uuid - /** 5 * Returns the uuid for the actor. */ def uuid = actorRef.uuid - def timeout = actorRef.timeout - - /** - * @deprecated use 'timeout()' - */ - def getTimout = timeout - def setTimout(timeout: Long) = actorRef.timeout = timeout - - def id = actorRef.id - - /** - * @deprecated use 'id()' - */ - def getId = id - def setId(id: String) = actorRef.id = id + def address = actorRef.address def receiveTimeout = actorRef.receiveTimeout - /** - * @deprecated use 'receiveTimeout()' - */ - def getReceiveTimeout = receiveTimeout def setReceiveTimeout(timeout: Long) = actorRef.setReceiveTimeout(timeout) def mailboxSize = actorRef.mailboxSize @@ -364,11 +340,6 @@ final class TypedActorContext(private[akka] val actorRef: ActorRef) { * @deprecated use 'senderFuture()' */ def getSenderFuture = senderFuture - - /** - * Returns the home address and port for this actor. - */ - def homeAddress: InetSocketAddress = actorRef.homeAddress.getOrElse(null) } object TypedActorConfiguration { @@ -377,18 +348,12 @@ object TypedActorConfiguration { new TypedActorConfiguration() } - def apply(timeout: Long) : TypedActorConfiguration = { - new TypedActorConfiguration().timeout(Duration(timeout, "millis")) + def apply(timeoutMillis: Long) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(Duration(timeoutMillis, "millis")) } - @deprecated("Will be removed after 1.1") - def apply(host: String, port: Int) : TypedActorConfiguration = { - new TypedActorConfiguration().makeRemote(host, port) - } - - @deprecated("Will be removed after 1.1") - def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = { - new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis")) + def apply(timeout: Duration) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(timeout) } } @@ -399,10 +364,8 @@ object TypedActorConfiguration { */ final class TypedActorConfiguration { private[akka] var _timeout: Long = Actor.TIMEOUT - private[akka] var _host: Option[InetSocketAddress] = None private[akka] var _messageDispatcher: Option[MessageDispatcher] = None private[akka] var _threadBasedDispatcher: Option[Boolean] = None - private[akka] var _id: Option[String] = None def timeout = _timeout def timeout(timeout: Duration) : TypedActorConfiguration = { @@ -410,21 +373,6 @@ final class TypedActorConfiguration { this } - def id = _id - def id(id: String): TypedActorConfiguration = { - _id = Option(id) - this - } - - @deprecated("Will be removed after 1.1") - def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port)) - - @deprecated("Will be removed after 1.1") - def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = { - _host = Some(remoteAddress) - this - } - def dispatcher(messageDispatcher: MessageDispatcher) : TypedActorConfiguration = { if (_threadBasedDispatcher.isDefined) throw new IllegalArgumentException( "Cannot specify both 'threadBasedDispatcher()' and 'dispatcher()'") @@ -480,28 +428,6 @@ object TypedActor { newInstance(intfClass, factory, TypedActorConfiguration()) } - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param targetClass implementation class of the typed actor - * @param host hostanme of the remote server - * @param port port of the remote server - */ - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port)) - } - - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param factory factory method that constructs the typed actor - * @param host hostanme of the remote server - * @param port port of the remote server - */ - def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = { - newInstance(intfClass, factory, TypedActorConfiguration(hostname, port)) - } - /** * Factory method for typed actor. * @param intfClass interface the typed actor implements @@ -522,32 +448,6 @@ object TypedActor { newInstance(intfClass, factory, TypedActorConfiguration(timeout)) } - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param targetClass implementation class of the typed actor - * @paramm timeout timeout for future - * @param host hostanme of the remote server - * @param port port of the remote server - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout)) - } - - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param factory factory method that constructs the typed actor - * @paramm timeout timeout for future - * @param host hostanme of the remote server - * @param port port of the remote server - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = { - newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout)) - } - /** * Factory method for typed actor. * @param intfClass interface the typed actor implements @@ -555,20 +455,7 @@ object TypedActor { * @paramm config configuration object fo the typed actor */ def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = - newInstance(intfClass, createActorRef(newTypedActor(factory),config), config) - - /** - * Creates an ActorRef, can be local only or client-managed-remote - */ - @deprecated("Will be removed after 1.1") - private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = { - config match { - case null => actorOf(typedActor) - case c: TypedActorConfiguration if (c._host.isDefined) => - Actor.remote.actorOf(typedActor, c._host.get.getAddress.getHostAddress, c._host.get.getPort) - case _ => actorOf(typedActor) - } - } + newInstance(intfClass, actorOf(newTypedActor(factory)), config) /** * Factory method for typed actor. @@ -577,7 +464,7 @@ object TypedActor { * @paramm config configuration object fo the typed actor */ def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = - newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config) + newInstance(intfClass, actorOf(newTypedActor(targetClass)), config) private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") @@ -585,11 +472,8 @@ object TypedActor { } private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val config = TypedActorConfiguration(timeout) - if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get) - newInstance(intfClass, targetClass, config) - } + remoteAddress: Option[InetSocketAddress], timeout: Long): T = + newInstance(intfClass, targetClass, TypedActorConfiguration(timeout)) private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = { val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] @@ -597,17 +481,10 @@ object TypedActor { typedActor.initialize(proxy) if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) - if (config._id.isDefined) actorRef.id = config._id.get actorRef.timeout = config.timeout - val remoteAddress = actorRef match { - case remote: RemoteActorRef => remote.homeAddress - case local: LocalActorRef if local.clientManaged => local.homeAddress - case _ => None - } - - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout)) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.timeout)) actorRef.start proxy.asInstanceOf[T] } @@ -633,20 +510,6 @@ object TypedActor { def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T = newInstance(intfClass, factory.create) - /** - * Java API. - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T = - newRemoteInstance(intfClass, factory.create, hostname, port) - - /** - * Java API. - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T = - newRemoteInstance(intfClass, factory.create, timeout, hostname, port) - /** * Java API. */ @@ -677,7 +540,7 @@ object TypedActor { val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler) val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false) - AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, actorRef.homeAddress, 5000L)) + AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, 5000L)) awProxy.asInstanceOf[T] } @@ -731,7 +594,7 @@ object TypedActor { * * Example linking another typed actor from within a typed actor: *

-   *   TypedActor.link(getContext(), child);
+   *   TypedActor.link(TypedActor.proxyFor(this.getContext().actorRef()).get, child);
    * 
* * @param supervisor the supervisor Typed Actor @@ -750,7 +613,7 @@ object TypedActor { * * Example linking another typed actor from within a typed actor: *
-   *   TypedActor.link(getContext(), child, faultHandler);
+   *   TypedActor.link(TypedActor.proxyFor(this.getContext().actorRef()).get, child, faultHandlingStrategy);
    * 
* * @param supervisor the supervisor Typed Actor @@ -773,7 +636,7 @@ object TypedActor { * * Example unlinking another typed actor from within a typed actor: *
-   *   TypedActor.unlink(getContext(), child);
+   *   TypedActor.unlink(TypedActor.proxyFor(this.getContext().actorRef()).get, child);
    * 
* * @param supervisor the supervisor Typed Actor @@ -964,19 +827,19 @@ private[akka] abstract class ActorAspect { case -1 => s case x => s.substring(0,x + TypedActor.AW_PROXY_PREFIX.length) } - //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo? + //TODO: Add ownerTypeHint and parameter types to the TypedActorInfo? val message: Tuple3[String, Array[Class[_]], Array[AnyRef]] = ((extractOwnerTypeHint(methodRtti.getMethod.getDeclaringClass.getName), methodRtti.getParameterTypes, methodRtti.getParameterValues)) - //FIXME send the interface name of the senderProxy in the TypedActorContext and assemble a context.sender with that interface on the server + //TODO send the interface name of the senderProxy in the TypedActorContext and assemble a context.sender with that interface on the server //val senderProxy = Option(SenderContextInfo.senderProxy.value) val future = Actor.remote.send[AnyRef]( - message, senderActorRef, None, remoteAddress.get, + message, senderActorRef, None, timeout, isOneWay, actorRef, - Some((interfaceClass.getName, methodRtti.getMethod.getName)), + Some((interfaceClass.getName, methodRtti.getMethod.getName)), //TODO Include the interface of the senderProxy here somehow ActorType.TypedActor, None) //TODO: REVISIT: Use another classloader? @@ -999,7 +862,6 @@ private[akka] abstract class ActorAspect { typedActor = init.targetInstance actorRef = init.actorRef uuid = actorRef.uuid - remoteAddress = init.remoteAddress timeout = init.timeout } } @@ -1052,14 +914,10 @@ private[akka] case class AspectInitUnregistered(proxy: AnyRef, init: AspectInit) * @author Jonas Bonér */ private[akka] sealed case class AspectInit( - val interfaceClass: Class[_], - val targetInstance: TypedActor, - val actorRef: ActorRef, - val remoteAddress: Option[InetSocketAddress], - val timeout: Long) { - def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) = - this(interfaceClass, targetInstance, actorRef, None, timeout) - + interfaceClass: Class[_], + targetInstance: TypedActor, + actorRef: ActorRef, + timeout: Long) { } diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala index f2f2a7a1fc..01a1e8add7 100644 --- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala @@ -106,14 +106,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa val implementationClass = component.target val timeout = component.timeout - val (remoteAddress,actorRef) = - component.remoteAddress match { - case Some(a) => - (Some(new InetSocketAddress(a.hostname, a.port)), - Actor.remote.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port)) - case None => - (None, Actor.actorOf(TypedActor.newTypedActor(implementationClass))) - } + val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass)) actorRef.timeout = timeout if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get @@ -123,7 +116,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa AspectInitRegistry.register( proxy, - AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout)) + AspectInit(interfaceClass, typedActor, actorRef, timeout)) typedActor.initialize(proxy) actorRef.start diff --git a/config/akka-reference.conf b/config/akka-reference.conf index df2c2c3e0d..fd83525584 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -2,6 +2,13 @@ # Akka Config File # #################### + +# spawn-mapping { +# address1 { replication-factor: 2.0, deep-copy = on } +# address2 { replication-factor: 5.0, router = "MEM" } +# ... +# } + # This file has all the default settings, so all these could be removed with no visible effect. # Modify as needed.