Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-09-29 09:48:57 +02:00
commit bd95d20edb
29 changed files with 246 additions and 105 deletions

View file

@ -630,7 +630,6 @@ trait ActorRef extends
override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[ActorRef] &&
that.asInstanceOf[ActorRef].uuid == uuid
}
@ -667,9 +666,6 @@ class LocalActorRef private[akka](
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[Uuid, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
@volatile private var isDeserialized = false
@volatile private var loader: Option[ClassLoader] = None
@volatile private var maxNrOfRetriesCount: Int = 0
@volatile private var restartsWithinTimeRangeTimestamp: Long = 0L
@volatile private var _mailbox: AnyRef = _
@ -680,7 +676,8 @@ class LocalActorRef private[akka](
// instance elegible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass)
if (runActorInitialization && !isDeserialized) initializeActorInstance
//If it was started inside "newActor", initialize it
if (isRunning) initializeActorInstance
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
@ -696,11 +693,8 @@ class LocalActorRef private[akka](
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__loader: ClassLoader,
__factory: () => Actor) = {
this(__factory)
loader = Some(__loader)
isDeserialized = true
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
@ -818,7 +812,6 @@ class LocalActorRef private[akka](
}
_status = ActorRefStatus.RUNNING
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
}
this
}
@ -1302,7 +1295,7 @@ class LocalActorRef private[akka](
} catch {
case e: NoSuchFieldException =>
val parent = clazz.getSuperclass
if (parent != null) findActorSelfField(parent)
if (parent ne null) findActorSelfField(parent)
else throw new IllegalActorStateException(
toString + " is not an Actor since it have not mixed in the 'Actor' trait")
}

View file

@ -33,7 +33,7 @@ object Config {
val HOME = {
val systemHome = System.getenv("AKKA_HOME")
if (systemHome == null || systemHome.length == 0 || systemHome == ".") {
if ((systemHome eq null) || systemHome.length == 0 || systemHome == ".") {
val optionHome = System.getProperty("akka.home", "")
if (optionHome.length != 0) Some(optionHome)
else None
@ -52,7 +52,7 @@ object Config {
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (getClass.getClassLoader.getResource("akka.conf") != null) {
} else if (getClass.getClassLoader.getResource("akka.conf") ne null) {
try {
Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
ConfigLogger.log.info("Config loaded from the application classpath.")

View file

@ -116,6 +116,10 @@ class ExecutorBasedEventDrivenDispatcher(
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (nextMessage.receiver.isBeingRestarted)
return !self.isEmpty
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= throughput) ||

View file

@ -75,33 +75,36 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
* @return true if the mailbox was processed, false otherwise
*/
private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
var lockAcquiredOnce = false
var mailboxWasProcessed = false
// this do-wile loop is required to prevent missing new messages between the end of processing
// the mailbox and releasing the lock
do {
if (mailbox.dispatcherLock.tryLock) {
lockAcquiredOnce = true
try {
processMailbox(mailbox)
mailboxWasProcessed = processMailbox(mailbox)
} finally {
mailbox.dispatcherLock.unlock
}
}
} while ((lockAcquiredOnce && !mailbox.isEmpty))
} while ((mailboxWasProcessed && !mailbox.isEmpty))
lockAcquiredOnce
mailboxWasProcessed
}
/**
* Process the messages in the mailbox of the given actor.
* @return
*/
private def processMailbox(mailbox: MessageQueue) = {
private def processMailbox(mailbox: MessageQueue): Boolean = {
var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) {
messageInvocation.invoke
if (messageInvocation.receiver.isBeingRestarted)
return false
messageInvocation = mailbox.dequeue
}
true
}
private def findThief(receiver: ActorRef): Option[ActorRef] = {

View file

@ -39,7 +39,6 @@ final class MessageInvocation(val receiver: ActorRef,
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[MessageInvocation] &&
that.asInstanceOf[MessageInvocation].receiver.actor == receiver.actor &&
that.asInstanceOf[MessageInvocation].message == message

View file

@ -165,7 +165,6 @@ object Transaction {
}
*/
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[Transaction] &&
that.asInstanceOf[Transaction].id == this.id
}

View file

@ -11,7 +11,7 @@ import java.security.MessageDigest
*/
object Helpers extends Logging {
implicit def null2Option[T](t: T): Option[T] = if (t != null) Some(t) else None
implicit def null2Option[T](t: T): Option[T] = Option(t)
def intToBytes(value: Int): Array[Byte] = {
val bytes = new Array[Byte](4)
@ -41,7 +41,7 @@ object Helpers extends Logging {
* if the actual type is not assignable from the given one.
*/
def narrow[T](o: Option[Any]): Option[T] = {
require(o != null, "Option to be narrowed must not be null!")
require((o ne null), "Option to be narrowed must not be null!")
o.asInstanceOf[Option[T]]
}

View file

@ -104,7 +104,7 @@ class TypedActorInfo(context: CamelContext, clazz: Class[_], strategy: Parameter
}
}
val superclass = clazz.getSuperclass
if (superclass != null && !superclass.equals(classOf[AnyRef])) {
if ((superclass ne null) && !superclass.equals(classOf[AnyRef])) {
introspect(superclass)
}
}

View file

@ -207,7 +207,7 @@ trait AuthenticationActor[C <: Credentials] extends Actor {
//Turns the aforementioned header value into an option
def authOption(r: Req): Option[String] = {
val a = auth(r)
if (a != null && a.length > 0) Some(a) else None
if ((a ne null) && a.length > 0) Some(a) else None
}
}

View file

@ -221,7 +221,7 @@ trait TransactionProtocol extends Logging {
private def storeInThreadLocal(tx: Transaction) = suspendedTx.set(tx)
private def fetchFromThreadLocal: Option[Transaction] = {
if (suspendedTx != null && suspendedTx.get() != null) Some(suspendedTx.get.asInstanceOf[Transaction])
if ((suspendedTx ne null) && (suspendedTx.get() ne null)) Some(suspendedTx.get.asInstanceOf[Transaction])
else None
}
}

View file

@ -71,6 +71,16 @@ trait Storage {
throw new UnsupportedOperationException
}
private[akka] object PersistentMap {
// operations on the Map
sealed trait Op
case object GET extends Op
case object PUT extends Op
case object REM extends Op
case object UPD extends Op
case object CLR extends Op
}
/**
* Implementation of <tt>PersistentMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
@ -83,13 +93,8 @@ trait Storage {
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
// operations on the Map
trait Op
case object GET extends Op
case object PUT extends Op
case object REM extends Op
case object UPD extends Op
case object CLR extends Op
//Import Ops
import PersistentMap._
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
@ -362,17 +367,22 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
}
}
private[akka] object PersistentVector {
// operations on the Vector
sealed trait Op
case object ADD extends Op
case object UPD extends Op
case object POP extends Op
}
/**
* Implements a template for a concrete persistent transactional vector based storage.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
// operations on the Vector
trait Op
case object ADD extends Op
case object UPD extends Op
case object POP extends Op
//Import Ops
import PersistentVector._
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
@ -510,6 +520,13 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
}
}
private[akka] object PersistentQueue {
//Operations for PersistentQueue
sealed trait QueueOp
case object ENQ extends QueueOp
case object DEQ extends QueueOp
}
/**
* Implementation of <tt>PersistentQueue</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
@ -538,10 +555,8 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Abortable with Logging {
sealed trait QueueOp
case object ENQ extends QueueOp
case object DEQ extends QueueOp
//Import Ops
import PersistentQueue._
import scala.collection.immutable.Queue
// current trail that will be played on commit to the underlying store

View file

@ -21,6 +21,11 @@ import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap}
import collection.mutable.{Set, HashSet, ArrayBuffer}
import java.util.{Properties, Map => JMap}
/*
RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores
In this case all VoldemortBackend operations can be retried until successful, and data should remain consistent
*/
private[akka] object VoldemortStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
@ -49,10 +54,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val vectorSizeIndex = getIndexedBytes(-1)
val queueHeadIndex = getIndexedBytes(-1)
val queueTailIndex = getIndexedBytes(-2)
case class QueueMetadata(head: Int, tail: Int) {
def size = tail - head
//worry about wrapping etc
}
implicit val byteOrder = new Ordering[Array[Byte]] {
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
@ -224,11 +226,24 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def remove(name: String): Boolean = {
false
val mdata = getQueueMetadata(name)
mdata.getActiveIndexes foreach {
index =>
queueClient.delete(getIndexedKey(name, index))
}
queueClient.delete(getKey(name, queueHeadIndex))
queueClient.delete(getKey(name, queueTailIndex))
}
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = {
List(Array.empty[Byte])
val mdata = getQueueMetadata(name)
val ret = mdata.getPeekIndexes(start, count).toList map {
index: Int => {
log.debug("peeking:" + index)
queueClient.getValue(getIndexedKey(name, index))
}
}
ret
}
def size(name: String): Int = {
@ -236,15 +251,37 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def dequeue(name: String): Option[Array[Byte]] = {
None
val mdata = getQueueMetadata(name)
if (mdata.canDequeue) {
val key = getIndexedKey(name, mdata.head)
try {
val dequeued = queueClient.getValue(key)
queueClient.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue))
Some(dequeued)
}
finally {
try {
queueClient.delete(key)
} catch {
//a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue")
}
}
} else {
None
}
}
def enqueue(name: String, item: Array[Byte]): Option[Int] = {
val mdata = getQueueMetadata(name)
val key = getIndexedKey(name, mdata.tail)
queueClient.put(key, item)
queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
Some(mdata.size + 1)
if (mdata.canEnqueue) {
val key = getIndexedKey(name, mdata.tail)
queueClient.put(key, item)
queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue))
Some(mdata.size + 1)
} else {
None
}
}
@ -307,7 +344,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def initStoreClients() = {
if (storeClientFactory != null) {
if (storeClientFactory ne null) {
storeClientFactory.close
}
@ -326,6 +363,60 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
queueClient = storeClientFactory.getStoreClient(queueStore)
}
case class QueueMetadata(head: Int, tail: Int) {
//queue is an sequence with indexes from 0 to Int.MAX_VALUE
//wraps around when one pointer gets to max value
//head has an element in it.
//tail is the next slot to write to.
def size = {
if (tail >= head) {
tail - head
} else {
//queue has wrapped
(Integer.MAX_VALUE - head) + (tail + 1)
}
}
def canEnqueue = {
//the -1 stops the tail from catching the head on a wrap around
size < Integer.MAX_VALUE - 1
}
def canDequeue = {size > 0}
def getActiveIndexes(): IndexedSeq[Int] = {
if (tail >= head) {
Range(head, tail)
} else {
//queue has wrapped
val headRange = Range.inclusive(head, Integer.MAX_VALUE)
(if (tail > 0) {headRange ++ Range(0, tail)} else {headRange})
}
}
def getPeekIndexes(start: Int, count: Int): IndexedSeq[Int] = {
val indexes = getActiveIndexes
if (indexes.size < start)
{IndexedSeq.empty[Int]} else
{indexes.drop(start).take(count)}
}
def nextEnqueue = {
tail match {
case Integer.MAX_VALUE => 0
case _ => tail + 1
}
}
def nextDequeue = {
head match {
case Integer.MAX_VALUE => 0
case _ => head + 1
}
}
}
object IntSerializer {
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE

View file

@ -8,6 +8,7 @@ import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
import se.scalablesolutions.akka.util.{Logging}
import collection.immutable.TreeSet
import VoldemortStorageBackendSuite._
import scala.None
@RunWith(classOf[JUnitRunner])
class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
@ -126,6 +127,44 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
}
test("Persistent Queue apis function as expected") {
val key = "queueApiKey"
val value = bytes("some bytes even")
val valueOdd = bytes("some bytes odd")
remove(key)
VoldemortStorageBackend.size(key) should be(0)
enqueue(key, value) should be(Some(1))
VoldemortStorageBackend.size(key) should be(1)
enqueue(key, valueOdd) should be(Some(2))
VoldemortStorageBackend.size(key) should be(2)
peek(key, 0, 1)(0) should be(value)
peek(key, 1, 1)(0) should be(valueOdd)
dequeue(key).get should be(value)
VoldemortStorageBackend.size(key) should be(1)
dequeue(key).get should be(valueOdd)
VoldemortStorageBackend.size(key) should be(0)
dequeue(key) should be(None)
queueClient.put(getKey(key, queueHeadIndex), IntSerializer.toBytes(Integer.MAX_VALUE))
queueClient.put(getKey(key, queueTailIndex), IntSerializer.toBytes(Integer.MAX_VALUE))
VoldemortStorageBackend.size(key) should be(0)
enqueue(key, value) should be(Some(1))
VoldemortStorageBackend.size(key) should be(1)
enqueue(key, valueOdd) should be(Some(2))
VoldemortStorageBackend.size(key) should be(2)
peek(key, 0, 1)(0) should be(value)
peek(key, 1, 1)(0) should be(valueOdd)
dequeue(key).get should be(value)
VoldemortStorageBackend.size(key) should be(1)
dequeue(key).get should be(valueOdd)
VoldemortStorageBackend.size(key) should be(0)
dequeue(key) should be(None)
}
}
object VoldemortStorageBackendSuite {

View file

@ -115,7 +115,6 @@ object RemoteServer {
result
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Address] &&
that.asInstanceOf[Address].hostname == hostname &&
that.asInstanceOf[Address].port == port

View file

@ -202,7 +202,6 @@ object ActorSerialization {
lifeCycle,
supervisor,
hotswap,
classLoader, // TODO: should we fall back to getClass.getClassLoader?
factory)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]
@ -341,7 +340,7 @@ object TypedActorSerialization {
proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = {
val init = AspectInitRegistry.initFor(proxy)
if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.")
if (init eq null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.")
SerializedTypedActorRefProtocol.newBuilder
.setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format))

View file

@ -201,18 +201,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
def shouldRegisterAndUnregister {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("my-service-1", actor1)
assert(server.actors().get("my-service-1") != null, "actor registered")
assert(server.actors().get("my-service-1") ne null, "actor registered")
server.unregister("my-service-1")
assert(server.actors().get("my-service-1") == null, "actor unregistered")
assert(server.actors().get("my-service-1") eq null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
assert(server.actorsByUuid().get(actor1.uuid.toString) != null, "actor registered")
assert(server.actorsByUuid().get(actor1.uuid.toString) ne null, "actor registered")
server.unregister("uuid:" + actor1.uuid)
assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered")
assert(server.actorsByUuid().get(actor1.uuid) eq null, "actor unregistered")
}
}

View file

@ -100,9 +100,9 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
}
private[akka] def createTypedInstance() : AnyRef = {
if (interface == null || interface == "") throw new AkkaBeansException(
if ((interface eq null) || interface == "") throw new AkkaBeansException(
"The 'interface' part of the 'akka:actor' element in the Spring config file can't be null or empty string")
if (implementation == null || implementation == "") throw new AkkaBeansException(
if ((implementation eq null) || implementation == "") throw new AkkaBeansException(
"The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string")
val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
@ -121,7 +121,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
* Create an UntypedActor.
*/
private[akka] def createUntypedInstance() : ActorRef = {
if (implementation == null || implementation == "") throw new AkkaBeansException(
if ((implementation eq null) || implementation == "") throw new AkkaBeansException(
"The 'implementation' part of the 'akka:untyped-actor' element in the Spring config file can't be null or empty string")
val actorRef = Actor.actorOf(implementation.toClass)
if (timeout > 0) {
@ -199,11 +199,11 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
config
}
private[akka] def isRemote = (host != null) && (!host.isEmpty)
private[akka] def isRemote = (host ne null) && (!host.isEmpty)
private[akka] def hasDispatcher =
(dispatcher != null) &&
(dispatcher.dispatcherType != null) &&
(dispatcher ne null) &&
(dispatcher.dispatcherType ne null) &&
(!dispatcher.dispatcherType.isEmpty)
/**

View file

@ -28,18 +28,18 @@ trait ActorParser extends BeanParser with DispatcherParser {
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG)
if (remoteElement != null) {
if (remoteElement ne null) {
objectProperties.host = mandatory(remoteElement, HOST)
objectProperties.port = mandatory(remoteElement, PORT)
objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED))
objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) ne null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED))
val serviceName = remoteElement.getAttribute(SERVICE_NAME)
if ((serviceName != null) && (!serviceName.isEmpty)) {
if ((serviceName ne null) && (!serviceName.isEmpty)) {
objectProperties.serviceName = serviceName
objectProperties.serverManaged = true
}
}
if (dispatcherElement != null) {
if (dispatcherElement ne null) {
val dispatcherProperties = parseDispatcher(dispatcherElement)
objectProperties.dispatcher = dispatcherProperties
}
@ -108,7 +108,7 @@ trait BeanParser extends Logging {
* @param attribute name of the mandatory attribute
*/
def mandatory(element: Element, attribute: String): String = {
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
if ((element.getAttribute(attribute) eq null) || (element.getAttribute(attribute).isEmpty)) {
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
} else {
element.getAttribute(attribute)
@ -122,7 +122,7 @@ trait BeanParser extends Logging {
*/
def mandatoryElement(element: Element, childName: String): Element = {
val childElement = DomUtils.getChildElementByTagName(element, childName);
if (childElement == null) {
if (childElement eq null) {
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
} else {
childElement
@ -150,7 +150,7 @@ trait DispatcherParser extends BeanParser {
if (hasRef(element)) {
val ref = element.getAttribute(REF)
dispatcherElement = element.getOwnerDocument.getElementById(ref)
if (dispatcherElement == null) {
if (dispatcherElement eq null) {
throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
}
}
@ -173,7 +173,7 @@ trait DispatcherParser extends BeanParser {
}
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
if (threadPoolElement != null) {
if (threadPoolElement ne null) {
if (properties.dispatcherType == THREAD_BASED) {
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
}
@ -220,7 +220,7 @@ trait DispatcherParser extends BeanParser {
def hasRef(element: Element): Boolean = {
val ref = element.getAttribute(REF)
(ref != null) && !ref.isEmpty
(ref ne null) && !ref.isEmpty
}
}

View file

@ -18,7 +18,7 @@ class ConfiggyPropertyPlaceholderConfigurer extends PropertyPlaceholderConfigure
* @param configgyResource akka.conf
*/
override def setLocation(configgyResource: Resource) {
if (configgyResource == null) throw new IllegalArgumentException("Property 'config' must be set")
if (configgyResource eq null) throw new IllegalArgumentException("Property 'config' must be set")
val properties = loadAkkaConfig(configgyResource)
setProperties(properties)
}

View file

@ -35,7 +35,7 @@ object DispatcherFactoryBean {
case _ => throw new IllegalArgumentException("unknown dispatcher type")
}
// build threadpool
if ((properties.threadPool != null) && (properties.threadPool.queue != null)) {
if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) {
var threadPoolBuilder = dispatcher.asInstanceOf[ThreadPoolBuilder]
threadPoolBuilder = properties.threadPool.queue match {
case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness)
@ -59,7 +59,7 @@ object DispatcherFactoryBean {
if (properties.threadPool.mailboxCapacity > -1) {
threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity)
}
if ((properties.threadPool.rejectionPolicy != null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match {
case "abort-policy" => new AbortPolicy()
case "caller-runs-policy" => new CallerRunsPolicy()

View file

@ -17,7 +17,7 @@ object StringReflect {
* @author michaelkober
*/
class StringReflect(val self: String) {
if (self == null || self == "") throw new IllegalArgumentException("Class name can't be null or empty string [" + self + "]")
if ((self eq null) || self == "") throw new IllegalArgumentException("Class name can't be null or empty string [" + self + "]")
def toClass[T <: AnyRef]: Class[T] = {
val clazz = Class.forName(self)
clazz.asInstanceOf[Class[T]]

View file

@ -33,11 +33,11 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser
val strategyElement = mandatoryElement(element, STRATEGY_TAG)
val typedActorsElement = DomUtils.getChildElementByTagName(element, TYPED_ACTORS_TAG)
val untypedActorsElement = DomUtils.getChildElementByTagName(element, UNTYPED_ACTORS_TAG)
if ((typedActorsElement == null) && (untypedActorsElement == null)) {
if ((typedActorsElement eq null) && (untypedActorsElement eq null)) {
throw new IllegalArgumentException("One of 'akka:typed-actors' or 'akka:untyped-actors' needed.")
}
parseRestartStrategy(strategyElement, builder)
if (typedActorsElement != null) {
if (typedActorsElement ne null) {
builder.addPropertyValue("typed", AkkaSpringConfigurationTags.TYPED_ACTOR_TAG)
parseTypedActorList(typedActorsElement, builder)
} else {

View file

@ -57,8 +57,8 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
private[akka] def createComponent(props: ActorProperties): Component = {
import StringReflect._
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
val isRemote = (props.host != null) && (!props.host.isEmpty)
val withInterface = (props.interface != null) && (!props.interface.isEmpty)
val isRemote = (props.host ne null) && (!props.host.isEmpty)
val withInterface = (props.interface ne null) && (!props.interface.isEmpty)
if (isRemote) {
//val remote = new RemoteAddress(props.host, props.port)
val remote = new RemoteAddress(props.host, props.port.toInt)
@ -82,7 +82,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
private[akka] def createSupervise(props: ActorProperties): Server = {
import StringReflect._
val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
val isRemote = (props.host != null) && (!props.host.isEmpty)
val isRemote = (props.host ne null) && (!props.host.isEmpty)
val actorRef = Actor.actorOf(props.target.toClass)
if (props.timeout > 0) {
actorRef.setTimeout(props.timeout)

View file

@ -24,7 +24,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
type="executor-based-event-driven"
name="myDispatcher"/>
var props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.dispatcherType === "executor-based-event-driven")
assert(props.name === "myDispatcher")
@ -45,7 +45,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
keep-alive="2000"
rejection-policy="caller-runs-policy"/>
val props = parser.parseThreadPool(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.queue == "bounded-array-blocking-queue")
assert(props.capacity == 100)
assert(props.fairness)
@ -66,7 +66,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
keep-alive="1000"/>
</akka:dispatcher>
val props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.dispatcherType == "executor-based-event-driven")
assert(props.name == "myDispatcher")
assert(props.threadPool.corePoolSize == 2)
@ -97,7 +97,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
type="hawt"
aggregate="false"/>
var props = parser.parseDispatcher(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.dispatcherType === "hawt")
assert(props.aggregate === false)
}

View file

@ -47,7 +47,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("get a dispatcher via ref from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val pojo = context.getBean("typed-actor-with-dispatcher-ref").asInstanceOf[IMyPojo]
assert(pojo != null)
assert(pojo ne null)
}
scenario("get a executor-event-driven-dispatcher with blocking-queue with unbounded capacity from context") {
@ -99,7 +99,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("get a executor-based-event-driven-work-stealing-dispatcher from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("executor-based-event-driven-work-stealing-dispatcher").asInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]
assert(dispatcher != null)
assert(dispatcher ne null)
assert(dispatcher.name === "akka:event-driven-work-stealing:dispatcher:workStealingDispatcher")
val executor = getThreadPoolExecutorAndAssert(dispatcher)
assert(executor.getQueue().isInstanceOf[BlockingQueue[Runnable]])
@ -108,7 +108,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("get a hawt-dispatcher from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("hawt-dispatcher").asInstanceOf[HawtDispatcher]
assert(dispatcher != null)
assert(dispatcher ne null)
assert(dispatcher.toString === "HawtDispatchEventDrivenDispatcher")
assert(dispatcher.aggregate === false)
}
@ -116,7 +116,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
scenario("get a thread-based-dispatcher for typed actor from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val pojo = context.getBean("typed-actor-with-thread-based-dispatcher").asInstanceOf[IMyPojo]
assert(pojo != null)
assert(pojo ne null)
}
scenario("get a thread-based-dispatcher for untyped from context") {
@ -138,7 +138,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
val field = pool.getClass.getDeclaredField("se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder")
field.setAccessible(true)
val executor = field.get(pool).asInstanceOf[ThreadPoolExecutor]
assert(executor != null)
assert(executor ne null)
executor;
}

View file

@ -28,7 +28,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
it("should be able to parse typed actor configuration") {
val props = parser.parseActor(createTypedActorElement);
assert(props != null)
assert(props ne null)
assert(props.timeout == 1000)
assert(props.target == "foo.bar.MyPojo")
assert(props.transactional)
@ -37,7 +37,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
it("should parse the supervisor restart strategy") {
parser.parseSupervisor(createSupervisorElement, builder);
val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[RestartStrategy]
assert(strategy != null)
assert(strategy ne null)
assert(strategy.scheme match {
case x:AllForOne => true
case _ => false })
@ -48,7 +48,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
it("should parse the supervised typed actors") {
parser.parseSupervisor(createSupervisorElement, builder);
val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActorProperties]]
assert(supervised != null)
assert(supervised ne null)
expect(4) { supervised.length }
val iterator = supervised.iterator
val prop1 = iterator.next

View file

@ -34,11 +34,11 @@ class SupervisorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
val myConfigurator = context.getBean("supervision1").asInstanceOf[TypedActorConfigurator]
// get TypedActors
val foo = myConfigurator.getInstance(classOf[IFoo])
assert(foo != null)
assert(foo ne null)
val bar = myConfigurator.getInstance(classOf[IBar])
assert(bar != null)
assert(bar ne null)
val pojo = myConfigurator.getInstance(classOf[IMyPojo])
assert(pojo != null)
assert(pojo ne null)
}
scenario("get a supervisor for untyped actors from context") {
@ -51,7 +51,7 @@ class SupervisorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
val context = new ClassPathXmlApplicationContext("/supervisor-config.xml")
val myConfigurator = context.getBean("supervision-with-dispatcher").asInstanceOf[TypedActorConfigurator]
val foo = myConfigurator.getInstance(classOf[IFoo])
assert(foo != null)
assert(foo ne null)
}
}
}

View file

@ -31,7 +31,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
</akka:typed-actor>
val props = parser.parseActor(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.timeout === 1000)
assert(props.target === "foo.bar.MyPojo")
assert(props.transactional)
@ -53,7 +53,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
<akka:dispatcher type="thread-based" name="my-thread-based-dispatcher"/>
</akka:typed-actor>
val props = parser.parseActor(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.dispatcher.dispatcherType === "thread-based")
}
@ -63,7 +63,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
<akka:remote host="com.some.host" port="9999"/>
</akka:typed-actor>
val props = parser.parseActor(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.host === "com.some.host")
assert(props.port === "9999")
assert(!props.serverManaged)
@ -75,7 +75,7 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
<akka:remote host="com.some.host" port="9999" service-name="my-service"/>
</akka:typed-actor>
val props = parser.parseActor(dom(xml).getDocumentElement);
assert(props != null)
assert(props ne null)
assert(props.host === "com.some.host")
assert(props.port === "9999")
assert(props.serviceName === "my-service")

View file

@ -543,7 +543,7 @@ object TypedActor extends Logging {
}
def isTransactional(clazz: Class[_]): Boolean = {
if (clazz == null) false
if (clazz eq null) false
else if (clazz.isAssignableFrom(classOf[TypedTransactor])) true
else isTransactional(clazz.getSuperclass)
}