Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-09-06 14:03:58 +02:00
commit 4e4786972a
48 changed files with 589 additions and 490 deletions

View file

@ -73,7 +73,6 @@ trait ActorRef extends
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[akka] val guard = new ReentrantGuard

View file

@ -11,6 +11,7 @@ import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import se.scalablesolutions.akka.util.ListenerManagement
import annotation.tailrec
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@ -35,10 +36,8 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
*/
object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]]
private val Naught = Array[ActorRef]() //Nil for Arrays
private val actorsById = new Index[String,ActorRef]
/**
* Returns all actors in the system.
*/
@ -108,11 +107,7 @@ object ActorRegistry extends ListenerManagement {
/**
* Finds all actors that has a specific id.
*/
def actorsFor(id: String): Array[ActorRef] = {
val set = actorsById get id
if (set ne null) set toArray Naught
else Naught
}
def actorsFor(id: String): Array[ActorRef] = actorsById values id
/**
* Finds the actor that has a specific UUID.
@ -124,18 +119,7 @@ object ActorRegistry extends ListenerManagement {
*/
def register(actor: ActorRef) = {
// ID
val id = actor.id
if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor)
val set = actorsById get id
if (set ne null) set add actor
else {
val newSet = new ConcurrentSkipListSet[ActorRef]
newSet add actor
val oldSet = actorsById.putIfAbsent(id,newSet)
// Parry for two simultaneous putIfAbsent(id,newSet)
if (oldSet ne null) oldSet add actor
}
actorsById.put(actor.id, actor)
// UUID
actorsByUUID.put(actor.uuid, actor)
@ -150,10 +134,7 @@ object ActorRegistry extends ListenerManagement {
def unregister(actor: ActorRef) = {
actorsByUUID remove actor.uuid
val set = actorsById get actor.id
if (set ne null) set remove actor
//FIXME: safely remove set if empty, leaks memory
actorsById.remove(actor.id,actor)
// notify listeners
foreachListener(_ ! ActorUnregistered(actor))
@ -170,3 +151,74 @@ object ActorRegistry extends ListenerManagement {
log.info("All actors have been shut down and unregistered from ActorRegistry")
}
}
class Index[K <: AnyRef,V <: AnyRef : Manifest] {
import scala.collection.JavaConversions._
private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
def put(key: K, value: V) {
//Returns whether it needs to be retried or not
def tryPut(set: JSet[V], v: V): Boolean = {
set.synchronized {
if (!set.isEmpty) {
set add v
false
} else true
}
}
@tailrec def syncPut(k: K, v: V): Boolean = {
var retry = false
val set = container get k
if (set ne null) retry = tryPut(set,v)
else {
val newSet = new ConcurrentSkipListSet[V]
newSet add v
// Parry for two simultaneous putIfAbsent(id,newSet)
val oldSet = container.putIfAbsent(k,newSet)
if (oldSet ne null)
retry = tryPut(oldSet,v)
}
if (retry) syncPut(k,v)
else true
}
syncPut(key,value)
}
def values(key: K) = {
val set: JSet[V] = container get key
if (set ne null) set toArray Naught
else Naught
}
def foreach(key: K)(fun: (V) => Unit) {
val set = container get key
if (set ne null)
set foreach fun
}
def foreach(fun: (K,V) => Unit) {
container.entrySet foreach {
(e) => e.getValue.foreach(fun(e.getKey,_))
}
}
def remove(key: K, value: V) {
val set = container get key
if (set ne null) {
set.synchronized {
set remove value
if (set.isEmpty)
container remove key
}
}
}
def clear = container.clear
}

View file

@ -177,23 +177,8 @@ object Dispatchers extends Logging {
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name", UUID.newUuid.toString)
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT))
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
dispatcher foreach {
case d: ThreadPoolBuilder => d.configureIfPossible( builder => {
def threadPoolConfig(b: ThreadPoolBuilder) {
b.configureIfPossible( builder => {
cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_))
cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_))
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
@ -209,7 +194,20 @@ object Dispatchers extends Logging {
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(builder.setRejectionPolicy(_))
})
case _ =>
}
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig)
case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
dispatcher

View file

