From 3f49eadedccc23adae375f9eca2199d34d29e711 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Apr 2011 13:03:23 +0200 Subject: [PATCH] Removing more deprecation warnings and more client-managed actors residue --- .../src/main/scala/akka/dispatch/Future.scala | 8 ++-- .../akka/dispatch/ThreadPoolBuilder.scala | 3 -- .../remoteinterface/RemoteInterface.scala | 10 ----- .../src/main/scala/akka/util/Duration.scala | 4 +- .../remote/netty/NettyRemoteSupport.scala | 45 +++---------------- .../ServerInitiatedRemoteActorSpec.scala | 2 +- .../testkit/CallingThreadDispatcher.scala | 4 +- 7 files changed, 14 insertions(+), 62 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 8c6b8992b1..6cd13d69eb 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -61,7 +61,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout) + firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures),timeout) /** * A non-blocking fold over the specified futures. @@ -87,7 +87,7 @@ object Futures { results add r.b if (results.size == allDone) { //Only one thread can get here try { - result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) + result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun) } catch { case e: Exception => EventHandler.error(e, this, e.getMessage) @@ -115,7 +115,7 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ ) + fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))( fun.apply _ ) /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first @@ -150,7 +150,7 @@ object Futures { * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _) + reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 83c30f23e0..130eeb9163 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -88,9 +88,6 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi import ThreadPoolConfig._ def build = dispatcherFactory(config) - //TODO remove this, for backwards compat only - @deprecated("Use .build instead") def buildThreadPool = build - def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 964c9f9f29..18bc792d98 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -400,14 +400,4 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType, loader: Option[ClassLoader]): Option[CompletableFuture[T]] - - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef - - @deprecated("Will be removed after 1.1") - private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit - - @deprecated("Will be removed after 1.1") - private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit } diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index fb5673277c..933e3cd9a9 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -37,7 +37,7 @@ object Duration { * Construct a Duration by parsing a String. In case of a format error, a * RuntimeException is thrown. See `unapply(String)` for more information. */ - def apply(s : String) : Duration = unapply(s) getOrElse error("format error") + def apply(s : String) : Duration = unapply(s) getOrElse sys.error("format error") /** * Deconstruct a Duration into length and unit if it is finite. @@ -77,7 +77,7 @@ object Duration { if ( ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if ( ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else - error("made some error in regex (should not be possible)") + sys.error("made some error in regex (should not be possible)") case REinf() => Some(Inf) case REminf() => Some(MinusInf) case _ => None diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e7e1eaad6f..9fbca92c40 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -119,16 +119,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } } - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - withClientFor(actorRef.homeAddress.get, None)(_.registerSupervisorForActor(actorRef)) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard { - remoteClients.get(Address(actorRef.homeAddress.get)) match { - case s: Some[RemoteClient] => s.get.deregisterSupervisorForActor(actorRef) - case None => actorRef - } - } - /** * Clean-up all open connections. */ @@ -170,7 +160,6 @@ abstract class RemoteClient private[akka] ( remoteAddress.getPort protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) @@ -320,16 +309,6 @@ abstract class RemoteClient private[akka] ( pendingRequest = pendingRequests.peek // try to grab next message } } - - private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't register supervisor for " + actorRef + " since it is not under supervision") - else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - - private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = - if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( - "Can't unregister supervisor for " + actorRef + " since it is not under supervision") - else supervisors.remove(actorRef.supervisor.get.uuid) } /** @@ -358,7 +337,7 @@ class ActiveRemoteClient private[akka] ( timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -433,7 +412,6 @@ class ActiveRemoteClient private[akka] ( class ActiveRemoteClientPipelineFactory( name: String, futures: ConcurrentMap[Uuid, CompletableFuture[_]], - supervisors: ConcurrentMap[Uuid, ActorRef], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -450,7 +428,7 @@ class ActiveRemoteClientPipelineFactory( case _ => (Nil,Nil) } - val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client) val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil new StaticChannelPipeline(stages: _*) } @@ -463,7 +441,6 @@ class ActiveRemoteClientPipelineFactory( class ActiveRemoteClientHandler( val name: String, val futures: ConcurrentMap[Uuid, CompletableFuture[_]], - val supervisors: ConcurrentMap[Uuid, ActorRef], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -488,19 +465,7 @@ class ActiveRemoteClientHandler( val message = MessageSerializer.deserialize(reply.getMessage) future.completeWithResult(message) } else { - val exception = parseException(reply, client.loader) - - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, exception) - } - - future.completeWithException(exception) + future.completeWithException(parseException(reply, client.loader)) } case other => throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) @@ -891,14 +856,14 @@ class RemoteServerHandler( // stop all session actors for (map <- Option(sessionActors.remove(event.getChannel)); - actor <- asScalaIterable(map.values)) { + actor <- collectionAsScalaIterable(map.values)) { try { actor ! PoisonPill } catch { case e: Exception => } } //FIXME switch approach or use other thread to execute this // stop all typed session actors for (map <- Option(typedSessionActors.remove(event.getChannel)); - actor <- asScalaIterable(map.values)) { + actor <- collectionAsScalaIterable(map.values)) { try { TypedActor.stop(actor) } catch { case e: Exception => } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 88a5ec8ec3..c978135ad2 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -189,7 +189,7 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { while(!testDone()) { if (latch.await(200, TimeUnit.MILLISECONDS)) - error("Test didn't complete within 100 cycles") + sys.error("Test didn't complete within 100 cycles") else latch.countDown } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ce198be6bf..e6fd8ebbce 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -206,8 +206,8 @@ class NestingQueue { def pop = q.poll @volatile private var active = false - def enter { if (active) error("already active") else active = true } - def leave { if (!active) error("not active") else active = false } + def enter { if (active) sys.error("already active") else active = true } + def leave { if (!active) sys.error("not active") else active = false } def isActive = active }