This commit is contained in:
ticktock 2010-11-16 16:51:38 -05:00
parent f278780987
commit a7939eef8b
3 changed files with 245 additions and 45 deletions

View file

@ -910,7 +910,7 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
protected def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
transaction.get.get.register((this.getClass, uuid), this)
transaction.get.get.register("SortedSet:"+uuid, this)
}
def applyLog(log: Array[Byte]) = {

View file

@ -6,23 +6,23 @@ import org.multiverse.api.exceptions.TooManyRetriesException
import akka.persistence.common._
import akka.stm.{Recoverable, TransactionalVector}
import scala.collection.mutable.HashMap
import akka.util.Logging
import java.io.File
import org.fusesource.hawtdb.api._
import org.fusesource.hawtbuf.codec.{BytesCodec, StringCodec}
trait PersistentTransactor extends Transactor with Storage {
type ElementType
type ElementType = Array[Byte]
val refs = new HashMap[String, PersistentRef[ElementType]]
val maps = new HashMap[String, PersistentMap[ElementType]]
val maps = new HashMap[String, PersistentMap[ElementType,ElementType]]
val queues = new HashMap[String, PersistentQueue[ElementType]]
val vectors = new HashMap[String, PersistentVector[ElementType]]
val sortedSets = new HashMap[String, PersistentSortedSet[ElementType]]
def recoveryManager: ActorRef
def atomically = {
final def atomically = {
try
{
atomicallyWithStorage
@ -52,6 +52,7 @@ trait PersistentTransactor extends Transactor with Storage {
}
}
}
refs.clear
}
def logMaps() = {
@ -62,6 +63,7 @@ trait PersistentTransactor extends Transactor with Storage {
}
}
}
maps.clear
}
def logVectors() = {
@ -72,6 +74,7 @@ trait PersistentTransactor extends Transactor with Storage {
}
}
}
vectors.clear
}
def logQueues() = {
@ -82,30 +85,43 @@ trait PersistentTransactor extends Transactor with Storage {
}
}
}
queues.clear
}
def newRef(id: String) = {
val ref = underlyingStorage.newRef(id)
refs.put(ref.uuid, ref)
ref
refs.getOrElseUpdate(id, {
val ref = underlyingStorage.newRef(id)
recoveryManager !! RefRecoveryRequest(id) match {
case RefFailure(id, log) => ref.applyLog(log)
case _ => ()
}
})
}
def newVector(id: String) = {
val vec = underlyingStorage.newVector(id)
vectors.put(vec.uuid, vec)
vec
vectors.getOrElseUpdate(id, {
val vec = underlyingStorage.newVector(id)
recoveryManager !! VectorRecoveryRequest(id) match {
case VectorFailure(id, log) => vec.applyLog(log)
case _ => ()
}
})
}
def newMap(id: String) = {
val map = underlyingStorage.newMap(id)
maps.put(map.uuid, map)
map
maps.getOrElseUpdate(id, {
val map = underlyingStorage.newMap(id)
recoveryManager !! MapRecoveryRequest(id) match {
case MapFailure(id, log) => map.applyLog(log)
case _ => ()
}
})
}
def getRef(id: String) = {
refs.getOrElseUpdate(id, {
val ref = underlyingStorage.getRef(id)
recoveryManager !! RefRecovery(id) match {
recoveryManager !! RefRecoveryRequest(id) match {
case RefFailure(id, log) => ref.applyLog(log)
case _ => ()
}
@ -113,15 +129,23 @@ trait PersistentTransactor extends Transactor with Storage {
}
def getVector(id: String) = {
val vec = underlyingStorage.getVector(id)
vectors.put(vec.uuid, vec)
vec
vectors.getOrElseUpdate(id, {
val vec = underlyingStorage.getVector(id)
recoveryManager !! VectorRecoveryRequest(id) match {
case VectorFailure(id, log) => vec.applyLog(log)
case _ => ()
}
})
}
def getMap(id: String) = {
val map = underlyingStorage.newMap(id)
maps.put(map.uuid, map)
map
maps.getOrElseUpdate(id, {
val map = underlyingStorage.getMap(id)
recoveryManager !! MapRecoveryRequest(id) match {
case MapFailure(id, log) => map.applyLog(log)
case _ => ()
}
})
}
def newRef = {
@ -157,42 +181,58 @@ trait PersistentTransactor extends Transactor with Storage {
}
override def getQueue(id: String) = {
val queue = underlyingStorage.getQueue(id)
queues.put(queue.uuid, queue)
queue
queues.getOrElseUpdate(id, {
val queue = underlyingStorage.getQueue(id)
recoveryManager !! QueueRecoveryRequest(id) match {
case QueueFailure(id, log) => queue.applyLog(log)
case _ => ()
}
})
}
override def getSortedSet(id: String) = {
val set = underlyingStorage.getSortedSet(id)
sortedSets.put(set.uuid, set)
set
sortedSets.getOrElseUpdate(id, {
val set = underlyingStorage.getSortedSet(id)
recoveryManager !! SortedSetRecoveryRequest(id) match {
case SortedSetFailure(id, log) => set.applyLog(log)
case _ => ()
}
})
}
override def newQueue(id: String) = {
val queue = underlyingStorage.newQueue(id)
queues.put(queue.uuid, queue)
queue
queues.getOrElseUpdate(id, {
val queue = underlyingStorage.newQueue(id)
recoveryManager !! QueueRecoveryRequest(id) match {
case QueueFailure(id, log) => queue.applyLog(log)
case _ => ()
}
})
}
override def newSortedSet(id: String) = {
val set = underlyingStorage.newSortedSet(id)
sortedSets.put(set.uuid, set)
set
sortedSets.getOrElseUpdate(id, {
val set = underlyingStorage.newSortedSet(id)
recoveryManager !! SortedSetRecoveryRequest(id) match {
case SortedSetFailure(id, log) => set.applyLog(log)
case _ => ()
}
})
}
}
sealed trait CommitRecovery
sealed trait RecoveryRequest
case class RefRecovery(uuid: String) extends CommitRecovery
case class RefRecoveryRequest(uuid: String) extends RecoveryRequest
case class MapRecovery(uuid: String) extends CommitRecovery
case class MapRecoveryRequest(uuid: String) extends RecoveryRequest
case class QueueRecovery(uuid: String) extends CommitRecovery
case class QueueRecoveryRequest(uuid: String) extends RecoveryRequest
case class VectorRecovery(uuid: String) extends CommitRecovery
case class VectorRecoveryRequest(uuid: String) extends RecoveryRequest
case class SortedSetRecovery(uuid: String) extends CommitRecovery
case class SortedSetRecoveryRequest(uuid: String) extends RecoveryRequest
@serializable
@ -208,6 +248,8 @@ case class VectorFailure(uuid: String, tlog: Array[Byte]) extends CommitFailure
case class SortedSetFailure(uuid: String, tlog: Array[Byte]) extends CommitFailure
case class NoOp() extends CommitFailure
trait PersistentTransactorSupervisor extends Actor {
//send messages to this to start PersistentTransactors,
@ -223,10 +265,166 @@ trait PersistentTransactorRecoveryManager extends Actor {
//send the failed transactions back to the restarted actor
protected def receive = {
case RefFailure(uuid, tlog, error) => {
log.info(uuid + " " + error)
case RefFailure(uuid, log) => {
refFailed(uuid, log)
}
case RefRecoveryRequest(uuid) => {
recoverRef(uuid) match {
case None => self.reply(NoOp)
case Some(tlog) => self.reply(RefFailure(uuid, tlog))
}
}
case MapFailure(uuid, tlog) => {
mapFailed(uuid, tlog)
}
case MapRecoveryRequest(uuid) => {
recoverMap(uuid) match {
case None => self.reply(NoOp)
case Some(tlog) => self.reply(MapFailure(uuid, tlog))
}
}
case QueueFailure(uuid, log) => {
queueFailed(uuid, log)
}
case QueueRecoveryRequest(uuid) => {
recoverRef(uuid) match {
case None => self.reply(NoOp)
case Some(tlog) => self.reply(RefFailure(uuid, tlog))
}
}
case VectorFailure(uuid, tlog) => {
vectorFailed(uuid, tlog)
}
case VectorRecoveryRequest(uuid) => {
recoverVector(uuid) match {
case None => self.reply(NoOp)
case Some(tlog) => self.reply(VectorFailure(uuid, tlog))
}
}
case SortedSetFailure(uuid, log) => {
sortedSetFailed(uuid, log)
}
case SortedSetRecoveryRequest(uuid) => {
recoverSortedSet(uuid) match {
case None => self.reply(NoOp)
case Some(tlog) => self.reply(SortedSetFailure(uuid, tlog))
}
}
}
def refFailed(uuid: String, tlog: Array[Byte]): Unit
def recoverRef(uuid: String): Option[Array[Byte]]
def mapFailed(uuid: String, tlog: Array[Byte]): Unit
def recoverMap(uuid: String): Option[Array[Byte]]
def queueFailed(uuid: String, tlog: Array[Byte]): Unit
def recoverQueue(uuid: String): Option[Array[Byte]]
def vectorFailed(uuid: String, tlog: Array[Byte]): Unit
def recoverVector(uuid: String): Option[Array[Byte]]
def sortedSetFailed(uuid: String, tlog: Array[Byte]): Unit
def recoverSortedSet(uuid: String): Option[Array[Byte]]
}
class HawtDbRecoveryManager(dir: String) extends PersistentTransactorRecoveryManager {
var refFactory: TxPageFileFactory = null
var refPageFile: TxPageFile = null
var refIndexFactory: HashIndexFactory[String, Array[Byte]] = null
var refIndex: Index[String, Array[Byte]] = null
var mapFactory: TxPageFileFactory = null
var mapPageFile: TxPageFile = null
var mapIndexFactory: HashIndexFactory[String, Array[Byte]] = null
var mapIndex: Index[String, Array[Byte]] = null
var vectorFactory: TxPageFileFactory = null
var vectorPageFile: TxPageFile = null
var vectorIndexFactory: HashIndexFactory[String, Array[Byte]] = null
var vectorIndex: Index[String, Array[Byte]] = null
var queueFactory: TxPageFileFactory = null
var queuePageFile: TxPageFile = null
var queueIndexFactory: HashIndexFactory[String, Array[Byte]] = null
var queueIndex: Index[String, Array[Byte]] = null
var sortedSetFactory: TxPageFileFactory = null
var sortedSetPageFile: TxPageFile = null
var sortedSetIndexFactory: HashIndexFactory[String, Array[Byte]] = null
var sortedSetIndex: Index[String, Array[Byte]] = null
override def preStart = {
val datadir = new File(dir)
if (!datadir.exists || !datadir.isDirectory) {
throw new IllegalArgumentException(datadir.getAbsolutePath, "does not exist or is not a directory")
}
val (refFactory, refPageFile, refIndexFactory, refIndex) = initType(new File(dir, "ref.log"))
val (mapFactory, mapPageFile, mapIndexFactory, mapIndex) = initType(new File(dir, "map.log"))
val (vectorFactory, vectorPageFile, vectorIndexFactory, vectorIndex) = initType(new File(dir, "vector.log"))
val (queueFactory, queuePageFile, queueIndexFactory, queueIndex) = initType(new File(dir, "queue.log"))
val (sortedSetFactory, sortedSetPageFile, sortedSetIndexFactory, sortedSetIndex) = initType(new File(dir, "sortedset.log"))
}
def initType(file: File): (TxPageFileFactory, TxPageFile, IndexFactory, Index) = {
val factory = new TxPageFileFactory()
factory.setFile(file)
factory.open
val txFile = factory.getTxPageFile
val indexFact = new HashIndexFactory[String, Array[Byte]]
indexFact.setKeyCodec(StringCodec.INSTANCE)
indexFact.setValueCodec(BytesCodec.INSTANCE)
if (!txFile.allocator.isAllocated(0)) {
val tx = txFile.tx();
Index[String, Array[Byte]] root = indexFact.create(tx);
tx.commit();
}
}
def withIndex(txp: TxPageFile, idxf: IndexFactory[String, Array[Byte]])(block: Index => Any) {
val tx = txp.tx
val idx = idxf.open(tx)
block(index)
tx.commit
}
override def postStop = {
}
def recoverSortedSet(uuid: String) = null
def sortedSetFailed(uuid: String, tlog: Array[Byte]) = null
def recoverVector(uuid: String) = null
def vectorFailed(uuid: String, tlog: Array[Byte]) = null
def recoverQueue(uuid: String) = null
def queueFailed(uuid: String, tlog: Array[Byte]) = null
def recoverMap(uuid: String) = null
def mapFailed(uuid: String, tlog: Array[Byte]) = null
def recoverRef(uuid: String) = null
def refFailed(uuid: String, tlog: Array[Byte]) = withIndex(refPageFile, refIndexFactory)(_.put(uuid, tlog))
}

View file

@ -181,6 +181,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2
lazy val hawtdb = "org.fusesource.hawtdb" % "hawtdb" % "1.5" % "compile" //ApacheV2
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
@ -278,10 +279,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" //ApacheV2
//memcached
lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile"
lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile" //MIT
//simpledb
lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile"
lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile" //ApacheV2
}
// -------------------------------------------------------------------------------------------------------------------
@ -569,6 +570,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val commons_pool = Dependencies.commons_pool
val thrift = Dependencies.thrift
val hawtdb = Dependencies.hawtdb
}
// -------------------------------------------------------------------------------------------------------------------