+per #16541 allow using javadsl implemented journals as-if scaladsl

This commit is contained in:
Patrik Nordwall & Konrad Malawski 2015-06-08 12:26:19 +02:00 committed by Konrad Malawski
parent 893578a8af
commit 3b94108e0c
8 changed files with 123 additions and 35 deletions

View file

@ -69,7 +69,6 @@ public class PersistenceQueryDocTest {
private <I, M> akka.japi.function.Function<I, M> noMaterializedValue () {
return param -> (M) null;
}
}
//#my-read-journal
}

View file

@ -2,45 +2,44 @@
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.javadsl
package akka.persistence.query.javadsl;
import akka.persistence.query.{ Query, Hint }
import akka.stream.javadsl.Source
import scala.annotation.varargs
import akka.persistence.query.Query;
import akka.persistence.query.Hint;
import akka.stream.javadsl.Source;
import scala.annotation.varargs;
/**
* Java API
*
* <p>
* API for reading persistent events and information derived
* from stored persistent events.
*
* <p>
* The purpose of the API is not to enforce compatibility between different
* journal implementations, because the technical capabilities may be very different.
* The interface is very open so that different journals may implement specific queries.
*
* <p>
* Usage:
* {{{
* <pre><code>
* final ReadJournal journal =
* PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
*
* final Source&lt;EventEnvelope, ?&gt; events =
* journal.query(new EventsByTag("mytag", 0L));
* }}}
* </pre></code>
*/
trait ReadJournal {
public interface ReadJournal {
/**
* Java API
*
* <p>
* A query that returns a `Source` with output type `T` and materialized value `M`.
*
* <p>
* The `hints` are optional parameters that defines how to execute the
* query, typically specific to the journal implementation.
*
*/
@varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M]
<T, M> Source<T, M> query(Query<T, M> q, Hint... hints);
}

View file

@ -0,0 +1,31 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query.javadsl;
import akka.japi.Util;
import akka.persistence.query.Hint;
import akka.persistence.query.Query;
import akka.stream.javadsl.Source;
/**
* INTERNAL API
*
* Adapter from ScalaDSL {@link akka.persistence.query.scaladsl.ReadJournal}
* to JavaDSL {@link ReadJournal}.
*/
public final class ReadJournalAdapter implements ReadJournal {
private final akka.persistence.query.scaladsl.ReadJournal backing;
public ReadJournalAdapter(akka.persistence.query.scaladsl.ReadJournal backing) {
this.backing = backing;
}
@Override
public <T, M> Source<T, M> query(Query<T, M> q, Hint... hints) {
return backing.query(q, Util.immutableSeq(hints)).asJava();
}
}

View file

@ -7,9 +7,8 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import akka.event.Logging
import akka.stream.javadsl.Source
import scala.annotation.{varargs, tailrec}
import scala.annotation.tailrec
import scala.util.Failure
/**
@ -63,12 +62,7 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
* Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry.
*/
final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal =
new javadsl.ReadJournal {
val backing = readJournalFor(readJournalPluginId)
@varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] =
backing.query(q, hints: _*).asJava
}
new javadsl.ReadJournalAdapter(readJournalFor(readJournalPluginId))
private def createPlugin(configPath: String): scaladsl.ReadJournal = {
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
@ -79,14 +73,24 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val plugin = system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
// TODO remove duplication
val scalaPlugin =
if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil))
.recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}
else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass))
system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil)
.orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil))
.map(jj new scaladsl.ReadJournalAdapter(jj))
.recoverWith {
case ex: Exception Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex))
}
else throw new IllegalArgumentException(s"Configured class ${pluginClass} does not extend")
// TODO possibly apply event adapters here
plugin.get
scalaPlugin.get
}
/** Check for default or missing identity. */

View file

@ -36,3 +36,9 @@ abstract class ReadJournal {
def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M]
}
/** INTERNAL API */
private[akka] final class ReadJournalAdapter(backing: akka.persistence.query.javadsl.ReadJournal) extends ReadJournal {
override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] =
backing.query(q, hints: _*).asScala
}

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.query;
import akka.persistence.query.javadsl.ReadJournal;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Iterator;
/**
* Use for tests only!
* Emits infinite stream of strings (representing queried for events).
*/
class MockJavaReadJournal implements ReadJournal {
public static final String Identifier = "akka.persistence.query.journal.mock-java";
public static final Config config = ConfigFactory.parseString(
Identifier + " { \n" +
" class = \"" + MockJavaReadJournal.class.getCanonicalName() + "\" \n" +
" }\n\n");
@Override
@SuppressWarnings("unchecked")
public <T, M> Source<T, M> query(Query<T, M> q, Hint... hints) {
return (Source<T, M>) Source.fromIterator(() -> new Iterator<String>() {
private int i = 0;
@Override public boolean hasNext() { return true; }
@Override public String next() {
return "" + (i++);
}
});
}
}

View file

@ -10,12 +10,11 @@ import akka.actor.ActorSystem
import akka.persistence.journal.{ EventAdapter, EventSeq }
import com.typesafe.config.ConfigFactory
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import org.scalautils.ConversionCheckedTripleEquals
import scala.concurrent.Await
import scala.concurrent.duration._
class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ConversionCheckedTripleEquals {
class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
val anything: Query[String, _] = null
@ -42,12 +41,24 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte
}.getMessage should include("missing persistence read journal")
}
}
"expose scaladsl implemented journal as javadsl.ReadJournal" in {
withActorSystem() { system
val j: javadsl.ReadJournal = PersistenceQuery.get(system).getReadJournalFor(MockReadJournal.Identifier)
}
}
"expose javadsl implemented journal as scaladsl.ReadJournal" in {
withActorSystem() { system
val j: scaladsl.ReadJournal = PersistenceQuery.get(system).readJournalFor(MockJavaReadJournal.Identifier)
}
}
}
private val systemCounter = new AtomicInteger()
private def withActorSystem(conf: String = "")(block: ActorSystem Unit): Unit = {
val config =
MockReadJournal.config
.withFallback(MockJavaReadJournal.config)
.withFallback(ConfigFactory.parseString(conf))
.withFallback(ConfigFactory.parseString(eventAdaptersConfig))
.withFallback(ConfigFactory.load())