+per #18751 Persistence: Return completed current events stream for cleaned journal
This commit is contained in:
parent
fb80ac8acb
commit
3cc1d0fe8e
4 changed files with 74 additions and 3 deletions
|
|
@ -181,7 +181,7 @@ private[akka] class CurrentEventsByPersistenceIdPublisher(
|
|||
deliverBuf()
|
||||
if (highestSeqNr < toSequenceNr)
|
||||
toSeqNr = highestSeqNr
|
||||
if (highestSeqNr == 0L || (buf.isEmpty && currSeqNo > toSequenceNr))
|
||||
if (buf.isEmpty && (currSeqNo > toSequenceNr || currSeqNo == fromSequenceNr))
|
||||
onCompleteThenStop()
|
||||
else
|
||||
self ! Continue // more to fetch
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ private[akka] class CurrentEventsByTagPublisher(
|
|||
deliverBuf()
|
||||
if (highestSeqNr < toOffset)
|
||||
_toOffset = highestSeqNr
|
||||
if (highestSeqNr == 0L || (buf.isEmpty && currOffset > toOffset))
|
||||
if (buf.isEmpty && (currOffset > toOffset || currOffset == fromOffset))
|
||||
onCompleteThenStop()
|
||||
else
|
||||
self ! Continue // more to fetch
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
|
|||
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
|
||||
|
||||
def setup(persistenceId: String): ActorRef = {
|
||||
val ref = system.actorOf(TestActor.props(persistenceId))
|
||||
val ref = setupEmpty(persistenceId)
|
||||
ref ! s"$persistenceId-1"
|
||||
ref ! s"$persistenceId-2"
|
||||
ref ! s"$persistenceId-3"
|
||||
|
|
@ -43,6 +43,10 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
|
|||
ref
|
||||
}
|
||||
|
||||
def setupEmpty(persistenceId: String): ActorRef = {
|
||||
system.actorOf(TestActor.props(persistenceId))
|
||||
}
|
||||
|
||||
"Leveldb query EventsByPersistenceId" must {
|
||||
|
||||
"implement standard EventsByTagQuery" in {
|
||||
|
|
@ -88,6 +92,65 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
|
|||
.expectNext("f-3")
|
||||
.expectComplete() // f-4 not seen
|
||||
}
|
||||
|
||||
"return empty stream for cleaned journal from 0 to MaxLong" in {
|
||||
val ref = setup("g1")
|
||||
|
||||
ref ! TestActor.DeleteCmd(3L)
|
||||
expectMsg(s"${3L}-deleted")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("g1", 0L, Long.MaxValue)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
|
||||
}
|
||||
|
||||
"return empty stream for cleaned journal from 0 to 0" in {
|
||||
val ref = setup("g2")
|
||||
|
||||
ref ! TestActor.DeleteCmd(3L)
|
||||
expectMsg(s"${3L}-deleted")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("g2", 0L, 0L)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
|
||||
}
|
||||
|
||||
"return remaining values after partial journal cleanup" in {
|
||||
val ref = setup("h")
|
||||
|
||||
ref ! TestActor.DeleteCmd(2L)
|
||||
expectMsg(s"${2L}-deleted")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3") expectComplete()
|
||||
}
|
||||
|
||||
"return empty stream for empty journal" in {
|
||||
val ref = setupEmpty("i")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("i", 0L, Long.MaxValue)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
|
||||
}
|
||||
|
||||
"return empty stream for journal from 0 to 0" in {
|
||||
val ref = setup("k1")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("k1", 0L, 0L)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
|
||||
}
|
||||
|
||||
"return empty stream for empty journal from 0 to 0" in {
|
||||
val ref = setupEmpty("k2")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("k2", 0L, 0L)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
|
||||
}
|
||||
|
||||
"return empty stream for journal from seqNo greater than highestSeqNo" in {
|
||||
val ref = setup("l")
|
||||
|
||||
val src = queries.currentEventsByPersistenceId("l", 4L, 3L)
|
||||
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"Leveldb live query EventsByPersistenceId" must {
|
||||
|
|
|
|||
|
|
@ -9,15 +9,23 @@ import akka.actor.Props
|
|||
object TestActor {
|
||||
def props(persistenceId: String): Props =
|
||||
Props(new TestActor(persistenceId))
|
||||
|
||||
case class DeleteCmd(toSeqNr: Long = Long.MaxValue)
|
||||
}
|
||||
|
||||
class TestActor(override val persistenceId: String) extends PersistentActor {
|
||||
|
||||
import TestActor.DeleteCmd
|
||||
|
||||
val receiveRecover: Receive = {
|
||||
case evt: String ⇒
|
||||
}
|
||||
|
||||
val receiveCommand: Receive = {
|
||||
case DeleteCmd(toSeqNr) ⇒
|
||||
deleteMessages(toSeqNr)
|
||||
sender() ! s"$toSeqNr-deleted"
|
||||
|
||||
case cmd: String ⇒
|
||||
persist(cmd) { evt ⇒
|
||||
sender() ! evt + "-done"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue