!str #17123: Make materialized value handling method names consistent

This commit is contained in:
Endre Sándor Varga 2015-05-05 10:29:41 +02:00
parent 035037dd24
commit 7ad4fdc3ce
23 changed files with 50 additions and 50 deletions

View file

@ -200,10 +200,10 @@ Accessing the materialized value inside the Graph
-------------------------------------------------
In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a
Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that
Source, Sink, Flow or BidiFlow). This is possible by using ``builder.materializedValue`` which gives an ``Outlet`` that
can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value.
If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times
to acquire the necessary number of outlets.
If the materialized value is needed at more than one place, it is possible to call ``materializedValue`` any number of
times to acquire the necessary number of outlets.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue

View file

@ -190,7 +190,7 @@ class FlowDocSpec extends AkkaSpec {
// It is also possible to map over the materialized values. In r9 we had a
// doubly nested pair, but we want to flatten it out
val r11: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] =
r9.mapMaterialized {
r9.mapMaterializedValue {
case ((promise, cancellable), future) =>
(promise, cancellable, future)
}

View file

@ -226,7 +226,7 @@ class FlowGraphDocSpec extends AkkaSpec {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder
fold
(fold.inlet, builder.matValue.mapAsync(4)(identity).outlet)
(fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
}
//#flow-graph-matvalue
@ -243,8 +243,8 @@ class FlowGraphDocSpec extends AkkaSpec {
// fold completes
// As a result this Source will never emit anything, and its materialited
// Future will never complete
builder.matValue.mapAsync(4)(identity) ~> fold
builder.matValue.mapAsync(4)(identity).outlet
builder.materializedValue.mapAsync(4)(identity) ~> fold
builder.materializedValue.mapAsync(4)(identity).outlet
}
//#flow-graph-matvalue-cycle
}

View file

@ -254,10 +254,10 @@ Accessing the materialized value inside the Graph
-------------------------------------------------
In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a
Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that
Source, Sink, Flow or BidiFlow). This is possible by using ``builder.materializedValue`` which gives an ``Outlet`` that
can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value.
If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times
to acquire the necessary number of outlets.
If the materialized value is needed at more than one place, it is possible to call ``materializedValue`` any number of
times to acquire the necessary number of outlets.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue

View file

@ -33,7 +33,7 @@ private[http] object BodyPartRenderer {
def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = {
val entityChunks = data.map[ChunkStreamPart](Chunk(_))
(chunkStream(r.get) ++ entityChunks).mapMaterialized((_) ())
(chunkStream(r.get) ++ entityChunks).mapMaterializedValue((_) ())
}
def completePartRendering(): Source[ChunkStreamPart, Any] =

View file

@ -47,7 +47,7 @@ private object RenderSupport {
skipEntity: Boolean = false): Source[ByteString, Any] = {
val messageStart = Source.single(r.get)
val messageBytes =
if (!skipEntity) (messageStart ++ entityBytes).mapMaterialized(_ ())
if (!skipEntity) (messageStart ++ entityBytes).mapMaterializedValue(_ ())
else CancelSecond(messageStart, entityBytes)
messageBytes
}

View file

@ -188,7 +188,7 @@ private[http] object StreamUtils {
*/
def oneTimeSource[T, Mat](other: Source[T, Mat], errorMsg: String = "One time source can only be instantiated once"): Source[T, Mat] = {
val onlyOnceFlag = new AtomicBoolean(false)
other.mapMaterialized { elem
other.mapMaterializedValue { elem
if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true))
throw new IllegalStateException(errorMsg)
elem

View file

@ -47,7 +47,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def bind(interface: String, port: Int, materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
Source.adapt(delegate.bind(interface, port)(materializer)
.map(new IncomingConnection(_))
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
@ -66,7 +66,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(materializer)
.map(new IncomingConnection(_))
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
@ -173,7 +173,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
Flow.wrap {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnection(host, port))(Keep.right)
.mapMaterialized(_.map(new OutgoingConnection(_))(ec))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
@ -188,7 +188,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
Flow.wrap {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, immutableSeq(options), settings, log))(Keep.right)
.mapMaterialized(_.map(new OutgoingConnection(_))(ec))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**

View file

@ -46,7 +46,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
case Tcp.IncomingConnection(localAddress, remoteAddress, flow)
val layer = serverLayer(settings, log)
IncomingConnection(localAddress, remoteAddress, layer join flow)
}.mapMaterialized {
}.mapMaterializedValue {
_.map(tcpBinding ServerBinding(tcpBinding.localAddress)(() tcpBinding.unbind()))(fm.executionContext)
}
}
@ -367,7 +367,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
import hcps._
val theHostHeader = hostHeader(host, port, Uri.httpScheme(setup.encrypted))
clientFlow[T](setup.settings)(_.withDefaultHeaders(theHostHeader) -> gatewayFuture)
.mapMaterialized(_ HostConnectionPool(hcps)(gatewayFuture))
.mapMaterializedValue(_ HostConnectionPool(hcps)(gatewayFuture))
}
private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest (HttpRequest, Future[PoolGateway]))(

View file

@ -102,7 +102,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}
"are triggered in `mapMaterialized`" in {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val flow = Flow[HttpRequest].map(_ HttpResponse()).mapMaterialized(_ sys.error("BOOM"))
val flow = Flow[HttpRequest].map(_ HttpResponse()).mapMaterializedValue(_ sys.error("BOOM"))
val binding = Http().bindAndHandle(flow, hostname, port)
val b1 = Await.result(binding, 3.seconds)

View file

@ -70,7 +70,7 @@ object TestServer extends App {
Flow[Message]
.collect {
case TextMessage.Strict(name) TextMessage.Strict(s"Hello '$name'")
case TextMessage.Streamed(nameStream) TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterialized (_ ()))
case TextMessage.Streamed(nameStream) TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterializedValue (_ ()))
// ignore binary messages
}
}

