From 0dff50fa522fb1e733e7527b7b321197b5535a85 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 12:48:30 +0200 Subject: [PATCH 1/6] Removed client-managed actors, a lot of deprecated methods and DataFlowVariable (superceded by Future) --- .../actor/supervisor/SupervisorSpec.scala | 6 +- .../scala/akka/dataflow/DataFlowSpec.scala | 165 ------------------ .../test/scala/akka/dispatch/FutureSpec.scala | 75 -------- .../src/main/scala/akka/actor/Actor.scala | 12 +- .../src/main/scala/akka/actor/ActorRef.scala | 158 ++--------------- .../main/scala/akka/actor/Supervisor.scala | 3 +- .../scala/akka/config/SupervisionConfig.scala | 22 +-- .../main/scala/akka/dataflow/DataFlow.scala | 165 ------------------ .../src/main/scala/akka/dispatch/Future.scala | 30 ---- .../remoteinterface/RemoteInterface.scala | 79 --------- .../remote/netty/NettyRemoteSupport.scala | 13 -- .../serialization/SerializationProtocol.scala | 10 +- .../scala/akka/serialization/Serializer.scala | 2 +- .../ClientInitiatedRemoteActorSpec.scala | 142 --------------- .../remote/OptimizedLocalScopedSpec.scala | 6 - .../scala/remote/RemoteSupervisorSpec.scala | 15 +- .../scala/remote/RemoteTypedActorSpec.scala | 68 +------- .../remote/UnOptimizedLocalScopedSpec.scala | 6 - .../SerializableTypeClassActorSpec.scala | 4 +- .../src/test/scala/ticket/Ticket519Spec.scala | 19 -- .../ClientManagedRemoteActorSample.scala | 35 ---- akka-stm/src/main/scala/akka/stm/Ref.scala | 2 +- .../main/scala/akka/actor/TypedActor.scala | 118 +------------ .../config/TypedActorGuiceConfigurator.scala | 11 +- 24 files changed, 53 insertions(+), 1113 deletions(-) delete mode 100644 akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala delete mode 100644 akka-actor/src/main/scala/akka/dataflow/DataFlow.scala delete mode 100644 akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala delete mode 100644 akka-remote/src/test/scala/ticket/Ticket519Spec.scala delete mode 100644 akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index 131cdeee8f..469b84070f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -60,7 +60,11 @@ object SupervisorSpec { class Master extends Actor { self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt) - val temp = self.spawnLink[TemporaryActor] + val temp = { + val a = actorOf[TemporaryActor] + self link a + a.start + } override def receive = { case Die => temp !! (Die, TimeoutMillis) diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala deleted file mode 100644 index f5a107f511..0000000000 --- a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dataflow - -import org.scalatest.Spec -import org.scalatest.Assertions -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import akka.dispatch.DefaultCompletableFuture -import java.util.concurrent.{TimeUnit, CountDownLatch} -import annotation.tailrec -import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger} -import akka.actor.ActorRegistry - -@RunWith(classOf[JUnitRunner]) -class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { - describe("DataflowVariable") { - it("should be able to set the value of one variable from other variables") { - import DataFlow._ - - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - val x, y, z = new DataFlowVariable[Int] - thread { - z << x() + y() - result.set(z()) - latch.countDown - } - thread { x << 40 } - thread { y << 2 } - - latch.await(10,TimeUnit.SECONDS) should equal (true) - result.get should equal (42) - List(x,y,z).foreach(_.shutdown) - } - - it("should be able to sum a sequence of ints") { - import DataFlow._ - - def ints(n: Int, max: Int): List[Int] = - if (n == max) Nil - else n :: ints(n + 1, max) - - def sum(s: Int, stream: List[Int]): List[Int] = stream match { - case Nil => s :: Nil - case h :: t => s :: sum(h + s, t) - } - - val latch = new CountDownLatch(1) - val result = new AtomicReference[List[Int]](Nil) - val x = new DataFlowVariable[List[Int]] - val y = new DataFlowVariable[List[Int]] - val z = new DataFlowVariable[List[Int]] - - thread { x << ints(0, 1000) } - thread { y << sum(0, x()) } - - thread { z << y() - result.set(z()) - latch.countDown - } - - latch.await(10,TimeUnit.SECONDS) should equal (true) - result.get should equal (sum(0,ints(0,1000))) - List(x,y,z).foreach(_.shutdown) - } -/* - it("should be able to join streams") { - import DataFlow._ - Actor.registry.shutdownAll - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - - val t1 = thread { ints(0, 1000, producer) } - val t2 = thread { - Thread.sleep(1000) - result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) - latch.countDown - } - - latch.await(3,TimeUnit.SECONDS) should equal (true) - result.get should equal (332833500) - } - - it("should be able to sum streams recursively") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val result = new AtomicLong(0) - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - - @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { - val x = stream() - - if(result.addAndGet(x) == 166666500) - latch.countDown - - recurseSum(stream) - } - - thread { ints(0, 1000, producer) } - thread { sum(0, producer, consumer) } - thread { recurseSum(consumer) } - - latch.await(15,TimeUnit.SECONDS) should equal (true) - } -*/ - /* Test not ready for prime time, causes some sort of deadlock */ - /* it("should be able to conditionally set variables") { - - import DataFlow._ - Actor.registry.shutdownAll - - val latch = new CountDownLatch(1) - val x, y, z, v = new DataFlowVariable[Int] - - val main = thread { - x << 1 - z << Math.max(x(),y()) - latch.countDown - } - - val setY = thread { - // Thread.sleep(2000) - y << 2 - } - - val setV = thread { - v << y - } - List(x,y,z,v) foreach (_.shutdown) - latch.await(2,TimeUnit.SECONDS) should equal (true) - }*/ - } -} diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index a946713c3d..552fc5f23c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -134,81 +134,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - @Test def shouldFutureAwaitEitherLeft = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "NoReply" - val result = Futures.awaitEither(future1, future2) - assert(result.isDefined) - assert("World" === result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitEitherRight = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "NoReply" - val future2 = actor2 !!! "Hello" - val result = Futures.awaitEither(future1, future2) - assert(result.isDefined) - assert("World" === result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitOneLeft = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "NoReply" - val future2 = actor2 !!! "Hello" - val result = Futures.awaitOne(List(future1, future2)) - assert(result.result.isDefined) - assert("World" === result.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitOneRight = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "NoReply" - val result = Futures.awaitOne(List(future1, future2)) - assert(result.result.isDefined) - assert("World" === result.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitAll = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "Hello" - Futures.awaitAll(List(future1, future2)) - assert(future1.result.isDefined) - assert("World" === future1.result.get) - assert(future2.result.isDefined) - assert("World" === future2.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFuturesAwaitMapHandleEmptySequence { - assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) - } - - @Test def shouldFuturesAwaitMapHandleNonEmptySequence { - val latches = (1 to 3) map (_ => new StandardLatch) - val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start) - val futures = actors map (actor => (actor.!!![String]("Hello"))) - latches foreach { _.open } - - assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length)) - } - @Test def shouldFoldResults { val actors = (1 to 10).toList map { _ => actorOf(new Actor { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 718c3b94bb..49b76d58d5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -18,7 +18,7 @@ import akka.japi. {Creator, Procedure} /** * Life-cycle messages for the Actors */ -@serializable sealed trait LifeCycleMessage +sealed trait LifeCycleMessage extends Serializable /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } @@ -165,7 +165,7 @@ object Actor extends ListenerManagement { "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }, None) + }) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -185,7 +185,7 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) + def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) @@ -195,7 +195,7 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None) + def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when @@ -245,8 +245,7 @@ object Actor extends ListenerManagement { *

* Here you find functions like: * - !, !!, !!! and forward - * - link, unlink, startLink, spawnLink etc - * - makeRemote etc. + * - link, unlink, startLink etc * - start, stop * - etc. * @@ -269,7 +268,6 @@ object Actor extends ListenerManagement { * import self._ * id = ... * dispatcher = ... - * spawnLink[OtherActor] * ... * } * diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index cb28a07407..883b6668bf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -446,38 +446,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def startLink(actorRef: ActorRef): Unit - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.actorOf instead") - def spawn(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed") - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Atomically create (from actor class), link and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use use Actor.remote.actorOf instead and then link on success") - def spawnLink(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote, link and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed") - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - /** * Returns the mailbox size. */ @@ -593,10 +561,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * * @author Jonas Bonér */ -class LocalActorRef private[akka] ( - private[this] val actorFactory: () => Actor, - val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean = false) +class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor) extends ActorRef with ScalaActorRef { @volatile @@ -626,9 +591,8 @@ class LocalActorRef private[akka] ( __lifeCycle: LifeCycle, __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], - __factory: () => Actor, - __homeAddress: Option[InetSocketAddress]) = { - this(__factory, __homeAddress) + __factory: () => Actor) = { + this(__factory) _uuid = __uuid id = __id timeout = __timeout @@ -640,11 +604,6 @@ class LocalActorRef private[akka] ( start } - /** - * Returns whether this actor ref is client-managed remote or not - */ - private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled - // ========= PUBLIC FUNCTIONS ========= /** @@ -657,6 +616,8 @@ class LocalActorRef private[akka] ( */ def actorClassName: String = actorClass.getName + final def homeAddress: Option[InetSocketAddress] = None + /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ @@ -688,9 +649,6 @@ class LocalActorRef private[akka] ( if ((actorInstance ne null) && (actorInstance.get ne null)) initializeActorInstance - if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - checkReceiveTimeout //Schedule the initial Receive timeout } this @@ -710,11 +668,9 @@ class LocalActorRef private[akka] ( } finally { currentMessage = null Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) + if (isRemotingEnabled) Actor.remote.unregister(this) - } + setActorSelfFields(actorInstance.get,null) } } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") @@ -764,52 +720,6 @@ class LocalActorRef private[akka] ( actorRef.start } - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - def spawn(clazz: Class[_ <: Actor]): ActorRef = - Actor.actorOf(clazz).start - - /** - * Atomically create (from actor class), start and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val ref = Actor.remote.actorOf(clazz, hostname, port) - ref.timeout = timeout - ref.start - } - - /** - * Atomically create (from actor class), start and link an actor. - *

- * To be invoked from within the actor itself. - */ - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = { - val actor = spawn(clazz) - link(actor) - actor.start - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val actor = Actor.remote.actorOf(clazz, hostname, port) - actor.timeout = timeout - link(actor) - actor.start - actor - } - /** * Returns the mailbox. */ @@ -827,10 +737,6 @@ class LocalActorRef private[akka] ( protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - if (isClientManaged_?) { - Actor.remote.send[Any]( - message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) - } else dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( @@ -838,17 +744,10 @@ class LocalActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - if (isClientManaged_?) { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } else { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) - dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) - future.get - } + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + dispatcher dispatchMessage new MessageInvocation( + this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + future.get } /** @@ -1004,11 +903,10 @@ class LocalActorRef private[akka] ( } } } - + //TODO KEEP THIS? protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -1179,10 +1077,6 @@ private[akka] case class RemoteActorRef private[akka] ( def link(actorRef: ActorRef): Unit = unsupported def unlink(actorRef: ActorRef): Unit = unsupported def startLink(actorRef: ActorRef): Unit = unsupported - def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported def supervisor: Option[ActorRef] = unsupported def linkedActors: JMap[Uuid, ActorRef] = unsupported protected[akka] def mailbox: AnyRef = unsupported @@ -1388,32 +1282,4 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => true } else false } - - /** - * Atomically create (from actor class) and start an actor. - */ - def spawn[T <: Actor: Manifest]: ActorRef = - spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start and make an actor remote. - */ - def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } - - /** - * Atomically create (from actor class), start and link an actor. - */ - def spawnLink[T <: Actor: Manifest]: ActorRef = - spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start, link and make an actor remote. - */ - def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } } diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index bb08bcdf80..f877759321 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -93,8 +93,7 @@ case class SupervisorFactory(val config: SupervisorConfig) { *

* The supervisor class is only used for the configuration system when configuring supervisor * hierarchies declaratively. Should not be used as part of the regular programming API. Instead - * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the - * children that should trap error signals and trigger restart. + * wire the children together using 'link', 'startLink' etc. *

* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children. * diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index 9f63c64bc1..c7c0ba40bd 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -101,32 +101,18 @@ object Supervision { val target: Class[_], val lifeCycle: LifeCycle, val timeout: Long, - _dispatcher: MessageDispatcher, // optional - _remoteAddress: RemoteAddress // optional + _dispatcher: MessageDispatcher // optional ) extends Server { val intf: Option[Class[_]] = Option(_intf) val dispatcher: Option[MessageDispatcher] = Option(_dispatcher) - val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) + this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher) def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(intf, target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(intf, target, lifeCycle, timeout, dispatcher, null: RemoteAddress) + this(intf, target, lifeCycle, timeout, null: MessageDispatcher) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(null: Class[_], target, lifeCycle, timeout, dispatcher, null: RemoteAddress) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(intf, target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - this(null: Class[_], target, lifeCycle, timeout, dispatcher, remoteAddress) + this(null: Class[_], target, lifeCycle, timeout, dispatcher) } } diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala deleted file mode 100644 index 72fbbaaeb2..0000000000 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dataflow - -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} - -import akka.event.EventHandler -import akka.actor.{Actor, ActorRef} -import akka.actor.Actor._ -import akka.dispatch.CompletableFuture -import akka.AkkaException -import akka.japi.{ Function, Effect } - -/** - * Implements Oz-style dataflow (single assignment) variables. - * - * @author Jonas Bonér - */ -object DataFlow { - object Start - object Exit - - class DataFlowVariableException(msg: String) extends AkkaException(msg) - - /** - * Executes the supplied thunk in another thread. - */ - def thread(body: => Unit): Unit = spawn(body) - - /** - * JavaAPI. - * Executes the supplied Effect in another thread. - */ - def thread(body: Effect): Unit = spawn(body.apply) - - /** - * Executes the supplied function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: A => R) = - actorOf(new ReactiveEventBasedThread(body)).start - - /** - * JavaAPI. - * Executes the supplied Function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) = - actorOf(new ReactiveEventBasedThread(body.apply)).start - - private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) - extends Actor { - def receive = { - case Exit => self.stop - case message => self.reply(body(message.asInstanceOf[A])) - } - } - - private object DataFlowVariable { - private sealed abstract class DataFlowVariableMessage - private case class Set[T <: Any](value: T) extends DataFlowVariableMessage - private object Get extends DataFlowVariableMessage - } - - /** - * @author Jonas Bonér - */ - @deprecated("Superceeded by Future and CompletableFuture as of 1.1") - sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { - import DataFlowVariable._ - - def this() = this(1000 * 60) - - private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] - - private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - def receive = { - case s@Set(v) => - if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { - while(dataFlow.blockedReaders.peek ne null) - dataFlow.blockedReaders.poll ! s - } else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case Exit => self.stop - } - } - - private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - private var readerFuture: Option[CompletableFuture[Any]] = None - def receive = { - case Get => dataFlow.value.get match { - case Some(value) => self reply value - case None => readerFuture = self.senderFuture - } - case Set(v:T) => readerFuture.map(_ completeWithResult v) - case Exit => self.stop - } - } - - private[this] val in = actorOf(new In(this)).start - - /** - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def <<(ref: DataFlowVariable[T]) { - if (this.value.get.isEmpty) in ! Set(ref()) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(ref: DataFlowVariable[T]) { this << ref } - - /** - * Sets the value of this variable (if unset). - */ - def <<(value: T) { - if (this.value.get.isEmpty) in ! Set(value) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(value: T) { this << value } - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def get(): T = this() - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def apply(): T = { - value.get getOrElse { - val out = actorOf(new Out(this)).start - - val result = try { - blockedReaders offer out - (out !! Get).as[T] - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - out ! Exit - throw e - } - - result.getOrElse(throw new DataFlowVariableException( - "Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) - } - } - - def shutdown = in ! Exit - } -} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index cfe64a8992..8c6b8992b1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -175,36 +175,6 @@ object Futures { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) - - // ===================================== - // Deprecations - // ===================================== - - /** - * (Blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)") - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await") - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await - - - /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }") - def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException") - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException } object Future { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index e9e4168995..964c9f9f29 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -151,81 +151,6 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule clear } - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *

-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io", 2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf(factory: => Actor, host: String, port: Int): ActorRef = - Actor.remote.clientManagedActorOf(() => factory, host, port) - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - clientManagedActorOf(() => - createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - host, port) - } - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf[MyActor]("www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - clientManagedActorOf(() => - createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - host, port) - } - protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) @@ -444,10 +369,6 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) - @deprecated("Will be removed after 1.1") - def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef - - /** * Clean-up all open connections. */ diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d9ded4b0e2..e7e1eaad6f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -591,19 +591,6 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with RemoteActorRef(serviceId, className, host, port, timeout, loader) } - - def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = { - - if (optimizeLocalScoped_?) { - val home = this.address - if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals? - return new LocalActorRef(factory, None) // Code is much simpler with return - } - - val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true) - //ref.timeout = timeout //removed because setting default timeout should be done after construction - ref - } } class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 7ad0c1e443..c3332bb3f4 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -44,7 +44,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * } * */ -@serializable trait StatelessActorFormat[T <: Actor] extends Format[T] { +trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable{ def fromBinary(bytes: Array[Byte], act: T) = act def toBinary(ac: T) = Array.empty[Byte] @@ -64,7 +64,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * } * */ -@serializable trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { +trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable { val serializer: Serializer def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] @@ -205,10 +205,11 @@ object ActorSerialization { else actorClass.newInstance.asInstanceOf[Actor] } + /* TODO Can we remove originalAddress from the protocol? val homeAddress = { val address = protocol.getOriginalAddress Some(new InetSocketAddress(address.getHostname, address.getPort)) - } + }*/ val ar = new LocalActorRef( uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), @@ -218,8 +219,7 @@ object ActorSerialization { lifeCycle, supervisor, hotswap, - factory, - homeAddress) + factory) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index 3a292e0de0..3fc661afce 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -17,7 +17,7 @@ import sjson.json.{Serializer => SJSONSerializer} /** * @author Jonas Bonér */ -@serializable trait Serializer { +trait Serializer extends scala.Serializable { @volatile var classLoader: Option[ClassLoader] = None def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala deleted file mode 100644 index 65693636b4..0000000000 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ /dev/null @@ -1,142 +0,0 @@ -package akka.actor.remote - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import akka.dispatch.Dispatchers -import akka.actor.Actor._ -import akka.actor._ - -class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg) - -object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) -} -class RemoteActorSpecActorUnidirectional extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - - def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown - } -} - -class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => throw new ExpectedRemoteProblem("expected") - } -} - -class SendOneWayAndReplyReceiverActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } -} - -class CountDownActor(latch: CountDownLatch) extends Actor { - def receive = { - case "World" => latch.countDown - } -} -/* -object SendOneWayAndReplySenderActor { - val latch = new CountDownLatch(1) -} -class SendOneWayAndReplySenderActor extends Actor { - var state: Option[AnyRef] = None - var sendTo: ActorRef = _ - var latch: CountDownLatch = _ - - def sendOff = sendTo ! "Hello" - - def receive = { - case msg: AnyRef => - state = Some(msg) - SendOneWayAndReplySenderActor.latch.countDown - } -}*/ - -class MyActorCustomConstructor extends Actor { - var prefix = "default-" - var count = 0 - def receive = { - case "incrPrefix" => count += 1; prefix = "" + count + "-" - case msg: String => self.reply(prefix + msg) - } -} - -class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest { - "ClientInitiatedRemoteActor" should { - "shouldSendOneWay" in { - val clientManaged = remote.actorOf[RemoteActorSpecActorUnidirectional](host,port).start - clientManaged must not be null - clientManaged.getClass must be (classOf[LocalActorRef]) - clientManaged ! "OneWay" - RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true) - clientManaged.stop - } - - "shouldSendOneWayAndReceiveReply" in { - val latch = new CountDownLatch(1) - val actor = remote.actorOf[SendOneWayAndReplyReceiverActor](host,port).start - implicit val sender = Some(actorOf(new CountDownActor(latch)).start) - - actor ! "Hello" - - latch.await(3,TimeUnit.SECONDS) must be (true) - } - - "shouldSendBangBangMessageAndReceiveReply" in { - val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start - val result = actor !! ("Hello", 10000) - "World" must equal (result.get.asInstanceOf[String]) - actor.stop - } - - "shouldSendBangBangMessageAndReceiveReplyConcurrently" in { - val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList - actors.map(_ !!! ("Hello", 10000)) foreach { future => - "World" must equal (future.await.result.asInstanceOf[Option[String]].get) - } - actors.foreach(_.stop) - } - - "shouldRegisterActorByUuid" in { - val actor1 = remote.actorOf[MyActorCustomConstructor](host, port).start - val actor2 = remote.actorOf[MyActorCustomConstructor](host, port).start - - actor1 ! "incrPrefix" - - (actor1 !! "test").get must equal ("1-test") - - actor1 ! "incrPrefix" - - (actor1 !! "test").get must equal ("2-test") - - (actor2 !! "test").get must equal ("default-test") - - actor1.stop - actor2.stop - } - - "shouldSendAndReceiveRemoteException" in { - - val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host, port).start - try { - implicit val timeout = 500000000L - val f = (actor !!! "Failure").await.resultOrException - fail("Shouldn't get here!!!") - } catch { - case e: ExpectedRemoteProblem => - } - actor.stop - } - } -} diff --git a/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala b/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala index f6e0c1806f..459f7641f3 100644 --- a/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala +++ b/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala @@ -19,11 +19,5 @@ class OptimizedLocalScopedSpec extends AkkaRemoteTest { remote.actorFor("foo", host, port) must be (fooActor) } - - "Create local actor when client-managed is hosted locally" in { - val localClientManaged = Actor.remote.actorOf[TestActor](host, port) - localClientManaged.homeAddress must be (None) - } - } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index 4026418d18..6a357861f9 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -23,7 +23,7 @@ object Log { } } -@serializable class RemotePingPong1Actor extends Actor { +class RemotePingPong1Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -41,7 +41,7 @@ object Log { } } -@serializable class RemotePingPong2Actor extends Actor { +class RemotePingPong2Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -55,7 +55,7 @@ object Log { } } -@serializable class RemotePingPong3Actor extends Actor { +class RemotePingPong3Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -69,7 +69,7 @@ object Log { } } -class RemoteSupervisorSpec extends AkkaRemoteTest { +/*class RemoteSupervisorSpec extends AkkaRemoteTest { var pingpong1: ActorRef = _ var pingpong2: ActorRef = _ @@ -324,7 +324,6 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { factory.newInstance } - /* // Uncomment when the same test passes in SupervisorSpec - pending bug @Test def shouldKillMultipleActorsOneForOne2 = { clearMessageLogs @@ -338,9 +337,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { messageLog.poll(5, TimeUnit.SECONDS) } } -*/ - /* @Test def shouldOneWayKillSingleActorOneForOne = { clearMessageLogs @@ -435,6 +432,4 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { messageLog.poll(5, TimeUnit.SECONDS) } } - */ - -} +}*/ diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 988236b85b..43edd29404 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -4,13 +4,9 @@ package akka.actor.remote -import akka.config.Supervision._ -import akka.actor._ - import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.config. {RemoteAddress, Config, TypedActorConfigurator} -import akka.testing._ object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] @@ -24,73 +20,11 @@ object RemoteTypedActorLog { class RemoteTypedActorSpec extends AkkaRemoteTest { - import RemoteTypedActorLog._ - - private var conf: TypedActorConfigurator = _ - - override def beforeEach { - super.beforeEach - Config.config - conf = new TypedActorConfigurator - conf.configure( - new AllForOneStrategy(List(classOf[Exception]), 3, 5000), - List( - new SuperviseTypedActor( - classOf[RemoteTypedActorOne], - classOf[RemoteTypedActorOneImpl], - Permanent, - Testing.testTime(20000), - RemoteAddress(host,port)), - new SuperviseTypedActor( - classOf[RemoteTypedActorTwo], - classOf[RemoteTypedActorTwoImpl], - Permanent, - Testing.testTime(20000), - RemoteAddress(host,port)) - ).toArray).supervise - } - - override def afterEach { - clearMessageLogs - conf.stop - super.afterEach - Thread.sleep(1000) - } "Remote Typed Actor " should { - /*"receives one-way message" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + "have unit tests" in { - ta.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") } - - "responds to request-reply message" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - ta.requestReply("ping") must equal ("pong") - } */ - - "be restarted on failure" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - - try { - ta.requestReply("die") - fail("Shouldn't get here") - } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - } - - /* "restarts linked friends on failure" in { - val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) - val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo]) - - try { - ta1.requestReply("die") - fail("Shouldn't get here") - } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - }*/ } } diff --git a/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala b/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala index 001a66eae0..151dd4ba0f 100644 --- a/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala +++ b/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala @@ -18,11 +18,5 @@ class UnOptimizedLocalScopedSpec extends AkkaRemoteTest { remote.actorFor("foo", host, port) must not be (fooActor) } - - "Create remote actor when client-managed is hosted locally" in { - val localClientManaged = Actor.remote.actorOf[TestActor](host, port) - localClientManaged.homeAddress must not be (None) - } - } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 2eec948698..7aa0cede07 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -221,7 +221,7 @@ class MyActorWithDualCounter extends Actor { } } -@serializable class MyActor extends Actor { +class MyActor extends Actor with scala.Serializable { var count = 0 def receive = { @@ -249,7 +249,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { } } -@serializable class MyJavaSerializableActor extends Actor { +class MyJavaSerializableActor extends Actor with scala.Serializable { var count = 0 self.receiveTimeout = Some(1000) diff --git a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala deleted file mode 100644 index 6edec702b5..0000000000 --- a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ -package akka.actor.ticket - -import akka.actor._ -import akka.actor.remote.AkkaRemoteTest - - -class Ticket519Spec extends AkkaRemoteTest { - "A remote TypedActor" should { - "should handle remote future replies" in { - val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,host,port) - val r = actor.someFutureString - - r.await.result.get must equal ("foo") - } - } -} diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala deleted file mode 100644 index 3477ff5783..0000000000 --- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package sample.remote - -import akka.actor.Actor._ -import akka.actor. {ActorRegistry, Actor} -import Actor.remote - -class RemoteHelloWorldActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } -} - -object ClientManagedRemoteActorServer { - def run = { - remote.start("localhost", 2552) - } - - def main(args: Array[String]) = run -} - -object ClientManagedRemoteActorClient { - - def run = { - val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start - val result = actor !! "Hello" - } - - def main(args: Array[String]) = run -} - diff --git a/akka-stm/src/main/scala/akka/stm/Ref.scala b/akka-stm/src/main/scala/akka/stm/Ref.scala index 74b1bf5a9e..5d1aa9dc96 100644 --- a/akka-stm/src/main/scala/akka/stm/Ref.scala +++ b/akka-stm/src/main/scala/akka/stm/Ref.scala @@ -11,7 +11,7 @@ import org.multiverse.transactional.refs.BasicRef /** * Common trait for all the transactional objects. */ -@serializable trait Transactional { +trait Transactional extends Serializable { val uuid: String } diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 813c26ab94..3db2288e8f 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -377,18 +377,12 @@ object TypedActorConfiguration { new TypedActorConfiguration() } - def apply(timeout: Long) : TypedActorConfiguration = { - new TypedActorConfiguration().timeout(Duration(timeout, "millis")) + def apply(timeoutMillis: Long) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(Duration(timeoutMillis, "millis")) } - @deprecated("Will be removed after 1.1") - def apply(host: String, port: Int) : TypedActorConfiguration = { - new TypedActorConfiguration().makeRemote(host, port) - } - - @deprecated("Will be removed after 1.1") - def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = { - new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis")) + def apply(timeout: Duration) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(timeout) } } @@ -399,7 +393,6 @@ object TypedActorConfiguration { */ final class TypedActorConfiguration { private[akka] var _timeout: Long = Actor.TIMEOUT - private[akka] var _host: Option[InetSocketAddress] = None private[akka] var _messageDispatcher: Option[MessageDispatcher] = None private[akka] var _threadBasedDispatcher: Option[Boolean] = None private[akka] var _id: Option[String] = None @@ -416,15 +409,6 @@ final class TypedActorConfiguration { this } - @deprecated("Will be removed after 1.1") - def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port)) - - @deprecated("Will be removed after 1.1") - def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = { - _host = Some(remoteAddress) - this - } - def dispatcher(messageDispatcher: MessageDispatcher) : TypedActorConfiguration = { if (_threadBasedDispatcher.isDefined) throw new IllegalArgumentException( "Cannot specify both 'threadBasedDispatcher()' and 'dispatcher()'") @@ -480,28 +464,6 @@ object TypedActor { newInstance(intfClass, factory, TypedActorConfiguration()) } - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param targetClass implementation class of the typed actor - * @param host hostanme of the remote server - * @param port port of the remote server - */ - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port)) - } - - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param factory factory method that constructs the typed actor - * @param host hostanme of the remote server - * @param port port of the remote server - */ - def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = { - newInstance(intfClass, factory, TypedActorConfiguration(hostname, port)) - } - /** * Factory method for typed actor. * @param intfClass interface the typed actor implements @@ -522,32 +484,6 @@ object TypedActor { newInstance(intfClass, factory, TypedActorConfiguration(timeout)) } - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param targetClass implementation class of the typed actor - * @paramm timeout timeout for future - * @param host hostanme of the remote server - * @param port port of the remote server - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout)) - } - - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param factory factory method that constructs the typed actor - * @paramm timeout timeout for future - * @param host hostanme of the remote server - * @param port port of the remote server - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = { - newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout)) - } - /** * Factory method for typed actor. * @param intfClass interface the typed actor implements @@ -555,20 +491,7 @@ object TypedActor { * @paramm config configuration object fo the typed actor */ def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = - newInstance(intfClass, createActorRef(newTypedActor(factory),config), config) - - /** - * Creates an ActorRef, can be local only or client-managed-remote - */ - @deprecated("Will be removed after 1.1") - private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = { - config match { - case null => actorOf(typedActor) - case c: TypedActorConfiguration if (c._host.isDefined) => - Actor.remote.actorOf(typedActor, c._host.get.getAddress.getHostAddress, c._host.get.getPort) - case _ => actorOf(typedActor) - } - } + newInstance(intfClass, actorOf(newTypedActor(factory)), config) /** * Factory method for typed actor. @@ -577,7 +500,7 @@ object TypedActor { * @paramm config configuration object fo the typed actor */ def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = - newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config) + newInstance(intfClass, actorOf(newTypedActor(targetClass)), config) private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") @@ -585,11 +508,8 @@ object TypedActor { } private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val config = TypedActorConfiguration(timeout) - if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get) - newInstance(intfClass, targetClass, config) - } + remoteAddress: Option[InetSocketAddress], timeout: Long): T = + newInstance(intfClass, targetClass, TypedActorConfiguration(timeout)) private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = { val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] @@ -601,13 +521,7 @@ object TypedActor { actorRef.timeout = config.timeout - val remoteAddress = actorRef match { - case remote: RemoteActorRef => remote.homeAddress - case local: LocalActorRef if local.clientManaged => local.homeAddress - case _ => None - } - - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout)) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.homeAddress, actorRef.timeout)) actorRef.start proxy.asInstanceOf[T] } @@ -633,20 +547,6 @@ object TypedActor { def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T = newInstance(intfClass, factory.create) - /** - * Java API. - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T = - newRemoteInstance(intfClass, factory.create, hostname, port) - - /** - * Java API. - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T = - newRemoteInstance(intfClass, factory.create, timeout, hostname, port) - /** * Java API. */ diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala index f2f2a7a1fc..71dbe04413 100644 --- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala @@ -106,14 +106,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa val implementationClass = component.target val timeout = component.timeout - val (remoteAddress,actorRef) = - component.remoteAddress match { - case Some(a) => - (Some(new InetSocketAddress(a.hostname, a.port)), - Actor.remote.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port)) - case None => - (None, Actor.actorOf(TypedActor.newTypedActor(implementationClass))) - } + val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass)) actorRef.timeout = timeout if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get @@ -123,7 +116,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa AspectInitRegistry.register( proxy, - AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout)) + AspectInit(interfaceClass, typedActor, actorRef, None, timeout)) typedActor.initialize(proxy) actorRef.start From 3f49eadedccc23adae375f9eca2199d34d29e711 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 13:03:23 +0200 Subject: [PATCH 2/6] Removing more deprecation warnings and more client-managed actors residue --- .../src/main/scala/akka/dispatch/Future.scala | 8 ++-- .../akka/dispatch/ThreadPoolBuilder.scala | 3 -- .../remoteinterface/RemoteInterface.scala | 10 ----- .../src/main/scala/akka/util/Duration.scala | 4 +- .../remote/netty/NettyRemoteSupport.scala | 45 +++---------------- .../ServerInitiatedRemoteActorSpec.scala | 2 +- .../testkit/CallingThreadDispatcher.scala | 4 +- 7 files changed, 14 insertions(+), 62 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 8c6b8992b1..6cd13d69eb 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -61,7 +61,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout) + firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures),timeout) /** * A non-blocking fold over the specified futures. @@ -87,7 +87,7 @@ object Futures { results add r.b if (results.size == allDone) { //Only one thread can get here try { - result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) + result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun) } catch { case e: Exception => EventHandler.error(e, this, e.getMessage) @@ -115,7 +115,7 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ ) + fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))( fun.apply _ ) /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first @@ -150,7 +150,7 @@ object Futures { * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _) + reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 83c30f23e0..130eeb9163 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -88,9 +88,6 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi import ThreadPoolConfig._ def build = dispatcherFactory(config) - //TODO remove this, for backwards compat only - @deprecated("Use .build instead") def buildThreadPool = build - def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 964c9f9f29..18bc792d98 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -400,14 +400,4 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType, loader: Option[ClassLoader]): Option[CompletableFuture[T]] - - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef - - @deprecated("Will be removed after 1.1") - private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit - - @deprecated("Will be removed after 1.1") - private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit } diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index fb5673277c..933e3cd9a9 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -37,7 +37,7 @@ object Duration { * Construct a Duration by parsing a String. In case of a format error, a * RuntimeException is thrown. See `unapply(String)` for more information. */ - def apply(s : String) : Duration = unapply(s) getOrElse error("format error") + def apply(s : String) : Duration = unapply(s) getOrElse sys.error("format error") /** * Deconstruct a Duration into length and unit if it is finite. @@ -77,7 +77,7 @@ object Duration { if ( ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if ( ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else - error("made some error in regex (should not be possible)") + sys.error("made some error in regex (should not be possible)") case REinf() => Some(Inf) case REminf() => Some(MinusInf) case _ => None diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e7e1eaad6f..9fbca92c40 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -119,16 +119,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } } - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - withClientFor(actorRef.homeAddress.get, None)(_.registerSupervisorForActor(actorRef)) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard { - remoteClients.get(Address(actorRef.homeAddress.get)) match { - case s: Some[RemoteClient] => s.get.deregisterSupervisorForActor(actorRef) - case None => actorRef - } - } - /** * Clean-up all open connections. */ @@ -170,7 +160,6 @@ abstract class RemoteClient private[akka] ( remoteAddress.getPort protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) @@ -320,16 +309,6 @@ abstract class RemoteClient private[akka] ( pendingRequest = pendingRequests.peek // try to grab next message } } - - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't register supervisor for " + actorRef + " since it is not under supervision") - else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't unregister supervisor for " + actorRef + " since it is not under supervision") - else supervisors.remove(actorRef.supervisor.get.uuid) } /** @@ -358,7 +337,7 @@ class ActiveRemoteClient private[akka] ( timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -433,7 +412,6 @@ class ActiveRemoteClient private[akka] ( class ActiveRemoteClientPipelineFactory( name: String, futures: ConcurrentMap[Uuid, CompletableFuture[_]], - supervisors: ConcurrentMap[Uuid, ActorRef], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -450,7 +428,7 @@ class ActiveRemoteClientPipelineFactory( case _ => (Nil,Nil) } - val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client) val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil new StaticChannelPipeline(stages: _*) } @@ -463,7 +441,6 @@ class ActiveRemoteClientPipelineFactory( class ActiveRemoteClientHandler( val name: String, val futures: ConcurrentMap[Uuid, CompletableFuture[_]], - val supervisors: ConcurrentMap[Uuid, ActorRef], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -488,19 +465,7 @@ class ActiveRemoteClientHandler( val message = MessageSerializer.deserialize(reply.getMessage) future.completeWithResult(message) } else { - val exception = parseException(reply, client.loader) - - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, exception) - } - - future.completeWithException(exception) + future.completeWithException(parseException(reply, client.loader)) } case other => throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) @@ -891,14 +856,14 @@ class RemoteServerHandler( // stop all session actors for (map <- Option(sessionActors.remove(event.getChannel)); - actor <- asScalaIterable(map.values)) { + actor <- collectionAsScalaIterable(map.values)) { try { actor ! PoisonPill } catch { case e: Exception => } } //FIXME switch approach or use other thread to execute this // stop all typed session actors for (map <- Option(typedSessionActors.remove(event.getChannel)); - actor <- asScalaIterable(map.values)) { + actor <- collectionAsScalaIterable(map.values)) { try { TypedActor.stop(actor) } catch { case e: Exception => } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 88a5ec8ec3..c978135ad2 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -189,7 +189,7 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { while(!testDone()) { if (latch.await(200, TimeUnit.MILLISECONDS)) - error("Test didn't complete within 100 cycles") + sys.error("Test didn't complete within 100 cycles") else latch.countDown } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ce198be6bf..e6fd8ebbce 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -206,8 +206,8 @@ class NestingQueue { def pop = q.poll @volatile private var active = false - def enter { if (active) error("already active") else active = true } - def leave { if (!active) error("not active") else active = false } + def enter { if (active) sys.error("already active") else active = true } + def leave { if (!active) sys.error("not active") else active = false } def isActive = active } From b41ecfe09e4acda833e32742abce1213e708d963 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 13:14:59 +0200 Subject: [PATCH 3/6] Removing even more client-managed remote actors residue, damn, this stuff is sticky --- .../remote/netty/NettyRemoteSupport.scala | 85 +------------------ 1 file changed, 4 insertions(+), 81 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 9fbca92c40..621dc25c18 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -132,15 +132,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem remoteClients.foreach({ case (addr, client) => client.shutdown }) remoteClients.clear } - - def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = { - remoteActors.put(Address(hostname, port), uuid) - } - - private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid) = { - remoteActors.remove(Address(hostname,port), uuid) - //TODO: should the connection be closed when the last actor deregisters? - } } /** @@ -1082,34 +1073,6 @@ class RemoteServerHandler( } } - - private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = { - val uuid = actorInfo.getUuid - val id = actorInfo.getId - val timeout = actorInfo.getTimeout - val name = actorInfo.getTarget - - try { - if (UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - - val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) - else Class.forName(name) - val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) - actorRef.uuid = parseUuid(uuid) - actorRef.id = id - actorRef.timeout = timeout - server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid - actorRef.start //Start it where it's created - } catch { - case e: Throwable => - EventHandler.error(e, this, e.getMessage) - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - - } - /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -1122,11 +1085,8 @@ class RemoteServerHandler( val id = actorInfo.getId server.findActorByIdOrUuid(id, parseUuid(uuid).toString) match { - case null => // the actor has not been registered globally. See if we have it in the session - createSessionActor(actorInfo, channel) match { - case null => createClientManagedActor(actorInfo) // maybe it is a client managed actor - case sessionActor => sessionActor - } + // the actor has not been registered globally. See if we have it in the session + case null => createSessionActor(actorInfo, channel) case actorRef => actorRef } } @@ -1149,49 +1109,12 @@ class RemoteServerHandler( } } - private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = { - val typedActorInfo = actorInfo.getTypedActorInfo - val interfaceClassname = typedActorInfo.getInterface - val targetClassname = actorInfo.getTarget - val uuid = actorInfo.getUuid - - try { - if (UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - - val (interfaceClass, targetClass) = - if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), - applicationLoader.get.loadClass(targetClassname)) - else (Class.forName(interfaceClassname), Class.forName(targetClassname)) - - val newInstance = TypedActor.newInstance( - interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid - newInstance - } catch { - case e: Throwable => - EventHandler.error(e, this, e.getMessage) - server.notifyListeners(RemoteServerError(e, server)) - throw e - } - } - private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = { val uuid = actorInfo.getUuid server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match { - case null => // the actor has not been registered globally. See if we have it in the session - createTypedSessionActor(actorInfo, channel) match { - case null => - // FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail - -/* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]") - EventHandler.error(e, this, e.getMessage) - server.notifyListeners(RemoteServerError(e, server)) - throw e -*/ createClientManagedTypedActor(actorInfo) // client-managed actor - case sessionActor => sessionActor - } + // the actor has not been registered globally. See if we have it in the session + case null => createTypedSessionActor(actorInfo, channel) case typedActor => typedActor } } From 87069f6e02824561b49c57a3afaa367d1f0cb056 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 13:30:41 +0200 Subject: [PATCH 4/6] Adding TODOs for solving the problem with sender references and senderproxies for remote TypedActor calls --- .../remote/netty/NettyRemoteSupport.scala | 26 +++++++++++++------ .../main/scala/akka/actor/TypedActor.scala | 6 ++--- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 621dc25c18..8e9f8c95ad 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -842,7 +842,6 @@ class RemoteServerHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - import scala.collection.JavaConversions.asScalaIterable val clientAddress = getClientAddress(ctx) // stop all session actors @@ -956,6 +955,14 @@ class RemoteServerHandler( private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = { val actorInfo = request.getActorInfo val typedActorInfo = actorInfo.getTypedActorInfo + /* TODO Implement sender references for remote TypedActor calls + if (request.hasSender) { + val iface = //TODO extrace the senderProxy interface from the request, load it as a class using the application loader + val ref = RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader) + val senderTA = TypedActor.createProxyForRemoteActorRef[AnyRef](iface, ref) + Some() + } else None + */ val typedActor = createTypedActor(actorInfo, channel) //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo? @@ -996,7 +1003,8 @@ class RemoteServerHandler( try { val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses) - + //TODO SenderContextInfo.senderActorRef.value = sender + //TODO SenderContextInfo.senderProxy.value = senderProxy if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread else { //Sends the response @@ -1022,21 +1030,23 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(e, server)) } - messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread + messageReceiver.invoke(typedActor, args: _*) match { //TODO execute in non-IO thread //If it's a future, we can lift on that to defer the send to when the future is completed case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) ) case other => sendResponse(Right(other)) } } } catch { - case e: InvocationTargetException => - EventHandler.error(e, this, e.getMessage) - write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) - server.notifyListeners(RemoteServerError(e, server)) case e: Exception => EventHandler.error(e, this, e.getMessage) - write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) + write(channel, createErrorReplyMessage(e match { + case e: InvocationTargetException => e.getCause + case e => e + }, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) + } finally { + //TODO SenderContextInfo.senderActorRef.value = None ? + //TODO SenderContextInfo.senderProxy.value = None ? } } diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 3db2288e8f..3601fd6368 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -846,19 +846,19 @@ private[akka] abstract class ActorAspect { case -1 => s case x => s.substring(0,x + TypedActor.AW_PROXY_PREFIX.length) } - //FIXME: Add ownerTypeHint and parameter types to the TypedActorInfo? + //TODO: Add ownerTypeHint and parameter types to the TypedActorInfo? val message: Tuple3[String, Array[Class[_]], Array[AnyRef]] = ((extractOwnerTypeHint(methodRtti.getMethod.getDeclaringClass.getName), methodRtti.getParameterTypes, methodRtti.getParameterValues)) - //FIXME send the interface name of the senderProxy in the TypedActorContext and assemble a context.sender with that interface on the server + //TODO send the interface name of the senderProxy in the TypedActorContext and assemble a context.sender with that interface on the server //val senderProxy = Option(SenderContextInfo.senderProxy.value) val future = Actor.remote.send[AnyRef]( message, senderActorRef, None, remoteAddress.get, timeout, isOneWay, actorRef, - Some((interfaceClass.getName, methodRtti.getMethod.getName)), + Some((interfaceClass.getName, methodRtti.getMethod.getName)), //TODO Include the interface of the senderProxy here somehow ActorType.TypedActor, None) //TODO: REVISIT: Use another classloader? From 05ba4499190f06f30e7aaee2053ed2d8fa52b71d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 13:50:34 +0200 Subject: [PATCH 5/6] Removing registerSupervisorAsRemoteActor from ActorRef + SerializationProtocol --- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 11 ----------- .../akka/serialization/SerializationProtocol.scala | 5 +++-- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 883b6668bf..98d7959a9f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -530,8 +530,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] - override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) override def equals(that: Any): Boolean = { @@ -903,13 +901,6 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor) } } } - //TODO KEEP THIS? - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { - ensureRemotingEnabled - if (_supervisor.isDefined) { - Some(_supervisor.get.uuid) - } else None - } def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors) @@ -1068,8 +1059,6 @@ private[akka] case class RemoteActorRef private[akka] ( } } - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None - // ==== NOT SUPPORTED ==== def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index c3332bb3f4..a1b663d405 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -335,7 +335,8 @@ object RemoteActorSerialization { secureCookie.foreach(messageBuilder.setCookie(_)) - actorRef.foreach { ref => + /* TODO invent new supervision strategy + actorRef.foreach { ref => ref.registerSupervisorAsRemoteActor.foreach { id => messageBuilder.setSupervisorUuid( UuidProtocol.newBuilder @@ -343,7 +344,7 @@ object RemoteActorSerialization { .setLow(id.getClockSeqAndNode) .build) } - } + } */ if( senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) From 75be2bd9d2f0660522028a08694d0927a02d4a99 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 14:09:08 +0200 Subject: [PATCH 6/6] Changing the complete* signature from : CompletableFuture to Future, since they can only be written once anyway --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 6cd13d69eb..632d777184 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -478,26 +478,26 @@ trait CompletableFuture[T] extends Future[T] { * Completes this Future with the specified result, if not already completed. * @return this */ - def complete(value: Either[Throwable, T]): CompletableFuture[T] + def complete(value: Either[Throwable, T]): Future[T] /** * Completes this Future with the specified result, if not already completed. * @return this */ - final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + final def completeWithResult(result: T): Future[T] = complete(Right(result)) /** * Completes this Future with the specified exception, if not already completed. * @return this */ - final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) + final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception)) /** * Completes this Future with the specified other Future, when that Future is completed, * unless this Future has already been completed. * @return this. */ - final def completeWith(other: Future[T]): CompletableFuture[T] = { + final def completeWith(other: Future[T]): Future[T] = { other onComplete { f => complete(f.value.get) } this } @@ -505,12 +505,12 @@ trait CompletableFuture[T] extends Future[T] { /** * Alias for complete(Right(value)). */ - final def << (value: T): CompletableFuture[T] = complete(Right(value)) + final def << (value: T): Future[T] = complete(Right(value)) /** * Alias for completeWith(other). */ - final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) + final def << (other : Future[T]): Future[T] = completeWith(other) } /**