From d2cc69e65aea0829a9c3c5b7a5953d602140067d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 17 Sep 2015 08:32:26 +0200 Subject: [PATCH] =per #18497 Handle large deleteMessages ranges in leveldb journal * creating a large scala range is very inefficient and also impossible above Int.MaxValue, which can happen if deleteMessages(Long.MaxValue) is used * solved by capping the upper seq nr by the highest know seq nr * similar issue in inmem journal --- .../persistence/journal/inmem/InmemJournal.scala | 8 +++++++- .../persistence/journal/leveldb/LeveldbStore.scala | 9 +++++++-- .../scala/akka/persistence/PersistentActorSpec.scala | 12 ++++++++++++ 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 6379b38586..f6e7a24a74 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -83,7 +83,13 @@ private[persistence] class InmemStore extends Actor with InmemMessages with Writ } sender() ! results case DeleteMessagesTo(pid, tsnr) ⇒ - sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) }) + val toSeqNr = math.min(tsnr, highestSequenceNr(pid)) + var snr = 1L + while (snr <= toSeqNr) { + delete(pid, snr) + snr += 1 + } + sender().tell((), self) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ val highest = highestSequenceNr(pid) if (highest != 0L && max != 0L) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index d11194ed0a..dc43cf33ea 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -97,8 +97,13 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with if (iter.hasNext) keyFromBytes(iter.peekNext().getKey).sequenceNr else Long.MaxValue } - fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ - batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) + if (fromSequenceNr != Long.MaxValue) { + val toSeqNr = math.min(toSequenceNr, readHighestSequenceNr(nid)) + var sequenceNr = fromSequenceNr + while (sequenceNr <= toSeqNr) { + batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) + sequenceNr += 1 + } } } } catch { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 3071dca90d..981498bb3e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -1053,6 +1053,18 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(List("b-1", "b-2")) } + "be able to delete all events" in { + val persistentActor = namedPersistentActor[Behavior1PersistentActor] + persistentActor ! Cmd("b") + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2")) + persistentActor ! Delete(Long.MaxValue) + persistentActor ! "boom" // restart, recover + expectMsgType[DeleteMessagesSuccess] + persistentActor ! GetState + expectMsg(Nil) + } + } }