Build akka-persistence-query on scala3 (#30663)
* Build akka-persistence-query on scala3 * Build and test on scala3 on GHA * scalafmt * Also akka-persistence-shared * Add mima exclusions * Add type to implicit, not to probe
This commit is contained in:
parent
1e389da9b8
commit
6c2ed1b17d
15 changed files with 57 additions and 27 deletions
2
.github/workflows/scala3-build.yml
vendored
2
.github/workflows/scala3-build.yml
vendored
|
|
@ -24,7 +24,7 @@ jobs:
|
||||||
- akka-cluster/Test/compile
|
- akka-cluster/Test/compile
|
||||||
- akka-coordination/test
|
- akka-coordination/test
|
||||||
- akka-discovery/test
|
- akka-discovery/test
|
||||||
- akka-persistence/compile
|
- akka-persistence/test akka-persistence-shared/test akka-persistence-query/test
|
||||||
- akka-pki/test
|
- akka-pki/test
|
||||||
- akka-serialization-jackson/test
|
- akka-serialization-jackson/test
|
||||||
- akka-slf4j/test
|
- akka-slf4j/test
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
# Provider users shouldn't rely on the more specific type anyway
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider.scaladslReadJournal")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider.javadslReadJournal")
|
||||||
|
|
||||||
|
# Buffer is private[leveldb]
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.query.journal.leveldb.Buffer.doPush")
|
||||||
|
|
@ -30,6 +30,8 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
|
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
|
||||||
|
override def doPush(out: Outlet[String], elem: String): Unit = super.push(out, elem)
|
||||||
|
|
||||||
setHandler(out, this)
|
setHandler(out, this)
|
||||||
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
||||||
var initialResponseReceived = false
|
var initialResponseReceived = false
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,10 @@ import akka.util.ccompat.JavaConverters._
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
private[leveldb] trait Buffer[T] { self: GraphStageLogic =>
|
private[leveldb] abstract trait Buffer[T] { self: GraphStageLogic =>
|
||||||
|
|
||||||
|
def doPush(out: Outlet[T], elem: T): Unit
|
||||||
|
|
||||||
private val buf: java.util.LinkedList[T] = new util.LinkedList[T]()
|
private val buf: java.util.LinkedList[T] = new util.LinkedList[T]()
|
||||||
def buffer(element: T): Unit = {
|
def buffer(element: T): Unit = {
|
||||||
buf.add(element)
|
buf.add(element)
|
||||||
|
|
@ -25,7 +28,7 @@ private[leveldb] trait Buffer[T] { self: GraphStageLogic =>
|
||||||
}
|
}
|
||||||
def deliverBuf(out: Outlet[T]): Unit = {
|
def deliverBuf(out: Outlet[T]): Unit = {
|
||||||
if (!buf.isEmpty && isAvailable(out)) {
|
if (!buf.isEmpty && isAvailable(out)) {
|
||||||
push(out, buf.remove())
|
doPush(out, buf.remove())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def bufferSize: Int = buf.size
|
def bufferSize: Int = buf.size
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,8 @@ final private[akka] class EventsByPersistenceIdStage(
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
|
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
|
||||||
|
override def doPush(out: Outlet[EventEnvelope], elem: EventEnvelope): Unit = super.push(out, elem)
|
||||||
|
|
||||||
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
||||||
var stageActorRef: ActorRef = null
|
var stageActorRef: ActorRef = null
|
||||||
var replayInProgress = false
|
var replayInProgress = false
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,8 @@ final private[leveldb] class EventsByTagStage(
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
|
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
|
||||||
|
override def doPush(out: Outlet[EventEnvelope], elem: EventEnvelope): Unit = super.push(out, elem)
|
||||||
|
|
||||||
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
||||||
var currOffset: Long = fromOffset
|
var currOffset: Long = fromOffset
|
||||||
var toOffset: Long = initialTooOffset
|
var toOffset: Long = initialTooOffset
|
||||||
|
|
|
||||||
|
|
@ -5,17 +5,20 @@
|
||||||
package akka.persistence.query.journal.leveldb
|
package akka.persistence.query.journal.leveldb
|
||||||
|
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.persistence.query.ReadJournalProvider
|
import akka.persistence.query.ReadJournalProvider
|
||||||
|
|
||||||
@deprecated("Use another journal/query implementation", "2.6.15")
|
@deprecated("Use another journal/query implementation", "2.6.15")
|
||||||
class LeveldbReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
|
class LeveldbReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
|
||||||
|
|
||||||
override val scaladslReadJournal: scaladsl.LeveldbReadJournal =
|
val readJournal: scaladsl.LeveldbReadJournal = new scaladsl.LeveldbReadJournal(system, config)
|
||||||
new scaladsl.LeveldbReadJournal(system, config)
|
|
||||||
|
|
||||||
override val javadslReadJournal: javadsl.LeveldbReadJournal =
|
override def scaladslReadJournal(): akka.persistence.query.scaladsl.ReadJournal =
|
||||||
new javadsl.LeveldbReadJournal(scaladslReadJournal)
|
readJournal
|
||||||
|
|
||||||
|
val javaReadJournal = new javadsl.LeveldbReadJournal(readJournal)
|
||||||
|
|
||||||
|
override def javadslReadJournal(): akka.persistence.query.javadsl.ReadJournal =
|
||||||
|
javaReadJournal
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -56,11 +56,15 @@ class DummyReadJournalProvider(dummyValue: String) extends ReadJournalProvider {
|
||||||
// mandatory zero-arg constructor
|
// mandatory zero-arg constructor
|
||||||
def this() = this("dummy")
|
def this() = this("dummy")
|
||||||
|
|
||||||
override val scaladslReadJournal: DummyReadJournal =
|
val readJournal = new DummyReadJournal(dummyValue)
|
||||||
new DummyReadJournal(dummyValue)
|
|
||||||
|
|
||||||
override val javadslReadJournal: DummyReadJournalForJava =
|
override def scaladslReadJournal(): DummyReadJournal =
|
||||||
new DummyReadJournalForJava(scaladslReadJournal)
|
readJournal
|
||||||
|
|
||||||
|
val javaReadJournal = new DummyReadJournalForJava(readJournal)
|
||||||
|
|
||||||
|
override def javadslReadJournal(): DummyReadJournalForJava =
|
||||||
|
javaReadJournal
|
||||||
}
|
}
|
||||||
|
|
||||||
class DummyReadJournalProvider2(@unused sys: ExtendedActorSystem) extends DummyReadJournalProvider
|
class DummyReadJournalProvider2(@unused sys: ExtendedActorSystem) extends DummyReadJournalProvider
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class OffsetSpec extends AnyWordSpecLike with Matchers {
|
||||||
"Sequence offset" must {
|
"Sequence offset" must {
|
||||||
|
|
||||||
"be ordered correctly" in {
|
"be ordered correctly" in {
|
||||||
val sequenceBasedList = List(1L, 2L, 3L).map(Sequence)
|
val sequenceBasedList = List(1L, 2L, 3L).map(Sequence(_))
|
||||||
Random.shuffle(sequenceBasedList).sorted shouldEqual sequenceBasedList
|
Random.shuffle(sequenceBasedList).sorted shouldEqual sequenceBasedList
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -521,6 +521,13 @@ abstract class AbstractPersistentFSM[S <: FSMState, D, E]
|
||||||
|
|
||||||
// workaround, possibly for https://github.com/scala/bug/issues/11512
|
// workaround, possibly for https://github.com/scala/bug/issues/11512
|
||||||
override def receive: Receive = super.receive
|
override def receive: Receive = super.receive
|
||||||
|
|
||||||
|
@throws(classOf[Exception])
|
||||||
|
override def postStop(): Unit = {
|
||||||
|
// Make sure any ambiguity is resolved on the 'scala side' so this doesn't have to
|
||||||
|
// happen on the 'java side'
|
||||||
|
super.postStop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before
|
||||||
"use the same adapter when reading as was used when writing to the journal" in
|
"use the same adapter when reading as was used when writing to the journal" in
|
||||||
withActorSystem("SimpleSystem", adaptersConfig) { implicit system =>
|
withActorSystem("SimpleSystem", adaptersConfig) { implicit system =>
|
||||||
val p = TestProbe()
|
val p = TestProbe()
|
||||||
implicit val ref = p.ref
|
implicit val ref: ActorRef = p.ref
|
||||||
|
|
||||||
val p1 = persister("p1")
|
val p1 = persister("p1")
|
||||||
val a = A("a1")
|
val a = A("a1")
|
||||||
|
|
@ -207,7 +207,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before
|
||||||
|
|
||||||
withActorSystem("NoAdapterSystem", adaptersConfig) { implicit system =>
|
withActorSystem("NoAdapterSystem", adaptersConfig) { implicit system =>
|
||||||
val p = TestProbe()
|
val p = TestProbe()
|
||||||
implicit val ref = p.ref
|
implicit val ref: ActorRef = p.ref
|
||||||
|
|
||||||
val p2 = persister(persistentName)
|
val p2 = persister(persistentName)
|
||||||
val a = A("a1")
|
val a = A("a1")
|
||||||
|
|
@ -229,7 +229,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before
|
||||||
|
|
||||||
withActorSystem("NowAdaptersAddedSystem", newAdaptersConfig) { implicit system =>
|
withActorSystem("NowAdaptersAddedSystem", newAdaptersConfig) { implicit system =>
|
||||||
val p = TestProbe()
|
val p = TestProbe()
|
||||||
implicit val ref = p.ref
|
implicit val ref: ActorRef = p.ref
|
||||||
|
|
||||||
val p22 = persister(persistentName)
|
val p22 = persister(persistentName)
|
||||||
p22 ! GetState
|
p22 ! GetState
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ object EventAdapterSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
class LoggingAdapter(system: ExtendedActorSystem) extends EventAdapter {
|
class LoggingAdapter(system: ExtendedActorSystem) extends EventAdapter {
|
||||||
final val log = Logging(system, getClass)
|
final val log = Logging(system, classOf[EventAdapterSpec])
|
||||||
override def toJournal(event: Any): Any = {
|
override def toJournal(event: Any): Any = {
|
||||||
log.info("On its way to the journal: []: " + event)
|
log.info("On its way to the journal: []: " + event)
|
||||||
event
|
event
|
||||||
|
|
|
||||||
|
|
@ -48,13 +48,11 @@ object EventSourcedActorFailureSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
def isWrong(messages: immutable.Seq[AtomicWrite]): Boolean =
|
def isWrong(messages: immutable.Seq[AtomicWrite]): Boolean =
|
||||||
messages.exists {
|
messages.exists { a =>
|
||||||
case a: AtomicWrite =>
|
a.payload.exists {
|
||||||
a.payload.exists {
|
case PersistentRepr(Evt(s: String), _) => s.contains("wrong")
|
||||||
case PersistentRepr(Evt(s: String), _) => s.contains("wrong")
|
case _ => false
|
||||||
case _ => false
|
}
|
||||||
}
|
|
||||||
case _ => false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def checkSerializable(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] =
|
def checkSerializable(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] =
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import akka.persistence._
|
||||||
import akka.persistence.fsm.PersistentFSM._
|
import akka.persistence.fsm.PersistentFSM._
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
|
|
||||||
@nowarn("msg=deprecated")
|
@nowarn("msg=deprecated|Unused import")
|
||||||
abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
|
abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
|
||||||
import PersistentFSMSpec._
|
import PersistentFSMSpec._
|
||||||
|
|
||||||
|
|
@ -290,7 +290,6 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
|
||||||
"can extract state name" in {
|
"can extract state name" in {
|
||||||
StateChangeEvent("xxx", None) match {
|
StateChangeEvent("xxx", None) match {
|
||||||
case StateChangeEvent(name, _) => name should equal("xxx")
|
case StateChangeEvent(name, _) => name should equal("xxx")
|
||||||
case _ => fail("unable to extract state name")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -341,6 +340,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
|
||||||
expectMsgPF() {
|
expectMsgPF() {
|
||||||
case SnapshotOffer(SnapshotMetadata(_, _, timestamp), PersistentFSMSnapshot(stateIdentifier, cart, None)) =>
|
case SnapshotOffer(SnapshotMetadata(_, _, timestamp), PersistentFSMSnapshot(stateIdentifier, cart, None)) =>
|
||||||
stateIdentifier should ===(Paid.identifier)
|
stateIdentifier should ===(Paid.identifier)
|
||||||
|
import org.scalatest.matchers.should.Matchers.unconstrainedEquality
|
||||||
cart should ===(NonEmptyShoppingCart(List(shirt, shoes, coat)))
|
cart should ===(NonEmptyShoppingCart(List(shirt, shoes, coat)))
|
||||||
timestamp should be > 0L
|
timestamp should be > 0L
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,12 @@
|
||||||
package akka.persistence.journal
|
package akka.persistence.journal
|
||||||
|
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
|
import scala.annotation.nowarn
|
||||||
|
|
||||||
|
@nowarn("msg=Unused import")
|
||||||
class InmemEventAdaptersSpec extends AkkaSpec {
|
class InmemEventAdaptersSpec extends AkkaSpec {
|
||||||
|
|
||||||
val config = ConfigFactory.parseString(s"""
|
val config = ConfigFactory.parseString(s"""
|
||||||
|
|
@ -65,6 +67,7 @@ class InmemEventAdaptersSpec extends AkkaSpec {
|
||||||
adapters.get(classOf[PreciseAdapterEvent]).getClass should ===(classOf[PreciseAdapter])
|
adapters.get(classOf[PreciseAdapterEvent]).getClass should ===(classOf[PreciseAdapter])
|
||||||
|
|
||||||
// no adapter defined for Long, should return identity adapter
|
// no adapter defined for Long, should return identity adapter
|
||||||
|
import org.scalatest.matchers.should.Matchers.unconstrainedEquality
|
||||||
adapters.get(classOf[java.lang.Long]).getClass should ===(IdentityEventAdapter.getClass)
|
adapters.get(classOf[java.lang.Long]).getClass should ===(IdentityEventAdapter.getClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue