From 0d40ba0a0e1d0c4f47b02a1a8c8982f4d0d4a286 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Fri, 2 Jul 2010 11:14:49 +0200 Subject: [PATCH] Changed Akka to use IllegalActorStateException instead of IllegalStateException --- akka-amqp/src/main/scala/AMQP.scala | 4 +- .../src/main/scala/actor/ActiveObject.scala | 38 +++++++++---------- akka-core/src/main/scala/actor/Actor.scala | 2 +- akka-core/src/main/scala/actor/ActorRef.scala | 17 ++++----- .../src/main/scala/actor/ActorRegistry.scala | 2 +- .../scala/actor/SerializationProtocol.scala | 2 +- .../ActiveObjectGuiceConfigurator.scala | 8 ++-- .../ExecutorBasedEventDrivenDispatcher.scala | 6 +-- ...sedEventDrivenWorkStealingDispatcher.scala | 8 ++-- ...BasedThreadPoolEventDrivenDispatcher.scala | 8 ++-- .../scala/dispatch/ThreadPoolBuilder.scala | 5 ++- .../src/main/scala/remote/RemoteClient.scala | 10 ++--- .../src/main/scala/remote/RemoteServer.scala | 2 +- .../scala/stm/TransactionManagement.scala | 8 ++-- ...ventDrivenWorkStealingDispatcherSpec.scala | 4 +- .../src/test/scala/ReceiveTimeoutSpec.scala | 9 ++--- akka-http/src/main/scala/Security.scala | 10 ++--- .../src/main/scala/ActiveObjectParser.scala | 4 +- 18 files changed, 75 insertions(+), 72 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 04e4570310..d687619cbd 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.amqp import com.rabbitmq.client.{AMQP => RabbitMQ, _} import com.rabbitmq.client.ConnectionFactory -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import se.scalablesolutions.akka.config.ScalaConfig._ @@ -468,7 +468,7 @@ object AMQP { override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = { def hasTag(listener: MessageConsumerListener, listenerTag: String): Boolean = { - if (listener.tag.isEmpty) throw new IllegalStateException( + if (listener.tag.isEmpty) throw new IllegalActorStateException( "MessageConsumerListener [" + listener + "] does not have a tag") listener.tag.get == listenerTag } diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 0201391940..fc70441fae 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -102,7 +102,7 @@ final class ActiveObjectContext { * Scala style getter. */ def sender: AnyRef = { - if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.") + if (_sender eq null) throw new IllegalActorStateException("Sender reference should not be null.") else _sender } @@ -111,7 +111,7 @@ final class ActiveObjectContext { * Java style getter. */ def getSender: AnyRef = { - if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.") + if (_sender eq null) throw new IllegalActorStateException("Sender reference should not be null.") else _sender } @@ -392,9 +392,9 @@ object ActiveObject extends Logging { */ def link(supervisor: AnyRef, supervised: AnyRef) = { val supervisorActor = actorFor(supervisor).getOrElse( - throw new IllegalStateException("Can't link when the supervisor is not an active object")) + throw new IllegalActorStateException("Can't link when the supervisor is not an active object")) val supervisedActor = actorFor(supervised).getOrElse( - throw new IllegalStateException("Can't link when the supervised is not an active object")) + throw new IllegalActorStateException("Can't link when the supervised is not an active object")) supervisorActor.link(supervisedActor) } @@ -407,9 +407,9 @@ object ActiveObject extends Logging { */ def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = { val supervisorActor = actorFor(supervisor).getOrElse( - throw new IllegalStateException("Can't link when the supervisor is not an active object")) + throw new IllegalActorStateException("Can't link when the supervisor is not an active object")) val supervisedActor = actorFor(supervised).getOrElse( - throw new IllegalStateException("Can't link when the supervised is not an active object")) + throw new IllegalActorStateException("Can't link when the supervised is not an active object")) supervisorActor.trapExit = trapExceptions.toList supervisorActor.faultHandler = Some(handler) supervisorActor.link(supervisedActor) @@ -422,9 +422,9 @@ object ActiveObject extends Logging { */ def unlink(supervisor: AnyRef, supervised: AnyRef) = { val supervisorActor = actorFor(supervisor).getOrElse( - throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) + throw new IllegalActorStateException("Can't unlink when the supervisor is not an active object")) val supervisedActor = actorFor(supervised).getOrElse( - throw new IllegalStateException("Can't unlink when the supervised is not an active object")) + throw new IllegalActorStateException("Can't unlink when the supervised is not an active object")) supervisorActor.unlink(supervisedActor) } @@ -435,7 +435,7 @@ object ActiveObject extends Logging { */ def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = { val supervisorActor = actorFor(supervisor).getOrElse( - throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) + throw new IllegalActorStateException("Can't set trap exceptions when the supervisor is not an active object")) supervisorActor.trapExit = trapExceptions.toList this } @@ -447,7 +447,7 @@ object ActiveObject extends Logging { */ def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = { val supervisorActor = actorFor(supervisor).getOrElse( - throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) + throw new IllegalActorStateException("Can't set fault handler when the supervisor is not an active object")) supervisorActor.faultHandler = Some(handler) this } @@ -553,7 +553,7 @@ private[akka] sealed class ActiveObjectAspect { } else { val result = (actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout)).as[AnyRef] if (result.isDefined) result.get - else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") + else throw new IllegalActorStateException("No result defined for invocation [" + joinPoint + "]") } } @@ -581,8 +581,8 @@ private[akka] sealed class ActiveObjectAspect { future.get.await val result = getResultOrThrowException(future.get) if (result.isDefined) result.get - else throw new IllegalStateException("No result returned from call to [" + joinPoint + "]") - } else throw new IllegalStateException("No future returned from call to [" + joinPoint + "]") + else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]") + } else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]") } } @@ -686,12 +686,12 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op case Some(RestartCallbacks(pre, post)) => preRestart = Some(try { targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*) - } catch { case e => throw new IllegalStateException( + } catch { case e => throw new IllegalActorStateException( "Could not find pre restart method [" + pre + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) postRestart = Some(try { targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*) - } catch { case e => throw new IllegalStateException( + } catch { case e => throw new IllegalActorStateException( "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } @@ -701,11 +701,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart)) if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) - throw new IllegalStateException( + throw new IllegalActorStateException( "Method annotated with @prerestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0) - throw new IllegalStateException( + throw new IllegalActorStateException( "Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") @@ -715,7 +715,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) - throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") + throw new IllegalActorStateException("Method annotated with @inittransactionalstate must have a zero argument definition") if (initTxState.isDefined) initTxState.get.setAccessible(true) } @@ -736,7 +736,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op case Link(target) => self.link(target) case Unlink(target) => self.unlink(target) case unexpected => - throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") + throw new IllegalActorStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } override def preRestart(reason: Throwable) { diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 213bd29b61..33bf9bf998 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -438,7 +438,7 @@ trait Actor extends Logging { cancelReceiveTimeout lifeCycles orElse (self.hotswap getOrElse receive) } catch { - case e: NullPointerException => throw new IllegalStateException( + case e: NullPointerException => throw new IllegalActorStateException( "The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.") } diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index f09d3065ab..d1684bdd2b 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -319,7 +319,7 @@ trait ActorRef extends TransactionManagement { if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout( message, timeout, sender.get.sender, sender.get.senderFuture) else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get)) - else throw new IllegalStateException("Can't forward message when initial sender is not an actor") + else throw new IllegalActorStateException("Can't forward message when initial sender is not an actor") } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -329,7 +329,7 @@ trait ActorRef extends TransactionManagement { *

