+ stream add collectType operator to Source,SubSource,Flow and SubFlow for javadsl.

add docs for collectType, do mirror it in scaladsl

mima
This commit is contained in:
虎鸣 2018-01-06 04:33:35 +08:00 committed by Konrad `ktoso` Malawski
parent 39c97c3306
commit e44fafd4b7
12 changed files with 204 additions and 15 deletions

View file

@ -773,6 +773,23 @@ value is passed downstream. Can often replace `filter` followed by `map` to achi
---------------------------------------------------------------
### collectType
Transform this stream by testing the type of each of the elements on which the element is an instance of
the provided type as they pass through this processing step. Non-matching elements are filtered out.
Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
'''Emits when''' the element is an instance of the provided type
'''Backpressures when''' the element is an instance of the provided type and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
---------------------------------------------------------------
### grouped
Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of

View file

@ -10,6 +10,7 @@ import akka.japi.JavaPartialFunction;
import akka.japi.Pair;
import akka.japi.function.*;
import akka.stream.*;
import akka.stream.scaladsl.FlowSpec;
import akka.util.ConstantFun;
import akka.stream.javadsl.GraphDSL.Builder;
import akka.stream.stage.*;
@ -682,6 +683,16 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("C");
}
@Test
public void mustBeAbleToUseCollectType() throws Exception {
final TestKit probe = new TestKit(system);
final Iterable<FlowSpec.Fruit> input = Arrays.asList(new FlowSpec.Apple(), new FlowSpec.Orange());
Source.from(input).via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class))
.runForeach((apple) -> probe.getRef().tell(apple, ActorRef.noSender()), materializer);
probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
}
@Test
public void mustBeAbleToRecover() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);

View file

@ -11,8 +11,7 @@ import akka.japi.Pair;
import akka.japi.function.*;
import akka.japi.pf.PFBuilder;
import akka.stream.*;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.scaladsl.FlowSpec;
import akka.util.ConstantFun;
import akka.stream.stage.*;
import akka.testkit.AkkaSpec;
@ -446,6 +445,19 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals("C");
}
@Test
public void mustBeAbleToUseCollectType() throws Exception{
final TestKit probe = new TestKit(system);
final Iterable<FlowSpec.Apple> input = Collections.singletonList(new FlowSpec.Apple());
final Source<FlowSpec.Apple,?> appleSource = Source.from(input);
final Source<FlowSpec.Fruit,?> fruitSource = appleSource.collectType(FlowSpec.Fruit.class);
fruitSource.collectType(FlowSpec.Apple.class).collectType(FlowSpec.Apple.class)
.runForeach((elem) -> {
probe.getRef().tell(elem,ActorRef.noSender());
},materializer);
probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
}
@Test
public void mustWorkFromFuture() throws Exception {
final Iterable<String> input = Arrays.asList("A", "B", "C");

View file

@ -33,7 +33,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
val jRunnableGraphClass: Class[_] = classOf[akka.stream.javadsl.RunnableGraph[_]]
val sRunnableGraphClass: Class[_] = classOf[akka.stream.scaladsl.RunnableGraph[_]]
val ignore =
val ignore: Set[String] =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
@ -46,9 +46,13 @@ class DslConsistencySpec extends WordSpec with Matchers {
// Java subflows can only be nested using .via and .to (due to type system restrictions)
jSubFlowClass (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow")),
jSubSourceClass (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow")),
sFlowClass Set("of"),
sSourceClass Set("adapt", "from"),
sSinkClass Set("adapt"),
sSubFlowClass Set(),
sSubSourceClass Set(),
sRunnableGraphClass Set("builder"))
def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[ActorMaterializer])

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.ThreadLocalRandom.{ current random }
import akka.stream.testkit.{ ScriptedTest, StreamSpec }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
class FlowCollectTypeSpec extends StreamSpec {
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
sealed class Fruit
class Orange extends Fruit
object Orange extends Orange
class Apple extends Fruit
object Apple extends Apple
"A CollectType" must {
"collectType" in {
val fruit = Source(List(Orange, Apple, Apple, Orange))
val apples = fruit.collectType[Apple].runWith(Sink.seq).futureValue
apples should equal(List(Apple, Apple))
val oranges = fruit.collectType[Orange].runWith(Sink.seq).futureValue
oranges should equal(List(Orange, Orange))
val all = fruit.collectType[Fruit].runWith(Sink.seq).futureValue
all should equal(List(Orange, Apple, Apple, Orange))
}
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.stream.scaladsl
import java.util.concurrent.ThreadLocalRandom
import akka.NotUsed
import akka.actor._
import akka.stream.Supervision._
@ -16,6 +18,7 @@ import akka.testkit.{ EventFilter, TestDuration }
import com.typesafe.config.ConfigFactory
import org.reactivestreams.{ Publisher, Subscriber }
import org.scalatest.concurrent.ScalaFutures
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
@ -25,7 +28,11 @@ import akka.stream.impl.fusing.GraphInterpreterShell
object FlowSpec {
class Fruit
class Apple extends Fruit
val apples = () Iterator.continually(new Apple)
class Orange extends Fruit
val fruits = () new Iterator[Fruit] {
override def hasNext: Boolean = true
override def next(): Fruit = if (ThreadLocalRandom.current().nextBoolean()) new Apple else new Orange
}
}
@ -252,11 +259,11 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
}
"be covariant" in {
val f1: Source[Fruit, _] = Source.fromIterator[Fruit](apples)
val p1: Publisher[Fruit] = Source.fromIterator[Fruit](apples).runWith(Sink.asPublisher(false))
val f2: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](apples).splitWhen(_ true)
val f3: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](apples).groupBy(2, _ true)
val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source.fromIterator[Fruit](apples).prefixAndTail(1)
val f1: Source[Fruit, _] = Source.fromIterator[Fruit](fruits)
val p1: Publisher[Fruit] = Source.fromIterator[Fruit](fruits).runWith(Sink.asPublisher(false))
val f2: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](fruits).splitWhen(_ true)
val f3: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](fruits).groupBy(2, _ true)
val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source.fromIterator[Fruit](fruits).prefixAndTail(1)
val d1: SubFlow[Fruit, _, Flow[String, Fruit, NotUsed]#Repr, _] = Flow[String].map(_ new Apple).splitWhen(_ true)
val d2: SubFlow[Fruit, _, Flow[String, Fruit, NotUsed]#Repr, _] = Flow[String].map(_ new Apple).groupBy(2, _ true)
val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit, _]), _] = Flow[String].map(_ new Apple).prefixAndTail(1)

