diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 77fd7acedb..6331378356 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -16,36 +16,34 @@ import voldemort.utils.ByteUtils import voldemort.versioning.Versioned import collection.JavaConversions import java.nio.ByteBuffer -import collection.immutable.{IndexedSeq, SortedSet, TreeSet} -import collection.mutable.{Map, Set, HashSet, ArrayBuffer} -import java.util.{Map => JMap} - +import collection.Map +import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap} +import collection.mutable.{Set, HashSet, ArrayBuffer} +import java.util.{Properties, Map => JMap} private[akka] object VoldemortStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging { - val bootstrapUrl: String = config.getString("akka.storage.voldemort.bootstrap.url", "tcp://localhost:6666") - val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs") - val mapKeyStore = config.getString("akka.storage.voldemort.store.map.key", "MapKeys") - val mapValueStore = config.getString("akka.storage.voldemort.store.map.value", "MapValues") - val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector.size", "VectorSizes") - val vectorValueStore = config.getString("akka.storage.voldemort.store.vectore.value", "VectorValues") - val storeClientFactory = { - if (bootstrapUrl.startsWith("tcp")) { - new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl)) - } else if (bootstrapUrl.startsWith("http")) { - new HttpStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl)) - } else { - throw new IllegalArgumentException("Unknown boostrapUrl syntax" + bootstrapUrl) - } + val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match { + case Some(configMap) => getClientConfig(configMap.asMap) + case None => getClientConfig(new HashMap[String, String] + ("boostrap_urls" -> "tcp://localhost:6666")) } - var refClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(refStore) - var mapKeyClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(mapKeyStore) - var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(mapValueStore) - var vectorSizeClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(vectorSizeStore) - var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(vectorValueStore) + val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs") + val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys") + val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues") + val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes") + val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues") + + var storeClientFactory: StoreClientFactory = null + var refClient: StoreClient[String, Array[Byte]] = null + var mapKeyClient: StoreClient[String, Array[Byte]] = null + var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null + var vectorSizeClient: StoreClient[String, Array[Byte]] = null + var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null + initStoreClients + val underscoreBytesUTF8 = "_".getBytes("UTF-8") implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) @@ -245,6 +243,38 @@ MapStorageBackend[Array[Byte], Array[Byte]] with IntSerializer.fromBytes(indexBytes) } + + def getClientConfig(configMap: Map[String, String]): Properties = { + val properites = new Properties + configMap.foreach { + keyval => keyval match { + case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String]) + } + } + properites + } + + def initStoreClients() = { + if (storeClientFactory != null) { + storeClientFactory.close + } + + storeClientFactory = { + if (clientConfig.getProperty("bootstrap_urls", "none").startsWith("tcp")) { + new SocketStoreClientFactory(new ClientConfig(clientConfig)) + } else if (clientConfig.getProperty("bootstrap_urls", "none").startsWith("http")) { + new HttpStoreClientFactory(new ClientConfig(clientConfig)) + } else { + throw new IllegalArgumentException("Unknown boostrapUrl syntax" + clientConfig.getProperty("boostrap_urls", "No Bootstrap URLs defined")) + } + } + refClient = storeClientFactory.getStoreClient(refStore) + mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore) + mapValueClient = storeClientFactory.getStoreClient(mapValueStore) + vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore) + vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore) + } + object IntSerializer { val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala index 395825152e..034b493006 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala @@ -15,7 +15,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { var server: VoldemortServer = null override protected def beforeAll(): Unit = { - + try { val dir = "./akka-persistence/akka-persistence-voldemort/target/scala_2.8.0/test-resources" val home = new File(dir) @@ -24,6 +24,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { log.info("Starting Voldemort") server = new VoldemortServer(config) server.start + VoldemortStorageBackend.initStoreClients log.info("Started") } catch { case e => log.error(e, "Error Starting Voldemort")