+str Add combine seq method to Source and Sink.

This commit is contained in:
He-Pin 2023-12-17 17:56:44 +08:00 committed by kerr
parent 0eed6a128b
commit a159aee6d8
8 changed files with 236 additions and 32 deletions

View file

@ -19,21 +19,19 @@ import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.stream.Collector
import scala.annotation.nowarn
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.util.Try
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.apache.pekko
import pekko._
import pekko.actor.ActorRef
import pekko.actor.ClassicActorSystemProvider
import pekko.actor.Status
import pekko.dispatch.ExecutionContexts
import pekko.japi.function
import pekko.japi.{ function, Util }
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.LinearTraversalBuilder
@ -43,6 +41,9 @@ import pekko.stream.scaladsl.SinkToCompletionStage
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
/** Java API */
object Sink {
@ -372,10 +373,44 @@ object Sink {
output1: Sink[U, _],
output2: Sink[U, _],
rest: java.util.List[Sink[U, _]],
strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = {
@nowarn
@deprecatedName(Symbol("strategy"))
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, NotUsed] = {
import pekko.util.ccompat.JavaConverters._
val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else immutable.Seq()
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => strategy.apply(num)))
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => fanOutStrategy.apply(num)))
}
/**
* Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink` with 2 outlets.
* @since 1.1.0
*/
def combineMat[T, U, M1, M2, M](
first: Sink[U, M1],
second: Sink[U, M2],
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]],
matF: function.Function2[M1, M2, M]): Sink[T, M] = {
new Sink(
scaladsl.Sink.combineMat(first.asScala, second.asScala)(size => fanOutStrategy(size))(combinerToScala(matF)))
}
/**
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
* The fanoutGraph's outlets size must match the provides sinks'.
* @since 1.1.0
*/
def combine[T, U, M](
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, java.util.List[M]] = {
val seq = if (sinks != null) Util.immutableSeq(sinks).collect {
case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
case other => other
}
else immutable.Seq()
import org.apache.pekko.util.ccompat.JavaConverters._
new Sink(scaladsl.Sink.combine(seq)(size => fanOutStrategy(size)).mapMaterializedValue(_.asJava))
}
/**

View file

@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import org.reactivestreams.{ Publisher, Subscriber }
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._
import org.reactivestreams.{ Publisher, Subscriber }
/** Java API */
object Source {
private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty)
@ -656,10 +656,12 @@ object Source {
first: Source[T, _ <: Any],
second: Source[T, _ <: Any],
rest: java.util.List[Source[T, _ <: Any]],
strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
@nowarn
@deprecatedName(Symbol("strategy"))
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => strategy.apply(num)))
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num)))
}
/**
@ -668,10 +670,30 @@ object Source {
def combineMat[T, U, M1, M2, M](
first: Source[T, M1],
second: Source[T, M2],
strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]],
@nowarn
@deprecatedName(Symbol("strategy"))
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]],
combine: function.Function2[M1, M2, M]): Source[U, M] = {
new Source(
scaladsl.Source.combineMat(first.asScala, second.asScala)(num => strategy.apply(num))(combinerToScala(combine)))
scaladsl.Source.combineMat(first.asScala, second.asScala)(num => fanInStrategy.apply(num))(
combinerToScala(combine)))
}
/**
* Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]].
* @since 1.1.0
*/
def combine[T, U, M](
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, java.util.List[M]] = {
val seq = if (sources != null) Util.immutableSeq(sources).collect {
case source: Source[T @unchecked, M @unchecked] => source.asScala
case other => other
}
else immutable.Seq()
import org.apache.pekko.util.ccompat.JavaConverters._
new Source(scaladsl.Source.combine(seq)(size => fanInStrategy(size)).mapMaterializedValue(_.asJava))
}
/**

View file

@ -13,7 +13,7 @@
package org.apache.pekko.stream.scaladsl
import scala.annotation.tailrec
import scala.annotation.{ nowarn, tailrec }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.ExecutionContext
@ -22,9 +22,6 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@ -40,6 +37,9 @@ import pekko.stream.javadsl
import pekko.stream.stage._
import pekko.util.ccompat._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
/**
* A `Sink` is a set of stream processing steps that has one open input.
* Can be used as a `Subscriber`
@ -339,10 +339,12 @@ object Sink {
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
*/
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(
strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
@nowarn
@deprecatedName(Symbol("strategy"))
fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
Sink.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val d = b.add(strategy(rest.size + 2))
val d = b.add(fanOutStrategy(rest.size + 2))
d.out(0) ~> first
d.out(1) ~> second
@ -355,6 +357,41 @@ object Sink {
combineRest(2, rest.iterator)
})
/**
* Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink` with 2 outlets.
* @since 1.1.0
*/
def combineMat[T, U, M1, M2, M](first: Sink[U, M1], second: Sink[U, M2])(
fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed])(matF: (M1, M2) => M): Sink[T, M] = {
Sink.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b => (shape1, shape2) =>
import GraphDSL.Implicits._
val d = b.add(fanOutStrategy(2))
d.out(0) ~> shape1
d.out(1) ~> shape2
new SinkShape[T](d.in)
})
}
/**
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
* The fanoutGraph's outlets size must match the provides sinks'.
* @since 1.1.0
*/
def combine[T, U, M](sinks: immutable.Seq[Graph[SinkShape[U], M]])(
fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, immutable.Seq[M]] =
sinks match {
case immutable.Seq() => Sink.cancelled.mapMaterializedValue(_ => Nil)
case immutable.Seq(sink) => sink.asInstanceOf[Sink[T, M]].mapMaterializedValue(_ :: Nil)
case _ =>
Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
import GraphDSL.Implicits._
val c = b.add(fanOutStrategy(sinks.size))
for ((shape, idx) <- shapes.zipWithIndex)
c.out(idx) ~> shape
SinkShape(c.in)
})
}
/**
* A `Sink` that will invoke the given function to each of the elements
* as they pass in. The sink is materialized into a [[scala.concurrent.Future]]

View file

@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.CompletionStage
import scala.annotation.tailrec
import scala.annotation.{ nowarn, tailrec }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
@ -756,10 +756,12 @@ object Source {
* Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]].
*/
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
@nowarn
@deprecatedName(Symbol("strategy"))
fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val c = b.add(strategy(rest.size + 2))
val c = b.add(fanInStrategy(rest.size + 2))
first ~> c.in(0)
second ~> c.in(1)
@ -772,19 +774,40 @@ object Source {
combineRest(2, rest.iterator)
})
/**
* Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]].
* @since 1.1.0
*/
def combine[T, U, M](sources: immutable.Seq[Graph[SourceShape[T], M]])(
fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, immutable.Seq[M]] =
sources match {
case immutable.Seq() => Source.empty.mapMaterializedValue(_ => Nil)
case immutable.Seq(source) => source.asInstanceOf[Source[U, M]].mapMaterializedValue(_ :: Nil)
case _ =>
Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
import GraphDSL.Implicits._
val c = b.add(fanInStrategy(sources.size))
for ((shape, i) <- shapes.zipWithIndex) {
shape ~> c.in(i)
}
SourceShape(c.out)
})
}
/**
* Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]] with a materialized value.
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = {
val secondPartiallyCombined = GraphDSL.createGraph(second) { implicit b => secondShape =>
@nowarn
@deprecatedName(Symbol("strategy"))
fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] =
Source.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b => (shape1, shape2) =>
import GraphDSL.Implicits._
val c = b.add(strategy(2))
secondShape ~> c.in(1)
FlowShape(c.in(0), c.out)
}
first.viaMat(secondPartiallyCombined)(matF)
}
val c = b.add(fanInStrategy(2))
shape1 ~> c.in(0)
shape2 ~> c.in(1)
SourceShape(c.out)
})
/**
* Combine the elements of multiple streams into a stream of sequences.