diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 1a5f56ce79..c92f43aadf 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -803,6 +803,15 @@ public class FlowTest extends StreamTest { assertEquals((Object) 0, result); } + @Test + public void shouldBePossibleToCreateFromFunction() throws Exception { + List out = Source.range(0, 2).via(Flow.fromFunction((Integer x) -> x + 1)) + .runWith(Sink.seq(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals(Arrays.asList(1, 2, 3), out); + + } + public void mustSuitablyOverrideAttributeHandlingMethods() { @SuppressWarnings("unused") final Flow f = diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 54d9fab3ec..2c65f8ed5c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -24,6 +24,8 @@ import akka.japi.function.Function2; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; +import static org.junit.Assert.*; + public class SinkTest extends StreamTest { public SinkTest() { super(actorSystemResource); @@ -93,6 +95,14 @@ public class SinkTest extends StreamTest { probe2.expectMsgEquals("done2"); } + @Test + public void mustBeAbleToUseContramap() throws Exception { + List out = Source.range(0, 2).toMat(Sink.seq().contramap(x -> x + 1), Keep.right()) + .run(materializer).toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals(Arrays.asList(1, 2, 3), out); + } + public void mustSuitablyOverrideAttributeHandlingMethods() { @SuppressWarnings("unused") final Sink> s = diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 89fc631287..4a5a59649e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -19,6 +19,7 @@ import akka.testkit.TestEvent.{ Mute, UnMute } import akka.testkit.{ EventFilter, TestDuration } import com.typesafe.config.ConfigFactory import org.reactivestreams.{ Publisher, Subscriber } +import org.scalatest.concurrent.ScalaFutures import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -32,7 +33,8 @@ object FlowSpec { } -class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { +class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) + with ScalaFutures { import FlowSpec._ val settings = ActorMaterializerSettings(system) @@ -534,6 +536,10 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece downstream2.expectSubscriptionAndError().isInstanceOf[IllegalStateException] should be(true) } } + + "should be created from a function easily" in { + Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10) + } } "A broken Flow" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 57eb34e6f9..be40b8907a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -6,13 +6,18 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.testkit.TestPublisher.ManualProbe import akka.stream.testkit._ -import scala.concurrent.Future +import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest.concurrent.ScalaFutures -class SinkSpec extends AkkaSpec { +import scala.concurrent.Future +import scala.concurrent.duration._ + +class SinkSpec extends AkkaSpec with ConversionCheckedTripleEquals with ScalaFutures { import GraphDSL.Implicits._ implicit val materializer = ActorMaterializer() + implicit val patience = PatienceConfig(2.seconds) "A Sink" must { @@ -124,6 +129,10 @@ class SinkSpec extends AkkaSpec { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("") } + + "support contramap" in { + Source(0 to 9).toMat(Sink.seq.contramap(_ + 1))(Keep.right).run().futureValue should ===(1 to 10) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c41d6832cf..f924aac20f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -35,6 +35,13 @@ object Flow { (javaPair.first, javaPair.second) }) + /** + * Creates a [Flow] which will use the given function to transform its inputs to outputs. It is equivalent + * to `Flow.create[T].map(f)` + */ + def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] = + Flow.create[I]().map(f) + /** Create a `Flow` which can process elements of type `T`. */ def of[T](clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 1f615b1e57..995a6bb77a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -265,6 +265,17 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink def runWith[M](source: Graph[SourceShape[In], M], materializer: Materializer): M = asScala.runWith(source)(materializer) + /** + * Transform this Sink by applying a function to each *incoming* upstream element before + * it is passed to the [[Sink]] + * + * '''Backpressures when''' original [[Sink]] backpressures + * + * '''Cancels when''' original [[Sink]] backpressures + */ + def contramap[In2](f: function.Function[In2, In]): Sink[In2, Mat] = + javadsl.Flow.fromFunction(f).toMat(this, Keep.right[NotUsed, Mat]) + /** * Transform only the materialized value of this Sink, leaving all other properties as they were. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index dee90c8b19..1ed7bf9590 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -284,6 +284,12 @@ object Flow { */ def apply[T]: Flow[T, T, NotUsed] = identity.asInstanceOf[Flow[T, T, NotUsed]] + /** + * Creates a [Flow] which will use the given function to transform its inputs to outputs. It is equivalent + * to `Flow[T].map(f)` + */ + def fromFunction[A, B](f: A ⇒ B): Flow[A, B, NotUsed] = apply[A].map(f) + /** * A graph with the shape of a flow logically is a flow, this method makes * it so also in type. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index ef4d2d9658..4c6b6db500 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -30,6 +30,16 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]] + /** + * Transform this Sink by applying a function to each *incoming* upstream element before + * it is passed to the [[Sink]] + * + * '''Backpressures when''' original [[Sink]] backpressures + * + * '''Cancels when''' original [[Sink]] backpressures + */ + def contramap[In2](f: In2 ⇒ In): Sink[In2, Mat] = Flow.fromFunction(f).toMat(this)(Keep.right) + /** * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * of the `Source`, e.g. the `Subscriber` of a [[Source#subscriber]].