remove ssl-config dependency (#2127)

* remove ssl-config dependency

* update internal code that doesn't need ActorSystem any more

* scalafmt
This commit is contained in:
PJ Fanning 2025-09-02 20:04:39 +01:00 committed by GitHub
parent e3c5fe222c
commit 18545a6737
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 58 additions and 1492 deletions

View file

@ -50,8 +50,6 @@ object Dependencies {
val reactiveStreamsVersion = "1.0.4"
val sslConfigVersion = "0.6.1"
val scalaTestVersion = "3.2.19"
val scalaTestScalaCheckVersion = "1-18"
val scalaCheckVersion = "1.18.0"
@ -83,9 +81,6 @@ object Dependencies {
// reactive streams
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion
// ssl-config
val sslConfigCore = "com.typesafe" %% "ssl-config-core" % sslConfigVersion
val lmdb = "org.lmdbjava" % "lmdbjava" % "0.9.1"
val junit = "junit" % "junit" % junitVersion
@ -356,7 +351,7 @@ object Dependencies {
// pekko stream
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, sslConfigCore, TestDependencies.scalatest)
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
lazy val streamTestkit = l ++= Seq(
TestDependencies.scalatest,

View file

@ -96,13 +96,10 @@ object OSGi {
lazy val stream =
exports(
packages = Seq("org.apache.pekko.stream.*", "com.typesafe.sslconfig.pekko.*"),
packages = Seq("org.apache.pekko.stream.*"),
imports = Seq(
scalaJava8CompatImport(),
scalaParsingCombinatorImport(),
sslConfigCoreImport("com.typesafe.sslconfig.ssl.*"),
sslConfigCoreImport("com.typesafe.sslconfig.util.*"),
"!com.typesafe.sslconfig.pekko.*"))
scalaParsingCombinatorImport()))
lazy val streamTestkit = exports(Seq("org.apache.pekko.stream.testkit.*"))
@ -155,12 +152,6 @@ object OSGi {
versionedImport(packageName, "1.0.2", "1.0.2")
def scalaParsingCombinatorImport(packageName: String = "scala.util.parsing.combinator.*") =
versionedImport(packageName, "1.1.0", "1.2.0")
def sslConfigCoreImport(packageName: String = "com.typesafe.sslconfig") =
versionedImport(packageName, "0.4.0", "1.0.0")
def sslConfigCoreSslImport(packageName: String = "com.typesafe.sslconfig.ssl.*") =
versionedImport(packageName, "0.4.0", "1.0.0")
def sslConfigCoreUtilImport(packageName: String = "com.typesafe.sslconfig.util.*") =
versionedImport(packageName, "0.4.0", "1.0.0")
def kamonImport(packageName: String = "kamon.sigar.*") =
optionalResolution(versionedImport(packageName, "1.6.5", "1.6.6"))
def sigarImport(packageName: String = "org.hyperic.*") =

View file

@ -1,573 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.stream.io
import java.security.KeyStore
import java.security.SecureRandom
import java.security.cert.CertificateException
import java.util.concurrent.TimeoutException
import javax.net.ssl._
import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
import org.apache.pekko
import pekko.NotUsed
import pekko.pattern.{ after => later }
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.scaladsl._
import pekko.stream.stage._
import pekko.stream.testkit._
import pekko.testkit.TestDuration
import pekko.testkit.WithLogCapturing
import pekko.util.ByteString
import pekko.util.JavaVersion
import com.typesafe.sslconfig.pekko.PekkoSSLConfig
object DeprecatedTlsSpec {
val rnd = new Random
def initWithTrust(trustPath: String) = {
val password = "changeme"
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(getClass.getResourceAsStream(trustPath), password.toCharArray)
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
keyManagerFactory.init(keyStore, password.toCharArray)
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
trustManagerFactory.init(trustStore)
val context = SSLContext.getInstance("TLS")
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
context
}
def initSslContext(): SSLContext = initWithTrust("/truststore")
/**
* This is an operator that fires a TimeoutException failure after it was started,
* independent of the traffic going through. The purpose is to include the last seen
* element in the exception message to help in figuring out what went wrong.
*/
class Timeout(duration: FiniteDuration) extends GraphStage[FlowShape[ByteString, ByteString]] {
private val in = Inlet[ByteString]("in")
private val out = Outlet[ByteString]("out")
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new TimerGraphStageLogic(shape) {
override def preStart(): Unit = scheduleOnce((), duration)
var last: ByteString = _
setHandler(in,
new InHandler {
override def onPush(): Unit = {
last = grab(in)
push(out, last)
}
})
setHandler(out,
new OutHandler {
override def onPull(): Unit = pull(in)
})
override def onTimer(x: Any): Unit = {
failStage(new TimeoutException(s"timeout expired, last element was $last"))
}
}
}
val configOverrides =
"""
pekko.loglevel = DEBUG # issue 21660
pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
pekko.actor.debug.receive=off
"""
}
@nowarn("msg=deprecated")
class DeprecatedTlsSpec extends StreamSpec(DeprecatedTlsSpec.configOverrides) with WithLogCapturing {
import DeprecatedTlsSpec._
import GraphDSL.Implicits._
import system.dispatcher
val sslConfig: Option[PekkoSSLConfig] = None // no special settings to be applied here
"SslTls with deprecated SSLContext setup" must {
val sslContext = initSslContext()
val debug = Flow[SslTlsInbound].map { x =>
x match {
case SessionTruncated => system.log.debug(s" ----------- truncated ")
case SessionBytes(_, b) => system.log.debug(s" ----------- (${b.size}) ${b.take(32).utf8String}")
}
x
}
val cipherSuites =
NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA")
def clientTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Client, closing)
def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), None, cipherSuites, Client, closing)
def serverTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Server, closing)
trait Named {
def name: String =
getClass.getName.reverse.dropWhile(c => "$0123456789".indexOf(c) != -1).takeWhile(_ != '$').reverse
}
trait CommunicationSetup extends Named {
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed]
def cleanup(): Unit = ()
}
object ClientInitiates extends CommunicationSetup {
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) =
clientTls(leftClosing).atop(serverTls(rightClosing).reversed).join(rhs)
}
object ServerInitiates extends CommunicationSetup {
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) =
serverTls(leftClosing).atop(clientTls(rightClosing).reversed).join(rhs)
}
def server(flow: Flow[ByteString, ByteString, Any]) = {
val server = Tcp(system).bind("localhost", 0).to(Sink.foreach(c => c.flow.join(flow).run())).run()
Await.result(server, 2.seconds)
}
object ClientInitiatesViaTcp extends CommunicationSetup {
var binding: Tcp.ServerBinding = null
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = {
binding = server(serverTls(rightClosing).reversed.join(rhs))
clientTls(leftClosing).join(Tcp(system).outgoingConnection(binding.localAddress))
}
override def cleanup(): Unit = binding.unbind()
}
object ServerInitiatesViaTcp extends CommunicationSetup {
var binding: Tcp.ServerBinding = null
def decorateFlow(
leftClosing: TLSClosing,
rightClosing: TLSClosing,
rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = {
binding = server(clientTls(rightClosing).reversed.join(rhs))
serverTls(leftClosing).join(Tcp(system).outgoingConnection(binding.localAddress))
}
override def cleanup(): Unit = binding.unbind()
}
val communicationPatterns =
Seq(ClientInitiates, ServerInitiates, ClientInitiatesViaTcp, ServerInitiatesViaTcp)
trait PayloadScenario extends Named {
def flow: Flow[SslTlsInbound, SslTlsOutbound, Any] =
Flow[SslTlsInbound].map {
var session: SSLSession = null
def setSession(s: SSLSession) = {
session = s
system.log.debug(s"new session: $session (${session.getId.mkString(",")})")
}
{
case SessionTruncated => SendBytes(ByteString("TRUNCATED"))
case SessionBytes(s, b) if session == null =>
setSession(s)
SendBytes(b)
case SessionBytes(s, b) if s != session =>
setSession(s)
SendBytes(ByteString("NEWSESSION") ++ b)
case SessionBytes(_, b) => SendBytes(b)
}
}
def leftClosing: TLSClosing = IgnoreComplete
def rightClosing: TLSClosing = IgnoreComplete
def inputs: immutable.Seq[SslTlsOutbound]
def output: ByteString
protected def send(str: String) = SendBytes(ByteString(str))
protected def send(ch: Char) = SendBytes(ByteString(ch.toByte))
}
object SingleBytes extends PayloadScenario {
val str = "0123456789"
def inputs = str.map(ch => SendBytes(ByteString(ch.toByte)))
def output = ByteString(str)
}
object MediumMessages extends PayloadScenario {
val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 1000))
def inputs = strs.map(s => SendBytes(ByteString(s)))
def output = ByteString(strs.foldRight("")(_ ++ _))
}
object LargeMessages extends PayloadScenario {
// TLS max packet size is 16384 bytes
val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 17000))
def inputs = strs.map(s => SendBytes(ByteString(s)))
def output = ByteString(strs.foldRight("")(_ ++ _))
}
object EmptyBytesFirst extends PayloadScenario {
def inputs = List(ByteString.empty, ByteString("hello")).map(SendBytes.apply)
def output = ByteString("hello")
}
object EmptyBytesInTheMiddle extends PayloadScenario {
def inputs = List(ByteString("hello"), ByteString.empty, ByteString(" world")).map(SendBytes.apply)
def output = ByteString("hello world")
}
object EmptyBytesLast extends PayloadScenario {
def inputs = List(ByteString("hello"), ByteString.empty).map(SendBytes.apply)
def output = ByteString("hello")
}
object CompletedImmediately extends PayloadScenario {
override def inputs: immutable.Seq[SslTlsOutbound] = Nil
override def output = ByteString.empty
override def leftClosing: TLSClosing = EagerClose
override def rightClosing: TLSClosing = EagerClose
}
// this demonstrates that cancellation is ignored so that the five results make it back
object CancellingRHS extends PayloadScenario {
override def flow =
Flow[SslTlsInbound]
.mapConcat {
case SessionTruncated => SessionTruncated :: Nil
case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b)))
}
.take(5)
.mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x)))
.via(super.flow)
override def rightClosing = IgnoreCancel
val str = "abcdef" * 100
def inputs = str.map(send)
def output = ByteString(str.take(5))
}
object CancellingRHSIgnoresBoth extends PayloadScenario {
override def flow =
Flow[SslTlsInbound]
.mapConcat {
case SessionTruncated => SessionTruncated :: Nil
case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b)))
}
.take(5)
.mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x)))
.via(super.flow)
override def rightClosing = IgnoreBoth
val str = "abcdef" * 100
def inputs = str.map(send)
def output = ByteString(str.take(5))
}
object LHSIgnoresBoth extends PayloadScenario {
override def leftClosing = IgnoreBoth
val str = "0123456789"
def inputs = str.map(ch => SendBytes(ByteString(ch.toByte)))
def output = ByteString(str)
}
object BothSidesIgnoreBoth extends PayloadScenario {
override def leftClosing = IgnoreBoth
override def rightClosing = IgnoreBoth
val str = "0123456789"
def inputs = str.map(ch => SendBytes(ByteString(ch.toByte)))
def output = ByteString(str)
}
object SessionRenegotiationBySender extends PayloadScenario {
def inputs = List(send("hello"), NegotiateNewSession, send("world"))
def output = ByteString("helloNEWSESSIONworld")
}
// difference is that the RHS engine will now receive the handshake while trying to send
object SessionRenegotiationByReceiver extends PayloadScenario {
val str = "abcdef" * 100
def inputs = str.map(send) ++ Seq(NegotiateNewSession) ++ "hello world".map(send)
def output = ByteString(str + "NEWSESSIONhello world")
}
val logCipherSuite = Flow[SslTlsInbound].map {
var session: SSLSession = null
def setSession(s: SSLSession) = {
session = s
system.log.debug(s"new session: $session (${session.getId.mkString(",")})")
}
{
case SessionTruncated => SendBytes(ByteString("TRUNCATED"))
case SessionBytes(s, b) if s != session =>
setSession(s)
SendBytes(ByteString(s.getCipherSuite) ++ b)
case SessionBytes(_, b) => SendBytes(b)
}
}
object SessionRenegotiationFirstOne extends PayloadScenario {
override def flow = logCipherSuite
def inputs = NegotiateNewSession.withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil
def output = ByteString("TLS_RSA_WITH_AES_128_CBC_SHAhello")
}
object SessionRenegotiationFirstTwo extends PayloadScenario {
override def flow = logCipherSuite
def inputs = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil
def output = ByteString("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHAhello")
}
val renegotiationScenarios = if (JavaVersion.majorVersion <= 21)
Seq(
SessionRenegotiationBySender,
SessionRenegotiationByReceiver,
SessionRenegotiationFirstOne,
SessionRenegotiationFirstTwo)
else
// skip SessionRenegotiationFirstOne as it uses a weak cipher suite and the test will fail
Seq(
SessionRenegotiationBySender,
SessionRenegotiationByReceiver,
SessionRenegotiationFirstTwo)
val scenarios =
Seq(
SingleBytes,
MediumMessages,
LargeMessages,
EmptyBytesFirst,
EmptyBytesInTheMiddle,
EmptyBytesLast,
CompletedImmediately,
CancellingRHS,
CancellingRHSIgnoresBoth,
LHSIgnoresBoth,
BothSidesIgnoreBoth) ++ renegotiationScenarios
for {
commPattern <- communicationPatterns
scenario <- scenarios
} {
s"work in mode ${commPattern.name} while sending ${scenario.name}" in {
val onRHS = debug.via(scenario.flow)
val output =
Source(scenario.inputs)
.via(commPattern.decorateFlow(scenario.leftClosing, scenario.rightClosing, onRHS))
.via(new SimpleLinearGraphStage[SslTlsInbound] {
override def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush() = push(out, grab(in))
override def onPull() = pull(in)
override def onDownstreamFinish(cause: Throwable) = {
system.log.debug(s"me cancelled, cause {}", cause)
completeStage()
}
}
})
.via(debug)
.collect { case SessionBytes(_, b) => b }
.scan(ByteString.empty)(_ ++ _)
.filter(_.nonEmpty)
.via(new Timeout(10.seconds.dilated))
.dropWhile(_.size < scenario.output.size)
.runWith(Sink.headOption)
Await.result(output, 12.seconds.dilated).getOrElse(ByteString.empty).utf8String should be(
scenario.output.utf8String)
commPattern.cleanup()
}
}
"emit an error if the TLS handshake fails certificate checks" in {
val getError = Flow[SslTlsInbound]
.map[Either[SslTlsInbound, SSLException]](i => Left(i))
.recover { case e: SSLException => Right(e) }
.collect { case Right(e) => e }
.toMat(Sink.head)(Keep.right)
val simple = Flow.fromSinkAndSourceMat(getError, Source.maybe[SslTlsOutbound])(Keep.left)
// The creation of actual TCP connections is necessary. It is the easiest way to decouple the client and server
// under error conditions, and has the bonus of matching most actual SSL deployments.
val (server, serverErr) = Tcp(system)
.bind("localhost", 0)
.mapAsync(1)(c => c.flow.joinMat(serverTls(IgnoreBoth).reversed.joinMat(simple)(Keep.right))(Keep.right).run())
.toMat(Sink.head)(Keep.both)
.run()
val clientErr = simple
.join(badClientTls(IgnoreBoth))
.join(Tcp(system).outgoingConnection(Await.result(server, 1.second).localAddress))
.run()
Await.result(serverErr, 1.second).getMessage should include("certificate_unknown")
val clientErrText = rootCauseOf(Await.result(clientErr, 1.second)).getMessage
clientErrText should include("unable to find valid certification path to requested target")
}
"reliably cancel subscriptions when TransportIn fails early" in {
val ex = new Exception("hello")
val (sub, out1, out2) =
RunnableGraph
.fromGraph(
GraphDSL.createGraph(Source.asSubscriber[SslTlsOutbound], Sink.head[ByteString], Sink.head[SslTlsInbound])(
(_, _, _)) { implicit b => (s, o1, o2) =>
val tls = b.add(clientTls(EagerClose))
s ~> tls.in1; tls.out1 ~> o1
o2 <~ tls.out2; tls.in2 <~ Source.failed(ex)
ClosedShape
})
.run()
the[Exception] thrownBy Await.result(out1, 1.second) should be(ex)
the[Exception] thrownBy Await.result(out2, 1.second) should be(ex)
Thread.sleep(500)
val pub = TestPublisher.probe()
pub.subscribe(sub)
pub.expectSubscription().expectCancellation()
}
"reliably cancel subscriptions when UserIn fails early" in {
val ex = new Exception("hello")
val (sub, out1, out2) =
RunnableGraph
.fromGraph(
GraphDSL.createGraph(Source.asSubscriber[ByteString], Sink.head[ByteString], Sink.head[SslTlsInbound])(
(_, _, _)) { implicit b => (s, o1, o2) =>
val tls = b.add(clientTls(EagerClose))
Source.failed[SslTlsOutbound](ex) ~> tls.in1; tls.out1 ~> o1
o2 <~ tls.out2; tls.in2 <~ s
ClosedShape
})
.run()
the[Exception] thrownBy Await.result(out1, 1.second) should be(ex)
the[Exception] thrownBy Await.result(out2, 1.second) should be(ex)
Thread.sleep(500)
val pub = TestPublisher.probe()
pub.subscribe(sub)
pub.expectSubscription().expectCancellation()
}
"complete if TLS connection is truncated" in {
val ks = KillSwitches.shared("ks")
val scenario = SingleBytes
val outFlow = {
val terminator = BidiFlow.fromFlows(Flow[ByteString], ks.flow[ByteString])
clientTls(scenario.leftClosing)
.atop(terminator)
.atop(serverTls(scenario.rightClosing).reversed)
.join(debug.via(scenario.flow))
.via(debug)
}
val inFlow = Flow[SslTlsInbound]
.collect { case SessionBytes(_, b) => b }
.scan(ByteString.empty)(_ ++ _)
.via(new Timeout(6.seconds))
.dropWhile(_.size < scenario.output.size)
val f =
Source(scenario.inputs)
.via(outFlow)
.via(inFlow)
.map(result => {
ks.shutdown(); result
})
.runWith(Sink.last)
Await.result(f, 8.second).utf8String should be(scenario.output.utf8String)
}
"verify hostname" in {
def run(hostName: String): Future[pekko.Done] = {
val rhs = Flow[SslTlsInbound].map {
case SessionTruncated => SendBytes(ByteString.empty)
case SessionBytes(_, b) => SendBytes(b)
}
val clientTls = TLS(sslContext, None, cipherSuites, Client, EagerClose, Some((hostName, 80)))
val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs)
Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore)
}
Await.result(run("pekko-remote"), 3.seconds) // CN=pekko-remote
val cause = intercept[Exception] {
Await.result(run("unknown.example.org"), 3.seconds)
}
cause.getClass should ===(classOf[SSLHandshakeException]) // General SSLEngine problem
val rootCause = rootCauseOf(cause.getCause)
rootCause.getClass should ===(classOf[CertificateException])
rootCause.getMessage should ===("No name matching unknown.example.org found")
}
}
def rootCauseOf(e: Throwable): Throwable = {
if (JavaVersion.majorVersion >= 11) e
// Wrapped in extra 'General SSLEngine problem' (sometimes multiple)
// on 1.8.0-265 and before, but not 1.8.0-272 and later...
else if (e.isInstanceOf[SSLHandshakeException]) rootCauseOf(e.getCause)
else e
}
"A SslTlsPlacebo" must {
"pass through data" in {
val f = Source(1 to 3)
.map(b => SendBytes(ByteString(b.toByte)))
.via(TLSPlacebo().join(Flow.apply))
.grouped(10)
.runWith(Sink.head)
val result = Await.result(f, 3.seconds)
result.map(_.bytes) should be((1 to 3).map(b => ByteString(b.toByte)))
result.map(_.session).foreach(s => s.getCipherSuite should be("SSL_NULL_WITH_NULL_NULL"))
}
}
}

