From c00aef35918ea4e402e2833a3cd33589a5a3d677 Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 21 Oct 2010 17:53:44 -0400 Subject: [PATCH] Voldemort Tests now working as well as Riak --- .../main/scala/VoldemortStorageBackend.scala | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index f6fa2d54e4..348f8a9f7a 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -22,8 +22,10 @@ import voldemort.client.protocol.admin.{AdminClientConfig, AdminClient} */ private[akka] object VoldemortStorageBackend extends KVStorageBackend { + import KVAccess._ import VoldemortAccess._ + val bootstrapUrlsProp = "bootstrap_urls" val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match { case Some(configMap) => getClientConfig(configMap.asMap) @@ -35,12 +37,12 @@ private[akka] object VoldemortStorageBackend extends KVStorageBackend { val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues") - var storeClientFactory: StoreClientFactory = initStoreClientFactory + var storeClientFactory: StoreClientFactory = null var refs: KVAccess = null var maps: KVAccess = null var vectors: KVAccess = null var queues: KVAccess = null - initKVAccess + resetAccess def refAccess = refs @@ -52,11 +54,13 @@ private[akka] object VoldemortStorageBackend extends KVStorageBackend { object VoldemortAccess { - val admin = new AdminClient(VoldemortStorageBackend.clientConfig.getProperty(VoldemortStorageBackend.bootstrapUrlsProp), new AdminClientConfig) + var admin: AdminClient = null } class VoldemortAccess(val store: String) extends KVAccess { + import VoldemortAccess._ + val client: StoreClient[Array[Byte], Array[Byte]] = VoldemortStorageBackend.storeClientFactory.getStoreClient(store) def put(key: Array[Byte], value: Array[Byte]) = { @@ -72,7 +76,7 @@ private[akka] object VoldemortStorageBackend extends KVStorageBackend { } def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { - JavaConversions.asMap(client.getAll(JavaConversions.asIterable(keys))).map { + JavaConversions.asMap(client.getAll(JavaConversions.asIterable(keys))).map{ kv => kv match { case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => (key -> versioned.getValue) @@ -92,7 +96,7 @@ private[akka] object VoldemortStorageBackend extends KVStorageBackend { def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties - configMap.foreach { + configMap.foreach{ keyval => keyval match { case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String]) } @@ -114,15 +118,25 @@ private[akka] object VoldemortStorageBackend extends KVStorageBackend { } } - def initKVAccess = { - refs = new VoldemortAccess(refStore) - maps = new VoldemortAccess(mapStore) - vectors = new VoldemortAccess(vectorStore) - queues = new VoldemortAccess(queueStore) + def initAdminClient(): AdminClient = { + if (VoldemortAccess.admin ne null) { + VoldemortAccess.admin.stop + } + + new AdminClient(VoldemortStorageBackend.clientConfig.getProperty(VoldemortStorageBackend.bootstrapUrlsProp), new AdminClientConfig) + } - def resetAccess(){ - initStoreClientFactory + def initKVAccess = { + refs = new VoldemortAccess(refStore) + maps = new VoldemortAccess(mapStore) + vectors = new VoldemortAccess(vectorStore) + queues = new VoldemortAccess(queueStore) + } + + def resetAccess() { + storeClientFactory = initStoreClientFactory + VoldemortAccess.admin = initAdminClient initKVAccess }