Lazy init of LmdbDurableStore, #22759 (#22779)

* Lazy init of LmdbDurableStore, #22759

* to avoid creating files (and initializing db) when not needed,
  e.g. cluster sharding that is not using remember entities
* enable MiMa against 2.5.0

* use OptionVal instead
Patrik Nordwall 2017-04-28 15:12:14 +02:00 committed by GitHub
parent edee4ba409
commit 3ab101039f
3 changed files with 118 additions and 77 deletions
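
The core of the change, distilled into a minimal sketch (illustrative names only; the real code below uses the Akka-internal, allocation-free akka.util.OptionVal and a private Lmdb case class bundling the env, db and the two direct buffers):

    import java.nio.ByteBuffer

    // Illustrative stand-in for the bundle of LMDB handles; names are made up.
    final case class Handles(keyBuffer: ByteBuffer, valueBuffer: ByteBuffer)

    final class LazyStore {
      // Stays None until a read or write actually needs the database, so an
      // ActorSystem that never stores anything never creates files on disk.
      private var handles: Option[Handles] = None

      private def getOrInit(): Handles = handles match {
        case Some(h) => h
        case None =>
          // the expensive part: mkdirs, open env/db, allocate direct buffers
          val h = Handles(ByteBuffer.allocateDirect(511), ByteBuffer.allocateDirect(100 * 1024))
          handles = Some(h)
          h
      }

      def isInitialized: Boolean = handles.isDefined

      def put(key: String, value: Array[Byte]): Unit = {
        val h = getOrInit() // first write triggers initialization
        // ... write key/value through h ...
      }

      def close(): Unit = if (isInitialized) { /* close env/db, free buffers */ }
    }
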

@@ -98,10 +98,6 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String)
     akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
     akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots"
     akka.cluster.sharding.state-store-mode = "$mode"
-    akka.cluster.sharding.distributed-data.durable.lmdb {
-      dir = target/ClusterShardingCustomShardAllocationSpec/sharding-ddata
-      map-size = 10 MiB
-    }
     """))
 }
@@ -123,18 +119,6 @@ abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingCustomShardAllocationSpecConfig)
   override def initialParticipants = roles.size
 
-  val storageLocations = List(new File(system.settings.config.getString(
-    "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-
-  override protected def atStartup() {
-    storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-
-  override protected def afterTermination() {
-    storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-
   def join(from: RoleName, to: RoleName): Unit = {
     runOn(from) {
       Cluster(system) join node(to).address
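
This spec does not use remember entities, so with lazy init no LMDB files are ever created and the directory config plus the cleanup hooks removed above became dead weight. A spec that does use remember entities would still configure the durable directory, roughly like this (illustrative sketch; spec name and values are examples):

    import com.typesafe.config.ConfigFactory

    // Illustrative only: remember-entities sharding still persists to LMDB,
    // so such a spec keeps the durable directory config (and the cleanup).
    val config = ConfigFactory.parseString("""
      akka.cluster.sharding.state-store-mode = ddata
      akka.cluster.sharding.remember-entities = on
      akka.cluster.sharding.distributed-data.durable.lmdb {
        dir = "target/MySpec/sharding-ddata"
        map-size = 10 MiB
      }
      """)
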

@@ -31,6 +31,7 @@ import org.lmdbjava.DbiFlags
 import org.lmdbjava.Env
 import org.lmdbjava.EnvFlags
 import org.lmdbjava.Txn
+import org.lmdbjava.Dbi
 
 /**
  * An actor implementing the durable store for the Distributed Data `Replicator`
@@ -100,11 +101,18 @@ object LmdbDurableStore {
     Props(new LmdbDurableStore(config))
 
   private case object WriteBehind extends DeadLetterSuppression
+
+  private final case class Lmdb(
+    env:         Env[ByteBuffer],
+    db:          Dbi[ByteBuffer],
+    keyBuffer:   ByteBuffer,
+    valueBuffer: ByteBuffer)
 }
 
 final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
   import DurableStore._
   import LmdbDurableStore.WriteBehind
+  import LmdbDurableStore.Lmdb
 
   val serialization = SerializationExtension(context.system)
   val serializer = serialization.serializerFor(classOf[DurableDataEnvelope]).asInstanceOf[SerializerWithStringManifest]
@@ -115,31 +123,50 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
     case _ ⇒ config.getDuration("lmdb.write-behind-interval", MILLISECONDS).millis
   }
 
-  val env: Env[ByteBuffer] = {
-    val mapSize = config.getBytes("lmdb.map-size")
-    val dir = config.getString("lmdb.dir") match {
-      case path if path.endsWith("ddata") ⇒
-        new File(s"$path-${context.system.name}-${self.path.parent.name}-${Cluster(context.system).selfAddress.port.get}")
-      case path ⇒
-        new File(path)
-    }
-    log.info("Using durable data in LMDB directory [{}]", dir.getCanonicalPath)
-    dir.mkdirs()
-    Env.create()
-      .setMapSize(mapSize)
-      .setMaxDbs(1)
-      .open(dir, EnvFlags.MDB_NOLOCK)
+  val dir = config.getString("lmdb.dir") match {
+    case path if path.endsWith("ddata") ⇒
+      new File(s"$path-${context.system.name}-${self.path.parent.name}-${Cluster(context.system).selfAddress.port.get}")
+    case path ⇒
+      new File(path)
   }
 
-  val db = env.openDbi("ddata", DbiFlags.MDB_CREATE)
+  // lazy init
+  private var _lmdb: OptionVal[Lmdb] = OptionVal.None
 
-  val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize)
-  var valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow when needed
+  private def lmdb(): Lmdb = _lmdb match {
+    case OptionVal.Some(l) ⇒ l
+    case OptionVal.None ⇒
+      val t0 = System.nanoTime()
+      log.info("Using durable data in LMDB directory [{}]", dir.getCanonicalPath)
+      val env = {
+        val mapSize = config.getBytes("lmdb.map-size")
+        dir.mkdirs()
+        Env.create()
+          .setMapSize(mapSize)
+          .setMaxDbs(1)
+          .open(dir, EnvFlags.MDB_NOLOCK)
+      }
+      val db = env.openDbi("ddata", DbiFlags.MDB_CREATE)
+      val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize)
+      val valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow when needed
+      if (log.isDebugEnabled)
+        log.debug("Init of LMDB in directory [{}] took [{} ms]", dir.getCanonicalPath,
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+      val l = Lmdb(env, db, keyBuffer, valueBuffer)
+      _lmdb = OptionVal.Some(l)
+      l
+  }
+
+  def isDbInitialized: Boolean = _lmdb.isDefined
 
   def ensureValueBufferSize(size: Int): Unit = {
+    val valueBuffer = lmdb().valueBuffer
     if (valueBuffer.remaining < size) {
       DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer)
-      valueBuffer = ByteBuffer.allocateDirect(size * 2)
+      _lmdb = OptionVal.Some(lmdb.copy(valueBuffer = ByteBuffer.allocateDirect(size * 2)))
     }
   }
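
Since valueBuffer now lives in the immutable Lmdb case class, growing it means copying the case class with a bigger buffer; the growth policy itself is unchanged. Condensed sketch (releaseDirect stands in for the internal DirectByteBufferPool.tryCleanDirectByteBuffer):

    import java.nio.ByteBuffer

    def releaseDirect(buf: ByteBuffer): Unit = () // illustrative stub for the internal cleaner

    // Free the old direct buffer eagerly rather than waiting for GC, and
    // allocate double the requested size to amortize future growth.
    def grownIfNeeded(buf: ByteBuffer, size: Int): ByteBuffer =
      if (buf.remaining >= size) buf
      else {
        releaseDirect(buf)
        ByteBuffer.allocateDirect(size * 2)
      }
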
@@ -155,53 +182,64 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
   override def postStop(): Unit = {
     super.postStop()
     writeBehind()
-    Try(db.close())
-    Try(env.close())
-    DirectByteBufferPool.tryCleanDirectByteBuffer(keyBuffer)
-    DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer)
+    if (isDbInitialized) {
+      val l = lmdb()
+      Try(l.db.close())
+      Try(l.env.close())
+      DirectByteBufferPool.tryCleanDirectByteBuffer(l.keyBuffer)
+      DirectByteBufferPool.tryCleanDirectByteBuffer(l.valueBuffer)
+    }
   }
 
   def receive = init
 
   def init: Receive = {
     case LoadAll ⇒
-      val t0 = System.nanoTime()
-      val tx = env.txnRead()
-      try {
-        val iter = db.iterate(tx)
-        try {
-          var n = 0
-          val loadData = LoadData(iter.asScala.map { entry ⇒
-            n += 1
-            val keyArray = new Array[Byte](entry.key.remaining)
-            entry.key.get(keyArray)
-            val key = new String(keyArray, ByteString.UTF_8)
-            val valArray = new Array[Byte](entry.`val`.remaining)
-            entry.`val`.get(valArray)
-            val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope]
-            key → envelope
-          }.toMap)
-          if (loadData.data.nonEmpty)
-            sender() ! loadData
-          sender() ! LoadAllCompleted
-          if (log.isDebugEnabled)
-            log.debug("load all of [{}] entries took [{} ms]", n,
-              TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
-          context.become(active)
-        } finally {
-          Try(iter.close())
-        }
-      } catch {
-        case NonFatal(e) ⇒
-          throw new LoadFailed("failed to load durable distributed-data", e)
-      } finally {
-        Try(tx.close())
-      }
+      if (dir.exists && dir.list().length > 0) {
+        val l = lmdb()
+        val t0 = System.nanoTime()
+        val tx = l.env.txnRead()
+        try {
+          val iter = l.db.iterate(tx)
+          try {
+            var n = 0
+            val loadData = LoadData(iter.asScala.map { entry ⇒
+              n += 1
+              val keyArray = new Array[Byte](entry.key.remaining)
+              entry.key.get(keyArray)
+              val key = new String(keyArray, ByteString.UTF_8)
+              val valArray = new Array[Byte](entry.`val`.remaining)
+              entry.`val`.get(valArray)
+              val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope]
+              key → envelope
+            }.toMap)
+            if (loadData.data.nonEmpty)
+              sender() ! loadData
+            sender() ! LoadAllCompleted
+            if (log.isDebugEnabled)
+              log.debug("load all of [{}] entries took [{} ms]", n,
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+            context.become(active)
+          } finally {
+            Try(iter.close())
+          }
+        } catch {
+          case NonFatal(e) ⇒
+            throw new LoadFailed("failed to load durable distributed-data", e)
+        } finally {
+          Try(tx.close())
+        }
+      } else {
+        // no files to load
+        sender() ! LoadAllCompleted
+        context.become(active)
+      }
   }
 
   def active: Receive = {
     case Store(key, data, reply) ⇒
       try {
+        lmdb() // init
         if (writeBehindInterval.length == 0) {
           dbPut(OptionVal.None, key, data)
         } else {
@@ -230,24 +268,26 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
   def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: KeyId, data: DurableDataEnvelope): Unit = {
     try {
-      keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
       val value = serializer.toBinary(data)
       ensureValueBufferSize(value.length)
-      valueBuffer.put(value).flip()
+      val l = lmdb()
+      l.keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
+      l.valueBuffer.put(value).flip()
       tx match {
-        case OptionVal.None    ⇒ db.put(keyBuffer, valueBuffer)
-        case OptionVal.Some(t) ⇒ db.put(t, keyBuffer, valueBuffer)
+        case OptionVal.None    ⇒ l.db.put(l.keyBuffer, l.valueBuffer)
+        case OptionVal.Some(t) ⇒ l.db.put(t, l.keyBuffer, l.valueBuffer)
       }
     } finally {
-      keyBuffer.clear()
-      valueBuffer.clear()
+      val l = lmdb()
+      l.keyBuffer.clear()
+      l.valueBuffer.clear()
     }
   }
 
   def writeBehind(): Unit = {
     if (!pending.isEmpty()) {
       val t0 = System.nanoTime()
-      val tx = env.txnWrite()
+      val tx = lmdb().env.txnWrite()
       try {
         val iter = pending.entrySet.iterator
         while (iter.hasNext) {
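
The other half of the laziness is the LoadAll guard above: during recovery LMDB is only opened if the directory already holds files from a previous run; otherwise the store replies LoadAllCompleted without touching the file system. Condensed sketch (function and parameter names are illustrative):

    import java.io.File

    // Condensed form of the guard in `init`; `openAndStream` and `completed`
    // are placeholders for the real actor logic.
    def loadAll(dir: File)(openAndStream: () => Unit, completed: () => Unit): Unit =
      if (dir.exists && dir.list().length > 0) {
        openAndStream() // lazily opens env/db, then streams stored envelopes
        completed()
      } else {
        completed() // nothing persisted yet; LMDB stays uninitialized
      }
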

@@ -32,7 +32,7 @@ object MiMa extends AutoPlugin {
       .max
 
   val akka24NoStreamVersions = Seq("2.4.0", "2.4.1")
-  val akka25Versions = Seq.empty[String] // FIXME enable once 2.5.0 is out (0 to latestMinorVersionOf("2.5.")).map(patch => s"2.5.$patch")
+  val akka25Versions = (0 to latestMinorVersionOf("2.5.")).map(patch => s"2.5.$patch")
   val akka24StreamVersions = (2 to 12) map ("2.4." + _)
   val akka24WithScala212 =
     (13 to latestMinorVersionOf("2.4."))
@@ -55,10 +55,10 @@ object MiMa extends AutoPlugin {
         else {
           if (!akka242NewArtifacts.contains(projectName)) akka24NoStreamVersions
           else Seq.empty
-        } ++ akka24StreamVersions ++ akka24WithScala212
+        } ++ akka24StreamVersions ++ akka24WithScala212 ++ akka25Versions
       case "2.12" =>
-        akka24WithScala212
+        akka24WithScala212 ++ akka25Versions
     }
   }
@@ -1164,9 +1164,26 @@ object MiMa extends AutoPlugin {
       // * is kept in sync between release-2.4 and master branch
     )
 
+  val Release25Filters = Seq(
+    "2.5.0" -> Seq(
+      // #22759 LMDB files
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.env"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.db"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.keyBuffer"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.valueBuffer_="),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.valueBuffer"),
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.traversalBuilder"),
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.named"),
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.addAttributes"),
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.async")
+    )
+  )
+
   val Latest24Filters = Release24Filters.last
   val AllFilters =
-    Release24Filters.dropRight(1) :+ (Latest24Filters._1 -> (Latest24Filters._2 ++ bcIssuesBetween24and25))
+    Release25Filters ++ Release24Filters.dropRight(1) :+ (Latest24Filters._1 -> (Latest24Filters._2 ++ bcIssuesBetween24and25))
 
   Map(AllFilters: _*)
 }
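
For context, these filters feed sbt-mima-plugin's binary compatibility check. Roughly how such filters are wired in a build (illustrative build.sbt snippet, not Akka's actual build wiring; module and version are examples):

    import com.typesafe.tools.mima.core._
    import com.typesafe.tools.mima.plugin.MimaKeys._

    // Compare against the released 2.5.0 artifact and silence known-benign
    // breakages, e.g. the LmdbDurableStore fields removed by the lazy-init rework.
    mimaPreviousArtifacts := Set("com.typesafe.akka" %% "akka-distributed-data" % "2.5.0")
    mimaBinaryIssueFilters ++= Seq(
      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.env")
    )
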