19452: Contramap and fromFunction

This commit is contained in:
Alois Cochard 2016-02-10 12:18:24 +01:00 committed by Endre Sándor Varga
parent d9b47c3a8d
commit 6c69fba61e
8 changed files with 71 additions and 3 deletions

View file

@ -803,6 +803,15 @@ public class FlowTest extends StreamTest {
assertEquals((Object) 0, result); assertEquals((Object) 0, result);
} }
@Test
public void shouldBePossibleToCreateFromFunction() throws Exception {
List<Integer> out = Source.range(0, 2).via(Flow.fromFunction((Integer x) -> x + 1))
.runWith(Sink.seq(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 3), out);
}
public void mustSuitablyOverrideAttributeHandlingMethods() { public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused") @SuppressWarnings("unused")
final Flow<Integer, Integer, NotUsed> f = final Flow<Integer, Integer, NotUsed> f =

View file

@ -24,6 +24,8 @@ import akka.japi.function.Function2;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import static org.junit.Assert.*;
public class SinkTest extends StreamTest { public class SinkTest extends StreamTest {
public SinkTest() { public SinkTest() {
super(actorSystemResource); super(actorSystemResource);
@ -93,6 +95,14 @@ public class SinkTest extends StreamTest {
probe2.expectMsgEquals("done2"); probe2.expectMsgEquals("done2");
} }
@Test
public void mustBeAbleToUseContramap() throws Exception {
List<Integer> out = Source.range(0, 2).toMat(Sink.<Integer>seq().contramap(x -> x + 1), Keep.right())
.run(materializer).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 3), out);
}
public void mustSuitablyOverrideAttributeHandlingMethods() { public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused") @SuppressWarnings("unused")
final Sink<Integer, CompletionStage<Integer>> s = final Sink<Integer, CompletionStage<Integer>> s =

View file

@ -19,6 +19,7 @@ import akka.testkit.TestEvent.{ Mute, UnMute }
import akka.testkit.{ EventFilter, TestDuration } import akka.testkit.{ EventFilter, TestDuration }
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import org.scalatest.concurrent.ScalaFutures
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -32,7 +33,8 @@ object FlowSpec {
} }
class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO"))
with ScalaFutures {
import FlowSpec._ import FlowSpec._
val settings = ActorMaterializerSettings(system) val settings = ActorMaterializerSettings(system)
@ -534,6 +536,10 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
downstream2.expectSubscriptionAndError().isInstanceOf[IllegalStateException] should be(true) downstream2.expectSubscriptionAndError().isInstanceOf[IllegalStateException] should be(true)
} }
} }
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}
} }
"A broken Flow" must { "A broken Flow" must {

View file

@ -6,13 +6,18 @@ package akka.stream.scaladsl
import akka.stream._ import akka.stream._
import akka.stream.testkit.TestPublisher.ManualProbe import akka.stream.testkit.TestPublisher.ManualProbe
import akka.stream.testkit._ import akka.stream.testkit._
import scala.concurrent.Future import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures
class SinkSpec extends AkkaSpec { import scala.concurrent.Future
import scala.concurrent.duration._
class SinkSpec extends AkkaSpec with ConversionCheckedTripleEquals with ScalaFutures {
import GraphDSL.Implicits._ import GraphDSL.Implicits._
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val patience = PatienceConfig(2.seconds)
"A Sink" must { "A Sink" must {
@ -124,6 +129,10 @@ class SinkSpec extends AkkaSpec {
import Attributes._ import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("") val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("")
} }
"support contramap" in {
Source(0 to 9).toMat(Sink.seq.contramap(_ + 1))(Keep.right).run().futureValue should ===(1 to 10)
}
} }
} }

View file

@ -35,6 +35,13 @@ object Flow {
(javaPair.first, javaPair.second) (javaPair.first, javaPair.second)
}) })
/**
* Creates a [Flow] which will use the given function to transform its inputs to outputs. It is equivalent
* to `Flow.create[T].map(f)`
*/
def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] =
Flow.create[I]().map(f)
/** Create a `Flow` which can process elements of type `T`. */ /** Create a `Flow` which can process elements of type `T`. */
def of[T](clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() def of[T](clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]()

View file

@ -265,6 +265,17 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
def runWith[M](source: Graph[SourceShape[In], M], materializer: Materializer): M = def runWith[M](source: Graph[SourceShape[In], M], materializer: Materializer): M =
asScala.runWith(source)(materializer) asScala.runWith(source)(materializer)
/**
* Transform this Sink by applying a function to each *incoming* upstream element before
* it is passed to the [[Sink]]
*
* '''Backpressures when''' original [[Sink]] backpressures
*
* '''Cancels when''' original [[Sink]] backpressures
*/
def contramap[In2](f: function.Function[In2, In]): Sink[In2, Mat] =
javadsl.Flow.fromFunction(f).toMat(this, Keep.right[NotUsed, Mat])
/** /**
* Transform only the materialized value of this Sink, leaving all other properties as they were. * Transform only the materialized value of this Sink, leaving all other properties as they were.
*/ */

View file

@ -284,6 +284,12 @@ object Flow {
*/ */
def apply[T]: Flow[T, T, NotUsed] = identity.asInstanceOf[Flow[T, T, NotUsed]] def apply[T]: Flow[T, T, NotUsed] = identity.asInstanceOf[Flow[T, T, NotUsed]]
/**
* Creates a [Flow] which will use the given function to transform its inputs to outputs. It is equivalent
* to `Flow[T].map(f)`
*/
def fromFunction[A, B](f: A B): Flow[A, B, NotUsed] = apply[A].map(f)
/** /**
* A graph with the shape of a flow logically is a flow, this method makes * A graph with the shape of a flow logically is a flow, this method makes
* it so also in type. * it so also in type.

View file

@ -30,6 +30,16 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]] override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]]
/**
* Transform this Sink by applying a function to each *incoming* upstream element before
* it is passed to the [[Sink]]
*
* '''Backpressures when''' original [[Sink]] backpressures
*
* '''Cancels when''' original [[Sink]] backpressures
*/
def contramap[In2](f: In2 In): Sink[In2, Mat] = Flow.fromFunction(f).toMat(this)(Keep.right)
/** /**
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
* of the `Source`, e.g. the `Subscriber` of a [[Source#subscriber]]. * of the `Source`, e.g. the `Subscriber` of a [[Source#subscriber]].