merged with upstream
This commit is contained in:
commit
c3fef4e1bc
8 changed files with 44 additions and 42 deletions
|
|
@ -653,9 +653,9 @@ trait Actor extends TransactionManagement {
|
||||||
* To be invoked from within the actor itself.
|
* To be invoked from within the actor itself.
|
||||||
*/
|
*/
|
||||||
protected[this] def link(actor: Actor) = {
|
protected[this] def link(actor: Actor) = {
|
||||||
getLinkedActors.add(actor)
|
|
||||||
if (actor._supervisor.isDefined) throw new IllegalStateException(
|
if (actor._supervisor.isDefined) throw new IllegalStateException(
|
||||||
"Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
|
"Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
|
||||||
|
getLinkedActors.add(actor)
|
||||||
actor._supervisor = Some(this)
|
actor._supervisor = Some(this)
|
||||||
Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this)
|
Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,22 +62,15 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
|
||||||
def dispatch(invocation: MessageInvocation) = if (active) {
|
def dispatch(invocation: MessageInvocation) = if (active) {
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
def run = {
|
def run = {
|
||||||
var messageInvocation = invocation.receiver._mailbox.poll
|
invocation.receiver.synchronized {
|
||||||
while (messageInvocation != null) {
|
var messageInvocation = invocation.receiver._mailbox.poll
|
||||||
messageInvocation.invoke
|
while (messageInvocation != null) {
|
||||||
messageInvocation = invocation.receiver._mailbox.poll
|
messageInvocation.invoke
|
||||||
}
|
messageInvocation = invocation.receiver._mailbox.poll
|
||||||
}
|
|
||||||
/* invocation.receiver.synchronized {
|
|
||||||
val messages = invocation.receiver._mailbox.iterator
|
|
||||||
while (messages.hasNext) {
|
|
||||||
messages.next.invoke
|
|
||||||
messages.remove
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
})
|
||||||
})
|
|
||||||
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
|
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
|
||||||
|
|
||||||
def start = if (!active) {
|
def start = if (!active) {
|
||||||
|
|
|
||||||
|
|
@ -26,4 +26,4 @@ class AgentTest extends junit.framework.TestCase with Suite with MustMatchers wi
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.redis</groupId>
|
<groupId>com.redis</groupId>
|
||||||
<artifactId>redisclient</artifactId>
|
<artifactId>redisclient</artifactId>
|
||||||
<version>1.0.1</version>
|
<version>1.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,10 +15,10 @@ trait Encoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
trait CommonsCodecBase64 {
|
trait CommonsCodecBase64 {
|
||||||
val base64 = new org.apache.commons.codec.binary.Base64
|
import org.apache.commons.codec.binary.Base64._
|
||||||
|
|
||||||
def encode(bytes: Array[Byte]): Array[Byte] = base64.encode(bytes)
|
def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
|
||||||
def decode(bytes: Array[Byte]): Array[Byte] = base64.decode(bytes)
|
def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
object Base64Encoder extends Encoder with CommonsCodecBase64
|
object Base64Encoder extends Encoder with CommonsCodecBase64
|
||||||
|
|
@ -45,7 +45,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
||||||
val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379)
|
val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379)
|
||||||
|
|
||||||
val db = new Redis(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
|
val db = new RedisClient(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map storage in Redis.
|
* Map storage in Redis.
|
||||||
|
|
@ -189,7 +189,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) {
|
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) {
|
||||||
db.pushHead(new String(encode(name.getBytes)), new String(element))
|
db.lpush(new String(encode(name.getBytes)), new String(element))
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) {
|
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) {
|
||||||
|
|
@ -197,11 +197,11 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) {
|
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) {
|
||||||
db.listSet(new String(encode(name.getBytes)), index, new String(elem))
|
db.lset(new String(encode(name.getBytes)), index, new String(elem))
|
||||||
}
|
}
|
||||||
|
|
||||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||||
db.listIndex(new String(encode(name.getBytes)), index) match {
|
db.lindex(new String(encode(name.getBytes)), index) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
|
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
|
||||||
case Some(e) => e.getBytes
|
case Some(e) => e.getBytes
|
||||||
|
|
@ -221,16 +221,16 @@ private [akka] object RedisStorageBackend extends
|
||||||
if (f >= s) Math.min(count, (f - s)) else count
|
if (f >= s) Math.min(count, (f - s)) else count
|
||||||
}
|
}
|
||||||
else count
|
else count
|
||||||
db.listRange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
|
db.lrange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " does not have elements in the range specified")
|
throw new Predef.NoSuchElementException(name + " does not have elements in the range specified")
|
||||||
case Some(l) =>
|
case Some(l) =>
|
||||||
l map (_.getBytes)
|
l map (_.get.getBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def getVectorStorageSizeFor(name: String): Int = {
|
def getVectorStorageSizeFor(name: String): Int = {
|
||||||
db.listLength(new String(encode(name.getBytes))) match {
|
db.llen(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
case Some(l) => l
|
case Some(l) => l
|
||||||
|
|
@ -251,12 +251,12 @@ private [akka] object RedisStorageBackend extends
|
||||||
|
|
||||||
// add to the end of the queue
|
// add to the end of the queue
|
||||||
def enqueue(name: String, item: Array[Byte]): Boolean = {
|
def enqueue(name: String, item: Array[Byte]): Boolean = {
|
||||||
db.pushTail(new String(encode(name.getBytes)), new String(item))
|
db.rpush(new String(encode(name.getBytes)), new String(item))
|
||||||
}
|
}
|
||||||
|
|
||||||
// pop from the front of the queue
|
// pop from the front of the queue
|
||||||
def dequeue(name: String): Option[Array[Byte]] = {
|
def dequeue(name: String): Option[Array[Byte]] = {
|
||||||
db.popHead(new String(encode(name.getBytes))) match {
|
db.lpop(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
case Some(s) =>
|
case Some(s) =>
|
||||||
|
|
@ -266,7 +266,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
|
|
||||||
// get the size of the queue
|
// get the size of the queue
|
||||||
def size(name: String): Int = {
|
def size(name: String): Int = {
|
||||||
db.listLength(new String(encode(name.getBytes))) match {
|
db.llen(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
case Some(l) => l
|
case Some(l) => l
|
||||||
|
|
@ -277,40 +277,49 @@ private [akka] object RedisStorageBackend extends
|
||||||
// start is the item to begin, count is how many items to return
|
// start is the item to begin, count is how many items to return
|
||||||
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
|
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
|
||||||
case 1 =>
|
case 1 =>
|
||||||
db.listIndex(new String(encode(name.getBytes)), start) match {
|
db.lindex(new String(encode(name.getBytes)), start) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException("No element at " + start)
|
throw new Predef.NoSuchElementException("No element at " + start)
|
||||||
case Some(s) =>
|
case Some(s) =>
|
||||||
List(s.getBytes)
|
List(s.getBytes)
|
||||||
}
|
}
|
||||||
case n =>
|
case n =>
|
||||||
db.listRange(new String(encode(name.getBytes)), start, start + count - 1) match {
|
db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(
|
throw new Predef.NoSuchElementException(
|
||||||
"No element found between " + start + " and " + (start + count - 1))
|
"No element found between " + start + " and " + (start + count - 1))
|
||||||
case Some(es) =>
|
case Some(es) =>
|
||||||
es.map(_.getBytes)
|
es.map(_.get.getBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// completely delete the queue
|
// completely delete the queue
|
||||||
def remove(name: String): Boolean = {
|
def remove(name: String): Boolean = {
|
||||||
db.delete(new String(encode(name.getBytes)))
|
db.delete(new String(encode(name.getBytes))) match {
|
||||||
|
case Some(1) => true
|
||||||
|
case _ => false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add item to sorted set identified by name
|
// add item to sorted set identified by name
|
||||||
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
|
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
|
||||||
db.zAdd(new String(encode(name.getBytes)), zscore, new String(item))
|
db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
|
||||||
|
case Some(1) => true
|
||||||
|
case _ => false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove item from sorted set identified by name
|
// remove item from sorted set identified by name
|
||||||
def zrem(name: String, item: Array[Byte]): Boolean = {
|
def zrem(name: String, item: Array[Byte]): Boolean = {
|
||||||
db.zRem(new String(encode(name.getBytes)), new String(item))
|
db.zrem(new String(encode(name.getBytes)), new String(item)) match {
|
||||||
|
case Some(1) => true
|
||||||
|
case _ => false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// cardinality of the set identified by name
|
// cardinality of the set identified by name
|
||||||
def zcard(name: String): Int = {
|
def zcard(name: String): Int = {
|
||||||
db.zCard(new String(encode(name.getBytes))) match {
|
db.zcard(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
case Some(l) => l
|
case Some(l) => l
|
||||||
|
|
@ -318,7 +327,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
def zscore(name: String, item: Array[Byte]): String = {
|
def zscore(name: String, item: Array[Byte]): String = {
|
||||||
db.zScore(new String(encode(name.getBytes)), new String(item)) match {
|
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(new String(item) + " not present")
|
throw new Predef.NoSuchElementException(new String(item) + " not present")
|
||||||
case Some(s) => s
|
case Some(s) => s
|
||||||
|
|
@ -326,11 +335,11 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
|
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
|
||||||
db.zRange(new String(encode(name.getBytes)), start.toString, end.toString, SocketOperations.ASC, false) match {
|
db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
case Some(s) =>
|
case Some(s) =>
|
||||||
s.map(_.getBytes)
|
s.map(_.get.getBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Binary file not shown.
BIN
embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar
Normal file
BIN
embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar
Normal file
Binary file not shown.
|
|
@ -3,6 +3,6 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<groupId>com.redis</groupId>
|
<groupId>com.redis</groupId>
|
||||||
<artifactId>redisclient</artifactId>
|
<artifactId>redisclient</artifactId>
|
||||||
<version>1.0.1</version>
|
<version>1.1</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
</project>
|
</project>
|
||||||
Loading…
Add table
Add a link
Reference in a new issue