From 0dff50fa522fb1e733e7527b7b321197b5535a85 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 12:48:30 +0200 Subject: [PATCH] Removed client-managed actors, a lot of deprecated methods and DataFlowVariable (superceded by Future) --- .../actor/supervisor/SupervisorSpec.scala | 6 +- .../scala/akka/dataflow/DataFlowSpec.scala | 165 ------------------ .../test/scala/akka/dispatch/FutureSpec.scala | 75 -------- .../src/main/scala/akka/actor/Actor.scala | 12 +- .../src/main/scala/akka/actor/ActorRef.scala | 158 ++--------------- .../main/scala/akka/actor/Supervisor.scala | 3 +- .../scala/akka/config/SupervisionConfig.scala | 22 +-- .../main/scala/akka/dataflow/DataFlow.scala | 165 ------------------ .../src/main/scala/akka/dispatch/Future.scala | 30 ---- .../remoteinterface/RemoteInterface.scala | 79 --------- .../remote/netty/NettyRemoteSupport.scala | 13 -- .../serialization/SerializationProtocol.scala | 10 +- .../scala/akka/serialization/Serializer.scala | 2 +- .../ClientInitiatedRemoteActorSpec.scala | 142 --------------- .../remote/OptimizedLocalScopedSpec.scala | 6 - .../scala/remote/RemoteSupervisorSpec.scala | 15 +- .../scala/remote/RemoteTypedActorSpec.scala | 68 +------- .../remote/UnOptimizedLocalScopedSpec.scala | 6 - .../SerializableTypeClassActorSpec.scala | 4 +- .../src/test/scala/ticket/Ticket519Spec.scala | 19 -- .../ClientManagedRemoteActorSample.scala | 35 ---- akka-stm/src/main/scala/akka/stm/Ref.scala | 2 +- .../main/scala/akka/actor/TypedActor.scala | 118 +------------ .../config/TypedActorGuiceConfigurator.scala | 11 +- 24 files changed, 53 insertions(+), 1113 deletions(-) delete mode 100644 akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala delete mode 100644 akka-actor/src/main/scala/akka/dataflow/DataFlow.scala delete mode 100644 akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala delete mode 100644 akka-remote/src/test/scala/ticket/Ticket519Spec.scala delete mode 100644 akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index 131cdeee8f..469b84070f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -60,7 +60,11 @@ object SupervisorSpec { class Master extends Actor { self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt) - val temp = self.spawnLink[TemporaryActor] + val temp = { + val a = actorOf[TemporaryActor] + self link a + a.start + } override def receive = { case Die => temp !! (Die, TimeoutMillis) diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala deleted file mode 100644 index f5a107f511..0000000000 --- a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dataflow - -import org.scalatest.Spec -import org.scalatest.Assertions -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import akka.dispatch.DefaultCompletableFuture -import java.util.concurrent.{TimeUnit, CountDownLatch} -import annotation.tailrec -import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger} -import akka.actor.ActorRegistry - -@RunWith(classOf[JUnitRunner]) -class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { - describe("DataflowVariable") { - it("should be able to set the value of one variable from other variables") { - import DataFlow._ - - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - val x, y, z = new DataFlowVariable[Int] - thread { - z << x() + y() - result.set(z()) - latch.countDown - } - thread { x << 40 } - thread { y << 2 } - - latch.await(10,TimeUnit.SECONDS) should equal (true) - result.get should equal (42) - List(x,y,z).foreach(_.shutdown) - } - - it("should be able to sum a sequence of ints") { - import DataFlow._ - - def ints(n: Int, max: Int): List[Int] = - if (n == max) Nil - else n :: ints(n + 1, max) - - def sum(s: Int, stream: List[Int]): List[Int] = stream match { - case Nil => s :: Nil - case h :: t => s :: sum(h + s, t) - } - - val latch = new CountDownLatch(1) - val result = new AtomicReference[List[Int]](Nil) - val x = new DataFlowVariable[List[Int]] - val y = new DataFlowVariable[List[Int]] - val z = new DataFlowVariable[List[Int]] - - thread { x << ints(0, 1000) } - thread { y << sum(0, x()) } - - thread { z << y() - result.set(z()) - latch.countDown - } - - latch.await(10,TimeUnit.SECONDS) should equal (true) - result.get should equal (sum(0,ints(0,1000))) - List(x,y,z).foreach(_.shutdown) - } -/* - it("should be able to join streams") { - import DataFlow._ - Actor.registry.shutdownAll - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - - val t1 = thread { ints(0, 1000, producer) } - val t2 = thread { - Thread.sleep(1000) - result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) - latch.countDown - } - - latch.await(3,TimeUnit.SECONDS) should equal (true) - result.get should equal (332833500) - } - - it("should be able to sum streams recursively") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val result = new AtomicLong(0) - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - - @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { - val x = stream() - - if(result.addAndGet(x) == 166666500) - latch.countDown - - recurseSum(stream) - } - - thread { ints(0, 1000, producer) } - thread { sum(0, producer, consumer) } - thread { recurseSum(consumer) } - - latch.await(15,TimeUnit.SECONDS) should equal (true) - } -*/ - /* Test not ready for prime time, causes some sort of deadlock */ - /* it("should be able to conditionally set variables") { - - import DataFlow._ - Actor.registry.shutdownAll - - val latch = new CountDownLatch(1) - val x, y, z, v = new DataFlowVariable[Int] - - val main = thread { - x << 1 - z << Math.max(x(),y()) - latch.countDown - } - - val setY = thread { - // Thread.sleep(2000) - y << 2 - } - - val setV = thread { - v << y - } - List(x,y,z,v) foreach (_.shutdown) - latch.await(2,TimeUnit.SECONDS) should equal (true) - }*/ - } -} diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index a946713c3d..552fc5f23c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -134,81 +134,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - @Test def shouldFutureAwaitEitherLeft = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "NoReply" - val result = Futures.awaitEither(future1, future2) - assert(result.isDefined) - assert("World" === result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitEitherRight = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "NoReply" - val future2 = actor2 !!! "Hello" - val result = Futures.awaitEither(future1, future2) - assert(result.isDefined) - assert("World" === result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitOneLeft = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "NoReply" - val future2 = actor2 !!! "Hello" - val result = Futures.awaitOne(List(future1, future2)) - assert(result.result.isDefined) - assert("World" === result.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitOneRight = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "NoReply" - val result = Futures.awaitOne(List(future1, future2)) - assert(result.result.isDefined) - assert("World" === result.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFutureAwaitAll = { - val actor1 = actorOf[TestActor].start - val actor2 = actorOf[TestActor].start - val future1 = actor1 !!! "Hello" - val future2 = actor2 !!! "Hello" - Futures.awaitAll(List(future1, future2)) - assert(future1.result.isDefined) - assert("World" === future1.result.get) - assert(future2.result.isDefined) - assert("World" === future2.result.get) - actor1.stop - actor2.stop - } - - @Test def shouldFuturesAwaitMapHandleEmptySequence { - assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) - } - - @Test def shouldFuturesAwaitMapHandleNonEmptySequence { - val latches = (1 to 3) map (_ => new StandardLatch) - val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start) - val futures = actors map (actor => (actor.!!![String]("Hello"))) - latches foreach { _.open } - - assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length)) - } - @Test def shouldFoldResults { val actors = (1 to 10).toList map { _ => actorOf(new Actor { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 718c3b94bb..49b76d58d5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -18,7 +18,7 @@ import akka.japi. {Creator, Procedure} /** * Life-cycle messages for the Actors */ -@serializable sealed trait LifeCycleMessage +sealed trait LifeCycleMessage extends Serializable /* Marker trait to show which Messages are automatically handled by Akka */ sealed trait AutoReceivedMessage { self: LifeCycleMessage => } @@ -165,7 +165,7 @@ object Actor extends ListenerManagement { "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }, None) + }) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -185,7 +185,7 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) + def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) @@ -195,7 +195,7 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None) + def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when @@ -245,8 +245,7 @@ object Actor extends ListenerManagement { *

* Here you find functions like: * - !, !!, !!! and forward - * - link, unlink, startLink, spawnLink etc - * - makeRemote etc. + * - link, unlink, startLink etc * - start, stop * - etc. * @@ -269,7 +268,6 @@ object Actor extends ListenerManagement { * import self._ * id = ... * dispatcher = ... - * spawnLink[OtherActor] * ... * } * diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index cb28a07407..883b6668bf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -446,38 +446,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def startLink(actorRef: ActorRef): Unit - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.actorOf instead") - def spawn(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed") - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Atomically create (from actor class), link and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use use Actor.remote.actorOf instead and then link on success") - def spawnLink(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote, link and start an actor. - *

- * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed") - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - /** * Returns the mailbox size. */ @@ -593,10 +561,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * * @author Jonas Bonér */ -class LocalActorRef private[akka] ( - private[this] val actorFactory: () => Actor, - val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean = false) +class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor) extends ActorRef with ScalaActorRef { @volatile @@ -626,9 +591,8 @@ class LocalActorRef private[akka] ( __lifeCycle: LifeCycle, __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], - __factory: () => Actor, - __homeAddress: Option[InetSocketAddress]) = { - this(__factory, __homeAddress) + __factory: () => Actor) = { + this(__factory) _uuid = __uuid id = __id timeout = __timeout @@ -640,11 +604,6 @@ class LocalActorRef private[akka] ( start } - /** - * Returns whether this actor ref is client-managed remote or not - */ - private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled - // ========= PUBLIC FUNCTIONS ========= /** @@ -657,6 +616,8 @@ class LocalActorRef private[akka] ( */ def actorClassName: String = actorClass.getName + final def homeAddress: Option[InetSocketAddress] = None + /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ @@ -688,9 +649,6 @@ class LocalActorRef private[akka] ( if ((actorInstance ne null) && (actorInstance.get ne null)) initializeActorInstance - if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - checkReceiveTimeout //Schedule the initial Receive timeout } this @@ -710,11 +668,9 @@ class LocalActorRef private[akka] ( } finally { currentMessage = null Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) + if (isRemotingEnabled) Actor.remote.unregister(this) - } + setActorSelfFields(actorInstance.get,null) } } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") @@ -764,52 +720,6 @@ class LocalActorRef private[akka] ( actorRef.start } - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - def spawn(clazz: Class[_ <: Actor]): ActorRef = - Actor.actorOf(clazz).start - - /** - * Atomically create (from actor class), start and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val ref = Actor.remote.actorOf(clazz, hostname, port) - ref.timeout = timeout - ref.start - } - - /** - * Atomically create (from actor class), start and link an actor. - *

- * To be invoked from within the actor itself. - */ - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = { - val actor = spawn(clazz) - link(actor) - actor.start - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val actor = Actor.remote.actorOf(clazz, hostname, port) - actor.timeout = timeout - link(actor) - actor.start - actor - } - /** * Returns the mailbox. */ @@ -827,10 +737,6 @@ class LocalActorRef private[akka] ( protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - if (isClientManaged_?) { - Actor.remote.send[Any]( - message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) - } else dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( @@ -838,17 +744,10 @@ class LocalActorRef private[akka] ( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - if (isClientManaged_?) { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } else { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) - dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) - future.get - } + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + dispatcher dispatchMessage new MessageInvocation( + this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + future.get } /** @@ -1004,11 +903,10 @@ class LocalActorRef private[akka] ( } } } - + //TODO KEEP THIS? protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { ensureRemotingEnabled if (_supervisor.isDefined) { - if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) Some(_supervisor.get.uuid) } else None } @@ -1179,10 +1077,6 @@ private[akka] case class RemoteActorRef private[akka] ( def link(actorRef: ActorRef): Unit = unsupported def unlink(actorRef: ActorRef): Unit = unsupported def startLink(actorRef: ActorRef): Unit = unsupported - def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported def supervisor: Option[ActorRef] = unsupported def linkedActors: JMap[Uuid, ActorRef] = unsupported protected[akka] def mailbox: AnyRef = unsupported @@ -1388,32 +1282,4 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => true } else false } - - /** - * Atomically create (from actor class) and start an actor. - */ - def spawn[T <: Actor: Manifest]: ActorRef = - spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start and make an actor remote. - */ - def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } - - /** - * Atomically create (from actor class), start and link an actor. - */ - def spawnLink[T <: Actor: Manifest]: ActorRef = - spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start, link and make an actor remote. - */ - def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } } diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index bb08bcdf80..f877759321 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -93,8 +93,7 @@ case class SupervisorFactory(val config: SupervisorConfig) { *

* The supervisor class is only used for the configuration system when configuring supervisor * hierarchies declaratively. Should not be used as part of the regular programming API. Instead - * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the - * children that should trap error signals and trigger restart. + * wire the children together using 'link', 'startLink' etc. *

* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children. * diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index 9f63c64bc1..c7c0ba40bd 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -101,32 +101,18 @@ object Supervision { val target: Class[_], val lifeCycle: LifeCycle, val timeout: Long, - _dispatcher: MessageDispatcher, // optional - _remoteAddress: RemoteAddress // optional + _dispatcher: MessageDispatcher // optional ) extends Server { val intf: Option[Class[_]] = Option(_intf) val dispatcher: Option[MessageDispatcher] = Option(_dispatcher) - val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) + this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher) def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(intf, target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(intf, target, lifeCycle, timeout, dispatcher, null: RemoteAddress) + this(intf, target, lifeCycle, timeout, null: MessageDispatcher) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(null: Class[_], target, lifeCycle, timeout, dispatcher, null: RemoteAddress) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(intf, target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - this(null: Class[_], target, lifeCycle, timeout, dispatcher, remoteAddress) + this(null: Class[_], target, lifeCycle, timeout, dispatcher) } } diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala deleted file mode 100644 index 72fbbaaeb2..0000000000 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dataflow - -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} - -import akka.event.EventHandler -import akka.actor.{Actor, ActorRef} -import akka.actor.Actor._ -import akka.dispatch.CompletableFuture -import akka.AkkaException -import akka.japi.{ Function, Effect } - -/** - * Implements Oz-style dataflow (single assignment) variables. - * - * @author Jonas Bonér - */ -object DataFlow { - object Start - object Exit - - class DataFlowVariableException(msg: String) extends AkkaException(msg) - - /** - * Executes the supplied thunk in another thread. - */ - def thread(body: => Unit): Unit = spawn(body) - - /** - * JavaAPI. - * Executes the supplied Effect in another thread. - */ - def thread(body: Effect): Unit = spawn(body.apply) - - /** - * Executes the supplied function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: A => R) = - actorOf(new ReactiveEventBasedThread(body)).start - - /** - * JavaAPI. - * Executes the supplied Function in another thread. - */ - def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) = - actorOf(new ReactiveEventBasedThread(body.apply)).start - - private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) - extends Actor { - def receive = { - case Exit => self.stop - case message => self.reply(body(message.asInstanceOf[A])) - } - } - - private object DataFlowVariable { - private sealed abstract class DataFlowVariableMessage - private case class Set[T <: Any](value: T) extends DataFlowVariableMessage - private object Get extends DataFlowVariableMessage - } - - /** - * @author Jonas Bonér - */ - @deprecated("Superceeded by Future and CompletableFuture as of 1.1") - sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { - import DataFlowVariable._ - - def this() = this(1000 * 60) - - private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] - - private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - def receive = { - case s@Set(v) => - if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { - while(dataFlow.blockedReaders.peek ne null) - dataFlow.blockedReaders.poll ! s - } else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case Exit => self.stop - } - } - - private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = timeoutMs - private var readerFuture: Option[CompletableFuture[Any]] = None - def receive = { - case Get => dataFlow.value.get match { - case Some(value) => self reply value - case None => readerFuture = self.senderFuture - } - case Set(v:T) => readerFuture.map(_ completeWithResult v) - case Exit => self.stop - } - } - - private[this] val in = actorOf(new In(this)).start - - /** - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def <<(ref: DataFlowVariable[T]) { - if (this.value.get.isEmpty) in ! Set(ref()) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(ref: DataFlowVariable[T]) { this << ref } - - /** - * Sets the value of this variable (if unset). - */ - def <<(value: T) { - if (this.value.get.isEmpty) in ! Set(value) - else throw new DataFlowVariableException( - "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") - } - - /** - * JavaAPI. - * Sets the value of this variable (if unset) with the value of the supplied variable. - */ - def set(value: T) { this << value } - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def get(): T = this() - - /** - * Retrieves the value of variable, throws a DataFlowVariableException if it times out. - */ - def apply(): T = { - value.get getOrElse { - val out = actorOf(new Out(this)).start - - val result = try { - blockedReaders offer out - (out !! Get).as[T] - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - out ! Exit - throw e - } - - result.getOrElse(throw new DataFlowVariableException( - "Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) - } - } - - def shutdown = in ! Exit - } -} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index cfe64a8992..8c6b8992b1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -175,36 +175,6 @@ object Futures { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) - - // ===================================== - // Deprecations - // ===================================== - - /** - * (Blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)") - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await") - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await - - - /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }") - def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException") - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException } object Future { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index e9e4168995..964c9f9f29 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -151,81 +151,6 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule clear } - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *

-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io", 2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf(factory: => Actor, host: String, port: Int): ActorRef = - Actor.remote.clientManagedActorOf(() => factory, host, port) - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - clientManagedActorOf(() => - createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - host, port) - } - - /** - * Creates a Client-managed ActorRef out of the Actor of the specified Class. - * If the supplied host and port is identical of the configured local node, it will be a local actor - *
-   *   import Actor._
-   *   val actor = actorOf[MyActor]("www.akka.io",2552)
-   *   actor.start
-   *   actor ! message
-   *   actor.stop
-   * 
- * You can create and start the actor in one statement like this: - *
-   *   val actor = actorOf[MyActor]("www.akka.io",2552).start
-   * 
- */ - @deprecated("Will be removed after 1.1") - def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - clientManagedActorOf(() => - createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( - throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")), - host, port) - } - protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) @@ -444,10 +369,6 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) - @deprecated("Will be removed after 1.1") - def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef - - /** * Clean-up all open connections. */ diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d9ded4b0e2..e7e1eaad6f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -591,19 +591,6 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with RemoteActorRef(serviceId, className, host, port, timeout, loader) } - - def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = { - - if (optimizeLocalScoped_?) { - val home = this.address - if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals? - return new LocalActorRef(factory, None) // Code is much simpler with return - } - - val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true) - //ref.timeout = timeout //removed because setting default timeout should be done after construction - ref - } } class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 7ad0c1e443..c3332bb3f4 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -44,7 +44,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * } * */ -@serializable trait StatelessActorFormat[T <: Actor] extends Format[T] { +trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable{ def fromBinary(bytes: Array[Byte], act: T) = act def toBinary(ac: T) = Array.empty[Byte] @@ -64,7 +64,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * } * */ -@serializable trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { +trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable { val serializer: Serializer def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] @@ -205,10 +205,11 @@ object ActorSerialization { else actorClass.newInstance.asInstanceOf[Actor] } + /* TODO Can we remove originalAddress from the protocol? val homeAddress = { val address = protocol.getOriginalAddress Some(new InetSocketAddress(address.getHostname, address.getPort)) - } + }*/ val ar = new LocalActorRef( uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), @@ -218,8 +219,7 @@ object ActorSerialization { lifeCycle, supervisor, hotswap, - factory, - homeAddress) + factory) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index 3a292e0de0..3fc661afce 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -17,7 +17,7 @@ import sjson.json.{Serializer => SJSONSerializer} /** * @author Jonas Bonér */ -@serializable trait Serializer { +trait Serializer extends scala.Serializable { @volatile var classLoader: Option[ClassLoader] = None def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala deleted file mode 100644 index 65693636b4..0000000000 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ /dev/null @@ -1,142 +0,0 @@ -package akka.actor.remote - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - -import akka.dispatch.Dispatchers -import akka.actor.Actor._ -import akka.actor._ - -class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg) - -object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) -} -class RemoteActorSpecActorUnidirectional extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - - def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown - } -} - -class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => throw new ExpectedRemoteProblem("expected") - } -} - -class SendOneWayAndReplyReceiverActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } -} - -class CountDownActor(latch: CountDownLatch) extends Actor { - def receive = { - case "World" => latch.countDown - } -} -/* -object SendOneWayAndReplySenderActor { - val latch = new CountDownLatch(1) -} -class SendOneWayAndReplySenderActor extends Actor { - var state: Option[AnyRef] = None - var sendTo: ActorRef = _ - var latch: CountDownLatch = _ - - def sendOff = sendTo ! "Hello" - - def receive = { - case msg: AnyRef => - state = Some(msg) - SendOneWayAndReplySenderActor.latch.countDown - } -}*/ - -class MyActorCustomConstructor extends Actor { - var prefix = "default-" - var count = 0 - def receive = { - case "incrPrefix" => count += 1; prefix = "" + count + "-" - case msg: String => self.reply(prefix + msg) - } -} - -class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest { - "ClientInitiatedRemoteActor" should { - "shouldSendOneWay" in { - val clientManaged = remote.actorOf[RemoteActorSpecActorUnidirectional](host,port).start - clientManaged must not be null - clientManaged.getClass must be (classOf[LocalActorRef]) - clientManaged ! "OneWay" - RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true) - clientManaged.stop - } - - "shouldSendOneWayAndReceiveReply" in { - val latch = new CountDownLatch(1) - val actor = remote.actorOf[SendOneWayAndReplyReceiverActor](host,port).start - implicit val sender = Some(actorOf(new CountDownActor(latch)).start) - - actor ! "Hello" - - latch.await(3,TimeUnit.SECONDS) must be (true) - } - - "shouldSendBangBangMessageAndReceiveReply" in { - val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start - val result = actor !! ("Hello", 10000) - "World" must equal (result.get.asInstanceOf[String]) - actor.stop - } - - "shouldSendBangBangMessageAndReceiveReplyConcurrently" in { - val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList - actors.map(_ !!! ("Hello", 10000)) foreach { future => - "World" must equal (future.await.result.asInstanceOf[Option[String]].get) - } - actors.foreach(_.stop) - } - - "shouldRegisterActorByUuid" in { - val actor1 = remote.actorOf[MyActorCustomConstructor](host, port).start - val actor2 = remote.actorOf[MyActorCustomConstructor](host, port).start - - actor1 ! "incrPrefix" - - (actor1 !! "test").get must equal ("1-test") - - actor1 ! "incrPrefix" - - (actor1 !! "test").get must equal ("2-test") - - (actor2 !! "test").get must equal ("default-test") - - actor1.stop - actor2.stop - } - - "shouldSendAndReceiveRemoteException" in { - - val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host, port).start - try { - implicit val timeout = 500000000L - val f = (actor !!! "Failure").await.resultOrException - fail("Shouldn't get here!!!") - } catch { - case e: ExpectedRemoteProblem => - } - actor.stop - } - } -} diff --git a/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala b/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala index f6e0c1806f..459f7641f3 100644 --- a/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala +++ b/akka-remote/src/test/scala/remote/OptimizedLocalScopedSpec.scala @@ -19,11 +19,5 @@ class OptimizedLocalScopedSpec extends AkkaRemoteTest { remote.actorFor("foo", host, port) must be (fooActor) } - - "Create local actor when client-managed is hosted locally" in { - val localClientManaged = Actor.remote.actorOf[TestActor](host, port) - localClientManaged.homeAddress must be (None) - } - } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index 4026418d18..6a357861f9 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -23,7 +23,7 @@ object Log { } } -@serializable class RemotePingPong1Actor extends Actor { +class RemotePingPong1Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -41,7 +41,7 @@ object Log { } } -@serializable class RemotePingPong2Actor extends Actor { +class RemotePingPong2Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -55,7 +55,7 @@ object Log { } } -@serializable class RemotePingPong3Actor extends Actor { +class RemotePingPong3Actor extends Actor with scala.Serializable { def receive = { case "Ping" => Log.messageLog.put("ping") @@ -69,7 +69,7 @@ object Log { } } -class RemoteSupervisorSpec extends AkkaRemoteTest { +/*class RemoteSupervisorSpec extends AkkaRemoteTest { var pingpong1: ActorRef = _ var pingpong2: ActorRef = _ @@ -324,7 +324,6 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { factory.newInstance } - /* // Uncomment when the same test passes in SupervisorSpec - pending bug @Test def shouldKillMultipleActorsOneForOne2 = { clearMessageLogs @@ -338,9 +337,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { messageLog.poll(5, TimeUnit.SECONDS) } } -*/ - /* @Test def shouldOneWayKillSingleActorOneForOne = { clearMessageLogs @@ -435,6 +432,4 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { messageLog.poll(5, TimeUnit.SECONDS) } } - */ - -} +}*/ diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 988236b85b..43edd29404 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -4,13 +4,9 @@ package akka.actor.remote -import akka.config.Supervision._ -import akka.actor._ - import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.config. {RemoteAddress, Config, TypedActorConfigurator} -import akka.testing._ object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] @@ -24,73 +20,11 @@ object RemoteTypedActorLog { class RemoteTypedActorSpec extends AkkaRemoteTest { - import RemoteTypedActorLog._ - - private var conf: TypedActorConfigurator = _ - - override def beforeEach { - super.beforeEach - Config.config - conf = new TypedActorConfigurator - conf.configure( - new AllForOneStrategy(List(classOf[Exception]), 3, 5000), - List( - new SuperviseTypedActor( - classOf[RemoteTypedActorOne], - classOf[RemoteTypedActorOneImpl], - Permanent, - Testing.testTime(20000), - RemoteAddress(host,port)), - new SuperviseTypedActor( - classOf[RemoteTypedActorTwo], - classOf[RemoteTypedActorTwoImpl], - Permanent, - Testing.testTime(20000), - RemoteAddress(host,port)) - ).toArray).supervise - } - - override def afterEach { - clearMessageLogs - conf.stop - super.afterEach - Thread.sleep(1000) - } "Remote Typed Actor " should { - /*"receives one-way message" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + "have unit tests" in { - ta.oneWay - oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") } - - "responds to request-reply message" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - ta.requestReply("ping") must equal ("pong") - } */ - - "be restarted on failure" in { - val ta = conf.getInstance(classOf[RemoteTypedActorOne]) - - try { - ta.requestReply("die") - fail("Shouldn't get here") - } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - } - - /* "restarts linked friends on failure" in { - val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) - val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo]) - - try { - ta1.requestReply("die") - fail("Shouldn't get here") - } catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => } - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance") - }*/ } } diff --git a/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala b/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala index 001a66eae0..151dd4ba0f 100644 --- a/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala +++ b/akka-remote/src/test/scala/remote/UnOptimizedLocalScopedSpec.scala @@ -18,11 +18,5 @@ class UnOptimizedLocalScopedSpec extends AkkaRemoteTest { remote.actorFor("foo", host, port) must not be (fooActor) } - - "Create remote actor when client-managed is hosted locally" in { - val localClientManaged = Actor.remote.actorOf[TestActor](host, port) - localClientManaged.homeAddress must not be (None) - } - } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 2eec948698..7aa0cede07 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -221,7 +221,7 @@ class MyActorWithDualCounter extends Actor { } } -@serializable class MyActor extends Actor { +class MyActor extends Actor with scala.Serializable { var count = 0 def receive = { @@ -249,7 +249,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { } } -@serializable class MyJavaSerializableActor extends Actor { +class MyJavaSerializableActor extends Actor with scala.Serializable { var count = 0 self.receiveTimeout = Some(1000) diff --git a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala deleted file mode 100644 index 6edec702b5..0000000000 --- a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ -package akka.actor.ticket - -import akka.actor._ -import akka.actor.remote.AkkaRemoteTest - - -class Ticket519Spec extends AkkaRemoteTest { - "A remote TypedActor" should { - "should handle remote future replies" in { - val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,host,port) - val r = actor.someFutureString - - r.await.result.get must equal ("foo") - } - } -} diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala deleted file mode 100644 index 3477ff5783..0000000000 --- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package sample.remote - -import akka.actor.Actor._ -import akka.actor. {ActorRegistry, Actor} -import Actor.remote - -class RemoteHelloWorldActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } -} - -object ClientManagedRemoteActorServer { - def run = { - remote.start("localhost", 2552) - } - - def main(args: Array[String]) = run -} - -object ClientManagedRemoteActorClient { - - def run = { - val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start - val result = actor !! "Hello" - } - - def main(args: Array[String]) = run -} - diff --git a/akka-stm/src/main/scala/akka/stm/Ref.scala b/akka-stm/src/main/scala/akka/stm/Ref.scala index 74b1bf5a9e..5d1aa9dc96 100644 --- a/akka-stm/src/main/scala/akka/stm/Ref.scala +++ b/akka-stm/src/main/scala/akka/stm/Ref.scala @@ -11,7 +11,7 @@ import org.multiverse.transactional.refs.BasicRef /** * Common trait for all the transactional objects. */ -@serializable trait Transactional { +trait Transactional extends Serializable { val uuid: String } diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 813c26ab94..3db2288e8f 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -377,18 +377,12 @@ object TypedActorConfiguration { new TypedActorConfiguration() } - def apply(timeout: Long) : TypedActorConfiguration = { - new TypedActorConfiguration().timeout(Duration(timeout, "millis")) + def apply(timeoutMillis: Long) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(Duration(timeoutMillis, "millis")) } - @deprecated("Will be removed after 1.1") - def apply(host: String, port: Int) : TypedActorConfiguration = { - new TypedActorConfiguration().makeRemote(host, port) - } - - @deprecated("Will be removed after 1.1") - def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = { - new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis")) + def apply(timeout: Duration) : TypedActorConfiguration = { + new TypedActorConfiguration().timeout(timeout) } } @@ -399,7 +393,6 @@ object TypedActorConfiguration { */ final class TypedActorConfiguration { private[akka] var _timeout: Long = Actor.TIMEOUT - private[akka] var _host: Option[InetSocketAddress] = None private[akka] var _messageDispatcher: Option[MessageDispatcher] = None private[akka] var _threadBasedDispatcher: Option[Boolean] = None private[akka] var _id: Option[String] = None @@ -416,15 +409,6 @@ final class TypedActorConfiguration { this } - @deprecated("Will be removed after 1.1") - def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port)) - - @deprecated("Will be removed after 1.1") - def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = { - _host = Some(remoteAddress) - this - } - def dispatcher(messageDispatcher: MessageDispatcher) : TypedActorConfiguration = { if (_threadBasedDispatcher.isDefined) throw new IllegalArgumentException( "Cannot specify both 'threadBasedDispatcher()' and 'dispatcher()'") @@ -480,28 +464,6 @@ object TypedActor { newInstance(intfClass, factory, TypedActorConfiguration()) } - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param targetClass implementation class of the typed actor - * @param host hostanme of the remote server - * @param port port of the remote server - */ - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port)) - } - - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param factory factory method that constructs the typed actor - * @param host hostanme of the remote server - * @param port port of the remote server - */ - def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = { - newInstance(intfClass, factory, TypedActorConfiguration(hostname, port)) - } - /** * Factory method for typed actor. * @param intfClass interface the typed actor implements @@ -522,32 +484,6 @@ object TypedActor { newInstance(intfClass, factory, TypedActorConfiguration(timeout)) } - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param targetClass implementation class of the typed actor - * @paramm timeout timeout for future - * @param host hostanme of the remote server - * @param port port of the remote server - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = { - newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout)) - } - - /** - * Factory method for remote typed actor. - * @param intfClass interface the typed actor implements - * @param factory factory method that constructs the typed actor - * @paramm timeout timeout for future - * @param host hostanme of the remote server - * @param port port of the remote server - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = { - newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout)) - } - /** * Factory method for typed actor. * @param intfClass interface the typed actor implements @@ -555,20 +491,7 @@ object TypedActor { * @paramm config configuration object fo the typed actor */ def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = - newInstance(intfClass, createActorRef(newTypedActor(factory),config), config) - - /** - * Creates an ActorRef, can be local only or client-managed-remote - */ - @deprecated("Will be removed after 1.1") - private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = { - config match { - case null => actorOf(typedActor) - case c: TypedActorConfiguration if (c._host.isDefined) => - Actor.remote.actorOf(typedActor, c._host.get.getAddress.getHostAddress, c._host.get.getPort) - case _ => actorOf(typedActor) - } - } + newInstance(intfClass, actorOf(newTypedActor(factory)), config) /** * Factory method for typed actor. @@ -577,7 +500,7 @@ object TypedActor { * @paramm config configuration object fo the typed actor */ def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = - newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config) + newInstance(intfClass, actorOf(newTypedActor(targetClass)), config) private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") @@ -585,11 +508,8 @@ object TypedActor { } private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], - remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val config = TypedActorConfiguration(timeout) - if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get) - newInstance(intfClass, targetClass, config) - } + remoteAddress: Option[InetSocketAddress], timeout: Long): T = + newInstance(intfClass, targetClass, TypedActorConfiguration(timeout)) private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = { val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] @@ -601,13 +521,7 @@ object TypedActor { actorRef.timeout = config.timeout - val remoteAddress = actorRef match { - case remote: RemoteActorRef => remote.homeAddress - case local: LocalActorRef if local.clientManaged => local.homeAddress - case _ => None - } - - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout)) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.homeAddress, actorRef.timeout)) actorRef.start proxy.asInstanceOf[T] } @@ -633,20 +547,6 @@ object TypedActor { def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T = newInstance(intfClass, factory.create) - /** - * Java API. - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T = - newRemoteInstance(intfClass, factory.create, hostname, port) - - /** - * Java API. - */ - @deprecated("Will be removed after 1.1") - def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T = - newRemoteInstance(intfClass, factory.create, timeout, hostname, port) - /** * Java API. */ diff --git a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala index f2f2a7a1fc..71dbe04413 100644 --- a/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/akka/config/TypedActorGuiceConfigurator.scala @@ -106,14 +106,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa val implementationClass = component.target val timeout = component.timeout - val (remoteAddress,actorRef) = - component.remoteAddress match { - case Some(a) => - (Some(new InetSocketAddress(a.hostname, a.port)), - Actor.remote.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port)) - case None => - (None, Actor.actorOf(TypedActor.newTypedActor(implementationClass))) - } + val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass)) actorRef.timeout = timeout if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get @@ -123,7 +116,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa AspectInitRegistry.register( proxy, - AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout)) + AspectInit(interfaceClass, typedActor, actorRef, None, timeout)) typedActor.initialize(proxy) actorRef.start