/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */
|
|
|
|
|
|
|
|
|
|
package jdocs.stream.operators;
|
|
|
|
|
|
|
|
|
|
import akka.actor.ActorSystem;
|
|
|
|
|
|
|
|
|
|
// #import
|
|
|
|
|
import akka.NotUsed;
|
|
|
|
|
import akka.japi.Pair;
|
|
|
|
|
import akka.stream.ClosedShape;
|
|
|
|
|
import akka.stream.UniformFanInShape;
|
|
|
|
|
import akka.stream.UniformFanOutShape;
|
|
|
|
|
import akka.stream.javadsl.Flow;
|
|
|
|
|
import akka.stream.javadsl.GraphDSL;
|
|
|
|
|
import akka.stream.javadsl.MergeSequence;
|
|
|
|
|
import akka.stream.javadsl.Partition;
|
|
|
|
|
import akka.stream.javadsl.RunnableGraph;
|
|
|
|
|
import akka.stream.javadsl.Sink;
|
|
|
|
|
import akka.stream.javadsl.Source;
|
|
|
|
|
// #import
|
|
|
|
|
|
|
|
|
|
public class MergeSequenceDocExample {
|
|
|
|
|
|
|
|
|
|
private final ActorSystem system = ActorSystem.create("MergeSequenceDocExample");
|
|
|
|
|
|
|
|
|
|
interface Message {}
|
|
|
|
|
|
|
|
|
|
boolean shouldProcess(Message message) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Source<Message, NotUsed> createSubscription() {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Flow<Pair<Message, Long>, Pair<Message, Long>, NotUsed> createMessageProcessor() {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Sink<Message, NotUsed> createMessageAcknowledger() {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mergeSequenceExample() {
|
|
|
|
|
|
|
|
|
|
// #merge-sequence
|
|
|
|
|
|
|
|
|
|
Source<Message, NotUsed> subscription = createSubscription();
|
|
|
|
|
Flow<Pair<Message, Long>, Pair<Message, Long>, NotUsed> messageProcessor =
|
|
|
|
|
createMessageProcessor();
|
|
|
|
|
Sink<Message, NotUsed> messageAcknowledger = createMessageAcknowledger();
|
|
|
|
|
|
|
|
|
|
RunnableGraph.fromGraph(
|
|
|
|
|
GraphDSL.create(
|
|
|
|
|
builder -> {
|
|
|
|
|
// Partitions stream into messages that should or should not be processed
|
|
|
|
|
UniformFanOutShape<Pair<Message, Long>, Pair<Message, Long>> partition =
|
|
|
|
|
builder.add(
|
|
|
|
|
Partition.create(2, element -> shouldProcess(element.first()) ? 0 : 1));
|
|
|
|
|
// Merges stream by the index produced by zipWithIndex
|
|
|
|
|
UniformFanInShape<Pair<Message, Long>, Pair<Message, Long>> merge =
|
|
|
|
|
builder.add(MergeSequence.create(2, Pair::second));
|
|
|
|
|
|
|
|
|
|
builder.from(builder.add(subscription.zipWithIndex())).viaFanOut(partition);
|
|
|
|
|
// First goes through message processor
|
|
|
|
|
builder.from(partition.out(0)).via(builder.add(messageProcessor)).viaFanIn(merge);
|
|
|
|
|
// Second partition bypasses message processor
|
|
|
|
|
builder.from(partition.out(1)).viaFanIn(merge);
|
|
|
|
|
|
|
|
|
|
// Unwrap message index pairs and send to acknowledger
|
|
|
|
|
builder
|
|
|
|
|
.from(merge.out())
|
|
|
|
|
.to(
|
|
|
|
|
builder.add(
|
|
|
|
|
Flow.<Pair<Message, Long>>create()
|
|
|
|
|
.map(Pair::first)
|
|
|
|
|
.to(messageAcknowledger)));
|
|
|
|
|
|
|
|
|
|
return ClosedShape.getInstance();
|
|
|
|
|
}))
|
|
|
|
|
.run(system);
|
|
|
|
|
|
|
|
|
|
// #merge-sequence
|
|
|
|
|
}
|
|
|
|
|
}
|