* Throws an IllegalStateException if unable to determine what to reply to. */ - def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException( + def reply(message: Any) = if(!reply_?(message)) throw new IllegalActorStateException( "\n\tNo sender in scope, can't reply. " + "\n\tYou have probably: " + "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + @@ -760,7 +760,7 @@ sealed class LocalActorRef private[akka]( * To be invoked from within the actor itself. */ def link(actorRef: ActorRef) = guard.withGuard { - if (actorRef.supervisor.isDefined) throw new IllegalStateException( + if (actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") linkedActors.put(actorRef.uuid, actorRef) actorRef.supervisor = Some(this) @@ -773,7 +773,7 @@ sealed class LocalActorRef private[akka]( * To be invoked from within the actor itself. */ def unlink(actorRef: ActorRef) = guard.withGuard { - if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalStateException( + if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalActorStateException( "Actor [" + actorRef + "] is not a linked actor, can't unlink") linkedActors.remove(actorRef.uuid) actorRef.supervisor = None @@ -943,7 +943,7 @@ sealed class LocalActorRef private[akka]( val future = RemoteClient.clientFor(remoteAddress.get).send( createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture) if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) @@ -1011,7 +1011,6 @@ sealed class LocalActorRef private[akka]( setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit } } catch { - case e: IllegalStateException => {} case e => _isBeingRestarted = true // abort transaction set @@ -1046,7 +1045,7 @@ sealed class LocalActorRef private[akka]( case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) } - } else throw new IllegalStateException( + } else throw new IllegalActorStateException( "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + "\n\tto non-empty list of exception classes - can't proceed " + toString) } else { @@ -1148,7 +1147,7 @@ sealed class LocalActorRef private[akka]( case e: NoSuchFieldException => val parent = clazz.getSuperclass if (parent != null) findActorSelfField(parent) - else throw new IllegalStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") + else throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") } } @@ -1214,7 +1213,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = remoteClient.send(createRemoteRequestProtocolBuilder(this, message, false, senderOption).build, senderFuture) if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } def start: ActorRef = { diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 766f69af7f..fea1b3b01c 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -116,7 +116,7 @@ object ActorRegistry extends ListenerManagement { // ID val id = actor.id - if (id eq null) throw new IllegalStateException("Actor.id is null " + actor) + if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) if (actorsById.containsKey(id)) actorsById.get(id).add(actor) else { val set = new CopyOnWriteArraySet[ActorRef] diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala index 4478f7df1c..54ee27034e 100644 --- a/akka-core/src/main/scala/actor/SerializationProtocol.scala +++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala @@ -138,7 +138,7 @@ object ActorSerialization { else None Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks) else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks) - else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) + else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) } else None val supervisor = diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 8d3a089d26..3beac783a7 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config import com.google.inject._ import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef, Actor} +import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef, Actor, IllegalActorStateException} import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.util.Logging @@ -42,10 +42,10 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat */ def getInstance[T](clazz: Class[T]): List[T] = synchronized { log.debug("Retrieving active object [%s]", clazz.getName) - if (injector eq null) throw new IllegalStateException( + if (injector eq null) throw new IllegalActorStateException( "inject() and/or supervise() must be called before invoking getInstance(clazz)") val (proxy, targetInstance, component) = - activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException( + activeObjectRegistry.getOrElse(clazz, throw new IllegalActorStateException( "Class [" + clazz.getName + "] has not been put under supervision" + "\n(by passing in the config to the 'configure' and then invoking 'supervise') method")) injector.injectMembers(targetInstance) @@ -114,7 +114,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat } override def inject: ActiveObjectConfiguratorBase = synchronized { - if (injector ne null) throw new IllegalStateException("inject() has already been called on this configurator") + if (injector ne null) throw new IllegalActorStateException("inject() has already been called on this configurator") injector = Guice.createInjector(modules) this } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 3ceec873a7..8d42b3c8fc 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.dispatch -import se.scalablesolutions.akka.actor.ActorRef +import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} /** * Default settings are: @@ -92,7 +92,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) } }) - } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") + } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") /** @@ -133,7 +133,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat def usesActorMailbox = true - def ensureNotActive: Unit = if (active) throw new IllegalStateException( + def ensureNotActive: Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 438abadfda..a602eab223 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.CopyOnWriteArrayList -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -55,7 +55,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } } }) - } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") + } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") /** * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by @@ -162,7 +162,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess references.clear } - def ensureNotActive: Unit = if (active) throw new IllegalStateException( + def ensureNotActive: Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool @@ -187,7 +187,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } case Some(aType) => { if (aType != actorOfId.actor.getClass) - throw new IllegalStateException( + throw new IllegalActorStateException( String.format("Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", actorOfId.actor, aType)) } diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index e73ad26013..09961d8eb0 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -8,6 +8,8 @@ import java.util.concurrent.locks.ReentrantLock import java.util.{HashSet, HashMap, LinkedList, List} +import se.scalablesolutions.akka.actor.IllegalActorStateException + /** * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350]. @@ -105,10 +107,10 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) val invocations = selectedInvocations.iterator while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) { val invocation = invocations.next - if (invocation eq null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") + if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]") if (!busyActors.contains(invocation.receiver)) { val invoker = messageInvokers.get(invocation.receiver) - if (invoker eq null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") + if (invoker eq null) throw new IllegalActorStateException("Message invoker for invocation [" + invocation + "] is null") resume(invocation.receiver) invocations.remove executor.execute(new Runnable() { @@ -137,7 +139,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) def usesActorMailbox = false - def ensureNotActive: Unit = if (active) throw new IllegalStateException( + def ensureNotActive: Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 21753e77c9..6f1d11e14e 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -9,6 +9,7 @@ import java.util.concurrent._ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy +import se.scalablesolutions.akka.actor.IllegalActorStateException import se.scalablesolutions.akka.util.Logging trait ThreadPoolBuilder { @@ -142,12 +143,12 @@ trait ThreadPoolBuilder { } protected def verifyNotInConstructionPhase = { - if (inProcessOfBuilding) throw new IllegalStateException("Is already in the process of building a thread pool") + if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool") inProcessOfBuilding = true } protected def verifyInConstructionPhase = { - if (!inProcessOfBuilding) throw new IllegalStateException( + if (!inProcessOfBuilding) throw new IllegalActorStateException( "Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods") } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index fc8d8ed0e3..4c18dcc6c8 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ -import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef} +import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.config.Config.config @@ -230,11 +230,11 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O } private[akka] def registerSupervisorForActor(actorRef: ActorRef) = - if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision") + if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException("Can't register supervisor for " + actorRef + " since it is not under supervision") else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) private[akka] def deregisterSupervisorForActor(actorRef: ActorRef) = - if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision") + if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision") else supervisors.remove(actorRef.supervisor.get.uuid) } @@ -302,10 +302,10 @@ class RemoteClientHandler(val name: String, } else { if (reply.hasSupervisorUuid()) { val supervisorUuid = reply.getSupervisorUuid - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException( + if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException( + if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply)) } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 943481ebc3..28d087b3e1 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -341,7 +341,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val message = event.getMessage - if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event) + if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) if (message.isInstanceOf[RemoteRequestProtocol]) { handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) } diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index e6485ff761..0c6a244f42 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -147,7 +147,7 @@ class GlobalStm extends TransactionManagement with Logging { val txSet = getTransactionSetInScope log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) // FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - txSet.joinCommit(mtx) + try { txSet.joinCommit(mtx) } catch { case e: IllegalStateException => {} } clearTransaction result } @@ -160,13 +160,15 @@ trait StmUtil { * Schedule a deferred task on the thread local transaction (use within an atomic). * This is executed when the transaction commits. */ - def deferred[T](body: => T): Unit = MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body }) + def deferred[T](body: => T): Unit = + MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body }) /** * Schedule a compensating task on the thread local transaction (use within an atomic). * This is executed when the transaction aborts. */ - def compensating[T](body: => T): Unit = MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body }) + def compensating[T](body: => T): Unit = + MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body }) /** * STM retry for blocking transactions (use within an atomic). diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index 64f5a95173..3603760953 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -89,7 +89,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with val second = actorOf[SecondActor] first.start - intercept[IllegalStateException] { + intercept[IllegalActorStateException] { second.start } } @@ -99,7 +99,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with val child = actorOf[ChildActor] parent.start - intercept[IllegalStateException] { + intercept[IllegalActorStateException] { child.start } } diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala index aa800c8f3d..938cb08d43 100644 --- a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala +++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala @@ -21,8 +21,7 @@ class ReceiveTimeoutSpec extends JUnitSuite { } }).start - // after max 1 second the timeout should already been sent - assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS)) } @Test def swappedReceiveShouldAlsoGetTimout = { @@ -37,15 +36,14 @@ class ReceiveTimeoutSpec extends JUnitSuite { }).start // after max 1 second the timeout should already been sent - assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS)) val swappedLatch = new StandardLatch timeoutActor ! HotSwap(Some{ case ReceiveTimeout => swappedLatch.open }) - // after max 1 second the timeout should already been sent - assert(swappedLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS)) } @Test def timeoutShouldBeCancelledAfterRegularReceive = { @@ -62,7 +60,6 @@ class ReceiveTimeoutSpec extends JUnitSuite { }).start timeoutActor ! Tick - // timeout already after 500 ms, so 1 second wait should be enough assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false) } } diff --git a/akka-http/src/main/scala/Security.scala b/akka-http/src/main/scala/Security.scala index f550177965..b0f3c10be0 100644 --- a/akka-http/src/main/scala/Security.scala +++ b/akka-http/src/main/scala/Security.scala @@ -22,7 +22,7 @@ package se.scalablesolutions.akka.security -import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry} +import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.Config import se.scalablesolutions.akka.util.Logging @@ -102,7 +102,7 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging { lazy val authenticatorFQN = { val auth = Config.config.getString("akka.rest.authenticator", "N/A") - if (auth == "N/A") throw new IllegalStateException("The config option 'akka.rest.authenticator' is not defined in 'akka.conf'") + if (auth == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.authenticator' is not defined in 'akka.conf'") auth } @@ -400,7 +400,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] w */ lazy val servicePrincipal = { val p = Config.config.getString("akka.rest.kerberos.servicePrincipal", "N/A") - if (p == "N/A") throw new IllegalStateException("The config option 'akka.rest.kerberos.servicePrincipal' is not defined in 'akka.conf'") + if (p == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.kerberos.servicePrincipal' is not defined in 'akka.conf'") p } @@ -409,13 +409,13 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] w */ lazy val keyTabLocation = { val p = Config.config.getString("akka.rest.kerberos.keyTabLocation", "N/A") - if (p == "N/A") throw new IllegalStateException("The config option 'akka.rest.kerberos.keyTabLocation' is not defined in 'akka.conf'") + if (p == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.kerberos.keyTabLocation' is not defined in 'akka.conf'") p } lazy val kerberosDebug = { val p = Config.config.getString("akka.rest.kerberos.kerberosDebug", "N/A") - if (p == "N/A") throw new IllegalStateException("The config option 'akka.rest.kerberos.kerberosDebug' is not defined in 'akka.conf'") + if (p == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.kerberos.kerberosDebug' is not defined in 'akka.conf'") p } diff --git a/akka-spring/src/main/scala/ActiveObjectParser.scala b/akka-spring/src/main/scala/ActiveObjectParser.scala index a1ddc0ae89..fc6b372720 100644 --- a/akka-spring/src/main/scala/ActiveObjectParser.scala +++ b/akka-spring/src/main/scala/ActiveObjectParser.scala @@ -7,6 +7,8 @@ import org.springframework.util.xml.DomUtils import org.w3c.dom.Element import scala.collection.JavaConversions._ +import se.scalablesolutions.akka.actor.IllegalActorStateException + /** * Parser trait for custom namespace configuration for active-object. * @author michaelkober @@ -36,7 +38,7 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser { objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART) objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART) if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) { - throw new IllegalStateException("At least one of pre or post must be defined.") + throw new IllegalActorStateException("At least one of pre or post must be defined.") } }