diff --git a/.github/workflows/scala3-build.yml b/.github/workflows/scala3-build.yml index 7fe87c037c..e1ea061623 100644 --- a/.github/workflows/scala3-build.yml +++ b/.github/workflows/scala3-build.yml @@ -24,7 +24,7 @@ jobs: - akka-cluster/Test/compile - akka-coordination/test - akka-discovery/test - - akka-persistence/compile + - akka-persistence/test akka-persistence-shared/test akka-persistence-query/test - akka-pki/test - akka-serialization-jackson/test - akka-slf4j/test diff --git a/akka-persistence-query/src/main/mima-filters/2.6.16.backwards.excludes/persistence-query-leveldb.excludes b/akka-persistence-query/src/main/mima-filters/2.6.16.backwards.excludes/persistence-query-leveldb.excludes new file mode 100644 index 0000000000..22800ed1e6 --- /dev/null +++ b/akka-persistence-query/src/main/mima-filters/2.6.16.backwards.excludes/persistence-query-leveldb.excludes @@ -0,0 +1,6 @@ +# Provider users shouldn't rely on the more specific type anyway +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider.scaladslReadJournal") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider.javadslReadJournal") + +# Buffer is private[leveldb] +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.query.journal.leveldb.Buffer.doPush") diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala index 283575248a..b31f3f905e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala @@ -30,6 +30,8 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] { + override def doPush(out: Outlet[String], elem: String): Unit = super.push(out, elem) + setHandler(out, this) val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) var initialResponseReceived = false diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/Buffer.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/Buffer.scala index 8fc5bce634..2e93a0e520 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/Buffer.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/Buffer.scala @@ -15,7 +15,10 @@ import akka.util.ccompat.JavaConverters._ * INTERNAL API */ @InternalApi -private[leveldb] trait Buffer[T] { self: GraphStageLogic => +private[leveldb] abstract trait Buffer[T] { self: GraphStageLogic => + + def doPush(out: Outlet[T], elem: T): Unit + private val buf: java.util.LinkedList[T] = new util.LinkedList[T]() def buffer(element: T): Unit = { buf.add(element) @@ -25,7 +28,7 @@ private[leveldb] trait Buffer[T] { self: GraphStageLogic => } def deliverBuf(out: Outlet[T]): Unit = { if (!buf.isEmpty && isAvailable(out)) { - push(out, buf.remove()) + doPush(out, buf.remove()) } } def bufferSize: Int = buf.size diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index 26ad262778..8f9ed181f1 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -52,6 +52,8 @@ final private[akka] class EventsByPersistenceIdStage( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] { + override def doPush(out: Outlet[EventEnvelope], elem: EventEnvelope): Unit = super.push(out, elem) + val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) var stageActorRef: ActorRef = null var replayInProgress = false diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala index 8598019217..278431b098 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala @@ -53,6 +53,8 @@ final private[leveldb] class EventsByTagStage( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] { + override def doPush(out: Outlet[EventEnvelope], elem: EventEnvelope): Unit = super.push(out, elem) + val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) var currOffset: Long = fromOffset var toOffset: Long = initialTooOffset diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala index 0fe3844671..6c5f691095 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/LeveldbReadJournalProvider.scala @@ -5,17 +5,20 @@ package akka.persistence.query.journal.leveldb import com.typesafe.config.Config - import akka.actor.ExtendedActorSystem import akka.persistence.query.ReadJournalProvider @deprecated("Use another journal/query implementation", "2.6.15") class LeveldbReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider { - override val scaladslReadJournal: scaladsl.LeveldbReadJournal = - new scaladsl.LeveldbReadJournal(system, config) + val readJournal: scaladsl.LeveldbReadJournal = new scaladsl.LeveldbReadJournal(system, config) - override val javadslReadJournal: javadsl.LeveldbReadJournal = - new javadsl.LeveldbReadJournal(scaladslReadJournal) + override def scaladslReadJournal(): akka.persistence.query.scaladsl.ReadJournal = + readJournal + + val javaReadJournal = new javadsl.LeveldbReadJournal(readJournal) + + override def javadslReadJournal(): akka.persistence.query.javadsl.ReadJournal = + javaReadJournal } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala index 2592fe3e8e..0c7006d022 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/DummyReadJournal.scala @@ -56,11 +56,15 @@ class DummyReadJournalProvider(dummyValue: String) extends ReadJournalProvider { // mandatory zero-arg constructor def this() = this("dummy") - override val scaladslReadJournal: DummyReadJournal = - new DummyReadJournal(dummyValue) + val readJournal = new DummyReadJournal(dummyValue) - override val javadslReadJournal: DummyReadJournalForJava = - new DummyReadJournalForJava(scaladslReadJournal) + override def scaladslReadJournal(): DummyReadJournal = + readJournal + + val javaReadJournal = new DummyReadJournalForJava(readJournal) + + override def javadslReadJournal(): DummyReadJournalForJava = + javaReadJournal } class DummyReadJournalProvider2(@unused sys: ExtendedActorSystem) extends DummyReadJournalProvider diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/OffsetSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/OffsetSpec.scala index 506aa60448..eb2c8b7f54 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/OffsetSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/OffsetSpec.scala @@ -30,7 +30,7 @@ class OffsetSpec extends AnyWordSpecLike with Matchers { "Sequence offset" must { "be ordered correctly" in { - val sequenceBasedList = List(1L, 2L, 3L).map(Sequence) + val sequenceBasedList = List(1L, 2L, 3L).map(Sequence(_)) Random.shuffle(sequenceBasedList).sorted shouldEqual sequenceBasedList } } diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index e1245dfb02..c385d8e590 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -521,6 +521,13 @@ abstract class AbstractPersistentFSM[S <: FSMState, D, E] // workaround, possibly for https://github.com/scala/bug/issues/11512 override def receive: Receive = super.receive + + @throws(classOf[Exception]) + override def postStop(): Unit = { + // Make sure any ambiguity is resolved on the 'scala side' so this doesn't have to + // happen on the 'java side' + super.postStop() + } } /** diff --git a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala index d3faf13006..96dcd5ae79 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala @@ -182,7 +182,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before "use the same adapter when reading as was used when writing to the journal" in withActorSystem("SimpleSystem", adaptersConfig) { implicit system => val p = TestProbe() - implicit val ref = p.ref + implicit val ref: ActorRef = p.ref val p1 = persister("p1") val a = A("a1") @@ -207,7 +207,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before withActorSystem("NoAdapterSystem", adaptersConfig) { implicit system => val p = TestProbe() - implicit val ref = p.ref + implicit val ref: ActorRef = p.ref val p2 = persister(persistentName) val a = A("a1") @@ -229,7 +229,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before withActorSystem("NowAdaptersAddedSystem", newAdaptersConfig) { implicit system => val p = TestProbe() - implicit val ref = p.ref + implicit val ref: ActorRef = p.ref val p22 = persister(persistentName) p22 ! GetState diff --git a/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala index f76780f2cc..d025d92674 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala @@ -59,7 +59,7 @@ object EventAdapterSpec { } class LoggingAdapter(system: ExtendedActorSystem) extends EventAdapter { - final val log = Logging(system, getClass) + final val log = Logging(system, classOf[EventAdapterSpec]) override def toJournal(event: Any): Any = { log.info("On its way to the journal: []: " + event) event diff --git a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala index e38bd51a88..40577ac9f3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala @@ -48,13 +48,11 @@ object EventSourcedActorFailureSpec { } def isWrong(messages: immutable.Seq[AtomicWrite]): Boolean = - messages.exists { - case a: AtomicWrite => - a.payload.exists { - case PersistentRepr(Evt(s: String), _) => s.contains("wrong") - case _ => false - } - case _ => false + messages.exists { a => + a.payload.exists { + case PersistentRepr(Evt(s: String), _) => s.contains("wrong") + case _ => false + } } def checkSerializable(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index 54cb70c9d4..a9f1c70115 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -19,7 +19,7 @@ import akka.persistence._ import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ -@nowarn("msg=deprecated") +@nowarn("msg=deprecated|Unused import") abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { import PersistentFSMSpec._ @@ -290,7 +290,6 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) "can extract state name" in { StateChangeEvent("xxx", None) match { case StateChangeEvent(name, _) => name should equal("xxx") - case _ => fail("unable to extract state name") } } @@ -341,6 +340,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) expectMsgPF() { case SnapshotOffer(SnapshotMetadata(_, _, timestamp), PersistentFSMSnapshot(stateIdentifier, cart, None)) => stateIdentifier should ===(Paid.identifier) + import org.scalatest.matchers.should.Matchers.unconstrainedEquality cart should ===(NonEmptyShoppingCart(List(shirt, shoes, coat))) timestamp should be > 0L } diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala index faf50ac37c..129ed451ca 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala @@ -5,10 +5,12 @@ package akka.persistence.journal import com.typesafe.config.ConfigFactory - import akka.actor.ExtendedActorSystem import akka.testkit.AkkaSpec +import scala.annotation.nowarn + +@nowarn("msg=Unused import") class InmemEventAdaptersSpec extends AkkaSpec { val config = ConfigFactory.parseString(s""" @@ -65,6 +67,7 @@ class InmemEventAdaptersSpec extends AkkaSpec { adapters.get(classOf[PreciseAdapterEvent]).getClass should ===(classOf[PreciseAdapter]) // no adapter defined for Long, should return identity adapter + import org.scalatest.matchers.should.Matchers.unconstrainedEquality adapters.get(classOf[java.lang.Long]).getClass should ===(IdentityEventAdapter.getClass) }