View file

@ -221,7 +221,7 @@ public class FlowGraphTest extends StreamTest {
@Override
public void apply(Builder<Future<Integer>> b, SinkShape<Integer> out) throws Exception {
b.from(Source.single(1)).to(out);
b.from(b.matValue()).to(Sink.foreach(new Procedure<Future<Integer>>(){
b.from(b.materializedValue()).to(Sink.foreach(new Procedure<Future<Integer>>(){
public void apply(Future<Integer> mat) throws Exception {
probe.ref().tell(mat, ActorRef.noSender());
}

View file

@ -165,7 +165,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
runnable.mapMaterialized((_) "boo").run() should be("boo")
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
@ -179,7 +179,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
runnable.mapMaterialized((_) "boo").run() should be("boo")
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
}

View file

@ -28,7 +28,7 @@ class GraphMatValueSpec extends AkkaSpec {
val f = FlowGraph.closed(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.matValue.mapAsync(4)(identity) ~> Sink(sub)
b.materializedValue.mapAsync(4)(identity) ~> Sink(sub)
}.run()
val r1 = Await.result(f, 3.seconds)
@ -45,8 +45,8 @@ class GraphMatValueSpec extends AkkaSpec {
fold
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.matValue.mapAsync(4)(identity) ~> zip.in0
b.matValue.mapAsync(4)(identity) ~> zip.in1
b.materializedValue.mapAsync(4)(identity) ~> zip.in0
b.materializedValue.mapAsync(4)(identity) ~> zip.in1
zip.out ~> Sink(sub)
}.run()
@ -62,7 +62,7 @@ class GraphMatValueSpec extends AkkaSpec {
val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.matValue
b.materializedValue
}
"allow exposing the materialized value as port" in {
@ -72,7 +72,7 @@ class GraphMatValueSpec extends AkkaSpec {
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterialized((_) ())
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}

View file

@ -62,8 +62,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Transform only the materialized value of this Flow, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
new Flow(delegate.mapMaterialized(f.apply _))
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
new Flow(delegate.mapMaterializedValue(f.apply _))
/**
* Transform this [[Flow]] by appending the given processing steps.
@ -593,7 +593,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* the first element emitted by the given ("second") source is emitted after the last element of this Flow.
*/
def concat[M](second: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] =
new Flow(delegate.concat(second).mapMaterialized(p Pair(p._1, p._2)))
new Flow(delegate.concat(second).mapMaterializedValue(p Pair(p._1, p._2)))
override def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr))
@ -690,15 +690,15 @@ trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] {
/**
* Transform only the materialized value of this RunnableFlow, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2]
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2]
}
/** INTERNAL API */
private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] {
def shape = ClosedShape
def module = runnable.module
override def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] =
new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _))
override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] =
new RunnableFlowAdapter(runnable.mapMaterializedValue(f.apply _))
override def run(materializer: FlowMaterializer): Mat = runnable.run()(materializer)
override def withAttributes(attr: OperationAttributes): RunnableFlow[Mat] =

View file

@ -300,7 +300,7 @@ object FlowGraph {
*
* @return The outlet that will emit the materialized value.
*/
def matValue: Outlet[Mat] = delegate.matValue
def materializedValue: Outlet[Mat] = delegate.materializedValue
def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat)

View file

@ -144,8 +144,8 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[
/**
* Transform only the materialized value of this Sink, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterialized(f.apply _))
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterializedValue(f.apply _))
override def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] =
new Sink(delegate.withAttributes(attr))

View file

@ -236,8 +236,8 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/**
* Transform only the materialized value of this Source, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] =
new Source(delegate.mapMaterialized(f.apply _))
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] =
new Source(delegate.mapMaterializedValue(f.apply _))
/**
* Transform this [[Source]] by appending the given processing stages.

View file

@ -107,7 +107,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
idleTimeout: Duration): Source[IncomingConnection, Future[ServerBinding]] =
Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), idleTimeout)
.map(new IncomingConnection(_))
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
/**
* Creates a [[Tcp.ServerBinding]] without specifying options.
@ -116,7 +116,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]] =
Source.adapt(delegate.bind(interface, port)
.map(new IncomingConnection(_))
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
/**
* Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
@ -127,7 +127,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
connectTimeout: Duration,
idleTimeout: Duration): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
Flow.adapt(delegate.outgoingConnection(remoteAddress, localAddress, immutableSeq(options), connectTimeout, idleTimeout)
.mapMaterialized(_.map(new OutgoingConnection(_))(ec)))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)))
/**
* Creates an [[Tcp.OutgoingConnection]] without specifying options.
@ -135,6 +135,6 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
Flow.adapt(delegate.outgoingConnection(new InetSocketAddress(host, port))
.mapMaterialized(_.map(new OutgoingConnection(_))(ec)))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)))
}

View file

@ -68,7 +68,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = {
if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterialized(combine(().asInstanceOf[Mat], _))
if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterializedValue(combine(().asInstanceOf[Mat], _))
else {
val flowCopy = flow.module.carbonCopy
new Flow(
@ -129,7 +129,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/**
* Transform the materialized value of this Flow, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: Mat Mat2): Repr[Out, Mat2] =
def mapMaterializedValue[Mat2](f: Mat Mat2): Repr[Out, Mat2] =
new Flow(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/**
@ -320,7 +320,7 @@ case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) e
/**
* Transform only the materialized value of this RunnableFlow, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: Mat Mat2): RunnableFlow[Mat2] =
def mapMaterializedValue[Mat2](f: Mat Mat2): RunnableFlow[Mat2] =
copy(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/**

View file

@ -367,7 +367,7 @@ object FlowGraph extends GraphApply {
*
* @return The outlet that will emit the materialized value.
*/
def matValue: Outlet[M] = {
def materializedValue: Outlet[M] = {
val module = new MaterializedValueSource[Any]
moduleInProgress = moduleInProgress.grow(module)
module.shape.outlet.asInstanceOf[Outlet[M]]

View file

@ -37,7 +37,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: FlowMaterializer): Mat2 =
Source.wrap(source).to(this).run()
def mapMaterialized[Mat2](f: Mat Mat2): Sink[In, Mat2] =
def mapMaterializedValue[Mat2](f: Mat Mat2): Sink[In, Mat2] =
new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
override def withAttributes(attr: OperationAttributes): Sink[In, Mat] =

View file

@ -84,7 +84,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
/**
* Transform only the materialized value of this Source, leaving all other properties as they were.
*/
def mapMaterialized[Mat2](f: Mat Mat2): Repr[Out, Mat2] =
def mapMaterializedValue[Mat2](f: Mat Mat2): Repr[Out, Mat2] =
new Source(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/** INTERNAL API */