diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index a6b42db579..826b751d54 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -73,7 +73,6 @@ trait ActorRef extends @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) @volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None - @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[akka] val guard = new ReentrantGuard diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index f340533186..90ecdc30d5 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -11,6 +11,7 @@ import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set => JSet} import se.scalablesolutions.akka.util.ListenerManagement +import annotation.tailrec /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -35,10 +36,8 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent */ object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] - private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] - - private val Naught = Array[ActorRef]() //Nil for Arrays - + private val actorsById = new Index[String,ActorRef] + /** * Returns all actors in the system. */ @@ -108,11 +107,7 @@ object ActorRegistry extends ListenerManagement { /** * Finds all actors that has a specific id. */ - def actorsFor(id: String): Array[ActorRef] = { - val set = actorsById get id - if (set ne null) set toArray Naught - else Naught - } + def actorsFor(id: String): Array[ActorRef] = actorsById values id /** * Finds the actor that has a specific UUID. @@ -124,18 +119,7 @@ object ActorRegistry extends ListenerManagement { */ def register(actor: ActorRef) = { // ID - val id = actor.id - if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) - - val set = actorsById get id - if (set ne null) set add actor - else { - val newSet = new ConcurrentSkipListSet[ActorRef] - newSet add actor - val oldSet = actorsById.putIfAbsent(id,newSet) - // Parry for two simultaneous putIfAbsent(id,newSet) - if (oldSet ne null) oldSet add actor - } + actorsById.put(actor.id, actor) // UUID actorsByUUID.put(actor.uuid, actor) @@ -150,10 +134,7 @@ object ActorRegistry extends ListenerManagement { def unregister(actor: ActorRef) = { actorsByUUID remove actor.uuid - val set = actorsById get actor.id - if (set ne null) set remove actor - - //FIXME: safely remove set if empty, leaks memory + actorsById.remove(actor.id,actor) // notify listeners foreachListener(_ ! ActorUnregistered(actor)) @@ -170,3 +151,74 @@ object ActorRegistry extends ListenerManagement { log.info("All actors have been shut down and unregistered from ActorRegistry") } } + +class Index[K <: AnyRef,V <: AnyRef : Manifest] { + import scala.collection.JavaConversions._ + + private val Naught = Array[V]() //Nil for Arrays + private val container = new ConcurrentHashMap[K, JSet[V]] + + def put(key: K, value: V) { + + //Returns whether it needs to be retried or not + def tryPut(set: JSet[V], v: V): Boolean = { + set.synchronized { + if (!set.isEmpty) { + set add v + false + } else true + } + } + + @tailrec def syncPut(k: K, v: V): Boolean = { + var retry = false + val set = container get k + if (set ne null) retry = tryPut(set,v) + else { + val newSet = new ConcurrentSkipListSet[V] + newSet add v + + // Parry for two simultaneous putIfAbsent(id,newSet) + val oldSet = container.putIfAbsent(k,newSet) + if (oldSet ne null) + retry = tryPut(oldSet,v) + } + + if (retry) syncPut(k,v) + else true + } + + syncPut(key,value) + } + + def values(key: K) = { + val set: JSet[V] = container get key + if (set ne null) set toArray Naught + else Naught + } + + def foreach(key: K)(fun: (V) => Unit) { + val set = container get key + if (set ne null) + set foreach fun + } + + def foreach(fun: (K,V) => Unit) { + container.entrySet foreach { + (e) => e.getValue.foreach(fun(e.getKey,_)) + } + } + + def remove(key: K, value: V) { + val set = container get key + if (set ne null) { + set.synchronized { + set remove value + if (set.isEmpty) + container remove key + } + } + } + + def clear = container.clear +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 803fd700cc..f33d3b1b24 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -177,23 +177,8 @@ object Dispatchers extends Logging { def from(cfg: ConfigMap): Option[MessageDispatcher] = { lazy val name = cfg.getString("name", UUID.newUuid.toString) - val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { - case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) - case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name) - case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT)) - case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name) - case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) - case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher - case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher - case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher - case "GlobalHawt" => globalHawtDispatcher - - case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) - } - - dispatcher foreach { - case d: ThreadPoolBuilder => d.configureIfPossible( builder => { - + def threadPoolConfig(b: ThreadPoolBuilder) { + b.configureIfPossible( builder => { cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_)) cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_)) cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_)) @@ -209,7 +194,20 @@ object Dispatchers extends Logging { case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) }).foreach(builder.setRejectionPolicy(_)) }) - case _ => + } + + val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { + case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name) + case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig) + case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig) + case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig) + case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) + case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher + case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher + case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher + case "GlobalHawt" => globalHawtDispatcher + + case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) } dispatcher diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 5f8469eb84..c3ecf5ded7 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,10 +65,12 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} class ExecutorBasedEventDrivenDispatcher( _name: String, throughput: Int = Dispatchers.THROUGHPUT, - capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder { + capacity: Int = Dispatchers.MAILBOX_CAPACITY, + config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage - def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage + def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage + mailboxCapacity = capacity @@ -163,5 +165,9 @@ class ExecutorBasedEventDrivenDispatcher( override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" // FIXME: should we have an unbounded queue and not bounded as default ???? - private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + private[akka] def init = { + withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + config(this) + buildThreadPool + } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index f9409e91fb..9b1097213e 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -31,7 +31,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateExcept */ class ExecutorBasedEventDrivenWorkStealingDispatcher( _name: String, - capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder { + capacity: Int = Dispatchers.MAILBOX_CAPACITY, + config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { + + def this(_name: String, capacity: Int) = this(_name,capacity, _ => ()) + mailboxCapacity = capacity @volatile private var active: Boolean = false @@ -180,7 +184,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override def toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" - private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + private[akka] def init = { + withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + config(this) + buildThreadPool + } protected override def createMailbox(actorRef: ActorRef): AnyRef = { if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation] diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 49f4cc3839..640ded8039 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -96,7 +96,7 @@ trait MessageDispatcher extends Logging { /** * Returns the size of the mailbox for the specified actor */ - def mailboxSize(actorRef: ActorRef):Int = 0 + def mailboxSize(actorRef: ActorRef):Int /** * Creates and returns a mailbox for the given actor diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index f76465f7c7..c698b22c15 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -11,6 +11,7 @@ package se.scalablesolutions.akka.dispatch import java.util.{LinkedList, List} +import se.scalablesolutions.akka.actor.ActorRef class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) { @@ -39,6 +40,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) selectorThread.start } + def mailboxSize(a: ActorRef) = 0 + def isShutdown = !active override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]" diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 0bb8f3de45..684f737c07 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -7,8 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.locks.ReentrantLock import java.util.{HashSet, HashMap, LinkedList, List} - -import se.scalablesolutions.akka.actor.IllegalActorStateException +import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} /** * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
@@ -63,16 +62,18 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException * * @author Jonas Bonér */ -class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) +class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit) extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name) with ThreadPoolBuilder { + def this(_name: String) = this(_name,_ => ()) + private var fair = true private val busyActors = new HashSet[AnyRef] private val messageDemultiplexer = new Demultiplexer(queue) // build default thread pool - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + init def start = if (!active) { log.debug("Starting up %s", toString) @@ -139,6 +140,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) else nrOfBusyMessages < 100 } + def mailboxSize(a: ActorRef) = 0 + def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") @@ -164,4 +167,10 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) def wakeUp = messageQueue.interrupt } + + private[akka] def init = { + withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + config(this) + buildThreadPool + } } diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 8fe07e17ac..65ee9ed845 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -44,6 +44,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef, def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue] + def mailboxSize(a: ActorRef) = mailbox.size + def dispatch(invocation: MessageInvocation) = mailbox append invocation def start = if (!active) { @@ -73,14 +75,13 @@ class ThreadBasedDispatcher(private val actor: ActorRef, override def toString = "ThreadBasedDispatcher[" + threadName + "]" } -trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] => - +trait ThreadMessageQueue extends MessageQueue with TransferQueue[MessageInvocation] { final def append(invocation: MessageInvocation): Unit = { - if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer - if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting + if(!tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer + if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") } } - final def next: MessageInvocation = self.take + final def next: MessageInvocation = take } diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 9fe47d5415..9657ad3fe4 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -56,7 +56,6 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - inProcessOfBuilding = false blockingQueue = queue threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue) this diff --git a/akka-actor/src/main/scala/routing/Iterators.scala b/akka-actor/src/main/scala/routing/Iterators.scala index 6e73af08e4..7f21589cfa 100644 --- a/akka-actor/src/main/scala/routing/Iterators.scala +++ b/akka-actor/src/main/scala/routing/Iterators.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.routing import se.scalablesolutions.akka.actor.ActorRef +import scala.collection.JavaConversions._ /** * An Iterator that is either always empty or yields an infinite number of Ts. @@ -15,6 +16,8 @@ trait InfiniteIterator[T] extends Iterator[T] * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List. */ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { + def this(items: java.util.List[T]) = this(items.toList) + @volatile private[this] var current: List[T] = items def hasNext = items != Nil @@ -34,6 +37,7 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { * useful for work-stealing. */ class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] { + def this(items: java.util.List[ActorRef]) = this(items.toList) def hasNext = items != Nil def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) diff --git a/akka-actor/src/main/scala/routing/Routers.scala b/akka-actor/src/main/scala/routing/Routers.scala index 7f2effee29..00f9f17544 100644 --- a/akka-actor/src/main/scala/routing/Routers.scala +++ b/akka-actor/src/main/scala/routing/Routers.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.routing -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{UntypedActor, Actor, ActorRef} /** * A Dispatcher is a trait whose purpose is to route incoming messages to actors. @@ -26,6 +26,25 @@ trait Dispatcher { this: Actor => private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined } +/** + * An UntypedDispatcher is a trait whose purpose is to route incoming messages to actors. + */ +abstract class UntypedDispatcher extends UntypedActor { + protected def transform(msg: Any): Any = msg + + protected def route(msg: Any) : ActorRef + + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined + + def onMessage(msg: Any) : Unit = { + val r = route(msg) + if(r eq null) + throw new IllegalStateException("No route for " + msg + " defined!") + if (isSenderDefined) r.forward(transform(msg))(someSelf) + else r.!(transform(msg))(None) + } +} + /** * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets * to dispatch incoming messages to. @@ -37,3 +56,17 @@ trait LoadBalancer extends Dispatcher { self: Actor => override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) } + +/** + * A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to. + */ +abstract class UntypedLoadBalancer extends UntypedDispatcher { + protected def seq: InfiniteIterator[ActorRef] + + protected def route(msg: Any) = + if (seq.hasNext) seq.next + else null + + override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) +} \ No newline at end of file diff --git a/akka-actor/src/test/scala/misc/ActorRegistrySpec.scala b/akka-actor/src/test/scala/misc/ActorRegistrySpec.scala index 8c9e0778ca..1fe72d6c68 100644 --- a/akka-actor/src/test/scala/misc/ActorRegistrySpec.scala +++ b/akka-actor/src/test/scala/misc/ActorRegistrySpec.scala @@ -219,37 +219,35 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldBeAbleToRegisterActorsConcurrently { ActorRegistry.shutdownAll - val latch = new CountDownLatch(3) - val barrier = new CyclicBarrier(3) - - def mkTestActor(i:Int) = actorOf( new Actor { + def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor { self.id = i.toString def receive = { case _ => } }) - def mkTestActors = for(i <- 1 to 10;j <- 1 to 1000) yield mkTestActor(i) + val latch = new CountDownLatch(3) + val barrier = new CyclicBarrier(3) def mkThread(actors: Iterable[ActorRef]) = new Thread { - start + this.start override def run { barrier.await actors foreach { _.start } latch.countDown } } + val a1,a2,a3 = mkTestActors + val t1 = mkThread(a1) + val t2 = mkThread(a2) + val t3 = mkThread(a3) - val testActors1 = mkTestActors - val testActors2 = mkTestActors - val testActors3 = mkTestActors - - mkThread(testActors1) - mkThread(testActors2) - mkThread(testActors3) assert(latch.await(30,TimeUnit.SECONDS) === true) for(i <- 1 to 10) { - assert(ActorRegistry.actorsFor(i.toString).length === 3000) + val theId = i.toString + val actors = ActorRegistry.actorsFor(theId).toSet + for(a <- actors if a.id == theId) assert(actors contains a) + assert(actors.size === 9000) } } } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index df74040b68..7e6a95f9a1 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -38,7 +38,7 @@ trait RefStorageBackend[T] extends StorageBackend { // for Queue trait QueueStorageBackend[T] extends StorageBackend { // add to the end of the queue - def enqueue(name: String, item: T): Boolean + def enqueue(name: String, item: T): Option[Int] // pop from the front of the queue def dequeue(name: String): Option[T] diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index eef60784a0..9200393ef9 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -11,34 +11,17 @@ import se.scalablesolutions.akka.config.Config.config import com.redis._ -trait Base64Encoder { - def encode(bytes: Array[Byte]): Array[Byte] - def decode(bytes: Array[Byte]): Array[Byte] -} - trait Base64StringEncoder { def byteArrayToString(bytes: Array[Byte]): String def stringToByteArray(str: String): Array[Byte] } -trait NullBase64 { - def encode(bytes: Array[Byte]): Array[Byte] = bytes - def decode(bytes: Array[Byte]): Array[Byte] = bytes -} - object CommonsCodec { import org.apache.commons.codec.binary.Base64 import org.apache.commons.codec.binary.Base64._ val b64 = new Base64(true) - trait CommonsCodecBase64 { - def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) - def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) - } - - object Base64Encoder extends Base64Encoder with CommonsCodecBase64 - trait CommonsCodecBase64StringEncoder { def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) def stringToByteArray(str: String) = b64.decode(str) @@ -48,7 +31,6 @@ object CommonsCodec { } import CommonsCodec._ -import CommonsCodec.Base64Encoder._ import CommonsCodec.Base64StringEncoder._ /** @@ -94,27 +76,7 @@ private [akka] object RedisStorageBackend extends /** * Map storage in Redis. *

- * Maps are stored as key/value pairs in redis. Redis keys cannot contain spaces. But with - * our use case, the keys will be specified by the user. Hence we need to encode the key - * ourselves before sending to Redis. We use base64 encoding. - *

- * Also since we are storing the key/value in the global namespace, we need to construct the - * key suitably so as to avoid namespace clash. The following strategy is used: - * - * Unique identifier for the map = T1 (say) - *

-   * Map(
-   *   "debasish.address" -> "kolkata, India",
-   *   "debasish.company" -> "anshinsoft",
-   *   "debasish.programming_language" -> "scala",
-   * )
- * will be stored as the following key-value pair in Redis: - * - * - * base64(T1):base64("debasish.address") -> "kolkata, India" - * base64(T1):base64("debasish.company") -> "anshinsoft" - * base64(T1):base64("debasish.programming_language") -> "scala" - * + * Maps are stored as key/value pairs in redis. */ def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling { insertMapStorageEntriesFor(name, List((key, value))) @@ -134,12 +96,12 @@ private [akka] object RedisStorageBackend extends *
  • both parts of the key need to be based64 encoded since there can be spaces within each of them
  • */ private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling { - "%s:%s".format(new String(encode(name.getBytes)), new String(encode(key))) + "%s:%s".format(name, byteArrayToString(key)) } private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling { - val nk = redisKey.split(':').map{e: String => decode(e.getBytes)} - (nk(0), nk(1)) + val nk = redisKey.split(':') + (nk(0), stringToByteArray(nk(1))) } private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling { @@ -149,11 +111,11 @@ private [akka] object RedisStorageBackend extends } def removeMapStorageFor(name: String): Unit = withErrorHandling { - db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + db.keys("%s:*".format(name)) match { case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.foreach(db.del(_)) + keys.foreach(k => db.del(k.get)) } } @@ -170,19 +132,18 @@ private [akka] object RedisStorageBackend extends } def getMapStorageSizeFor(name: String): Int = withErrorHandling { - db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + db.keys("%s:*".format(name)) match { case None => 0 - case Some(keys) => - keys.length + case Some(keys) => keys.length } } def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling { - db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + db.keys("%s:*".format(name)) match { case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList + keys.map(key => (makeKeyFromRedisKey(key.get)._2, stringToByteArray(db.get(key.get).get))).toList } } @@ -234,7 +195,7 @@ private [akka] object RedisStorageBackend extends } def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.lpush(new String(encode(name.getBytes)), byteArrayToString(element)) + db.lpush(name, byteArrayToString(element)) } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { @@ -242,11 +203,11 @@ private [akka] object RedisStorageBackend extends } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { - db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem)) + db.lset(name, index, byteArrayToString(elem)) } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { - db.lindex(new String(encode(name.getBytes)), index) match { + db.lindex(name, index) match { case None => throw new NoSuchElementException(name + " does not have element at " + index) case Some(e) => @@ -270,33 +231,28 @@ private [akka] object RedisStorageBackend extends else count if (s == 0 && cnt == 0) List() else - db.lrange(new String(encode(name.getBytes)), s, s + cnt - 1) match { + db.lrange(name, s, s + cnt - 1) match { case None => throw new NoSuchElementException(name + " does not have elements in the range specified") case Some(l) => - l map ( e => stringToByteArray(e.get)) + l map (e => stringToByteArray(e.get)) } } def getVectorStorageSizeFor(name: String): Int = withErrorHandling { - db.llen(new String(encode(name.getBytes))) match { - case None => - throw new NoSuchElementException(name + " not present") - case Some(l) => - l - } + db.llen(name).getOrElse { throw new NoSuchElementException(name + " not present") } } def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), byteArrayToString(element)) + db.set(name, byteArrayToString(element)) } def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), element) + db.set(name, element) } def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { - db.get(new String(encode(name.getBytes))) match { + db.get(name) match { case None => throw new NoSuchElementException(name + " not present") case Some(s) => Some(stringToByteArray(s)) @@ -304,13 +260,13 @@ private [akka] object RedisStorageBackend extends } // add to the end of the queue - def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.rpush(new String(encode(name.getBytes)), byteArrayToString(item)) + def enqueue(name: String, item: Array[Byte]): Option[Int] = withErrorHandling { + db.rpush(name, byteArrayToString(item)) } // pop from the front of the queue def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { - db.lpop(new String(encode(name.getBytes))) match { + db.lpop(name) match { case None => throw new NoSuchElementException(name + " not present") case Some(s) => Some(stringToByteArray(s)) @@ -319,11 +275,7 @@ private [akka] object RedisStorageBackend extends // get the size of the queue def size(name: String): Int = withErrorHandling { - db.llen(new String(encode(name.getBytes))) match { - case None => - throw new NoSuchElementException(name + " not present") - case Some(l) => l - } + db.llen(name).getOrElse { throw new NoSuchElementException(name + " not present") } } // return an array of items currently stored in the queue @@ -331,14 +283,14 @@ private [akka] object RedisStorageBackend extends def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling { count match { case 1 => - db.lindex(new String(encode(name.getBytes)), start) match { + db.lindex(name, start) match { case None => throw new NoSuchElementException("No element at " + start) case Some(s) => List(stringToByteArray(s)) } case n => - db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { + db.lrange(name, start, start + count - 1) match { case None => throw new NoSuchElementException( "No element found between " + start + " and " + (start + count - 1)) @@ -350,7 +302,7 @@ private [akka] object RedisStorageBackend extends // completely delete the queue def remove(name: String): Boolean = withErrorHandling { - db.del(new String(encode(name.getBytes))) match { + db.del(name) match { case Some(1) => true case _ => false } @@ -358,7 +310,7 @@ private [akka] object RedisStorageBackend extends // add item to sorted set identified by name def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match { + db.zadd(name, zscore, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -366,7 +318,7 @@ private [akka] object RedisStorageBackend extends // remove item from sorted set identified by name def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match { + db.zrem(name, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -374,22 +326,18 @@ private [akka] object RedisStorageBackend extends // cardinality of the set identified by name def zcard(name: String): Int = withErrorHandling { - db.zcard(new String(encode(name.getBytes))) match { - case None => - throw new NoSuchElementException(name + " not present") - case Some(l) => l - } + db.zcard(name).getOrElse { throw new NoSuchElementException(name + " not present") } } def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling { - db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match { + db.zscore(name, byteArrayToString(item)) match { case Some(s) => Some(s.toFloat) case None => None } } def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling { - db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match { + db.zrange(name, start.toString, end.toString, RedisClient.ASC, false) match { case None => throw new NoSuchElementException(name + " not present") case Some(s) => @@ -399,7 +347,7 @@ private [akka] object RedisStorageBackend extends def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling { db.zrangeWithScore( - new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match { + name, start.toString, end.toString, RedisClient.ASC) match { case None => throw new NoSuchElementException(name + " not present") case Some(l) => diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index bae6b6c88c..5f24def4f5 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -234,9 +234,8 @@ class RemoteServer extends Logging with ListenerManagement { port = _port log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) - val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val pipelineFactory = new RemoteServerPipelineFactory( - name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors,this) + name, openChannels, loader, actors, typedActors, this) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) @@ -324,6 +323,13 @@ class RemoteServer extends Logging with ListenerManagement { protected override def manageLifeCycleOfListeners = false protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f) + + private def actors() : ConcurrentHashMap[String, ActorRef] = { + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors + } + private def typedActors() : ConcurrentHashMap[String, AnyRef] = { + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors + } } object RemoteServerSslContext { @@ -348,8 +354,8 @@ class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, val loader: Option[ClassLoader], - val actors: JMap[String, ActorRef], - val typedActors: JMap[String, AnyRef], + val actors: (() => ConcurrentHashMap[String, ActorRef]), + val typedActors: (() => ConcurrentHashMap[String, AnyRef]), val server: RemoteServer) extends ChannelPipelineFactory { import RemoteServer._ @@ -373,7 +379,7 @@ class RemoteServerPipelineFactory( case _ => (join(), join()) } - val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors,server) + val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors, server) val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) new StaticChannelPipeline(stages: _*) } @@ -387,8 +393,8 @@ class RemoteServerHandler( val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], - val actors: JMap[String, ActorRef], - val typedActors: JMap[String, AnyRef], + val actors: (() => ConcurrentHashMap[String, ActorRef]), + val typedActors: (() => ConcurrentHashMap[String, AnyRef]), val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { val AW_PROXY_PREFIX = "$$ProxiedByAW".intern @@ -539,7 +545,8 @@ class RemoteServerHandler( val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val actorRefOrNull = actors get uuid + val registeredActors = actors() + val actorRefOrNull = registeredActors get uuid if (actorRefOrNull eq null) { try { @@ -550,7 +557,7 @@ class RemoteServerHandler( actorRef.uuid = uuid actorRef.timeout = timeout actorRef.remoteAddress = None - actors.put(uuid, actorRef) + registeredActors.put(uuid, actorRef) actorRef } catch { case e => @@ -563,7 +570,8 @@ class RemoteServerHandler( private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { val uuid = actorInfo.getUuid - val typedActorOrNull = typedActors get uuid + val registeredTypedActors = typedActors() + val typedActorOrNull = registeredTypedActors get uuid if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo @@ -580,7 +588,7 @@ class RemoteServerHandler( val newInstance = TypedActor.newInstance( interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef] - typedActors.put(uuid, newInstance) + registeredTypedActors.put(uuid, newInstance) newInstance } catch { case e => diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 59cfe3778d..8b1e0ef765 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -5,8 +5,8 @@ import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} -import se.scalablesolutions.akka.actor.{ActorRef, Actor} -import Actor._ +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Actor} object ServerInitiatedRemoteActorSpec { val HOSTNAME = "localhost" @@ -132,5 +132,17 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } actor.stop } + + + @Test + def shouldNotRecreateRegisteredActor { + server.register(actorOf[RemoteActorSpecActorUnidirectional]) + val actor = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) + val numberOfActorsInRegistry = ActorRegistry.actors.length + val result = actor ! "OneWay" + assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) + assert(numberOfActorsInRegistry === ActorRegistry.actors.length) + actor.stop + } } diff --git a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala deleted file mode 100644 index d5358a7d89..0000000000 --- a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala +++ /dev/null @@ -1,94 +0,0 @@ -package sample.lift - -import se.scalablesolutions.akka.actor._ -import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.stm.TransactionalMap -import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage -import scala.xml.Node -import java.lang.Integer -import javax.ws.rs.{GET, Path, Produces} -import java.nio.ByteBuffer -import net.liftweb.http._ -import net.liftweb.http.rest._ - -class SimpleServiceActor extends Transactor { - private val KEY = "COUNTER" - private var hasStartedTicking = false - private lazy val storage = TransactionalMap[String, Integer]() - - def receive = { - case "Tick" => if (hasStartedTicking) { - val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue - storage.put(KEY, new Integer(counter + 1)) - self.reply(

    Tick: {counter + 1}

    ) - } else { - storage.put(KEY, new Integer(0)) - hasStartedTicking = true - self.reply(

    Tick: 0

    ) - } - } -} - -class PersistentServiceActor extends Transactor { - - private val KEY = "COUNTER" - private var hasStartedTicking = false - private lazy val storage = CassandraStorage.newMap - - def receive = { - case "Tick" => if (hasStartedTicking) { - val bytes = storage.get(KEY.getBytes).get - val counter = ByteBuffer.wrap(bytes).getInt - storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) - self.reply(Tick:{counter + 1}) - } else { - storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array) - hasStartedTicking = true - self.reply(Tick: 0) - } - } -} - - -/** - * Try service out by invoking (multiple times): - *
    - * curl http://localhost:8080/liftcount
    - * 
    - * Or browse to the URL from a web browser. - */ - -object SimpleRestService extends RestHelper { - serve { - case Get("liftcount" :: _, req) => - //Fetch the first actor of type SimpleServiceActor - //Send it the "Tick" message and expect a Node back - val result = for( a <- ActorRegistry.actorFor[SimpleServiceActor]; - r <- (a !! "Tick").as[Node] ) yield r - - //Return either the resulting NodeSeq or a default one - (result getOrElse

    Error in counter

    ).asInstanceOf[Node] - } -} - - -/** - * Try service out by invoking (multiple times): - *
    - * curl http://localhost:8080/persistentliftcount
    - * 
    - * Or browse to the URL from a web browser. - */ - object PersistentRestService extends RestHelper { - serve { - case Get("persistentliftcount" :: _, req) => - //Fetch the first actor of type SimpleServiceActor - //Send it the "Tick" message and expect a Node back - val result = for( a <- ActorRegistry.actorFor[PersistentServiceActor]; - r <- (a !! "Tick").as[Node] ) yield r - - //Return either the resulting NodeSeq or a default one - (result getOrElse

    Error in counter

    ).asInstanceOf[Node] - } - } diff --git a/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala b/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala deleted file mode 100644 index 2e56a5857a..0000000000 --- a/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala +++ /dev/null @@ -1,60 +0,0 @@ -package bootstrap.liftweb - -import _root_.net.liftweb.util._ -import _root_.net.liftweb.http._ -import _root_.net.liftweb.sitemap._ -import _root_.net.liftweb.sitemap.Loc._ -import _root_.net.liftweb.http.auth._ -import _root_.net.liftweb.common._ -import Helpers._ - -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor} -import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.util.Logging - -import sample.lift._ - -/** - * A class that's instantiated early and run. It allows the application - * to modify lift's environment - */ -class Boot extends Logging { - def boot { - // where to search snippet - LiftRules.addToPackages("sample.lift") - - LiftRules.httpAuthProtectedResource.prepend { - case (Req("liftcount" :: Nil, _, _)) => Full(AuthRole("admin")) - } - - LiftRules.authentication = HttpBasicAuthentication("lift") { - case ("someuser", "1234", req) => { - log.info("You are now authenticated !") - userRoles(AuthRole("admin")) - true - } - } - LiftRules.statelessDispatchTable.append(SimpleRestService) - LiftRules.statelessDispatchTable.append(PersistentRestService) - - LiftRules.passNotFoundToChain = true - - val factory = SupervisorFactory( - SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), - Supervise( - actorOf[SimpleServiceActor], - LifeCycle(Permanent)) :: - Supervise( - actorOf[PersistentServiceActor], - LifeCycle(Permanent)) :: - Nil)) - factory.newInstance.start - - // Build SiteMap - // val entries = Menu(Loc("Home", List("index"), "Home")) :: Nil - // LiftRules.setSiteMap(SiteMap(entries:_*)) - } -} - diff --git a/akka-samples/akka-sample-lift/src/main/scala/comet/.keep b/akka-samples/akka-sample-lift/src/main/scala/comet/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/akka-samples/akka-sample-lift/src/main/scala/model/.keep b/akka-samples/akka-sample-lift/src/main/scala/model/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/akka-samples/akka-sample-lift/src/main/scala/snippet/.keep b/akka-samples/akka-sample-lift/src/main/scala/snippet/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/akka-samples/akka-sample-lift/src/main/scala/snippet/HelloWorld.scala b/akka-samples/akka-sample-lift/src/main/scala/snippet/HelloWorld.scala deleted file mode 100644 index aed272c0b2..0000000000 --- a/akka-samples/akka-sample-lift/src/main/scala/snippet/HelloWorld.scala +++ /dev/null @@ -1,6 +0,0 @@ -package sample.lift.snippet - -class HelloWorld { - def howdy = Welcome to lift-akka at {new _root_.java.util.Date} -} - diff --git a/akka-samples/akka-sample-lift/src/main/scala/view/.keep b/akka-samples/akka-sample-lift/src/main/scala/view/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/akka-samples/akka-sample-lift/src/main/webapp/WEB-INF/web.xml b/akka-samples/akka-sample-lift/src/main/webapp/WEB-INF/web.xml deleted file mode 100644 index 3a1b672cec..0000000000 --- a/akka-samples/akka-sample-lift/src/main/webapp/WEB-INF/web.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - LiftFilter - Lift Filter - The Filter that intercepts lift calls - net.liftweb.http.LiftFilter - - - LiftFilter - /* - - - AkkaServlet - se.scalablesolutions.akka.comet.AkkaServlet - - - AkkaServlet - /* - - diff --git a/akka-samples/akka-sample-lift/src/main/webapp/index.html b/akka-samples/akka-sample-lift/src/main/webapp/index.html deleted file mode 100644 index aa25a1d91d..0000000000 --- a/akka-samples/akka-sample-lift/src/main/webapp/index.html +++ /dev/null @@ -1,15 +0,0 @@ - -

    Welcome to the Akka + Lift Sample

    -

    This page is served by Lift, and Lift alone. In order to demonstrate how AkkaServlet and
    - Lift can work in harmony we have supplied a sample JAX-RS service that is secured using
    - Lift's HTTP Basic Authentication.

    - -

    To access the Akka service, visit this url and enter the - following access credentials:

    - -

    user: someuser
    - password: 1234

    - -

    -
    - diff --git a/akka-samples/akka-sample-lift/src/main/webapp/templates-hidden/default.html b/akka-samples/akka-sample-lift/src/main/webapp/templates-hidden/default.html deleted file mode 100644 index 4a18c18f8a..0000000000 --- a/akka-samples/akka-sample-lift/src/main/webapp/templates-hidden/default.html +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - - Akka with Lift Example - - - -
    - - - -
    - - diff --git a/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala b/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala deleted file mode 100644 index 43296bc1f4..0000000000 --- a/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala +++ /dev/null @@ -1,16 +0,0 @@ -/*import _root_.bootstrap.liftweb.Boot -import _root_.scala.tools.nsc.MainGenericRunner - -object LiftConsole { - def main(args : Array[String]) { - // Instantiate your project's Boot file - val b = new Boot() - // Boot your project - b.boot - // Now run the MainGenericRunner to get your repl - MainGenericRunner.main(args) - // After the repl exits, then exit the scala script - exit(0) - } -} -*/ diff --git a/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala b/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala deleted file mode 100644 index fd8ea053c3..0000000000 --- a/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala +++ /dev/null @@ -1,27 +0,0 @@ -import org.eclipse.jetty.webapp.WebAppContext -import org.eclipse.jetty.server.Server - -object RunWebApp extends Application { - val server = new Server(8080) - val context = new WebAppContext() - context.setServer(server) - context.setContextPath("/") - context.setWar("src/main/webapp") - - server.setHandler(context) - - try { - println(">>> STARTING EMBEDDED JETTY SERVER, PRESS ANY KEY TO STOP") - server.start() - while (System.in.available() == 0) { - Thread.sleep(5000) - } - server.stop() - server.join() - } catch { - case exc : Exception => { - exc.printStackTrace() - System.exit(100) - } - } -} diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd new file mode 100644 index 0000000000..c3d7608bee --- /dev/null +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd @@ -0,0 +1,297 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the remote host. + + + + + + + Port of the remote host. + + + + + + + + + + + + + + + + + + Name of the interface implemented by implementation class. + + + + + + + Name of the implementation class. + + + + + + + Theh default timeout for '!!' invocations. + + + + + + + Set this to true if messages should have REQUIRES_NEW semantics. + + + + + + + Defines the lifecycle, can be either 'permanent' or 'temporary'. + + + + + + + Supported scopes are 'singleton' and 'prototype'. + + + + + + + + + + + + + + + + + + Name of the implementation class. + + + + + + + The default timeout for '!!' invocations. + + + + + + + Set this to true if messages should have REQUIRES_NEW semantics. + + + + + + + Defines the lifecycle, can be either 'permanent' or 'temporary'. + + + + + + + Supported scopes are 'singleton' and 'prototype'. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Failover scheme, can be one of 'AllForOne' or 'OneForOne'. + + + + + + + Maximal number of restarts. + + + + + + + Time range for maximal number of restart. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 518727bd4c..2743d772da 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -68,6 +68,7 @@ object AkkaSpringConfigurationTags { val KEEP_ALIVE = "keep-alive" val BOUND ="bound" val REJECTION_POLICY ="rejection-policy" + val MAILBOX_CAPACITY ="mailbox-capacity" // --- VALUES // diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala index 06c9994c7f..5986b5a697 100644 --- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -58,6 +58,9 @@ object DispatcherFactoryBean { if (properties.threadPool.keepAlive > -1) { threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive) } + if (properties.threadPool.mailboxCapacity > -1) { + threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity) + } if ((properties.threadPool.rejectionPolicy != null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match { case "abort-policy" => new AbortPolicy() diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala index c4257230f7..e9f10e1328 100644 --- a/akka-spring/src/main/scala/DispatcherParser.scala +++ b/akka-spring/src/main/scala/DispatcherParser.scala @@ -87,6 +87,9 @@ trait DispatcherParser extends BeanParser { if (element.hasAttribute(REJECTION_POLICY)) { properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) } + if (element.hasAttribute(MAILBOX_CAPACITY)) { + properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt + } properties } diff --git a/akka-spring/src/main/scala/DispatcherProperties.scala b/akka-spring/src/main/scala/DispatcherProperties.scala index 183b3825bb..89d97670ca 100644 --- a/akka-spring/src/main/scala/DispatcherProperties.scala +++ b/akka-spring/src/main/scala/DispatcherProperties.scala @@ -45,6 +45,7 @@ class ThreadPoolProperties { var maxPoolSize = -1 var keepAlive = -1L var rejectionPolicy = "" + var mailboxCapacity = -1 override def toString : String = { "ThreadPoolProperties[queue=" + queue + @@ -54,6 +55,7 @@ class ThreadPoolProperties { ", corePoolSize=" + corePoolSize + ", maxPoolSize=" + maxPoolSize + ", keepAlive=" + keepAlive + - ", policy=" + rejectionPolicy + "]" + ", policy=" + rejectionPolicy + + ", mailboxCapacity=" + mailboxCapacity + "]" } } diff --git a/akka-spring/src/test/resources/dispatcher-config.xml b/akka-spring/src/test/resources/dispatcher-config.xml index 9f0dfa3802..37c33516e0 100644 --- a/akka-spring/src/test/resources/dispatcher-config.xml +++ b/akka-spring/src/test/resources/dispatcher-config.xml @@ -42,6 +42,13 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd"> bound="10" /> + + + + + - - - 10 - 200 + diff --git a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar b/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar deleted file mode 100644 index 5d2a6a3632..0000000000 Binary files a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom b/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom deleted file mode 100755 index 16dd81402a..0000000000 --- a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom +++ /dev/null @@ -1,8 +0,0 @@ - - - 4.0.0 - com.redis - redisclient - 1.1 - jar - diff --git a/embedded-repo/com/redis/redisclient/1.2/redisclient-1.2.jar b/embedded-repo/com/redis/redisclient/1.2/redisclient-1.2.jar deleted file mode 100644 index 91ff84b97c..0000000000 Binary files a/embedded-repo/com/redis/redisclient/1.2/redisclient-1.2.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0-2.0/redisclient-2.8.0-2.0.jar b/embedded-repo/com/redis/redisclient/2.8.0-2.0/redisclient-2.8.0-2.0.jar new file mode 100644 index 0000000000..66c18b6fbf Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0-2.0/redisclient-2.8.0-2.0.jar differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.jar b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.jar deleted file mode 100644 index 3f1593380b..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.pom b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.pom deleted file mode 100755 index 68f3763187..0000000000 --- a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.pom +++ /dev/null @@ -1,8 +0,0 @@ - - - 4.0.0 - com.redis - redisclient - 2.8.0.Beta1-1.2 - jar - diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar deleted file mode 100644 index 0daede37f0..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC2-1.4-SNAPSHOT/redisclient-2.8.0.RC2-1.4-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC2-1.4-SNAPSHOT/redisclient-2.8.0.RC2-1.4-SNAPSHOT.jar deleted file mode 100644 index 261b5cc1be..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.RC2-1.4-SNAPSHOT/redisclient-2.8.0.RC2-1.4-SNAPSHOT.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar deleted file mode 100644 index d939a49d7c..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar deleted file mode 100644 index 351ff49c9d..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar and /dev/null differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e4806fbf7e..393b531dd6 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -70,7 +70,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) - lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsReleases) lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo) lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) @@ -89,14 +88,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val HAWT_DISPATCH_VERSION = "1.0" lazy val JACKSON_VERSION = "1.2.1" lazy val JERSEY_VERSION = "1.2" - lazy val LIFT_VERSION = "2.1-M1" lazy val MULTIVERSE_VERSION = "0.6.1" lazy val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT" lazy val LOGBACK_VERSION = "0.9.24" lazy val SLF4J_VERSION = "1.6.0" lazy val SPRING_VERSION = "3.0.3.RELEASE" lazy val ASPECTWERKZ_VERSION = "2.2.1" - lazy val JETTY_VERSION = "7.1.6.v20100715" + lazy val JETTY_VERSION = "7.1.4.v20100610" // ------------------------------------------------------------------------------------------------------------------- // Dependencies @@ -136,9 +134,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" - lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" - lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" - lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" + lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" + lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" + lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" + lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" @@ -165,9 +164,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive - lazy val lift_util = "net.liftweb" % "lift-util_2.8.0" % LIFT_VERSION % "compile" - lazy val lift_webkit = "net.liftweb" % "lift-webkit_2.8.0" % LIFT_VERSION % "compile" - lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile" lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive @@ -180,12 +176,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" - lazy val redis = "com.redis" % "redisclient" % "2.8.0-1.4" % "compile" + lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0" % "compile" lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" - lazy val servlet = "javax.servlet" % "servlet-api" % "2.5" % "compile" - lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile" lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile" @@ -425,13 +419,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val jetty = Dependencies.jetty val jetty_util = Dependencies.jetty_util val jetty_xml = Dependencies.jetty_xml + val jetty_servlet = Dependencies.jetty_servlet val jackson_core_asl = Dependencies.jackson_core_asl val jersey = Dependencies.jersey val jersey_contrib = Dependencies.jersey_contrib val jersey_json = Dependencies.jersey_json val jersey_server = Dependencies.jersey_server val jsr311 = Dependencies.jsr311 - val servlet = Dependencies.servlet val stax_api = Dependencies.stax_api // testing @@ -564,7 +558,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // Provided by other bundles "!se.scalablesolutions.akka.*", - "!net.liftweb.*", "!com.google.inject.*", "!javax.transaction.*", "!javax.ws.rs.*", @@ -586,12 +579,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // Scala bundle val scala_bundle = "com.weiglewilczek.scala-lang-osgi" % "scala-library" % buildScalaVersion % "compile" intransitive - // Lift bundles -// val lift_util = Dependencies.lift_util.intransitive -// val lift_actor = "net.liftweb" % "lift-actor" % LIFT_VERSION % "compile" intransitive -// val lift_common = "net.liftweb" % "lift-common" % LIFT_VERSION % "compile" intransitive -// val lift_json = "net.liftweb" % "lift-json" % LIFT_VERSION % "compile" intransitive - // Camel bundles val camel_core = Dependencies.camel_core.intransitive val fusesource_commonman = "org.fusesource.commonman" % "commons-management" % "1.0" intransitive @@ -656,21 +643,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaSamplePubSubProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) - class AkkaSampleLiftProject(info: ProjectInfo) extends DefaultWebProject(info) with DeployProject { - //val commons_logging = Dependencies.commons_logging - val lift_util = Dependencies.lift_util - val lift_webkit = Dependencies.lift_webkit - val servlet = Dependencies.servlet - - // testing - val testJetty = Dependencies.testJetty - val testJettyWebApp = Dependencies.testJettyWebApp - val junit = Dependencies.junit - - def deployPath = AkkaParentProject.this.deployPath - override def jarPath = warPath - } - class AkkaSampleRestJavaProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) class AkkaSampleRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) @@ -718,8 +690,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaSamplePubSubProject(_), akka_kernel) lazy val akka_sample_fsm = project("akka-sample-fsm", "akka-sample-fsm", new AkkaSampleFSMProject(_), akka_kernel) - lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift", - new AkkaSampleLiftProject(_), akka_kernel) lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java", new AkkaSampleRestJavaProject(_), akka_kernel) lazy val akka_sample_rest_scala = project("akka-sample-rest-scala", "akka-sample-rest-scala",