diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala
index d232ca2a77..3f8e8e9d34 100644
--- a/akka-actor/src/main/scala/actor/Actor.scala
+++ b/akka-actor/src/main/scala/actor/Actor.scala
@@ -159,7 +159,7 @@ object Actor extends Logging {
*/
def actor(body: Receive): ActorRef =
actorOf(new Actor() {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
def receive: Receive = body
}).start
@@ -181,7 +181,7 @@ object Actor extends Logging {
*/
def transactor(body: Receive): ActorRef =
actorOf(new Transactor() {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
def receive: Receive = body
}).start
@@ -201,7 +201,7 @@ object Actor extends Logging {
*/
def temporaryActor(body: Receive): ActorRef =
actorOf(new Actor() {
- self.lifeCycle = Some(LifeCycle(Temporary))
+ self.lifeCycle = Temporary
def receive = body
}).start
@@ -226,7 +226,7 @@ object Actor extends Logging {
def handler[A](body: => Unit) = new {
def receive(handler: Receive) =
actorOf(new Actor() {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
body
def receive = handler
}).start
@@ -444,7 +444,6 @@ trait Actor extends Logging {
*/
def become(behavior: Option[Receive]) {
self.hotswap = behavior
- self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
}
/** Akka Java API
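
A minimal usage sketch of the factory methods changed above (the actor body and message are illustrative): the life cycle is now assigned directly instead of being wrapped in Some(LifeCycle(...)).

```scala
import se.scalablesolutions.akka.actor.Actor._

// Anonymous actor via the `actor` factory; lifeCycle = Permanent is set
// internally, so a supervisor would restart it on failure.
val echo = actor {
  case msg => println("echo: " + msg)
}

echo ! "ping" // fire-and-forget send
echo.stop     // shut the actor down when no longer needed
```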
diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala
index 6bb64363f3..552fe2cfdf 100644
--- a/akka-actor/src/main/scala/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/actor/ActorRef.scala
@@ -160,12 +160,6 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
def setFaultHandler(handler: FaultHandlingStrategy) = this.faultHandler = Some(handler)
def getFaultHandler(): Option[FaultHandlingStrategy] = faultHandler
- /**
- * Defines the life-cycle for a supervised actor.
- */
- def setLifeCycle(lifeCycle: LifeCycle) = this.lifeCycle = Some(lifeCycle)
- def getLifeCycle(): Option[LifeCycle] = lifeCycle
-
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
@@ -632,16 +626,16 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
override def equals(that: Any): Boolean = {
that.isInstanceOf[ActorRef] &&
- that.asInstanceOf[ActorRef].uuid == uuid
+ that.asInstanceOf[ActorRef].uuid == uuid
}
override def toString = "Actor[" + id + ":" + uuid + "]"
protected[akka] def checkReceiveTimeout = {
cancelReceiveTimeout
- receiveTimeout.foreach { time =>
+ if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed
log.debug("Scheduling timeout for %s", this)
- _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS))
+ _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
}
}
@@ -698,7 +692,7 @@ class LocalActorRef private[akka] (
__isTransactor: Boolean,
__timeout: Long,
__receiveTimeout: Option[Long],
- __lifeCycle: Option[LifeCycle],
+ __lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__factory: () => Actor) = {
@@ -715,7 +709,6 @@ class LocalActorRef private[akka] (
actorSelfFields._1.set(actor, this)
actorSelfFields._2.set(actor, Some(this))
start
- checkReceiveTimeout
ActorRegistry.register(this)
}
@@ -819,7 +812,10 @@ class LocalActorRef private[akka] (
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_status = ActorRefStatus.RUNNING
+
if (!isInInitialization) initializeActorInstance
+
+ checkReceiveTimeout //Schedule the initial Receive timeout
}
this
}
@@ -1088,7 +1084,7 @@ class LocalActorRef private[akka] (
val failedActor = actorInstance.get
guard.withGuard {
lifeCycle match {
- case Some(LifeCycle(Temporary)) => shutDownTemporaryActor(this)
+ case Temporary => shutDownTemporaryActor(this)
case _ =>
// either permanent or none where default is permanent
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
@@ -1109,7 +1105,7 @@ class LocalActorRef private[akka] (
linkedActorsAsList.foreach { actorRef =>
actorRef.lifeCycle match {
// either permanent or none where default is permanent
- case Some(LifeCycle(Temporary)) => shutDownTemporaryActor(actorRef)
+ case Temporary => shutDownTemporaryActor(actorRef)
case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
}
@@ -1236,6 +1232,7 @@ class LocalActorRef private[akka] (
finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet
+ checkReceiveTimeout // Reschedule receive timeout
}
}
@@ -1274,7 +1271,7 @@ class LocalActorRef private[akka] (
if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
else {
lifeCycle match {
- case Some(LifeCycle(Temporary)) => shutDownTemporaryActor(this)
+ case Temporary => shutDownTemporaryActor(this)
case _ =>
}
}
@@ -1315,9 +1312,7 @@ class LocalActorRef private[akka] (
actor.preStart // run actor preStart
Actor.log.trace("[%s] has started", toString)
ActorRegistry.register(this)
- if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
- checkReceiveTimeout
}
/*
@@ -1495,7 +1490,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* Defines the life-cycle for a supervised actor.
*/
@volatile
- var lifeCycle: Option[LifeCycle] = None
+ @BeanProperty
+ var lifeCycle: LifeCycle = UndefinedLifeCycle
/**
* User overridable callback/setting.
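
The checkReceiveTimeout change above re-arms the timeout after each processed message (when the mailbox is empty) instead of scheduling it once at startup. A sketch of the resulting usage, with imports mirrored from the spec files further down (the exact location of ReceiveTimeout is assumed):

```scala
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.ReceiveTimeout // location assumed

val timeoutActor = actorOf(new Actor {
  self.receiveTimeout = Some(500L) // ReceiveTimeout after 500 ms of inactivity

  def receive = {
    case ReceiveTimeout =>
      self.receiveTimeout = None   // turn the timeout off once it has fired
    case _ => ()                   // any regular message re-arms the timeout
  }
}).start
```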
diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala
index f34d9fc2c0..f4ef69f353 100644
--- a/akka-actor/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala
@@ -311,60 +311,83 @@ object ActorRegistry extends ListenerManagement {
}
}
+/**
+ * An implementation of a ConcurrentMultiMap
+ * Add/remove operations are serialized over the specified key
+ * Reads are fully concurrent and cheap (no locking on read)
+ *
+ * @author Viktor Klang
+ */
class Index[K <: AnyRef,V <: AnyRef : Manifest] {
- import scala.collection.JavaConversions._
-
private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
private val emptySet = new ConcurrentSkipListSet[V]
- def put(key: K, value: V) {
-
- //Returns whether it needs to be retried or not
- def tryPut(set: JSet[V], v: V): Boolean = {
- set.synchronized {
- if (set.isEmpty) true //IF the set is empty then it has been removed, so signal retry
- else { //Else add the value to the set and signal that retry is not needed
- set add v
- false
- }
- }
- }
-
- @tailrec def syncPut(k: K, v: V): Boolean = {
+ /**
+ * Associates the value of type V with the key of type K
+ * @return true if the value was not previously associated with the key, false otherwise
+ */
+ def put(key: K, value: V): Boolean = {
+ //Tail-recursive spin-locking put
+ @tailrec def spinPut(k: K, v: V): Boolean = {
var retry = false
+ var added = false
val set = container get k
- if (set ne null) retry = tryPut(set,v)
+
+ if (set ne null) {
+ set.synchronized {
+ if (set.isEmpty) {
+ retry = true //IF the set is empty then it has been removed, so signal retry
+ }
+ else { //Else add the value to the set and signal that retry is not needed
+ added = set add v
+ retry = false
+ }
+ }
+ }
else {
val newSet = new ConcurrentSkipListSet[V]
newSet add v
// Parry for two simultaneous putIfAbsent(id,newSet)
val oldSet = container.putIfAbsent(k,newSet)
- if (oldSet ne null)
- retry = tryPut(oldSet,v)
+ if (oldSet ne null) {
+ oldSet.synchronized {
+ if (oldSet.isEmpty) {
+ retry = true //IF the set is empty then it has been removed, so signal retry
+ }
+ else { //Else try to add the value to the set and signal that retry is not needed
+ added = oldSet add v
+ retry = false
+ }
+ }
+ } else {
+ added = true
+ }
}
- if (retry) syncPut(k,v)
- else true
+ if (retry) spinPut(k,v)
+ else added
}
- syncPut(key,value)
+ spinPut(key,value)
}
- def values(key: K) = {
+ /**
+ * @return a _new_ array of all values for the given key at the time of the call
+ */
+ def values(key: K): Array[V] = {
val set: JSet[V] = container get key
- if (set ne null) set toArray Naught
- else Naught
- }
-
- def foreach(key: K)(fun: (V) => Unit) {
- val set = container get key
- if (set ne null)
- set foreach fun
+ val result = if (set ne null) set toArray Naught else Naught
+ result.asInstanceOf[Array[V]]
}
+ /**
+ * @return Some(value) for the first value under the given key for which the supplied function returns true,
+ * or None if there is no match
+ */
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
+ import scala.collection.JavaConversions._
val set = container get key
if (set ne null)
set.iterator.find(f)
@@ -372,23 +395,43 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
None
}
+ /**
+ * Applies the supplied function to all keys and their values
+ */
def foreach(fun: (K,V) => Unit) {
+ import scala.collection.JavaConversions._
container.entrySet foreach {
(e) => e.getValue.foreach(fun(e.getKey,_))
}
}
- def remove(key: K, value: V) {
+ /**
+ * Disassociates the value of type V from the key of type K
+ * @return true if the value was disassociated from the key, false if it was not previously associated with it
+ */
+ def remove(key: K, value: V): Boolean = {
val set = container get key
+
if (set ne null) {
set.synchronized {
if (set.remove(value)) { //If we can remove the value
if (set.isEmpty) //and the set becomes empty
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
+
+ true //Remove succeeded
}
+ else false //Remove failed
}
- }
+ } else false //Remove failed
}
- def clear = { foreach(remove _) }
+ /**
+ * @return true if the underlying container is empty; may report false negatives while a concurrent remove is in progress
+ */
+ def isEmpty: Boolean = container.isEmpty
+
+ /**
+ * Removes all keys and all values
+ */
+ def clear = foreach { case (k,v) => remove(k,v) }
}
\ No newline at end of file
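
A usage sketch for the rewritten Index above (keys and values here are illustrative): put and remove now report whether they changed anything, and values returns a fresh array snapshot.

```scala
val index = new Index[String, java.lang.Integer]

index.put("key", 1)                    // true: first value for "key"
index.put("key", 1)                    // false: already associated
index.put("key", 2)                    // true

index.values("key")                    // Array(1, 2) -- a new array each call
index.findValue("key")(_.intValue > 1) // Some(2)

index.remove("key", 1)                 // true: value was removed
index.remove("key", 1)                 // false: no longer associated
index.clear                            // removes all keys and values
```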
diff --git a/akka-actor/src/main/scala/actor/Supervisor.scala b/akka-actor/src/main/scala/actor/Supervisor.scala
index f575cda299..28d883e5ea 100644
--- a/akka-actor/src/main/scala/actor/Supervisor.scala
+++ b/akka-actor/src/main/scala/actor/Supervisor.scala
@@ -29,10 +29,10 @@ class SupervisorException private[akka](message: String) extends AkkaException(m
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
* Supervise(
* myFirstActor,
- * LifeCycle(Permanent)) ::
+ * Permanent) ::
* Supervise(
* mySecondActor,
- * LifeCycle(Permanent)) ::
+ * Permanent) ::
* Nil))
*
*
@@ -60,10 +60,10 @@ object Supervisor {
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
* Supervise(
* myFirstActor,
- * LifeCycle(Permanent)) ::
+ * Permanent) ::
* Supervise(
* mySecondActor,
- * LifeCycle(Permanent)) ::
+ * Permanent) ::
* Nil))
*
*
@@ -160,7 +160,7 @@ sealed class Supervisor private[akka] (
else list
}
_childActors.put(className, actorRef :: currentActors)
- actorRef.lifeCycle = Some(lifeCycle)
+ actorRef.lifeCycle = lifeCycle
supervisor.link(actorRef)
remoteAddress.foreach { address =>
RemoteServerModule.registerActor(
diff --git a/akka-actor/src/main/scala/config/SupervisionConfig.scala b/akka-actor/src/main/scala/config/SupervisionConfig.scala
index d85001b5ca..43a5e21395 100644
--- a/akka-actor/src/main/scala/config/SupervisionConfig.scala
+++ b/akka-actor/src/main/scala/config/SupervisionConfig.scala
@@ -32,12 +32,13 @@ object ScalaConfig {
abstract class Server extends ConfigElement
abstract class FailOverScheme extends ConfigElement
- abstract class Scope extends ConfigElement
+ abstract class LifeCycle extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
}
+
object Supervise {
def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null)
@@ -53,9 +54,9 @@ object ScalaConfig {
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
- case class LifeCycle(scope: Scope) extends ConfigElement
- case object Permanent extends Scope
- case object Temporary extends Scope
+ case object Permanent extends LifeCycle
+ case object Temporary extends LifeCycle
+ case object UndefinedLifeCycle extends LifeCycle
case class RemoteAddress(val hostname: String, val port: Int) extends ConfigElement
@@ -139,22 +140,22 @@ object JavaConfig {
scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
}
- class LifeCycle(@BeanProperty val scope: Scope) extends ConfigElement {
- def transform = {
- se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform)
- }
+ abstract class LifeCycle extends ConfigElement {
+ def transform: se.scalablesolutions.akka.config.ScalaConfig.LifeCycle
}
- abstract class Scope extends ConfigElement {
- def transform: se.scalablesolutions.akka.config.ScalaConfig.Scope
- }
- class Permanent extends Scope {
+ class Permanent extends LifeCycle {
override def transform = se.scalablesolutions.akka.config.ScalaConfig.Permanent
}
- class Temporary extends Scope {
+
+ class Temporary extends LifeCycle {
override def transform = se.scalablesolutions.akka.config.ScalaConfig.Temporary
}
+ class UndefinedLifeCycle extends LifeCycle {
+ override def transform = se.scalablesolutions.akka.config.ScalaConfig.UndefinedLifeCycle
+ }
+
abstract class FailOverScheme extends ConfigElement {
def transform: se.scalablesolutions.akka.config.ScalaConfig.FailOverScheme
}
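
With Scope folded into LifeCycle, supervision is configured directly against the new case objects. A sketch matching the Supervisor scaladoc above (import paths assumed; myActor and myTempActor are placeholders):

```scala
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.Supervisor

val sup = Supervisor(
  SupervisorConfig(
    RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
    Supervise(myActor, Permanent) ::     // restarted on failure
    Supervise(myTempActor, Temporary) :: // shut down on failure instead
    Nil))
```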
diff --git a/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala
index 92d4356ca0..7741b79cea 100644
--- a/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala
+++ b/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala
@@ -20,7 +20,7 @@ object ActorFireForgetRequestReplySpec {
}
class CrashingTemporaryActor extends Actor {
- self.lifeCycle = Some(LifeCycle(Temporary))
+ self.lifeCycle = Temporary
def receive = {
case "Die" =>
diff --git a/akka-actor/src/test/scala/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor/src/test/scala/actor/actor/ReceiveTimeoutSpec.scala
index ff43467efc..1fabfe71bf 100644
--- a/akka-actor/src/test/scala/actor/actor/ReceiveTimeoutSpec.scala
+++ b/akka-actor/src/test/scala/actor/actor/ReceiveTimeoutSpec.scala
@@ -6,6 +6,7 @@ import org.junit.Test
import java.util.concurrent.TimeUnit
import org.multiverse.api.latches.StandardLatch
import Actor._
+import java.util.concurrent.atomic.AtomicInteger
class ReceiveTimeoutSpec extends JUnitSuite {
@@ -22,6 +23,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
+ timeoutActor.stop
}
@Test def swappedReceiveShouldAlsoGetTimout = {
@@ -44,9 +46,10 @@ class ReceiveTimeoutSpec extends JUnitSuite {
})
assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
+ timeoutActor.stop
}
- @Test def timeoutShouldBeCancelledAfterRegularReceive = {
+ @Test def timeoutShouldBeRescheduledAfterRegularReceive = {
val timeoutLatch = new StandardLatch
case object Tick
@@ -60,7 +63,30 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
timeoutActor ! Tick
- assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
+ assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
+ timeoutActor.stop
+ }
+
+ @Test def timeoutShouldBeTurnedOffIfDesired = {
+ val count = new AtomicInteger(0)
+ val timeoutLatch = new StandardLatch
+ case object Tick
+ val timeoutActor = actorOf(new Actor {
+ self.receiveTimeout = Some(500L)
+
+ protected def receive = {
+ case Tick => ()
+ case ReceiveTimeout =>
+ timeoutLatch.open
+ count.incrementAndGet
+ self.receiveTimeout = None
+ }
+ }).start
+ timeoutActor ! Tick
+
+ assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
+ assert(count.get === 1)
+ timeoutActor.stop
}
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
@@ -73,5 +99,6 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
+ timeoutActor.stop
}
}
diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
index 26fdb6e1ef..2805a8675d 100644
--- a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
+++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
@@ -58,10 +58,10 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
val sup = Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
- Supervise(actor1, LifeCycle(Permanent)) ::
- Supervise(actor2, LifeCycle(Permanent)) ::
- Supervise(actor3, LifeCycle(Permanent)) ::
- Supervise(actor4, LifeCycle(Permanent)) ::
+ Supervise(actor1, Permanent) ::
+ Supervise(actor2, Permanent) ::
+ Supervise(actor3, Permanent) ::
+ Supervise(actor4, Permanent) ::
Nil))
actor1 ! "kill"
diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala
index 01eb9cb006..f7d1752ded 100644
--- a/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala
+++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala
@@ -78,7 +78,7 @@ object SupervisorSpec {
class TemporaryActor extends Actor {
import self._
- lifeCycle = Some(LifeCycle(Temporary))
+ lifeCycle = Temporary
def receive = {
case Ping =>
messageLog.put("ping")
@@ -506,7 +506,7 @@ class SupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
temporaryActor,
- LifeCycle(Temporary))
+ Temporary)
:: Nil))
}
@@ -518,7 +518,7 @@ class SupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
}
@@ -530,7 +530,7 @@ class SupervisorSpec extends JUnitSuite {
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
}
@@ -544,15 +544,15 @@ class SupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong2,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong3,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
}
@@ -566,15 +566,15 @@ class SupervisorSpec extends JUnitSuite {
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong2,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong3,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
}
@@ -588,17 +588,17 @@ class SupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 5000, Nil),
Supervise(
pingpong2,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong3,
- LifeCycle(Permanent))
+ Permanent)
:: Nil)
:: Nil))
}
diff --git a/akka-actor/src/test/scala/misc/SchedulerSpec.scala b/akka-actor/src/test/scala/misc/SchedulerSpec.scala
index 16dd21f327..83daff2e01 100644
--- a/akka-actor/src/test/scala/misc/SchedulerSpec.scala
+++ b/akka-actor/src/test/scala/misc/SchedulerSpec.scala
@@ -98,7 +98,7 @@ class SchedulerSpec extends JUnitSuite {
val pingLatch = new CountDownLatch(6)
val actor = actorOf(new Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
def receive = {
case Ping => pingLatch.countDown
@@ -113,7 +113,7 @@ class SchedulerSpec extends JUnitSuite {
List(classOf[Exception])),
Supervise(
actor,
- LifeCycle(Permanent))
+ Permanent)
:: Nil)).start
Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)
diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala
index 0fd3f715b5..bf5a192299 100644
--- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala
+++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala
@@ -16,7 +16,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
import connectionParameters._
self.id = "amqp-connection-%s".format(host)
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala
index 4a3d61cc10..6afb216c9b 100644
--- a/akka-http/src/main/scala/AkkaCometServlet.scala
+++ b/akka-http/src/main/scala/AkkaCometServlet.scala
@@ -42,32 +42,30 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce
*
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
-class AkkaServlet extends AtmosphereServlet with Logging {
+class AkkaServlet extends AtmosphereServlet {
import se.scalablesolutions.akka.config.Config.{config => c}
+ /*
+ * Configure Atmosphere and Jersey (default, fall-back values)
+ */
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
- c.getInt("akka.rest.maxInactiveActivity") foreach { value =>
- log.info("MAX_INACTIVE:%s",value.toString)
- addInitParameter(CometSupport.MAX_INACTIVE,value.toString)
- }
+ c.getInt("akka.rest.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) }
+ c.getString("akka.rest.cometSupport") foreach { value => addInitParameter("cometSupport",value) }
- c.getString("akka.rest.cometSupport") foreach { value =>
- addInitParameter("cometSupport",value)
- }
-
-
- val servlet = new AtmosphereRestServlet {
- override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
- override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
- }
-
- override def getInitParameter(key : String) = Option(super.getInitParameter(key)).getOrElse(initParams.get(key))
+ /*
+ * Fall back to the default values set via addInitParameter when the container has no value
+ */
+ override def getInitParameter(key : String) =
+ Option(super.getInitParameter(key)).getOrElse(initParams get key)
+ /*
+ * Merge the default parameter names with the container-provided ones
+ */
override def getInitParameterNames() = {
import scala.collection.JavaConversions._
initParams.keySet.iterator ++ super.getInitParameterNames
@@ -80,24 +78,24 @@ class AkkaServlet extends AtmosphereServlet with Logging {
override def loadConfiguration(sc: ServletConfig) {
config.setSupportSession(false)
isBroadcasterSpecified = true
+
+ //The bridge between Atmosphere and Jersey
+ val servlet = new AtmosphereRestServlet {
+ //These are needed to make sure that Jersey is reading the config from the outer servlet
+ override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
+ override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
+ }
+
addAtmosphereHandler("/*", servlet, new AkkaBroadcaster)
}
- /**
- * This method is overridden because Akka Kernel is bundles with Grizzly, so if we deploy the Kernel in another container,
- * we need to handle that.
- */
- override def createCometSupportResolver() : CometSupportResolver = {
- import scala.collection.JavaConversions._
+ override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) {
+ import scala.collection.JavaConversions._
- new DefaultCometSupportResolver(config) {
- type CS = CometSupport[_ <: AtmosphereResource[_,_]]
+ lazy val desiredCometSupport =
+ Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport
- override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
- val predef = config.getInitParameter("cometSupport")
- if (testClassExists(predef)) newCometSupport(predef)
- else super.resolve(useNativeIfPossible, useBlockingAsDefault)
- }
- }
+ override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] =
+ desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault))
}
}
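
The resolver above now expresses "use the configured comet support if its class exists, else defer to the default resolution" as an Option pipeline. The same idiom in isolation, with stand-in names for testClassExists and the fallback:

```scala
// Stand-in for DefaultCometSupportResolver.testClassExists
def classExists(name: String): Boolean =
  try { Class.forName(name); true }
  catch { case _: ClassNotFoundException => false }

// The configured value may be null (a raw servlet init parameter)
def resolveSupport(configured: String, default: => String): String =
  Option(configured) filter classExists getOrElse default

resolveSupport(null, "fallback")               // "fallback"
resolveSupport("java.lang.String", "fallback") // "java.lang.String"
```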
diff --git a/akka-http/src/main/scala/DefaultAkkaLoader.scala b/akka-http/src/main/scala/DefaultAkkaLoader.scala
new file mode 100644
index 0000000000..8fb7ed4e5b
--- /dev/null
+++ b/akka-http/src/main/scala/DefaultAkkaLoader.scala
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.http
+
+import se.scalablesolutions.akka.config.Config
+import se.scalablesolutions.akka.util.{Logging, Bootable}
+import se.scalablesolutions.akka.camel.CamelService
+import se.scalablesolutions.akka.remote.BootableRemoteActorService
+import se.scalablesolutions.akka.actor.BootableActorLoaderService
+import se.scalablesolutions.akka.servlet.AkkaLoader
+
+class DefaultAkkaLoader extends AkkaLoader {
+ def boot(): Unit = boot(true,
+ new EmbeddedAppServer with BootableActorLoaderService
+ with BootableRemoteActorService
+ with CamelService)
+}
+
+
+/**
+ * Can be used to boot Akka
+ *
+ * java -cp ... se.scalablesolutions.akka.http.Main
+ */
+object Main extends DefaultAkkaLoader {
+ def main(args: Array[String]) = boot
+}
\ No newline at end of file
diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-http/src/main/scala/EmbeddedAppServer.scala
similarity index 98%
rename from akka-kernel/src/main/scala/EmbeddedAppServer.scala
rename to akka-http/src/main/scala/EmbeddedAppServer.scala
index 9afcfbe572..580f3430db 100644
--- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala
+++ b/akka-http/src/main/scala/EmbeddedAppServer.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB
*/
-package se.scalablesolutions.akka.kernel
+package se.scalablesolutions.akka.http
import javax.ws.rs.core.UriBuilder
import javax.servlet.ServletConfig
diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala
index 646ca34bcc..d31163eb65 100644
--- a/akka-kernel/src/main/scala/Kernel.scala
+++ b/akka-kernel/src/main/scala/Kernel.scala
@@ -4,11 +4,8 @@
package se.scalablesolutions.akka.kernel
-import se.scalablesolutions.akka.servlet.AkkaLoader
+import se.scalablesolutions.akka.http.{ EmbeddedAppServer, DefaultAkkaLoader }
import se.scalablesolutions.akka.remote.BootableRemoteActorService
-import se.scalablesolutions.akka.actor.BootableActorLoaderService
-import se.scalablesolutions.akka.camel.CamelService
-import se.scalablesolutions.akka.config.Config
object Main {
def main(args: Array[String]) = Kernel.boot
@@ -19,18 +16,10 @@ object Main {
*
* @author Jonas Bonér
*/
-object Kernel extends AkkaLoader {
- /**
- * Boots up the Kernel with default bootables
- */
- def boot(): Unit = boot(true,
- new EmbeddedAppServer with BootableActorLoaderService
- with BootableRemoteActorService
- with CamelService)
-
- //For testing purposes only
+object Kernel extends DefaultAkkaLoader {
+ //For testing purposes only
def startRemoteService(): Unit = bundles.foreach( _ match {
case x: BootableRemoteActorService => x.startRemoteService
case _ =>
})
-}
+}
\ No newline at end of file
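
After this refactoring both entry points share one boot path; a sketch using the objects defined in the files above:

```scala
// Kernel now inherits boot from DefaultAkkaLoader instead of defining it:
se.scalablesolutions.akka.kernel.Kernel.boot

// The new generic entry point boots the same stack without the kernel:
se.scalablesolutions.akka.http.Main.main(Array())
```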
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index e75fd9581c..9d98095045 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.util.Logging
+import collection.mutable.ArraySeq
// FIXME move to 'stm' package + add message with more info
class NoTransactionInScopeException extends RuntimeException
@@ -47,26 +48,38 @@ trait Storage {
type ElementType
def newMap: PersistentMap[ElementType, ElementType]
+
def newVector: PersistentVector[ElementType]
+
def newRef: PersistentRef[ElementType]
+
def newQueue: PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
+
def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getMap(id: String): PersistentMap[ElementType, ElementType]
+
def getVector(id: String): PersistentVector[ElementType]
+
def getRef(id: String): PersistentRef[ElementType]
+
def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
+
def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newMap(id: String): PersistentMap[ElementType, ElementType]
+
def newVector(id: String): PersistentVector[ElementType]
+
def newRef(id: String): PersistentRef[ElementType]
+
def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
+
def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
}
@@ -90,7 +103,7 @@ private[akka] object PersistentMap {
* @author Jonas Bonér
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
- with Transactional with Committable with Abortable with Logging {
+ with Transactional with Committable with Abortable with Logging {
//Import Ops
import PersistentMap._
@@ -118,7 +131,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
protected def clearDistinctKeys = keysInCurrentTx.clear
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
- appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true))
+ appendOnlyTxLog filter (e => e.key.map(equal(_, key)).getOrElse(true))
// need to get current value considering the underlying storage as well as the transaction log
protected def getCurrentValue(key: K): Option[V] = {
@@ -129,7 +142,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
// get the snapshot from the underlying store for this key
val underlying = try {
storage.getMapStorageEntryFor(uuid, key)
- } catch { case e: Exception => None }
+ } catch {case e: Exception => None}
if (txEntries.isEmpty) underlying
else txEntries.last match {
@@ -146,12 +159,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case None => Map.empty[K, V]
case Some(v) => Map((key, v))
}
- txEntries.foreach {case LogEntry(k, v, o) => o match {
- case PUT => m.put(k.get, v.get)
- case REM => m -= k.get
- case UPD => m.update(k.get, v.get)
- case CLR => Map.empty[K, V]
- }}
+ txEntries.foreach {
+ case LogEntry(k, v, o) => o match {
+ case PUT => m.put(k.get, v.get)
+ case REM => m -= k.get
+ case UPD => m.update(k.get, v.get)
+ case CLR => Map.empty[K, V]
+ }
+ }
m get key
}
@@ -159,12 +174,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
val storage: MapStorageBackend[K, V]
def commit = {
- appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
- case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
- case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
- case REM => storage.removeMapStorageFor(uuid, k.get)
- case CLR => storage.removeMapStorageFor(uuid)
- }}
+ appendOnlyTxLog.foreach {
+ case LogEntry(k, v, o) => o match {
+ case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
+ case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
+ case REM => storage.removeMapStorageFor(uuid, k.get)
+ case CLR => storage.removeMapStorageFor(uuid)
+ }
+ }
appendOnlyTxLog.clear
clearDistinctKeys
@@ -180,8 +197,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
this
}
- override def +=(kv : (K,V)) = {
- put(kv._1,kv._2)
+ override def +=(kv: (K, V)) = {
+ put(kv._1, kv._2)
this
}
@@ -230,10 +247,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Seq() => // current tx doesn't use this
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
case txs => // present in log
- val lastOp = txs.last.op
+ val lastOp = txs.last.op
lastOp != REM && lastOp != CLR // last entry cannot be a REM
- }
- } catch { case e: Exception => false }
+ }
+ } catch {case e: Exception => false}
protected def existsInStorage(key: K): Option[V] = try {
storage.getMapStorageEntryFor(uuid, key)
@@ -243,33 +260,33 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def size: Int = try {
// partition key set affected in current tx into those which r added & which r deleted
- val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+ val (keysAdded, keysRemoved) = keysInCurrentTx.map {
case (kseq, k) => ((kseq, k), getCurrentValue(k))
}.partition(_._2.isDefined)
// keys which existed in storage but removed in current tx
- val inStorageRemovedInTx =
- keysRemoved.keySet
- .map(_._2)
- .filter(k => existsInStorage(k).isDefined)
- .size
+ val inStorageRemovedInTx =
+ keysRemoved.keySet
+ .map(_._2)
+ .filter(k => existsInStorage(k).isDefined)
+ .size
// all keys in storage
- val keysInStorage =
- storage.getMapStorageFor(uuid)
- .map { case (k, v) => toEquals(k) }
- .toSet
+ val keysInStorage =
+ storage.getMapStorageFor(uuid)
+ .map {case (k, v) => toEquals(k)}
+ .toSet
// (keys that existed UNION keys added ) - (keys removed)
(keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
- } catch {
- case e: Exception => 0
+ } catch {
+ case e: Exception => 0
}
// get must consider underlying storage & current uncommitted tx log
override def get(key: K): Option[V] = getCurrentValue(key)
- def iterator: Iterator[Tuple2[K, V]]
+ def iterator: Iterator[Tuple2[K, V]]
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
@@ -277,38 +294,50 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
}
+object PersistentMapBinary {
+ object COrdering {
+ //frontend
+ implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
+ def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
+ ArrayOrdering.compare(o1.toArray, o2.toArray)
+ }
+ //backend
+ implicit object ArrayOrdering extends Ordering[Array[Byte]] {
+ def compare(o1: Array[Byte], o2: Array[Byte]) =
+ new String(o1) compare new String(o2)
+ }
+ }
+}
+
trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
import scala.collection.mutable.ArraySeq
type T = ArraySeq[Byte]
+
def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
+
override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
- object COrdering {
- implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
- def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
- new String(o1.toArray) compare new String(o2.toArray)
- }
- }
+
import scala.collection.immutable.{TreeMap, SortedMap}
private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
- import COrdering._
+ import PersistentMapBinary.COrdering._
// need ArraySeq for ordering
- val fromStorage =
- TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
+ val fromStorage =
+ TreeMap(storage.getMapStorageFor(uuid).map {case (k, v) => (ArraySeq(k: _*), v)}: _*)
- val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+ val (keysAdded, keysRemoved) = keysInCurrentTx.map {
case (_, k) => (k, getCurrentValue(k))
}.partition(_._2.isDefined)
- val inStorageRemovedInTx =
- keysRemoved.keySet
- .filter(k => existsInStorage(k).isDefined)
- .map(k => ArraySeq(k: _*))
+ val inStorageRemovedInTx =
+ keysRemoved.keySet
+ .filter(k => existsInStorage(k).isDefined)
+ .map(k => ArraySeq(k: _*))
- (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, v) => (ArraySeq(k: _*), v.get) }
+ (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map {case (k, v) => (ArraySeq(k: _*), v.get)}
}
override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
@@ -317,51 +346,53 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
if (newMap isEmpty) List[(Array[Byte], Array[Byte])]()
val startKey =
- start match {
- case Some(bytes) => Some(ArraySeq(bytes: _*))
- case None => None
- }
+ start match {
+ case Some(bytes) => Some(ArraySeq(bytes: _*))
+ case None => None
+ }
val endKey =
- finish match {
- case Some(bytes) => Some(ArraySeq(bytes: _*))
- case None => None
- }
+ finish match {
+ case Some(bytes) => Some(ArraySeq(bytes: _*))
+ case None => None
+ }
((startKey, endKey, count): @unchecked) match {
case ((Some(s), Some(e), _)) =>
newMap.range(s, e)
- .toList
- .map(e => (e._1.toArray, e._2))
- .toList
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
case ((Some(s), None, c)) if c > 0 =>
newMap.from(s)
- .iterator
- .take(count)
- .map(e => (e._1.toArray, e._2))
- .toList
+ .iterator
+ .take(count)
+ .map(e => (e._1.toArray, e._2))
+ .toList
case ((Some(s), None, _)) =>
newMap.from(s)
- .toList
- .map(e => (e._1.toArray, e._2))
- .toList
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
case ((None, Some(e), _)) =>
newMap.until(e)
- .toList
- .map(e => (e._1.toArray, e._2))
- .toList
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
}
- } catch { case e: Exception => Nil }
+ } catch {case e: Exception => Nil}
- override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
+ override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
new Iterator[(Array[Byte], Array[Byte])] {
private var elements = replayAllKeys
+
override def next: (Array[Byte], Array[Byte]) = synchronized {
val (k, v) = elements.head
elements = elements.tail
(k.toArray, v)
}
- override def hasNext: Boolean = synchronized { !elements.isEmpty }
+
+ override def hasNext: Boolean = synchronized {!elements.isEmpty}
}
}
}
@@ -394,7 +425,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
val storage: VectorStorageBackend[T]
def commit = {
- for(entry <- appendOnlyTxLog) {
+ for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
@@ -412,7 +443,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
import scala.collection.mutable.ArrayBuffer
var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*)
- for(entry <- appendOnlyTxLog) {
+ for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(_, Some(v), ADD) => elemsStorage += v
case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v)
@@ -446,11 +477,11 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
val curr = replay
val s = if (start.isDefined) start.get else 0
val cnt =
- if (finish.isDefined) {
- val f = finish.get
- if (f >= s) (f - s) else count
- }
- else count
+ if (finish.isDefined) {
+ val f = finish.get
+ if (f >= s) (f - s) else count
+ }
+ else count
if (s == 0 && cnt == 0) List().toIndexedSeq
else curr.slice(s, s + cnt).toIndexedSeq
}
@@ -519,12 +550,12 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
}
}
- private[akka] object PersistentQueue {
- //Operations for PersistentQueue
- sealed trait QueueOp
- case object ENQ extends QueueOp
- case object DEQ extends QueueOp
- }
+private[akka] object PersistentQueue {
+ //Operations for PersistentQueue
+ sealed trait QueueOp
+ case object ENQ extends QueueOp
+ case object DEQ extends QueueOp
+}
/**
* Implementation of PersistentQueue for every concrete
@@ -552,7 +583,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
* @author Debasish Ghosh
*/
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
- with Transactional with Committable with Abortable with Logging {
+ with Transactional with Committable with Abortable with Logging {
//Import Ops
import PersistentQueue._
@@ -575,11 +606,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
val storage: QueueStorageBackend[A]
def commit = {
- enqueuedNDequeuedEntries.toList.foreach { e =>
- e._2 match {
- case ENQ => storage.enqueue(uuid, e._1.get)
- case DEQ => storage.dequeue(uuid)
- }
+ enqueuedNDequeuedEntries.toList.foreach {
+ e =>
+ e._2 match {
+ case ENQ => storage.enqueue(uuid, e._1.get)
+ case DEQ => storage.dequeue(uuid)
+ }
}
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) {
storage.remove(uuid)
@@ -635,7 +667,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
override def size: Int = try {
storage.size(uuid) + localQ.get.length
- } catch { case e: Exception => 0 }
+ } catch {case e: Exception => 0}
override def isEmpty: Boolean =
size == 0
@@ -644,10 +676,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
enqueue(elem)
this
}
+
def ++=(elems: Iterator[A]) = {
enqueue(elems.toList: _*)
this
}
+
def ++=(elems: Iterable[A]): Unit = this ++= elems.iterator
override def dequeueFirst(p: A => Boolean): Option[A] =
@@ -670,24 +704,24 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
*
* zscore can be implemented in a variety of ways by the calling class:
*
- * trait ZScorable {
+ * trait ZScorable {
* def toZScore: Float
* }
*
- * class Foo extends ZScorable {
+ * class Foo extends ZScorable {
* //.. implemnetation
* }
*
* Or we can also use views:
*
- * class Foo {
+ * class Foo {
* //..
* }
*
- * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
- * def toZScore = {
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
+ * def toZScore = {
* //..
- * }
+ * }
* }
*
*
@@ -696,7 +730,6 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* @author
*/
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
-
protected val newElems = TransactionalMap[A, Float]()
protected val removedElems = TransactionalVector[A]()
@@ -729,8 +762,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
}
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
- case Some(s) => Some(s.toFloat)
- case None => None
+ case Some(s) => Some(s.toFloat)
+ case None => None
}
def contains(elem: A): Boolean = {
@@ -758,8 +791,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
def compare(that: (A, Float)) = x._2 compare that._2
}
- implicit def ordering = new scala.math.Ordering[(A,Float)] {
- def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
+ implicit def ordering = new scala.math.Ordering[(A, Float)] {
+ def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2
}
@@ -773,9 +806,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
// -1 means the last element, -2 means the second last
val s = if (start < 0) start + l else start
val e =
- if (end < 0) end + l
- else if (end >= l) (l - 1)
- else end
+ if (end < 0) end + l
+ else if (end >= l) (l - 1)
+ else end
// slice is open at the end, we need a closed end range
ts.iterator.slice(s, e + 1).toList
}
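
The PersistentMap code above answers reads by folding its append-only transaction log over the stored snapshot. A simplified model of that replay (only PUT and REM here; the real log also carries UPD and CLR and optional keys):

```scala
sealed trait Op
case object PUT extends Op
case object REM extends Op
case class LogEntry[K, V](key: K, value: Option[V], op: Op)

// Current value for `key`: the storage snapshot overridden, in order,
// by the uncommitted log entries that touch the same key.
def currentValue[K, V](snapshot: Option[V], key: K,
                       log: List[LogEntry[K, V]]): Option[V] =
  log.filter(_.key == key).foldLeft(snapshot) { (acc, entry) =>
    entry.op match {
      case PUT => entry.value // last write wins
      case REM => None        // removal hides the stored value
    }
  }
```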
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
new file mode 100644
index 0000000000..395d0ef269
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
@@ -0,0 +1,161 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.matchers.ShouldMatchers
+import se.scalablesolutions.akka.util.Logging
+import org.scalatest.{BeforeAndAfterEach, Spec}
+import scala.util.Random
+import collection.immutable.{TreeMap, HashMap, HashSet}
+import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
+
+
+/**
+ * Implementation Compatibility test for PersistentMap backend implementations.
+ */
+
+trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
+ def storage: MapStorageBackend[Array[Byte], Array[Byte]]
+
+ def dropMaps: Unit
+
+ override def beforeEach = {
+ log.info("beforeEach: dropping maps")
+ dropMaps
+ }
+
+ override def afterEach = {
+ log.info("afterEach: dropping maps")
+ dropMaps
+ }
+
+
+ describe("A Properly functioning MapStorageBackend") {
+ it("should remove map storage properly") {
+ val mapName = "removeTest"
+ val mkey = "removeTestKey".getBytes
+ val value = "removeTestValue".getBytes
+
+ storage.insertMapStorageEntryFor(mapName, mkey, value)
+ storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true)
+ storage.removeMapStorageFor(mapName, mkey)
+ storage.getMapStorageEntryFor(mapName, mkey) should be(None)
+
+ storage.insertMapStorageEntryFor(mapName, mkey, value)
+ storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true)
+ storage.removeMapStorageFor(mapName)
+ storage.getMapStorageEntryFor(mapName, mkey) should be(None)
+ }
+
+ it("should insert a single map storage element properly") {
+ val mapName = "insertSingleTest"
+ val mkey = "insertSingleTestKey".getBytes
+ val value = "insertSingleTestValue".getBytes
+
+ storage.insertMapStorageEntryFor(mapName, mkey, value)
+ storage.getMapStorageEntryFor(mapName, mkey).get should be(value)
+ storage.removeMapStorageFor(mapName, mkey)
+ storage.getMapStorageEntryFor(mapName, mkey) should be(None)
+
+ storage.insertMapStorageEntryFor(mapName, mkey, value)
+ storage.getMapStorageEntryFor(mapName, mkey).get should be(value)
+ storage.removeMapStorageFor(mapName)
+ storage.getMapStorageEntryFor(mapName, mkey) should be(None)
+ }
+
+
+ it("should insert multiple map storage elements properly") {
+ val mapName = "insertMultipleTest"
+ val rand = new Random(3).nextInt(100)
+ val entries = (1 to rand).toList.map {
+ index =>
+ (("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes)
+ }
+
+ storage.insertMapStorageEntriesFor(mapName, entries)
+ entries foreach {
+ _ match {
+ case (mkey, value) => {
+ storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true)
+ storage.getMapStorageEntryFor(mapName, mkey).get should be(value)
+ }
+ }
+ }
+ storage.removeMapStorageFor(mapName)
+ entries foreach {
+ _ match {
+ case (mkey, value) => {
+ storage.getMapStorageEntryFor(mapName, mkey) should be(None)
+ }
+ }
+ }
+ }
+
+
+ it("should accurately track the number of key value pairs in a map") {
+ val mapName = "sizeTest"
+ val rand = new Random(3).nextInt(100)
+ val entries = (1 to rand).toList.map {
+ index =>
+ (("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes)
+ }
+
+ storage.insertMapStorageEntriesFor(mapName, entries)
+ storage.getMapStorageSizeFor(mapName) should be(rand)
+ }
+
+
+
+ it("should return all the key value pairs in the map in the correct order when getMapStorageFor(name) is called") {
+ val mapName = "allTest"
+ val rand = new Random(3).nextInt(100)
+ var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering)
+ (1 to rand).foreach {
+ index =>
+ entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes)
+ }
+
+ storage.insertMapStorageEntriesFor(mapName, entries.toList)
+ val retrieved = storage.getMapStorageFor(mapName)
+ retrieved.size should be(rand)
+ entries.size should be(rand)
+
+
+
+ val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
+ val retrievedMap = new HashMap[String, String] ++ retrieved.map {_ match {case (k, v) => (new String(k), new String(v))}} //map over the retrieved entries, not the inserted ones
+
+ entryMap should equal(retrievedMap)
+
+ (0 until rand).foreach {
+ i: Int => {
+ new String(entries.toList(i)._1) should be(new String(retrieved(i)._1))
+ }
+ }
+
+ }
+
+ it("should return all the key->value pairs that exist in the map that are between start and end, up to count pairs when getMapStorageRangeFor is called") {
+ //implement once this method is actually used
+ }
+
+
+ it("should return Some(null), not None, for a key that has had the value null set and None for a key with no value set") {
+ val mapName = "nullTest"
+ val key = "key".getBytes
+ storage.insertMapStorageEntryFor(mapName, key, null)
+ storage.getMapStorageEntryFor(mapName, key).get should be(null)
+ storage.removeMapStorageFor(mapName, key)
+ storage.getMapStorageEntryFor(mapName, key) should be(None)
+ }
+
+ it("should not throw an exception when size is called on a non existent map?") {
+ storage.getMapStorageSizeFor("nonExistent") should be(0)
+ }
+
+
+ }
+
+}
\ No newline at end of file
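
The trait is meant to be mixed into one concrete spec per backend module; a hypothetical wiring (all names below are placeholders, not real Akka backends):

```scala
// Hypothetical wiring; MyStorageBackend stands in for a concrete
// MapStorageBackend[Array[Byte], Array[Byte]] implementation.
class MyMapStorageBackendTest extends MapStorageBackendTest {
  def storage = MyStorageBackend
  def dropMaps = MyStorageBackend.dropAllMaps() // assumed cleanup hook
}
```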
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala
new file mode 100644
index 0000000000..3eb89e3db5
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.matchers.ShouldMatchers
+import se.scalablesolutions.akka.util.Logging
+import org.scalatest.{BeforeAndAfterEach, Spec}
+import scala.util.Random
+
+/**
+ * Implementation Compatibility test for PersistentQueue backend implementations.
+ */
+
+trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
+ def storage: QueueStorageBackend[Array[Byte]]
+
+ def dropQueues: Unit
+
+ override def beforeEach = {
+ log.info("beforeEach: dropping queues")
+ dropQueues
+ }
+
+ override def afterEach = {
+ log.info("afterEach: dropping queues")
+ dropQueues
+ }
+
+
+
+ describe("A Properly functioning QueueStorage Backend") {
+ it("should enqueue properly when there is capacity in the queue") {
+ val queue = "enqueueTest"
+ val value = "enqueueTestValue".getBytes
+ storage.size(queue) should be(0)
+ storage.enqueue(queue, value).get should be(1)
+ storage.size(queue) should be(1)
+ }
+
+ it("should return None when enqueue is called on a full queue?") {
+
+ }
+
+ it("should dequeue properly when the queue is not empty") {
+ val queue = "dequeueTest"
+ val value = "dequeueTestValue".getBytes
+ storage.size(queue) should be(0)
+ storage.enqueue(queue, value)
+ storage.size(queue) should be(1)
+ storage.dequeue(queue).get should be(value)
+ }
+
+ it("should return None when dequeue is called on an empty queue") {
+ val queue = "dequeueTest2"
+ val value = "dequeueTestValue2".getBytes
+ storage.size(queue) should be(0)
+ storage.dequeue(queue) should be(None)
+ }
+
+ it("should accurately reflect the size of the queue") {
+ val queue = "sizeTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (1 to rand).toList.map {i: Int => ("sizeTestValue" + i).getBytes}
+ values.foreach {storage.enqueue(queue, _)}
+ storage.size(queue) should be(rand)
+ val drand = new Random(3).nextInt(rand)
+ (1 to drand).foreach {
+ i: Int => {
+ storage.dequeue(queue).isDefined should be(true)
+ storage.size(queue) should be(rand - i)
+ }
+ }
+ }
+
+ it("should support peek properly") {
+ val queue = "sizeTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (1 to rand).toList.map {i: Int => ("peekTestValue" + i)}
+ storage.remove(queue)
+ values.foreach {s: String => storage.enqueue(queue, s.getBytes)}
+ (1 to rand).foreach {
+ index => {
+ val peek = storage.peek(queue, 0, index).map {new String(_)}
+ peek.size should be(index)
+ values.dropRight(values.size - index).equals(peek) should be(true)
+ }
+ }
+ (0 until rand).foreach {
+ index => {
+ val peek = storage.peek(queue, index, rand - index).map {new String(_)}
+ peek.size should be(rand - index)
+ values.drop(index).equals(peek) should be(true)
+ }
+ }
+
+ //Should we also test counts greater than the queue size, or offsets greater than (queue size - count)?
+ }
+
+ it("should not throw an exception when remove is called on a non-existent queue") {
+ storage.remove("exceptionTest")
+ }
+
+ it("should remove queue storage properly") {
+ val queue = "removeTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (1 to rand).toList.map {i: Int => ("removeValue" + i).getBytes}
+ values.foreach {storage.enqueue(queue, _)}
+ storage.size(queue) should be(rand)
+ storage.remove(queue)
+ storage.size(queue) should be(0)
+ }
+
+ it("should accept null as a value to enqueue and return Some(null) when that value is dequeued") {
+ val queue = "nullTest"
+ storage.enqueue(queue, null).get should be(1)
+ storage.dequeue(queue).get should be(null)
+ storage.dequeue(queue) should be(None)
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala
new file mode 100644
index 0000000000..37902cf7c9
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.matchers.ShouldMatchers
+import se.scalablesolutions.akka.util.Logging
+import org.scalatest.{BeforeAndAfterEach, Spec}
+
+/**
+ * Implementation Compatibility test for PersistentRef backend implementations.
+ */
+
+trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
+ def storage: RefStorageBackend[Array[Byte]]
+
+ def dropRefs: Unit
+
+ override def beforeEach = {
+ log.info("beforeEach: dropping refs")
+ dropRefs
+ }
+
+ override def afterEach = {
+ log.info("afterEach: dropping refs")
+ dropRefs
+ }
+
+
+ describe("A Properly functioning RefStorageBackend") {
+ it("should successfully insert ref storage") {
+ val name = "RefStorageTest #1"
+ val value = name.getBytes
+ storage.insertRefStorageFor(name, value)
+ storage.getRefStorageFor(name).get should be(value)
+ }
+
+ it("should return None when getRefStorage is called when no value has been inserted") {
+ val name = "RefStorageTest #2"
+ val value = name.getBytes
+ storage.getRefStorageFor(name) should be(None)
+ }
+
+ it("Should return None, not Some(null) when getRefStorageFor is called when null has been set") {
+ val name = "RefStorageTest #3"
+ storage.insertRefStorageFor(name, null)
+ storage.getRefStorageFor(name) should be(None)
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala
new file mode 100644
index 0000000000..2a9d3ab324
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.matchers.ShouldMatchers
+import se.scalablesolutions.akka.util.Logging
+import org.scalatest.{BeforeAndAfterEach, Spec}
+
+/**
+ * Implementation Compatibility test for PersistentSortedSet backend implementations.
+ */
+
+trait SortedSetStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
+ def storage: SortedSetStorageBackend[Array[Byte]]
+
+ def dropSortedSets: Unit
+
+ override def beforeEach = {
+ log.info("beforeEach: dropping sorted sets")
+ dropSortedSets
+ }
+
+ override def afterEach = {
+ log.info("afterEach: dropping sorted sets")
+ dropSortedSets
+ }
+
+
+ describe("A Properly functioning SortedSetStorageBackend Backend") {
+
+ }
+
+}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
new file mode 100644
index 0000000000..14eba7d4e3
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
@@ -0,0 +1,362 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.stm.global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+import StorageObj._
+
+
+case class GET(k: String)
+case class SET(k: String, v: String)
+case class REM(k: String)
+case class CONTAINS(k: String)
+case object MAP_SIZE
+case class MSET(kvs: List[(String, String)])
+case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
+case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
+case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
+case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
+
+case class VADD(v: String)
+case class VUPD(i: Int, v: String)
+case class VUPD_AND_ABORT(i: Int, v: String)
+case class VGET(i: Int)
+case object VSIZE
+case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
+case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
+
+
+object StorageObj {
+ var getMap: String => PersistentMap[Array[Byte], Array[Byte]] = _
+ var getVector: String => PersistentVector[Array[Byte]] = _
+
+ class SampleMapStorage extends Actor {
+ self.lifeCycle = Permanent
+ val FOO_MAP = "akka.sample.map"
+
+ private var fooMap = atomic {StorageObj.getMap(FOO_MAP)}
+
+ def receive = {
+ case SET(k, v) =>
+ atomic {
+ fooMap += (k.getBytes, v.getBytes)
+ }
+ self.reply((k, v))
+
+ case GET(k) =>
+ val v = atomic {
+ fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
+ }
+ self.reply(v)
+
+ case REM(k) =>
+ val v = atomic {
+ fooMap -= k.getBytes
+ }
+ self.reply(k)
+
+ case CONTAINS(k) =>
+ val v = atomic {
+ fooMap contains k.getBytes
+ }
+ self.reply(v)
+
+ case MAP_SIZE =>
+ val v = atomic {
+ fooMap.size
+ }
+ self.reply(v)
+
+ case MSET(kvs) => atomic {
+ kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes)}
+ }
+ self.reply(kvs.size)
+
+ case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
+ kvs2add.foreach {
+ kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+
+ ks2rem.foreach {
+ k =>
+ fooMap -= k.getBytes
+ }
+ }
+ self.reply(fooMap.size)
+
+ case CLEAR_AFTER_PUT(kvs2add) => atomic {
+ kvs2add.foreach {
+ kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.clear
+ }
+ self.reply(true)
+
+ case PUT_WITH_SLICE(kvs2add, from, cnt) =>
+ val v = atomic {
+ kvs2add.foreach {
+ kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+
+ case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
+ val v = atomic {
+ kvs2add.foreach {
+ kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ ks2rem.foreach {
+ k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+ }
+ }
+
+ class SampleVectorStorage extends Actor {
+ self.lifeCycle = Permanent
+ val FOO_VECTOR = "akka.sample.vector"
+
+ private var fooVector = atomic {StorageObj.getVector(FOO_VECTOR)}
+
+ def receive = {
+ case VADD(v) =>
+ val size =
+ atomic {
+ fooVector + v.getBytes
+ fooVector length
+ }
+ self.reply(size)
+
+ case VGET(index) =>
+ val ind =
+ atomic {
+ fooVector get index
+ }
+ self.reply(ind)
+
+ case VGET_AFTER_VADD(vs, is) =>
+ val els =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
+ }
+ self.reply(els)
+
+ case VUPD_AND_ABORT(index, value) =>
+ val l =
+ atomic {
+ fooVector.update(index, value.getBytes)
+ // force fail
+ fooVector get 100
+ }
+ self.reply(index)
+
+ case VADD_WITH_SLICE(vs, s, c) =>
+ val l =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ fooVector.slice(Some(s), None, c)
+ }
+ self.reply(l.map(new String(_)))
+ }
+ }
+}
+
+
+
+trait Ticket343Test extends Spec with ShouldMatchers with BeforeAndAfterEach {
+ def getMap: String => PersistentMap[Array[Byte], Array[Byte]]
+
+ def getVector: String => PersistentVector[Array[Byte]]
+
+
+ def dropMapsAndVectors: Unit
+
+ override def beforeEach {
+ StorageObj.getMap = getMap
+ StorageObj.getVector = getVector
+ dropMapsAndVectors
+ println("** dropMapsAndVectors")
+ }
+
+ override def afterEach {
+ dropMapsAndVectors
+ println("** dropMapsAndVectors")
+ }
+
+ describe("Ticket 343 Issue #1") {
+ it("remove after put should work within the same transaction") {
+ val proc = actorOf[SampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ val rem = List("a", "debasish")
+ (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
+
+ (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
+ (proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
+
+ (proc !! GET("b")).getOrElse("b not found") should equal("2")
+
+ (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
+ (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #2") {
+ it("clear after put should work within the same transaction") {
+ val proc = actorOf[SampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #3") {
+ it("map size should change after the transaction") {
+ val proc = actorOf[SampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+ proc.stop
+ }
+ }
+
+ describe("slice test") {
+ it("should pass") {
+ val proc = actorOf[SampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
+
+ (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #4") {
+ it("vector get should not ignore elements that were in vector before transaction") {
+
+ val proc = actorOf[SampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan")
+ new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]]) should equal("ramanendu")
+ new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]]) should equal("maulindu")
+ new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]]) should equal("debasish")
+
+ // now add 3 more and do gets in the same transaction
+ (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #6") {
+ it("vector update should not ignore transaction") {
+ val proc = actorOf[SampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ evaluating {
+ (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
+ } should produce[Exception]
+
+ // update aborts and hence values will remain unchanged
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan")
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #5") {
+ it("vector slice() should not ignore elements added in current transaction") {
+ val proc = actorOf[SampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ // slice with no new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
+
+ // slice with new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
+ proc.stop
+ }
+ }
+}
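Review note: the VUPD_AND_ABORT handler verifies STM rollback by forcing a failure inside the same atomic block as the update, which is what Issue #6 asserts. The pattern, reduced to its essentials (names taken from the SampleVectorStorage actor above):

    atomic {
      fooVector.update(index, value.getBytes)
      fooVector get 100 // out of range: throws and aborts, so the update never commits
    }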
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
new file mode 100644
index 0000000000..e677f8fe66
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.matchers.ShouldMatchers
+import se.scalablesolutions.akka.util.Logging
+import org.scalatest.{BeforeAndAfterEach, Spec}
+import scala.util.Random
+
+/**
+ * Implementation Compatibility test for PersistentVector backend implementations.
+ */
+
+trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
+ def storage: VectorStorageBackend[Array[Byte]]
+
+ def dropVectors: Unit
+
+ override def beforeEach = {
+ log.info("beforeEach: dropping vectors")
+ dropVectors
+ }
+
+ override def afterEach = {
+ log.info("afterEach: dropping vectors")
+ dropVectors
+ }
+
+
+
+ describe("A Properly functioning VectorStorageBackend") {
+ it("should insertVectorStorageEntry as a logical prepend operation to the existing list") {
+ val vector = "insertSingleTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+ storage.getVectorStorageSizeFor(vector) should be(0)
+ values.foreach {s: String => storage.insertVectorStorageEntryFor(vector, s.getBytes)}
+ val shouldRetrieve = values.reverse
+ (0 to rand).foreach {
+ i: Int => {
+ shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i)))
+ }
+ }
+ }
+
+ it("should insertVectorStorageEntries as a logical prepend operation to the existing list") {
+ val vector = "insertMultiTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+ storage.getVectorStorageSizeFor(vector) should be(0)
+ storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+ val shouldRetrieve = values.reverse
+ (0 to rand).foreach {
+ i: Int => {
+ shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i)))
+ }
+ }
+ }
+
+ it("should successfully update entries") {
+ val vector = "updateTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+ val urand = new Random(3).nextInt(rand)
+ storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+ val toUpdate = "updated" + values.reverse(urand)
+ storage.updateVectorStorageEntryFor(vector, urand, toUpdate.getBytes)
+ toUpdate should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
+ }
+
+ it("should return the correct value from getVectorStorageFor") {
+ val vector = "getTest"
+ val rand = new Random(3).nextInt(100)
+ val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+ val urand = new Random(3).nextInt(rand)
+ storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+ values.reverse(urand) should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
+ }
+
+ it("should return the correct values from getVectorStorageRangeFor") {
+ val vector = "getTest"
+ val rand = new Random(3).nextInt(100)
+ val drand = new Random(3).nextInt(rand)
+ val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+ storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+ values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)})
+ (0 to drand).foreach {
+ i: Int => {
+ val value: String = vector + "value" + (rand - i)
+ log.debug(value)
+ List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)})
+ }
+ }
+ }
+
+ it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") {
+ //what is proper?
+ }
+
+ it("shoud return null when getStorageEntry is called on a null entry") {
+ //What is proper?
+ val vector = "nullTest"
+ storage.insertVectorStorageEntryFor(vector, null)
+ storage.getVectorStorageEntryFor(vector, 0) should be(null)
+ }
+
+ it("shoud throw a Storage exception when there is an attempt to retrieve an index larger than the Vector") {
+ val vector = "tooLargeRetrieve"
+ storage.insertVectorStorageEntryFor(vector, null)
+ evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException]
+ }
+
+ it("shoud throw a Storage exception when there is an attempt to update an index larger than the Vector") {
+ val vector = "tooLargeUpdate"
+ storage.insertVectorStorageEntryFor(vector, null)
+ evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException]
+ }
+
+ }
+
+}
\ No newline at end of file
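Review note: these tests treat insertVectorStorageEntryFor as a logical prepend, which is why they reverse the insertion list before asserting. A worked example of the index mapping (illustrative values):

    // After inserting "a", "b", "c" in that order:
    //   getVectorStorageEntryFor(v, 0) == "c"  (most recently inserted)
    //   getVectorStorageEntryFor(v, 1) == "b"
    //   getVectorStorageEntryFor(v, 2) == "a"
    val expected = List("a", "b", "c").reverse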
diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala
index 26210ba52f..930a3b25a7 100644
--- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala
+++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala
@@ -36,7 +36,7 @@ case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object Storage {
class HbaseSampleMapStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic { HbaseStorage.getMap(FOO_MAP) }
@@ -119,7 +119,7 @@ object Storage {
}
class HbaseSampleVectorStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic { HbaseStorage.getVector(FOO_VECTOR) }
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
index 413be5d860..a614fbc78d 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
@@ -36,7 +36,7 @@ case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object Storage {
class MongoSampleMapStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic { MongoStorage.getMap(FOO_MAP) }
@@ -119,7 +119,7 @@ object Storage {
}
class MongoSampleVectorStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic { MongoStorage.getVector(FOO_VECTOR) }
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala
index 1e760784c9..1bd2c34d86 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala
@@ -28,7 +28,7 @@ case class SETFOO(s: String)
object SampleStorage {
class RedisSampleStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val EVENT_MAP = "akka.sample.map"
private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) }
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
index 2b06b17270..f46aa9f224 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
@@ -41,7 +41,7 @@ case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object Storage {
class RedisSampleMapStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic { RedisStorage.getMap(FOO_MAP) }
@@ -134,7 +134,7 @@ object Storage {
}
class RedisSampleVectorStorage extends Actor {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic { RedisStorage.getVector(FOO_VECTOR) }
diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala
index 4e237267a5..2a9c3c5717 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala
@@ -15,14 +15,17 @@ object VoldemortStorage extends Storage {
def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
+ override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
+ override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id)
+ override def newQueue(id:String): PersistentQueue[ElementType] = new VoldemortPersistentQueue(id)
}
@@ -41,3 +44,8 @@ class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = VoldemortStorageBackend
}
+
+class VoldemortPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
+ val uuid = id
+ val storage = VoldemortStorageBackend
+}
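Review note: the queue methods are declared with override, so the Storage trait presumably provides default (unsupported) queue implementations that backends opt into. A hedged usage sketch, mirroring the atomic map/vector access pattern used elsewhere in this patch ("akka.sample.queue" is an illustrative id):

    val queue = atomic { VoldemortStorage.getQueue("akka.sample.queue") }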
diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
index 20b9804ed4..abc7855d9c 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
@@ -17,9 +17,10 @@ import voldemort.versioning.Versioned
import collection.JavaConversions
import java.nio.ByteBuffer
import collection.Map
-import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap}
import collection.mutable.{Set, HashSet, ArrayBuffer}
import java.util.{Properties, Map => JMap}
+import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
+import collection.immutable._
/*
RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores
@@ -49,28 +50,28 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
initStoreClients
+ val nullMapValueHeader = 0x00.byteValue
+ val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
+ val notNullMapValueHeader: Byte = 0xff.byteValue
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
val mapKeysIndex = getIndexedBytes(-1)
val vectorSizeIndex = getIndexedBytes(-1)
val queueHeadIndex = getIndexedBytes(-1)
val queueTailIndex = getIndexedBytes(-2)
-
-
- implicit val byteOrder = new Ordering[Array[Byte]] {
- override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
- }
+ //explicit implicit :)
+ implicit val ordering = ArrayOrdering
def getRefStorageFor(name: String): Option[Array[Byte]] = {
val result: Array[Byte] = refClient.getValue(name)
- result match {
- case null => None
- case _ => Some(result)
- }
+ Option(result)
}
def insertRefStorageFor(name: String, element: Array[Byte]) = {
- refClient.put(name, element)
+ element match {
+ case null => refClient.delete(name)
+ case _ => refClient.put(name, element)
+ }
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
@@ -90,17 +91,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
mapKey => getKey(name, mapKey)
}))
- val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size)
+ var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering)
JavaConversions.asMap(all).foreach {
(entry) => {
entry match {
- case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => {
- buf += key -> versioned.getValue
+ case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => {
+ returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(versioned.getValue)
}
}
}
}
- buf.toList
+ returned.toList
}
def getMapStorageSizeFor(name: String): Int = {
@@ -112,7 +113,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val result: Array[Byte] = mapClient.getValue(getKey(name, key))
result match {
case null => None
- case _ => Some(result)
+ case _ => Some(getMapValueFromStored(result))
}
}
@@ -134,7 +135,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
- mapClient.put(getKey(name, key), value)
+ mapClient.put(getKey(name, key), getStoredMapValue(value))
var keys = getMapKeys(name)
keys += key
putMapKeys(name, keys)
@@ -143,7 +144,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = entries.map {
case (key, value) => {
- mapClient.put(getKey(name, key), value)
+ mapClient.put(getKey(name, key), getStoredMapValue(value))
key
}
}
@@ -169,17 +170,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
val size = getVectorStorageSizeFor(name)
val st = start.getOrElse(0)
- val cnt =
+ var cnt =
if (finish.isDefined) {
val f = finish.get
if (f >= st) (f - st) else count
} else {
count
}
- val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
- index => getIndexedKey(name, index)
+ if (cnt > (size - st)) {
+ cnt = size - st
}
+
+ val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
+ index => getIndexedKey(name, (size - 1) - index)
+ } //read backwards
+
val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
var storage = new ArrayBuffer[Array[Byte]](seq.size)
@@ -199,14 +205,23 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
- vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte])
+ val size = getVectorStorageSizeFor(name)
+ if (size > 0 && index < size) {
+ vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
+ } else {
+ throw new StorageException("In Vector:" + name + " No such Index:" + index)
+ }
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val size = getVectorStorageSizeFor(name)
- vectorClient.put(getIndexedKey(name, index), elem)
- if (size < index + 1) {
- vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
+ if (size > 0 && index < size) {
+ elem match {
+ case null => vectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
+ case _ => vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
+ }
+ } else {
+ throw new StorageException("In Vector:" + name + " No such Index:" + index)
}
}
@@ -214,7 +229,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
var size = getVectorStorageSizeFor(name)
elements.foreach {
element =>
- vectorClient.put(getIndexedKey(name, size), element)
+ if (element != null) {
+ vectorClient.put(getIndexedKey(name, size), element)
+ }
size += 1
}
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
@@ -263,7 +280,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
try {
queueClient.delete(key)
} catch {
- //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
+ //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue")
}
}
@@ -276,7 +293,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val mdata = getQueueMetadata(name)
if (mdata.canEnqueue) {
val key = getIndexedKey(name, mdata.tail)
- queueClient.put(key, item)
+ item match {
+ case null => queueClient.delete(key)
+ case _ => queueClient.put(key, item)
+ }
queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue))
Some(mdata.size + 1)
} else {
@@ -332,6 +352,39 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
IntSerializer.fromBytes(indexBytes)
}
+ def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = {
+ val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length
+ val mapkey = new Array[Byte](mapKeyLength)
+ System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength)
+ mapkey
+ }
+
+ //wrapper for null
+ def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
+ value match {
+ case null => nullMapValue
+ case value => {
+ val stored = new Array[Byte](value.length + 1)
+ stored(0) = notNullMapValueHeader
+ System.arraycopy(value, 0, stored, 1, value.length)
+ stored
+ }
+ }
+ }
+
+ def getMapValueFromStored(value: Array[Byte]): Array[Byte] = {
+
+ if (value(0) == nullMapValueHeader) {
+ null
+ } else if (value(0) == notNullMapValueHeader) {
+ val returned = new Array[Byte](value.length - 1)
+ System.arraycopy(value, 1, returned, 0, value.length - 1)
+ returned
+ } else {
+ throw new StorageException("unknown header byte on map value:" + value(0))
+ }
+ }
+
def getClientConfig(configMap: Map[String, String]): Properties = {
val properites = new Properties
@@ -450,6 +503,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
+ import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
+
var set = new TreeSet[Array[Byte]]
if (bytes.length > IntSerializer.bytesPerInt) {
var pos = 0
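Review note: the header-byte wrapper exists because a raw null value read back from Voldemort would otherwise be indistinguishable from an absent key, which is presumably the motivation for tagging every stored map value. A minimal round-trip sketch of the scheme (0x00 alone marks a stored null; 0xff prefixes real payload bytes):

    val stored = getStoredMapValue("x".getBytes)   // Array(0xff, 'x')
    new String(getMapValueFromStored(stored))      // "x"
    getMapValueFromStored(getStoredMapValue(null)) // null (decoded from Array(0x00))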
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala
index ce87309fb9..d0f40f1a03 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala
@@ -1,20 +1,20 @@
package se.scalablesolutions.akka.persistence.voldemort
-import org.scalatest.matchers.ShouldMatchers
import voldemort.server.{VoldemortServer, VoldemortConfig}
-import org.scalatest.{Suite, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{Suite, BeforeAndAfterAll}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import voldemort.utils.Utils
import java.io.File
import se.scalablesolutions.akka.util.{Logging}
import collection.JavaConversions
import voldemort.store.memory.InMemoryStorageConfiguration
+import voldemort.client.protocol.admin.{AdminClientConfig, AdminClient}
+
-@RunWith(classOf[JUnitRunner])
trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
this: Suite =>
var server: VoldemortServer = null
+ var admin: AdminClient = null
override protected def beforeAll(): Unit = {
@@ -28,6 +28,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
server = new VoldemortServer(config)
server.start
VoldemortStorageBackend.initStoreClients
+ admin = new AdminClient(VoldemortStorageBackend.clientConfig.getProperty(VoldemortStorageBackend.bootstrapUrlsProp), new AdminClientConfig)
log.info("Started")
} catch {
case e => log.error(e, "Error Starting Voldemort")
@@ -36,6 +37,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
}
override protected def afterAll(): Unit = {
+ admin.stop
server.stop
}
}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala
deleted file mode 100644
index 76bb989ac9..0000000000
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package se.scalablesolutions.akka.persistence.voldemort
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
-import se.scalablesolutions.akka.actor.{newUuid,Uuid}
-import collection.immutable.TreeSet
-import VoldemortStorageBackendSuite._
-
-import se.scalablesolutions.akka.stm._
-import se.scalablesolutions.akka.stm.global._
-import se.scalablesolutions.akka.config.ScalaConfig._
-import se.scalablesolutions.akka.persistence.common._
-import se.scalablesolutions.akka.util.Logging
-import se.scalablesolutions.akka.config.Config.config
-
-@RunWith(classOf[JUnitRunner])
-class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
- test("persistentRefs work as expected") {
- val name = newUuid.toString
- val one = "one".getBytes
- atomic {
- val ref = VoldemortStorage.getRef(name)
- ref.isDefined should be(false)
- ref.swap(one)
- ref.get match {
- case Some(bytes) => bytes should be(one)
- case None => true should be(false)
- }
- }
- val two = "two".getBytes
- atomic {
- val ref = VoldemortStorage.getRef(name)
- ref.isDefined should be(true)
- ref.swap(two)
- ref.get match {
- case Some(bytes) => bytes should be(two)
- case None => true should be(false)
- }
- }
- }
-
-
- test("Persistent Vectors function as expected") {
- val name = newUuid.toString
- val one = "one".getBytes
- val two = "two".getBytes
- atomic {
- val vec = VoldemortStorage.getVector(name)
- vec.add(one)
- }
- atomic {
- val vec = VoldemortStorage.getVector(name)
- vec.size should be(1)
- vec.add(two)
- }
- atomic {
- val vec = VoldemortStorage.getVector(name)
-
- vec.get(0) should be(one)
- vec.get(1) should be(two)
- vec.size should be(2)
- vec.update(0, two)
- }
-
- atomic {
- val vec = VoldemortStorage.getVector(name)
- vec.get(0) should be(two)
- vec.get(1) should be(two)
- vec.size should be(2)
- vec.update(0, Array.empty[Byte])
- vec.update(1, Array.empty[Byte])
- }
-
- atomic {
- val vec = VoldemortStorage.getVector(name)
- vec.get(0) should be(Array.empty[Byte])
- vec.get(1) should be(Array.empty[Byte])
- vec.size should be(2)
- }
-
-
- }
-
-}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala
new file mode 100644
index 0000000000..b9b3ea4ed1
--- /dev/null
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala
@@ -0,0 +1,49 @@
+package se.scalablesolutions.akka.persistence.voldemort
+
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest}
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortRefStorageBackendTest extends RefStorageBackendTest with EmbeddedVoldemort {
+ def dropRefs = {
+ admin.truncate(0, VoldemortStorageBackend.refStore)
+ }
+
+
+ def storage = VoldemortStorageBackend
+}
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortMapStorageBackendTest extends MapStorageBackendTest with EmbeddedVoldemort {
+ def dropMaps = {
+ admin.truncate(0, VoldemortStorageBackend.mapStore)
+ }
+
+
+ def storage = VoldemortStorageBackend
+}
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortVectorStorageBackendTest extends VectorStorageBackendTest with EmbeddedVoldemort {
+ def dropVectors = {
+ admin.truncate(0, VoldemortStorageBackend.vectorStore)
+ }
+
+
+ def storage = VoldemortStorageBackend
+}
+
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortQueueStorageBackendTest extends QueueStorageBackendTest with EmbeddedVoldemort {
+ def dropQueues = {
+ admin.truncate(0, VoldemortStorageBackend.queueStore)
+ }
+
+
+ def storage = VoldemortStorageBackend
+}
+
+
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
index 8ac3d306c4..b28ea90171 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
@@ -103,10 +103,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
vectorClient.delete(getKey(key, vectorSizeIndex))
vectorClient.delete(getIndexedKey(key, 0))
vectorClient.delete(getIndexedKey(key, 1))
- getVectorStorageEntryFor(key, 0) should be(empty)
- getVectorStorageEntryFor(key, 1) should be(empty)
- getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
-
+
insertVectorStorageEntryFor(key, value)
//again
insertVectorStorageEntryFor(key, value)
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala
new file mode 100644
index 0000000000..b170f949cf
--- /dev/null
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala
@@ -0,0 +1,22 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.persistence.voldemort
+
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import se.scalablesolutions.akka.persistence.common._
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortTicket343Test extends Ticket343Test with EmbeddedVoldemort {
+ def dropMapsAndVectors: Unit = {
+ admin.truncate(0, VoldemortStorageBackend.mapStore)
+ admin.truncate(0, VoldemortStorageBackend.vectorStore)
+ }
+
+ def getVector: (String) => PersistentVector[Array[Byte]] = VoldemortStorage.getVector
+
+ def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = VoldemortStorage.getMap
+}
\ No newline at end of file
diff --git a/akka-remote/src/main/scala/remote/Cluster.scala b/akka-remote/src/main/scala/remote/Cluster.scala
index 6e1e99f0b2..c668228291 100644
--- a/akka-remote/src/main/scala/remote/Cluster.scala
+++ b/akka-remote/src/main/scala/remote/Cluster.scala
@@ -241,7 +241,7 @@ object Cluster extends Cluster with Logging {
Some(Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
- Supervise(actor, LifeCycle(Permanent)) :: Nil)))
+ Supervise(actor, Permanent) :: Nil)))
private[this] def clusterActor = if (clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor])
diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala
index c9dd582978..427c3ad721 100644
--- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala
+++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala
@@ -91,16 +91,10 @@ object ActorSerialization {
private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
- def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
- case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
- case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
- }
- val builder = LifeCycleProtocol.newBuilder
actorRef.lifeCycle match {
- case Some(LifeCycle(scope)) =>
- setScope(builder, scope)
- Some(builder.build)
- case None => None
+ case Permanent => Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build)
+ case Temporary => Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.TEMPORARY).build)
+        case UndefinedLifeCycle => None // no need to send the undefined life cycle over the wire
}
}
@@ -164,11 +158,12 @@ object ActorSerialization {
val lifeCycle =
if (protocol.hasLifeCycle) {
- val lifeCycleProtocol = protocol.getLifeCycle
- Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
- else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
- else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
- } else None
+ protocol.getLifeCycle.getLifeCycle match {
+ case LifeCycleType.PERMANENT => Permanent
+ case LifeCycleType.TEMPORARY => Temporary
+ case unknown => throw new IllegalActorStateException("LifeCycle type is not valid: " + unknown)
+ }
+ } else UndefinedLifeCycle
val supervisor =
if (protocol.hasSupervisor)
diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala
index 936d1cf5c4..40f0d27640 100644
--- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala
+++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala
@@ -483,7 +483,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
factory.newInstance
@@ -499,7 +499,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
factory.newInstance
}
@@ -520,15 +520,15 @@ class RemoteSupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong2,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong3,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
factory.newInstance
}
@@ -551,15 +551,15 @@ class RemoteSupervisorSpec extends JUnitSuite {
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong2,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong3,
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
factory.newInstance
}
@@ -580,17 +580,17 @@ class RemoteSupervisorSpec extends JUnitSuite {
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
- LifeCycle(Permanent))
+ Permanent)
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong2,
- LifeCycle(Permanent))
+ Permanent)
::
Supervise(
pingpong3,
- LifeCycle(Permanent))
+ Permanent)
:: Nil)
:: Nil))
factory.newInstance
diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala
index 8b28b35f57..5a3a5bc2c4 100644
--- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala
@@ -55,13 +55,13 @@ class RemoteTypedActorSpec extends
new Component(
classOf[RemoteTypedActorOne],
classOf[RemoteTypedActorOneImpl],
- new LifeCycle(new Permanent),
+ new Permanent,
10000,
new RemoteAddress("localhost", 9995)),
new Component(
classOf[RemoteTypedActorTwo],
classOf[RemoteTypedActorTwoImpl],
- new LifeCycle(new Permanent),
+ new Permanent,
10000,
new RemoteAddress("localhost", 9995))
).toArray).supervise
diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala
index 90c445dd3c..dda4f0a6d6 100644
--- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala
+++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala
@@ -116,6 +116,8 @@ class SerializableTypeClassActorSpec extends
(actor2 !! "hello").getOrElse("_") should equal("world 3")
actor2.receiveTimeout should equal (Some(1000))
+ actor1.stop
+ actor2.stop
}
it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") {
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
index 98c7c34b7e..461ba31d83 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
@@ -27,8 +27,8 @@ class Boot {
//val supervisor = Supervisor(
// SupervisorConfig(
// RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
- // Supervise(actorOf[Consumer1], LifeCycle(Permanent)) ::
- // Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
+ // Supervise(actorOf[Consumer1], Permanent) ::
+ // Supervise(actorOf[Consumer2], Permanent) :: Nil))
// -----------------------------------------------------------------------
// Custom Camel route example
diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
index 6f70d8071a..e5b60b364f 100644
--- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
+++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
@@ -97,7 +97,7 @@ trait ChatStorage extends Actor
* Redis-backed chat storage implementation.
*/
class RedisChatStorage extends ChatStorage {
- self.lifeCycle = Some(LifeCycle(Permanent))
+ self.lifeCycle = Permanent
val CHAT_LOG = "akka.chat.log"
private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) }
diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java
index d9b41cd136..4702eead02 100644
--- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java
+++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java
@@ -16,12 +16,12 @@ public class Boot {
new Component(
SimpleService.class,
SimpleServiceImpl.class,
- new LifeCycle(new Permanent()),
+ new Permanent(),
1000),
new Component(
PersistentSimpleService.class,
PersistentSimpleServiceImpl.class,
- new LifeCycle(new Permanent()),
+ new Permanent(),
1000)
}).supervise();
}
diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
index c3b71a3fdf..fb8bd7c381 100644
--- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala
@@ -28,13 +28,13 @@ class Boot {
RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])),
Supervise(
actorOf[SimpleServiceActor],
- LifeCycle(Permanent)) ::
+ Permanent) ::
Supervise(
actorOf[ChatActor],
- LifeCycle(Permanent)) ::
+ Permanent) ::
Supervise(
actorOf[PersistentSimpleServiceActor],
- LifeCycle(Permanent))
+ Permanent)
:: Nil))
factory.newInstance.start
}
diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala
index 02af6174c6..3f2b76a359 100644
--- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala
+++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala
@@ -20,18 +20,18 @@ class Boot {
// see akka.conf to enable one of these for the AkkaSecurityFilterFactory
Supervise(
actorOf[BasicAuthenticationService],
- LifeCycle(Permanent)) ::
+ Permanent) ::
/**
Supervise(
actorOf[DigestAuthenticationService],
- LifeCycle(Permanent)) ::
+ Permanent) ::
Supervise(
actorOf[SpnegoAuthenticationService],
- LifeCycle(Permanent)) ::
+ Permanent) ::
**/
Supervise(
actorOf[SecureTickActor],
- LifeCycle(Permanent)):: Nil))
+ Permanent):: Nil))
val supervisor = factory.newInstance
supervisor.start
diff --git a/akka-spring/src/main/scala/SupervisionFactoryBean.scala b/akka-spring/src/main/scala/SupervisionFactoryBean.scala
index 657a40c90a..c6d1e7ddc0 100644
--- a/akka-spring/src/main/scala/SupervisionFactoryBean.scala
+++ b/akka-spring/src/main/scala/SupervisionFactoryBean.scala
@@ -56,7 +56,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
*/
private[akka] def createComponent(props: ActorProperties): Component = {
import StringReflect._
- val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
+ val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new Temporary() else new Permanent()
val isRemote = (props.host ne null) && (!props.host.isEmpty)
val withInterface = (props.interface ne null) && (!props.interface.isEmpty)
if (isRemote) {
@@ -81,7 +81,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
*/
private[akka] def createSupervise(props: ActorProperties): Server = {
import StringReflect._
- val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new LifeCycle(new Temporary()) else new LifeCycle(new Permanent())
+ val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new Temporary() else new Permanent()
val isRemote = (props.host ne null) && (!props.host.isEmpty)
val actorRef = Actor.actorOf(props.target.toClass)
if (props.timeout > 0) {
diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala
index 1769a5c47b..ea5db11531 100644
--- a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala
+++ b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala
@@ -33,13 +33,13 @@ class RestartNestedTransactionalTypedActorSpec extends
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
List(
new Component(classOf[TransactionalTypedActor],
- new LifeCycle(new Permanent),
+ new Permanent,
10000),
new Component(classOf[NestedTransactionalTypedActor],
- new LifeCycle(new Permanent),
+ new Permanent,
10000),
new Component(classOf[TypedActorFailer],
- new LifeCycle(new Permanent),
+ new Permanent,
10000)
).toArray).supervise
*/
diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala
index 56b1e6ec5b..8f80fbcd1b 100644
--- a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala
+++ b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala
@@ -33,11 +33,11 @@ class RestartTransactionalTypedActorSpec extends
List(
new Component(
classOf[TransactionalTypedActor],
- new LifeCycle(new Temporary),
+ new Temporary,
10000),
new Component(
classOf[TypedActorFailer],
- new LifeCycle(new Temporary),
+ new Temporary,
10000)
).toArray).supervise
}
diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala
index d076ec52cf..814cd299d9 100644
--- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala
+++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala
@@ -41,13 +41,13 @@ class TypedActorGuiceConfiguratorSpec extends
new Component(
classOf[Foo],
classOf[FooImpl],
- new LifeCycle(new Permanent),
+ new Permanent,
1000,
dispatcher),
new Component(
classOf[Bar],
classOf[BarImpl],
- new LifeCycle(new Permanent),
+ new Permanent,
1000,
dispatcher)
).toArray).inject.supervise
diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
index 052f4cc7de..781537d5a9 100644
--- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
+++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala
@@ -22,8 +22,8 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
override protected def beforeAll() = {
val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception]))
- val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Permanent()), 1000)
- val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Temporary()), 1000)
+ val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new Permanent(), 1000)
+ val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new Temporary(), 1000)
conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
conf2 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise
}
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 4c6859af46..7f42ea0531 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -605,7 +605,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val dbcp = Dependencies.dbcp
val sjson = Dependencies.sjson_test
- override def testOptions = createTestFilter( _.endsWith("Suite"))
+  override def testOptions = createTestFilter({ s: String => s.endsWith("Suite") || s.endsWith("Test") })
}
class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {