diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index c9e6d89e60..96dfcd532d 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -200,10 +200,10 @@ Accessing the materialized value inside the Graph ------------------------------------------------- In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a -Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that +Source, Sink, Flow or BidiFlow). This is possible by using ``builder.materializedValue`` which gives an ``Outlet`` that can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value. -If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times -to acquire the necessary number of outlets. +If the materialized value is needed at more than one place, it is possible to call ``materializedValue`` any number of +times to acquire the necessary number of outlets. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 59605e3079..9b5b97a146 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -190,7 +190,7 @@ class FlowDocSpec extends AkkaSpec { // It is also possible to map over the materialized values. In r9 we had a // doubly nested pair, but we want to flatten it out val r11: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = - r9.mapMaterialized { + r9.mapMaterializedValue { case ((promise, cancellable), future) => (promise, cancellable, future) } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 953a094d69..9f02dc1313 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -226,7 +226,7 @@ class FlowGraphDocSpec extends AkkaSpec { val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) { implicit builder ⇒ fold ⇒ - (fold.inlet, builder.matValue.mapAsync(4)(identity).outlet) + (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet) } //#flow-graph-matvalue @@ -243,8 +243,8 @@ class FlowGraphDocSpec extends AkkaSpec { // fold completes // As a result this Source will never emit anything, and its materialited // Future will never complete - builder.matValue.mapAsync(4)(identity) ~> fold - builder.matValue.mapAsync(4)(identity).outlet + builder.materializedValue.mapAsync(4)(identity) ~> fold + builder.materializedValue.mapAsync(4)(identity).outlet } //#flow-graph-matvalue-cycle } diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index 27f36539cc..1f0942515b 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -254,10 +254,10 @@ Accessing the materialized value inside the Graph ------------------------------------------------- In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a -Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that +Source, Sink, Flow or BidiFlow). This is possible by using ``builder.materializedValue`` which gives an ``Outlet`` that can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value. -If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times -to acquire the necessary number of outlets. +If the materialized value is needed at more than one place, it is possible to call ``materializedValue`` any number of +times to acquire the necessary number of outlets. .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/BodyPartRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/BodyPartRenderer.scala index 151f3bcc0c..d0768924b9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/BodyPartRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/BodyPartRenderer.scala @@ -33,7 +33,7 @@ private[http] object BodyPartRenderer { def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = { val entityChunks = data.map[ChunkStreamPart](Chunk(_)) - (chunkStream(r.get) ++ entityChunks).mapMaterialized((_) ⇒ ()) + (chunkStream(r.get) ++ entityChunks).mapMaterializedValue((_) ⇒ ()) } def completePartRendering(): Source[ChunkStreamPart, Any] = diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala index 3ebb0c8624..9738253b53 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala @@ -47,7 +47,7 @@ private object RenderSupport { skipEntity: Boolean = false): Source[ByteString, Any] = { val messageStart = Source.single(r.get) val messageBytes = - if (!skipEntity) (messageStart ++ entityBytes).mapMaterialized(_ ⇒ ()) + if (!skipEntity) (messageStart ++ entityBytes).mapMaterializedValue(_ ⇒ ()) else CancelSecond(messageStart, entityBytes) messageBytes } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 32d7b1d690..fbfe1fc470 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -188,7 +188,7 @@ private[http] object StreamUtils { */ def oneTimeSource[T, Mat](other: Source[T, Mat], errorMsg: String = "One time source can only be instantiated once"): Source[T, Mat] = { val onlyOnceFlag = new AtomicBoolean(false) - other.mapMaterialized { elem ⇒ + other.mapMaterializedValue { elem ⇒ if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true)) throw new IllegalStateException(errorMsg) elem diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 08927ce3f9..1c4e43e0bc 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -47,7 +47,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def bind(interface: String, port: Int, materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = Source.adapt(delegate.bind(interface, port)(materializer) .map(new IncomingConnection(_)) - .mapMaterialized(_.map(new ServerBinding(_))(ec))) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding @@ -66,7 +66,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(materializer) .map(new IncomingConnection(_)) - .mapMaterialized(_.map(new ServerBinding(_))(ec))) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` @@ -173,7 +173,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { Flow.wrap { akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) .viaMat(delegate.outgoingConnection(host, port))(Keep.right) - .mapMaterialized(_.map(new OutgoingConnection(_))(ec)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } /** @@ -188,7 +188,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { Flow.wrap { akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) .viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, immutableSeq(options), settings, log))(Keep.right) - .mapMaterialized(_.map(new OutgoingConnection(_))(ec)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } /** diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 307c023eeb..bfe26655c9 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -46,7 +46,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ val layer = serverLayer(settings, log) IncomingConnection(localAddress, remoteAddress, layer join flow) - }.mapMaterialized { + }.mapMaterializedValue { _.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext) } } @@ -367,7 +367,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E import hcps._ val theHostHeader = hostHeader(host, port, Uri.httpScheme(setup.encrypted)) clientFlow[T](setup.settings)(_.withDefaultHeaders(theHostHeader) -> gatewayFuture) - .mapMaterialized(_ ⇒ HostConnectionPool(hcps)(gatewayFuture)) + .mapMaterializedValue(_ ⇒ HostConnectionPool(hcps)(gatewayFuture)) } private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, Future[PoolGateway]))( diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 50214ab907..14f73a45f6 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -102,7 +102,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "are triggered in `mapMaterialized`" in { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() - val flow = Flow[HttpRequest].map(_ ⇒ HttpResponse()).mapMaterialized(_ ⇒ sys.error("BOOM")) + val flow = Flow[HttpRequest].map(_ ⇒ HttpResponse()).mapMaterializedValue(_ ⇒ sys.error("BOOM")) val binding = Http().bindAndHandle(flow, hostname, port) val b1 = Await.result(binding, 3.seconds) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala index 4bb5c6b765..f9a4f3a996 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/TestServer.scala @@ -70,7 +70,7 @@ object TestServer extends App { Flow[Message] .collect { case TextMessage.Strict(name) ⇒ TextMessage.Strict(s"Hello '$name'") - case TextMessage.Streamed(nameStream) ⇒ TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterialized (_ ⇒ ())) + case TextMessage.Streamed(nameStream) ⇒ TextMessage.Streamed(Source.single("Hello ") ++ nameStream mapMaterializedValue (_ ⇒ ())) // ignore binary messages } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index da163fc315..d649c0a3b5 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -221,7 +221,7 @@ public class FlowGraphTest extends StreamTest { @Override public void apply(Builder> b, SinkShape out) throws Exception { b.from(Source.single(1)).to(out); - b.from(b.matValue()).to(Sink.foreach(new Procedure>(){ + b.from(b.materializedValue()).to(Sink.foreach(new Procedure>(){ public void apply(Future mat) throws Exception { probe.ref().tell(mat, ActorRef.noSender()); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index 63c528e056..6b582b9caa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -165,7 +165,7 @@ class GraphConcatSpec extends TwoStreamsSetup { m1.isInstanceOf[Unit] should be(true) m2.isInstanceOf[Unit] should be(true) - runnable.mapMaterialized((_) ⇒ "boo").run() should be("boo") + runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") } @@ -179,7 +179,7 @@ class GraphConcatSpec extends TwoStreamsSetup { m2.isInstanceOf[Unit] should be(true) m3.isInstanceOf[Unit] should be(true) - runnable.mapMaterialized((_) ⇒ "boo").run() should be("boo") + runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index 2474f523ef..1684da7498 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -28,7 +28,7 @@ class GraphMatValueSpec extends AkkaSpec { val f = FlowGraph.closed(foldSink) { implicit b ⇒ fold ⇒ Source(1 to 10) ~> fold - b.matValue.mapAsync(4)(identity) ~> Sink(sub) + b.materializedValue.mapAsync(4)(identity) ~> Sink(sub) }.run() val r1 = Await.result(f, 3.seconds) @@ -45,8 +45,8 @@ class GraphMatValueSpec extends AkkaSpec { fold ⇒ val zip = b.add(ZipWith[Int, Int, Int](_ + _)) Source(1 to 10) ~> fold - b.matValue.mapAsync(4)(identity) ~> zip.in0 - b.matValue.mapAsync(4)(identity) ~> zip.in1 + b.materializedValue.mapAsync(4)(identity) ~> zip.in0 + b.materializedValue.mapAsync(4)(identity) ~> zip.in1 zip.out ~> Sink(sub) }.run() @@ -62,7 +62,7 @@ class GraphMatValueSpec extends AkkaSpec { val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source(foldSink) { implicit b ⇒ fold ⇒ Source(1 to 10) ~> fold - b.matValue + b.materializedValue } "allow exposing the materialized value as port" in { @@ -72,7 +72,7 @@ class GraphMatValueSpec extends AkkaSpec { } "allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { - val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterialized((_) ⇒ ()) + val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ⇒ ()) Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 313d38c165..b0d96c17cb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -62,8 +62,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Transform only the materialized value of this Flow, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] = - new Flow(delegate.mapMaterialized(f.apply _)) + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] = + new Flow(delegate.mapMaterializedValue(f.apply _)) /** * Transform this [[Flow]] by appending the given processing steps. @@ -593,7 +593,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * the first element emitted by the given ("second") source is emitted after the last element of this Flow. */ def concat[M](second: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = - new Flow(delegate.concat(second).mapMaterialized(p ⇒ Pair(p._1, p._2))) + new Flow(delegate.concat(second).mapMaterializedValue(p ⇒ Pair(p._1, p._2))) override def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) @@ -690,15 +690,15 @@ trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] { /** * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] } /** INTERNAL API */ private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] { def shape = ClosedShape def module = runnable.module - override def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] = - new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _)) + override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] = + new RunnableFlowAdapter(runnable.mapMaterializedValue(f.apply _)) override def run(materializer: FlowMaterializer): Mat = runnable.run()(materializer) override def withAttributes(attr: OperationAttributes): RunnableFlow[Mat] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index b0bd1caca0..1de59de601 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -300,7 +300,7 @@ object FlowGraph { * * @return The outlet that will emit the materialized value. */ - def matValue: Outlet[Mat] = delegate.matValue + def materializedValue: Outlet[Mat] = delegate.materializedValue def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 08b12ee49f..c993cf59a3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -144,8 +144,8 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ /** * Transform only the materialized value of this Sink, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = - new Sink(delegate.mapMaterialized(f.apply _)) + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = + new Sink(delegate.mapMaterializedValue(f.apply _)) override def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] = new Sink(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index fde6b1e319..aee303aff8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -236,8 +236,8 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Transform only the materialized value of this Source, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] = - new Source(delegate.mapMaterialized(f.apply _)) + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] = + new Source(delegate.mapMaterializedValue(f.apply _)) /** * Transform this [[Source]] by appending the given processing stages. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index b4df1b3d3e..3a08e91213 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -107,7 +107,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { idleTimeout: Duration): Source[IncomingConnection, Future[ServerBinding]] = Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), idleTimeout) .map(new IncomingConnection(_)) - .mapMaterialized(_.map(new ServerBinding(_))(ec))) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) /** * Creates a [[Tcp.ServerBinding]] without specifying options. @@ -116,7 +116,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]] = Source.adapt(delegate.bind(interface, port) .map(new IncomingConnection(_)) - .mapMaterialized(_.map(new ServerBinding(_))(ec))) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) /** * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. @@ -127,7 +127,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { connectTimeout: Duration, idleTimeout: Duration): Flow[ByteString, ByteString, Future[OutgoingConnection]] = Flow.adapt(delegate.outgoingConnection(remoteAddress, localAddress, immutableSeq(options), connectTimeout, idleTimeout) - .mapMaterialized(_.map(new OutgoingConnection(_))(ec))) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))) /** * Creates an [[Tcp.OutgoingConnection]] without specifying options. @@ -135,6 +135,6 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { */ def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = Flow.adapt(delegate.outgoingConnection(new InetSocketAddress(host, port)) - .mapMaterialized(_.map(new OutgoingConnection(_))(ec))) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2a30d45c79..cc08efca2c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -68,7 +68,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * flow into the materialized value of the resulting Flow. */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { - if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterialized(combine(().asInstanceOf[Mat], _)) + if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterializedValue(combine(().asInstanceOf[Mat], _)) else { val flowCopy = flow.module.carbonCopy new Flow( @@ -129,7 +129,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Transform the materialized value of this Flow, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Repr[Out, Mat2] = + def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Repr[Out, Mat2] = new Flow(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** @@ -320,7 +320,7 @@ case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) e /** * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: Mat ⇒ Mat2): RunnableFlow[Mat2] = + def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableFlow[Mat2] = copy(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 19c874694e..7d55055785 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -367,7 +367,7 @@ object FlowGraph extends GraphApply { * * @return The outlet that will emit the materialized value. */ - def matValue: Outlet[M] = { + def materializedValue: Outlet[M] = { val module = new MaterializedValueSource[Any] moduleInProgress = moduleInProgress.grow(module) module.shape.outlet.asInstanceOf[Outlet[M]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index efdd8c3a9f..d1d784fc85 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -37,7 +37,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: FlowMaterializer): Mat2 = Source.wrap(source).to(this).run() - def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = + def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) override def withAttributes(attr: OperationAttributes): Sink[In, Mat] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index cdfd3e0c36..acb1cd6e10 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -84,7 +84,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) /** * Transform only the materialized value of this Source, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Repr[Out, Mat2] = + def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Repr[Out, Mat2] = new Source(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** INTERNAL API */