diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index b90031fa53..2af6572628 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -69,7 +69,6 @@ public class PersistenceQueryDocTest { private akka.japi.function.Function noMaterializedValue () { return param -> (M) null; } - } //#my-read-journal } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java similarity index 72% rename from akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala rename to akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java index c0edc644cd..c776b92c37 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournal.java @@ -2,45 +2,44 @@ * Copyright (C) 2009-2015 Typesafe Inc. */ -package akka.persistence.query.javadsl +package akka.persistence.query.javadsl; -import akka.persistence.query.{ Query, Hint } -import akka.stream.javadsl.Source - -import scala.annotation.varargs +import akka.persistence.query.Query; +import akka.persistence.query.Hint; +import akka.stream.javadsl.Source; +import scala.annotation.varargs; /** * Java API - * + *

* API for reading persistent events and information derived * from stored persistent events. - * + *

* The purpose of the API is not to enforce compatibility between different * journal implementations, because the technical capabilities may be very different. * The interface is very open so that different journals may implement specific queries. - * + *

* Usage: - * {{{ + *


  * final ReadJournal journal =
  *   PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
  *
  * final Source<EventEnvelope, ?> events =
  *   journal.query(new EventsByTag("mytag", 0L));
- * }}}
+ * 
*/ -trait ReadJournal { +public interface ReadJournal { /** * Java API - * + *

* A query that returns a `Source` with output type `T` and materialized value `M`. - * + *

* The `hints` are optional parameters that defines how to execute the * query, typically specific to the journal implementation. * */ - @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] - + Source query(Query q, Hint... hints); } diff --git a/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java new file mode 100644 index 0000000000..96866071fb --- /dev/null +++ b/akka-persistence-query/src/main/java/akka/persistence/query/javadsl/ReadJournalAdapter.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.javadsl; + +import akka.japi.Util; +import akka.persistence.query.Hint; +import akka.persistence.query.Query; +import akka.stream.javadsl.Source; + +/** + * INTERNAL API + * + * Adapter from ScalaDSL {@link akka.persistence.query.scaladsl.ReadJournal} + * to JavaDSL {@link ReadJournal}. + */ +public final class ReadJournalAdapter implements ReadJournal { + + private final akka.persistence.query.scaladsl.ReadJournal backing; + + public ReadJournalAdapter(akka.persistence.query.scaladsl.ReadJournal backing) { + this.backing = backing; + } + + @Override + public Source query(Query q, Hint... hints) { + return backing.query(q, Util.immutableSeq(hints)).asJava(); + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala index 0010e50d0d..57ec6fc9d8 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -7,9 +7,8 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.event.Logging -import akka.stream.javadsl.Source -import scala.annotation.{varargs, tailrec} +import scala.annotation.tailrec import scala.util.Failure /** @@ -63,12 +62,7 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. */ final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = - new javadsl.ReadJournal { - val backing = readJournalFor(readJournalPluginId) - @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = - backing.query(q, hints: _*).asJava - } - + new javadsl.ReadJournalAdapter(readJournalFor(readJournalPluginId)) private def createPlugin(configPath: String): scaladsl.ReadJournal = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), @@ -79,14 +73,24 @@ class PersistenceQuery(system: ExtendedActorSystem) extends Extension { log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get - val plugin = system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) - .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) - .recoverWith { - case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) - } + // TODO remove duplication + val scalaPlugin = + if (classOf[scaladsl.ReadJournal].isAssignableFrom(pluginClass)) + system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + else if (classOf[javadsl.ReadJournal].isAssignableFrom(pluginClass)) + system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[javadsl.ReadJournal](pluginClass, Nil)) + .map(jj ⇒ new scaladsl.ReadJournalAdapter(jj)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + else throw new IllegalArgumentException(s"Configured class ${pluginClass} does not extend") - // TODO possibly apply event adapters here - plugin.get + scalaPlugin.get } /** Check for default or missing identity. */ diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala index 02824269d3..936aadb7d9 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -36,3 +36,9 @@ abstract class ReadJournal { def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] } + +/** INTERNAL API */ +private[akka] final class ReadJournalAdapter(backing: akka.persistence.query.javadsl.ReadJournal) extends ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asScala +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java b/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java new file mode 100644 index 0000000000..1b0050cd81 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/MockJavaReadJournal.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import akka.persistence.query.javadsl.ReadJournal; +import akka.stream.javadsl.Source; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.util.Iterator; + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class MockJavaReadJournal implements ReadJournal { + public static final String Identifier = "akka.persistence.query.journal.mock-java"; + + public static final Config config = ConfigFactory.parseString( + Identifier + " { \n" + + " class = \"" + MockJavaReadJournal.class.getCanonicalName() + "\" \n" + + " }\n\n"); + + @Override + @SuppressWarnings("unchecked") + public Source query(Query q, Hint... hints) { + return (Source) Source.fromIterator(() -> new Iterator() { + private int i = 0; + @Override public boolean hasNext() { return true; } + + @Override public String next() { + return "" + (i++); + } + }); + } +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java index 9714ac8632..2ab31f8301 100644 --- a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -1,6 +1,6 @@ /* - * Copyright (C) 2009-2015 Typesafe Inc. - */ + * Copyright (C) 2009-2015 Typesafe Inc. + */ package akka.persistence.query; diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala index 44d442fe03..a7778c00bc 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -10,12 +10,11 @@ import akka.actor.ActorSystem import akka.persistence.journal.{ EventAdapter, EventSeq } import com.typesafe.config.ConfigFactory import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } -import org.scalautils.ConversionCheckedTripleEquals import scala.concurrent.Await import scala.concurrent.duration._ -class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ConversionCheckedTripleEquals { +class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll { val anything: Query[String, _] = null @@ -42,12 +41,24 @@ class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfte }.getMessage should include("missing persistence read journal") } } + + "expose scaladsl implemented journal as javadsl.ReadJournal" in { + withActorSystem() { system ⇒ + val j: javadsl.ReadJournal = PersistenceQuery.get(system).getReadJournalFor(MockReadJournal.Identifier) + } + } + "expose javadsl implemented journal as scaladsl.ReadJournal" in { + withActorSystem() { system ⇒ + val j: scaladsl.ReadJournal = PersistenceQuery.get(system).readJournalFor(MockJavaReadJournal.Identifier) + } + } } private val systemCounter = new AtomicInteger() private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { val config = MockReadJournal.config + .withFallback(MockJavaReadJournal.config) .withFallback(ConfigFactory.parseString(conf)) .withFallback(ConfigFactory.parseString(eventAdaptersConfig)) .withFallback(ConfigFactory.load())