Streams TLS and Tcp with SSLEngine, #21753 (#27766)

* SSLEngine factory instead of SSLContext and AkkaSSLConfig parameters
  in TLS and Tcp
* Update TlsSpec to use SSLEngine
* Keep copy of old TlsSpec for test coverage of deprecated methods
* Update doc example of how to setup a SSLEngine
* full API and deprecations
* don't use default param values
* java doc example
* migration guide
* mima
* update to sslconfig 0.4.0
  * hostname verification changed in sslconfig, so use jvm verifier
* change to mima file
* update doc sample, init SSLContext once
* remove FIXME for halfClosed
This commit is contained in:
Patrik Nordwall 2019-10-08 12:30:41 +02:00 committed by GitHub
parent ae70a833fe
commit e8a1556060
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1205 additions and 126 deletions

View file

@ -145,6 +145,15 @@ Akka is now using Protobuf version 3.9.0 for serialization of messages defined b
Cluster client has been deprecated as of 2.6 in favor of [Akka gRPC](https://doc.akka.io/docs/akka-grpc/current/index.html). Cluster client has been deprecated as of 2.6 in favor of [Akka gRPC](https://doc.akka.io/docs/akka-grpc/current/index.html).
It is not advised to build new applications with Cluster client, and existing users @ref[should migrate to Akka gRPC](../cluster-client.md#migration-to-akka-grpc). It is not advised to build new applications with Cluster client, and existing users @ref[should migrate to Akka gRPC](../cluster-client.md#migration-to-akka-grpc).
### AkkaSslConfig
`AkkaSslConfig` has been deprecated in favor of setting up TLS with `javax.net.ssl.SSLEngine` directly.
This also means that methods Akka Streams `TLS` and `Tcp` that take `SSLContext` or `AkkaSslConfig` have been
deprecated and replaced with corresponding methods that takes a factory function for creating the `SSLEngine`.
See documentation of @ref:[streaming IO with TLS](../stream/stream-io.md#tls).
### akka.Main ### akka.Main
`akka.Main` is deprecated in favour of starting the `ActorSystem` from a custom main class instead. `akka.Main` was not `akka.Main` is deprecated in favour of starting the `ActorSystem` from a custom main class instead. `akka.Main` was not

View file

@ -145,18 +145,20 @@ Java
### TLS ### TLS
Similar factories as shown above for raw TCP but where the data is encrypted using TLS are available from `Tcp` through `outgoingTlsConnection`, `bindTls` and `bindAndHandleTls`, see the @scala[@scaladoc[`Tcp Scaladoc`](akka.stream.scaladsl.Tcp)]@java[@javadoc[`Tcp Javadoc`](akka.stream.javadsl.Tcp)] for details. Similar factories as shown above for raw TCP but where the data is encrypted using TLS are available from `Tcp`
through `outgoingConnectionWithTls`, `bindWithTls` and `bindAndHandleWithTls`,
see the @scala[@scaladoc[`Tcp Scaladoc`](akka.stream.scaladsl.Tcp)]@java[@javadoc[`Tcp Javadoc`](akka.stream.javadsl.Tcp)] for details.
Using TLS requires a keystore and a truststore and then a somewhat involved dance of configuring the SSLContext and the details for how the session should be negotiated: Using TLS requires a keystore and a truststore and then a somewhat involved dance of configuring the SSLEngine and the details for how the session should be negotiated:
Scala Scala
: @@snip [TcpSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala) { #setting-up-ssl-context } : @@snip [TcpSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala) { #setting-up-ssl-engine }
Java Java
: @@snip [TcpTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java) { #setting-up-ssl-context } : @@snip [TcpTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java) { #setting-up-ssl-engine }
The `SslContext` and `NegotiateFirstSession` instances can then be used with the binding or outgoing connection factory methods. The `SSLEngine` instance can then be used with the binding or outgoing connection factory methods.
## Streaming File IO ## Streaming File IO

View file

@ -13,6 +13,7 @@ import scala.concurrent.Await
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.concurrent.duration.Duration
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import scala.util.Try import scala.util.Try
@ -30,6 +31,7 @@ import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.compress._ import akka.remote.artery.compress._
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.Attributes.LogLevels import akka.stream.Attributes.LogLevels
import akka.stream.IgnoreComplete
import akka.stream.KillSwitches import akka.stream.KillSwitches
import akka.stream.Materializer import akka.stream.Materializer
import akka.stream.SharedKillSwitch import akka.stream.SharedKillSwitch
@ -124,11 +126,15 @@ private[remote] class ArteryTcpTransport(
def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] =
if (tlsEnabled) { if (tlsEnabled) {
val sslProvider = sslEngineProvider.get val sslProvider = sslEngineProvider.get
Tcp().outgoingTlsConnectionWithSSLEngine( Tcp().outgoingConnectionWithTls(
remoteAddress, remoteAddress,
createSSLEngine = () => sslProvider.createClientSSLEngine(host, port), createSSLEngine = () => sslProvider.createClientSSLEngine(host, port),
localAddress = None,
options = Nil,
connectTimeout = settings.Advanced.Tcp.ConnectionTimeout, connectTimeout = settings.Advanced.Tcp.ConnectionTimeout,
verifySession = session => optionToTry(sslProvider.verifyClientSession(host, session))) idleTimeout = Duration.Inf,
verifySession = session => optionToTry(sslProvider.verifyClientSession(host, session)),
closing = IgnoreComplete)
} else { } else {
Tcp().outgoingConnection( Tcp().outgoingConnection(
remoteAddress, remoteAddress,
@ -213,11 +219,15 @@ private[remote] class ArteryTcpTransport(
val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] =
if (tlsEnabled) { if (tlsEnabled) {
val sslProvider = sslEngineProvider.get val sslProvider = sslEngineProvider.get
Tcp().bindTlsWithSSLEngine( Tcp().bindWithTls(
interface = bindHost, interface = bindHost,
port = bindPort, port = bindPort,
createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort), createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort),
verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session))) backlog = Tcp.defaultBacklog,
options = Nil,
idleTimeout = Duration.Inf,
verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session)),
closing = IgnoreComplete)
} else { } else {
Tcp().bind(interface = bindHost, port = bindPort, halfClose = false) Tcp().bind(interface = bindHost, port = bindPort, halfClose = false)
} }

View file

@ -5,12 +5,12 @@
package akka.stream.javadsl; package akka.stream.javadsl;
import akka.Done; import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.function.Function2; import akka.japi.function.Function2;
import akka.japi.function.Procedure; import akka.japi.function.Procedure;
import akka.stream.BindFailedException; import akka.stream.BindFailedException;
import akka.stream.StreamTcpException; import akka.stream.StreamTcpException;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.Tcp.IncomingConnection; import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding; import akka.stream.javadsl.Tcp.ServerBinding;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
@ -23,8 +23,14 @@ import static akka.util.ByteString.emptyByteString;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
@ -34,15 +40,17 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
// #setting-up-ssl-context // #setting-up-ssl-engine
// imports // imports
import akka.stream.TLSClientAuth;
import akka.stream.TLSProtocol;
import com.typesafe.sslconfig.akka.AkkaSSLConfig;
import java.security.KeyStore; import java.security.KeyStore;
import javax.net.ssl.*;
import java.security.SecureRandom; import java.security.SecureRandom;
// #setting-up-ssl-context import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import akka.stream.TLSRole;
// #setting-up-ssl-engine
public class TcpTest extends StreamTest { public class TcpTest extends StreamTest {
public TcpTest() { public TcpTest() {
@ -165,15 +173,12 @@ public class TcpTest extends StreamTest {
} }
// compile only sample // compile only sample
public void constructSslContext() throws Exception { // #setting-up-ssl-engine
ActorSystem system = null; // initialize SSLContext once
private final SSLContext sslContext;
// #setting-up-ssl-context
// -- setup logic ---
AkkaSSLConfig sslConfig = AkkaSSLConfig.get(system);
{
try {
// Don't hardcode your password in actual code // Don't hardcode your password in actual code
char[] password = "abcdef".toCharArray(); char[] password = "abcdef".toCharArray();
@ -181,34 +186,41 @@ public class TcpTest extends StreamTest {
KeyStore keyStore = KeyStore.getInstance("PKCS12"); KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(getClass().getResourceAsStream("/tcp-spec-keystore.p12"), password); keyStore.load(getClass().getResourceAsStream("/tcp-spec-keystore.p12"), password);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
tmf.init(keyStore); trustManagerFactory.init(keyStore);
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(keyStore, password); keyManagerFactory.init(keyStore, password);
// initial ssl context // init ssl context
SSLContext sslContext = SSLContext.getInstance("TLS"); SSLContext context = SSLContext.getInstance("TLSv1.2");
sslContext.init(keyManagerFactory.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); context.init(
keyManagerFactory.getKeyManagers(),
trustManagerFactory.getTrustManagers(),
new SecureRandom());
// protocols sslContext = context;
SSLParameters defaultParams = sslContext.getDefaultSSLParameters();
String[] defaultProtocols = defaultParams.getProtocols();
String[] protocols = sslConfig.configureProtocols(defaultProtocols, sslConfig.config());
defaultParams.setProtocols(protocols);
// ciphers } catch (KeyStoreException
String[] defaultCiphers = defaultParams.getCipherSuites(); | IOException
String[] cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, sslConfig.config()); | NoSuchAlgorithmException
defaultParams.setCipherSuites(cipherSuites); | CertificateException
| UnrecoverableKeyException
TLSProtocol.NegotiateNewSession negotiateNewSession = | KeyManagementException e) {
TLSProtocol.negotiateNewSession() throw new RuntimeException(e);
.withCipherSuites(cipherSuites)
.withProtocols(protocols)
.withParameters(defaultParams)
.withClientAuth(TLSClientAuth.none());
// #setting-up-ssl-context
} }
}
// create new SSLEngine from the SSLContext, which was initialized once
public SSLEngine createSSLEngine(TLSRole role) {
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(role.equals(akka.stream.TLSRole.client()));
engine.setEnabledCipherSuites(new String[] {"TLS_RSA_WITH_AES_128_CBC_SHA"});
engine.setEnabledProtocols(new String[] {"TLSv1.2"});
return engine;
}
// #setting-up-ssl-engine
} }

View file

@ -0,0 +1,546 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.io
import java.security.KeyStore
import java.security.SecureRandom
import java.security.cert.CertificateException
import java.util.concurrent.TimeoutException
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
import akka.NotUsed
import akka.pattern.{ after => later }
import akka.stream.TLSProtocol._
import akka.stream._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.WithLogCapturing
import akka.util.ByteString
import akka.util.JavaVersion
import com.github.ghik.silencer.silent
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import javax.net.ssl._
object DeprecatedTlsSpec {
val rnd = new Random
def initWithTrust(trustPath: String) = {
val password = "changeme"
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(getClass.getResourceAsStream(trustPath), password.toCharArray)
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
keyManagerFactory.init(keyStore, password.toCharArray)
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
trustManagerFactory.init(trustStore)
val context = SSLContext.getInstance("TLS")
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
context
}
def initSslContext(): SSLContext = initWithTrust("/truststore")
/**
* This is an operator that fires a TimeoutException failure 2 seconds after it was started,
* independent of the traffic going through. The purpose is to include the last seen
* element in the exception message to help in figuring out what went wrong.
*/
class Timeout(duration: FiniteDuration) extends GraphStage[FlowShape[ByteString, ByteString]] {
private val in = Inlet[ByteString]("in")
private val out = Outlet[ByteString]("out")
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new TimerGraphStageLogic(shape) {
override def preStart(): Unit = scheduleOnce((), duration)
var last: ByteString = _
setHandler(in, new InHandler {
override def onPush(): Unit = {
last = grab(in)
push(out, last)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
override def onTimer(x: Any): Unit = {
failStage(new TimeoutException(s"timeout expired, last element was $last"))
}
}
}
val configOverrides =
"""
akka.loglevel = DEBUG # issue 21660
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.debug.receive=off
"""
}
@silent("deprecated")
class DeprecatedTlsSpec extends StreamSpec(DeprecatedTlsSpec.configOverrides) with WithLogCapturing {
import GraphDSL.Implicits._
import DeprecatedTlsSpec._
import system.dispatcher
val sslConfig: Option[AkkaSSLConfig] = None // no special settings to be applied here
"SslTls with deprecated SSLContext setup" must {
val sslContext = initSslContext()
val debug = Flow[SslTlsInbound].map { x =>
x match {
case SessionTruncated => system.log.debug(s" ----------- truncated ")
case SessionBytes(_, b) => system.log.debug(s" ----------- (${b.size}) ${b.take(32).utf8String}")
}
x
}
val cipherSuites =
NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA")
def clientTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Client, closing)
def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), None, cipherSuites, Client, closing)
def serverTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Server, closing)
trait Named {
def name: String =
getClass.getName.reverse.dropWhile(c => "$0123456789".indexOf(c) != -1).takeWhile(_ != '$').reverse
}
trait CommunicationSetup extends Named {
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed]
def cleanup(): Unit = ()
}
object ClientInitiates extends CommunicationSetup {
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) =
clientTls(leftClosing).atop(serverTls(rightClosing).reversed).join(rhs)
}
object ServerInitiates extends CommunicationSetup {
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) =
serverTls(leftClosing).atop(clientTls(rightClosing).reversed).join(rhs)
}
def server(flow: Flow[ByteString, ByteString, Any]) = {
val server = Tcp().bind("localhost", 0).to(Sink.foreach(c => c.flow.join(flow).run())).run()
Await.result(server, 2.seconds)
}
object ClientInitiatesViaTcp extends CommunicationSetup {
var binding: Tcp.ServerBinding = null
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = {
binding = server(serverTls(rightClosing).reversed.join(rhs))
clientTls(leftClosing).join(Tcp().outgoingConnection(binding.localAddress))
}
override def cleanup(): Unit = binding.unbind()
}
object ServerInitiatesViaTcp extends CommunicationSetup {
var binding: Tcp.ServerBinding = null
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = {
binding = server(clientTls(rightClosing).reversed.join(rhs))
serverTls(leftClosing).join(Tcp().outgoingConnection(binding.localAddress))
}
override def cleanup(): Unit = binding.unbind()
}
val communicationPatterns =
Seq(ClientInitiates, ServerInitiates, ClientInitiatesViaTcp, ServerInitiatesViaTcp)
trait PayloadScenario extends Named {
def flow: Flow[SslTlsInbound, SslTlsOutbound, Any] =
Flow[SslTlsInbound].map {
var session: SSLSession = null
def setSession(s: SSLSession) = {
session = s
system.log.debug(s"new session: $session (${session.getId.mkString(",")})")
}
{
case SessionTruncated => SendBytes(ByteString("TRUNCATED"))
case SessionBytes(s, b) if session == null =>
setSession(s)
SendBytes(b)
case SessionBytes(s, b) if s != session =>
setSession(s)
SendBytes(ByteString("NEWSESSION") ++ b)
case SessionBytes(_, b) => SendBytes(b)
}
}
def leftClosing: TLSClosing = IgnoreComplete
def rightClosing: TLSClosing = IgnoreComplete
def inputs: immutable.Seq[SslTlsOutbound]
def output: ByteString
protected def send(str: String) = SendBytes(ByteString(str))
protected def send(ch: Char) = SendBytes(ByteString(ch.toByte))
}
object SingleBytes extends PayloadScenario {
val str = "0123456789"
def inputs = str.map(ch => SendBytes(ByteString(ch.toByte)))
def output = ByteString(str)
}
object MediumMessages extends PayloadScenario {
val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 1000))
def inputs = strs.map(s => SendBytes(ByteString(s)))
def output = ByteString(strs.foldRight("")(_ ++ _))
}
object LargeMessages extends PayloadScenario {
// TLS max packet size is 16384 bytes
val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 17000))
def inputs = strs.map(s => SendBytes(ByteString(s)))
def output = ByteString(strs.foldRight("")(_ ++ _))
}
object EmptyBytesFirst extends PayloadScenario {
def inputs = List(ByteString.empty, ByteString("hello")).map(SendBytes)
def output = ByteString("hello")
}
object EmptyBytesInTheMiddle extends PayloadScenario {
def inputs = List(ByteString("hello"), ByteString.empty, ByteString(" world")).map(SendBytes)
def output = ByteString("hello world")
}
object EmptyBytesLast extends PayloadScenario {
def inputs = List(ByteString("hello"), ByteString.empty).map(SendBytes)
def output = ByteString("hello")
}
object CompletedImmediately extends PayloadScenario {
override def inputs: immutable.Seq[SslTlsOutbound] = Nil
override def output = ByteString.empty
override def leftClosing: TLSClosing = EagerClose
override def rightClosing: TLSClosing = EagerClose
}
// this demonstrates that cancellation is ignored so that the five results make it back
object CancellingRHS extends PayloadScenario {
override def flow =
Flow[SslTlsInbound]
.mapConcat {
case SessionTruncated => SessionTruncated :: Nil
case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b)))
}
.take(5)
.mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x)))
.via(super.flow)
override def rightClosing = IgnoreCancel
val str = "abcdef" * 100
def inputs = str.map(send)
def output = ByteString(str.take(5))
}
object CancellingRHSIgnoresBoth extends PayloadScenario {
override def flow =
Flow[SslTlsInbound]
.mapConcat {
case SessionTruncated => SessionTruncated :: Nil
case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b)))
}
.take(5)
.mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x)))
.via(super.flow)
override def rightClosing = IgnoreBoth
val str = "abcdef" * 100
def inputs = str.map(send)
def output = ByteString(str.take(5))
}
object LHSIgnoresBoth extends PayloadScenario {
override def leftClosing = IgnoreBoth
val str = "0123456789"
def inputs = str.map(ch => SendBytes(ByteString(ch.toByte)))
def output = ByteString(str)
}
object BothSidesIgnoreBoth extends PayloadScenario {
override def leftClosing = IgnoreBoth
override def rightClosing = IgnoreBoth
val str = "0123456789"
def inputs = str.map(ch => SendBytes(ByteString(ch.toByte)))
def output = ByteString(str)
}
object SessionRenegotiationBySender extends PayloadScenario {
def inputs = List(send("hello"), NegotiateNewSession, send("world"))
def output = ByteString("helloNEWSESSIONworld")
}
// difference is that the RHS engine will now receive the handshake while trying to send
object SessionRenegotiationByReceiver extends PayloadScenario {
val str = "abcdef" * 100
def inputs = str.map(send) ++ Seq(NegotiateNewSession) ++ "hello world".map(send)
def output = ByteString(str + "NEWSESSIONhello world")
}
val logCipherSuite = Flow[SslTlsInbound].map {
var session: SSLSession = null
def setSession(s: SSLSession) = {
session = s
system.log.debug(s"new session: $session (${session.getId.mkString(",")})")
}
{
case SessionTruncated => SendBytes(ByteString("TRUNCATED"))
case SessionBytes(s, b) if s != session =>
setSession(s)
SendBytes(ByteString(s.getCipherSuite) ++ b)
case SessionBytes(_, b) => SendBytes(b)
}
}
object SessionRenegotiationFirstOne extends PayloadScenario {
override def flow = logCipherSuite
def inputs = NegotiateNewSession.withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil
def output = ByteString("TLS_RSA_WITH_AES_128_CBC_SHAhello")
}
object SessionRenegotiationFirstTwo extends PayloadScenario {
override def flow = logCipherSuite
def inputs = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil
def output = ByteString("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHAhello")
}
val scenarios =
Seq(
SingleBytes,
MediumMessages,
LargeMessages,
EmptyBytesFirst,
EmptyBytesInTheMiddle,
EmptyBytesLast,
CompletedImmediately,
CancellingRHS,
CancellingRHSIgnoresBoth,
LHSIgnoresBoth,
BothSidesIgnoreBoth,
SessionRenegotiationBySender,
SessionRenegotiationByReceiver,
SessionRenegotiationFirstOne,
SessionRenegotiationFirstTwo)
for {
commPattern <- communicationPatterns
scenario <- scenarios
} {
s"work in mode ${commPattern.name} while sending ${scenario.name}" in assertAllStagesStopped {
val onRHS = debug.via(scenario.flow)
val output =
Source(scenario.inputs)
.via(commPattern.decorateFlow(scenario.leftClosing, scenario.rightClosing, onRHS))
.via(new SimpleLinearGraphStage[SslTlsInbound] {
override def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush() = push(out, grab(in))
override def onPull() = pull(in)
override def onDownstreamFinish(cause: Throwable) = {
system.log.debug(s"me cancelled, cause {}", cause)
completeStage()
}
}
})
.via(debug)
.collect { case SessionBytes(_, b) => b }
.scan(ByteString.empty)(_ ++ _)
.filter(_.nonEmpty)
.via(new Timeout(6.seconds))
.dropWhile(_.size < scenario.output.size)
.runWith(Sink.headOption)
Await.result(output, 8.seconds).getOrElse(ByteString.empty).utf8String should be(scenario.output.utf8String)
commPattern.cleanup()
}
}
"emit an error if the TLS handshake fails certificate checks" in assertAllStagesStopped {
val getError = Flow[SslTlsInbound]
.map[Either[SslTlsInbound, SSLException]](i => Left(i))
.recover { case e: SSLException => Right(e) }
.collect { case Right(e) => e }
.toMat(Sink.head)(Keep.right)
val simple = Flow.fromSinkAndSourceMat(getError, Source.maybe[SslTlsOutbound])(Keep.left)
// The creation of actual TCP connections is necessary. It is the easiest way to decouple the client and server
// under error conditions, and has the bonus of matching most actual SSL deployments.
val (server, serverErr) = Tcp()
.bind("localhost", 0)
.mapAsync(1)(c => c.flow.joinMat(serverTls(IgnoreBoth).reversed.joinMat(simple)(Keep.right))(Keep.right).run())
.toMat(Sink.head)(Keep.both)
.run()
val clientErr = simple
.join(badClientTls(IgnoreBoth))
.join(Tcp().outgoingConnection(Await.result(server, 1.second).localAddress))
.run()
Await.result(serverErr, 1.second).getMessage should include("certificate_unknown")
val clientErrText = Await.result(clientErr, 1.second).getMessage
if (JavaVersion.majorVersion >= 11)
clientErrText should include("unable to find valid certification path to requested target")
else
clientErrText should equal("General SSLEngine problem")
}
"reliably cancel subscriptions when TransportIn fails early" in assertAllStagesStopped {
val ex = new Exception("hello")
val (sub, out1, out2) =
RunnableGraph
.fromGraph(
GraphDSL.create(Source.asSubscriber[SslTlsOutbound], Sink.head[ByteString], Sink.head[SslTlsInbound])(
(_, _, _)) { implicit b => (s, o1, o2) =>
val tls = b.add(clientTls(EagerClose))
s ~> tls.in1; tls.out1 ~> o1
o2 <~ tls.out2; tls.in2 <~ Source.failed(ex)
ClosedShape
})
.run()
the[Exception] thrownBy Await.result(out1, 1.second) should be(ex)
the[Exception] thrownBy Await.result(out2, 1.second) should be(ex)
Thread.sleep(500)
val pub = TestPublisher.probe()
pub.subscribe(sub)
pub.expectSubscription().expectCancellation()
}
"reliably cancel subscriptions when UserIn fails early" in assertAllStagesStopped {
val ex = new Exception("hello")
val (sub, out1, out2) =
RunnableGraph
.fromGraph(GraphDSL.create(Source.asSubscriber[ByteString], Sink.head[ByteString], Sink.head[SslTlsInbound])(
(_, _, _)) { implicit b => (s, o1, o2) =>
val tls = b.add(clientTls(EagerClose))
Source.failed[SslTlsOutbound](ex) ~> tls.in1; tls.out1 ~> o1
o2 <~ tls.out2; tls.in2 <~ s
ClosedShape
})
.run()
the[Exception] thrownBy Await.result(out1, 1.second) should be(ex)
the[Exception] thrownBy Await.result(out2, 1.second) should be(ex)
Thread.sleep(500)
val pub = TestPublisher.probe()
pub.subscribe(sub)
pub.expectSubscription().expectCancellation()
}
"complete if TLS connection is truncated" in assertAllStagesStopped {
val ks = KillSwitches.shared("ks")
val scenario = SingleBytes
val outFlow = {
val terminator = BidiFlow.fromFlows(Flow[ByteString], ks.flow[ByteString])
clientTls(scenario.leftClosing)
.atop(terminator)
.atop(serverTls(scenario.rightClosing).reversed)
.join(debug.via(scenario.flow))
.via(debug)
}
val inFlow = Flow[SslTlsInbound]
.collect { case SessionBytes(_, b) => b }
.scan(ByteString.empty)(_ ++ _)
.via(new Timeout(6.seconds))
.dropWhile(_.size < scenario.output.size)
val f =
Source(scenario.inputs)
.via(outFlow)
.via(inFlow)
.map(result => {
ks.shutdown(); result
})
.runWith(Sink.last)
Await.result(f, 8.second).utf8String should be(scenario.output.utf8String)
}
"verify hostname" in assertAllStagesStopped {
def run(hostName: String): Future[akka.Done] = {
val rhs = Flow[SslTlsInbound].map {
case SessionTruncated => SendBytes(ByteString.empty)
case SessionBytes(_, b) => SendBytes(b)
}
val clientTls = TLS(sslContext, None, cipherSuites, Client, EagerClose, Some((hostName, 80)))
val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs)
Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore)
}
Await.result(run("akka-remote"), 3.seconds) // CN=akka-remote
val cause = intercept[Exception] {
Await.result(run("unknown.example.org"), 3.seconds)
}
cause.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem
val cause2 = cause.getCause
cause2.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem
val cause3 = cause2.getCause
cause3.getClass should ===(classOf[CertificateException])
cause3.getMessage should ===("No name matching unknown.example.org found")
}
}
"A SslTlsPlacebo" must {
"pass through data" in {
val f = Source(1 to 3)
.map(b => SendBytes(ByteString(b.toByte)))
.via(TLSPlacebo().join(Flow.apply))
.grouped(10)
.runWith(Sink.head)
val result = Await.result(f, 3.seconds)
result.map(_.bytes) should be((1 to 3).map(b => ByteString(b.toByte)))
result.map(_.session).foreach(s => s.getCipherSuite should be("SSL_NULL_WITH_NULL_NULL"))
}
}
}

