provide better voldemort configuration support, and defaults definition in akka-reference.conf, and made the backend more easily testable
This commit is contained in:
parent
063dc6964c
commit
a5e67d05f2
2 changed files with 55 additions and 24 deletions
|
|
@ -16,36 +16,34 @@ import voldemort.utils.ByteUtils
|
|||
import voldemort.versioning.Versioned
|
||||
import collection.JavaConversions
|
||||
import java.nio.ByteBuffer
|
||||
import collection.immutable.{IndexedSeq, SortedSet, TreeSet}
|
||||
import collection.mutable.{Map, Set, HashSet, ArrayBuffer}
|
||||
import java.util.{Map => JMap}
|
||||
|
||||
import collection.Map
|
||||
import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap}
|
||||
import collection.mutable.{Set, HashSet, ArrayBuffer}
|
||||
import java.util.{Properties, Map => JMap}
|
||||
|
||||
private[akka] object VoldemortStorageBackend extends
|
||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
VectorStorageBackend[Array[Byte]] with
|
||||
RefStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
val bootstrapUrl: String = config.getString("akka.storage.voldemort.bootstrap.url", "tcp://localhost:6666")
|
||||
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
|
||||
val mapKeyStore = config.getString("akka.storage.voldemort.store.map.key", "MapKeys")
|
||||
val mapValueStore = config.getString("akka.storage.voldemort.store.map.value", "MapValues")
|
||||
val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector.size", "VectorSizes")
|
||||
val vectorValueStore = config.getString("akka.storage.voldemort.store.vectore.value", "VectorValues")
|
||||
val storeClientFactory = {
|
||||
if (bootstrapUrl.startsWith("tcp")) {
|
||||
new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl))
|
||||
} else if (bootstrapUrl.startsWith("http")) {
|
||||
new HttpStoreClientFactory(new ClientConfig().setBootstrapUrls(bootstrapUrl))
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown boostrapUrl syntax" + bootstrapUrl)
|
||||
}
|
||||
val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match {
|
||||
case Some(configMap) => getClientConfig(configMap.asMap)
|
||||
case None => getClientConfig(new HashMap[String, String] + ("boostrap_urls" -> "tcp://localhost:6666"))
|
||||
}
|
||||
var refClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(refStore)
|
||||
var mapKeyClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(mapKeyStore)
|
||||
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(mapValueStore)
|
||||
var vectorSizeClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(vectorSizeStore)
|
||||
var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(vectorValueStore)
|
||||
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
|
||||
val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys")
|
||||
val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues")
|
||||
val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes")
|
||||
val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues")
|
||||
|
||||
var storeClientFactory: StoreClientFactory = null
|
||||
var refClient: StoreClient[String, Array[Byte]] = null
|
||||
var mapKeyClient: StoreClient[String, Array[Byte]] = null
|
||||
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var vectorSizeClient: StoreClient[String, Array[Byte]] = null
|
||||
var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
initStoreClients
|
||||
|
||||
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
|
||||
implicit val byteOrder = new Ordering[Array[Byte]] {
|
||||
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
|
||||
|
|
@ -245,6 +243,38 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
IntSerializer.fromBytes(indexBytes)
|
||||
}
|
||||
|
||||
|
||||
def getClientConfig(configMap: Map[String, String]): Properties = {
|
||||
val properites = new Properties
|
||||
configMap.foreach {
|
||||
keyval => keyval match {
|
||||
case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String])
|
||||
}
|
||||
}
|
||||
properites
|
||||
}
|
||||
|
||||
def initStoreClients() = {
|
||||
if (storeClientFactory != null) {
|
||||
storeClientFactory.close
|
||||
}
|
||||
|
||||
storeClientFactory = {
|
||||
if (clientConfig.getProperty("bootstrap_urls", "none").startsWith("tcp")) {
|
||||
new SocketStoreClientFactory(new ClientConfig(clientConfig))
|
||||
} else if (clientConfig.getProperty("bootstrap_urls", "none").startsWith("http")) {
|
||||
new HttpStoreClientFactory(new ClientConfig(clientConfig))
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown boostrapUrl syntax" + clientConfig.getProperty("boostrap_urls", "No Bootstrap URLs defined"))
|
||||
}
|
||||
}
|
||||
refClient = storeClientFactory.getStoreClient(refStore)
|
||||
mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore)
|
||||
mapValueClient = storeClientFactory.getStoreClient(mapValueStore)
|
||||
vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore)
|
||||
vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore)
|
||||
}
|
||||
|
||||
object IntSerializer {
|
||||
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
|
|||
var server: VoldemortServer = null
|
||||
|
||||
override protected def beforeAll(): Unit = {
|
||||
|
||||
|
||||
try {
|
||||
val dir = "./akka-persistence/akka-persistence-voldemort/target/scala_2.8.0/test-resources"
|
||||
val home = new File(dir)
|
||||
|
|
@ -24,6 +24,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
|
|||
log.info("Starting Voldemort")
|
||||
server = new VoldemortServer(config)
|
||||
server.start
|
||||
VoldemortStorageBackend.initStoreClients
|
||||
log.info("Started")
|
||||
} catch {
|
||||
case e => log.error(e, "Error Starting Voldemort")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue