From 3ab101039fd14d7bea92c4e14d2c809a60b00229 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Apr 2017 15:12:14 +0200 Subject: [PATCH] Lazy init of LmdbDurableStore, #22759 (#22779) * Lazy init of LmdbDurableStore, #22759 * to avoid creating files (and initializing db) when not needed, e.g. cluster sharding that is not using remember entities * enable MiMa against 2.5.0 * use OptionVal instead --- ...terShardingCustomShardAllocationSpec.scala | 16 -- .../akka/cluster/ddata/DurableStore.scala | 154 +++++++++++------- project/MiMa.scala | 25 ++- 3 files changed, 118 insertions(+), 77 deletions(-) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index f31f78469b..c0cf6f18ef 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -98,10 +98,6 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots" akka.cluster.sharding.state-store-mode = "$mode" - akka.cluster.sharding.distributed-data.durable.lmdb { - dir = target/ClusterShardingCustomShardAllocationSpec/sharding-ddata - map-size = 10 MiB - } """)) } @@ -123,18 +119,6 @@ abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingC override def initialParticipants = roles.size - val storageLocations = List(new File(system.settings.config.getString( - "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) - - override protected def atStartup() { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) - enterBarrier("startup") - } - - override protected def afterTermination() { - storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) - } - def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system) join node(to).address diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala index ddb9acac78..a651769046 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala @@ -31,6 +31,7 @@ import org.lmdbjava.DbiFlags import org.lmdbjava.Env import org.lmdbjava.EnvFlags import org.lmdbjava.Txn +import org.lmdbjava.Dbi /** * An actor implementing the durable store for the Distributed Data `Replicator` @@ -100,11 +101,18 @@ object LmdbDurableStore { Props(new LmdbDurableStore(config)) private case object WriteBehind extends DeadLetterSuppression + + private final case class Lmdb( + env: Env[ByteBuffer], + db: Dbi[ByteBuffer], + keyBuffer: ByteBuffer, + valueBuffer: ByteBuffer) } final class LmdbDurableStore(config: Config) extends Actor with ActorLogging { import DurableStore._ import LmdbDurableStore.WriteBehind + import LmdbDurableStore.Lmdb val serialization = SerializationExtension(context.system) val serializer = serialization.serializerFor(classOf[DurableDataEnvelope]).asInstanceOf[SerializerWithStringManifest] @@ -115,31 +123,50 @@ final class 
LmdbDurableStore(config: Config) extends Actor with ActorLogging { case _ ⇒ config.getDuration("lmdb.write-behind-interval", MILLISECONDS).millis } - val env: Env[ByteBuffer] = { - val mapSize = config.getBytes("lmdb.map-size") - val dir = config.getString("lmdb.dir") match { - case path if path.endsWith("ddata") ⇒ - new File(s"$path-${context.system.name}-${self.path.parent.name}-${Cluster(context.system).selfAddress.port.get}") - case path ⇒ - new File(path) - } - log.info("Using durable data in LMDB directory [{}]", dir.getCanonicalPath) - dir.mkdirs() - Env.create() - .setMapSize(mapSize) - .setMaxDbs(1) - .open(dir, EnvFlags.MDB_NOLOCK) + val dir = config.getString("lmdb.dir") match { + case path if path.endsWith("ddata") ⇒ + new File(s"$path-${context.system.name}-${self.path.parent.name}-${Cluster(context.system).selfAddress.port.get}") + case path ⇒ + new File(path) } - val db = env.openDbi("ddata", DbiFlags.MDB_CREATE) + // lazy init + private var _lmdb: OptionVal[Lmdb] = OptionVal.None - val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize) - var valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow when needed + private def lmdb(): Lmdb = _lmdb match { + case OptionVal.Some(l) ⇒ l + case OptionVal.None ⇒ + val t0 = System.nanoTime() + log.info("Using durable data in LMDB directory [{}]", dir.getCanonicalPath) + val env = { + val mapSize = config.getBytes("lmdb.map-size") + dir.mkdirs() + Env.create() + .setMapSize(mapSize) + .setMaxDbs(1) + .open(dir, EnvFlags.MDB_NOLOCK) + } + + val db = env.openDbi("ddata", DbiFlags.MDB_CREATE) + + val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize) + val valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow when needed + + if (log.isDebugEnabled) + log.debug("Init of LMDB in directory [{}] took [{} ms]", dir.getCanonicalPath, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0)) + val l = Lmdb(env, db, keyBuffer, valueBuffer) + _lmdb = OptionVal.Some(l) + l + } + + def isDbInitialized: Boolean = _lmdb.isDefined def ensureValueBufferSize(size: Int): Unit = { + val valueBuffer = lmdb().valueBuffer if (valueBuffer.remaining < size) { DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer) - valueBuffer = ByteBuffer.allocateDirect(size * 2) + _lmdb = OptionVal.Some(lmdb.copy(valueBuffer = ByteBuffer.allocateDirect(size * 2))) } } @@ -155,53 +182,64 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging { override def postStop(): Unit = { super.postStop() writeBehind() - Try(db.close()) - Try(env.close()) - DirectByteBufferPool.tryCleanDirectByteBuffer(keyBuffer) - DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer) + if (isDbInitialized) { + val l = lmdb() + Try(l.db.close()) + Try(l.env.close()) + DirectByteBufferPool.tryCleanDirectByteBuffer(l.keyBuffer) + DirectByteBufferPool.tryCleanDirectByteBuffer(l.valueBuffer) + } } def receive = init def init: Receive = { case LoadAll ⇒ - val t0 = System.nanoTime() - val tx = env.txnRead() - try { - val iter = db.iterate(tx) + if (dir.exists && dir.list().length > 0) { + val l = lmdb() + val t0 = System.nanoTime() + val tx = l.env.txnRead() try { - var n = 0 - val loadData = LoadData(iter.asScala.map { entry ⇒ - n += 1 - val keyArray = new Array[Byte](entry.key.remaining) - entry.key.get(keyArray) - val key = new String(keyArray, ByteString.UTF_8) - val valArray = new Array[Byte](entry.`val`.remaining) - entry.`val`.get(valArray) - val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope] - key → 
envelope - }.toMap) - if (loadData.data.nonEmpty) - sender() ! loadData - sender() ! LoadAllCompleted - if (log.isDebugEnabled) - log.debug("load all of [{}] entries took [{} ms]", n, - TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0)) - context.become(active) + val iter = l.db.iterate(tx) + try { + var n = 0 + val loadData = LoadData(iter.asScala.map { entry ⇒ + n += 1 + val keyArray = new Array[Byte](entry.key.remaining) + entry.key.get(keyArray) + val key = new String(keyArray, ByteString.UTF_8) + val valArray = new Array[Byte](entry.`val`.remaining) + entry.`val`.get(valArray) + val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope] + key → envelope + }.toMap) + if (loadData.data.nonEmpty) + sender() ! loadData + sender() ! LoadAllCompleted + if (log.isDebugEnabled) + log.debug("load all of [{}] entries took [{} ms]", n, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0)) + context.become(active) + } finally { + Try(iter.close()) + } + } catch { + case NonFatal(e) ⇒ + throw new LoadFailed("failed to load durable distributed-data", e) } finally { - Try(iter.close()) + Try(tx.close()) } - } catch { - case NonFatal(e) ⇒ - throw new LoadFailed("failed to load durable distributed-data", e) - } finally { - Try(tx.close()) + } else { + // no files to load + sender() ! LoadAllCompleted + context.become(active) } } def active: Receive = { case Store(key, data, reply) ⇒ try { + lmdb() // init if (writeBehindInterval.length == 0) { dbPut(OptionVal.None, key, data) } else { @@ -230,24 +268,26 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging { def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: KeyId, data: DurableDataEnvelope): Unit = { try { - keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip() val value = serializer.toBinary(data) ensureValueBufferSize(value.length) - valueBuffer.put(value).flip() + val l = lmdb() + l.keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip() + l.valueBuffer.put(value).flip() tx match { - case OptionVal.None ⇒ db.put(keyBuffer, valueBuffer) - case OptionVal.Some(t) ⇒ db.put(t, keyBuffer, valueBuffer) + case OptionVal.None ⇒ l.db.put(l.keyBuffer, l.valueBuffer) + case OptionVal.Some(t) ⇒ l.db.put(t, l.keyBuffer, l.valueBuffer) } } finally { - keyBuffer.clear() - valueBuffer.clear() + val l = lmdb() + l.keyBuffer.clear() + l.valueBuffer.clear() } } def writeBehind(): Unit = { if (!pending.isEmpty()) { val t0 = System.nanoTime() - val tx = env.txnWrite() + val tx = lmdb().env.txnWrite() try { val iter = pending.entrySet.iterator while (iter.hasNext) { diff --git a/project/MiMa.scala b/project/MiMa.scala index eae610a357..6dc9d6d5ba 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -32,7 +32,7 @@ object MiMa extends AutoPlugin { .max val akka24NoStreamVersions = Seq("2.4.0", "2.4.1") - val akka25Versions = Seq.empty[String] // FIXME enable once 2.5.0 is out (0 to latestMinorVersionOf("2.5.")).map(patch => s"2.5.$patch") + val akka25Versions = (0 to latestMinorVersionOf("2.5.")).map(patch => s"2.5.$patch") val akka24StreamVersions = (2 to 12) map ("2.4." 
+ _) val akka24WithScala212 = (13 to latestMinorVersionOf("2.4.")) @@ -55,10 +55,10 @@ object MiMa extends AutoPlugin { else { if (!akka242NewArtifacts.contains(projectName)) akka24NoStreamVersions else Seq.empty - } ++ akka24StreamVersions ++ akka24WithScala212 + } ++ akka24StreamVersions ++ akka24WithScala212 ++ akka25Versions case "2.12" => - akka24WithScala212 + akka24WithScala212 ++ akka25Versions } } @@ -1163,10 +1163,27 @@ object MiMa extends AutoPlugin { // * this list ends with the latest released version number // * is kept in sync between release-2.4 and master branch ) + + val Release25Filters = Seq( + "2.5.0" -> Seq( + + // #22759 LMDB files + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.env"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.db"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.keyBuffer"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.valueBuffer_="), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.LmdbDurableStore.valueBuffer"), + + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.traversalBuilder"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.named"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.addAttributes"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.async") + ) + ) val Latest24Filters = Release24Filters.last val AllFilters = - Release24Filters.dropRight(1) :+ (Latest24Filters._1 -> (Latest24Filters._2 ++ bcIssuesBetween24and25)) + Release25Filters ++ Release24Filters.dropRight(1) :+ (Latest24Filters._1 -> (Latest24Filters._2 ++ bcIssuesBetween24and25)) Map(AllFilters: _*) }
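
The core of the change is deferring every LMDB side effect (directory creation, file mapping, buffer allocation) until the store is actually used. Below is a minimal, dependency-free sketch of that pattern; the names (DurableResource, LazyDurableStore, loadAll, store) are illustrative stand-ins and not the Akka API, and where this sketch caches in a plain Option, the real actor uses Akka's internal OptionVal (a value class over a nullable reference, so the cached handle is read without allocating a Some wrapper).

import java.io.File

final case class DurableResource(dir: File) {
  // In the real store this wraps the org.lmdbjava Env/Dbi handles
  // plus the direct key/value ByteBuffers.
  def put(key: String, value: Array[Byte]): Unit = ()
  def close(): Unit = ()
}

final class LazyDurableStore(dir: File) {
  private var _resource: Option[DurableResource] = None

  def isInitialized: Boolean = _resource.isDefined

  // Opens (and creates files for) the database only on first actual use.
  private def resource(): DurableResource = _resource match {
    case Some(r) ⇒ r
    case None ⇒
      dir.mkdirs() // the on-disk side effect the patch defers
      val r = DurableResource(dir)
      _resource = Some(r)
      r
  }

  // Mirrors the LoadAll change: if nothing was ever written, report
  // completion without touching (and thereby creating) the database.
  def loadAll(): Boolean =
    if (dir.exists && dir.list().nonEmpty) {
      resource() // init only when there is something to read back
      true       // caller would now iterate entries and emit LoadData
    } else {
      false      // LoadAllCompleted can be sent immediately
    }

  // The first write triggers initialization, as in the new `active` receive.
  def store(key: String, value: Array[Byte]): Unit =
    resource().put(key, value)

  // Mirrors postStop: release resources only if they were ever created.
  def close(): Unit = if (isInitialized) _resource.foreach(_.close())
}

The isDbInitialized guard matters in two places: postStop must not open the database just to close it again, and LoadAll must not create files for a store that never persisted anything, which is exactly the cluster-sharding-without-remember-entities case from the commit message.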
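
On the MiMa side, moving env, db, keyBuffer and valueBuffer into the new private Lmdb holder removes public accessors from LmdbDurableStore, which MiMa reports as binary incompatibilities against 2.5.0; the patch therefore whitelists them per released version. A sketch of how one such entry is declared follows, using only names that appear in the diff (the surrounding sbt plumbing that feeds these filters to the plugin is assumed and not shown):

import com.typesafe.tools.mima.core._

// Each pair whitelists known, intentional binary breaks against one
// released version. Removing the public `env` accessor surfaces as a
// DirectMissingMethodProblem, so it must be excluded explicitly.
val filters: Seq[(String, Seq[ProblemFilter])] = Seq(
  "2.5.0" -> Seq(
    ProblemFilters.exclude[DirectMissingMethodProblem](
      "akka.cluster.ddata.LmdbDurableStore.env")))

AllFilters then prepends these 2.5.x entries to the existing 2.4.x list before the whole sequence is turned into the Map the plugin consumes.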