View file

@ -40,7 +40,6 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
@ -48,6 +47,8 @@ import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import com.github.ghik.silencer.silent
@silent("never used") @silent("never used")
class NonResolvingDnsActor(cache: SimpleDnsCache, config: Config) extends Actor { class NonResolvingDnsActor(cache: SimpleDnsCache, config: Config) extends Actor {
def receive = { def receive = {
@ -830,9 +831,93 @@ class TcpSpec extends StreamSpec("""
} }
"TLS client and server convenience methods" should { "TLS client and server convenience methods with SSLEngine setup" should {
"allow for 'simple' TLS" in { "allow for TLS" in {
// cert is valid until 2025, so if this tests starts failing after that you need to create a new one
val address = temporaryServerAddress()
Tcp()
.bindAndHandleWithTls(
// just echo charactes until we reach '\n', then complete stream
// also - byte is our framing
Flow[ByteString].mapConcat(_.utf8String.toList).takeWhile(_ != '\n').map(c => ByteString(c)),
address.getHostName,
address.getPort,
() => createSSLEngine(TLSRole.server))
.futureValue
system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}")
val connectionFlow =
Tcp().outgoingConnectionWithTls(address, () => createSSLEngine(TLSRole.client))
val chars = "hello\n".toList.map(_.toString)
val (connectionF, result) =
Source(chars)
.map(c => ByteString(c))
.concat(Source.maybe) // do not complete it from our side
.viaMat(connectionFlow)(Keep.right)
.map(_.utf8String)
.toMat(Sink.fold("")(_ + _))(Keep.both)
.run()
connectionF.futureValue
system.log.info(s"Client connected to ${address.getHostString}:${address.getPort}")
result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello")
}
// #setting-up-ssl-engine
import java.security.KeyStore
import javax.net.ssl.SSLEngine
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext
import akka.stream.TLSRole
// initialize SSLContext once
lazy val sslContext: SSLContext = {
// Don't hardcode your password in actual code
val password = "abcdef".toCharArray
// trust store and keys in one keystore
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(getClass.getResourceAsStream("/tcp-spec-keystore.p12"), password)
val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
trustManagerFactory.init(keyStore)
val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
keyManagerFactory.init(keyStore, password)
// init ssl context
val context = SSLContext.getInstance("TLSv1.2")
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
context
}
// create new SSLEngine from the SSLContext, which was initialized once
def createSSLEngine(role: TLSRole): SSLEngine = {
val engine = sslContext.createSSLEngine()
engine.setUseClientMode(role == akka.stream.Client)
engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
engine.setEnabledProtocols(Array("TLSv1.2"))
engine
}
// #setting-up-ssl-engine
}
"TLS client and server convenience methods with deprecated SSLContext setup" should {
"allow for TLS" in {
test()
}
@silent("deprecated")
def test(): Unit = {
// cert is valid until 2025, so if this tests starts failing after that you need to create a new one // cert is valid until 2025, so if this tests starts failing after that you need to create a new one
val (sslContext, firstSession) = initSslMess() val (sslContext, firstSession) = initSslMess()
val address = temporaryServerAddress() val address = temporaryServerAddress()
@ -867,6 +952,7 @@ class TcpSpec extends StreamSpec("""
result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello") result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello")
} }
@silent("deprecated")
def initSslMess() = { def initSslMess() = {
// #setting-up-ssl-context // #setting-up-ssl-context
import java.security.KeyStore import java.security.KeyStore

View file

@ -6,16 +6,16 @@ package akka.stream.io
import java.security.KeyStore import java.security.KeyStore
import java.security.SecureRandom import java.security.SecureRandom
import java.security.cert.CertificateException
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.NotUsed import akka.NotUsed
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Random import scala.util.Random
import akka.pattern.{ after => later } import akka.pattern.{ after => later }
import akka.stream._ import akka.stream._
import akka.stream.TLSProtocol._ import akka.stream.TLSProtocol._
@ -32,7 +32,10 @@ object TlsSpec {
val rnd = new Random val rnd = new Random
def initWithTrust(trustPath: String) = { val SSLEnabledAlgorithms: Set[String] = Set("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA")
val SSLProtocol: String = "TLSv1.2"
def initWithTrust(trustPath: String): SSLContext = {
val password = "changeme" val password = "changeme"
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
@ -47,7 +50,7 @@ object TlsSpec {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
trustManagerFactory.init(trustStore) trustManagerFactory.init(trustStore)
val context = SSLContext.getInstance("TLS") val context = SSLContext.getInstance(SSLProtocol)
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
context context
} }
@ -99,8 +102,6 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing
import GraphDSL.Implicits._ import GraphDSL.Implicits._
val sslConfig: Option[AkkaSSLConfig] = None // no special settings to be applied here
"SslTls" must { "SslTls" must {
val sslContext = initSslContext() val sslContext = initSslContext()
@ -113,11 +114,42 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing
x x
} }
val cipherSuites = def createSSLEngine(context: SSLContext, role: TLSRole): SSLEngine =
NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") createSSLEngine2(context, role, hostnameVerification = false, hostInfo = None)
def clientTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Client, closing)
def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), None, cipherSuites, Client, closing) def createSSLEngine2(
def serverTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Server, closing) context: SSLContext,
role: TLSRole,
hostnameVerification: Boolean,
hostInfo: Option[(String, Int)]): SSLEngine = {
val engine = hostInfo match {
case None =>
if (hostnameVerification)
throw new IllegalArgumentException("hostInfo must be defined for hostnameVerification to work.")
context.createSSLEngine()
case Some((hostname, port)) => context.createSSLEngine(hostname, port)
}
if (hostnameVerification && role == akka.stream.Client) {
val sslParams = sslContext.getDefaultSSLParameters
sslParams.setEndpointIdentificationAlgorithm("HTTPS")
engine.setSSLParameters(sslParams)
}
engine.setUseClientMode(role == akka.stream.Client)
engine.setEnabledCipherSuites(SSLEnabledAlgorithms.toArray)
engine.setEnabledProtocols(Array(SSLProtocol))
engine
}
def clientTls(closing: TLSClosing) =
TLS(() => createSSLEngine(sslContext, Client), closing)
def badClientTls(closing: TLSClosing) =
TLS(() => createSSLEngine(initWithTrust("/badtruststore"), Client), closing)
def serverTls(closing: TLSClosing) =
TLS(() => createSSLEngine(sslContext, Server), closing)
trait Named { trait Named {
def name: String = def name: String =
@ -507,7 +539,10 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing
case SessionTruncated => SendBytes(ByteString.empty) case SessionTruncated => SendBytes(ByteString.empty)
case SessionBytes(_, b) => SendBytes(b) case SessionBytes(_, b) => SendBytes(b)
} }
val clientTls = TLS(sslContext, None, cipherSuites, Client, EagerClose, Some((hostName, 80))) val clientTls = TLS(
() => createSSLEngine2(sslContext, Client, hostnameVerification = true, hostInfo = Some((hostName, 80))),
EagerClose)
val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs) val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs)
Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore) Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore)
@ -516,7 +551,13 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing
val cause = intercept[Exception] { val cause = intercept[Exception] {
Await.result(run("unknown.example.org"), 3.seconds) Await.result(run("unknown.example.org"), 3.seconds)
} }
cause.getMessage should ===("Hostname verification failed! Expected session to be for unknown.example.org")
cause.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem
val cause2 = cause.getCause
cause2.getClass should ===(classOf[SSLHandshakeException]) //General SSLEngine problem
val cause3 = cause2.getCause
cause3.getClass should ===(classOf[CertificateException])
cause3.getMessage should ===("No name matching unknown.example.org found")
} }
} }

