Merge branch 'master' into workstealing

This commit is contained in:
Jan Van Besien 2010-03-18 10:14:01 +01:00
commit eff1cd3f58
18 changed files with 561 additions and 135 deletions

View file

@ -17,41 +17,42 @@ import scala.collection.immutable.{Map, HashMap}
* @author Viktor Klang
*/
trait Cluster {
/**
* Specifies the cluster name
*/
* Specifies the cluster name
*/
def name: String
/**
* Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
* Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
def registerLocalNode(hostname: String, port: Int): Unit
/**
* Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
* Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
def deregisterLocalNode(hostname: String, port: Int): Unit
/**
* Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
* Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
/**
* Traverses all known remote addresses avaiable at all other nodes in the cluster
* and applies the given PartialFunction on the first address that it's defined at
* The order of application is undefined and may vary
*/
* Traverses all known remote addresses avaiable at all other nodes in the cluster
* and applies the given PartialFunction on the first address that it's defined at
* The order of application is undefined and may vary
*/
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
/**
* Applies the specified function to all known remote addresses on al other nodes in the cluster
* The order of application is undefined and may vary
*/
* Applies the specified function to all known remote addresses on al other nodes in the cluster
* The order of application is undefined and may vary
*/
def foreach(f: (RemoteAddress) => Unit): Unit
}
@ -159,7 +160,7 @@ abstract class BasicClusterActor extends ClusterActor {
case RegisterLocalNode(s) => {
log debug ("RegisterLocalNode: %s", s)
local = Node(local.endpoints + s)
local = Node(s :: local.endpoints)
broadcast(Papers(local.endpoints))
}
@ -242,7 +243,7 @@ object Cluster extends Cluster with Logging {
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
serializer setClassLoader loader
serializer.classLoader = Some(loader)
try {
name map {
fqn =>

View file

@ -18,19 +18,17 @@ object RemoteProtocolBuilder {
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(classLoader: ClassLoader) = {
SERIALIZER_JAVA = new Serializer.Java
SERIALIZER_JAVA_JSON = new Serializer.JavaJSON
SERIALIZER_SCALA_JSON = new Serializer.ScalaJSON
SERIALIZER_JAVA.setClassLoader(classLoader)
SERIALIZER_JAVA_JSON.setClassLoader(classLoader)
SERIALIZER_SCALA_JSON.setClassLoader(classLoader)
def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON.classLoader = Some(cl)
SERIALIZER_SCALA_JSON.classLoader = Some(cl)
}
def getMessage(request: RemoteRequest): Any = {
request.getProtocol match {
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
val renderer = Class.forName(
new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]

View file

@ -18,13 +18,13 @@ import sjson.json.{Serializer => SJSONSerializer}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Serializer {
def deepClone(obj: AnyRef): AnyRef
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
protected var classLoader: Option[ClassLoader] = None
var classLoader: Option[ClassLoader] = None
def setClassLoader(cl: ClassLoader) = classLoader = Some(cl)
def deepClone(obj: AnyRef): AnyRef
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
// For Java API
@ -55,7 +55,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Java extends Java
class Java extends Serializer {
trait Java extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = {
@ -67,8 +67,9 @@ object Serializer {
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val in =
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
in.close
obj
@ -79,18 +80,21 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Protobuf extends Protobuf
class Protobuf extends Serializer {
trait Protobuf extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
obj.asInstanceOf[Message].toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
if (!clazz.isDefined) throw new IllegalArgumentException(
"Need a protobuf message class to be able to serialize bytes using protobuf")
// TODO: should we cache this method lookup?
val message = clazz.get.getDeclaredMethod("getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message]
val message = clazz.get.getDeclaredMethod(
"getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message]
message.toBuilder().mergeFrom(bytes).build
}
@ -104,7 +108,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object JavaJSON extends JavaJSON
class JavaJSON extends Serializer {
trait JavaJSON extends Serializer {
private val mapper = new ObjectMapper
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
@ -118,9 +122,11 @@ object Serializer {
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided")
val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
if (!clazz.isDefined) throw new IllegalArgumentException(
"Can't deserialize JSON to instance if no class is provided")
val in =
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef]
in.close
obj
@ -136,7 +142,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ScalaJSON extends ScalaJSON
class ScalaJSON extends Serializer {
trait ScalaJSON extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
@ -158,7 +164,7 @@ object Serializer {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object SBinary extends SBinary
class SBinary {
trait SBinary {
import sbinary.DefaultProtocol._
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None)

View file

@ -53,6 +53,17 @@ trait Committable {
}
/**
* Alias to TransactionalRef.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Ref {
def apply[T]() = new Ref[T]
}
/**
* Alias to Ref.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionalRef {
@ -65,8 +76,17 @@ object TransactionalRef {
def apply[T]() = new TransactionalRef[T]
}
/**
* Implements a transactional managed reference.
* Alias to TransactionalRef.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Ref[T] extends TransactionalRef[T]
/**
* Implements a transactional managed reference.
* Alias to Ref.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -18,19 +18,17 @@ object Patterns {
/**
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
*/
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter(
{case a if a.isInstanceOf[A] => interceptor(a)},
interceptee
)
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
//FIXME 2.8, use default params with CyclicIterator
def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer {
val seq = actors
}
def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher {
def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor =
new Actor with Dispatcher {
override def transform(msg: Any) = msgTransformer(msg)
def routes = routing
}
@ -38,36 +36,29 @@ object Patterns {
def routes = routing
}
def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor(
{case _ => actorToLog},
logger
)
def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor =
dispatcherActor({case _ => actorToLog}, logger)
}
trait Dispatcher {
self: Actor =>
trait Dispatcher { self: Actor =>
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, Actor]
protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) => {
if (self.sender.isDefined)
routes(a) forward transform(a)
else
routes(a) send transform(a)
}
case a if routes.isDefinedAt(a) =>
if (self.sender.isDefined) routes(a) forward transform(a)
else routes(a) send transform(a)
}
def receive = dispatch
}
trait LoadBalancer extends Dispatcher {
self: Actor =>
trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[Actor]
protected def routes = {case x if seq.hasNext => seq.next}
protected def routes = { case x if seq.hasNext => seq.next }
}
trait InfiniteIterator[T] extends Iterator[T]

View file

@ -51,18 +51,24 @@ trait Storage {
def newRef: PersistentRef[ElementType]
def newQueue: PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getMap(id: String): PersistentMap[ElementType, ElementType]
def getVector(id: String): PersistentVector[ElementType]
def getRef(id: String): PersistentRef[ElementType]
def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newMap(id: String): PersistentMap[ElementType, ElementType]
def newVector(id: String): PersistentVector[ElementType]
def newRef(id: String): PersistentRef[ElementType]
def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
}
/**
@ -302,7 +308,7 @@ trait PersistentRef[T] extends Transactional with Committable {
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Logging {
abstract case class QueueOp
sealed trait QueueOp
case object ENQ extends QueueOp
case object DEQ extends QueueOp
@ -398,3 +404,119 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
transaction.get.get.register(uuid, this)
}
}
/**
* Implements a template for a concrete persistent transactional sorted set based storage.
* <p/>
* Sorting is done based on a <i>zscore</i>. But the computation of zscore has been kept
* outside the abstraction.
* <p/>
* zscore can be implemented in a variety of ways by the calling class:
* <pre>
* trait ZScorable {
* def toZScore: Float
* }
*
* class Foo extends ZScorable {
* //.. implemnetation
* }
* </pre>
* Or we can also use views:
* <pre>
* class Foo {
* //..
* }
*
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
* def toZScore = {
* //..
* }
* }
* </pre>
*
* and use <tt>foo.toZScore</tt> to compute the zscore and pass to the APIs.
*
* @author <a href="http://debasishg.blogspot.com"</a>
*/
trait PersistentSortedSet[A]
extends Transactional
with Committable {
protected val newElems = TransactionalState.newMap[A, Float]
protected val removedElems = TransactionalState.newVector[A]
val storage: SortedSetStorageBackend[A]
def commit = {
for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element)
for (element <- removedElems) storage.zrem(uuid, element)
newElems.clear
removedElems.clear
}
def +(elem: A, score: Float) = add(elem, score)
def add(elem: A, score: Float) = {
register
newElems.put(elem, score)
}
def -(elem: A) = remove(elem)
def remove(elem: A) = {
register
removedElems.add(elem)
}
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
case Some(s) => Some(s.toFloat)
case None => None
}
def contains(elem: A): Boolean = {
if (newElems contains elem) true
else {
inStorage(elem) match {
case Some(f) => true
case None => false
}
}
}
def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
def zscore(elem: A): Float = {
if (newElems contains elem) newElems.get(elem).get
inStorage(elem) match {
case Some(f) => f
case None =>
throw new Predef.NoSuchElementException(elem + " not present")
}
}
implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
def compare(that: (A, Float)) = x._2 compare that._2
}
def zrange(start: Int, end: Int): List[(A, Float)] = {
// need to operate on the whole range
// get all from the underlying storage
val fromStore = storage.zrangeWithScore(uuid, 0, -1)
val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList
val l = ts.size
// -1 means the last element, -2 means the second last
val s = if (start < 0) start + l else start
val e =
if (end < 0) end + l
else if (end >= l) (l - 1)
else end
// slice is open at the end, we need a closed end range
ts.elements.slice(s, e + 1).toList
}
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
transaction.get.get.register(uuid, this)
}
}

View file

@ -69,11 +69,15 @@ trait SortedSetStorageBackend[T] extends StorageBackend {
// remove item from sorted set identified by name
def zrem(name: String, item: T): Boolean
// cardinality of the set idnetified by name
// cardinality of the set identified by name
def zcard(name: String): Int
def zscore(name: String, item: T): String
// zscore of the item from sorted set identified by name
def zscore(name: String, item: T): Option[Float]
// zrange from the sorted set identified by name
def zrange(name: String, start: Int, end: Int): List[T]
}
// zrange with score from the sorted set identified by name
def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
}

View file

@ -15,16 +15,20 @@ object RedisStorage extends Storage {
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
override def newQueue: PersistentQueue[ElementType] = newQueue(UUID.newUuid.toString)
override def newSortedSet: PersistentSortedSet[ElementType] = newSortedSet(UUID.newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
override def getSortedSet(id: String): PersistentSortedSet[ElementType] = newSortedSet(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
override def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id)
override def newSortedSet(id: String): PersistentSortedSet[ElementType] =
new RedisPersistentSortedSet(id)
}
/**
@ -63,3 +67,14 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}
/**
* Implements a persistent transactional sorted set based on the Redis
* storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}

View file

@ -364,11 +364,10 @@ private [akka] object RedisStorageBackend extends
}
}
def zscore(name: String, item: Array[Byte]): String = withErrorHandling {
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
case None =>
throw new Predef.NoSuchElementException(new String(item) + " not present")
case Some(s) => s
case Some(s) => Some(s.toFloat)
case None => None
}
}
@ -380,6 +379,16 @@ private [akka] object RedisStorageBackend extends
s.map(_.get.getBytes)
}
}
def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling {
db.zrangeWithScore(
new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) =>
l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) }
}
}
def flushDB = withErrorHandling(db.flushDb)

View file

@ -87,7 +87,7 @@ class AccountActor extends Transactor {
}
@serializable class PersistentFailerActor extends Transactor {
//timeout = 5000
// timeout = 5000
def receive = {
case "Failure" =>
throw new RuntimeException("expected")

View file

@ -0,0 +1,237 @@
package se.scalablesolutions.akka.persistence.redis
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, Transactor}
/**
* A persistent actor based on Redis sortedset storage.
* <p/>
* Needs a running Redis server.
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait ZScorable {
def zscore: Float
}
case class Hacker(name: String, birth: String) extends ZScorable {
def zscore = birth.toFloat
}
class SetThresholdViolationException extends RuntimeException
// add hacker to the set
case class ADD(h: Hacker)
// remove hacker from set
case class REMOVE(h: Hacker)
// size of the set
case object SIZE
// zscore of the hacker
case class SCORE(h: Hacker)
// zrange
case class RANGE(start: Int, end: Int)
// add and remove subject to the condition that there will be at least 3 hackers
case class MULTI(add: List[Hacker], rem: List[Hacker], failer: Actor)
class SortedSetActor extends Transactor {
timeout = 100000
private lazy val hackers = RedisStorage.newSortedSet
def receive = {
case ADD(h) =>
hackers.+(h.name.getBytes, h.zscore)
reply(true)
case REMOVE(h) =>
hackers.-(h.name.getBytes)
reply(true)
case SIZE =>
reply(hackers.size)
case SCORE(h) =>
reply(hackers.zscore(h.name.getBytes))
case RANGE(s, e) =>
reply(hackers.zrange(s, e))
case MULTI(a, r, failer) =>
a.foreach{ h: Hacker =>
hackers.+(h.name.getBytes, h.zscore)
}
try {
r.foreach{ h =>
if (hackers.size <= 3)
throw new SetThresholdViolationException
hackers.-(h.name.getBytes)
}
} catch {
case e: Exception =>
failer !! "Failure"
}
reply((a.size, r.size))
}
}
import RedisStorageBackend._
@RunWith(classOf[JUnitRunner])
class RedisPersistentSortedSetSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
override def beforeAll {
flushDB
println("** destroyed database")
}
override def afterAll {
flushDB
println("** destroyed database")
}
val h1 = Hacker("Alan kay", "1940")
val h2 = Hacker("Richard Stallman", "1953")
val h3 = Hacker("Yukihiro Matsumoto", "1965")
val h4 = Hacker("Claude Shannon", "1916")
val h5 = Hacker("Linus Torvalds", "1969")
val h6 = Hacker("Alan Turing", "1912")
describe("Add and report cardinality of the set") {
val qa = new SortedSetActor
qa.start
it("should enter 6 hackers") {
qa !! ADD(h1)
qa !! ADD(h2)
qa !! ADD(h3)
qa !! ADD(h4)
qa !! ADD(h5)
qa !! ADD(h6)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
}
it("should fetch correct scores for hackers") {
(qa !! SCORE(h1)).get.asInstanceOf[Float] should equal(1940.0f)
(qa !! SCORE(h5)).get.asInstanceOf[Float] should equal(1969.0f)
(qa !! SCORE(h6)).get.asInstanceOf[Float] should equal(1912.0f)
}
it("should fetch proper range") {
(qa !! RANGE(0, 4)).get.asInstanceOf[List[_]].size should equal(5)
(qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6)
}
it("should remove and throw exception for removing non-existent hackers") {
qa !! REMOVE(h2)
(qa !! SIZE).get.asInstanceOf[Int] should equal(5)
qa !! REMOVE(h3)
(qa !! SIZE).get.asInstanceOf[Int] should equal(4)
val h7 = Hacker("Paul Snively", "1952")
try {
qa !! REMOVE(h7)
}
catch {
case e: Predef.NoSuchElementException =>
e.getMessage should endWith("not present")
}
}
it("should change score for entering the same hacker name with diff score") {
(qa !! SIZE).get.asInstanceOf[Int] should equal(4)
// same name as h6
val h7 = Hacker("Alan Turing", "1992")
qa !! ADD(h7)
// size remains same
(qa !! SIZE).get.asInstanceOf[Int] should equal(4)
// score updated
(qa !! SCORE(h7)).get.asInstanceOf[Float] should equal(1992.0f)
}
}
describe("Transaction semantics") {
it("should rollback on exception") {
val qa = new SortedSetActor
qa.start
val failer = new PersistentFailerActor
failer.start
(qa !! SIZE).get.asInstanceOf[Int] should equal(0)
val add = List(h1, h2, h3, h4)
val rem = List(h2)
(qa !! MULTI(add, rem, failer)).get.asInstanceOf[Tuple2[Int, Int]] should equal((4,1))
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
// size == 3
// add 2 more
val add1 = List(h5, h6)
// remove 3
val rem1 = List(h1, h3, h4)
try {
qa !! MULTI(add1, rem1, failer)
} catch { case e: Exception => {}
}
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
}
}
describe("zrange") {
it ("should report proper range") {
val qa = new SortedSetActor
qa.start
qa !! ADD(h1)
qa !! ADD(h2)
qa !! ADD(h3)
qa !! ADD(h4)
qa !! ADD(h5)
qa !! ADD(h6)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
val l = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]]
l.map { case (e, s) => (new String(e), s) }.head should equal(("Alan Turing", 1912.0f))
val h7 = Hacker("Alan Turing", "1992")
qa !! ADD(h7)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
val m = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]]
m.map { case (e, s) => (new String(e), s) }.head should equal(("Claude Shannon", 1916.0f))
}
it ("should report proper rge") {
val qa = new SortedSetActor
qa.start
qa !! ADD(h1)
qa !! ADD(h2)
qa !! ADD(h3)
qa !! ADD(h4)
qa !! ADD(h5)
qa !! ADD(h6)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
(qa !! RANGE(0, 5)).get.asInstanceOf[List[_]].size should equal(6)
(qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6)
(qa !! RANGE(0, 3)).get.asInstanceOf[List[_]].size should equal(4)
(qa !! RANGE(0, 1)).get.asInstanceOf[List[_]].size should equal(2)
(qa !! RANGE(0, 0)).get.asInstanceOf[List[_]].size should equal(1)
(qa !! RANGE(3, 1)).get.asInstanceOf[List[_]].size should equal(0)
(qa !! RANGE(0, -1)).get.asInstanceOf[List[_]].size should equal(6)
(qa !! RANGE(0, -2)).get.asInstanceOf[List[_]].size should equal(5)
(qa !! RANGE(0, -4)).get.asInstanceOf[List[_]].size should equal(3)
(qa !! RANGE(-4, -1)).get.asInstanceOf[List[_]].size should equal(4)
}
}
}

View file

@ -191,10 +191,10 @@ class RedisStorageBackendSpec extends
zcard("hackers") should equal(6)
zscore("hackers", "alan turing".getBytes) should equal("1912")
zscore("hackers", "richard stallman".getBytes) should equal("1953")
zscore("hackers", "claude shannon".getBytes) should equal("1916")
zscore("hackers", "linus torvalds".getBytes) should equal("1969")
zscore("hackers", "alan turing".getBytes).get should equal(1912.0f)
zscore("hackers", "richard stallman".getBytes).get should equal(1953.0f)
zscore("hackers", "claude shannon".getBytes).get should equal(1916.0f)
zscore("hackers", "linus torvalds".getBytes).get should equal(1969.0f)
val s: List[Array[Byte]] = zrange("hackers", 0, 2)
s.size should equal(3)
@ -206,6 +206,10 @@ class RedisStorageBackendSpec extends
val t: List[Array[Byte]] = zrange("hackers", 0, -1)
t.size should equal(6)
t.map(new String(_)) should equal(sorted)
val u: List[(Array[Byte], Float)] = zrangeWithScore("hackers", 0, -1)
u.size should equal(6)
u.map{ case (e, s) => new String(e) } should equal(sorted)
}
}
}

View file

@ -10,19 +10,22 @@ For details on how to set up Redis server have a look at http://code.google.com/
Then to run the sample:
1. Set AKKA_HOME environment variable to the root of the Akka distribution.
2. Open up a shell and step into the Akka distribution root folder.
3. Build Akka by invoking mvn install -Dmaven.test.skip=true. This will also build the sample application and deploy it to the $AKKA_HOME/deploy directory.
4. Run the microkernel
export AKKA_HOME=...
cd $AKKA_HOME
java -jar ./dist/akka-0.6.jar
5. Now start up a new shell and go down into the ./akka-samples/akka-sample-chat directory.
6. Invoke mvn scala:console -o. This will give you a Scala REPL (interpreter) with the chat application and all its dependency JARs on the classpath.
7. Simply paste in the whole code block with the Runner object above and invoke Runner.run. This runs a simulated client session that will connect to the running server in the microkernel.
8. Invoke Runner.run again and again…
1. Install the Redis network storage. Download it from [http://code.google.com/p/redis/].
2. Open up a shell and start up an instance of Redis.
3. Fire up two shells. For each of them:
- Step down into to the root of the Akka distribution.
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
4. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> ChatService.start
5. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> Runner.run
6. See the chat simulation run.
7. Run it again to see full speed after first initialization.
Now you could test client reconnect by killing the running microkernel and start it up again. See the client reconnect take place in the REPL shell.
Now you could test client reconnect by killing the console running the ChatService and start it up again. See the client reconnect take place in the REPL shell.
Thats it. Have fun.

View file

@ -1,30 +1,48 @@
/**ChatStorage
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>.
*/
package se.scalablesolutions.akka.sample.chat
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor}
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage
import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.OneForOneStrategy
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor}
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.util.Logging
/******************************************************************************
To run the sample:
1. Run 'mvn install' (builds and deploys jar to AKKA_HOME/deploy)
2. In another shell run 'java -jar ./dist/akka-0.6.jar' to start up Akka microkernel
3. In the first shell run 'mvn scala:console -o'
4. In the REPL you get execute:
Akka Chat Client/Server Sample Application
First we need to download, build and start up Redis:
1. Download Redis from http://code.google.com/p/redis/downloads/list.
2. Step into the distribution.
3. Build: make install.
4. Run: ./redis-server.
For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart.
Then to run the sample:
1. Fire up two shells. For each of them:
- Step down into to the root of the Akka distribution.
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> ChatService.start
3. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> Runner.run
5. See the chat simulation run
6. Run it again to see full speed after first initialization
4. See the chat simulation run.
5. Run it again to see full speed after first initialization.
Thats it. Have fun.
******************************************************************************/
/**
@ -42,10 +60,12 @@ case class ChatMessage(from: String, message: String) extends Event
*/
class ChatClient(val name: String) {
import Actor.Sender.Self
def login = ChatService ! Login(name)
def logout = ChatService ! Logout(name)
def post(message: String) = ChatService ! ChatMessage(name, name + ": " + message)
def chatLog: ChatLog = (ChatService !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
val chat = RemoteClient.actorFor("chat:service", "localhost", 9999)
def login = chat ! Login(name)
def logout = chat ! Logout(name)
def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
def chatLog: ChatLog = (chat !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
}
/**
@ -77,8 +97,9 @@ trait ChatStorage extends Actor
*/
class RedisChatStorage extends ChatStorage {
lifeCycle = Some(LifeCycle(Permanent))
private var chatLog = atomic { RedisStorage.getVector("akka.chat.log") }
val CHAT_LOG = "akka.chat.log"
private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) }
log.info("Redis-based chat storage is starting up...")
@ -96,7 +117,7 @@ class RedisChatStorage extends ChatStorage {
reply(ChatLog(messageList))
}
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector("akka.chat.log")
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
}
/**
@ -182,16 +203,19 @@ object ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
RedisChatStorageFactory
RedisChatStorageFactory {
override def start: Actor = {
super.start
RemoteNode.start("localhost", 9999)
RemoteNode.register("chat:service", this)
this
}
}
/**
* Test runner emulating a chat session.
*/
object Runner {
// create a handle to the remote ChatService
ChatService.makeRemote("localhost", 9999)
ChatService.start
def run = {
val client = new ChatClient("jonas")
@ -205,4 +229,4 @@ object Runner {
client.logout
}
}
}

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redis</groupId>
<artifactId>redisclient</artifactId>
<version>1.1</version>
<packaging>jar</packaging>
</project>

View file

@ -127,11 +127,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------
// publishing
override def managedStyle = ManagedStyle.Maven
val publishTo = Resolver.file("maven-local", new java.io.File((Path.userHome / ".m2" / "repository").toString))
val publishTo = Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile)
// Credentials(Path.userHome / ".akka_publish_credentials", log)
val sourceArtifact = Artifact(artifactID, "src", "jar", Some("sources"), Nil, None)
// val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None)
//val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None)
override def packageDocsJar = defaultJarPath("-javadoc.jar")
override def packageSrcJar= defaultJarPath("-sources.jar")
@ -240,7 +240,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaRedisProject(info: ProjectInfo) extends DefaultProject(info) {
val redis = "com.redis" % "redisclient" % "1.1" % "compile"
val redis = "com.redis" % "redisclient" % "1.2-SNAPSHOT" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying")
}