diff --git a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala index bfebb3a65e..18020ff180 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala @@ -101,9 +101,9 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess wi def put(key: Array[Byte], value: Array[Byte]): Unit - def getValue(key: Array[Byte]): Array[Byte] + def get(key: Array[Byte]): Array[Byte] - def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] + def get(key: Array[Byte], default: Array[Byte]): Array[Byte] def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] @@ -131,16 +131,16 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess wi override def get(owner: String, key: Array[Byte]): Array[Byte] = { - getValue(getKey(owner, key)) + get(getKey(owner, key)) } override def getIndexed(owner: String, index: Int): Array[Byte] = { - getValue(getIndexedKey(owner, index)) + get(getIndexedKey(owner, index)) } override def get(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] = { - getValue(getKey(owner, key), default) + get(getKey(owner, key), default) } diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala index 4bf907ec69..4b36fa4b87 100644 --- a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala +++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala @@ -74,14 +74,14 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend { } } - def getValue(key: Array[Byte], default: Array[Byte]) = { + def get(key: Array[Byte], default: Array[Byte]) = { Option(client.get(keyStr(encodeKey(key)))) match { case Some(value) => value.asInstanceOf[Array[Byte]] case None => default } } - def getValue(key: Array[Byte]) = getValue(key, null) + def get(key: Array[Byte]) = get(key, null) def put(key: Array[Byte], value: Array[Byte]) = { diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala index d716cc0eec..57c8776a1b 100644 --- a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala +++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala @@ -75,7 +75,7 @@ private[akka] object RiakStorageBackend extends CommonStorageBackend { } } - def getValue(key: Array[Byte]): Array[Byte] = { + def get(key: Array[Byte]): Array[Byte] = { val objs = riakClient.fetch(bucket, key, quorum) objs.size match { case 0 => null; @@ -86,8 +86,8 @@ private[akka] object RiakStorageBackend extends CommonStorageBackend { } } - def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { - Option(getValue(key)) match { + def get(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + Option(get(key)) match { case Some(value) => value case None => default } @@ -97,7 +97,7 @@ private[akka] object RiakStorageBackend extends CommonStorageBackend { var result = new HashMap[Array[Byte], Array[Byte]] keys.foreach{ key => - val value = getValue(key) + val value = get(key) Option(value) match { case Some(value) => result += key -> value case None => () diff --git a/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala index 22f81fc277..753a85c507 100644 --- a/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala +++ b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala @@ -10,30 +10,64 @@ import java.lang.String import java.util.{List => JList, ArrayList => JAList} import collection.immutable.{HashMap, Iterable} -import collection.mutable.{HashMap => MMap} - import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.services.simpledb.AmazonSimpleDBClient import com.amazonaws.services.simpledb.model._ import collection.{JavaConversions, Map} +import collection.mutable.{ArrayBuffer, HashMap => MMap} +import com.amazonaws.{Protocol, ClientConfiguration} private[akka] object SimpledbStorageBackend extends CommonStorageBackend { import org.apache.commons.codec.binary.Base64 + import KVStorageBackend._ val seperator = "\r\n" val seperatorBytes = seperator.getBytes("UTF-8") val sizeAtt = "size" + val ownerAtt = "owner" val base64 = new Base64(1024, seperatorBytes, true) val base64key = new Base64(1024, Array.empty[Byte], true) - val id = config.getString("akka.storage.simpledb.account.id", "YOU NEED TO PROVIDE AN AWS ID") - val secretKey = config.getString("akka.storage.simpledb.account.secretKey", "YOU NEED TO PROVIDE AN AWS SECRET KEY") + val id = config.getString("akka.storage.simpledb.account.id").getOrElse{ + val e = new IllegalStateException("You must provide an AWS id") + log.error(e, "You Must Provide an AWS id to use the SimpledbStorageBackend") + throw e + } + val secretKey = config.getString("akka.storage.simpledb.account.secretKey").getOrElse{ + val e = new IllegalStateException("You must provide an AWS secretKey") + log.error(e, "You Must Provide an AWS secretKey to use the SimpledbStorageBackend") + throw e + } val refDomain = config.getString("akka.storage.simpledb.domain.ref", "ref") val mapDomain = config.getString("akka.storage.simpledb.domain.map", "map") val queueDomain = config.getString("akka.storage.simpledb.domain.queue", "queue") val vectorDomain = config.getString("akka.storage.simpledb.domain.vector", "vector") val credentials = new BasicAWSCredentials(id, secretKey); - val client = new AmazonSimpleDBClient(credentials) + val clientConfig = new ClientConfiguration() + for (i <- config.getInt("akka.storage.simpledb.client.timeout")) { + clientConfig.setConnectionTimeout(i) + } + for (i <- config.getInt("akka.storage.simpledb.client.maxconnections")) { + clientConfig.setMaxConnections(i) + } + clientConfig.setMaxErrorRetry(config.getInt("akka.storage.simpledb.client.maxretries", 10)) + + for (s <- config.getString("akka.storage.simpledb.client.protocol")) { + clientConfig.setProtocol(Protocol.valueOf(s)) + } + for (i <- config.getInt("akka.storage.simpledb.client.sockettimeout")) { + clientConfig.setSocketTimeout(i) + } + for {s <- config.getInt("akka.storage.simpledb.client.sendbuffer") + r <- config.getInt("akka.storage.simpledb.client.receivebuffer")} { + clientConfig.setSocketBufferSizeHints(s, r) + } + + for (s <- config.getString("akka.storage.simpledb.client.useragent")) { + clientConfig.setUserAgent(s) + } + + val client = new AmazonSimpleDBClient(credentials, clientConfig) def queueAccess = queue @@ -70,21 +104,74 @@ private[akka] object SimpledbStorageBackend extends CommonStorageBackend { def delete(key: Array[Byte]): Unit = getClient.deleteAttributes(new DeleteAttributesRequest(domainName, encodeAndValidateKey(key))) - def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { - //Todo rewrite as select request - keys.foldLeft(new HashMap[Array[Byte], Array[Byte]]) { - (map, key) => { - val value = getValue(key) - if (value != null) { - map + (key -> getValue(key)) - } else { - map - } + override def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + + val batcher = GetBatcher(domainName, 20) + keys.foreach(batcher.addItem(_)) + var map = new HashMap[Array[Byte], Array[Byte]] + batcher.getRequests foreach { + req => { + var res = getClient.select(req) + var continue = true + do { + JavaConversions.asIterable(res.getItems) foreach { + item => map += (base64key.decode(item.getName) -> recomposeValue(item.getAttributes).get) + } + if (res.getNextToken ne null) { + res = getClient.select(req.withNextToken(res.getNextToken)) + } else { + continue = false + } + } while (continue == true) } } + map } - def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + case class GetBatcher(domain: String, maxItems: Int) { + + val reqs = new ArrayBuffer[SelectRequest] + var currentItems = new ArrayBuffer[String] + var items = 0 + + def addItem(item: Array[Byte]) = { + if ((items + 1 > maxItems)) { + addReq + } + currentItems += (encodeAndValidateKey(item)) + items += 1 + } + + private def addReq() { + items = 0 + reqs += new SelectRequest(select, true) + currentItems = new ArrayBuffer[String] + } + + def getRequests() = { + if (items > 0) { + addReq + } + reqs + } + + + def select(): String = { + val in = currentItems.reduceLeft[String] { + (acc, key) => { + acc + "', '" + key + } + } + + "select * from " + domainName + " where itemName() in ('" + in + "')" + } + + } + + + def get(key: Array[Byte]) = get(key, null) + + def get(key: Array[Byte], default: Array[Byte]): Array[Byte] = { val req = new GetAttributesRequest(domainName, encodeAndValidateKey(key)).withConsistentRead(true) val resp = getClient.getAttributes(req) recomposeValue(resp.getAttributes) match { @@ -93,27 +180,59 @@ private[akka] object SimpledbStorageBackend extends CommonStorageBackend { } } - def getValue(key: Array[Byte]): Array[Byte] = getValue(key, null) - def put(key: Array[Byte], value: Array[Byte]): Unit = { + override def put(key: Array[Byte], value: Array[Byte]) = { val req = new PutAttributesRequest(domainName, encodeAndValidateKey(key), decomposeValue(value)) getClient.putAttributes(req) } override def putAll(owner: String, keyValues: Iterable[(Array[Byte], Array[Byte])]) = { - val items = keyValues.foldLeft(new JAList[ReplaceableItem]()) { + val items = keyValues.foldLeft(new ArrayBuffer[ReplaceableItem]()) { (jal, kv) => kv match { case (key, value) => { - jal.add(new ReplaceableItem(encodeAndValidateKey(key), decomposeValue(value))) - jal + jal += (new ReplaceableItem(encodeAndValidateKey(getKey(owner, key)), decomposeValue(value))) } } } - //Max items per post = 25, max size per post 1MB - val req = new BatchPutAttributesRequest(domainName,items) - //TODO assure the above - getClient.batchPutAttributes(req) + + val batcher = new PutBatcher(domainName, 25, 1000) + items foreach (batcher.addItem(_)) + val reqs = batcher.getRequests + reqs foreach (getClient.batchPutAttributes(_)) + } + + + class PutBatcher(domain: String, maxItems: Int, maxAttributes: Int) { + + val reqs = new ArrayBuffer[BatchPutAttributesRequest] + var currentItems = new JAList[ReplaceableItem]() + var items = 0 + var atts = 0 + + def addItem(item: ReplaceableItem) = { + if ((items + 1 > maxItems) || (atts + item.getAttributes.size > maxAttributes)) { + addReq + } + currentItems.add(item) + items += 1 + atts += item.getAttributes.size + } + + private def addReq() { + items = 0 + atts = 0 + reqs += new BatchPutAttributesRequest(domain, currentItems) + currentItems = new JAList[ReplaceableItem]() + } + + def getRequests() = { + if (items > 0) { + addReq + } + reqs + } + } def encodeAndValidateKey(key: Array[Byte]): String = { diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index fd97f32124..520b2030fe 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -68,11 +68,11 @@ private[akka] object VoldemortStorageBackend extends CommonStorageBackend { client.put(key, value) } - def getValue(key: Array[Byte]): Array[Byte] = { + def get(key: Array[Byte]): Array[Byte] = { client.getValue(key) } - def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + def get(key: Array[Byte], default: Array[Byte]): Array[Byte] = { client.getValue(key, default) } diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 1d1fb60320..7153387d6a 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -19,9 +19,9 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val value = "testRefValue" val valueBytes = bytes(value) refAccess.delete(key.getBytes) - refAccess.getValue(key.getBytes, empty) should be(empty) + refAccess.get(key.getBytes, empty) should be(empty) refAccess.put(key.getBytes, valueBytes) - refAccess.getValue(key.getBytes) should be(valueBytes) + refAccess.get(key.getBytes) should be(valueBytes) } test("PersistentRef apis function as expected") { @@ -38,7 +38,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val key = "testmapKey" val mapKeys = new TreeSet[Array[Byte]] + bytes("key1") mapAccess.delete(getKey(key, mapKeysIndex)) - mapAccess.getValue(getKey(key, mapKeysIndex), SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet)) + mapAccess.get(getKey(key, mapKeysIndex), SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet)) putMapKeys(key, mapKeys) getMapKeys(key) should equal(mapKeys) } @@ -47,7 +47,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val key = bytes("keyForTestingMapValueClient") val value = bytes("value for testing map value client") mapAccess.put(key, value) - mapAccess.getValue(key, empty) should equal(value) + mapAccess.get(key, empty) should equal(value) } @@ -93,9 +93,9 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val vecKey = getIndexedKey(key, index) getIndexFromVectorValueKey(key, vecKey) should be(index) vectorAccess.delete(vecKey) - vectorAccess.getValue(vecKey, empty) should equal(empty) + vectorAccess.get(vecKey, empty) should equal(empty) vectorAccess.put(vecKey, value) - vectorAccess.getValue(vecKey) should equal(value) + vectorAccess.get(vecKey) should equal(value) } test("PersistentVector apis function as expected") { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 16aecf6bc9..221c4213cc 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -212,6 +212,30 @@ akka { addresses = "localhost:11211" #Formatted according to spymemcached "localhost:11211 otherhost:11211" etc.. } } + + simpledb { + account{ + id = "YOU NEED TO PROVIDE AN AWS ID" + secretKey = "YOU NEED TO PROVIDE AN AWS SECRETKEY" + } + client{ + #Defaults to default AWS ClientConfiguration + timeout =50000 + #maxconnections = + maxretries = 10 + #protocol = "HTTPS" | "HTTP" + #sockettimeout 50000 + #sendbuffer = 0 + #receivebuffer = 0 + #useragent = "AWS Java SDK-1.0.14" + } + domain{ + ref = "ref" + map = "map" + vector = "vector" + queue = "queue" + } + } } camel {