View file

@ -12,3 +12,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSou
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this")
# #24254 add collectType
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.collectType")

View file

@ -17,6 +17,7 @@ import java.util.Comparator
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
object Flow {
@ -591,6 +592,24 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collect(pf))
/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the element is an instance of the provided type
*
* '''Backpressures when''' the element is an instance of the provided type and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collectType[T](clazz: Class[T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collectType[T](ClassTag[T](clazz)))
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.

View file

@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture
import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
/** Java API */
object Source {
@ -1284,6 +1285,24 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.collect(pf))
/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the element is an instance of the provided type
*
* '''Backpressures when''' the element is an instance of the provided type and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collectType[T](clazz: Class[T]): javadsl.Source[T, Mat] =
new Source(delegate.collectType[T](ClassTag[T](clazz)))
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.

View file

@ -18,6 +18,8 @@ import java.util.Comparator
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
import scala.reflect.ClassTag
/**
* A stream of streams sub-flow of data elements, e.g. produced by `groupBy`.
* SubFlows cannot contribute to the super-flows materialized value since they
@ -294,6 +296,24 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def collect[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.collect(pf))
/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the element is an instance of the provided type
*
* '''Backpressures when''' the element is an instance of the provided type and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collectType[T](clazz: Class[T]): javadsl.SubFlow[In, T, Mat] =
new SubFlow(delegate.collectType[T](ClassTag[T](clazz)))
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.

View file

@ -19,6 +19,7 @@ import java.util.concurrent.CompletionStage
import akka.stream.impl.fusing.MapError
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
/**
* A stream of streams sub-flow of data elements, e.g. produced by `groupBy`.
@ -294,6 +295,24 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def collect[T](pf: PartialFunction[Out, T]): SubSource[T, Mat] =
new SubSource(delegate.collect(pf))
/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the element is an instance of the provided type
*
* '''Backpressures when''' the element is an instance of the provided type and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collectType[T](clazz: Class[T]): javadsl.SubSource[T, Mat] =
new SubSource(delegate.collectType[T](ClassTag[T](clazz)))
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.

View file

@ -21,6 +21,8 @@ import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
import akka.annotation.DoNotInherit
import scala.reflect.ClassTag
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
*/
@ -923,6 +925,25 @@ trait FlowOps[+Out, +Mat] {
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T] = via(Collect(pf))
/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
*
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the element is an instance of the provided type
*
* '''Backpressures when''' the element is an instance of the provided type and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collectType[T](implicit tag: ClassTag[T]): Repr[T] =
collect { case c if tag.runtimeClass.isInstance(c) c.asInstanceOf[T] }
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.