diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 88e8e0d9dd..3b55c96a13 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -280,8 +280,7 @@ object Flow { } /** - * Helper to create `Flow` without a [[Source]] or a [[Sink]]. - * Example usage: `Flow[Int]` + * Returns a `Flow` which outputs all its inputs. */ def apply[T]: Flow[T, T, NotUsed] = identity.asInstanceOf[Flow[T, T, NotUsed]] @@ -297,16 +296,21 @@ object Flow { } /** - * Helper to create `Flow` from a `Sink`and a `Source`. + * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input + * will be sent to the Sink and the Flow's output will come from the Source. */ def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = fromSinkAndSourceMat(sink, source)(Keep.none) /** - * Helper to create `Flow` from a `Sink`and a `Source`. - */ - def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) ⇒ M): Flow[I, O, M] = - fromGraph(GraphDSL.create(sink, source)(f) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) }) + * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input + * will be sent to the Sink and the Flow's output will come from the Source. + * + * The `combine` function is used to compose the materialized values of the `sink` and `source` + * into the materialized value of the resulting [[Flow]]. + */ + def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] = + fromGraph(GraphDSL.create(sink, source)(combine) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) }) } object RunnableGraph { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 517fc0f4ad..37fdc1e049 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -23,14 +23,14 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** * * Represents a successful TCP server binding. */ - case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) { + final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) { def unbind(): Future[Unit] = unbindAction() } /** * Represents an accepted incoming TCP connection. */ - case class IncomingConnection( + final case class IncomingConnection( localAddress: InetSocketAddress, remoteAddress: InetSocketAddress, flow: Flow[ByteString, ByteString, NotUsed]) { @@ -49,7 +49,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** * Represents a prospective outgoing TCP connection. */ - case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) + final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) def apply()(implicit system: ActorSystem): Tcp = super.apply(system) @@ -60,7 +60,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { def createExtension(system: ExtendedActorSystem): Tcp = new Tcp(system) } -class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { +final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { import Tcp._ // TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?