View file

@ -0,0 +1,12 @@
# #21753 internal method WithSSLEngine renamed and changed
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$5")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.outgoingTlsConnectionWithSSLEngine$default$8")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$5")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp.bindTlsWithSSLEngine$default$8")

View file

@ -30,10 +30,8 @@ import scala.util.Try
* documentation. The philosophy of this integration into Akka Streams is to * documentation. The philosophy of this integration into Akka Streams is to
* expose all knobs and dials to client code and therefore not limit the * expose all knobs and dials to client code and therefore not limit the
* configuration possibilities. In particular the client code will have to * configuration possibilities. In particular the client code will have to
* provide the SSLContext from which the SSLEngine is then created. Handshake * provide the SSLEngine, which is typically created from a SSLContext. Handshake
* parameters are set using [[NegotiateNewSession]] messages, the settings for * parameters and other parameters are defined when creating the SSLEngine.
* the initial handshake need to be provided up front using the same class;
* please refer to the method documentation below.
* *
* '''IMPORTANT NOTE''' * '''IMPORTANT NOTE'''
* *
@ -66,6 +64,7 @@ object TLS {
* *
* This method uses the default closing behavior or [[IgnoreComplete]]. * This method uses the default closing behavior or [[IgnoreComplete]].
*/ */
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def create( def create(
sslContext: SSLContext, sslContext: SSLContext,
sslConfig: Optional[AkkaSSLConfig], sslConfig: Optional[AkkaSSLConfig],
@ -84,6 +83,7 @@ object TLS {
* *
* This method uses the default closing behavior or [[IgnoreComplete]]. * This method uses the default closing behavior or [[IgnoreComplete]].
*/ */
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def create( def create(
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
@ -106,6 +106,7 @@ object TLS {
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was * The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/ */
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def create( def create(
sslContext: SSLContext, sslContext: SSLContext,
sslConfig: Optional[AkkaSSLConfig], sslConfig: Optional[AkkaSSLConfig],
@ -138,6 +139,7 @@ object TLS {
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was * The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/ */
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def create( def create(
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
@ -156,7 +158,7 @@ object TLS {
/** /**
* Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface. * Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface.
* *
* You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for * You specify a factory `sslEngineCreator` to create an SSLEngine that must already be configured for
* client and server mode and with all the parameters for the first session. * client and server mode and with all the parameters for the first session.
* *
* You can specify a verification function `sessionVerifier` that will be called * You can specify a verification function `sessionVerifier` that will be called
@ -174,7 +176,7 @@ object TLS {
/** /**
* Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface. * Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface.
* *
* You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for * You specify a factory `sslEngineCreator` to create an SSLEngine that must already be configured for
* client and server mode and with all the parameters for the first session. * client and server mode and with all the parameters for the first session.
* *
* For a description of the `closing` parameter please refer to [[TLSClosing]]. * For a description of the `closing` parameter please refer to [[TLSClosing]].

View file

@ -6,9 +6,9 @@ package akka.stream.javadsl
import java.lang.{ Iterable => JIterable } import java.lang.{ Iterable => JIterable }
import java.util.Optional import java.util.Optional
import java.util.function.{ Function => JFunction }
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.net.InetSocketAddress import java.net.InetSocketAddress
@ -21,17 +21,24 @@ import akka.stream.scaladsl
import akka.util.ByteString import akka.util.ByteString
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.io.Inet.SocketOption import akka.io.Inet.SocketOption
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.util.Failure
import scala.util.Success
import akka.actor.ClassicActorSystemProvider import akka.actor.ClassicActorSystemProvider
import javax.net.ssl.SSLContext import javax.net.ssl.SSLContext
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.stream.SystemMaterializer import akka.stream.SystemMaterializer
import akka.stream.TLSClosing
import akka.stream.TLSProtocol.NegotiateNewSession import akka.stream.TLSProtocol.NegotiateNewSession
import akka.util.JavaDurationConverters._
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
@ -161,13 +168,44 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
backlog: Int, backlog: Int,
options: JIterable[SocketOption], options: JIterable[SocketOption],
halfClose: Boolean, halfClose: Boolean,
idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection, CompletionStage[ServerBinding]] =
Source.fromGraph( Source.fromGraph(
delegate delegate
.bind(interface, port, backlog, immutableSeq(options), halfClose, idleTimeout) .bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout))
.map(new IncomingConnection(_)) .map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
*
* Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing
* [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the materialized future
* completes is the server ready to accept client connections.
*
* @param interface The interface to listen on
* @param port The port to listen on
* @param backlog Controls the size of the connection backlog
* @param options TCP options for the connections, see [[akka.io.Tcp]] for details
* @param halfClose
* Controls whether the connection is kept open even after writing has been completed to the accepted
* TCP connections.
* If set to true, the connection will implement the TCP half-close mechanism, allowing the client to
* write to the connection even after the server has finished writing. The TCP socket is only closed
* after both the client and server finished writing.
* If set to false, the connection will immediately closed once the server closes its write side,
* independently whether the client is still attempting to write. This setting is recommended
* for servers, and therefore it is the default setting.
*/
@deprecated("Use bind that takes a java.time.Duration parameter instead.", "2.6.0")
def bind(
interface: String,
port: Int,
backlog: Int,
options: JIterable[SocketOption],
halfClose: Boolean,
idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] =
bind(interface, port, backlog, options, halfClose, durationToJavaOptional(idleTimeout))
/** /**
* Creates a [[Tcp.ServerBinding]] without specifying options. * Creates a [[Tcp.ServerBinding]] without specifying options.
* It represents a prospective TCP server binding on the given `endpoint`. * It represents a prospective TCP server binding on the given `endpoint`.
@ -208,8 +246,8 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
localAddress: Optional[InetSocketAddress], localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption], options: JIterable[SocketOption],
halfClose: Boolean, halfClose: Boolean,
connectTimeout: Duration, connectTimeout: Optional[java.time.Duration],
idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = idleTimeout: Optional[java.time.Duration]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
Flow.fromGraph( Flow.fromGraph(
delegate delegate
.outgoingConnection( .outgoingConnection(
@ -217,10 +255,46 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
localAddress.asScala, localAddress.asScala,
immutableSeq(options), immutableSeq(options),
halfClose, halfClose,
connectTimeout, optionalDurationToScala(connectTimeout),
idleTimeout) optionalDurationToScala(idleTimeout))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
/**
* Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
*
* Note that the ByteString chunk boundaries are not retained across the network,
* to achieve application level chunks you have to introduce explicit framing in your streams,
* for example using the [[Framing]] operators.
*
* @param remoteAddress The remote address to connect to
* @param localAddress Optional local address for the connection
* @param options TCP options for the connections, see [[akka.io.Tcp]] for details
* @param halfClose
* Controls whether the connection is kept open even after writing has been completed to the accepted
* TCP connections.
* If set to true, the connection will implement the TCP half-close mechanism, allowing the server to
* write to the connection even after the client has finished writing. The TCP socket is only closed
* after both the client and server finished writing. This setting is recommended for clients and
* therefore it is the default setting.
* If set to false, the connection will immediately closed once the client closes its write side,
* independently whether the server is still attempting to write.
*/
@deprecated("Use bind that takes a java.time.Duration parameter instead.", "2.6.0")
def outgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption],
halfClose: Boolean,
connectTimeout: Duration,
idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
outgoingConnection(
remoteAddress,
localAddress,
options,
halfClose,
durationToJavaOptional(connectTimeout),
durationToJavaOptional(idleTimeout))
/** /**
* Creates an [[Tcp.OutgoingConnection]] without specifying options. * Creates an [[Tcp.OutgoingConnection]] without specifying options.
* It represents a prospective TCP client connection to the given endpoint. * It represents a prospective TCP client connection to the given endpoint.
@ -242,6 +316,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* @see [[Tcp.outgoingConnection()]] * @see [[Tcp.outgoingConnection()]]
*/ */
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def outgoingTlsConnection( def outgoingTlsConnection(
host: String, host: String,
port: Int, port: Int,
@ -261,6 +339,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* Marked API-may-change to leave room for an improvement around the very long parameter list. * Marked API-may-change to leave room for an improvement around the very long parameter list.
*/ */
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def outgoingTlsConnection( def outgoingTlsConnection(
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
sslContext: SSLContext, sslContext: SSLContext,
@ -281,6 +363,61 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
idleTimeout) idleTimeout)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
createSSLEngine: Supplier[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
Flow.fromGraph(
delegate
.outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get())
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
createSSLEngine: Supplier[SSLEngine],
localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption],
connectTimeout: Optional[java.time.Duration],
idleTimeout: Optional[java.time.Duration],
verifySession: JFunction[SSLSession, Optional[Throwable]],
closing: TLSClosing): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = {
Flow.fromGraph(
delegate
.outgoingConnectionWithTls(
remoteAddress,
createSSLEngine = () => createSSLEngine.get(),
localAddress.asScala,
immutableSeq(options),
optionalDurationToScala(connectTimeout),
optionalDurationToScala(idleTimeout),
session =>
verifySession.apply(session).asScala match {
case None => Success(())
case Some(t) => Failure(t)
},
closing)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
}
/** /**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS. * where all incoming and outgoing bytes are passed through TLS.
@ -290,6 +427,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* Note: the half close parameter is currently ignored * Note: the half close parameter is currently ignored
*/ */
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def bindTls( def bindTls(
interface: String, interface: String,
port: Int, port: Int,
@ -297,7 +438,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
negotiateNewSession: NegotiateNewSession, negotiateNewSession: NegotiateNewSession,
backlog: Int, backlog: Int,
options: JIterable[SocketOption], options: JIterable[SocketOption],
@silent // FIXME unused #26689 @silent // unused #26689
halfClose: Boolean, halfClose: Boolean,
idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] =
Source.fromGraph( Source.fromGraph(
@ -312,6 +453,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* @see [[Tcp.bind()]] * @see [[Tcp.bind()]]
*/ */
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def bindTls( def bindTls(
interface: String, interface: String,
port: Int, port: Int,
@ -323,4 +468,62 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
.map(new IncomingConnection(_)) .map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind()]]
*/
def bindWithTls(
interface: String,
port: Int,
createSSLEngine: Supplier[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
Source.fromGraph(
delegate
.bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get())
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind()]]
*/
def bindWithTls(
interface: String,
port: Int,
createSSLEngine: Supplier[SSLEngine],
backlog: Int,
options: JIterable[SocketOption],
idleTimeout: Optional[java.time.Duration],
verifySession: JFunction[SSLSession, Optional[Throwable]],
closing: TLSClosing): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
Source.fromGraph(
delegate
.bindWithTls(
interface,
port,
createSSLEngine = () => createSSLEngine.get(),
backlog,
immutableSeq(options),
optionalDurationToScala(idleTimeout),
session =>
verifySession.apply(session).asScala match {
case None => Success(())
case Some(t) => Failure(t)
},
closing)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
private def optionalDurationToScala(duration: Optional[java.time.Duration]) = {
if (duration.isPresent) duration.get.asScala else Duration.Inf
}
private def durationToJavaOptional(duration: Duration): Optional[java.time.Duration] = {
if (duration.isFinite) Optional.ofNullable(duration.asJava) else Optional.empty()
}
} }

View file

@ -5,8 +5,8 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import java.util.Collections import java.util.Collections
import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession }
import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession }
import akka.stream.impl.io.{ TlsModule, TlsUtils } import akka.stream.impl.io.{ TlsModule, TlsUtils }
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorSystem import akka.actor.ActorSystem
@ -14,9 +14,10 @@ import akka.stream._
import akka.stream.TLSProtocol._ import akka.stream.TLSProtocol._
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.sslconfig.akka.AkkaSSLConfig import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import javax.net.ssl.SSLParameters
/** /**
* Stream cipher support based upon JSSE. * Stream cipher support based upon JSSE.
* *
@ -30,10 +31,8 @@ import scala.util.{ Failure, Success, Try }
* documentation. The philosophy of this integration into Akka Streams is to * documentation. The philosophy of this integration into Akka Streams is to
* expose all knobs and dials to client code and therefore not limit the * expose all knobs and dials to client code and therefore not limit the
* configuration possibilities. In particular the client code will have to * configuration possibilities. In particular the client code will have to
* provide the SSLContext from which the SSLEngine is then created. Handshake * provide the SSLEngine, which is typically created from a SSLContext. Handshake
* parameters are set using [[NegotiateNewSession]] messages, the settings for * parameters and other parameters are defined when creating the SSLEngine.
* the initial handshake need to be provided up front using the same class;
* please refer to the method documentation below.
* *
* '''IMPORTANT NOTE''' * '''IMPORTANT NOTE'''
* *
@ -71,6 +70,7 @@ object TLS {
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was * The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/ */
@deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def apply( def apply(
sslContext: SSLContext, sslContext: SSLContext,
sslConfig: Option[AkkaSSLConfig], sslConfig: Option[AkkaSSLConfig],
@ -94,7 +94,7 @@ object TLS {
config.sslEngineConfigurator.configure(engine, sslContext) config.sslEngineConfigurator.configure(engine, sslContext)
engine.setUseClientMode(role == Client) engine.setUseClientMode(role == Client)
val finalSessionParameters = val paramsWithSni =
if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) { if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) {
val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get) val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get)
// In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using // In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using
@ -102,21 +102,29 @@ object TLS {
// In Java 8, SNI is only enabled if the server names are added to the parameters. // In Java 8, SNI is only enabled if the server names are added to the parameters.
// See https://github.com/akka/akka/issues/19287. // See https://github.com/akka/akka/issues/19287.
newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1))) newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1)))
firstSession.copy(sslParameters = Some(newParams)) firstSession.copy(sslParameters = Some(newParams))
} else } else
firstSession firstSession
TlsUtils.applySessionParameters(engine, finalSessionParameters) val paramsWithHostnameVerification = if (hostInfo.isDefined && config.useJvmHostnameVerification) {
val newParams = paramsWithSni.sslParameters.map(TlsUtils.cloneParameters).getOrElse(new SSLParameters)
newParams.setEndpointIdentificationAlgorithm("HTTPS")
paramsWithSni.copy(sslParameters = Some(newParams))
} else
paramsWithSni
TlsUtils.applySessionParameters(engine, paramsWithHostnameVerification)
engine engine
} }
def verifySession: (ActorSystem, SSLSession) => Try[Unit] = def verifySession: (ActorSystem, SSLSession) => Try[Unit] =
hostInfo match { hostInfo match {
case Some((hostname, _)) => { (system, session) => case Some((hostname, _)) => { (system, session) =>
val hostnameVerifier = theSslConfig(system).hostnameVerifier val config = theSslConfig(system)
if (!hostnameVerifier.verify(hostname, session)) if (config.useJvmHostnameVerification || config.hostnameVerifier.verify(hostname, session))
Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"))
else
Success(()) Success(())
else
Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"))
} }
case None => (_, _) => Success(()) case None => (_, _) => Success(())
} }
@ -140,6 +148,7 @@ object TLS {
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was * The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/ */
@deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def apply( def apply(
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
@ -158,6 +167,7 @@ object TLS {
* that is not a requirement and depends entirely on the application * that is not a requirement and depends entirely on the application
* protocol. * protocol.
*/ */
@deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", "2.6.0")
def apply( def apply(
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
@ -165,9 +175,9 @@ object TLS {
apply(sslContext, None, firstSession, role, IgnoreComplete, None) apply(sslContext, None, firstSession, role, IgnoreComplete, None)
/** /**
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]].
* *
* You can specify a constructor to create an SSLEngine that must already be configured for * You specify a factory to create an SSLEngine that must already be configured for
* client and server mode and with all the parameters for the first session. * client and server mode and with all the parameters for the first session.
* *
* You can specify a verification function that will be called after every successful handshake * You can specify a verification function that will be called after every successful handshake
@ -183,9 +193,9 @@ object TLS {
TlsModule(Attributes.none, _ => createSSLEngine(), (_, session) => verifySession(session), closing)) TlsModule(Attributes.none, _ => createSSLEngine(), (_, session) => verifySession(session), closing))
/** /**
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]].
* *
* You can specify a constructor to create an SSLEngine that must already be configured for * You specify a factory to create an SSLEngine that must already be configured for
* client and server mode and with all the parameters for the first session. * client and server mode and with all the parameters for the first session.
* *
* For a description of the `closing` parameter please refer to [[TLSClosing]]. * For a description of the `closing` parameter please refer to [[TLSClosing]].

View file

@ -28,12 +28,12 @@ import com.github.ghik.silencer.silent
import javax.net.ssl.SSLContext import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession import javax.net.ssl.SSLSession
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.util.Success
import scala.util.Try import scala.util.Try
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -92,13 +92,18 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
case sb: TLSProtocol.SessionBytes => sb.bytes case sb: TLSProtocol.SessionBytes => sb.bytes
// ignore other kinds of inbounds (currently only Truncated) // ignore other kinds of inbounds (currently only Truncated)
}) })
/**
* INTERNAL API
*/
@InternalApi private[akka] val defaultBacklog = 100
} }
final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
import Tcp._ import Tcp._
// TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead? // TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?
val bindShutdownTimeout = val bindShutdownTimeout: FiniteDuration =
system.settings.config.getDuration("akka.stream.materializer.subscription-timeout.timeout").asScala system.settings.config.getDuration("akka.stream.materializer.subscription-timeout.timeout").asScala
/** /**
@ -125,7 +130,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
def bind( def bind(
interface: String, interface: String,
port: Int, port: Int,
backlog: Int = 100, backlog: Int = defaultBacklog,
@silent // Traversable deprecated in 2.13 @silent // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil, options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = false, halfClose: Boolean = false,
@ -167,7 +172,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
handler: Flow[ByteString, ByteString, _], handler: Flow[ByteString, ByteString, _],
interface: String, interface: String,
port: Int, port: Int,
backlog: Int = 100, backlog: Int = defaultBacklog,
@silent // Traversable deprecated in 2.13 @silent // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil, options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = false, halfClose: Boolean = false,
@ -249,6 +254,10 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* @see [[Tcp.outgoingConnection()]] * @see [[Tcp.outgoingConnection()]]
*/ */
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def outgoingTlsConnection( def outgoingTlsConnection(
host: String, host: String,
port: Int, port: Int,
@ -267,6 +276,10 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* Marked API-may-change to leave room for an improvement around the very long parameter list. * Marked API-may-change to leave room for an improvement around the very long parameter list.
*/ */
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def outgoingTlsConnection( def outgoingTlsConnection(
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
sslContext: SSLContext, sslContext: SSLContext,
@ -278,23 +291,53 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout) val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout)
@silent("deprecated")
val tls = TLS(sslContext, negotiateNewSession, TLSRole.client) val tls = TLS(sslContext, negotiateNewSession, TLSRole.client)
connection.join(tlsWrapping.atop(tls).reversed) connection.join(tlsWrapping.atop(tls).reversed)
} }
/** /**
* INTERNAL API: for raw SSLEngine * Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
*/ */
@InternalApi private[akka] def outgoingTlsConnectionWithSSLEngine( def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
createSSLEngine: () => SSLEngine): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
outgoingConnectionWithTls(
remoteAddress,
createSSLEngine,
localAddress = None,
options = Nil,
connectTimeout = Duration.Inf,
idleTimeout = Duration.Inf,
verifySession = _ => Success(()),
closing = IgnoreComplete)
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
createSSLEngine: () => SSLEngine, createSSLEngine: () => SSLEngine,
localAddress: Option[InetSocketAddress] = None, localAddress: Option[InetSocketAddress],
@silent // Traversable deprecated in 2.13 options: immutable.Seq[SocketOption],
options: immutable.Traversable[SocketOption] = Nil, connectTimeout: Duration,
connectTimeout: Duration = Duration.Inf, idleTimeout: Duration,
idleTimeout: Duration = Duration.Inf,
verifySession: SSLSession => Try[Unit], verifySession: SSLSession => Try[Unit],
closing: TLSClosing = IgnoreComplete): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { closing: TLSClosing): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout) val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout)
val tls = TLS(createSSLEngine, verifySession, closing) val tls = TLS(createSSLEngine, verifySession, closing)
@ -311,15 +354,20 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* Marked API-may-change to leave room for an improvement around the very long parameter list. * Marked API-may-change to leave room for an improvement around the very long parameter list.
*/ */
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def bindTls( def bindTls(
interface: String, interface: String,
port: Int, port: Int,
sslContext: SSLContext, sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession, negotiateNewSession: NegotiateNewSession,
backlog: Int = 100, backlog: Int = defaultBacklog,
@silent // Traversable deprecated in 2.13 @silent // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil, options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = {
@silent("deprecated")
val tls = tlsWrapping.atop(TLS(sslContext, negotiateNewSession, TLSRole.server)).reversed val tls = tlsWrapping.atop(TLS(sslContext, negotiateNewSession, TLSRole.server)).reversed
bind(interface, port, backlog, options, halfClose = false, idleTimeout).map { incomingConnection => bind(interface, port, backlog, options, halfClose = false, idleTimeout).map { incomingConnection =>
@ -328,26 +376,107 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
} }
/** /**
* INTERNAL API * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.bind]]
*/ */
@InternalApi private[akka] def bindTlsWithSSLEngine( def bindWithTls(
interface: String,
port: Int,
createSSLEngine: () => SSLEngine): Source[IncomingConnection, Future[ServerBinding]] =
bindWithTls(
interface,
port,
createSSLEngine,
backlog = defaultBacklog,
options = Nil,
idleTimeout = Duration.Inf,
verifySession = _ => Success(()),
closing = IgnoreComplete)
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.bind]]
*/
def bindWithTls(
interface: String, interface: String,
port: Int, port: Int,
createSSLEngine: () => SSLEngine, createSSLEngine: () => SSLEngine,
backlog: Int = 100, backlog: Int,
@silent // Traversable deprecated in 2.13 options: immutable.Seq[SocketOption],
options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration,
idleTimeout: Duration = Duration.Inf,
verifySession: SSLSession => Try[Unit], verifySession: SSLSession => Try[Unit],
closing: TLSClosing = IgnoreComplete): Source[IncomingConnection, Future[ServerBinding]] = { closing: TLSClosing): Source[IncomingConnection, Future[ServerBinding]] = {
val tls = tlsWrapping.atop(TLS(createSSLEngine, verifySession, closing)).reversed val tls = tlsWrapping.atop(TLS(createSSLEngine, verifySession, closing)).reversed
bind(interface, port, backlog, options, true, idleTimeout).map { incomingConnection => bind(interface, port, backlog, options, halfClose = true, idleTimeout).map { incomingConnection =>
incomingConnection.copy(flow = incomingConnection.flow.join(tls)) incomingConnection.copy(flow = incomingConnection.flow.join(tls))
} }
} }
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* all incoming and outgoing bytes are passed through TLS and handling the incoming connections using the
* provided Flow.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client server and with all the parameters for the first session.
*
* @see [[Tcp.bindAndHandle]]
*/
def bindAndHandleWithTls(
handler: Flow[ByteString, ByteString, _],
interface: String,
port: Int,
createSSLEngine: () => SSLEngine)(implicit m: Materializer): Future[ServerBinding] =
bindAndHandleWithTls(
handler,
interface,
port,
createSSLEngine,
backlog = defaultBacklog,
options = Nil,
idleTimeout = Duration.Inf,
verifySession = _ => Success(()),
closing = IgnoreComplete)
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* all incoming and outgoing bytes are passed through TLS and handling the incoming connections using the
* provided Flow.
*
* You specify a factory to create an SSLEngine that must already be configured for
* client server and with all the parameters for the first session.
*
* @see [[Tcp.bindAndHandle]]
*/
def bindAndHandleWithTls(
handler: Flow[ByteString, ByteString, _],
interface: String,
port: Int,
createSSLEngine: () => SSLEngine,
backlog: Int,
options: immutable.Seq[SocketOption],
idleTimeout: Duration,
verifySession: SSLSession => Try[Unit],
closing: TLSClosing)(implicit m: Materializer): Future[ServerBinding] = {
bindWithTls(interface, port, createSSLEngine, backlog, options, idleTimeout, verifySession, closing)
.to(Sink.foreach { conn: IncomingConnection =>
conn.handleWith(handler)
})
.run()
}
/** /**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* handling the incoming connections through TLS and then run using the provided Flow. * handling the incoming connections through TLS and then run using the provided Flow.
@ -358,13 +487,17 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* *
* Marked API-may-change to leave room for an improvement around the very long parameter list. * Marked API-may-change to leave room for an improvement around the very long parameter list.
*/ */
@deprecated(
"Use bindAndHandleWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"2.6.0")
def bindAndHandleTls( def bindAndHandleTls(
handler: Flow[ByteString, ByteString, _], handler: Flow[ByteString, ByteString, _],
interface: String, interface: String,
port: Int, port: Int,
sslContext: SSLContext, sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession, negotiateNewSession: NegotiateNewSession,
backlog: Int = 100, backlog: Int = defaultBacklog,
@silent // Traversable deprecated in 2.13 @silent // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil, options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = { idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = {

View file

@ -7,15 +7,16 @@ package com.typesafe.sslconfig.akka
import java.security.KeyStore import java.security.KeyStore
import java.security.cert.CertPathValidatorException import java.security.cert.CertPathValidatorException
import java.util.Collections import java.util.Collections
import javax.net.ssl._
import javax.net.ssl._
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi
import akka.event.Logging import akka.event.Logging
import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory
import com.typesafe.sslconfig.ssl._ import com.typesafe.sslconfig.ssl._
import com.typesafe.sslconfig.util.LoggerFactory import com.typesafe.sslconfig.util.LoggerFactory
// TODO: remove again in 2.5.x, see https://github.com/akka/akka/issues/21753 @deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0")
object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider { object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider {
//////////////////// EXTENSION SETUP /////////////////// //////////////////// EXTENSION SETUP ///////////////////
@ -36,6 +37,7 @@ object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider
} }
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0")
final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSettings) extends Extension { final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSettings) extends Extension {
private val mkLogger = new AkkaLoggerFactory(system) private val mkLogger = new AkkaLoggerFactory(system)
@ -68,6 +70,15 @@ final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSett
val hostnameVerifier = buildHostnameVerifier(config) val hostnameVerifier = buildHostnameVerifier(config)
/**
* INTERNAL API
*/
@InternalApi def useJvmHostnameVerification: Boolean =
hostnameVerifier match {
case _: DefaultHostnameVerifier | _: NoopHostnameVerifier => true
case _ => false
}
val sslEngineConfigurator = { val sslEngineConfigurator = {
val sslContext = if (config.default) { val sslContext = if (config.default) {
log.info("ssl-config.default is true, using the JDK's default SSLContext") log.info("ssl-config.default is true, using the JDK's default SSLContext")

View file

@ -12,10 +12,12 @@ import com.typesafe.sslconfig.ssl.SSLConfigSettings
* Gives the chance to configure the SSLContext before it is going to be used. * Gives the chance to configure the SSLContext before it is going to be used.
* The passed in context will be already set in client mode and provided with hostInfo during initialization. * The passed in context will be already set in client mode and provided with hostInfo during initialization.
*/ */
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0")
trait SSLEngineConfigurator { trait SSLEngineConfigurator {
def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine
} }
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0")
final class DefaultSSLEngineConfigurator( final class DefaultSSLEngineConfigurator(
config: SSLConfigSettings, config: SSLConfigSettings,
enabledProtocols: Array[String], enabledProtocols: Array[String],

View file

@ -31,7 +31,7 @@ object Dependencies {
val reactiveStreamsVersion = "1.0.3" val reactiveStreamsVersion = "1.0.3"
val sslConfigVersion = "0.3.8" val sslConfigVersion = "0.4.0"
val Versions = Seq( val Versions = Seq(
crossScalaVersions := Seq(scala212Version, scala213Version), crossScalaVersions := Seq(scala212Version, scala213Version),