diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 7d710cb40e..31e8f5d2a7 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -910,7 +910,7 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab protected def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException - transaction.get.get.register((this.getClass, uuid), this) + transaction.get.get.register("SortedSet:"+uuid, this) } def applyLog(log: Array[Byte]) = { diff --git a/akka-persistence/akka-persistence-common/src/main/scala/actor/PersistentTransactor.scala b/akka-persistence/akka-persistence-common/src/main/scala/actor/PersistentTransactor.scala index 6a7c3cd127..8a905dcb86 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/actor/PersistentTransactor.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/actor/PersistentTransactor.scala @@ -6,23 +6,23 @@ import org.multiverse.api.exceptions.TooManyRetriesException import akka.persistence.common._ import akka.stm.{Recoverable, TransactionalVector} import scala.collection.mutable.HashMap - -import akka.util.Logging - +import java.io.File +import org.fusesource.hawtdb.api._ +import org.fusesource.hawtbuf.codec.{BytesCodec, StringCodec} trait PersistentTransactor extends Transactor with Storage { - type ElementType + type ElementType = Array[Byte] val refs = new HashMap[String, PersistentRef[ElementType]] - val maps = new HashMap[String, PersistentMap[ElementType]] + val maps = new HashMap[String, PersistentMap[ElementType,ElementType]] val queues = new HashMap[String, PersistentQueue[ElementType]] val vectors = new HashMap[String, PersistentVector[ElementType]] val sortedSets = new HashMap[String, PersistentSortedSet[ElementType]] def recoveryManager: ActorRef - def atomically = { + final def atomically = { try { atomicallyWithStorage @@ -52,6 +52,7 @@ trait PersistentTransactor extends Transactor with Storage { } } } + refs.clear } def logMaps() = { @@ -62,6 +63,7 @@ trait PersistentTransactor extends Transactor with Storage { } } } + maps.clear } def logVectors() = { @@ -72,6 +74,7 @@ trait PersistentTransactor extends Transactor with Storage { } } } + vectors.clear } def logQueues() = { @@ -82,30 +85,43 @@ trait PersistentTransactor extends Transactor with Storage { } } } + queues.clear } def newRef(id: String) = { - val ref = underlyingStorage.newRef(id) - refs.put(ref.uuid, ref) - ref + refs.getOrElseUpdate(id, { + val ref = underlyingStorage.newRef(id) + recoveryManager !! RefRecoveryRequest(id) match { + case RefFailure(id, log) => ref.applyLog(log) + case _ => () + } + }) } def newVector(id: String) = { - val vec = underlyingStorage.newVector(id) - vectors.put(vec.uuid, vec) - vec + vectors.getOrElseUpdate(id, { + val vec = underlyingStorage.newVector(id) + recoveryManager !! VectorRecoveryRequest(id) match { + case VectorFailure(id, log) => vec.applyLog(log) + case _ => () + } + }) } def newMap(id: String) = { - val map = underlyingStorage.newMap(id) - maps.put(map.uuid, map) - map + maps.getOrElseUpdate(id, { + val map = underlyingStorage.newMap(id) + recoveryManager !! MapRecoveryRequest(id) match { + case MapFailure(id, log) => map.applyLog(log) + case _ => () + } + }) } def getRef(id: String) = { refs.getOrElseUpdate(id, { val ref = underlyingStorage.getRef(id) - recoveryManager !! RefRecovery(id) match { + recoveryManager !! RefRecoveryRequest(id) match { case RefFailure(id, log) => ref.applyLog(log) case _ => () } @@ -113,15 +129,23 @@ trait PersistentTransactor extends Transactor with Storage { } def getVector(id: String) = { - val vec = underlyingStorage.getVector(id) - vectors.put(vec.uuid, vec) - vec + vectors.getOrElseUpdate(id, { + val vec = underlyingStorage.getVector(id) + recoveryManager !! VectorRecoveryRequest(id) match { + case VectorFailure(id, log) => vec.applyLog(log) + case _ => () + } + }) } def getMap(id: String) = { - val map = underlyingStorage.newMap(id) - maps.put(map.uuid, map) - map + maps.getOrElseUpdate(id, { + val map = underlyingStorage.getMap(id) + recoveryManager !! MapRecoveryRequest(id) match { + case MapFailure(id, log) => map.applyLog(log) + case _ => () + } + }) } def newRef = { @@ -157,42 +181,58 @@ trait PersistentTransactor extends Transactor with Storage { } override def getQueue(id: String) = { - val queue = underlyingStorage.getQueue(id) - queues.put(queue.uuid, queue) - queue + queues.getOrElseUpdate(id, { + val queue = underlyingStorage.getQueue(id) + recoveryManager !! QueueRecoveryRequest(id) match { + case QueueFailure(id, log) => queue.applyLog(log) + case _ => () + } + }) } override def getSortedSet(id: String) = { - val set = underlyingStorage.getSortedSet(id) - sortedSets.put(set.uuid, set) - set + sortedSets.getOrElseUpdate(id, { + val set = underlyingStorage.getSortedSet(id) + recoveryManager !! SortedSetRecoveryRequest(id) match { + case SortedSetFailure(id, log) => set.applyLog(log) + case _ => () + } + }) } override def newQueue(id: String) = { - val queue = underlyingStorage.newQueue(id) - queues.put(queue.uuid, queue) - queue + queues.getOrElseUpdate(id, { + val queue = underlyingStorage.newQueue(id) + recoveryManager !! QueueRecoveryRequest(id) match { + case QueueFailure(id, log) => queue.applyLog(log) + case _ => () + } + }) } override def newSortedSet(id: String) = { - val set = underlyingStorage.newSortedSet(id) - sortedSets.put(set.uuid, set) - set + sortedSets.getOrElseUpdate(id, { + val set = underlyingStorage.newSortedSet(id) + recoveryManager !! SortedSetRecoveryRequest(id) match { + case SortedSetFailure(id, log) => set.applyLog(log) + case _ => () + } + }) } } -sealed trait CommitRecovery +sealed trait RecoveryRequest -case class RefRecovery(uuid: String) extends CommitRecovery +case class RefRecoveryRequest(uuid: String) extends RecoveryRequest -case class MapRecovery(uuid: String) extends CommitRecovery +case class MapRecoveryRequest(uuid: String) extends RecoveryRequest -case class QueueRecovery(uuid: String) extends CommitRecovery +case class QueueRecoveryRequest(uuid: String) extends RecoveryRequest -case class VectorRecovery(uuid: String) extends CommitRecovery +case class VectorRecoveryRequest(uuid: String) extends RecoveryRequest -case class SortedSetRecovery(uuid: String) extends CommitRecovery +case class SortedSetRecoveryRequest(uuid: String) extends RecoveryRequest @serializable @@ -208,6 +248,8 @@ case class VectorFailure(uuid: String, tlog: Array[Byte]) extends CommitFailure case class SortedSetFailure(uuid: String, tlog: Array[Byte]) extends CommitFailure +case class NoOp() extends CommitFailure + trait PersistentTransactorSupervisor extends Actor { //send messages to this to start PersistentTransactors, @@ -223,10 +265,166 @@ trait PersistentTransactorRecoveryManager extends Actor { //send the failed transactions back to the restarted actor protected def receive = { - case RefFailure(uuid, tlog, error) => { - log.info(uuid + " " + error) + case RefFailure(uuid, log) => { + refFailed(uuid, log) + } + + case RefRecoveryRequest(uuid) => { + recoverRef(uuid) match { + case None => self.reply(NoOp) + case Some(tlog) => self.reply(RefFailure(uuid, tlog)) + } + } + + case MapFailure(uuid, tlog) => { + mapFailed(uuid, tlog) + } + + case MapRecoveryRequest(uuid) => { + recoverMap(uuid) match { + case None => self.reply(NoOp) + case Some(tlog) => self.reply(MapFailure(uuid, tlog)) + } + } + + case QueueFailure(uuid, log) => { + queueFailed(uuid, log) + } + + case QueueRecoveryRequest(uuid) => { + recoverRef(uuid) match { + case None => self.reply(NoOp) + case Some(tlog) => self.reply(RefFailure(uuid, tlog)) + } + } + + case VectorFailure(uuid, tlog) => { + vectorFailed(uuid, tlog) + } + + case VectorRecoveryRequest(uuid) => { + recoverVector(uuid) match { + case None => self.reply(NoOp) + case Some(tlog) => self.reply(VectorFailure(uuid, tlog)) + } + } + + case SortedSetFailure(uuid, log) => { + sortedSetFailed(uuid, log) + } + + case SortedSetRecoveryRequest(uuid) => { + recoverSortedSet(uuid) match { + case None => self.reply(NoOp) + case Some(tlog) => self.reply(SortedSetFailure(uuid, tlog)) + } + } + + } + + def refFailed(uuid: String, tlog: Array[Byte]): Unit + + def recoverRef(uuid: String): Option[Array[Byte]] + + def mapFailed(uuid: String, tlog: Array[Byte]): Unit + + def recoverMap(uuid: String): Option[Array[Byte]] + + def queueFailed(uuid: String, tlog: Array[Byte]): Unit + + def recoverQueue(uuid: String): Option[Array[Byte]] + + def vectorFailed(uuid: String, tlog: Array[Byte]): Unit + + def recoverVector(uuid: String): Option[Array[Byte]] + + def sortedSetFailed(uuid: String, tlog: Array[Byte]): Unit + + def recoverSortedSet(uuid: String): Option[Array[Byte]] + +} + +class HawtDbRecoveryManager(dir: String) extends PersistentTransactorRecoveryManager { + var refFactory: TxPageFileFactory = null + var refPageFile: TxPageFile = null + var refIndexFactory: HashIndexFactory[String, Array[Byte]] = null + var refIndex: Index[String, Array[Byte]] = null + var mapFactory: TxPageFileFactory = null + var mapPageFile: TxPageFile = null + var mapIndexFactory: HashIndexFactory[String, Array[Byte]] = null + var mapIndex: Index[String, Array[Byte]] = null + var vectorFactory: TxPageFileFactory = null + var vectorPageFile: TxPageFile = null + var vectorIndexFactory: HashIndexFactory[String, Array[Byte]] = null + var vectorIndex: Index[String, Array[Byte]] = null + var queueFactory: TxPageFileFactory = null + var queuePageFile: TxPageFile = null + var queueIndexFactory: HashIndexFactory[String, Array[Byte]] = null + var queueIndex: Index[String, Array[Byte]] = null + var sortedSetFactory: TxPageFileFactory = null + var sortedSetPageFile: TxPageFile = null + var sortedSetIndexFactory: HashIndexFactory[String, Array[Byte]] = null + var sortedSetIndex: Index[String, Array[Byte]] = null + + + override def preStart = { + val datadir = new File(dir) + if (!datadir.exists || !datadir.isDirectory) { + throw new IllegalArgumentException(datadir.getAbsolutePath, "does not exist or is not a directory") + } + val (refFactory, refPageFile, refIndexFactory, refIndex) = initType(new File(dir, "ref.log")) + val (mapFactory, mapPageFile, mapIndexFactory, mapIndex) = initType(new File(dir, "map.log")) + val (vectorFactory, vectorPageFile, vectorIndexFactory, vectorIndex) = initType(new File(dir, "vector.log")) + val (queueFactory, queuePageFile, queueIndexFactory, queueIndex) = initType(new File(dir, "queue.log")) + val (sortedSetFactory, sortedSetPageFile, sortedSetIndexFactory, sortedSetIndex) = initType(new File(dir, "sortedset.log")) + } + + def initType(file: File): (TxPageFileFactory, TxPageFile, IndexFactory, Index) = { + val factory = new TxPageFileFactory() + factory.setFile(file) + factory.open + val txFile = factory.getTxPageFile + val indexFact = new HashIndexFactory[String, Array[Byte]] + indexFact.setKeyCodec(StringCodec.INSTANCE) + indexFact.setValueCodec(BytesCodec.INSTANCE) + if (!txFile.allocator.isAllocated(0)) { + val tx = txFile.tx(); + Index[String, Array[Byte]] root = indexFact.create(tx); + tx.commit(); } } + def withIndex(txp: TxPageFile, idxf: IndexFactory[String, Array[Byte]])(block: Index => Any) { + val tx = txp.tx + val idx = idxf.open(tx) + block(index) + tx.commit + } + + override def postStop = { + + } + + def recoverSortedSet(uuid: String) = null + + def sortedSetFailed(uuid: String, tlog: Array[Byte]) = null + + def recoverVector(uuid: String) = null + + def vectorFailed(uuid: String, tlog: Array[Byte]) = null + + def recoverQueue(uuid: String) = null + + def queueFailed(uuid: String, tlog: Array[Byte]) = null + + def recoverMap(uuid: String) = null + + def mapFailed(uuid: String, tlog: Array[Byte]) = null + + def recoverRef(uuid: String) = null + + def refFailed(uuid: String, tlog: Array[Byte]) = withIndex(refPageFile, refIndexFactory)(_.put(uuid, tlog)) + + } \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 3f1fa28e4a..935b9651b8 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -181,6 +181,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2 + lazy val hawtdb = "org.fusesource.hawtdb" % "hawtdb" % "1.5" % "compile" //ApacheV2 lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 @@ -278,10 +279,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" //ApacheV2 //memcached - lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile" + lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile" //MIT //simpledb - lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile" + lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile" //ApacheV2 } // ------------------------------------------------------------------------------------------------------------------- @@ -569,6 +570,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val commons_pool = Dependencies.commons_pool val thrift = Dependencies.thrift + val hawtdb = Dependencies.hawtdb } // -------------------------------------------------------------------------------------------------------------------