!per #18050 Make event adapter lookup more efficient
This commit is contained in:
parent
a22b3be9da
commit
b3fedb4cf8
3 changed files with 15 additions and 14 deletions
|
|
@ -5,12 +5,11 @@
|
||||||
package akka.persistence.query
|
package akka.persistence.query
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.persistence.journal.{ EventAdapter, EventSeq }
|
import akka.persistence.journal.EventSeq
|
||||||
|
import akka.persistence.journal.ReadEventAdapter
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
|
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
|
@ -73,9 +72,6 @@ object ExampleQueryModels {
|
||||||
case class NewModel(value: String)
|
case class NewModel(value: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
class PrefixStringWithPAdapter extends EventAdapter {
|
class PrefixStringWithPAdapter extends ReadEventAdapter {
|
||||||
override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event)
|
override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event)
|
||||||
|
}
|
||||||
override def manifest(event: Any) = ""
|
|
||||||
override def toJournal(event: Any) = throw new Exception("toJournal should not be called by query side")
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -16,8 +16,10 @@ import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
/** INTERNAL API */
|
/**
|
||||||
private[akka] class EventAdapters(
|
* `EventAdapters` serves as a per-journal collection of bound event adapters.
|
||||||
|
*/
|
||||||
|
class EventAdapters(
|
||||||
map: ConcurrentHashMap[Class[_], EventAdapter],
|
map: ConcurrentHashMap[Class[_], EventAdapter],
|
||||||
bindings: immutable.Seq[(Class[_], EventAdapter)],
|
bindings: immutable.Seq[(Class[_], EventAdapter)],
|
||||||
log: LoggingAdapter) {
|
log: LoggingAdapter) {
|
||||||
|
|
@ -62,7 +64,10 @@ private[akka] object EventAdapters {
|
||||||
def apply(system: ExtendedActorSystem, config: Config): EventAdapters = {
|
def apply(system: ExtendedActorSystem, config: Config): EventAdapters = {
|
||||||
val adapters = configToMap(config, "event-adapters")
|
val adapters = configToMap(config, "event-adapters")
|
||||||
val adapterBindings = configToListMap(config, "event-adapter-bindings")
|
val adapterBindings = configToListMap(config, "event-adapter-bindings")
|
||||||
apply(system, adapters, adapterBindings)
|
if (adapters.isEmpty && adapterBindings.isEmpty)
|
||||||
|
IdentityEventAdapters
|
||||||
|
else
|
||||||
|
apply(system, adapters, adapterBindings)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def apply(
|
private def apply(
|
||||||
|
|
@ -165,4 +170,4 @@ private[akka] object EventAdapters {
|
||||||
private[akka] case object IdentityEventAdapters extends EventAdapters(null, null, null) {
|
private[akka] case object IdentityEventAdapters extends EventAdapters(null, null, null) {
|
||||||
override def get(clazz: Class[_]): EventAdapter = IdentityEventAdapter
|
override def get(clazz: Class[_]): EventAdapter = IdentityEventAdapter
|
||||||
override def toString = Logging.simpleName(IdentityEventAdapters)
|
override def toString = Logging.simpleName(IdentityEventAdapters)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,8 @@ import akka.persistence.AtomicWrite
|
||||||
private[akka] trait WriteJournalBase {
|
private[akka] trait WriteJournalBase {
|
||||||
this: Actor ⇒
|
this: Actor ⇒
|
||||||
|
|
||||||
lazy val persistence = Persistence(context.system)
|
val persistence = Persistence(context.system)
|
||||||
private def eventAdapters = persistence.adaptersFor(self)
|
private val eventAdapters = persistence.adaptersFor(self)
|
||||||
|
|
||||||
protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite] =
|
protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite] =
|
||||||
rb.collect { // collect instead of flatMap to avoid Some allocations
|
rb.collect { // collect instead of flatMap to avoid Some allocations
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue