stream: provide mapMaterializedValue for Graph (#28610)

This commit is contained in:
eyal farago 2020-02-20 13:30:32 +01:00 committed by GitHub
parent 8a354ec3f0
commit 5bb9a7145a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 0 deletions

View file

@ -4,6 +4,7 @@
package akka.stream.javadsl;
import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.*;
@ -216,4 +217,14 @@ public class GraphDslTest extends StreamTest {
assertEquals("bx", result.get(1).toCompletableFuture().get(1, TimeUnit.SECONDS));
assertEquals("cx", result.get(2).toCompletableFuture().get(1, TimeUnit.SECONDS));
}
@Test
public void canUseMapMaterializedValueOnGraphs() {
Graph<SourceShape<Object>, NotUsed> srcGraph = Source.empty();
Graph<SourceShape<Object>, Pair> mappedMatValueSrcGraph =
Graph.mapMaterializedValue(srcGraph, notUsed -> new Pair(notUsed, notUsed));
Sink<Object, CompletionStage<Done>> snk = Sink.ignore();
Pair<NotUsed, NotUsed> pair = Source.fromGraph(mappedMatValueSrcGraph).to(snk).run(system);
assertEquals(pair, new Pair(NotUsed.getInstance(), NotUsed.getInstance()));
}
}

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.impl.fusing.GraphStages
import akka.stream._
import akka.stream.testkit._
@ -411,5 +412,15 @@ class GraphDSLCompileSpec extends StreamSpec {
ga.traversalBuilder.attributes.getFirst[Name] shouldEqual Some(Name("useless"))
ga.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary))
}
"support mapMaterializedValue" in {
val anOp = op[String, String]
val anOpWithMappedMatVal = anOp.mapMaterializedValue {
case NotUsed => (NotUsed, NotUsed)
}
val g = Source.empty[String].viaMat(anOpWithMappedMatVal)(Keep.right).to(Sink.cancelled)
val matVal = g.run()
matVal shouldEqual ((NotUsed, NotUsed))
}
}
}

View file

@ -6,6 +6,7 @@ package akka.stream
import akka.annotation.InternalApi
import akka.stream.impl.TraversalBuilder
import akka.stream.scaladsl.GenericGraph
import scala.annotation.unchecked.uncheckedVariance
@ -69,6 +70,34 @@ trait Graph[+S <: Shape, +M] {
def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr)
}
object Graph {
/**
* Java API
* Transform the materialized value of this Flow, leaving all other properties as they were.
*
* @param g the graph being transformed
* @param f function to map the graph's materialized value
* @return a graph with same semantics as the given graph, except from the materialized value which is mapped using f.
*/
def mapMaterializedValue[S <: Shape, M1, M2](g: Graph[S, M1])(f: M1 => M2): Graph[S, M2] =
new GenericGraph(g.shape, g.traversalBuilder).mapMaterializedValue(f)
/**
* Scala API, see https://github.com/akka/akka/issues/28501 for discussion why this can't be an instance method on class Graph.
* @param self the graph whose materialized value will be mapped
*/
final implicit class GraphMapMatVal[S <: Shape, M](self: Graph[S, M]) {
/**
* Transform the materialized value of this Graph, leaving all other properties as they were.
*
* @param f function to map the graph's materialized value
*/
def mapMaterializedValue[M2](f: M => M2): Graph[S, M2] = Graph.mapMaterializedValue(self)(f)
}
}
/**
* INTERNAL API
*

View file

@ -37,6 +37,9 @@ private[stream] final class GenericGraph[S <: Shape, Mat](
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GenericGraphWithChangedAttributes(shape, traversalBuilder, attr)
def mapMaterializedValue[Mat2](f: Mat => Mat2): GenericGraph[S, Mat2] =
new GenericGraph(shape, traversalBuilder.transformMat(f))
}
/**