@ -65,10 +65,12 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
class ExecutorBasedEventDrivenDispatcher(
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder {
capacity: Int = Dispatchers.MAILBOX_CAPACITY,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
mailboxCapacity = capacity
@ -163,5 +165,9 @@ class ExecutorBasedEventDrivenDispatcher(
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}

View file

@ -31,7 +31,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateExcept
*/
class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String,
capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder {
capacity: Int = Dispatchers.MAILBOX_CAPACITY,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, capacity: Int) = this(_name,capacity, _ => ())
mailboxCapacity = capacity
@volatile private var active: Boolean = false
@ -180,7 +184,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override def toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
protected override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]

View file

@ -96,7 +96,7 @@ trait MessageDispatcher extends Logging {
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actorRef: ActorRef):Int = 0
def mailboxSize(actorRef: ActorRef):Int
/**
* Creates and returns a mailbox for the given actor

View file

@ -11,6 +11,7 @@
package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, List}
import se.scalablesolutions.akka.actor.ActorRef
class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) {
@ -39,6 +40,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
selectorThread.start
}
def mailboxSize(a: ActorRef) = 0
def isShutdown = !active
override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"

View file

@ -7,8 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock
import java.util.{HashSet, HashMap, LinkedList, List}
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
@ -63,16 +62,18 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name)
with ThreadPoolBuilder {
def this(_name: String) = this(_name,_ => ())
private var fair = true
private val busyActors = new HashSet[AnyRef]
private val messageDemultiplexer = new Demultiplexer(queue)
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
init
def start = if (!active) {
log.debug("Starting up %s", toString)
@ -139,6 +140,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
else nrOfBusyMessages < 100
}
def mailboxSize(a: ActorRef) = 0
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
@ -164,4 +167,10 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
def wakeUp = messageQueue.interrupt
}
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}

View file

@ -44,6 +44,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox append invocation
def start = if (!active) {
@ -73,14 +75,13 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] =>
trait ThreadMessageQueue extends MessageQueue with TransferQueue[MessageInvocation] {
final def append(invocation: MessageInvocation): Unit = {
if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
if(!tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
}
}
final def next: MessageInvocation = self.take
final def next: MessageInvocation = take
}

View file

@ -56,7 +56,6 @@ trait ThreadPoolBuilder extends Logging {
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
inProcessOfBuilding = false
blockingQueue = queue
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this

View file

@ -5,6 +5,7 @@
package se.scalablesolutions.akka.routing
import se.scalablesolutions.akka.actor.ActorRef
import scala.collection.JavaConversions._
/**
* An Iterator that is either always empty or yields an infinite number of Ts.
@ -15,6 +16,8 @@ trait InfiniteIterator[T] extends Iterator[T]
* CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
*/
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
def this(items: java.util.List[T]) = this(items.toList)
@volatile private[this] var current: List[T] = items
def hasNext = items != Nil
@ -34,6 +37,7 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
* useful for work-stealing.
*/
class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] {
def this(items: java.util.List[ActorRef]) = this(items.toList)
def hasNext = items != Nil
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.routing
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.{UntypedActor, Actor, ActorRef}
/**
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.
@ -26,6 +26,25 @@ trait Dispatcher { this: Actor =>
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
}
/**
* An UntypedDispatcher is a trait whose purpose is to route incoming messages to actors.
*/
abstract class UntypedDispatcher extends UntypedActor {
protected def transform(msg: Any): Any = msg
protected def route(msg: Any) : ActorRef
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
def onMessage(msg: Any) : Unit = {
val r = route(msg)
if(r eq null)
throw new IllegalStateException("No route for " + msg + " defined!")
if (isSenderDefined) r.forward(transform(msg))(someSelf)
else r.!(transform(msg))(None)
}
}
/**
* A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
@ -37,3 +56,17 @@ trait LoadBalancer extends Dispatcher { self: Actor =>
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
}
/**
* A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
abstract class UntypedLoadBalancer extends UntypedDispatcher {
protected def seq: InfiniteIterator[ActorRef]
protected def route(msg: Any) =
if (seq.hasNext) seq.next
else null
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
}

View file

@ -219,37 +219,35 @@ class ActorRegistrySpec extends JUnitSuite {
@Test def shouldBeAbleToRegisterActorsConcurrently {
ActorRegistry.shutdownAll
val latch = new CountDownLatch(3)
val barrier = new CyclicBarrier(3)
def mkTestActor(i:Int) = actorOf( new Actor {
def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor {
self.id = i.toString
def receive = { case _ => }
})
def mkTestActors = for(i <- 1 to 10;j <- 1 to 1000) yield mkTestActor(i)
val latch = new CountDownLatch(3)
val barrier = new CyclicBarrier(3)
def mkThread(actors: Iterable[ActorRef]) = new Thread {
start
this.start
override def run {
barrier.await
actors foreach { _.start }
latch.countDown
}
}
val a1,a2,a3 = mkTestActors
val t1 = mkThread(a1)
val t2 = mkThread(a2)
val t3 = mkThread(a3)
val testActors1 = mkTestActors
val testActors2 = mkTestActors
val testActors3 = mkTestActors
mkThread(testActors1)
mkThread(testActors2)
mkThread(testActors3)
assert(latch.await(30,TimeUnit.SECONDS) === true)
for(i <- 1 to 10) {
assert(ActorRegistry.actorsFor(i.toString).length === 3000)
val theId = i.toString
val actors = ActorRegistry.actorsFor(theId).toSet
for(a <- actors if a.id == theId) assert(actors contains a)
assert(actors.size === 9000)
}
}
}

View file

@ -38,7 +38,7 @@ trait RefStorageBackend[T] extends StorageBackend {
// for Queue
trait QueueStorageBackend[T] extends StorageBackend {
// add to the end of the queue
def enqueue(name: String, item: T): Boolean
def enqueue(name: String, item: T): Option[Int]
// pop from the front of the queue
def dequeue(name: String): Option[T]

View file

@ -11,34 +11,17 @@ import se.scalablesolutions.akka.config.Config.config
import com.redis._
trait Base64Encoder {
def encode(bytes: Array[Byte]): Array[Byte]
def decode(bytes: Array[Byte]): Array[Byte]
}
trait Base64StringEncoder {
def byteArrayToString(bytes: Array[Byte]): String
def stringToByteArray(str: String): Array[Byte]
}
trait NullBase64 {
def encode(bytes: Array[Byte]): Array[Byte] = bytes
def decode(bytes: Array[Byte]): Array[Byte] = bytes
}
object CommonsCodec {
import org.apache.commons.codec.binary.Base64
import org.apache.commons.codec.binary.Base64._
val b64 = new Base64(true)
trait CommonsCodecBase64 {
def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
}
object Base64Encoder extends Base64Encoder with CommonsCodecBase64
trait CommonsCodecBase64StringEncoder {
def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes)
def stringToByteArray(str: String) = b64.decode(str)
@ -48,7 +31,6 @@ object CommonsCodec {
}
import CommonsCodec._
import CommonsCodec.Base64Encoder._
import CommonsCodec.Base64StringEncoder._
/**
@ -94,27 +76,7 @@ private [akka] object RedisStorageBackend extends
/**
* Map storage in Redis.
* <p/>
* Maps are stored as key/value pairs in redis. <i>Redis keys cannot contain spaces</i>. But with
* our use case, the keys will be specified by the user. Hence we need to encode the key
* ourselves before sending to Redis. We use base64 encoding.
* <p/>
* Also since we are storing the key/value in the global namespace, we need to construct the
* key suitably so as to avoid namespace clash. The following strategy is used:
*
* Unique identifier for the map = T1 (say)
* <pre>
* Map(
* "debasish.address" -> "kolkata, India",
* "debasish.company" -> "anshinsoft",
* "debasish.programming_language" -> "scala",
* )</pre>
* will be stored as the following key-value pair in Redis:
*
* <i>
* base64(T1):base64("debasish.address") -> "kolkata, India"
* base64(T1):base64("debasish.company") -> "anshinsoft"
* base64(T1):base64("debasish.programming_language") -> "scala"
* </i>
* Maps are stored as key/value pairs in redis.
*/
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling {
insertMapStorageEntriesFor(name, List((key, value)))
@ -134,12 +96,12 @@ private [akka] object RedisStorageBackend extends
* <li>both parts of the key need to be based64 encoded since there can be spaces within each of them</li>
*/
private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling {
"%s:%s".format(new String(encode(name.getBytes)), new String(encode(key)))
"%s:%s".format(name, byteArrayToString(key))
}
private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling {
val nk = redisKey.split(':').map{e: String => decode(e.getBytes)}
(nk(0), nk(1))
val nk = redisKey.split(':')
(nk(0), stringToByteArray(nk(1)))
}
private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling {
@ -149,11 +111,11 @@ private [akka] object RedisStorageBackend extends
}
def removeMapStorageFor(name: String): Unit = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
db.keys("%s:*".format(name)) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(keys) =>
keys.foreach(db.del(_))
keys.foreach(k => db.del(k.get))
}
}
@ -170,19 +132,18 @@ private [akka] object RedisStorageBackend extends
}
def getMapStorageSizeFor(name: String): Int = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
db.keys("%s:*".format(name)) match {
case None => 0
case Some(keys) =>
keys.length
case Some(keys) => keys.length
}
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
db.keys("%s:*".format(name)) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(keys) =>
keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList
keys.map(key => (makeKeyFromRedisKey(key.get)._2, stringToByteArray(db.get(key.get).get))).toList
}
}
@ -234,7 +195,7 @@ private [akka] object RedisStorageBackend extends
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.lpush(new String(encode(name.getBytes)), byteArrayToString(element))
db.lpush(name, byteArrayToString(element))
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling {
@ -242,11 +203,11 @@ private [akka] object RedisStorageBackend extends
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling {
db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem))
db.lset(name, index, byteArrayToString(elem))
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
db.lindex(new String(encode(name.getBytes)), index) match {
db.lindex(name, index) match {
case None =>
throw new NoSuchElementException(name + " does not have element at " + index)
case Some(e) =>
@ -270,33 +231,28 @@ private [akka] object RedisStorageBackend extends
else count
if (s == 0 && cnt == 0) List()
else
db.lrange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
db.lrange(name, s, s + cnt - 1) match {
case None =>
throw new NoSuchElementException(name + " does not have elements in the range specified")
case Some(l) =>
l map ( e => stringToByteArray(e.get))
l map (e => stringToByteArray(e.get))
}
}
def getVectorStorageSizeFor(name: String): Int = withErrorHandling {
db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) =>
l
}
db.llen(name).getOrElse { throw new NoSuchElementException(name + " not present") }
}
def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.set(new String(encode(name.getBytes)), byteArrayToString(element))
db.set(name, byteArrayToString(element))
}
def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling {
db.set(new String(encode(name.getBytes)), element)
db.set(name, element)
}
def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
db.get(new String(encode(name.getBytes))) match {
db.get(name) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) => Some(stringToByteArray(s))
@ -304,13 +260,13 @@ private [akka] object RedisStorageBackend extends
}
// add to the end of the queue
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.rpush(new String(encode(name.getBytes)), byteArrayToString(item))
def enqueue(name: String, item: Array[Byte]): Option[Int] = withErrorHandling {
db.rpush(name, byteArrayToString(item))
}
// pop from the front of the queue
def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
db.lpop(new String(encode(name.getBytes))) match {
db.lpop(name) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) => Some(stringToByteArray(s))
@ -319,11 +275,7 @@ private [akka] object RedisStorageBackend extends
// get the size of the queue
def size(name: String): Int = withErrorHandling {
db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) => l
}
db.llen(name).getOrElse { throw new NoSuchElementException(name + " not present") }
}
// return an array of items currently stored in the queue
@ -331,14 +283,14 @@ private [akka] object RedisStorageBackend extends
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling {
count match {
case 1 =>
db.lindex(new String(encode(name.getBytes)), start) match {
db.lindex(name, start) match {
case None =>
throw new NoSuchElementException("No element at " + start)
case Some(s) =>
List(stringToByteArray(s))
}
case n =>
db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
db.lrange(name, start, start + count - 1) match {
case None =>
throw new NoSuchElementException(
"No element found between " + start + " and " + (start + count - 1))
@ -350,7 +302,7 @@ private [akka] object RedisStorageBackend extends
// completely delete the queue
def remove(name: String): Boolean = withErrorHandling {
db.del(new String(encode(name.getBytes))) match {
db.del(name) match {
case Some(1) => true
case _ => false
}
@ -358,7 +310,7 @@ private [akka] object RedisStorageBackend extends
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match {
db.zadd(name, zscore, byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
@ -366,7 +318,7 @@ private [akka] object RedisStorageBackend extends
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match {
db.zrem(name, byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
@ -374,22 +326,18 @@ private [akka] object RedisStorageBackend extends
// cardinality of the set identified by name
def zcard(name: String): Int = withErrorHandling {
db.zcard(new String(encode(name.getBytes))) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) => l
}
db.zcard(name).getOrElse { throw new NoSuchElementException(name + " not present") }
}
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match {
db.zscore(name, byteArrayToString(item)) match {
case Some(s) => Some(s.toFloat)
case None => None
}
}
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling {
db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match {
db.zrange(name, start.toString, end.toString, RedisClient.ASC, false) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(s) =>
@ -399,7 +347,7 @@ private [akka] object RedisStorageBackend extends
def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling {
db.zrangeWithScore(
new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match {
name, start.toString, end.toString, RedisClient.ASC) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) =>

View file

@ -234,9 +234,8 @@ class RemoteServer extends Logging with ListenerManagement {
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
RemoteServer.register(hostname, port, this)
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val pipelineFactory = new RemoteServerPipelineFactory(
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors,this)
name, openChannels, loader, actors, typedActors, this)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
@ -324,6 +323,13 @@ class RemoteServer extends Logging with ListenerManagement {
protected override def manageLifeCycleOfListeners = false
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
private def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
}
private def typedActors() : ConcurrentHashMap[String, AnyRef] = {
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
}
}
object RemoteServerSslContext {
@ -348,8 +354,8 @@ class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val actors: JMap[String, ActorRef],
val typedActors: JMap[String, AnyRef],
val actors: (() => ConcurrentHashMap[String, ActorRef]),
val typedActors: (() => ConcurrentHashMap[String, AnyRef]),
val server: RemoteServer) extends ChannelPipelineFactory {
import RemoteServer._
@ -373,7 +379,7 @@ class RemoteServerPipelineFactory(
case _ => (join(), join())
}
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors,server)
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors, server)
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
new StaticChannelPipeline(stages: _*)
}
@ -387,8 +393,8 @@ class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val actors: JMap[String, ActorRef],
val typedActors: JMap[String, AnyRef],
val actors: (() => ConcurrentHashMap[String, ActorRef]),
val typedActors: (() => ConcurrentHashMap[String, AnyRef]),
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
@ -539,7 +545,8 @@ class RemoteServerHandler(
val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout
val actorRefOrNull = actors get uuid
val registeredActors = actors()
val actorRefOrNull = registeredActors get uuid
if (actorRefOrNull eq null) {
try {
@ -550,7 +557,7 @@ class RemoteServerHandler(
actorRef.uuid = uuid
actorRef.timeout = timeout
actorRef.remoteAddress = None
actors.put(uuid, actorRef)
registeredActors.put(uuid, actorRef)
actorRef
} catch {
case e =>
@ -563,7 +570,8 @@ class RemoteServerHandler(
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val uuid = actorInfo.getUuid
val typedActorOrNull = typedActors get uuid
val registeredTypedActors = typedActors()
val typedActorOrNull = registeredTypedActors get uuid
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo
@ -580,7 +588,7 @@ class RemoteServerHandler(
val newInstance = TypedActor.newInstance(
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
typedActors.put(uuid, newInstance)
registeredTypedActors.put(uuid, newInstance)
newInstance
} catch {
case e =>

View file

@ -5,8 +5,8 @@ import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import Actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Actor}
object ServerInitiatedRemoteActorSpec {
val HOSTNAME = "localhost"
@ -132,5 +132,17 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
}
actor.stop
}
@Test
def shouldNotRecreateRegisteredActor {
server.register(actorOf[RemoteActorSpecActorUnidirectional])
val actor = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT)
val numberOfActorsInRegistry = ActorRegistry.actors.length
val result = actor ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
actor.stop
}
}

View file

@ -1,94 +0,0 @@
package sample.lift
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.TransactionalMap
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
import scala.xml.Node
import java.lang.Integer
import javax.ws.rs.{GET, Path, Produces}
import java.nio.ByteBuffer
import net.liftweb.http._
import net.liftweb.http.rest._
class SimpleServiceActor extends Transactor {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalMap[String, Integer]()
def receive = {
case "Tick" => if (hasStartedTicking) {
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
self.reply(<h1>Tick: {counter + 1}</h1>)
} else {
storage.put(KEY, new Integer(0))
hasStartedTicking = true
self.reply(<h1>Tick: 0</h1>)
}
}
}
class PersistentServiceActor extends Transactor {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = CassandraStorage.newMap
def receive = {
case "Tick" => if (hasStartedTicking) {
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array)
hasStartedTicking = true
self.reply(<success>Tick: 0</success>)
}
}
}
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:8080/liftcount
* </pre>
* Or browse to the URL from a web browser.
*/
object SimpleRestService extends RestHelper {
serve {
case Get("liftcount" :: _, req) =>
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a Node back
val result = for( a <- ActorRegistry.actorFor[SimpleServiceActor];
r <- (a !! "Tick").as[Node] ) yield r
//Return either the resulting NodeSeq or a default one
(result getOrElse <h1>Error in counter</h1>).asInstanceOf[Node]
}
}
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:8080/persistentliftcount
* </pre>
* Or browse to the URL from a web browser.
*/
object PersistentRestService extends RestHelper {
serve {
case Get("persistentliftcount" :: _, req) =>
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a Node back
val result = for( a <- ActorRegistry.actorFor[PersistentServiceActor];
r <- (a !! "Tick").as[Node] ) yield r
//Return either the resulting NodeSeq or a default one
(result getOrElse <h1>Error in counter</h1>).asInstanceOf[Node]
}
}

View file

@ -1,60 +0,0 @@
package bootstrap.liftweb
import _root_.net.liftweb.util._
import _root_.net.liftweb.http._
import _root_.net.liftweb.sitemap._
import _root_.net.liftweb.sitemap.Loc._
import _root_.net.liftweb.http.auth._
import _root_.net.liftweb.common._
import Helpers._
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import sample.lift._
/**
* A class that's instantiated early and run. It allows the application
* to modify lift's environment
*/
class Boot extends Logging {
def boot {
// where to search snippet
LiftRules.addToPackages("sample.lift")
LiftRules.httpAuthProtectedResource.prepend {
case (Req("liftcount" :: Nil, _, _)) => Full(AuthRole("admin"))
}
LiftRules.authentication = HttpBasicAuthentication("lift") {
case ("someuser", "1234", req) => {
log.info("You are now authenticated !")
userRoles(AuthRole("admin"))
true
}
}
LiftRules.statelessDispatchTable.append(SimpleRestService)
LiftRules.statelessDispatchTable.append(PersistentRestService)
LiftRules.passNotFoundToChain = true
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
actorOf[SimpleServiceActor],
LifeCycle(Permanent)) ::
Supervise(
actorOf[PersistentServiceActor],
LifeCycle(Permanent)) ::
Nil))
factory.newInstance.start
// Build SiteMap
// val entries = Menu(Loc("Home", List("index"), "Home")) :: Nil
// LiftRules.setSiteMap(SiteMap(entries:_*))
}
}

View file

@ -1,6 +0,0 @@
package sample.lift.snippet
class HelloWorld {
def howdy = <span>Welcome to lift-akka at {new _root_.java.util.Date}</span>
}

View file

@ -1,22 +0,0 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd">
<web-app>
<filter>
<filter-name>LiftFilter</filter-name>
<display-name>Lift Filter</display-name>
<description>The Filter that intercepts lift calls</description>
<filter-class>net.liftweb.http.LiftFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>LiftFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet>
<servlet-name>AkkaServlet</servlet-name>
<servlet-class>se.scalablesolutions.akka.comet.AkkaServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>AkkaServlet</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
</web-app>

View file

@ -1,15 +0,0 @@
<lift:surround with="default" at="content">
<h2>Welcome to the Akka + Lift Sample</h2>
<p>This page is served by Lift, and Lift alone. In order to demonstrate how AkkaServlet and <br />
Lift can work in harmony we have supplied a sample JAX-RS service that is secured using <br />
Lift's HTTP Basic Authentication.</p>
<p>To access the Akka service, visit <a href="/liftcount">this url</a> and enter the
following access credentials:</p>
<p>user: <strong>someuser</strong><br />
password: <strong>1234</strong></p>
<p><lift:HelloWorld.howdy /></p>
</lift:surround>

View file

@ -1,17 +0,0 @@
<html xmlns="http://www.w3.org/1999/xhtml" xmlns:lift="http://liftweb.net/">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<meta name="description" content="" />
<meta name="keywords" content="" />
<lift:CSS.blueprint />
<title>Akka with Lift Example</title>
<script id="jquery" src="/classpath/jquery.js" type="text/javascript"></script>
</head>
<body>
<div class="container">
<lift:bind name="content" />
<lift:Menu.builder />
<lift:msgs/>
</div>
</body>
</html>

View file

@ -1,16 +0,0 @@
/*import _root_.bootstrap.liftweb.Boot
import _root_.scala.tools.nsc.MainGenericRunner
object LiftConsole {
def main(args : Array[String]) {
// Instantiate your project's Boot file
val b = new Boot()
// Boot your project
b.boot
// Now run the MainGenericRunner to get your repl
MainGenericRunner.main(args)
// After the repl exits, then exit the scala script
exit(0)
}
}
*/

View file

@ -1,27 +0,0 @@
import org.eclipse.jetty.webapp.WebAppContext
import org.eclipse.jetty.server.Server
object RunWebApp extends Application {
val server = new Server(8080)
val context = new WebAppContext()
context.setServer(server)
context.setContextPath("/")
context.setWar("src/main/webapp")
server.setHandler(context)
try {
println(">>> STARTING EMBEDDED JETTY SERVER, PRESS ANY KEY TO STOP")
server.start()
while (System.in.available() == 0) {
Thread.sleep(5000)
}
server.stop()
server.join()
} catch {
case exc : Exception => {
exc.printStackTrace()
System.exit(100)
}
}
}

View file

@ -0,0 +1,297 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.akkasource.org/schema/akka"
targetNamespace="http://www.akkasource.org/schema/akka"
elementFormDefault="qualified" attributeFormDefault="unqualified"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:beans="http://www.springframework.org/schema/beans">
<xsd:import namespace="http://www.springframework.org/schema/beans"
schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" />
<!-- base types -->
<!-- restart strategies enumeration -->
<xsd:simpleType name="failover-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="AllForOne"/>
<xsd:enumeration value="OneForOne"/>
</xsd:restriction>
</xsd:simpleType>
<!-- restart strategies enumeration -->
<xsd:simpleType name="lifecycle-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="permanent"/>
<xsd:enumeration value="temporary"/>
</xsd:restriction>
</xsd:simpleType>
<!-- Scopes enumeration -->
<xsd:simpleType name="scope-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="prototype"/>
<xsd:enumeration value="singleton"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatchers enumeration -->
<xsd:simpleType name="dispatcher-enum-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="executor-based-event-driven"/>
<xsd:enumeration value="executor-based-event-driven-work-stealing"/>
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
<xsd:enumeration value="thread-based"/>
<xsd:enumeration value="hawt"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher queue types enumeration -->
<xsd:simpleType name="dispatcher-queue-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="bounded-linked-blocking-queue"/>
<xsd:enumeration value="unbounded-linked-blocking-queue"/>
<xsd:enumeration value="synchronous-queue"/>
<xsd:enumeration value="bounded-array-blocking-queue"/>
</xsd:restriction>
</xsd:simpleType>
<!-- thread pool rejection policies enumeration -->
<xsd:simpleType name="rejection-policy-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="abort-policy"/>
<xsd:enumeration value="caller-runs-policy"/>
<xsd:enumeration value="discard-oldest-policy"/>
<xsd:enumeration value="discard-policy"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher type -->
<xsd:complexType name="dispatcher-type">
<xsd:choice minOccurs="0" maxOccurs="1">
<xsd:element name="thread-pool" type="threadpool-type"/>
</xsd:choice>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="ref" type="xsd:string"/>
<xsd:attribute name="type" type="dispatcher-enum-type"/>
<xsd:attribute name="name" type="xsd:string"/>
<xsd:attribute name="aggregate" type="xsd:boolean"/>
</xsd:complexType>
<xsd:complexType name="threadpool-type">
<xsd:attribute name="queue" type="dispatcher-queue-type"/>
<xsd:attribute name="bound" type="xsd:integer"/>
<xsd:attribute name="capacity" type="xsd:integer"/>
<xsd:attribute name="fairness" type="xsd:boolean"/>
<xsd:attribute name="core-pool-size" type="xsd:integer"/>
<xsd:attribute name="max-pool-size" type="xsd:integer"/>
<xsd:attribute name="keep-alive" type="xsd:long"/>
<xsd:attribute name="rejection-policy" type="rejection-policy-type"/>
<xsd:attribute name="mailbox-capacity" type="xsd:integer"/>
</xsd:complexType>
<!-- Remote -->
<xsd:complexType name="remote-type">
<xsd:attribute name="host" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" type="xsd:integer" use="required">
<xsd:annotation>
<xsd:documentation>
Port of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- typed actor -->
<xsd:complexType name="typed-actor-type">
<xsd:sequence>
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="interface" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the interface implemented by implementation class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="implementation" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the implementation class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:long" use="required">
<xsd:annotation>
<xsd:documentation>
Theh default timeout for '!!' invocations.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transactional" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
Set this to true if messages should have REQUIRES_NEW semantics.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lifecycle" type="lifecycle-type">
<xsd:annotation>
<xsd:documentation>
Defines the lifecycle, can be either 'permanent' or 'temporary'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scope" type="scope-enum-type">
<xsd:annotation>
<xsd:documentation>
Supported scopes are 'singleton' and 'prototype'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- untyped actor -->
<xsd:complexType name="untyped-actor-type">
<xsd:sequence>
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="implementation" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the implementation class.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:long">
<xsd:annotation>
<xsd:documentation>
The default timeout for '!!' invocations.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transactional" type="xsd:boolean">
<xsd:annotation>
<xsd:documentation>
Set this to true if messages should have REQUIRES_NEW semantics.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lifecycle" type="lifecycle-type">
<xsd:annotation>
<xsd:documentation>
Defines the lifecycle, can be either 'permanent' or 'temporary'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scope" type="scope-enum-type">
<xsd:annotation>
<xsd:documentation>
Supported scopes are 'singleton' and 'prototype'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- trap exits -->
<xsd:complexType name="trap-exits-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="trap-exit" type="xsd:string"/>
</xsd:choice>
</xsd:complexType>
<!-- typed actors -->
<xsd:complexType name="typed-actors-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="typed-actor" type="typed-actor-type"/>
</xsd:choice>
</xsd:complexType>
<!-- untyped actors -->
<xsd:complexType name="untyped-actors-type">
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="untyped-actor" type="untyped-actor-type"/>
</xsd:choice>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="strategy-type">
<xsd:sequence>
<xsd:element name="trap-exits" type="trap-exits-type" minOccurs="1" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="failover" type="failover-type">
<xsd:annotation>
<xsd:documentation>
Failover scheme, can be one of 'AllForOne' or 'OneForOne'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="retries" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Maximal number of restarts.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="timerange" type="xsd:int">
<xsd:annotation>
<xsd:documentation>
Time range for maximal number of restart.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="supervision-type">
<xsd:all>
<xsd:element name="restart-strategy" type="strategy-type" minOccurs="1" maxOccurs="1"/>
<xsd:element name="typed-actors" type="typed-actors-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="untyped-actors" type="untyped-actors-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="supervision" type="supervision-type" minOccurs="0"/>
</xsd:all>
<xsd:attribute name="id" type="xsd:ID"/>
</xsd:complexType>
<xsd:complexType name="camel-service-type">
<xsd:sequence>
<xsd:element name="camel-context" type="camel-context-type" minOccurs="0" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="id" type="xsd:ID"/>
</xsd:complexType>
<xsd:complexType name="camel-context-type">
<xsd:attribute name="ref" type="xsd:string"/>
</xsd:complexType>
<!-- TypedActor -->
<xsd:element name="typed-actor" type="typed-actor-type"/>
<!-- UntypedActor -->
<xsd:element name="untyped-actor" type="untyped-actor-type"/>
<!-- Dispatcher -->
<xsd:element name="dispatcher" type="dispatcher-type"/>
<!-- Supervision -->
<xsd:element name="supervision" type="supervision-type"/>
<!-- CamelService -->
<xsd:element name="camel-service" type="camel-service-type"/>
</xsd:schema>

View file

@ -68,6 +68,7 @@ object AkkaSpringConfigurationTags {
val KEEP_ALIVE = "keep-alive"
val BOUND ="bound"
val REJECTION_POLICY ="rejection-policy"
val MAILBOX_CAPACITY ="mailbox-capacity"
// --- VALUES
//

View file

@ -58,6 +58,9 @@ object DispatcherFactoryBean {
if (properties.threadPool.keepAlive > -1) {
threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive)
}
if (properties.threadPool.mailboxCapacity > -1) {
threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity)
}
if ((properties.threadPool.rejectionPolicy != null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match {
case "abort-policy" => new AbortPolicy()

View file

@ -87,6 +87,9 @@ trait DispatcherParser extends BeanParser {
if (element.hasAttribute(REJECTION_POLICY)) {
properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
}
if (element.hasAttribute(MAILBOX_CAPACITY)) {
properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
}
properties
}

View file

@ -45,6 +45,7 @@ class ThreadPoolProperties {
var maxPoolSize = -1
var keepAlive = -1L
var rejectionPolicy = ""
var mailboxCapacity = -1
override def toString : String = {
"ThreadPoolProperties[queue=" + queue +
@ -54,6 +55,7 @@ class ThreadPoolProperties {
", corePoolSize=" + corePoolSize +
", maxPoolSize=" + maxPoolSize +
", keepAlive=" + keepAlive +
", policy=" + rejectionPolicy + "]"
", policy=" + rejectionPolicy +
", mailboxCapacity=" + mailboxCapacity + "]"
}
}

View file

@ -42,6 +42,13 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
bound="10" />
</akka:dispatcher>
<!-- executor-event-driven-dispatcher with bounded-linked-blocking-queue with bounded actor mailbox capacity-->
<akka:dispatcher id="executor-event-driven-dispatcher-mc" type="executor-based-event-driven" name="dispatcher-mc">
<akka:thread-pool queue="bounded-linked-blocking-queue"
bound="100"
mailbox-capacity="1000"/>
</akka:dispatcher>
<!-- executor-event-driven-dispatcher with unbounded-linked-blocking-queue with bounded capacity-->
<akka:dispatcher id="executor-event-driven-dispatcher-4" type="executor-based-event-driven" name="dispatcher-4">
<akka:thread-pool queue="unbounded-linked-blocking-queue"

View file

@ -6,8 +6,6 @@ package se.scalablesolutions.akka.spring
import foo.{IMyPojo, MyPojo, PingActor}
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.actor.ActorRef
import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
@ -18,6 +16,10 @@ import org.springframework.context.ApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import org.springframework.core.io.{ClassPathResource, Resource}
import java.util.concurrent._
import se.scalablesolutions.akka.actor.{UntypedActor, Actor, ActorRef}
/**
* Tests for spring configuration of typed actors.
@ -41,6 +43,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
assert(executor.getQueue().remainingCapacity() === 100)
}
scenario("get a dispatcher via ref from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val pojo = context.getBean("typed-actor-with-dispatcher-ref").asInstanceOf[IMyPojo]
@ -56,6 +59,17 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
assert(dispatcher.name === EVENT_DRIVEN_PREFIX + "dispatcher-2")
}
scenario("get a executor-event-driven-dispatcher with bounded-blocking-queue and with bounded mailbox capacity") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("executor-event-driven-dispatcher-mc").asInstanceOf[ExecutorBasedEventDrivenDispatcher]
assert(dispatcher.name === EVENT_DRIVEN_PREFIX + "dispatcher-mc")
val actorRef = UntypedActor.actorOf(classOf[PingActor])
actorRef.dispatcher = dispatcher
actorRef.start
assert(actorRef.mailbox.isInstanceOf[LinkedBlockingQueue[MessageInvocation]])
assert((actorRef.mailbox.asInstanceOf[LinkedBlockingQueue[MessageInvocation]]).remainingCapacity === 1000)
}
scenario("get a executor-event-driven-dispatcher with unbounded-linked-blocking-queue with bounded capacity from context") {
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
val dispatcher = context.getBean("executor-event-driven-dispatcher-4").asInstanceOf[ExecutorBasedEventDrivenDispatcher]

View file

@ -21,10 +21,7 @@
<!-- Server Thread Pool -->
<!-- =========================================================== -->
<Set name="ThreadPool">
<!-- Default queued blocking threadpool -->
<New class="org.eclipse.jetty.util.thread.QueuedThreadPool">
<Set name="minThreads">10</Set>
<Set name="maxThreads">200</Set>
<New class="org.eclipse.jetty.util.thread.ExecutorThreadPool">
</New>
</Set>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redis</groupId>
<artifactId>redisclient</artifactId>
<version>1.1</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redis</groupId>
<artifactId>redisclient</artifactId>
<version>2.8.0.Beta1-1.2</version>
<packaging>jar</packaging>
</project>

View file

@ -70,7 +70,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsReleases)
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
@ -89,14 +88,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val HAWT_DISPATCH_VERSION = "1.0"
lazy val JACKSON_VERSION = "1.2.1"
lazy val JERSEY_VERSION = "1.2"
lazy val LIFT_VERSION = "2.1-M1"
lazy val MULTIVERSE_VERSION = "0.6.1"
lazy val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT"
lazy val LOGBACK_VERSION = "0.9.24"
lazy val SLF4J_VERSION = "1.6.0"
lazy val SPRING_VERSION = "3.0.3.RELEASE"
lazy val ASPECTWERKZ_VERSION = "2.2.1"
lazy val JETTY_VERSION = "7.1.6.v20100715"
lazy val JETTY_VERSION = "7.1.4.v20100610"
// -------------------------------------------------------------------------------------------------------------------
// Dependencies
@ -136,9 +134,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile"
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile"
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile"
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile"
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile"
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile"
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile"
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile"
lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile"
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
@ -165,9 +164,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive
lazy val lift_util = "net.liftweb" % "lift-util_2.8.0" % LIFT_VERSION % "compile"
lazy val lift_webkit = "net.liftweb" % "lift-webkit_2.8.0" % LIFT_VERSION % "compile"
lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile"
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive
@ -180,12 +176,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile"
lazy val redis = "com.redis" % "redisclient" % "2.8.0-1.4" % "compile"
lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0" % "compile"
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
lazy val servlet = "javax.servlet" % "servlet-api" % "2.5" % "compile"
lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile"
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"
@ -425,13 +419,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val jetty = Dependencies.jetty
val jetty_util = Dependencies.jetty_util
val jetty_xml = Dependencies.jetty_xml
val jetty_servlet = Dependencies.jetty_servlet
val jackson_core_asl = Dependencies.jackson_core_asl
val jersey = Dependencies.jersey
val jersey_contrib = Dependencies.jersey_contrib
val jersey_json = Dependencies.jersey_json
val jersey_server = Dependencies.jersey_server
val jsr311 = Dependencies.jsr311
val servlet = Dependencies.servlet
val stax_api = Dependencies.stax_api
// testing
@ -564,7 +558,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// Provided by other bundles
"!se.scalablesolutions.akka.*",
"!net.liftweb.*",
"!com.google.inject.*",
"!javax.transaction.*",
"!javax.ws.rs.*",
@ -586,12 +579,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// Scala bundle
val scala_bundle = "com.weiglewilczek.scala-lang-osgi" % "scala-library" % buildScalaVersion % "compile" intransitive
// Lift bundles
// val lift_util = Dependencies.lift_util.intransitive
// val lift_actor = "net.liftweb" % "lift-actor" % LIFT_VERSION % "compile" intransitive
// val lift_common = "net.liftweb" % "lift-common" % LIFT_VERSION % "compile" intransitive
// val lift_json = "net.liftweb" % "lift-json" % LIFT_VERSION % "compile" intransitive
// Camel bundles
val camel_core = Dependencies.camel_core.intransitive
val fusesource_commonman = "org.fusesource.commonman" % "commons-management" % "1.0" intransitive
@ -656,21 +643,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaSamplePubSubProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSampleLiftProject(info: ProjectInfo) extends DefaultWebProject(info) with DeployProject {
//val commons_logging = Dependencies.commons_logging
val lift_util = Dependencies.lift_util
val lift_webkit = Dependencies.lift_webkit
val servlet = Dependencies.servlet
// testing
val testJetty = Dependencies.testJetty
val testJettyWebApp = Dependencies.testJettyWebApp
val junit = Dependencies.junit
def deployPath = AkkaParentProject.this.deployPath
override def jarPath = warPath
}
class AkkaSampleRestJavaProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSampleRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
@ -718,8 +690,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
new AkkaSamplePubSubProject(_), akka_kernel)
lazy val akka_sample_fsm = project("akka-sample-fsm", "akka-sample-fsm",
new AkkaSampleFSMProject(_), akka_kernel)
lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift",
new AkkaSampleLiftProject(_), akka_kernel)
lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",
new AkkaSampleRestJavaProject(_), akka_kernel)
lazy val akka_sample_rest_scala = project("akka-sample-rest-scala", "akka-sample-rest-scala",