diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 27045f6045..39d3942382 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -101,6 +101,17 @@ public class PersistenceQueryDocTest { TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } + /** + * You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all + * events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for + * the specific tag. Note that the corresponding offset of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given offset. + * + * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + * as the `offset` parameter in a subsequent query. + */ @Override public Source eventsByTag(String tag, Offset offset) { if(offset instanceof Sequence){ @@ -108,7 +119,8 @@ public class PersistenceQueryDocTest { final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); return Source.actorPublisher(props). mapMaterializedValue(m -> NotUsed.getInstance()); - } + } else if (offset == NoOffset.getInstance()) + return eventsByTag(tag, Offset.sequence(0L)); //recursive else throw new IllegalArgumentException("MyJavadslReadJournal does not support " + offset.getClass().getName() + " offsets"); } diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java index 571db7868b..320fd8eadd 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -84,7 +84,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { private void query() { if (buf.isEmpty()) { final String query = "SELECT id, persistent_repr " + - "FROM journal WHERE tag = ? AND id >= ? " + + "FROM journal WHERE tag = ? AND id > ? " + "ORDER BY id LIMIT ?"; try (PreparedStatement s = connection.prepareStatement(query)) { diff --git a/akka-docs/rst/java/persistence-query-leveldb.rst b/akka-docs/rst/java/persistence-query-leveldb.rst index ac5ad765e8..08d030932f 100644 --- a/akka-docs/rst/java/persistence-query-leveldb.rst +++ b/akka-docs/rst/java/persistence-query-leveldb.rst @@ -95,10 +95,14 @@ with the given ``tags``. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger -You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all -events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag. -Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible -to resume the stream at a later point from a given offset. +You can use ``NoOffset`` to retrieve all events with a given tag or retrieve a subset of all +events by specifying a ``Sequence`` ``offset``. The ``offset`` corresponds to an ordered sequence number for +the specific tag. Note that the corresponding offset of each event is provided in the +``EventEnvelope``, which makes it possible to resume the stream at a later point from a given offset. + +The ``offset`` is exclusive, i.e. the event with the exact same sequence number will not be included +in the returned stream. This means that you can use the offset that is returned in ``EventEnvelope`` +as the ``offset`` parameter in a subsequent query. In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr`` for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index df3aa2a172..886438f299 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -51,7 +51,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD private def statement() = connection.prepareStatement( """ SELECT id, persistent_repr FROM journal - WHERE tag = ? AND id >= ? + WHERE tag = ? AND id > ? ORDER BY id LIMIT ? """) diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index 5779489d08..0b0d7580e8 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -54,12 +54,24 @@ object PersistenceQueryDocSpec { private val refreshInterval: FiniteDuration = config.getDuration("refresh-interval", MILLISECONDS).millis + /** + * You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all + * events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for + * the specific tag. Note that the corresponding offset of each event is provided in the + * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the + * stream at a later point from a given offset. + * + * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + * as the `offset` parameter in a subsequent query. + */ override def eventsByTag( tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { case Sequence(offsetValue) ⇒ val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) Source.actorPublisher[EventEnvelope](props) .mapMaterializedValue(_ => NotUsed) + case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") } diff --git a/akka-docs/rst/scala/persistence-query-leveldb.rst b/akka-docs/rst/scala/persistence-query-leveldb.rst index 7b20824f1f..e7ff2f76c1 100644 --- a/akka-docs/rst/scala/persistence-query-leveldb.rst +++ b/akka-docs/rst/scala/persistence-query-leveldb.rst @@ -90,10 +90,14 @@ with the given ``tags``. .. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger -You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all -events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag. -Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible -to resume the stream at a later point from a given offset. +You can use ``NoOffset`` to retrieve all events with a given tag or retrieve a subset of all +events by specifying a ``Sequence`` ``offset``. The ``offset`` corresponds to an ordered sequence number for +the specific tag. Note that the corresponding offset of each event is provided in the +``EventEnvelope``, which makes it possible to resume the stream at a later point from a given offset. + +The ``offset`` is exclusive, i.e. the event with the exact same sequence number will not be included +in the returned stream. This means that you can use the offset that is returned in ``EventEnvelope`` +as the ``offset`` parameter in a subsequent query. In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr`` for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala index 6cef724a5d..b54e6314e0 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala @@ -17,10 +17,28 @@ object Offset { abstract class Offset +/** + * Corresponds to an ordered sequence number for the events. Note that the corresponding + * offset of each event is provided in the [[akka.persistence.query.EventEnvelope]], + * which makes it possible to resume the stream at a later point from a given offset. + * + * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + * as the `offset` parameter in a subsequent query. + */ final case class Sequence(value: Long) extends Offset with Ordered[Sequence] { override def compare(that: Sequence): Int = value.compare(that.value) } +/** + * Corresponds to an ordered unique identifier of the events. Note that the corresponding + * offset of each event is provided in the [[akka.persistence.query.EventEnvelope]], + * which makes it possible to resume the stream at a later point from a given offset. + * + * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + * as the `offset` parameter in a subsequent query. + */ final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] { if (value == null || value.version != 1) { throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID") @@ -29,6 +47,9 @@ final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBase override def compare(other: TimeBasedUUID): Int = value.compareTo(other.value) } +/** + * Used when retrieving all events. + */ final case object NoOffset extends Offset { /** * Java API: diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala index 9c09c9ebb5..b4d4e09d38 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala @@ -78,7 +78,7 @@ private[akka] abstract class AbstractEventsByTagPublisher( def receiveIdleRequest(): Unit def timeForReplay: Boolean = - (buf.isEmpty || buf.size <= maxBufSize / 2) && (currOffset <= toOffset) + (buf.isEmpty || buf.size <= maxBufSize / 2) && (currOffset < toOffset) def replay(): Unit = { val limit = maxBufSize - buf.size @@ -94,7 +94,7 @@ private[akka] abstract class AbstractEventsByTagPublisher( persistenceId = p.persistenceId, sequenceNr = p.sequenceNr, event = p.payload) - currOffset = offset + 1 + currOffset = offset deliverBuf() case RecoverySuccess(highestSeqNr) ⇒ @@ -143,13 +143,13 @@ private[akka] class LiveEventsByTagPublisher( override def receiveIdleRequest(): Unit = { deliverBuf() - if (buf.isEmpty && currOffset > toOffset) + if (buf.isEmpty && currOffset >= toOffset) onCompleteThenStop() } override def receiveRecoverySuccess(highestSeqNr: Long): Unit = { deliverBuf() - if (buf.isEmpty && currOffset > toOffset) + if (buf.isEmpty && currOffset >= toOffset) onCompleteThenStop() context.become(idle) } @@ -174,7 +174,7 @@ private[akka] class CurrentEventsByTagPublisher( override def receiveIdleRequest(): Unit = { deliverBuf() - if (buf.isEmpty && currOffset > toOffset) + if (buf.isEmpty && currOffset >= toOffset) onCompleteThenStop() else self ! Continue @@ -184,7 +184,7 @@ private[akka] class CurrentEventsByTagPublisher( deliverBuf() if (highestSeqNr < toOffset) _toOffset = highestSeqNr - if (buf.isEmpty && (currOffset > toOffset || currOffset == fromOffset)) + if (buf.isEmpty && (currOffset >= toOffset || currOffset == fromOffset)) onCompleteThenStop() else self ! Continue // more to fetch diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala index 1488d6bc07..fea4a1fb06 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -116,6 +116,10 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique * identifier for the event. * + * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + * as the `offset` parameter in a subsequent query. + * * The returned event stream is ordered by the offset (tag sequence number), which corresponds * to the same order as the write journal stored the events. The same stream elements (in same order) * are returned for multiple executions of the query. Deleted events are not deleted from the diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala index aa5eb698a0..c7d2bf0bf7 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -10,7 +10,7 @@ import akka.actor.ExtendedActorSystem import akka.event.Logging import akka.persistence.query.journal.leveldb.{ AllPersistenceIdsPublisher, EventsByPersistenceIdPublisher, EventsByTagPublisher } import akka.persistence.query.scaladsl.{ ReadJournal, _ } -import akka.persistence.query.{ EventEnvelope, Offset, Sequence } +import akka.persistence.query.{ EventEnvelope, NoOffset, Offset, Sequence } import akka.stream.scaladsl.Source import akka.util.ByteString import com.typesafe.config.Config @@ -128,12 +128,16 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events * in a [[akka.persistence.journal.Tagged]] with the given `tags`. * - * You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all - * events with a given tag. The `offset` corresponds to an ordered sequence number for + * You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all + * events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for * the specific tag. Note that the corresponding offset of each event is provided in the * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the * stream at a later point from a given offset. * + * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + * as the `offset` parameter in a subsequent query. + * * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr` * for each event. The `sequenceNr` is the sequence number for the persistent actor with the * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique @@ -163,7 +167,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re refreshInterval, maxBufSize, writeJournalPluginId)) .mapMaterializedValue(_ ⇒ NotUsed) .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - + case NoOffset ⇒ eventsByTag(tag, Sequence(0L)) //recursive case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") } @@ -179,7 +183,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, seq.value, Long.MaxValue, None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - + case NoOffset ⇒ currentEventsByTag(tag, Sequence(0L)) //recursive case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index 62e6471278..6b08d8d16f 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -13,6 +13,7 @@ import akka.stream.ActorMaterializer import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender +import akka.persistence.query.NoOffset object EventsByTagSpec { val config = """ @@ -72,7 +73,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) b ! "a green leaf" expectMsg(s"a green leaf-done") - val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset) greenSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) @@ -109,11 +110,11 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) .expectComplete() // green cucumber not seen } - "find events from offset" in { + "find events from offset (exclusive)" in { val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) - .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) + // note that banana is not included, since exclusive offset .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) .expectComplete() @@ -124,7 +125,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) "find new events" in { val d = system.actorOf(TestActor.props("d")) - val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L)) + val blackSrc = queries.eventsByTag(tag = "black", offset = NoOffset) val probe = blackSrc.runWith(TestSink.probe[Any]) .request(2) .expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car")) @@ -142,11 +143,11 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) .expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night")) } - "find events from offset" in { + "find events from offset (exclusive)" in { val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L)) val probe = greenSrc.runWith(TestSink.probe[Any]) .request(10) - .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) + // note that banana is not included, since exclusive offset .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) .expectNoMsg(100.millis) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 16c812e2c5..cca088d245 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -100,6 +100,10 @@ private[persistence] object LeveldbJournal { final case class SubscribeTag(tag: String) extends SubscriptionCommand final case class TaggedEventAppended(tag: String) extends DeadLetterSuppression + /** + * `fromSequenceNr` is exclusive + * `toSequenceNr` is inclusive + */ final case class ReplayTaggedMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, tag: String, replyTo: ActorRef) extends SubscriptionCommand final case class ReplayedTaggedMessage(persistent: PersistentRepr, tag: String, offset: Long) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala index 9902780019..2e40670fe2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala @@ -100,7 +100,8 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb } withIterator { iter ⇒ - val startKey = Key(tagNid, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) + // fromSequenceNr is exclusive, i.e. start with +1 + val startKey = Key(tagNid, if (fromSequenceNr < 1L) 1L else fromSequenceNr + 1, 0) iter.seek(keyToBytes(startKey)) go(iter, startKey, 0L, replayCallback) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index e8b1b13c9d..b5c2ed8732 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -96,10 +96,10 @@ object SnapshotFailureRobustnessSpec { } class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some( - """ - akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore" - akka.persistence.snapshot-store.local-delete-fail = ${akka.persistence.snapshot-store.local} - akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore" + s""" + akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$$FailingLocalSnapshotStore" + akka.persistence.snapshot-store.local-delete-fail = $${akka.persistence.snapshot-store.local} + akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$$DeleteFailingLocalSnapshotStore" """))) with ImplicitSender { import SnapshotFailureRobustnessSpec._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 40d7fd8079..d20bbffc73 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,7 +10,7 @@ import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ import akka.stream.stage._ -import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription} +import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable