diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index bfb71500ad..75beae0a24 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -773,6 +773,23 @@ value is passed downstream. Can often replace `filter` followed by `map` to achi --------------------------------------------------------------- +### collectType + +Transform this stream by testing the type of each of the elements on which the element is an instance of +the provided type as they pass through this processing step. Non-matching elements are filtered out. + +Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + +'''Emits when''' the element is an instance of the provided type + +'''Backpressures when''' the element is an instance of the provided type and downstream backpressures + +'''Completes when''' upstream completes + +'''Cancels when''' downstream cancels + +--------------------------------------------------------------- + ### grouped Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index ba9a2c28b3..8bfda94599 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -10,6 +10,7 @@ import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; import akka.stream.*; +import akka.stream.scaladsl.FlowSpec; import akka.util.ConstantFun; import akka.stream.javadsl.GraphDSL.Builder; import akka.stream.stage.*; @@ -192,13 +193,13 @@ public class FlowTest extends StreamTest { public final Inlet in = Inlet.create("in"); public final Outlet out = Outlet.create("out"); - + @Override public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { return new GraphStageLogic(shape()) { int sum = 0; int count = 0; - + { setHandler(in, new AbstractInHandler() { @Override @@ -211,7 +212,7 @@ public class FlowTest extends StreamTest { } else { emitMultiple(out, Arrays.asList(element, element).iterator()); } - + } }); setHandler(out, new AbstractOutHandler() { @@ -223,14 +224,14 @@ public class FlowTest extends StreamTest { } }; } - + @Override public FlowShape shape() { return FlowShape.of(in, out); } } ); - Source.from(input).via(flow).runForeach((Procedure) elem -> + Source.from(input).via(flow).runForeach((Procedure) elem -> probe.getRef().tell(elem, ActorRef.noSender()), materializer); probe.expectMsgEquals(0); @@ -315,7 +316,7 @@ public class FlowTest extends StreamTest { return new GraphStage>() { public final Inlet in = Inlet.create("in"); public final Outlet out = Outlet.create("out"); - + @Override public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { return new GraphStageLogic(shape()) { @@ -682,6 +683,16 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("C"); } + @Test + public void mustBeAbleToUseCollectType() throws Exception { + final TestKit probe = new TestKit(system); + final Iterable input = Arrays.asList(new FlowSpec.Apple(), new FlowSpec.Orange()); + + Source.from(input).via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class)) + .runForeach((apple) -> probe.getRef().tell(apple, ActorRef.noSender()), materializer); + probe.expectMsgAnyClassOf(FlowSpec.Apple.class); + } + @Test public void mustBeAbleToRecover() throws Exception { final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 2ea2f5640c..43e726fec6 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -11,8 +11,7 @@ import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; import akka.stream.*; -import akka.stream.testkit.TestSubscriber; -import akka.stream.testkit.javadsl.TestSink; +import akka.stream.scaladsl.FlowSpec; import akka.util.ConstantFun; import akka.stream.stage.*; import akka.testkit.AkkaSpec; @@ -446,6 +445,19 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals("C"); } + @Test + public void mustBeAbleToUseCollectType() throws Exception{ + final TestKit probe = new TestKit(system); + final Iterable input = Collections.singletonList(new FlowSpec.Apple()); + final Source appleSource = Source.from(input); + final Source fruitSource = appleSource.collectType(FlowSpec.Fruit.class); + fruitSource.collectType(FlowSpec.Apple.class).collectType(FlowSpec.Apple.class) + .runForeach((elem) -> { + probe.getRef().tell(elem,ActorRef.noSender()); + },materializer); + probe.expectMsgAnyClassOf(FlowSpec.Apple.class); + } + @Test public void mustWorkFromFuture() throws Exception { final Iterable input = Arrays.asList("A", "B", "C"); diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index bd75c111fb..0f8bc4518d 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -33,7 +33,7 @@ class DslConsistencySpec extends WordSpec with Matchers { val jRunnableGraphClass: Class[_] = classOf[akka.stream.javadsl.RunnableGraph[_]] val sRunnableGraphClass: Class[_] = classOf[akka.stream.scaladsl.RunnableGraph[_]] - val ignore = + val ignore: Set[String] = Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ @@ -46,9 +46,13 @@ class DslConsistencySpec extends WordSpec with Matchers { // Java subflows can only be nested using .via and .to (due to type system restrictions) jSubFlowClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow")), jSubSourceClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow")), + sFlowClass → Set("of"), sSourceClass → Set("adapt", "from"), sSinkClass → Set("adapt"), + sSubFlowClass → Set(), + sSubSourceClass → Set(), + sRunnableGraphClass → Set("builder")) def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[ActorMaterializer]) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectTypeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectTypeSpec.scala new file mode 100644 index 0000000000..9c1ae180e8 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCollectTypeSpec.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random } + +import akka.stream.testkit.{ ScriptedTest, StreamSpec } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } + +class FlowCollectTypeSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + implicit val materializer = ActorMaterializer(settings) + + sealed class Fruit + class Orange extends Fruit + object Orange extends Orange + class Apple extends Fruit + object Apple extends Apple + + "A CollectType" must { + + "collectType" in { + val fruit = Source(List(Orange, Apple, Apple, Orange)) + + val apples = fruit.collectType[Apple].runWith(Sink.seq).futureValue + apples should equal(List(Apple, Apple)) + val oranges = fruit.collectType[Orange].runWith(Sink.seq).futureValue + oranges should equal(List(Orange, Orange)) + val all = fruit.collectType[Fruit].runWith(Sink.seq).futureValue + all should equal(List(Orange, Apple, Apple, Orange)) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 622801c2f5..76d1e5aa2a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl +import java.util.concurrent.ThreadLocalRandom + import akka.NotUsed import akka.actor._ import akka.stream.Supervision._ @@ -16,6 +18,7 @@ import akka.testkit.{ EventFilter, TestDuration } import com.typesafe.config.ConfigFactory import org.reactivestreams.{ Publisher, Subscriber } import org.scalatest.concurrent.ScalaFutures + import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -25,7 +28,11 @@ import akka.stream.impl.fusing.GraphInterpreterShell object FlowSpec { class Fruit class Apple extends Fruit - val apples = () ⇒ Iterator.continually(new Apple) + class Orange extends Fruit + val fruits = () ⇒ new Iterator[Fruit] { + override def hasNext: Boolean = true + override def next(): Fruit = if (ThreadLocalRandom.current().nextBoolean()) new Apple else new Orange + } } @@ -252,11 +259,11 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re } "be covariant" in { - val f1: Source[Fruit, _] = Source.fromIterator[Fruit](apples) - val p1: Publisher[Fruit] = Source.fromIterator[Fruit](apples).runWith(Sink.asPublisher(false)) - val f2: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](apples).splitWhen(_ ⇒ true) - val f3: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](apples).groupBy(2, _ ⇒ true) - val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source.fromIterator[Fruit](apples).prefixAndTail(1) + val f1: Source[Fruit, _] = Source.fromIterator[Fruit](fruits) + val p1: Publisher[Fruit] = Source.fromIterator[Fruit](fruits).runWith(Sink.asPublisher(false)) + val f2: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](fruits).splitWhen(_ ⇒ true) + val f3: SubFlow[Fruit, _, Source[Fruit, NotUsed]#Repr, _] = Source.fromIterator[Fruit](fruits).groupBy(2, _ ⇒ true) + val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source.fromIterator[Fruit](fruits).prefixAndTail(1) val d1: SubFlow[Fruit, _, Flow[String, Fruit, NotUsed]#Repr, _] = Flow[String].map(_ ⇒ new Apple).splitWhen(_ ⇒ true) val d2: SubFlow[Fruit, _, Flow[String, Fruit, NotUsed]#Repr, _] = Flow[String].map(_ ⇒ new Apple).groupBy(2, _ ⇒ true) val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit, _]), _] = Flow[String].map(_ ⇒ new Apple).prefixAndTail(1) diff --git a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes index af2a648a6b..56cf37ee77 100644 --- a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes @@ -12,3 +12,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSou ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSource.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this") + +# #24254 add collectType +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.collectType") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 03bc1226ff..b6d91c5ad7 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -17,6 +17,7 @@ import java.util.Comparator import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ +import scala.reflect.ClassTag object Flow { @@ -591,6 +592,24 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.collect(pf)) + /** + * Transform this stream by testing the type of each of the elements + * on which the element is an instance of the provided type as they pass through this processing step. + * Non-matching elements are filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is an instance of the provided type + * + * '''Backpressures when''' the element is an instance of the provided type and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def collectType[T](clazz: Class[T]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.collectType[T](ClassTag[T](clazz))) + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 5926cd48d3..8241be0921 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture import akka.annotation.InternalApi import scala.compat.java8.FutureConverters._ +import scala.reflect.ClassTag /** Java API */ object Source { @@ -1284,6 +1285,24 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] = new Source(delegate.collect(pf)) + /** + * Transform this stream by testing the type of each of the elements + * on which the element is an instance of the provided type as they pass through this processing step. + * Non-matching elements are filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is an instance of the provided type + * + * '''Backpressures when''' the element is an instance of the provided type and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def collectType[T](clazz: Class[T]): javadsl.Source[T, Mat] = + new Source(delegate.collectType[T](ClassTag[T](clazz))) + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 1629f80ba4..fb8c4dd89c 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -18,6 +18,8 @@ import java.util.Comparator import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import scala.reflect.ClassTag + /** * A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`. * SubFlows cannot contribute to the super-flow’s materialized value since they @@ -294,6 +296,24 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def collect[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.collect(pf)) + /** + * Transform this stream by testing the type of each of the elements + * on which the element is an instance of the provided type as they pass through this processing step. + * Non-matching elements are filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is an instance of the provided type + * + * '''Backpressures when''' the element is an instance of the provided type and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def collectType[T](clazz: Class[T]): javadsl.SubFlow[In, T, Mat] = + new SubFlow(delegate.collectType[T](ClassTag[T](clazz))) + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 0b9f4c8bb5..685db56118 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -19,6 +19,7 @@ import java.util.concurrent.CompletionStage import akka.stream.impl.fusing.MapError import scala.compat.java8.FutureConverters._ +import scala.reflect.ClassTag /** * A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`. @@ -294,6 +295,24 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def collect[T](pf: PartialFunction[Out, T]): SubSource[T, Mat] = new SubSource(delegate.collect(pf)) + /** + * Transform this stream by testing the type of each of the elements + * on which the element is an instance of the provided type as they pass through this processing step. + * Non-matching elements are filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is an instance of the provided type + * + * '''Backpressures when''' the element is an instance of the provided type and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def collectType[T](clazz: Class[T]): javadsl.SubSource[T, Mat] = + new SubSource(delegate.collectType[T](ClassTag[T](clazz))) + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c26be1cf59..be72a098e0 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -21,6 +21,8 @@ import akka.stream.impl.fusing.FlattenMerge import akka.NotUsed import akka.annotation.DoNotInherit +import scala.reflect.ClassTag + /** * A `Flow` is a set of stream processing steps that has one open input and one open output. */ @@ -923,6 +925,25 @@ trait FlowOps[+Out, +Mat] { */ def collect[T](pf: PartialFunction[Out, T]): Repr[T] = via(Collect(pf)) + /** + * Transform this stream by testing the type of each of the elements + * on which the element is an instance of the provided type as they pass through this processing step. + * + * Non-matching elements are filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is an instance of the provided type + * + * '''Backpressures when''' the element is an instance of the provided type and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def collectType[T](implicit tag: ClassTag[T]): Repr[T] = + collect { case c if tag.runtimeClass.isInstance(c) ⇒ c.asInstanceOf[T] } + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream.