diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index c6f9a37d7f..5db234aca9 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -269,6 +269,7 @@ the inputs in different ways. |Source/Flow|@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.| |Source/Flow|@ref[mergePreferred](Source-or-Flow/mergePreferred.md)|Merge multiple sources.| |Source/Flow|@ref[mergePrioritized](Source-or-Flow/mergePrioritized.md)|Merge multiple sources.| +|Source|@ref[mergePrioritizedN](Source/mergePrioritizedN.md)|Merge multiple sources with priorities.| |Source/Flow|@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.| |Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.| |Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| @@ -499,6 +500,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [mergeLatest](Source-or-Flow/mergeLatest.md) * [mergePreferred](Source-or-Flow/mergePreferred.md) * [mergePrioritized](Source-or-Flow/mergePrioritized.md) +* [mergePrioritizedN](Source/mergePrioritizedN.md) * [MergeSequence](MergeSequence.md) * [mergeSorted](Source-or-Flow/mergeSorted.md) * [monitor](Source-or-Flow/monitor.md) diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java index 20d7766b33..fcf51f8a35 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSplitter.java @@ -19,62 +19,62 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; public class RecipeSplitter { - private static ActorSystem system; + private static ActorSystem system; - @Test - public void simpleSplit() throws ExecutionException, InterruptedException { - // #Simple-Split - // Sample Source - Source source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4")); + @Test + public void simpleSplit() throws ExecutionException, InterruptedException { + // #Simple-Split + // Sample Source + Source source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4")); - CompletionStage> ret = - source - .map(s -> Arrays.asList(s.split("-"))) - .mapConcat(f -> f) - // Sub-streams logic - .map(s -> Integer.valueOf(s)) - .runWith(Sink.seq(), system); + CompletionStage> ret = + source + .map(s -> Arrays.asList(s.split("-"))) + .mapConcat(f -> f) + // Sub-streams logic + .map(s -> Integer.valueOf(s)) + .runWith(Sink.seq(), system); - // Verify results - List list = ret.toCompletableFuture().get(); - assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4)); - // #Simple-Split - } + // Verify results + List list = ret.toCompletableFuture().get(); + assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4)); + // #Simple-Split + } - @Test - public void splitAggregate() throws ExecutionException, InterruptedException { - // #Aggregate-Split - // Sample Source - Source source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4")); + @Test + public void splitAggregate() throws ExecutionException, InterruptedException { + // #Aggregate-Split + // Sample Source + Source source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4")); - CompletionStage> ret = - source - .map(s -> Arrays.asList(s.split("-"))) - // split all messages into sub-streams - .splitWhen(a -> true) - // now split each collection - .mapConcat(f -> f) - // Sub-streams logic - .map(s -> Integer.valueOf(s)) - // aggregate each sub-stream - .reduce((a, b) -> a + b) - // and merge back the result into the original stream - .mergeSubstreams() - .runWith(Sink.seq(), system); + CompletionStage> ret = + source + .map(s -> Arrays.asList(s.split("-"))) + // split all messages into sub-streams + .splitWhen(a -> true) + // now split each collection + .mapConcat(f -> f) + // Sub-streams logic + .map(s -> Integer.valueOf(s)) + // aggregate each sub-stream + .reduce((a, b) -> a + b) + // and merge back the result into the original stream + .mergeSubstreams() + .runWith(Sink.seq(), system); - // Verify results - List list = ret.toCompletableFuture().get(); - assert list.equals(Arrays.asList(6, 5, 7)); - // #Aggregate-Split - } + // Verify results + List list = ret.toCompletableFuture().get(); + assert list.equals(Arrays.asList(6, 5, 7)); + // #Aggregate-Split + } - @BeforeClass - public static void setup() throws Exception { - system = ActorSystem.create(); - } + @BeforeClass + public static void setup() throws Exception { + system = ActorSystem.create(); + } - @AfterClass - public static void teardown() throws Exception { - TestKit.shutdownActorSystem(system); - } + @AfterClass + public static void teardown() throws Exception { + TestKit.shutdownActorSystem(system); + } } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index d198d003fc..c7624530e8 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -208,10 +208,8 @@ class SourceOrFlow { Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); Source sourceC = Source.from(Arrays.asList(100, 200, 300, 400)); - List,Integer>> sourcesAndPriorities = Arrays.asList( - new Pair<>(sourceA, 9900), - new Pair<>(sourceB, 99), - new Pair<>(sourceC, 1)); + List, Integer>> sourcesAndPriorities = + Arrays.asList(new Pair<>(sourceA, 9900), new Pair<>(sourceB, 99), new Pair<>(sourceC, 1)); Source.mergePrioritizedN(sourcesAndPriorities, false).runForeach(System.out::println, system); // prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400 since both sources have their // first element ready and diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala index 1cd0520132..d448ecb862 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala @@ -255,7 +255,7 @@ import akka.stream.scaladsl.Sink case _ => throw new IllegalArgumentException("Cannot initialize from state when snapshots are not used.") } } - persistenceTestKit.persistForRecovery(persistenceId.id, events) + persistenceTestKit.persistForRecovery(persistenceId.id, collection.immutable.Seq.empty ++ events) restart() }