diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 628c9fb7bb..e6ee2a4c66 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -630,7 +630,6 @@ trait ActorRef extends override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) override def equals(that: Any): Boolean = { - that != null && that.isInstanceOf[ActorRef] && that.asInstanceOf[ActorRef].uuid == uuid } @@ -667,9 +666,6 @@ class LocalActorRef private[akka]( @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None @volatile private[akka] var _supervisor: Option[ActorRef] = None @volatile private var isInInitialization = false - @volatile private var runActorInitialization = false - @volatile private var isDeserialized = false - @volatile private var loader: Option[ClassLoader] = None @volatile private var maxNrOfRetriesCount: Int = 0 @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L @volatile private var _mailbox: AnyRef = _ @@ -680,7 +676,8 @@ class LocalActorRef private[akka]( // instance elegible for garbage collection private val actorSelfFields = findActorSelfField(actor.getClass) - if (runActorInitialization && !isDeserialized) initializeActorInstance + //If it was started inside "newActor", initialize it + if (isRunning) initializeActorInstance private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) @@ -696,11 +693,8 @@ class LocalActorRef private[akka]( __lifeCycle: Option[LifeCycle], __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], - __loader: ClassLoader, __factory: () => Actor) = { this(__factory) - loader = Some(__loader) - isDeserialized = true _uuid = __uuid id = __id homeAddress = (__hostname, __port) @@ -818,7 +812,6 @@ class LocalActorRef private[akka]( } _status = ActorRefStatus.RUNNING if (!isInInitialization) initializeActorInstance - else runActorInitialization = true } this } @@ -1302,7 +1295,7 @@ class LocalActorRef private[akka]( } catch { case e: NoSuchFieldException => val parent = clazz.getSuperclass - if (parent != null) findActorSelfField(parent) + if (parent ne null) findActorSelfField(parent) else throw new IllegalActorStateException( toString + " is not an Actor since it have not mixed in the 'Actor' trait") } diff --git a/akka-actor/src/main/scala/config/Config.scala b/akka-actor/src/main/scala/config/Config.scala index c9d9a4968b..7a4ac4be48 100644 --- a/akka-actor/src/main/scala/config/Config.scala +++ b/akka-actor/src/main/scala/config/Config.scala @@ -33,7 +33,7 @@ object Config { val HOME = { val systemHome = System.getenv("AKKA_HOME") - if (systemHome == null || systemHome.length == 0 || systemHome == ".") { + if ((systemHome eq null) || systemHome.length == 0 || systemHome == ".") { val optionHome = System.getProperty("akka.home", "") if (optionHome.length != 0) Some(optionHome) else None @@ -52,7 +52,7 @@ object Config { "\n\tdue to: " + e.toString) } Configgy.config - } else if (getClass.getClassLoader.getResource("akka.conf") != null) { + } else if (getClass.getClassLoader.getResource("akka.conf") ne null) { try { Configgy.configureFromResource("akka.conf", getClass.getClassLoader) ConfigLogger.log.info("Config loaded from the application classpath.") diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index f40261cddb..d36af50ad6 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -116,6 +116,10 @@ class ExecutorBasedEventDrivenDispatcher( val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 do { nextMessage.invoke + + if (nextMessage.receiver.isBeingRestarted) + return !self.isEmpty + if (throttle) { // Will be elided when false processedMessages += 1 if ((processedMessages >= throughput) || diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index faefa4fd10..a5ed113b97 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -75,33 +75,36 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return true if the mailbox was processed, false otherwise */ private def tryProcessMailbox(mailbox: MessageQueue): Boolean = { - var lockAcquiredOnce = false + var mailboxWasProcessed = false // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { if (mailbox.dispatcherLock.tryLock) { - lockAcquiredOnce = true try { - processMailbox(mailbox) + mailboxWasProcessed = processMailbox(mailbox) } finally { mailbox.dispatcherLock.unlock } } - } while ((lockAcquiredOnce && !mailbox.isEmpty)) + } while ((mailboxWasProcessed && !mailbox.isEmpty)) - lockAcquiredOnce + mailboxWasProcessed } /** * Process the messages in the mailbox of the given actor. + * @return */ - private def processMailbox(mailbox: MessageQueue) = { + private def processMailbox(mailbox: MessageQueue): Boolean = { var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke + if (messageInvocation.receiver.isBeingRestarted) + return false messageInvocation = mailbox.dequeue } + true } private def findThief(receiver: ActorRef): Option[ActorRef] = { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index d2ff59854a..60c62c56b9 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -39,7 +39,6 @@ final class MessageInvocation(val receiver: ActorRef, } override def equals(that: Any): Boolean = { - that != null && that.isInstanceOf[MessageInvocation] && that.asInstanceOf[MessageInvocation].receiver.actor == receiver.actor && that.asInstanceOf[MessageInvocation].message == message diff --git a/akka-actor/src/main/scala/stm/Transaction.scala b/akka-actor/src/main/scala/stm/Transaction.scala index 60e0cd6772..9ea32d7ca6 100644 --- a/akka-actor/src/main/scala/stm/Transaction.scala +++ b/akka-actor/src/main/scala/stm/Transaction.scala @@ -165,7 +165,6 @@ object Transaction { } */ override def equals(that: Any): Boolean = synchronized { - that != null && that.isInstanceOf[Transaction] && that.asInstanceOf[Transaction].id == this.id } diff --git a/akka-actor/src/main/scala/util/Helpers.scala b/akka-actor/src/main/scala/util/Helpers.scala index eab9e1981d..394b39e101 100644 --- a/akka-actor/src/main/scala/util/Helpers.scala +++ b/akka-actor/src/main/scala/util/Helpers.scala @@ -11,7 +11,7 @@ import java.security.MessageDigest */ object Helpers extends Logging { - implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None + implicit def null2Option[T](t: T): Option[T] = Option(t) def intToBytes(value: Int): Array[Byte] = { val bytes = new Array[Byte](4) @@ -41,7 +41,7 @@ object Helpers extends Logging { * if the actual type is not assignable from the given one. */ def narrow[T](o: Option[Any]): Option[T] = { - require(o != null, "Option to be narrowed must not be null!") + require((o ne null), "Option to be narrowed must not be null!") o.asInstanceOf[Option[T]] } diff --git a/akka-camel/src/main/scala/component/TypedActorComponent.scala b/akka-camel/src/main/scala/component/TypedActorComponent.scala index 542705d0c6..f172cc808b 100644 --- a/akka-camel/src/main/scala/component/TypedActorComponent.scala +++ b/akka-camel/src/main/scala/component/TypedActorComponent.scala @@ -104,7 +104,7 @@ class TypedActorInfo(context: CamelContext, clazz: Class[_], strategy: Parameter } } val superclass = clazz.getSuperclass - if (superclass != null && !superclass.equals(classOf[AnyRef])) { + if ((superclass ne null) && !superclass.equals(classOf[AnyRef])) { introspect(superclass) } } diff --git a/akka-http/src/main/scala/Security.scala b/akka-http/src/main/scala/Security.scala index b0f3c10be0..2db1e4981b 100644 --- a/akka-http/src/main/scala/Security.scala +++ b/akka-http/src/main/scala/Security.scala @@ -207,7 +207,7 @@ trait AuthenticationActor[C <: Credentials] extends Actor { //Turns the aforementioned header value into an option def authOption(r: Req): Option[String] = { val a = auth(r) - if (a != null && a.length > 0) Some(a) else None + if ((a ne null) && a.length > 0) Some(a) else None } } diff --git a/akka-jta/src/main/scala/TransactionProtocol.scala b/akka-jta/src/main/scala/TransactionProtocol.scala index f85b7ee1e3..487dece483 100644 --- a/akka-jta/src/main/scala/TransactionProtocol.scala +++ b/akka-jta/src/main/scala/TransactionProtocol.scala @@ -221,7 +221,7 @@ trait TransactionProtocol extends Logging { private def storeInThreadLocal(tx: Transaction) = suspendedTx.set(tx) private def fetchFromThreadLocal: Option[Transaction] = { - if (suspendedTx != null && suspendedTx.get() != null) Some(suspendedTx.get.asInstanceOf[Transaction]) + if ((suspendedTx ne null) && (suspendedTx.get() ne null)) Some(suspendedTx.get.asInstanceOf[Transaction]) else None } } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 088c0b8ff4..77e40a6127 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -71,6 +71,16 @@ trait Storage { throw new UnsupportedOperationException } +private[akka] object PersistentMap { + // operations on the Map + sealed trait Op + case object GET extends Op + case object PUT extends Op + case object REM extends Op + case object UPD extends Op + case object CLR extends Op +} + /** * Implementation of PersistentMap for every concrete * storage will have the same workflow. This abstracts the workflow. @@ -83,13 +93,8 @@ trait Storage { trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] with Transactional with Committable with Abortable with Logging { - // operations on the Map - trait Op - case object GET extends Op - case object PUT extends Op - case object REM extends Op - case object UPD extends Op - case object CLR extends Op + //Import Ops + import PersistentMap._ // append only log: records all mutating operations protected val appendOnlyTxLog = TransactionalVector[LogEntry]() @@ -362,17 +367,22 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { } } +private[akka] object PersistentVector { + // operations on the Vector + sealed trait Op + case object ADD extends Op + case object UPD extends Op + case object POP extends Op +} + /** * Implements a template for a concrete persistent transactional vector based storage. * * @author Jonas Bonér */ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable { - // operations on the Vector - trait Op - case object ADD extends Op - case object UPD extends Op - case object POP extends Op + //Import Ops + import PersistentVector._ // append only log: records all mutating operations protected val appendOnlyTxLog = TransactionalVector[LogEntry]() @@ -510,6 +520,13 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { } } + private[akka] object PersistentQueue { + //Operations for PersistentQueue + sealed trait QueueOp + case object ENQ extends QueueOp + case object DEQ extends QueueOp + } + /** * Implementation of PersistentQueue for every concrete * storage will have the same workflow. This abstracts the workflow. @@ -538,10 +555,8 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] with Transactional with Committable with Abortable with Logging { - sealed trait QueueOp - case object ENQ extends QueueOp - case object DEQ extends QueueOp - + //Import Ops + import PersistentQueue._ import scala.collection.immutable.Queue // current trail that will be played on commit to the underlying store diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index e08c45d159..20b9804ed4 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -21,6 +21,11 @@ import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap} import collection.mutable.{Set, HashSet, ArrayBuffer} import java.util.{Properties, Map => JMap} +/* + RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores + In this case all VoldemortBackend operations can be retried until successful, and data should remain consistent + */ + private[akka] object VoldemortStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with @@ -49,10 +54,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val vectorSizeIndex = getIndexedBytes(-1) val queueHeadIndex = getIndexedBytes(-1) val queueTailIndex = getIndexedBytes(-2) - case class QueueMetadata(head: Int, tail: Int) { - def size = tail - head - //worry about wrapping etc - } + implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) @@ -224,11 +226,24 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def remove(name: String): Boolean = { - false + val mdata = getQueueMetadata(name) + mdata.getActiveIndexes foreach { + index => + queueClient.delete(getIndexedKey(name, index)) + } + queueClient.delete(getKey(name, queueHeadIndex)) + queueClient.delete(getKey(name, queueTailIndex)) } def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { - List(Array.empty[Byte]) + val mdata = getQueueMetadata(name) + val ret = mdata.getPeekIndexes(start, count).toList map { + index: Int => { + log.debug("peeking:" + index) + queueClient.getValue(getIndexedKey(name, index)) + } + } + ret } def size(name: String): Int = { @@ -236,15 +251,37 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def dequeue(name: String): Option[Array[Byte]] = { - None + val mdata = getQueueMetadata(name) + if (mdata.canDequeue) { + val key = getIndexedKey(name, mdata.head) + try { + val dequeued = queueClient.getValue(key) + queueClient.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue)) + Some(dequeued) + } + finally { + try { + queueClient.delete(key) + } catch { + //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around + case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") + } + } + } else { + None + } } def enqueue(name: String, item: Array[Byte]): Option[Int] = { val mdata = getQueueMetadata(name) - val key = getIndexedKey(name, mdata.tail) - queueClient.put(key, item) - queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1)) - Some(mdata.size + 1) + if (mdata.canEnqueue) { + val key = getIndexedKey(name, mdata.tail) + queueClient.put(key, item) + queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue)) + Some(mdata.size + 1) + } else { + None + } } @@ -307,7 +344,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def initStoreClients() = { - if (storeClientFactory != null) { + if (storeClientFactory ne null) { storeClientFactory.close } @@ -326,6 +363,60 @@ MapStorageBackend[Array[Byte], Array[Byte]] with queueClient = storeClientFactory.getStoreClient(queueStore) } + + case class QueueMetadata(head: Int, tail: Int) { + //queue is an sequence with indexes from 0 to Int.MAX_VALUE + //wraps around when one pointer gets to max value + //head has an element in it. + //tail is the next slot to write to. + def size = { + if (tail >= head) { + tail - head + } else { + //queue has wrapped + (Integer.MAX_VALUE - head) + (tail + 1) + } + } + + def canEnqueue = { + //the -1 stops the tail from catching the head on a wrap around + size < Integer.MAX_VALUE - 1 + } + + def canDequeue = {size > 0} + + def getActiveIndexes(): IndexedSeq[Int] = { + if (tail >= head) { + Range(head, tail) + } else { + //queue has wrapped + val headRange = Range.inclusive(head, Integer.MAX_VALUE) + (if (tail > 0) {headRange ++ Range(0, tail)} else {headRange}) + } + } + + def getPeekIndexes(start: Int, count: Int): IndexedSeq[Int] = { + val indexes = getActiveIndexes + if (indexes.size < start) + {IndexedSeq.empty[Int]} else + {indexes.drop(start).take(count)} + } + + def nextEnqueue = { + tail match { + case Integer.MAX_VALUE => 0 + case _ => tail + 1 + } + } + + def nextDequeue = { + head match { + case Integer.MAX_VALUE => 0 + case _ => head + 1 + } + } + } + object IntSerializer { val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 613181cbd2..8ac3d306c4 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -8,6 +8,7 @@ import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ import se.scalablesolutions.akka.util.{Logging} import collection.immutable.TreeSet import VoldemortStorageBackendSuite._ +import scala.None @RunWith(classOf[JUnitRunner]) class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging { @@ -126,6 +127,44 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb } + test("Persistent Queue apis function as expected") { + val key = "queueApiKey" + val value = bytes("some bytes even") + val valueOdd = bytes("some bytes odd") + + remove(key) + VoldemortStorageBackend.size(key) should be(0) + enqueue(key, value) should be(Some(1)) + VoldemortStorageBackend.size(key) should be(1) + enqueue(key, valueOdd) should be(Some(2)) + VoldemortStorageBackend.size(key) should be(2) + peek(key, 0, 1)(0) should be(value) + peek(key, 1, 1)(0) should be(valueOdd) + dequeue(key).get should be(value) + VoldemortStorageBackend.size(key) should be(1) + dequeue(key).get should be(valueOdd) + VoldemortStorageBackend.size(key) should be(0) + dequeue(key) should be(None) + queueClient.put(getKey(key, queueHeadIndex), IntSerializer.toBytes(Integer.MAX_VALUE)) + queueClient.put(getKey(key, queueTailIndex), IntSerializer.toBytes(Integer.MAX_VALUE)) + VoldemortStorageBackend.size(key) should be(0) + enqueue(key, value) should be(Some(1)) + VoldemortStorageBackend.size(key) should be(1) + enqueue(key, valueOdd) should be(Some(2)) + VoldemortStorageBackend.size(key) should be(2) + peek(key, 0, 1)(0) should be(value) + peek(key, 1, 1)(0) should be(valueOdd) + dequeue(key).get should be(value) + VoldemortStorageBackend.size(key) should be(1) + dequeue(key).get should be(valueOdd) + VoldemortStorageBackend.size(key) should be(0) + dequeue(key) should be(None) + + + } + + + } object VoldemortStorageBackendSuite { diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index bed9e9f933..74224d13a6 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -115,7 +115,6 @@ object RemoteServer { result } override def equals(that: Any): Boolean = { - that != null && that.isInstanceOf[Address] && that.asInstanceOf[Address].hostname == hostname && that.asInstanceOf[Address].port == port diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 2482e15b05..5ec8bb6344 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -202,7 +202,6 @@ object ActorSerialization { lifeCycle, supervisor, hotswap, - classLoader, // TODO: should we fall back to getClass.getClassLoader? factory) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]] @@ -341,7 +340,7 @@ object TypedActorSerialization { proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = { val init = AspectInitRegistry.initFor(proxy) - if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.") + if (init eq null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.") SerializedTypedActorRefProtocol.newBuilder .setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format)) diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 8b61b30600..e961b500f2 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -201,18 +201,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { def shouldRegisterAndUnregister { val actor1 = actorOf[RemoteActorSpecActorUnidirectional] server.register("my-service-1", actor1) - assert(server.actors().get("my-service-1") != null, "actor registered") + assert(server.actors().get("my-service-1") ne null, "actor registered") server.unregister("my-service-1") - assert(server.actors().get("my-service-1") == null, "actor unregistered") + assert(server.actors().get("my-service-1") eq null, "actor unregistered") } @Test def shouldRegisterAndUnregisterByUuid { val actor1 = actorOf[RemoteActorSpecActorUnidirectional] server.register("uuid:" + actor1.uuid, actor1) - assert(server.actorsByUuid().get(actor1.uuid.toString) != null, "actor registered") + assert(server.actorsByUuid().get(actor1.uuid.toString) ne null, "actor registered") server.unregister("uuid:" + actor1.uuid) - assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered") + assert(server.actorsByUuid().get(actor1.uuid) eq null, "actor unregistered") } } diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala index fb35965418..87233ab451 100644 --- a/akka-spring/src/main/scala/ActorFactoryBean.scala +++ b/akka-spring/src/main/scala/ActorFactoryBean.scala @@ -100,9 +100,9 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App } private[akka] def createTypedInstance() : AnyRef = { - if (interface == null || interface == "") throw new AkkaBeansException( + if ((interface eq null) || interface == "") throw new AkkaBeansException( "The 'interface' part of the 'akka:actor' element in the Spring config file can't be null or empty string") - if (implementation == null || implementation == "") throw new AkkaBeansException( + if ((implementation eq null) || implementation == "") throw new AkkaBeansException( "The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string") val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) @@ -121,7 +121,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App * Create an UntypedActor. */ private[akka] def createUntypedInstance() : ActorRef = { - if (implementation == null || implementation == "") throw new AkkaBeansException( + if ((implementation eq null) || implementation == "") throw new AkkaBeansException( "The 'implementation' part of the 'akka:untyped-actor' element in the Spring config file can't be null or empty string") val actorRef = Actor.actorOf(implementation.toClass) if (timeout > 0) { @@ -199,11 +199,11 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App config } - private[akka] def isRemote = (host != null) && (!host.isEmpty) + private[akka] def isRemote = (host ne null) && (!host.isEmpty) private[akka] def hasDispatcher = - (dispatcher != null) && - (dispatcher.dispatcherType != null) && + (dispatcher ne null) && + (dispatcher.dispatcherType ne null) && (!dispatcher.dispatcherType.isEmpty) /** diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala index 0947c6f944..e8048d1cd2 100644 --- a/akka-spring/src/main/scala/ActorParser.scala +++ b/akka-spring/src/main/scala/ActorParser.scala @@ -28,18 +28,18 @@ trait ActorParser extends BeanParser with DispatcherParser { val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG) val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG) - if (remoteElement != null) { + if (remoteElement ne null) { objectProperties.host = mandatory(remoteElement, HOST) objectProperties.port = mandatory(remoteElement, PORT) - objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED)) + objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) ne null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED)) val serviceName = remoteElement.getAttribute(SERVICE_NAME) - if ((serviceName != null) && (!serviceName.isEmpty)) { + if ((serviceName ne null) && (!serviceName.isEmpty)) { objectProperties.serviceName = serviceName objectProperties.serverManaged = true } } - if (dispatcherElement != null) { + if (dispatcherElement ne null) { val dispatcherProperties = parseDispatcher(dispatcherElement) objectProperties.dispatcher = dispatcherProperties } @@ -108,7 +108,7 @@ trait BeanParser extends Logging { * @param attribute name of the mandatory attribute */ def mandatory(element: Element, attribute: String): String = { - if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { + if ((element.getAttribute(attribute) eq null) || (element.getAttribute(attribute).isEmpty)) { throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) } else { element.getAttribute(attribute) @@ -122,7 +122,7 @@ trait BeanParser extends Logging { */ def mandatoryElement(element: Element, childName: String): Element = { val childElement = DomUtils.getChildElementByTagName(element, childName); - if (childElement == null) { + if (childElement eq null) { throw new IllegalArgumentException("Mandatory element missing: ''") } else { childElement @@ -150,7 +150,7 @@ trait DispatcherParser extends BeanParser { if (hasRef(element)) { val ref = element.getAttribute(REF) dispatcherElement = element.getOwnerDocument.getElementById(ref) - if (dispatcherElement == null) { + if (dispatcherElement eq null) { throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") } } @@ -173,7 +173,7 @@ trait DispatcherParser extends BeanParser { } val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); - if (threadPoolElement != null) { + if (threadPoolElement ne null) { if (properties.dispatcherType == THREAD_BASED) { throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") } @@ -220,7 +220,7 @@ trait DispatcherParser extends BeanParser { def hasRef(element: Element): Boolean = { val ref = element.getAttribute(REF) - (ref != null) && !ref.isEmpty + (ref ne null) && !ref.isEmpty } } diff --git a/akka-spring/src/main/scala/ConfiggyPropertyPlaceholderConfigurer.scala b/akka-spring/src/main/scala/ConfiggyPropertyPlaceholderConfigurer.scala index 411c36d86d..1360b62d9c 100644 --- a/akka-spring/src/main/scala/ConfiggyPropertyPlaceholderConfigurer.scala +++ b/akka-spring/src/main/scala/ConfiggyPropertyPlaceholderConfigurer.scala @@ -18,7 +18,7 @@ class ConfiggyPropertyPlaceholderConfigurer extends PropertyPlaceholderConfigure * @param configgyResource akka.conf */ override def setLocation(configgyResource: Resource) { - if (configgyResource == null) throw new IllegalArgumentException("Property 'config' must be set") + if (configgyResource eq null) throw new IllegalArgumentException("Property 'config' must be set") val properties = loadAkkaConfig(configgyResource) setProperties(properties) } diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala index 4d13fa6814..34a3a012ea 100644 --- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -35,7 +35,7 @@ object DispatcherFactoryBean { case _ => throw new IllegalArgumentException("unknown dispatcher type") } // build threadpool - if ((properties.threadPool != null) && (properties.threadPool.queue != null)) { + if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) { var threadPoolBuilder = dispatcher.asInstanceOf[ThreadPoolBuilder] threadPoolBuilder = properties.threadPool.queue match { case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness) @@ -59,7 +59,7 @@ object DispatcherFactoryBean { if (properties.threadPool.mailboxCapacity > -1) { threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity) } - if ((properties.threadPool.rejectionPolicy != null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { + if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match { case "abort-policy" => new AbortPolicy() case "caller-runs-policy" => new CallerRunsPolicy() diff --git a/akka-spring/src/main/scala/StringReflect.scala b/akka-spring/src/main/scala/StringReflect.scala index 9e8cab8172..c0c8aab9ff 100644 --- a/akka-spring/src/main/scala/StringReflect.scala +++ b/akka-spring/src/main/scala/StringReflect.scala @@ -17,7 +17,7 @@ object StringReflect { * @author michaelkober */ class StringReflect(val self: String) { - if (self == null || self == "") throw new IllegalArgumentException("Class name can't be null or empty string [" + self + "]") + if ((self eq null) || self == "") throw new IllegalArgumentException("Class name can't be null or empty string [" + self + "]") def toClass[T <: AnyRef]: Class[T] = { val clazz = Class.forName(self) clazz.asInstanceOf[Class[T]] diff --git a/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala b/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala index cc88e39f91..164018f588 100644 --- a/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala +++ b/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala @@ -33,11 +33,11 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser val strategyElement = mandatoryElement(element, STRATEGY_TAG) val typedActorsElement = DomUtils.getChildElementByTagName(element, TYPED_ACTORS_TAG) val untypedActorsElement = DomUtils.getChildElementByTagName(element, UNTYPED_ACTORS_TAG) - if ((typedActorsElement == null) && (untypedActorsElement == null)) { + if ((typedActorsElement eq null) && (untypedActorsElement eq null)) { throw new IllegalArgumentException("One of 'akka:typed-actors' or 'akka:untyped-actors' needed.") } parseRestartStrategy(strategyElement, builder) - if (typedActorsElement != null) { + if (typedActorsElement ne null) { builder.addPropertyValue("typed", AkkaSpringConfigurationTags.TYPED_ACTOR_TAG) parseTypedActorList(typedActorsElement, builder) } else { diff --git a/akka-spring/src/main/scala/SupervisionFactoryBean.scala b/akka-spring/src/main/scala/SupervisionFactoryBean.scala index a19b6fdeea..657a40c90a 100644 --- a/akka-spring/src/main/scala/SupervisionFactoryBean.scala +++ b/akka-spring/src/main/scala/SupervisionFactoryBean.scala @@ -57,8 +57,8 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { private[akka] def createComponent(props: ActorProperties): Component = { import StringReflect._ val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent()) - val isRemote = (props.host != null) && (!props.host.isEmpty) - val withInterface = (props.interface != null) && (!props.interface.isEmpty) + val isRemote = (props.host ne null) && (!props.host.isEmpty) + val withInterface = (props.interface ne null) && (!props.interface.isEmpty) if (isRemote) { //val remote = new RemoteAddress(props.host, props.port) val remote = new RemoteAddress(props.host, props.port.toInt) @@ -82,7 +82,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { private[akka] def createSupervise(props: ActorProperties): Server = { import StringReflect._ val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent()) - val isRemote = (props.host != null) && (!props.host.isEmpty) + val isRemote = (props.host ne null) && (!props.host.isEmpty) val actorRef = Actor.actorOf(props.target.toClass) if (props.timeout > 0) { actorRef.setTimeout(props.timeout) diff --git a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala index 9dfb5bce94..85b233e034 100644 --- a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala @@ -24,7 +24,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { type="executor-based-event-driven" name="myDispatcher"/> var props = parser.parseDispatcher(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.dispatcherType === "executor-based-event-driven") assert(props.name === "myDispatcher") @@ -45,7 +45,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { keep-alive="2000" rejection-policy="caller-runs-policy"/> val props = parser.parseThreadPool(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.queue == "bounded-array-blocking-queue") assert(props.capacity == 100) assert(props.fairness) @@ -66,7 +66,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { keep-alive="1000"/> val props = parser.parseDispatcher(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.dispatcherType == "executor-based-event-driven") assert(props.name == "myDispatcher") assert(props.threadPool.corePoolSize == 2) @@ -97,7 +97,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { type="hawt" aggregate="false"/> var props = parser.parseDispatcher(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.dispatcherType === "hawt") assert(props.aggregate === false) } diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala index a388bb418e..7d886c7fb0 100644 --- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala @@ -47,7 +47,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { scenario("get a dispatcher via ref from context") { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val pojo = context.getBean("typed-actor-with-dispatcher-ref").asInstanceOf[IMyPojo] - assert(pojo != null) + assert(pojo ne null) } scenario("get a executor-event-driven-dispatcher with blocking-queue with unbounded capacity from context") { @@ -99,7 +99,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { scenario("get a executor-based-event-driven-work-stealing-dispatcher from context") { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val dispatcher = context.getBean("executor-based-event-driven-work-stealing-dispatcher").asInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher] - assert(dispatcher != null) + assert(dispatcher ne null) assert(dispatcher.name === "akka:event-driven-work-stealing:dispatcher:workStealingDispatcher") val executor = getThreadPoolExecutorAndAssert(dispatcher) assert(executor.getQueue().isInstanceOf[BlockingQueue[Runnable]]) @@ -108,7 +108,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { scenario("get a hawt-dispatcher from context") { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val dispatcher = context.getBean("hawt-dispatcher").asInstanceOf[HawtDispatcher] - assert(dispatcher != null) + assert(dispatcher ne null) assert(dispatcher.toString === "HawtDispatchEventDrivenDispatcher") assert(dispatcher.aggregate === false) } @@ -116,7 +116,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { scenario("get a thread-based-dispatcher for typed actor from context") { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val pojo = context.getBean("typed-actor-with-thread-based-dispatcher").asInstanceOf[IMyPojo] - assert(pojo != null) + assert(pojo ne null) } scenario("get a thread-based-dispatcher for untyped from context") { @@ -138,7 +138,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { val field = pool.getClass.getDeclaredField("se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder") field.setAccessible(true) val executor = field.get(pool).asInstanceOf[ThreadPoolExecutor] - assert(executor != null) + assert(executor ne null) executor; } diff --git a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala index fd9ad3e3bd..15734fc9fa 100644 --- a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala @@ -28,7 +28,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { it("should be able to parse typed actor configuration") { val props = parser.parseActor(createTypedActorElement); - assert(props != null) + assert(props ne null) assert(props.timeout == 1000) assert(props.target == "foo.bar.MyPojo") assert(props.transactional) @@ -37,7 +37,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { it("should parse the supervisor restart strategy") { parser.parseSupervisor(createSupervisorElement, builder); val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[RestartStrategy] - assert(strategy != null) + assert(strategy ne null) assert(strategy.scheme match { case x:AllForOne => true case _ => false }) @@ -48,7 +48,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { it("should parse the supervised typed actors") { parser.parseSupervisor(createSupervisorElement, builder); val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActorProperties]] - assert(supervised != null) + assert(supervised ne null) expect(4) { supervised.length } val iterator = supervised.iterator val prop1 = iterator.next diff --git a/akka-spring/src/test/scala/SupervisorSpringFeatureTest.scala b/akka-spring/src/test/scala/SupervisorSpringFeatureTest.scala index 1a35451315..89a779039c 100644 --- a/akka-spring/src/test/scala/SupervisorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/SupervisorSpringFeatureTest.scala @@ -34,11 +34,11 @@ class SupervisorSpringFeatureTest extends FeatureSpec with ShouldMatchers { val myConfigurator = context.getBean("supervision1").asInstanceOf[TypedActorConfigurator] // get TypedActors val foo = myConfigurator.getInstance(classOf[IFoo]) - assert(foo != null) + assert(foo ne null) val bar = myConfigurator.getInstance(classOf[IBar]) - assert(bar != null) + assert(bar ne null) val pojo = myConfigurator.getInstance(classOf[IMyPojo]) - assert(pojo != null) + assert(pojo ne null) } scenario("get a supervisor for untyped actors from context") { @@ -51,7 +51,7 @@ class SupervisorSpringFeatureTest extends FeatureSpec with ShouldMatchers { val context = new ClassPathXmlApplicationContext("/supervisor-config.xml") val myConfigurator = context.getBean("supervision-with-dispatcher").asInstanceOf[TypedActorConfigurator] val foo = myConfigurator.getInstance(classOf[IFoo]) - assert(foo != null) + assert(foo ne null) } } } diff --git a/akka-spring/src/test/scala/TypedActorBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/TypedActorBeanDefinitionParserTest.scala index 52663afe63..15ed97bd27 100644 --- a/akka-spring/src/test/scala/TypedActorBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/TypedActorBeanDefinitionParserTest.scala @@ -31,7 +31,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers { val props = parser.parseActor(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.timeout === 1000) assert(props.target === "foo.bar.MyPojo") assert(props.transactional) @@ -53,7 +53,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers { val props = parser.parseActor(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.dispatcher.dispatcherType === "thread-based") } @@ -63,7 +63,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers { val props = parser.parseActor(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.host === "com.some.host") assert(props.port === "9999") assert(!props.serverManaged) @@ -75,7 +75,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers { val props = parser.parseActor(dom(xml).getDocumentElement); - assert(props != null) + assert(props ne null) assert(props.host === "com.some.host") assert(props.port === "9999") assert(props.serviceName === "my-service") diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index a4c7ddada1..7b35abfa57 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -543,7 +543,7 @@ object TypedActor extends Logging { } def isTransactional(clazz: Class[_]): Boolean = { - if (clazz == null) false + if (clazz eq null) false else if (clazz.isAssignableFrom(classOf[TypedTransactor])) true else isTransactional(clazz.getSuperclass) }