From e8a1556060152eb24f059ea431910a838973eee8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 8 Oct 2019 12:30:41 +0200 Subject: [PATCH] Streams TLS and Tcp with SSLEngine, #21753 (#27766) * SSLEngine factory instead of SSLContext and AkkaSSLConfig parameters in TLS and Tcp * Update TlsSpec to use SSLEngine * Keep copy of old TlsSpec for test coverage of deprecated methods * Update doc example of how to setup a SSLEngine * full API and deprecations * don't use default param values * java doc example * migration guide * mima * update to sslconfig 0.4.0 * hostname verification changed in sslconfig, so use jvm verifier * change to mima file * update doc sample, init SSLContext once * remove FIXME for halfClosed --- .../project/migration-guide-2.5.x-2.6.x.md | 9 + .../src/main/paradox/stream/stream-io.md | 12 +- .../artery/tcp/ArteryTcpTransport.scala | 18 +- .../java/akka/stream/javadsl/TcpTest.java | 102 ++-- .../akka/stream/io/DeprecatedTlsSpec.scala | 546 ++++++++++++++++++ .../test/scala/akka/stream/io/TcpSpec.scala | 92 ++- .../test/scala/akka/stream/io/TlsSpec.scala | 67 ++- .../issue-21753-sslconfig.excludes | 12 + .../main/scala/akka/stream/javadsl/TLS.scala | 14 +- .../main/scala/akka/stream/javadsl/Tcp.scala | 221 ++++++- .../main/scala/akka/stream/scaladsl/TLS.scala | 42 +- .../main/scala/akka/stream/scaladsl/Tcp.scala | 177 +++++- .../sslconfig/akka/AkkaSSLConfig.scala | 15 +- .../akka/SSLEngineConfigurator.scala | 2 + project/Dependencies.scala | 2 +- 15 files changed, 1205 insertions(+), 126 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/DeprecatedTlsSpec.scala create mode 100644 akka-stream/src/main/mima-filters/2.5.x.backwards.excludes/issue-21753-sslconfig.excludes diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index ef8663446f..b978d92f7c 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -145,6 +145,15 @@ Akka is now using Protobuf version 3.9.0 for serialization of messages defined b Cluster client has been deprecated as of 2.6 in favor of [Akka gRPC](https://doc.akka.io/docs/akka-grpc/current/index.html). It is not advised to build new applications with Cluster client, and existing users @ref[should migrate to Akka gRPC](../cluster-client.md#migration-to-akka-grpc). +### AkkaSslConfig + +`AkkaSslConfig` has been deprecated in favor of setting up TLS with `javax.net.ssl.SSLEngine` directly. + +This also means that methods Akka Streams `TLS` and `Tcp` that take `SSLContext` or `AkkaSslConfig` have been +deprecated and replaced with corresponding methods that takes a factory function for creating the `SSLEngine`. + +See documentation of @ref:[streaming IO with TLS](../stream/stream-io.md#tls). + ### akka.Main `akka.Main` is deprecated in favour of starting the `ActorSystem` from a custom main class instead. `akka.Main` was not diff --git a/akka-docs/src/main/paradox/stream/stream-io.md b/akka-docs/src/main/paradox/stream/stream-io.md index 9e337b2352..80f75fbadd 100644 --- a/akka-docs/src/main/paradox/stream/stream-io.md +++ b/akka-docs/src/main/paradox/stream/stream-io.md @@ -145,18 +145,20 @@ Java ### TLS -Similar factories as shown above for raw TCP but where the data is encrypted using TLS are available from `Tcp` through `outgoingTlsConnection`, `bindTls` and `bindAndHandleTls`, see the @scala[@scaladoc[`Tcp Scaladoc`](akka.stream.scaladsl.Tcp)]@java[@javadoc[`Tcp Javadoc`](akka.stream.javadsl.Tcp)] for details. +Similar factories as shown above for raw TCP but where the data is encrypted using TLS are available from `Tcp` +through `outgoingConnectionWithTls`, `bindWithTls` and `bindAndHandleWithTls`, +see the @scala[@scaladoc[`Tcp Scaladoc`](akka.stream.scaladsl.Tcp)]@java[@javadoc[`Tcp Javadoc`](akka.stream.javadsl.Tcp)] for details. -Using TLS requires a keystore and a truststore and then a somewhat involved dance of configuring the SSLContext and the details for how the session should be negotiated: +Using TLS requires a keystore and a truststore and then a somewhat involved dance of configuring the SSLEngine and the details for how the session should be negotiated: Scala -: @@snip [TcpSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala) { #setting-up-ssl-context } +: @@snip [TcpSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala) { #setting-up-ssl-engine } Java -: @@snip [TcpTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java) { #setting-up-ssl-context } +: @@snip [TcpTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java) { #setting-up-ssl-engine } -The `SslContext` and `NegotiateFirstSession` instances can then be used with the binding or outgoing connection factory methods. +The `SSLEngine` instance can then be used with the binding or outgoing connection factory methods. ## Streaming File IO diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 524d65c542..70216a4292 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -13,6 +13,7 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise +import scala.concurrent.duration.Duration import scala.util.Failure import scala.util.Success import scala.util.Try @@ -30,6 +31,7 @@ import akka.remote.artery.Decoder.InboundCompressionAccess import akka.remote.artery.compress._ import akka.stream.Attributes import akka.stream.Attributes.LogLevels +import akka.stream.IgnoreComplete import akka.stream.KillSwitches import akka.stream.Materializer import akka.stream.SharedKillSwitch @@ -124,11 +126,15 @@ private[remote] class ArteryTcpTransport( def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = if (tlsEnabled) { val sslProvider = sslEngineProvider.get - Tcp().outgoingTlsConnectionWithSSLEngine( + Tcp().outgoingConnectionWithTls( remoteAddress, createSSLEngine = () => sslProvider.createClientSSLEngine(host, port), + localAddress = None, + options = Nil, connectTimeout = settings.Advanced.Tcp.ConnectionTimeout, - verifySession = session => optionToTry(sslProvider.verifyClientSession(host, session))) + idleTimeout = Duration.Inf, + verifySession = session => optionToTry(sslProvider.verifyClientSession(host, session)), + closing = IgnoreComplete) } else { Tcp().outgoingConnection( remoteAddress, @@ -213,11 +219,15 @@ private[remote] class ArteryTcpTransport( val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = if (tlsEnabled) { val sslProvider = sslEngineProvider.get - Tcp().bindTlsWithSSLEngine( + Tcp().bindWithTls( interface = bindHost, port = bindPort, createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort), - verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session))) + backlog = Tcp.defaultBacklog, + options = Nil, + idleTimeout = Duration.Inf, + verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session)), + closing = IgnoreComplete) } else { Tcp().bind(interface = bindHost, port = bindPort, halfClose = false) } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java index 2e25004720..5045dffe55 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java @@ -5,12 +5,12 @@ package akka.stream.javadsl; import akka.Done; -import akka.actor.ActorSystem; import akka.japi.function.Function2; import akka.japi.function.Procedure; import akka.stream.BindFailedException; import akka.stream.StreamTcpException; import akka.stream.StreamTest; + import akka.stream.javadsl.Tcp.IncomingConnection; import akka.stream.javadsl.Tcp.ServerBinding; import akka.testkit.AkkaJUnitActorSystemResource; @@ -23,8 +23,14 @@ import static akka.util.ByteString.emptyByteString; import org.junit.ClassRule; import org.junit.Test; +import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; @@ -34,15 +40,17 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -// #setting-up-ssl-context +// #setting-up-ssl-engine // imports -import akka.stream.TLSClientAuth; -import akka.stream.TLSProtocol; -import com.typesafe.sslconfig.akka.AkkaSSLConfig; import java.security.KeyStore; -import javax.net.ssl.*; import java.security.SecureRandom; -// #setting-up-ssl-context +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; +import akka.stream.TLSRole; + +// #setting-up-ssl-engine public class TcpTest extends StreamTest { public TcpTest() { @@ -165,50 +173,54 @@ public class TcpTest extends StreamTest { } // compile only sample - public void constructSslContext() throws Exception { - ActorSystem system = null; + // #setting-up-ssl-engine + // initialize SSLContext once + private final SSLContext sslContext; - // #setting-up-ssl-context + { + try { + // Don't hardcode your password in actual code + char[] password = "abcdef".toCharArray(); - // -- setup logic --- + // trust store and keys in one keystore + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(getClass().getResourceAsStream("/tcp-spec-keystore.p12"), password); - AkkaSSLConfig sslConfig = AkkaSSLConfig.get(system); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509"); + trustManagerFactory.init(keyStore); - // Don't hardcode your password in actual code - char[] password = "abcdef".toCharArray(); + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); + keyManagerFactory.init(keyStore, password); - // trust store and keys in one keystore - KeyStore keyStore = KeyStore.getInstance("PKCS12"); - keyStore.load(getClass().getResourceAsStream("/tcp-spec-keystore.p12"), password); + // init ssl context + SSLContext context = SSLContext.getInstance("TLSv1.2"); + context.init( + keyManagerFactory.getKeyManagers(), + trustManagerFactory.getTrustManagers(), + new SecureRandom()); - TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); - tmf.init(keyStore); + sslContext = context; - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); - keyManagerFactory.init(keyStore, password); - - // initial ssl context - SSLContext sslContext = SSLContext.getInstance("TLS"); - sslContext.init(keyManagerFactory.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); - - // protocols - SSLParameters defaultParams = sslContext.getDefaultSSLParameters(); - String[] defaultProtocols = defaultParams.getProtocols(); - String[] protocols = sslConfig.configureProtocols(defaultProtocols, sslConfig.config()); - defaultParams.setProtocols(protocols); - - // ciphers - String[] defaultCiphers = defaultParams.getCipherSuites(); - String[] cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, sslConfig.config()); - defaultParams.setCipherSuites(cipherSuites); - - TLSProtocol.NegotiateNewSession negotiateNewSession = - TLSProtocol.negotiateNewSession() - .withCipherSuites(cipherSuites) - .withProtocols(protocols) - .withParameters(defaultParams) - .withClientAuth(TLSClientAuth.none()); - - // #setting-up-ssl-context + } catch (KeyStoreException + | IOException + | NoSuchAlgorithmException + | CertificateException + | UnrecoverableKeyException + | KeyManagementException e) { + throw new RuntimeException(e); + } } + + // create new SSLEngine from the SSLContext, which was initialized once + public SSLEngine createSSLEngine(TLSRole role) { + SSLEngine engine = sslContext.createSSLEngine(); + + engine.setUseClientMode(role.equals(akka.stream.TLSRole.client())); + engine.setEnabledCipherSuites(new String[] {"TLS_RSA_WITH_AES_128_CBC_SHA"}); + engine.setEnabledProtocols(new String[] {"TLSv1.2"}); + + return engine; + } + // #setting-up-ssl-engine + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/DeprecatedTlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/DeprecatedTlsSpec.scala new file mode 100644 index 0000000000..0280d2845d --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/DeprecatedTlsSpec.scala @@ -0,0 +1,546 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.stream.io + +import java.security.KeyStore +import java.security.SecureRandom +import java.security.cert.CertificateException +import java.util.concurrent.TimeoutException + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Random + +import akka.NotUsed +import akka.pattern.{ after => later } +import akka.stream.TLSProtocol._ +import akka.stream._ +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.scaladsl._ +import akka.stream.stage._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.testkit.WithLogCapturing +import akka.util.ByteString +import akka.util.JavaVersion +import com.github.ghik.silencer.silent +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import javax.net.ssl._ + +object DeprecatedTlsSpec { + + val rnd = new Random + + def initWithTrust(trustPath: String) = { + val password = "changeme" + + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray) + + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + trustStore.load(getClass.getResourceAsStream(trustPath), password.toCharArray) + + val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + keyManagerFactory.init(keyStore, password.toCharArray) + + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init(trustStore) + + val context = SSLContext.getInstance("TLS") + context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) + context + } + + def initSslContext(): SSLContext = initWithTrust("/truststore") + + /** + * This is an operator that fires a TimeoutException failure 2 seconds after it was started, + * independent of the traffic going through. The purpose is to include the last seen + * element in the exception message to help in figuring out what went wrong. + */ + class Timeout(duration: FiniteDuration) extends GraphStage[FlowShape[ByteString, ByteString]] { + + private val in = Inlet[ByteString]("in") + private val out = Outlet[ByteString]("out") + override val shape = FlowShape(in, out) + + override def createLogic(attr: Attributes) = new TimerGraphStageLogic(shape) { + override def preStart(): Unit = scheduleOnce((), duration) + + var last: ByteString = _ + setHandler(in, new InHandler { + override def onPush(): Unit = { + last = grab(in) + push(out, last) + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + override def onTimer(x: Any): Unit = { + failStage(new TimeoutException(s"timeout expired, last element was $last")) + } + } + } + + val configOverrides = + """ + akka.loglevel = DEBUG # issue 21660 + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.actor.debug.receive=off + """ +} + +@silent("deprecated") +class DeprecatedTlsSpec extends StreamSpec(DeprecatedTlsSpec.configOverrides) with WithLogCapturing { + import GraphDSL.Implicits._ + import DeprecatedTlsSpec._ + import system.dispatcher + + val sslConfig: Option[AkkaSSLConfig] = None // no special settings to be applied here + + "SslTls with deprecated SSLContext setup" must { + + val sslContext = initSslContext() + + val debug = Flow[SslTlsInbound].map { x => + x match { + case SessionTruncated => system.log.debug(s" ----------- truncated ") + case SessionBytes(_, b) => system.log.debug(s" ----------- (${b.size}) ${b.take(32).utf8String}") + } + x + } + + val cipherSuites = + NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") + def clientTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Client, closing) + def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), None, cipherSuites, Client, closing) + def serverTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Server, closing) + + trait Named { + def name: String = + getClass.getName.reverse.dropWhile(c => "$0123456789".indexOf(c) != -1).takeWhile(_ != '$').reverse + } + + trait CommunicationSetup extends Named { + def decorateFlow( + leftClosing: TLSClosing, + rightClosing: TLSClosing, + rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed] + def cleanup(): Unit = () + } + + object ClientInitiates extends CommunicationSetup { + def decorateFlow( + leftClosing: TLSClosing, + rightClosing: TLSClosing, + rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = + clientTls(leftClosing).atop(serverTls(rightClosing).reversed).join(rhs) + } + + object ServerInitiates extends CommunicationSetup { + def decorateFlow( + leftClosing: TLSClosing, + rightClosing: TLSClosing, + rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = + serverTls(leftClosing).atop(clientTls(rightClosing).reversed).join(rhs) + } + + def server(flow: Flow[ByteString, ByteString, Any]) = { + val server = Tcp().bind("localhost", 0).to(Sink.foreach(c => c.flow.join(flow).run())).run() + Await.result(server, 2.seconds) + } + + object ClientInitiatesViaTcp extends CommunicationSetup { + var binding: Tcp.ServerBinding = null + def decorateFlow( + leftClosing: TLSClosing, + rightClosing: TLSClosing, + rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { + binding = server(serverTls(rightClosing).reversed.join(rhs)) + clientTls(leftClosing).join(Tcp().outgoingConnection(binding.localAddress)) + } + override def cleanup(): Unit = binding.unbind() + } + + object ServerInitiatesViaTcp extends CommunicationSetup { + var binding: Tcp.ServerBinding = null + def decorateFlow( + leftClosing: TLSClosing, + rightClosing: TLSClosing, + rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { + binding = server(clientTls(rightClosing).reversed.join(rhs)) + serverTls(leftClosing).join(Tcp().outgoingConnection(binding.localAddress)) + } + override def cleanup(): Unit = binding.unbind() + } + + val communicationPatterns = + Seq(ClientInitiates, ServerInitiates, ClientInitiatesViaTcp, ServerInitiatesViaTcp) + + trait PayloadScenario extends Named { + def flow: Flow[SslTlsInbound, SslTlsOutbound, Any] = + Flow[SslTlsInbound].map { + var session: SSLSession = null + def setSession(s: SSLSession) = { + session = s + system.log.debug(s"new session: $session (${session.getId.mkString(",")})") + } + + { + case SessionTruncated => SendBytes(ByteString("TRUNCATED")) + case SessionBytes(s, b) if session == null => + setSession(s) + SendBytes(b) + case SessionBytes(s, b) if s != session => + setSession(s) + SendBytes(ByteString("NEWSESSION") ++ b) + case SessionBytes(_, b) => SendBytes(b) + } + } + def leftClosing: TLSClosing = IgnoreComplete + def rightClosing: TLSClosing = IgnoreComplete + + def inputs: immutable.Seq[SslTlsOutbound] + def output: ByteString + + protected def send(str: String) = SendBytes(ByteString(str)) + protected def send(ch: Char) = SendBytes(ByteString(ch.toByte)) + } + + object SingleBytes extends PayloadScenario { + val str = "0123456789" + def inputs = str.map(ch => SendBytes(ByteString(ch.toByte))) + def output = ByteString(str) + } + + object MediumMessages extends PayloadScenario { + val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 1000)) + def inputs = strs.map(s => SendBytes(ByteString(s))) + def output = ByteString(strs.foldRight("")(_ ++ _)) + } + + object LargeMessages extends PayloadScenario { + // TLS max packet size is 16384 bytes + val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 17000)) + def inputs = strs.map(s => SendBytes(ByteString(s))) + def output = ByteString(strs.foldRight("")(_ ++ _)) + } + + object EmptyBytesFirst extends PayloadScenario { + def inputs = List(ByteString.empty, ByteString("hello")).map(SendBytes) + def output = ByteString("hello") + } + + object EmptyBytesInTheMiddle extends PayloadScenario { + def inputs = List(ByteString("hello"), ByteString.empty, ByteString(" world")).map(SendBytes) + def output = ByteString("hello world") + } + + object EmptyBytesLast extends PayloadScenario { + def inputs = List(ByteString("hello"), ByteString.empty).map(SendBytes) + def output = ByteString("hello") + } + + object CompletedImmediately extends PayloadScenario { + override def inputs: immutable.Seq[SslTlsOutbound] = Nil + override def output = ByteString.empty + + override def leftClosing: TLSClosing = EagerClose + override def rightClosing: TLSClosing = EagerClose + } + + // this demonstrates that cancellation is ignored so that the five results make it back + object CancellingRHS extends PayloadScenario { + override def flow = + Flow[SslTlsInbound] + .mapConcat { + case SessionTruncated => SessionTruncated :: Nil + case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b))) + } + .take(5) + .mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x))) + .via(super.flow) + override def rightClosing = IgnoreCancel + + val str = "abcdef" * 100 + def inputs = str.map(send) + def output = ByteString(str.take(5)) + } + + object CancellingRHSIgnoresBoth extends PayloadScenario { + override def flow = + Flow[SslTlsInbound] + .mapConcat { + case SessionTruncated => SessionTruncated :: Nil + case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b))) + } + .take(5) + .mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x))) + .via(super.flow) + override def rightClosing = IgnoreBoth + + val str = "abcdef" * 100 + def inputs = str.map(send) + def output = ByteString(str.take(5)) + } + + object LHSIgnoresBoth extends PayloadScenario { + override def leftClosing = IgnoreBoth + val str = "0123456789" + def inputs = str.map(ch => SendBytes(ByteString(ch.toByte))) + def output = ByteString(str) + } + + object BothSidesIgnoreBoth extends PayloadScenario { + override def leftClosing = IgnoreBoth + override def rightClosing = IgnoreBoth + val str = "0123456789" + def inputs = str.map(ch => SendBytes(ByteString(ch.toByte))) + def output = ByteString(str) + } + + object SessionRenegotiationBySender extends PayloadScenario { + def inputs = List(send("hello"), NegotiateNewSession, send("world")) + def output = ByteString("helloNEWSESSIONworld") + } + + // difference is that the RHS engine will now receive the handshake while trying to send + object SessionRenegotiationByReceiver extends PayloadScenario { + val str = "abcdef" * 100 + def inputs = str.map(send) ++ Seq(NegotiateNewSession) ++ "hello world".map(send) + def output = ByteString(str + "NEWSESSIONhello world") + } + + val logCipherSuite = Flow[SslTlsInbound].map { + var session: SSLSession = null + def setSession(s: SSLSession) = { + session = s + system.log.debug(s"new session: $session (${session.getId.mkString(",")})") + } + + { + case SessionTruncated => SendBytes(ByteString("TRUNCATED")) + case SessionBytes(s, b) if s != session => + setSession(s) + SendBytes(ByteString(s.getCipherSuite) ++ b) + case SessionBytes(_, b) => SendBytes(b) + } + } + + object SessionRenegotiationFirstOne extends PayloadScenario { + override def flow = logCipherSuite + def inputs = NegotiateNewSession.withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil + def output = ByteString("TLS_RSA_WITH_AES_128_CBC_SHAhello") + } + + object SessionRenegotiationFirstTwo extends PayloadScenario { + override def flow = logCipherSuite + def inputs = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil + def output = ByteString("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHAhello") + } + + val scenarios = + Seq( + SingleBytes, + MediumMessages, + LargeMessages, + EmptyBytesFirst, + EmptyBytesInTheMiddle, + EmptyBytesLast, + CompletedImmediately, + CancellingRHS, + CancellingRHSIgnoresBoth, + LHSIgnoresBoth, + BothSidesIgnoreBoth, + SessionRenegotiationBySender, + SessionRenegotiationByReceiver, + SessionRenegotiationFirstOne, + SessionRenegotiationFirstTwo) + + for { + commPattern <- communicationPatterns + scenario <- scenarios + } { + s"work in mode ${commPattern.name} while sending ${scenario.name}" in assertAllStagesStopped { + val onRHS = debug.via(scenario.flow) + val output = + Source(scenario.inputs) + .via(commPattern.decorateFlow(scenario.leftClosing, scenario.rightClosing, onRHS)) + .via(new SimpleLinearGraphStage[SslTlsInbound] { + override def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + + override def onPush() = push(out, grab(in)) + override def onPull() = pull(in) + + override def onDownstreamFinish(cause: Throwable) = { + system.log.debug(s"me cancelled, cause {}", cause) + completeStage() + } + } + }) + .via(debug) + .collect { case SessionBytes(_, b) => b } + .scan(ByteString.empty)(_ ++ _) + .filter(_.nonEmpty) + .via(new Timeout(6.seconds)) + .dropWhile(_.size < scenario.output.size) + .runWith(Sink.headOption) + + Await.result(output, 8.seconds).getOrElse(ByteString.empty).utf8String should be(scenario.output.utf8String) + + commPattern.cleanup() + } + } + + "emit an error if the TLS handshake fails certificate checks" in assertAllStagesStopped { + val getError = Flow[SslTlsInbound] + .map[Either[SslTlsInbound, SSLException]](i => Left(i)) + .recover { case e: SSLException => Right(e) } + .collect { case Right(e) => e } + .toMat(Sink.head)(Keep.right) + + val simple = Flow.fromSinkAndSourceMat(getError, Source.maybe[SslTlsOutbound])(Keep.left) + + // The creation of actual TCP connections is necessary. It is the easiest way to decouple the client and server + // under error conditions, and has the bonus of matching most actual SSL deployments. + val (server, serverErr) = Tcp() + .bind("localhost", 0) + .mapAsync(1)(c => c.flow.joinMat(serverTls(IgnoreBoth).reversed.joinMat(simple)(Keep.right))(Keep.right).run()) + .toMat(Sink.head)(Keep.both) + .run() + + val clientErr = simple + .join(badClientTls(IgnoreBoth)) + .join(Tcp().outgoingConnection(Await.result(server, 1.second).localAddress)) + .run() + + Await.result(serverErr, 1.second).getMessage should include("certificate_unknown") + val clientErrText = Await.result(clientErr, 1.second).getMessage + if (JavaVersion.majorVersion >= 11) + clientErrText should include("unable to find valid certification path to requested target") + else + clientErrText should equal("General SSLEngine problem") + } + + "reliably cancel subscriptions when TransportIn fails early" in assertAllStagesStopped { + val ex = new Exception("hello") + val (sub, out1, out2) = + RunnableGraph + .fromGraph( + GraphDSL.create(Source.asSubscriber[SslTlsOutbound], Sink.head[ByteString], Sink.head[SslTlsInbound])( + (_, _, _)) { implicit b => (s, o1, o2) => + val tls = b.add(clientTls(EagerClose)) + s ~> tls.in1; tls.out1 ~> o1 + o2 <~ tls.out2; tls.in2 <~ Source.failed(ex) + ClosedShape + }) + .run() + the[Exception] thrownBy Await.result(out1, 1.second) should be(ex) + the[Exception] thrownBy Await.result(out2, 1.second) should be(ex) + Thread.sleep(500) + val pub = TestPublisher.probe() + pub.subscribe(sub) + pub.expectSubscription().expectCancellation() + } + + "reliably cancel subscriptions when UserIn fails early" in assertAllStagesStopped { + val ex = new Exception("hello") + val (sub, out1, out2) = + RunnableGraph + .fromGraph(GraphDSL.create(Source.asSubscriber[ByteString], Sink.head[ByteString], Sink.head[SslTlsInbound])( + (_, _, _)) { implicit b => (s, o1, o2) => + val tls = b.add(clientTls(EagerClose)) + Source.failed[SslTlsOutbound](ex) ~> tls.in1; tls.out1 ~> o1 + o2 <~ tls.out2; tls.in2 <~ s + ClosedShape + }) + .run() + the[Exception] thrownBy Await.result(out1, 1.second) should be(ex) + the[Exception] thrownBy Await.result(out2, 1.second) should be(ex) + Thread.sleep(500) + val pub = TestPublisher.probe() + pub.subscribe(sub) + pub.expectSubscription().expectCancellation() + } + + "complete if TLS connection is truncated" in assertAllStagesStopped { + + val ks = KillSwitches.shared("ks") + + val scenario = SingleBytes + + val outFlow = { + val terminator = BidiFlow.fromFlows(Flow[ByteString], ks.flow[ByteString]) + clientTls(scenario.leftClosing) + .atop(terminator) + .atop(serverTls(scenario.rightClosing).reversed) + .join(debug.via(scenario.flow)) + .via(debug) + } + + val inFlow = Flow[SslTlsInbound] + .collect { case SessionBytes(_, b) => b } + .scan(ByteString.empty)(_ ++ _) + .via(new Timeout(6.seconds)) + .dropWhile(_.size < scenario.output.size) + + val f = + Source(scenario.inputs) + .via(outFlow) + .via(inFlow) + .map(result => { + ks.shutdown(); result + }) + .runWith(Sink.last) + + Await.result(f, 8.second).utf8String should be(scenario.output.utf8String) + } + + "verify hostname" in assertAllStagesStopped { + def run(hostName: String): Future[akka.Done] = { + val rhs = Flow[SslTlsInbound].map { + case SessionTruncated => SendBytes(ByteString.empty) + case SessionBytes(_, b) => SendBytes(b) + } + val clientTls = TLS(sslContext, None, cipherSuites, Client, EagerClose, Some((hostName, 80))) + val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs) + + Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore) + } + Await.result(run("akka-remote"), 3.seconds) // CN=akka-remote + val cause = intercept[Exception] { + Await.result(run("unknown.example.org"), 3.seconds) + } + cause.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem + val cause2 = cause.getCause + cause2.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem + val cause3 = cause2.getCause + cause3.getClass should ===(classOf[CertificateException]) + cause3.getMessage should ===("No name matching unknown.example.org found") + } + + } + + "A SslTlsPlacebo" must { + + "pass through data" in { + val f = Source(1 to 3) + .map(b => SendBytes(ByteString(b.toByte))) + .via(TLSPlacebo().join(Flow.apply)) + .grouped(10) + .runWith(Sink.head) + val result = Await.result(f, 3.seconds) + result.map(_.bytes) should be((1 to 3).map(b => ByteString(b.toByte))) + result.map(_.session).foreach(s => s.getCipherSuite should be("SSL_NULL_WITH_NULL_NULL")) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index a85eeaec16..1ff8dda2f7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -40,7 +40,6 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.concurrent.PatienceConfiguration.Timeout - import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.ExecutionContext @@ -48,6 +47,8 @@ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ +import com.github.ghik.silencer.silent + @silent("never used") class NonResolvingDnsActor(cache: SimpleDnsCache, config: Config) extends Actor { def receive = { @@ -830,9 +831,93 @@ class TcpSpec extends StreamSpec(""" } - "TLS client and server convenience methods" should { + "TLS client and server convenience methods with SSLEngine setup" should { - "allow for 'simple' TLS" in { + "allow for TLS" in { + // cert is valid until 2025, so if this tests starts failing after that you need to create a new one + val address = temporaryServerAddress() + + Tcp() + .bindAndHandleWithTls( + // just echo charactes until we reach '\n', then complete stream + // also - byte is our framing + Flow[ByteString].mapConcat(_.utf8String.toList).takeWhile(_ != '\n').map(c => ByteString(c)), + address.getHostName, + address.getPort, + () => createSSLEngine(TLSRole.server)) + .futureValue + system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}") + + val connectionFlow = + Tcp().outgoingConnectionWithTls(address, () => createSSLEngine(TLSRole.client)) + + val chars = "hello\n".toList.map(_.toString) + val (connectionF, result) = + Source(chars) + .map(c => ByteString(c)) + .concat(Source.maybe) // do not complete it from our side + .viaMat(connectionFlow)(Keep.right) + .map(_.utf8String) + .toMat(Sink.fold("")(_ + _))(Keep.both) + .run() + + connectionF.futureValue + system.log.info(s"Client connected to ${address.getHostString}:${address.getPort}") + + result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello") + } + + // #setting-up-ssl-engine + import java.security.KeyStore + import javax.net.ssl.SSLEngine + import javax.net.ssl.TrustManagerFactory + import javax.net.ssl.KeyManagerFactory + import javax.net.ssl.SSLContext + import akka.stream.TLSRole + + // initialize SSLContext once + lazy val sslContext: SSLContext = { + // Don't hardcode your password in actual code + val password = "abcdef".toCharArray + + // trust store and keys in one keystore + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(getClass.getResourceAsStream("/tcp-spec-keystore.p12"), password) + + val trustManagerFactory = TrustManagerFactory.getInstance("SunX509") + trustManagerFactory.init(keyStore) + + val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") + keyManagerFactory.init(keyStore, password) + + // init ssl context + val context = SSLContext.getInstance("TLSv1.2") + context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) + context + } + + // create new SSLEngine from the SSLContext, which was initialized once + def createSSLEngine(role: TLSRole): SSLEngine = { + val engine = sslContext.createSSLEngine() + + engine.setUseClientMode(role == akka.stream.Client) + engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA")) + engine.setEnabledProtocols(Array("TLSv1.2")) + + engine + } + // #setting-up-ssl-engine + + } + + "TLS client and server convenience methods with deprecated SSLContext setup" should { + + "allow for TLS" in { + test() + } + + @silent("deprecated") + def test(): Unit = { // cert is valid until 2025, so if this tests starts failing after that you need to create a new one val (sslContext, firstSession) = initSslMess() val address = temporaryServerAddress() @@ -867,6 +952,7 @@ class TcpSpec extends StreamSpec(""" result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello") } + @silent("deprecated") def initSslMess() = { // #setting-up-ssl-context import java.security.KeyStore diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 6fe4a9de5b..f31a21243d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -6,16 +6,16 @@ package akka.stream.io import java.security.KeyStore import java.security.SecureRandom +import java.security.cert.CertificateException import java.util.concurrent.TimeoutException import akka.NotUsed -import com.typesafe.sslconfig.akka.AkkaSSLConfig - import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Random + import akka.pattern.{ after => later } import akka.stream._ import akka.stream.TLSProtocol._ @@ -32,7 +32,10 @@ object TlsSpec { val rnd = new Random - def initWithTrust(trustPath: String) = { + val SSLEnabledAlgorithms: Set[String] = Set("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") + val SSLProtocol: String = "TLSv1.2" + + def initWithTrust(trustPath: String): SSLContext = { val password = "changeme" val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) @@ -47,7 +50,7 @@ object TlsSpec { val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) trustManagerFactory.init(trustStore) - val context = SSLContext.getInstance("TLS") + val context = SSLContext.getInstance(SSLProtocol) context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) context } @@ -99,8 +102,6 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing import GraphDSL.Implicits._ - val sslConfig: Option[AkkaSSLConfig] = None // no special settings to be applied here - "SslTls" must { val sslContext = initSslContext() @@ -113,11 +114,42 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing x } - val cipherSuites = - NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") - def clientTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Client, closing) - def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), None, cipherSuites, Client, closing) - def serverTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Server, closing) + def createSSLEngine(context: SSLContext, role: TLSRole): SSLEngine = + createSSLEngine2(context, role, hostnameVerification = false, hostInfo = None) + + def createSSLEngine2( + context: SSLContext, + role: TLSRole, + hostnameVerification: Boolean, + hostInfo: Option[(String, Int)]): SSLEngine = { + + val engine = hostInfo match { + case None => + if (hostnameVerification) + throw new IllegalArgumentException("hostInfo must be defined for hostnameVerification to work.") + context.createSSLEngine() + case Some((hostname, port)) => context.createSSLEngine(hostname, port) + } + + if (hostnameVerification && role == akka.stream.Client) { + val sslParams = sslContext.getDefaultSSLParameters + sslParams.setEndpointIdentificationAlgorithm("HTTPS") + engine.setSSLParameters(sslParams) + } + + engine.setUseClientMode(role == akka.stream.Client) + engine.setEnabledCipherSuites(SSLEnabledAlgorithms.toArray) + engine.setEnabledProtocols(Array(SSLProtocol)) + + engine + } + + def clientTls(closing: TLSClosing) = + TLS(() => createSSLEngine(sslContext, Client), closing) + def badClientTls(closing: TLSClosing) = + TLS(() => createSSLEngine(initWithTrust("/badtruststore"), Client), closing) + def serverTls(closing: TLSClosing) = + TLS(() => createSSLEngine(sslContext, Server), closing) trait Named { def name: String = @@ -507,7 +539,10 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing case SessionTruncated => SendBytes(ByteString.empty) case SessionBytes(_, b) => SendBytes(b) } - val clientTls = TLS(sslContext, None, cipherSuites, Client, EagerClose, Some((hostName, 80))) + val clientTls = TLS( + () => createSSLEngine2(sslContext, Client, hostnameVerification = true, hostInfo = Some((hostName, 80))), + EagerClose) + val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs) Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore) @@ -516,7 +551,13 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing val cause = intercept[Exception] { Await.result(run("unknown.example.org"), 3.seconds) } - cause.getMessage should ===("Hostname verification failed! Expected session to be for unknown.example.org") + + cause.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem + val cause2 = cause.getCause + cause2.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem + val cause3 = cause2.getCause + cause3.getClass should ===(classOf[CertificateException]) + cause3.getMessage should ===("No name matching unknown.example.org found") } } diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes/issue-21753-sslconfig.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes/issue-21753-sslconfig.excludes new file mode 100644 index 0000000000..cc907e24b5 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes/issue-21753-sslconfig.excludes @@ -0,0 +1,12 @@ +# #21753 internal method WithSSLEngine renamed and changed +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$3") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$5") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$8") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$5") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$8") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala index b92dc116cc..8f3d8d5853 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala @@ -30,10 +30,8 @@ import scala.util.Try * documentation. The philosophy of this integration into Akka Streams is to * expose all knobs and dials to client code and therefore not limit the * configuration possibilities. In particular the client code will have to - * provide the SSLContext from which the SSLEngine is then created. Handshake - * parameters are set using [[NegotiateNewSession]] messages, the settings for - * the initial handshake need to be provided up front using the same class; - * please refer to the method documentation below. + * provide the SSLEngine, which is typically created from a SSLContext. Handshake + * parameters and other parameters are defined when creating the SSLEngine. * * '''IMPORTANT NOTE''' * @@ -66,6 +64,7 @@ object TLS { * * This method uses the default closing behavior or [[IgnoreComplete]]. */ + @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def create( sslContext: SSLContext, sslConfig: Optional[AkkaSSLConfig], @@ -84,6 +83,7 @@ object TLS { * * This method uses the default closing behavior or [[IgnoreComplete]]. */ + @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def create( sslContext: SSLContext, firstSession: NegotiateNewSession, @@ -106,6 +106,7 @@ object TLS { * The SSLEngine may use this information e.g. when an endpoint identification algorithm was * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. */ + @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def create( sslContext: SSLContext, sslConfig: Optional[AkkaSSLConfig], @@ -138,6 +139,7 @@ object TLS { * The SSLEngine may use this information e.g. when an endpoint identification algorithm was * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. */ + @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def create( sslContext: SSLContext, firstSession: NegotiateNewSession, @@ -156,7 +158,7 @@ object TLS { /** * Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface. * - * You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for + * You specify a factory `sslEngineCreator` to create an SSLEngine that must already be configured for * client and server mode and with all the parameters for the first session. * * You can specify a verification function `sessionVerifier` that will be called @@ -174,7 +176,7 @@ object TLS { /** * Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface. * - * You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for + * You specify a factory `sslEngineCreator` to create an SSLEngine that must already be configured for * client and server mode and with all the parameters for the first session. * * For a description of the `closing` parameter please refer to [[TLSClosing]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index 870fa98cc1..9dab6e4ccd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -6,9 +6,9 @@ package akka.stream.javadsl import java.lang.{ Iterable => JIterable } import java.util.Optional +import java.util.function.{ Function => JFunction } import akka.{ Done, NotUsed } - import scala.concurrent.duration._ import java.net.InetSocketAddress @@ -21,17 +21,24 @@ import akka.stream.scaladsl import akka.util.ByteString import akka.japi.Util.immutableSeq import akka.io.Inet.SocketOption - import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import java.util.function.Supplier + +import scala.util.Failure +import scala.util.Success import akka.actor.ClassicActorSystemProvider import javax.net.ssl.SSLContext import akka.annotation.InternalApi import akka.stream.SystemMaterializer +import akka.stream.TLSClosing import akka.stream.TLSProtocol.NegotiateNewSession +import akka.util.JavaDurationConverters._ import com.github.ghik.silencer.silent +import javax.net.ssl.SSLEngine +import javax.net.ssl.SSLSession object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { @@ -161,13 +168,44 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { backlog: Int, options: JIterable[SocketOption], halfClose: Boolean, - idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = + idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( delegate - .bind(interface, port, backlog, immutableSeq(options), halfClose, idleTimeout) + .bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout)) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + * + * Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing + * [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the materialized future + * completes is the server ready to accept client connections. + * + * @param interface The interface to listen on + * @param port The port to listen on + * @param backlog Controls the size of the connection backlog + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the client to + * write to the connection even after the server has finished writing. The TCP socket is only closed + * after both the client and server finished writing. + * If set to false, the connection will immediately closed once the server closes its write side, + * independently whether the client is still attempting to write. This setting is recommended + * for servers, and therefore it is the default setting. + */ + @deprecated("Use bind that takes a java.time.Duration parameter instead.", "2.6.0") + def bind( + interface: String, + port: Int, + backlog: Int, + options: JIterable[SocketOption], + halfClose: Boolean, + idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = + bind(interface, port, backlog, options, halfClose, durationToJavaOptional(idleTimeout)) + /** * Creates a [[Tcp.ServerBinding]] without specifying options. * It represents a prospective TCP server binding on the given `endpoint`. @@ -208,8 +246,8 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { localAddress: Optional[InetSocketAddress], options: JIterable[SocketOption], halfClose: Boolean, - connectTimeout: Duration, - idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = + connectTimeout: Optional[java.time.Duration], + idleTimeout: Optional[java.time.Duration]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = Flow.fromGraph( delegate .outgoingConnection( @@ -217,10 +255,46 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { localAddress.asScala, immutableSeq(options), halfClose, - connectTimeout, - idleTimeout) + optionalDurationToScala(connectTimeout), + optionalDurationToScala(idleTimeout)) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + /** + * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + * + * Note that the ByteString chunk boundaries are not retained across the network, + * to achieve application level chunks you have to introduce explicit framing in your streams, + * for example using the [[Framing]] operators. + * + * @param remoteAddress The remote address to connect to + * @param localAddress Optional local address for the connection + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the server to + * write to the connection even after the client has finished writing. The TCP socket is only closed + * after both the client and server finished writing. This setting is recommended for clients and + * therefore it is the default setting. + * If set to false, the connection will immediately closed once the client closes its write side, + * independently whether the server is still attempting to write. + */ + @deprecated("Use bind that takes a java.time.Duration parameter instead.", "2.6.0") + def outgoingConnection( + remoteAddress: InetSocketAddress, + localAddress: Optional[InetSocketAddress], + options: JIterable[SocketOption], + halfClose: Boolean, + connectTimeout: Duration, + idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = + outgoingConnection( + remoteAddress, + localAddress, + options, + halfClose, + durationToJavaOptional(connectTimeout), + durationToJavaOptional(idleTimeout)) + /** * Creates an [[Tcp.OutgoingConnection]] without specifying options. * It represents a prospective TCP client connection to the given endpoint. @@ -242,6 +316,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * @see [[Tcp.outgoingConnection()]] */ + @deprecated( + "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def outgoingTlsConnection( host: String, port: Int, @@ -261,6 +339,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * Marked API-may-change to leave room for an improvement around the very long parameter list. */ + @deprecated( + "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def outgoingTlsConnection( remoteAddress: InetSocketAddress, sslContext: SSLContext, @@ -281,6 +363,61 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { idleTimeout) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + /** + * Creates an [[Tcp.OutgoingConnection]] with TLS. + * The returned flow represents a TCP client connection to the given endpoint where all bytes in and + * out go through TLS. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client mode and with all the parameters for the first session. + * + * @see [[Tcp.outgoingConnection()]] + */ + def outgoingConnectionWithTls( + remoteAddress: InetSocketAddress, + createSSLEngine: Supplier[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = + Flow.fromGraph( + delegate + .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get()) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + + /** + * Creates an [[Tcp.OutgoingConnection]] with TLS. + * The returned flow represents a TCP client connection to the given endpoint where all bytes in and + * out go through TLS. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client mode and with all the parameters for the first session. + * + * @see [[Tcp.outgoingConnection()]] + */ + def outgoingConnectionWithTls( + remoteAddress: InetSocketAddress, + createSSLEngine: Supplier[SSLEngine], + localAddress: Optional[InetSocketAddress], + options: JIterable[SocketOption], + connectTimeout: Optional[java.time.Duration], + idleTimeout: Optional[java.time.Duration], + verifySession: JFunction[SSLSession, Optional[Throwable]], + closing: TLSClosing): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = { + Flow.fromGraph( + delegate + .outgoingConnectionWithTls( + remoteAddress, + createSSLEngine = () => createSSLEngine.get(), + localAddress.asScala, + immutableSeq(options), + optionalDurationToScala(connectTimeout), + optionalDurationToScala(idleTimeout), + session => + verifySession.apply(session).asScala match { + case None => Success(()) + case Some(t) => Failure(t) + }, + closing) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + } + /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * where all incoming and outgoing bytes are passed through TLS. @@ -290,6 +427,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * Note: the half close parameter is currently ignored */ + @deprecated( + "Use bindWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def bindTls( interface: String, port: Int, @@ -297,7 +438,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { negotiateNewSession: NegotiateNewSession, backlog: Int, options: JIterable[SocketOption], - @silent // FIXME unused #26689 + @silent // unused #26689 halfClose: Boolean, idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( @@ -312,6 +453,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * @see [[Tcp.bind()]] */ + @deprecated( + "Use bindWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def bindTls( interface: String, port: Int, @@ -323,4 +468,62 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * where all incoming and outgoing bytes are passed through TLS. + * + * @see [[Tcp.bind()]] + */ + def bindWithTls( + interface: String, + port: Int, + createSSLEngine: Supplier[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + Source.fromGraph( + delegate + .bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get()) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } + + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * where all incoming and outgoing bytes are passed through TLS. + * + * @see [[Tcp.bind()]] + */ + def bindWithTls( + interface: String, + port: Int, + createSSLEngine: Supplier[SSLEngine], + backlog: Int, + options: JIterable[SocketOption], + idleTimeout: Optional[java.time.Duration], + verifySession: JFunction[SSLSession, Optional[Throwable]], + closing: TLSClosing): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + Source.fromGraph( + delegate + .bindWithTls( + interface, + port, + createSSLEngine = () => createSSLEngine.get(), + backlog, + immutableSeq(options), + optionalDurationToScala(idleTimeout), + session => + verifySession.apply(session).asScala match { + case None => Success(()) + case Some(t) => Failure(t) + }, + closing) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } + + private def optionalDurationToScala(duration: Optional[java.time.Duration]) = { + if (duration.isPresent) duration.get.asScala else Duration.Inf + } + + private def durationToJavaOptional(duration: Duration): Optional[java.time.Duration] = { + if (duration.isFinite) Optional.ofNullable(duration.asJava) else Optional.empty() + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala index 4b7cf295c4..74b16b31f3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala @@ -5,8 +5,8 @@ package akka.stream.scaladsl import java.util.Collections -import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession } +import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession } import akka.stream.impl.io.{ TlsModule, TlsUtils } import akka.NotUsed import akka.actor.ActorSystem @@ -14,9 +14,10 @@ import akka.stream._ import akka.stream.TLSProtocol._ import akka.util.ByteString import com.typesafe.sslconfig.akka.AkkaSSLConfig - import scala.util.{ Failure, Success, Try } +import javax.net.ssl.SSLParameters + /** * Stream cipher support based upon JSSE. * @@ -30,10 +31,8 @@ import scala.util.{ Failure, Success, Try } * documentation. The philosophy of this integration into Akka Streams is to * expose all knobs and dials to client code and therefore not limit the * configuration possibilities. In particular the client code will have to - * provide the SSLContext from which the SSLEngine is then created. Handshake - * parameters are set using [[NegotiateNewSession]] messages, the settings for - * the initial handshake need to be provided up front using the same class; - * please refer to the method documentation below. + * provide the SSLEngine, which is typically created from a SSLContext. Handshake + * parameters and other parameters are defined when creating the SSLEngine. * * '''IMPORTANT NOTE''' * @@ -71,6 +70,7 @@ object TLS { * The SSLEngine may use this information e.g. when an endpoint identification algorithm was * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. */ + @deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def apply( sslContext: SSLContext, sslConfig: Option[AkkaSSLConfig], @@ -94,7 +94,7 @@ object TLS { config.sslEngineConfigurator.configure(engine, sslContext) engine.setUseClientMode(role == Client) - val finalSessionParameters = + val paramsWithSni = if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) { val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get) // In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using @@ -102,21 +102,29 @@ object TLS { // In Java 8, SNI is only enabled if the server names are added to the parameters. // See https://github.com/akka/akka/issues/19287. newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1))) + firstSession.copy(sslParameters = Some(newParams)) } else firstSession - TlsUtils.applySessionParameters(engine, finalSessionParameters) + val paramsWithHostnameVerification = if (hostInfo.isDefined && config.useJvmHostnameVerification) { + val newParams = paramsWithSni.sslParameters.map(TlsUtils.cloneParameters).getOrElse(new SSLParameters) + newParams.setEndpointIdentificationAlgorithm("HTTPS") + paramsWithSni.copy(sslParameters = Some(newParams)) + } else + paramsWithSni + + TlsUtils.applySessionParameters(engine, paramsWithHostnameVerification) engine } def verifySession: (ActorSystem, SSLSession) => Try[Unit] = hostInfo match { case Some((hostname, _)) => { (system, session) => - val hostnameVerifier = theSslConfig(system).hostnameVerifier - if (!hostnameVerifier.verify(hostname, session)) - Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname")) - else + val config = theSslConfig(system) + if (config.useJvmHostnameVerification || config.hostnameVerifier.verify(hostname, session)) Success(()) + else + Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname")) } case None => (_, _) => Success(()) } @@ -140,6 +148,7 @@ object TLS { * The SSLEngine may use this information e.g. when an endpoint identification algorithm was * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. */ + @deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def apply( sslContext: SSLContext, firstSession: NegotiateNewSession, @@ -158,6 +167,7 @@ object TLS { * that is not a requirement and depends entirely on the application * protocol. */ + @deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0") def apply( sslContext: SSLContext, firstSession: NegotiateNewSession, @@ -165,9 +175,9 @@ object TLS { apply(sslContext, None, firstSession, role, IgnoreComplete, None) /** - * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. + * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. * - * You can specify a constructor to create an SSLEngine that must already be configured for + * You specify a factory to create an SSLEngine that must already be configured for * client and server mode and with all the parameters for the first session. * * You can specify a verification function that will be called after every successful handshake @@ -183,9 +193,9 @@ object TLS { TlsModule(Attributes.none, _ => createSSLEngine(), (_, session) => verifySession(session), closing)) /** - * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. + * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. * - * You can specify a constructor to create an SSLEngine that must already be configured for + * You specify a factory to create an SSLEngine that must already be configured for * client and server mode and with all the parameters for the first session. * * For a description of the `closing` parameter please refer to [[TLSClosing]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index b4620b63c4..dd96465cb1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -28,12 +28,12 @@ import com.github.ghik.silencer.silent import javax.net.ssl.SSLContext import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration +import scala.util.Success import scala.util.Try import scala.util.control.NoStackTrace @@ -92,13 +92,18 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { case sb: TLSProtocol.SessionBytes => sb.bytes // ignore other kinds of inbounds (currently only Truncated) }) + + /** + * INTERNAL API + */ + @InternalApi private[akka] val defaultBacklog = 100 } final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { import Tcp._ // TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead? - val bindShutdownTimeout = + val bindShutdownTimeout: FiniteDuration = system.settings.config.getDuration("akka.stream.materializer.subscription-timeout.timeout").asScala /** @@ -125,7 +130,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { def bind( interface: String, port: Int, - backlog: Int = 100, + backlog: Int = defaultBacklog, @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, halfClose: Boolean = false, @@ -167,7 +172,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { handler: Flow[ByteString, ByteString, _], interface: String, port: Int, - backlog: Int = 100, + backlog: Int = defaultBacklog, @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, halfClose: Boolean = false, @@ -249,6 +254,10 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * @see [[Tcp.outgoingConnection()]] */ + @deprecated( + "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def outgoingTlsConnection( host: String, port: Int, @@ -267,6 +276,10 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * Marked API-may-change to leave room for an improvement around the very long parameter list. */ + @deprecated( + "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def outgoingTlsConnection( remoteAddress: InetSocketAddress, sslContext: SSLContext, @@ -278,23 +291,53 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout) + @silent("deprecated") val tls = TLS(sslContext, negotiateNewSession, TLSRole.client) connection.join(tlsWrapping.atop(tls).reversed) } /** - * INTERNAL API: for raw SSLEngine + * Creates an [[Tcp.OutgoingConnection]] with TLS. + * The returned flow represents a TCP client connection to the given endpoint where all bytes in and + * out go through TLS. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client mode and with all the parameters for the first session. + * + * @see [[Tcp.outgoingConnection()]] */ - @InternalApi private[akka] def outgoingTlsConnectionWithSSLEngine( + def outgoingConnectionWithTls( + remoteAddress: InetSocketAddress, + createSSLEngine: () => SSLEngine): Flow[ByteString, ByteString, Future[OutgoingConnection]] = + outgoingConnectionWithTls( + remoteAddress, + createSSLEngine, + localAddress = None, + options = Nil, + connectTimeout = Duration.Inf, + idleTimeout = Duration.Inf, + verifySession = _ => Success(()), + closing = IgnoreComplete) + + /** + * Creates an [[Tcp.OutgoingConnection]] with TLS. + * The returned flow represents a TCP client connection to the given endpoint where all bytes in and + * out go through TLS. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client mode and with all the parameters for the first session. + * + * @see [[Tcp.outgoingConnection()]] + */ + def outgoingConnectionWithTls( remoteAddress: InetSocketAddress, createSSLEngine: () => SSLEngine, - localAddress: Option[InetSocketAddress] = None, - @silent // Traversable deprecated in 2.13 - options: immutable.Traversable[SocketOption] = Nil, - connectTimeout: Duration = Duration.Inf, - idleTimeout: Duration = Duration.Inf, + localAddress: Option[InetSocketAddress], + options: immutable.Seq[SocketOption], + connectTimeout: Duration, + idleTimeout: Duration, verifySession: SSLSession => Try[Unit], - closing: TLSClosing = IgnoreComplete): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { + closing: TLSClosing): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout) val tls = TLS(createSSLEngine, verifySession, closing) @@ -311,15 +354,20 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * Marked API-may-change to leave room for an improvement around the very long parameter list. */ + @deprecated( + "Use bindWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def bindTls( interface: String, port: Int, sslContext: SSLContext, negotiateNewSession: NegotiateNewSession, - backlog: Int = 100, + backlog: Int = defaultBacklog, @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { + @silent("deprecated") val tls = tlsWrapping.atop(TLS(sslContext, negotiateNewSession, TLSRole.server)).reversed bind(interface, port, backlog, options, halfClose = false, idleTimeout).map { incomingConnection => @@ -328,26 +376,107 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * INTERNAL API + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * where all incoming and outgoing bytes are passed through TLS. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client mode and with all the parameters for the first session. + * + * @see [[Tcp.bind]] */ - @InternalApi private[akka] def bindTlsWithSSLEngine( + def bindWithTls( + interface: String, + port: Int, + createSSLEngine: () => SSLEngine): Source[IncomingConnection, Future[ServerBinding]] = + bindWithTls( + interface, + port, + createSSLEngine, + backlog = defaultBacklog, + options = Nil, + idleTimeout = Duration.Inf, + verifySession = _ => Success(()), + closing = IgnoreComplete) + + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * where all incoming and outgoing bytes are passed through TLS. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client mode and with all the parameters for the first session. + * + * @see [[Tcp.bind]] + */ + def bindWithTls( interface: String, port: Int, createSSLEngine: () => SSLEngine, - backlog: Int = 100, - @silent // Traversable deprecated in 2.13 - options: immutable.Traversable[SocketOption] = Nil, - idleTimeout: Duration = Duration.Inf, + backlog: Int, + options: immutable.Seq[SocketOption], + idleTimeout: Duration, verifySession: SSLSession => Try[Unit], - closing: TLSClosing = IgnoreComplete): Source[IncomingConnection, Future[ServerBinding]] = { + closing: TLSClosing): Source[IncomingConnection, Future[ServerBinding]] = { val tls = tlsWrapping.atop(TLS(createSSLEngine, verifySession, closing)).reversed - bind(interface, port, backlog, options, true, idleTimeout).map { incomingConnection => + bind(interface, port, backlog, options, halfClose = true, idleTimeout).map { incomingConnection => incomingConnection.copy(flow = incomingConnection.flow.join(tls)) } } + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * all incoming and outgoing bytes are passed through TLS and handling the incoming connections using the + * provided Flow. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client server and with all the parameters for the first session. + * + * @see [[Tcp.bindAndHandle]] + */ + def bindAndHandleWithTls( + handler: Flow[ByteString, ByteString, _], + interface: String, + port: Int, + createSSLEngine: () => SSLEngine)(implicit m: Materializer): Future[ServerBinding] = + bindAndHandleWithTls( + handler, + interface, + port, + createSSLEngine, + backlog = defaultBacklog, + options = Nil, + idleTimeout = Duration.Inf, + verifySession = _ => Success(()), + closing = IgnoreComplete) + + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * all incoming and outgoing bytes are passed through TLS and handling the incoming connections using the + * provided Flow. + * + * You specify a factory to create an SSLEngine that must already be configured for + * client server and with all the parameters for the first session. + * + * @see [[Tcp.bindAndHandle]] + */ + def bindAndHandleWithTls( + handler: Flow[ByteString, ByteString, _], + interface: String, + port: Int, + createSSLEngine: () => SSLEngine, + backlog: Int, + options: immutable.Seq[SocketOption], + idleTimeout: Duration, + verifySession: SSLSession => Try[Unit], + closing: TLSClosing)(implicit m: Materializer): Future[ServerBinding] = { + bindWithTls(interface, port, createSSLEngine, backlog, options, idleTimeout, verifySession, closing) + .to(Sink.foreach { conn: IncomingConnection => + conn.handleWith(handler) + }) + .run() + } + /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * handling the incoming connections through TLS and then run using the provided Flow. @@ -358,13 +487,17 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * Marked API-may-change to leave room for an improvement around the very long parameter list. */ + @deprecated( + "Use bindAndHandleWithTls that takes a SSLEngine factory instead. " + + "Setup the SSLEngine with needed parameters.", + "2.6.0") def bindAndHandleTls( handler: Flow[ByteString, ByteString, _], interface: String, port: Int, sslContext: SSLContext, negotiateNewSession: NegotiateNewSession, - backlog: Int = 100, + backlog: Int = defaultBacklog, @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = { diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala index 7cd7e1aa35..a332a8757d 100644 --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala @@ -7,15 +7,16 @@ package com.typesafe.sslconfig.akka import java.security.KeyStore import java.security.cert.CertPathValidatorException import java.util.Collections -import javax.net.ssl._ +import javax.net.ssl._ import akka.actor._ +import akka.annotation.InternalApi import akka.event.Logging import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory import com.typesafe.sslconfig.ssl._ import com.typesafe.sslconfig.util.LoggerFactory -// TODO: remove again in 2.5.x, see https://github.com/akka/akka/issues/21753 +@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0") object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider { //////////////////// EXTENSION SETUP /////////////////// @@ -36,6 +37,7 @@ object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider } +@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0") final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSettings) extends Extension { private val mkLogger = new AkkaLoggerFactory(system) @@ -68,6 +70,15 @@ final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSett val hostnameVerifier = buildHostnameVerifier(config) + /** + * INTERNAL API + */ + @InternalApi def useJvmHostnameVerification: Boolean = + hostnameVerifier match { + case _: DefaultHostnameVerifier | _: NoopHostnameVerifier => true + case _ => false + } + val sslEngineConfigurator = { val sslContext = if (config.default) { log.info("ssl-config.default is true, using the JDK's default SSLContext") diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala index 5348d1579b..69490342f2 100644 --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala @@ -12,10 +12,12 @@ import com.typesafe.sslconfig.ssl.SSLConfigSettings * Gives the chance to configure the SSLContext before it is going to be used. * The passed in context will be already set in client mode and provided with hostInfo during initialization. */ +@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0") trait SSLEngineConfigurator { def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine } +@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0") final class DefaultSSLEngineConfigurator( config: SSLConfigSettings, enabledProtocols: Array[String], diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 238bed10ba..9b2a85bc96 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -31,7 +31,7 @@ object Dependencies { val reactiveStreamsVersion = "1.0.3" - val sslConfigVersion = "0.3.8" + val sslConfigVersion = "0.4.0" val Versions = Seq( crossScalaVersions := Seq(scala212Version, scala213Version),