migrated storage over to cassandra 0.4
parent 905438c883
commit 5b8b46d21c
8 changed files with 613 additions and 629 deletions
@@ -81,10 +81,10 @@ object CassandraStorage extends MapStorage
   def getRefStorageFor(name: String): Option[AnyRef] = {
     try {
-      val column: Option[Column] = sessions.withSession {
+      val column: Option[ColumnOrSuperColumn] = sessions.withSession {
         _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
       }
-      if (column.isDefined) Some(serializer.in(column.get.value, None))
+      if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None))
       else None
     } catch {
       case e =>
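Every getter touched below repeats the same unwrap step that Cassandra 0.4's Thrift API introduces: the session operators now return a ColumnOrSuperColumn wrapper, and getColumn has to be called before the value bytes can be deserialized. A minimal sketch of factoring that pattern out inside CassandraStorage, assuming the | operator returns Option[ColumnOrSuperColumn] as the hunk above shows; the helper name getColumnValue is hypothetical and not part of this commit:

    // Hypothetical helper, not in this commit: a single place for the
    // ColumnOrSuperColumn unwrap required by the Cassandra 0.4 Thrift API.
    // Assumes `|` returns Option[ColumnOrSuperColumn], as in the hunk above.
    private def getColumnValue(name: String, path: ColumnPath): Option[AnyRef] =
      sessions.withSession { session =>
        session | (name, path)
      } map (cosc => serializer.in(cosc.getColumn.value, None))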
@@ -123,17 +123,17 @@ object CassandraStorage extends MapStorage
   }

   def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
-    val column: Option[Column] = sessions.withSession {
+    val column: Option[ColumnOrSuperColumn] = sessions.withSession {
       _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
     }
-    if (column.isDefined) serializer.in(column.get.value, None)
+    if (column.isDefined) serializer.in(column.get.getColumn.value, None)
     else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
   }

   def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
     val startBytes = if (start.isDefined) intToBytes(start.get) else null
     val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
-    val columns: List[Column] = sessions.withSession {
+    val columns: List[ColumnOrSuperColumn] = sessions.withSession {
       _ / (name,
            VECTOR_COLUMN_PARENT,
            startBytes, finishBytes,
@@ -141,7 +141,7 @@ object CassandraStorage extends MapStorage
            count,
            CONSISTENCY_LEVEL)
     }
-    columns.map(column => serializer.in(column.value, None))
+    columns.map(column => serializer.in(column.getColumn.value, None))
   }

   def getVectorStorageSizeFor(name: String): Int = {
@@ -165,23 +165,23 @@ object CassandraStorage extends MapStorage
   }

   def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = {
-    val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
+    val batch = new scala.collection.mutable.HashMap[String, List[ColumnOrSuperColumn]]
     for (entry <- entries) {
-      val columns: java.util.List[Column] = new java.util.ArrayList
-      columns.add(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
-      cf2columns.put(MAP_COLUMN_PARENT.getColumn_family, columns)
+      val columnOrSuperColumn = new ColumnOrSuperColumn
+      columnOrSuperColumn.setColumn(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
+      batch + (MAP_COLUMN_PARENT.getColumn_family -> List(columnOrSuperColumn))
     }
     sessions.withSession {
-      _ ++| (new BatchMutation(name, cf2columns), CONSISTENCY_LEVEL)
+      _ ++| (name, batch, CONSISTENCY_LEVEL)
     }
   }

   def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
     try {
-      val column: Option[Column] = sessions.withSession {
+      val column: Option[ColumnOrSuperColumn] = sessions.withSession {
         _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
       }
-      if (column.isDefined) Some(serializer.in(column.get.value, None))
+      if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None))
       else None
     } catch {
       case e =>
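For context, a Cassandra 0.4 batch insert addresses one row key and takes a map from column-family name to a list of ColumnOrSuperColumn; the new batch value handed to ++| mirrors that shape. A sketch of assembling such a map outside the session DSL, assuming the Thrift-generated Column and ColumnOrSuperColumn classes from Cassandra 0.4 are on the classpath; toBatch is an illustrative name, not part of the commit:

    // Illustrative only: build the column-family -> columns map that a
    // Cassandra 0.4 batch insert expects, mirroring the loop in the hunk above.
    def toBatch(columnFamily: String,
                entries: List[(Array[Byte], Array[Byte])]): Map[String, List[ColumnOrSuperColumn]] = {
      val columns = entries.map { case (name, value) =>
        val cosc = new ColumnOrSuperColumn
        cosc.setColumn(new Column(name, value, System.currentTimeMillis))
        cosc
      }
      Map(columnFamily -> columns)
    }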
@@ -214,7 +214,7 @@ object CassandraStorage extends MapStorage
     val keyBytes = if (key == null) null else serializer.out(key)
     sessions.withSession {
       _ -- (name,
-            new ColumnPathOrParent(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
+            new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
             System.currentTimeMillis,
             CONSISTENCY_LEVEL)
     }
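The delete path now builds a plain ColumnPath instead of a ColumnPathOrParent. As the null handling of keyBytes above suggests, one path type covers both granularities: a null column position addresses the whole row in that column family, while key bytes address a single column. A sketch, with hypothetical value names, not part of the commit:

    // Illustrative only: the two delete granularities expressible with a
    // single ColumnPath under Cassandra 0.4, per the keyBytes null handling above.
    val wholeRowPath  = new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, null)
    val oneColumnPath = new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))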
@@ -224,10 +224,10 @@ object CassandraStorage extends MapStorage
       List[Tuple2[AnyRef, AnyRef]] = {
     val startBytes = if (start.isDefined) serializer.out(start.get) else null
     val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
-    val columns: List[Column] = sessions.withSession {
+    val columns: List[ColumnOrSuperColumn] = sessions.withSession {
       _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
     }
-    columns.map(column => (column.name, serializer.in(column.value, None)))
+    columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None)))
   }
 }