View file

@ -965,104 +965,6 @@ class TcpSpec extends StreamSpec("""
}
"TLS client and server convenience methods with deprecated SSLContext setup" should {
"allow for TLS" in {
test()
}
@nowarn("msg=deprecated")
def test(): Unit = {
// cert is valid until 2025, so if this tests starts failing after that you need to create a new one
val (sslContext, firstSession) = initSslMess()
val address = temporaryServerAddress()
Tcp()
.bindAndHandleTls(
// just echo characters until we reach '\n', then complete stream
// also - byte is our framing
Flow[ByteString].mapConcat(_.utf8String.toList).takeWhile(_ != '\n').map(c => ByteString(c)),
address.getHostName,
address.getPort,
sslContext,
firstSession)
.futureValue
system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}")
val connectionFlow =
Tcp().outgoingTlsConnection(address.getHostName, address.getPort, sslContext, firstSession)
val chars = "hello\n".toList.map(_.toString)
val (connectionF, result) =
Source(chars)
.map(c => ByteString(c))
.concat(Source.maybe) // do not complete it from our side
.viaMat(connectionFlow)(Keep.right)
.map(_.utf8String)
.toMat(Sink.fold("")(_ + _))(Keep.both)
.run()
connectionF.futureValue
system.log.info(s"Client connected to ${address.getHostString}:${address.getPort}")
result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello")
}
@nowarn("msg=deprecated")
def initSslMess() = {
// #setting-up-ssl-context
import java.security.KeyStore
import javax.net.ssl._
import org.apache.pekko
import pekko.stream.TLSClientAuth
import pekko.stream.TLSProtocol
import com.typesafe.sslconfig.pekko.PekkoSSLConfig
val sslConfig = PekkoSSLConfig(system)
// Don't hardcode your password in actual code
val password = "abcdef".toCharArray
// trust store and keys in one keystore
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(classOf[TcpSpec].getResourceAsStream("/tcp-spec-keystore.p12"), password)
val tmf = TrustManagerFactory.getInstance("SunX509")
tmf.init(keyStore)
val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
keyManagerFactory.init(keyStore, password)
// initial ssl context
val sslContext = SSLContext.getInstance("TLS")
sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
// protocols
val defaultParams = sslContext.getDefaultSSLParameters
val defaultProtocols = defaultParams.getProtocols
val protocols = sslConfig.configureProtocols(defaultProtocols, sslConfig.config)
defaultParams.setProtocols(protocols)
// ciphers
val defaultCiphers = defaultParams.getCipherSuites
val cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, sslConfig.config)
defaultParams.setCipherSuites(cipherSuites)
val negotiateNewSession = TLSProtocol.NegotiateNewSession
.withCipherSuites(cipherSuites.toIndexedSeq: _*)
.withProtocols(protocols.toIndexedSeq: _*)
.withParameters(defaultParams)
.withClientAuth(TLSClientAuth.None)
// #setting-up-ssl-context
(sslContext, negotiateNewSession)
}
}
def validateServerClientCommunication(
testData: ByteString,
serverConnection: ServerConnection,

View file

@ -187,7 +187,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stag
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.TimerGraphStageLogic.schedulePeriodically")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.AbruptIOTerminationException")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.AbruptIOTerminationException$")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.typesafe.sslconfig.pekko.PekkoSSLConfig.validateDefaultTrustManager")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.DelayOverflowStrategy.dropNew")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew$")

View file

@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Remove ssl-config dependency and deprecated methods related to it
ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.DefaultSSLEngineConfigurator")
ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.PekkoSSLConfig")
ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.PekkoSSLConfig$")
ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.SSLEngineConfigurator")
ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.util.PekkoLoggerBridge")
ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.util.PekkoLoggerFactory")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.TLS.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingConnection")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingTlsConnection")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Tcp.bindTls")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.TLS.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.TLS.apply$*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.outgoingTlsConnection")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.outgoingTlsConnection$*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindTls")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindTls$*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindAndHandleTls")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindAndHandleTls$*")

View file

@ -202,9 +202,3 @@ pekko {
}
}
}
# ssl configuration
# folded in from former ssl-config-pekko module
ssl-config {
logger = "com.typesafe.sslconfig.pekko.util.PekkoLoggerBridge"
}

View file

@ -1,189 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package com.typesafe.sslconfig.pekko
import java.util.Collections
import javax.net.ssl._
import org.apache.pekko
import pekko.actor._
import pekko.annotation.InternalApi
import pekko.event.Logging
import com.typesafe.sslconfig.pekko.util.PekkoLoggerFactory
import com.typesafe.sslconfig.ssl._
import com.typesafe.sslconfig.util.LoggerFactory
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
object PekkoSSLConfig extends ExtensionId[PekkoSSLConfig] with ExtensionIdProvider {
//////////////////// EXTENSION SETUP ///////////////////
override def get(system: ActorSystem): PekkoSSLConfig = super.get(system)
override def get(system: ClassicActorSystemProvider): PekkoSSLConfig = super.get(system)
def apply()(implicit system: ActorSystem): PekkoSSLConfig = super.apply(system)
override def lookup = PekkoSSLConfig
override def createExtension(system: ExtendedActorSystem): PekkoSSLConfig =
new PekkoSSLConfig(system, defaultSSLConfigSettings(system))
def defaultSSLConfigSettings(system: ActorSystem): SSLConfigSettings = {
val pekkoOverrides = system.settings.config.getConfig("pekko.ssl-config")
val defaults = system.settings.config.getConfig("ssl-config")
SSLConfigFactory.parse(pekkoOverrides.withFallback(defaults))
}
}
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
final class PekkoSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSettings) extends Extension {
private val mkLogger = new PekkoLoggerFactory(system)
private val log = Logging(system, classOf[PekkoSSLConfig])
log.debug("Initializing PekkoSSLConfig extension...")
/** Can be used to modify the underlying config, most typically used to change a few values in the default config */
def withSettings(c: SSLConfigSettings): PekkoSSLConfig =
new PekkoSSLConfig(system, c)
/**
* Returns a new [[PekkoSSLConfig]] instance with the settings changed by the given function.
* Please note that the ActorSystem-wide extension always remains configured via typesafe config,
* custom ones can be created for special-handling specific connections
*/
def mapSettings(f: SSLConfigSettings => SSLConfigSettings): PekkoSSLConfig =
new PekkoSSLConfig(system, f(config))
/**
* Returns a new [[PekkoSSLConfig]] instance with the settings changed by the given function.
* Please note that the ActorSystem-wide extension always remains configured via typesafe config,
* custom ones can be created for special-handling specific connections
*
* Java API
*/
// Not same signature as mapSettings to allow latter deprecation of this once we hit Scala 2.12
def convertSettings(f: java.util.function.Function[SSLConfigSettings, SSLConfigSettings]): PekkoSSLConfig =
new PekkoSSLConfig(system, f.apply(config))
val hostnameVerifier = buildHostnameVerifier(config)
/**
* INTERNAL API
*/
@InternalApi def useJvmHostnameVerification: Boolean =
hostnameVerifier match {
case _: DefaultHostnameVerifier | _: NoopHostnameVerifier => true
case _ => false
}
val sslEngineConfigurator = {
val sslContext = if (config.default) {
log.info("ssl-config.default is true, using the JDK's default SSLContext")
SSLContext.getDefault
} else {
// break out the static methods as much as we can...
val keyManagerFactory = buildKeyManagerFactory(config)
val trustManagerFactory = buildTrustManagerFactory(config)
new ConfigSSLContextBuilder(mkLogger, config, keyManagerFactory, trustManagerFactory).build()
}
// protocols!
val defaultParams = sslContext.getDefaultSSLParameters
val defaultProtocols = defaultParams.getProtocols
val protocols = configureProtocols(defaultProtocols, config)
// ciphers!
val defaultCiphers = defaultParams.getCipherSuites
val cipherSuites = configureCipherSuites(defaultCiphers, config)
// apply "loose" settings
// !! SNI!
looseDisableSNI(defaultParams)
new DefaultSSLEngineConfigurator(config, protocols, cipherSuites)
}
////////////////// CONFIGURING //////////////////////
def buildKeyManagerFactory(ssl: SSLConfigSettings): KeyManagerFactoryWrapper = {
val keyManagerAlgorithm = ssl.keyManagerConfig.algorithm
new DefaultKeyManagerFactoryWrapper(keyManagerAlgorithm)
}
def buildTrustManagerFactory(ssl: SSLConfigSettings): TrustManagerFactoryWrapper = {
val trustManagerAlgorithm = ssl.trustManagerConfig.algorithm
new DefaultTrustManagerFactoryWrapper(trustManagerAlgorithm)
}
def buildHostnameVerifier(conf: SSLConfigSettings): HostnameVerifier = {
conf ne null // @unused unavailable
val clazz: Class[HostnameVerifier] =
if (config.loose.disableHostnameVerification)
classOf[DisabledComplainingHostnameVerifier].asInstanceOf[Class[HostnameVerifier]]
else config.hostnameVerifierClass.asInstanceOf[Class[HostnameVerifier]]
val v = system.dynamicAccess
.createInstanceFor[HostnameVerifier](clazz, Nil)
.orElse(system.dynamicAccess.createInstanceFor[HostnameVerifier](clazz, List(classOf[LoggerFactory] -> mkLogger)))
.getOrElse(throw new Exception("Unable to obtain hostname verifier for class: " + clazz))
log.debug("buildHostnameVerifier: created hostname verifier: {}", v)
v
}
def configureProtocols(existingProtocols: Array[String], sslConfig: SSLConfigSettings): Array[String] = {
val definedProtocols = sslConfig.enabledProtocols match {
case Some(configuredProtocols) =>
// If we are given a specific list of protocols, then return it in exactly that order,
// assuming that it's actually possible in the SSL context.
configuredProtocols.filter(existingProtocols.contains).toArray
case None =>
// Otherwise, we return the default protocols in the given list.
Protocols.recommendedProtocols.filter(existingProtocols.contains)
}
definedProtocols
}
def configureCipherSuites(existingCiphers: Array[String], sslConfig: SSLConfigSettings): Array[String] = {
val definedCiphers = sslConfig.enabledCipherSuites match {
case Some(configuredCiphers) =>
// If we are given a specific list of ciphers, return it in that order.
configuredCiphers.filter(existingCiphers.contains(_)).toArray
case None =>
existingCiphers
}
definedCiphers
}
// LOOSE SETTINGS //
private def looseDisableSNI(defaultParams: SSLParameters): Unit = if (config.loose.disableSNI) {
// this will be logged once for each PekkoSSLConfig
log.warning(
"You are using ssl-config.loose.disableSNI=true! " +
"It is strongly discouraged to disable Server Name Indication, as it is crucial to preventing man-in-the-middle attacks.")
defaultParams.setServerNames(Collections.emptyList())
defaultParams.setSNIMatchers(Collections.emptyList())
}
}

View file

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package com.typesafe.sslconfig.pekko
import javax.net.ssl.{ SSLContext, SSLEngine }
import com.typesafe.sslconfig.ssl.SSLConfigSettings
/**
* Gives the chance to configure the SSLContext before it is going to be used.
* The passed in context will be already set in client mode and provided with hostInfo during initialization.
*/
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
trait SSLEngineConfigurator {
def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine
}
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
final class DefaultSSLEngineConfigurator(
config: SSLConfigSettings,
enabledProtocols: Array[String],
enabledCipherSuites: Array[String])
extends SSLEngineConfigurator {
config ne null // @unused unavailable
def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine = {
engine.setSSLParameters(sslContext.getDefaultSSLParameters)
engine.setEnabledProtocols(enabledProtocols)
engine.setEnabledCipherSuites(enabledCipherSuites)
engine
}
}

View file

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package com.typesafe.sslconfig.pekko.util
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.event.{ DummyClassForStringSources, EventStream }
import pekko.event.Logging._
import com.typesafe.sslconfig.util.{ LoggerFactory, NoDepsLogger }
final class PekkoLoggerFactory(system: ActorSystem) extends LoggerFactory {
override def apply(clazz: Class[_]): NoDepsLogger = new PekkoLoggerBridge(system.eventStream, clazz)
override def apply(name: String): NoDepsLogger =
new PekkoLoggerBridge(system.eventStream, name, classOf[DummyClassForStringSources])
}
class PekkoLoggerBridge(bus: EventStream, logSource: String, logClass: Class[_]) extends NoDepsLogger {
def this(bus: EventStream, clazz: Class[_]) = this(bus, clazz.getCanonicalName, clazz)
override def isDebugEnabled: Boolean = true
override def debug(msg: String): Unit = bus.publish(Debug(logSource, logClass, msg))
override def info(msg: String): Unit = bus.publish(Info(logSource, logClass, msg))
override def warn(msg: String): Unit = bus.publish(Warning(logSource, logClass, msg))
override def error(msg: String): Unit = bus.publish(Error(logSource, logClass, msg))
override def error(msg: String, throwable: Throwable): Unit = bus.publish(Error(logSource, logClass, msg))
}

View file

@ -42,8 +42,8 @@ import pekko.util.ByteString
def props(
maxInputBufferSize: Int,
createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
createSSLEngine: () => SSLEngine,
verifySession: SSLSession => Try[Unit],
closing: TLSClosing,
tracing: Boolean = false): Props =
Props(new TLSActor(maxInputBufferSize, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local)
@ -60,8 +60,8 @@ import pekko.util.ByteString
*/
@InternalApi private[stream] class TLSActor(
maxInputBufferSize: Int,
createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
createSSLEngine: () => SSLEngine,
verifySession: SSLSession => Try[Unit],
closing: TLSClosing,
tracing: Boolean)
extends Actor
@ -170,7 +170,7 @@ import pekko.util.ByteString
// The engine could also be instantiated in ActorMaterializerImpl but if creation fails
// during materialization it would be worse than failing later on.
val engine =
try createSSLEngine(context.system)
try createSSLEngine()
catch { case NonFatal(ex) => fail(ex, closeTransport = true); throw ex }
engine.beginHandshake()
@ -475,7 +475,7 @@ import pekko.util.ByteString
if (tracing) log.debug("handshake finished")
val session = engine.getSession
verifySession(context.system, session) match {
verifySession(session) match {
case Success(()) =>
currentSession = session
corkUser = false

View file

@ -19,7 +19,6 @@ import scala.util.Try
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.annotation.InternalApi
import pekko.stream._
import pekko.stream.TLSProtocol._
@ -37,8 +36,8 @@ import pekko.util.ByteString
cipherOut: Outlet[ByteString],
shape: BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound],
attributes: Attributes,
createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
createSSLEngine: () => SSLEngine,
verifySession: SSLSession => Try[Unit],
closing: TLSClosing)
extends AtomicModule[BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound], NotUsed] {
@ -56,8 +55,8 @@ import pekko.util.ByteString
@InternalApi private[stream] object TlsModule {
def apply(
attributes: Attributes,
createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753
createSSLEngine: () => SSLEngine,
verifySession: SSLSession => Try[Unit],
closing: TLSClosing): TlsModule = {
val name = attributes.nameOrDefault(s"StreamTls()")
val cipherIn = Inlet[ByteString](s"$name.cipherIn")

View file

@ -13,20 +13,16 @@
package org.apache.pekko.stream.javadsl
import java.util.Optional
import java.util.function.{ Consumer, Supplier }
import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession }
import javax.net.ssl.{ SSLEngine, SSLSession }
import scala.util.Try
import org.apache.pekko
import pekko.{ japi, NotUsed }
import pekko.NotUsed
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.util.ByteString
import pekko.util.OptionConverters._
import com.typesafe.sslconfig.pekko.PekkoSSLConfig
/**
* Stream cipher support based upon JSSE.
@ -65,112 +61,6 @@ import com.typesafe.sslconfig.pekko.PekkoSSLConfig
*/
object TLS {
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* This method uses the default closing behavior or [[IgnoreComplete]].
*/
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def create(
sslContext: SSLContext,
sslConfig: Optional[PekkoSSLConfig],
firstSession: NegotiateNewSession,
role: TLSRole): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, sslConfig.toScala, firstSession, role))
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* This method uses the default closing behavior or [[IgnoreComplete]].
*/
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def create(
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: TLSRole): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, None, firstSession, role))
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def create(
sslContext: SSLContext,
sslConfig: Optional[PekkoSSLConfig],
firstSession: NegotiateNewSession,
role: TLSRole,
hostInfo: Optional[japi.Pair[String, java.lang.Integer]],
closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(
scaladsl.TLS.apply(
sslContext,
sslConfig.toScala,
firstSession,
role,
closing,
hostInfo.toScala.map(e => (e.first, e.second))))
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/
@deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def create(
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: TLSRole,
hostInfo: Optional[japi.Pair[String, java.lang.Integer]],
closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(
scaladsl.TLS.apply(
sslContext,
None,
firstSession,
role,
closing,
hostInfo.toScala.map(e => (e.first, e.second))))
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]]. This is a low-level interface.
*

View file

@ -19,11 +19,9 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.{ Function => JFunction }
import java.util.function.Supplier
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession
import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
@ -40,7 +38,6 @@ import pekko.io.Inet.SocketOption
import pekko.stream.Materializer
import pekko.stream.SystemMaterializer
import pekko.stream.TLSClosing
import pekko.stream.TLSProtocol.NegotiateNewSession
import pekko.stream.scaladsl
import pekko.util.ByteString
import pekko.util.FutureConverters._
@ -238,42 +235,6 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
optionalDurationToScala(idleTimeout))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
/**
* Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
*
* Note that the ByteString chunk boundaries are not retained across the network,
* to achieve application level chunks you have to introduce explicit framing in your streams,
* for example using the [[Framing]] operators.
*
* @param remoteAddress The remote address to connect to
* @param localAddress Optional local address for the connection
* @param options TCP options for the connections, see [[pekko.io.Tcp]] for details
* @param halfClose
* Controls whether the connection is kept open even after writing has been completed to the accepted
* TCP connections.
* If set to true, the connection will implement the TCP half-close mechanism, allowing the server to
* write to the connection even after the client has finished writing. The TCP socket is only closed
* after both the client and server finished writing. This setting is recommended for clients and
* therefore it is the default setting.
* If set to false, the connection will immediately closed once the client closes its write side,
* independently whether the server is still attempting to write.
*/
@deprecated("Use bind that takes a java.time.Duration parameter instead.", "Akka 2.6.0")
def outgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption],
halfClose: Boolean,
connectTimeout: Duration,
idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
outgoingConnection(
remoteAddress,
localAddress,
options,
halfClose,
durationToJavaOptional(connectTimeout),
durationToJavaOptional(idleTimeout))
/**
* Creates an [[Tcp.OutgoingConnection]] without specifying options.
* It represents a prospective TCP client connection to the given endpoint.
@ -288,60 +249,6 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
.outgoingConnection(new InetSocketAddress(host, port))
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* @see [[Tcp.outgoingConnection]]
*/
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def outgoingTlsConnection(
host: String,
port: Int,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
Flow.fromGraph(
delegate
.outgoingTlsConnection(host, port, sslContext, negotiateNewSession)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* @see [[Tcp.outgoingConnection]]
*
* Marked API-may-change to leave room for an improvement around the very long parameter list.
*/
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def outgoingTlsConnection(
remoteAddress: InetSocketAddress,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession,
localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption],
connectTimeout: Duration,
idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
Flow.fromGraph(
delegate
.outgoingTlsConnection(
remoteAddress,
sslContext,
negotiateNewSession,
localAddress.toScala,
CollectionUtil.toSeq(options),
connectTimeout,
idleTimeout)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
@ -397,56 +304,6 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
}
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind]]
* Marked API-may-change to leave room for an improvement around the very long parameter list.
*
* Note: the half close parameter is currently ignored
*/
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def bindTls(
interface: String,
port: Int,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession,
backlog: Int,
options: JIterable[SocketOption],
@nowarn // unused #26689
halfClose: Boolean,
idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] =
Source.fromGraph(
delegate
.bindTls(interface, port, sslContext, negotiateNewSession, backlog, CollectionUtil.toSeq(options), idleTimeout)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind]]
*/
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def bindTls(
interface: String,
port: Int,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession): Source[IncomingConnection, CompletionStage[ServerBinding]] =
Source.fromGraph(
delegate
.bindTls(interface, port, sslContext, negotiateNewSession)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
@ -501,8 +358,4 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
private def optionalDurationToScala(duration: Optional[java.time.Duration]) = {
if (duration.isPresent) duration.get.asScala else Duration.Inf
}
private def durationToJavaOptional(duration: Duration): Optional[java.time.Duration] = {
if (duration.isFinite) Optional.ofNullable(duration.asJava) else Optional.empty()
}
}

View file

@ -13,22 +13,17 @@
package org.apache.pekko.stream.scaladsl
import java.util.Collections
import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession }
import javax.net.ssl.SSLParameters
import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession }
import scala.util.{ Failure, Success, Try }
import scala.util.{ Success, Try }
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.stream.impl.io.{ TlsModule, TlsUtils }
import pekko.stream.impl.io.TlsModule
import pekko.util.ByteString
import com.typesafe.sslconfig.pekko.PekkoSSLConfig
/**
* Stream cipher support based upon JSSE.
*
@ -66,129 +61,6 @@ import com.typesafe.sslconfig.pekko.PekkoSSLConfig
*/
object TLS {
/**
* Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/
@deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def apply(
sslContext: SSLContext,
sslConfig: Option[PekkoSSLConfig],
firstSession: NegotiateNewSession,
role: TLSRole,
closing: TLSClosing = IgnoreComplete,
hostInfo: Option[(String, Int)] = None)
: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = {
def theSslConfig(system: ActorSystem): PekkoSSLConfig =
sslConfig.getOrElse(PekkoSSLConfig(system))
val createSSLEngine = { (system: ActorSystem) =>
val config = theSslConfig(system)
val engine = hostInfo match {
case Some((hostname, port)) if !config.config.loose.disableSNI =>
sslContext.createSSLEngine(hostname, port)
case _ => sslContext.createSSLEngine()
}
config.sslEngineConfigurator.configure(engine, sslContext)
engine.setUseClientMode(role == Client)
val paramsWithSni =
if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) {
val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get)
// In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using
// `createSSLEngine(hostname, port)`.
// In Java 8, SNI is only enabled if the server names are added to the parameters.
// See https://github.com/akka/akka/issues/19287.
newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1)))
firstSession.copy(sslParameters = Some(newParams))
} else
firstSession
val paramsWithHostnameVerification = if (hostInfo.isDefined && config.useJvmHostnameVerification) {
val newParams = paramsWithSni.sslParameters.map(TlsUtils.cloneParameters).getOrElse(new SSLParameters)
newParams.setEndpointIdentificationAlgorithm("HTTPS")
paramsWithSni.copy(sslParameters = Some(newParams))
} else
paramsWithSni
TlsUtils.applySessionParameters(engine, paramsWithHostnameVerification)
engine
}
def verifySession: (ActorSystem, SSLSession) => Try[Unit] =
hostInfo match {
case Some((hostname, _)) => { (system, session) =>
val config = theSslConfig(system)
if (config.useJvmHostnameVerification || config.hostnameVerifier.verify(hostname, session))
Success(())
else
Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"))
}
case None => (_, _) => Success(())
}
scaladsl.BidiFlow.fromGraph(TlsModule(Attributes.none, createSSLEngine, verifySession, closing))
}
/**
* Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
*/
@deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def apply(
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: TLSRole,
closing: TLSClosing,
hostInfo: Option[(String, Int)])
: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
apply(sslContext, None, firstSession, role, closing, hostInfo)
/**
* Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. The
* SSLContext will be used to create an SSLEngine to which then the
* `firstSession` parameters are applied before initiating the first
* handshake. The `role` parameter determines the SSLEngines role; this is
* often the same as the underlying transports server or client role, but
* that is not a requirement and depends entirely on the application
* protocol.
*/
@deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def apply(
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: TLSRole): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
apply(sslContext, None, firstSession, role, IgnoreComplete, None)
/**
* Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]].
*
@ -201,11 +73,11 @@ object TLS {
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*/
def apply(
createSSLEngine: () => SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753
verifySession: SSLSession => Try[Unit], // we don't offer the internal API that provides `ActorSystem` here, see #21753
createSSLEngine: () => SSLEngine,
verifySession: SSLSession => Try[Unit],
closing: TLSClosing): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
scaladsl.BidiFlow.fromGraph(
TlsModule(Attributes.none, _ => createSSLEngine(), (_, session) => verifySession(session), closing))
TlsModule(Attributes.none, () => createSSLEngine(), session => verifySession(session), closing))
/**
* Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]].
@ -216,7 +88,7 @@ object TLS {
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*/
def apply(
createSSLEngine: () => SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753
createSSLEngine: () => SSLEngine,
closing: TLSClosing): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
apply(createSSLEngine, _ => Success(()), closing)
}

View file

@ -15,7 +15,6 @@ package org.apache.pekko.stream.scaladsl
import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession
@ -38,7 +37,6 @@ import pekko.io.IO
import pekko.io.Inet.SocketOption
import pekko.stream._
import pekko.stream.Attributes.Attribute
import pekko.stream.TLSProtocol.NegotiateNewSession
import pekko.stream.impl.fusing.GraphStages.detacher
import pekko.stream.impl.io.ConnectionSourceStage
import pekko.stream.impl.io.OutgoingConnectionStage
@ -254,58 +252,6 @@ final class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
outgoingConnection(InetSocketAddress.createUnresolved(host, port))
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* For more advanced use cases you can manually combine [[Tcp.outgoingConnection]] and [[TLS]]
*
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
*
* @see [[Tcp.outgoingConnection]]
*/
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def outgoingTlsConnection(
host: String,
port: Int,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
outgoingTlsConnection(InetSocketAddress.createUnresolved(host, port), sslContext, negotiateNewSession)
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* @see [[Tcp.outgoingConnection]]
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
*/
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def outgoingTlsConnection(
remoteAddress: InetSocketAddress,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession,
localAddress: Option[InetSocketAddress] = None,
@nowarn // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout)
@nowarn("msg=deprecated")
val tls = TLS(sslContext, negotiateNewSession, TLSRole.client)
connection.join(tlsWrapping.atop(tls).reversed)
}
/**
* Creates an [[Tcp.OutgoingConnection]] with TLS.
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
@ -354,35 +300,6 @@ final class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
connection.join(tlsWrapping.atop(tls).reversed)
}
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
* @see [[Tcp.bind]]
*/
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def bindTls(
interface: String,
port: Int,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession,
backlog: Int = defaultBacklog,
@nowarn // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = {
@nowarn("msg=deprecated")
val tls = tlsWrapping.atop(TLS(sslContext, negotiateNewSession, TLSRole.server)).reversed
bind(interface, port, backlog, options, halfClose = false, idleTimeout).map { incomingConnection =>
incomingConnection.copy(flow = incomingConnection.flow.join(tls))
}
}
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
@ -484,38 +401,6 @@ final class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
})
.run()
}
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* handling the incoming connections through TLS and then run using the provided Flow.
*
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
* @see [[Tcp.bindAndHandle]]
*
* Marked API-may-change to leave room for an improvement around the very long parameter list.
*/
@deprecated(
"Use bindAndHandleWithTls that takes a SSLEngine factory instead. " +
"Setup the SSLEngine with needed parameters.",
"Akka 2.6.0")
def bindAndHandleTls(
handler: Flow[ByteString, ByteString, _],
interface: String,
port: Int,
sslContext: SSLContext,
negotiateNewSession: NegotiateNewSession,
backlog: Int = defaultBacklog,
@nowarn // Traversable deprecated in 2.13
options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = {
bindTls(interface, port, sslContext, negotiateNewSession, backlog, options, idleTimeout)
.to(Sink.foreach { (conn: IncomingConnection) =>
conn.handleWith(handler)
})
.run()
}
}
final class TcpIdleTimeoutException(msg: String, @unused timeout: Duration)