From 18545a67371721645c4c433cffb29c0acd69f6aa Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 2 Sep 2025 20:04:39 +0100 Subject: [PATCH] remove ssl-config dependency (#2127) * remove ssl-config dependency * update internal code that doesn't need ActorSystem any more * scalafmt --- project/Dependencies.scala | 7 +- project/OSGi.scala | 13 +- .../pekko/stream/io/DeprecatedTlsSpec.scala | 573 ------------------ .../org/apache/pekko/stream/io/TcpSpec.scala | 98 --- .../remove-deprecated-methods.excludes | 1 - .../remove-ssl-config.excludes | 36 ++ stream/src/main/resources/reference.conf | 6 - .../sslconfig/pekko/PekkoSSLConfig.scala | 189 ------ .../pekko/SSLEngineConfigurator.scala | 44 -- .../pekko/util/PekkoLoggerBridge.scala | 44 -- .../pekko/stream/impl/io/TLSActor.scala | 12 +- .../pekko/stream/impl/io/TlsModule.scala | 9 +- .../org/apache/pekko/stream/javadsl/TLS.scala | 114 +--- .../org/apache/pekko/stream/javadsl/Tcp.scala | 147 ----- .../apache/pekko/stream/scaladsl/TLS.scala | 142 +---- .../apache/pekko/stream/scaladsl/Tcp.scala | 115 ---- 16 files changed, 58 insertions(+), 1492 deletions(-) delete mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/io/DeprecatedTlsSpec.scala create mode 100644 stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-ssl-config.excludes delete mode 100644 stream/src/main/scala/com/typesafe/sslconfig/pekko/PekkoSSLConfig.scala delete mode 100644 stream/src/main/scala/com/typesafe/sslconfig/pekko/SSLEngineConfigurator.scala delete mode 100644 stream/src/main/scala/com/typesafe/sslconfig/pekko/util/PekkoLoggerBridge.scala diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4904508da7..7aa44bedfc 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -50,8 +50,6 @@ object Dependencies { val reactiveStreamsVersion = "1.0.4" - val sslConfigVersion = "0.6.1" - val scalaTestVersion = "3.2.19" val scalaTestScalaCheckVersion = "1-18" val scalaCheckVersion = "1.18.0" @@ -83,9 +81,6 @@ object Dependencies { // reactive streams val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion - // ssl-config - val sslConfigCore = "com.typesafe" %% "ssl-config-core" % sslConfigVersion - val lmdb = "org.lmdbjava" % "lmdbjava" % "0.9.1" val junit = "junit" % "junit" % junitVersion @@ -356,7 +351,7 @@ object Dependencies { // pekko stream - lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, sslConfigCore, TestDependencies.scalatest) + lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest) lazy val streamTestkit = l ++= Seq( TestDependencies.scalatest, diff --git a/project/OSGi.scala b/project/OSGi.scala index dcb1b67252..21e9571074 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -96,13 +96,10 @@ object OSGi { lazy val stream = exports( - packages = Seq("org.apache.pekko.stream.*", "com.typesafe.sslconfig.pekko.*"), + packages = Seq("org.apache.pekko.stream.*"), imports = Seq( scalaJava8CompatImport(), - scalaParsingCombinatorImport(), - sslConfigCoreImport("com.typesafe.sslconfig.ssl.*"), - sslConfigCoreImport("com.typesafe.sslconfig.util.*"), - "!com.typesafe.sslconfig.pekko.*")) + scalaParsingCombinatorImport())) lazy val streamTestkit = exports(Seq("org.apache.pekko.stream.testkit.*")) @@ -155,12 +152,6 @@ object OSGi { versionedImport(packageName, "1.0.2", "1.0.2") def scalaParsingCombinatorImport(packageName: String = "scala.util.parsing.combinator.*") = versionedImport(packageName, "1.1.0", "1.2.0") - def sslConfigCoreImport(packageName: String = "com.typesafe.sslconfig") = - versionedImport(packageName, "0.4.0", "1.0.0") - def sslConfigCoreSslImport(packageName: String = "com.typesafe.sslconfig.ssl.*") = - versionedImport(packageName, "0.4.0", "1.0.0") - def sslConfigCoreUtilImport(packageName: String = "com.typesafe.sslconfig.util.*") = - versionedImport(packageName, "0.4.0", "1.0.0") def kamonImport(packageName: String = "kamon.sigar.*") = optionalResolution(versionedImport(packageName, "1.6.5", "1.6.6")) def sigarImport(packageName: String = "org.hyperic.*") = diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/DeprecatedTlsSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/DeprecatedTlsSpec.scala deleted file mode 100644 index 828d371c2a..0000000000 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/DeprecatedTlsSpec.scala +++ /dev/null @@ -1,573 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2018-2022 Lightbend Inc. - */ - -package org.apache.pekko.stream.io - -import java.security.KeyStore -import java.security.SecureRandom -import java.security.cert.CertificateException -import java.util.concurrent.TimeoutException -import javax.net.ssl._ - -import scala.annotation.nowarn -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.Random - -import org.apache.pekko -import pekko.NotUsed -import pekko.pattern.{ after => later } -import pekko.stream._ -import pekko.stream.TLSProtocol._ -import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import pekko.stream.scaladsl._ -import pekko.stream.stage._ -import pekko.stream.testkit._ -import pekko.testkit.TestDuration -import pekko.testkit.WithLogCapturing -import pekko.util.ByteString -import pekko.util.JavaVersion - -import com.typesafe.sslconfig.pekko.PekkoSSLConfig - -object DeprecatedTlsSpec { - - val rnd = new Random - - def initWithTrust(trustPath: String) = { - val password = "changeme" - - val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) - keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray) - - val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) - trustStore.load(getClass.getResourceAsStream(trustPath), password.toCharArray) - - val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) - keyManagerFactory.init(keyStore, password.toCharArray) - - val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) - trustManagerFactory.init(trustStore) - - val context = SSLContext.getInstance("TLS") - context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) - context - } - - def initSslContext(): SSLContext = initWithTrust("/truststore") - - /** - * This is an operator that fires a TimeoutException failure after it was started, - * independent of the traffic going through. The purpose is to include the last seen - * element in the exception message to help in figuring out what went wrong. - */ - class Timeout(duration: FiniteDuration) extends GraphStage[FlowShape[ByteString, ByteString]] { - - private val in = Inlet[ByteString]("in") - private val out = Outlet[ByteString]("out") - override val shape = FlowShape(in, out) - - override def createLogic(attr: Attributes) = new TimerGraphStageLogic(shape) { - override def preStart(): Unit = scheduleOnce((), duration) - - var last: ByteString = _ - setHandler(in, - new InHandler { - override def onPush(): Unit = { - last = grab(in) - push(out, last) - } - }) - setHandler(out, - new OutHandler { - override def onPull(): Unit = pull(in) - }) - override def onTimer(x: Any): Unit = { - failStage(new TimeoutException(s"timeout expired, last element was $last")) - } - } - } - - val configOverrides = - """ - pekko.loglevel = DEBUG # issue 21660 - pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"] - pekko.actor.debug.receive=off - """ -} - -@nowarn("msg=deprecated") -class DeprecatedTlsSpec extends StreamSpec(DeprecatedTlsSpec.configOverrides) with WithLogCapturing { - import DeprecatedTlsSpec._ - import GraphDSL.Implicits._ - import system.dispatcher - - val sslConfig: Option[PekkoSSLConfig] = None // no special settings to be applied here - - "SslTls with deprecated SSLContext setup" must { - - val sslContext = initSslContext() - - val debug = Flow[SslTlsInbound].map { x => - x match { - case SessionTruncated => system.log.debug(s" ----------- truncated ") - case SessionBytes(_, b) => system.log.debug(s" ----------- (${b.size}) ${b.take(32).utf8String}") - } - x - } - - val cipherSuites = - NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_128_CBC_SHA") - def clientTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Client, closing) - def badClientTls(closing: TLSClosing) = TLS(initWithTrust("/badtruststore"), None, cipherSuites, Client, closing) - def serverTls(closing: TLSClosing) = TLS(sslContext, None, cipherSuites, Server, closing) - - trait Named { - def name: String = - getClass.getName.reverse.dropWhile(c => "$0123456789".indexOf(c) != -1).takeWhile(_ != '$').reverse - } - - trait CommunicationSetup extends Named { - def decorateFlow( - leftClosing: TLSClosing, - rightClosing: TLSClosing, - rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]): Flow[SslTlsOutbound, SslTlsInbound, NotUsed] - def cleanup(): Unit = () - } - - object ClientInitiates extends CommunicationSetup { - def decorateFlow( - leftClosing: TLSClosing, - rightClosing: TLSClosing, - rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = - clientTls(leftClosing).atop(serverTls(rightClosing).reversed).join(rhs) - } - - object ServerInitiates extends CommunicationSetup { - def decorateFlow( - leftClosing: TLSClosing, - rightClosing: TLSClosing, - rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = - serverTls(leftClosing).atop(clientTls(rightClosing).reversed).join(rhs) - } - - def server(flow: Flow[ByteString, ByteString, Any]) = { - val server = Tcp(system).bind("localhost", 0).to(Sink.foreach(c => c.flow.join(flow).run())).run() - Await.result(server, 2.seconds) - } - - object ClientInitiatesViaTcp extends CommunicationSetup { - var binding: Tcp.ServerBinding = null - def decorateFlow( - leftClosing: TLSClosing, - rightClosing: TLSClosing, - rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { - binding = server(serverTls(rightClosing).reversed.join(rhs)) - clientTls(leftClosing).join(Tcp(system).outgoingConnection(binding.localAddress)) - } - override def cleanup(): Unit = binding.unbind() - } - - object ServerInitiatesViaTcp extends CommunicationSetup { - var binding: Tcp.ServerBinding = null - def decorateFlow( - leftClosing: TLSClosing, - rightClosing: TLSClosing, - rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { - binding = server(clientTls(rightClosing).reversed.join(rhs)) - serverTls(leftClosing).join(Tcp(system).outgoingConnection(binding.localAddress)) - } - override def cleanup(): Unit = binding.unbind() - } - - val communicationPatterns = - Seq(ClientInitiates, ServerInitiates, ClientInitiatesViaTcp, ServerInitiatesViaTcp) - - trait PayloadScenario extends Named { - def flow: Flow[SslTlsInbound, SslTlsOutbound, Any] = - Flow[SslTlsInbound].map { - var session: SSLSession = null - def setSession(s: SSLSession) = { - session = s - system.log.debug(s"new session: $session (${session.getId.mkString(",")})") - } - - { - case SessionTruncated => SendBytes(ByteString("TRUNCATED")) - case SessionBytes(s, b) if session == null => - setSession(s) - SendBytes(b) - case SessionBytes(s, b) if s != session => - setSession(s) - SendBytes(ByteString("NEWSESSION") ++ b) - case SessionBytes(_, b) => SendBytes(b) - } - } - def leftClosing: TLSClosing = IgnoreComplete - def rightClosing: TLSClosing = IgnoreComplete - - def inputs: immutable.Seq[SslTlsOutbound] - def output: ByteString - - protected def send(str: String) = SendBytes(ByteString(str)) - protected def send(ch: Char) = SendBytes(ByteString(ch.toByte)) - } - - object SingleBytes extends PayloadScenario { - val str = "0123456789" - def inputs = str.map(ch => SendBytes(ByteString(ch.toByte))) - def output = ByteString(str) - } - - object MediumMessages extends PayloadScenario { - val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 1000)) - def inputs = strs.map(s => SendBytes(ByteString(s))) - def output = ByteString(strs.foldRight("")(_ ++ _)) - } - - object LargeMessages extends PayloadScenario { - // TLS max packet size is 16384 bytes - val strs = "0123456789".map(d => d.toString * (rnd.nextInt(9000) + 17000)) - def inputs = strs.map(s => SendBytes(ByteString(s))) - def output = ByteString(strs.foldRight("")(_ ++ _)) - } - - object EmptyBytesFirst extends PayloadScenario { - def inputs = List(ByteString.empty, ByteString("hello")).map(SendBytes.apply) - def output = ByteString("hello") - } - - object EmptyBytesInTheMiddle extends PayloadScenario { - def inputs = List(ByteString("hello"), ByteString.empty, ByteString(" world")).map(SendBytes.apply) - def output = ByteString("hello world") - } - - object EmptyBytesLast extends PayloadScenario { - def inputs = List(ByteString("hello"), ByteString.empty).map(SendBytes.apply) - def output = ByteString("hello") - } - - object CompletedImmediately extends PayloadScenario { - override def inputs: immutable.Seq[SslTlsOutbound] = Nil - override def output = ByteString.empty - - override def leftClosing: TLSClosing = EagerClose - override def rightClosing: TLSClosing = EagerClose - } - - // this demonstrates that cancellation is ignored so that the five results make it back - object CancellingRHS extends PayloadScenario { - override def flow = - Flow[SslTlsInbound] - .mapConcat { - case SessionTruncated => SessionTruncated :: Nil - case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b))) - } - .take(5) - .mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x))) - .via(super.flow) - override def rightClosing = IgnoreCancel - - val str = "abcdef" * 100 - def inputs = str.map(send) - def output = ByteString(str.take(5)) - } - - object CancellingRHSIgnoresBoth extends PayloadScenario { - override def flow = - Flow[SslTlsInbound] - .mapConcat { - case SessionTruncated => SessionTruncated :: Nil - case SessionBytes(s, bytes) => bytes.map(b => SessionBytes(s, ByteString(b))) - } - .take(5) - .mapAsync(5)(x => later(500.millis, system.scheduler)(Future.successful(x))) - .via(super.flow) - override def rightClosing = IgnoreBoth - - val str = "abcdef" * 100 - def inputs = str.map(send) - def output = ByteString(str.take(5)) - } - - object LHSIgnoresBoth extends PayloadScenario { - override def leftClosing = IgnoreBoth - val str = "0123456789" - def inputs = str.map(ch => SendBytes(ByteString(ch.toByte))) - def output = ByteString(str) - } - - object BothSidesIgnoreBoth extends PayloadScenario { - override def leftClosing = IgnoreBoth - override def rightClosing = IgnoreBoth - val str = "0123456789" - def inputs = str.map(ch => SendBytes(ByteString(ch.toByte))) - def output = ByteString(str) - } - - object SessionRenegotiationBySender extends PayloadScenario { - def inputs = List(send("hello"), NegotiateNewSession, send("world")) - def output = ByteString("helloNEWSESSIONworld") - } - - // difference is that the RHS engine will now receive the handshake while trying to send - object SessionRenegotiationByReceiver extends PayloadScenario { - val str = "abcdef" * 100 - def inputs = str.map(send) ++ Seq(NegotiateNewSession) ++ "hello world".map(send) - def output = ByteString(str + "NEWSESSIONhello world") - } - - val logCipherSuite = Flow[SslTlsInbound].map { - var session: SSLSession = null - def setSession(s: SSLSession) = { - session = s - system.log.debug(s"new session: $session (${session.getId.mkString(",")})") - } - - { - case SessionTruncated => SendBytes(ByteString("TRUNCATED")) - case SessionBytes(s, b) if s != session => - setSession(s) - SendBytes(ByteString(s.getCipherSuite) ++ b) - case SessionBytes(_, b) => SendBytes(b) - } - } - - object SessionRenegotiationFirstOne extends PayloadScenario { - override def flow = logCipherSuite - def inputs = NegotiateNewSession.withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil - def output = ByteString("TLS_RSA_WITH_AES_128_CBC_SHAhello") - } - - object SessionRenegotiationFirstTwo extends PayloadScenario { - override def flow = logCipherSuite - def inputs = NegotiateNewSession.withCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") :: send("hello") :: Nil - def output = ByteString("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHAhello") - } - - val renegotiationScenarios = if (JavaVersion.majorVersion <= 21) - Seq( - SessionRenegotiationBySender, - SessionRenegotiationByReceiver, - SessionRenegotiationFirstOne, - SessionRenegotiationFirstTwo) - else - // skip SessionRenegotiationFirstOne as it uses a weak cipher suite and the test will fail - Seq( - SessionRenegotiationBySender, - SessionRenegotiationByReceiver, - SessionRenegotiationFirstTwo) - - val scenarios = - Seq( - SingleBytes, - MediumMessages, - LargeMessages, - EmptyBytesFirst, - EmptyBytesInTheMiddle, - EmptyBytesLast, - CompletedImmediately, - CancellingRHS, - CancellingRHSIgnoresBoth, - LHSIgnoresBoth, - BothSidesIgnoreBoth) ++ renegotiationScenarios - - for { - commPattern <- communicationPatterns - scenario <- scenarios - } { - s"work in mode ${commPattern.name} while sending ${scenario.name}" in { - val onRHS = debug.via(scenario.flow) - val output = - Source(scenario.inputs) - .via(commPattern.decorateFlow(scenario.leftClosing, scenario.rightClosing, onRHS)) - .via(new SimpleLinearGraphStage[SslTlsInbound] { - override def createLogic(inheritedAttributes: Attributes) = - new GraphStageLogic(shape) with InHandler with OutHandler { - setHandlers(in, out, this) - - override def onPush() = push(out, grab(in)) - override def onPull() = pull(in) - - override def onDownstreamFinish(cause: Throwable) = { - system.log.debug(s"me cancelled, cause {}", cause) - completeStage() - } - } - }) - .via(debug) - .collect { case SessionBytes(_, b) => b } - .scan(ByteString.empty)(_ ++ _) - .filter(_.nonEmpty) - .via(new Timeout(10.seconds.dilated)) - .dropWhile(_.size < scenario.output.size) - .runWith(Sink.headOption) - - Await.result(output, 12.seconds.dilated).getOrElse(ByteString.empty).utf8String should be( - scenario.output.utf8String) - - commPattern.cleanup() - } - } - - "emit an error if the TLS handshake fails certificate checks" in { - val getError = Flow[SslTlsInbound] - .map[Either[SslTlsInbound, SSLException]](i => Left(i)) - .recover { case e: SSLException => Right(e) } - .collect { case Right(e) => e } - .toMat(Sink.head)(Keep.right) - - val simple = Flow.fromSinkAndSourceMat(getError, Source.maybe[SslTlsOutbound])(Keep.left) - - // The creation of actual TCP connections is necessary. It is the easiest way to decouple the client and server - // under error conditions, and has the bonus of matching most actual SSL deployments. - val (server, serverErr) = Tcp(system) - .bind("localhost", 0) - .mapAsync(1)(c => c.flow.joinMat(serverTls(IgnoreBoth).reversed.joinMat(simple)(Keep.right))(Keep.right).run()) - .toMat(Sink.head)(Keep.both) - .run() - - val clientErr = simple - .join(badClientTls(IgnoreBoth)) - .join(Tcp(system).outgoingConnection(Await.result(server, 1.second).localAddress)) - .run() - - Await.result(serverErr, 1.second).getMessage should include("certificate_unknown") - val clientErrText = rootCauseOf(Await.result(clientErr, 1.second)).getMessage - clientErrText should include("unable to find valid certification path to requested target") - } - - "reliably cancel subscriptions when TransportIn fails early" in { - val ex = new Exception("hello") - val (sub, out1, out2) = - RunnableGraph - .fromGraph( - GraphDSL.createGraph(Source.asSubscriber[SslTlsOutbound], Sink.head[ByteString], Sink.head[SslTlsInbound])( - (_, _, _)) { implicit b => (s, o1, o2) => - val tls = b.add(clientTls(EagerClose)) - s ~> tls.in1; tls.out1 ~> o1 - o2 <~ tls.out2; tls.in2 <~ Source.failed(ex) - ClosedShape - }) - .run() - the[Exception] thrownBy Await.result(out1, 1.second) should be(ex) - the[Exception] thrownBy Await.result(out2, 1.second) should be(ex) - Thread.sleep(500) - val pub = TestPublisher.probe() - pub.subscribe(sub) - pub.expectSubscription().expectCancellation() - } - - "reliably cancel subscriptions when UserIn fails early" in { - val ex = new Exception("hello") - val (sub, out1, out2) = - RunnableGraph - .fromGraph( - GraphDSL.createGraph(Source.asSubscriber[ByteString], Sink.head[ByteString], Sink.head[SslTlsInbound])( - (_, _, _)) { implicit b => (s, o1, o2) => - val tls = b.add(clientTls(EagerClose)) - Source.failed[SslTlsOutbound](ex) ~> tls.in1; tls.out1 ~> o1 - o2 <~ tls.out2; tls.in2 <~ s - ClosedShape - }) - .run() - the[Exception] thrownBy Await.result(out1, 1.second) should be(ex) - the[Exception] thrownBy Await.result(out2, 1.second) should be(ex) - Thread.sleep(500) - val pub = TestPublisher.probe() - pub.subscribe(sub) - pub.expectSubscription().expectCancellation() - } - - "complete if TLS connection is truncated" in { - - val ks = KillSwitches.shared("ks") - - val scenario = SingleBytes - - val outFlow = { - val terminator = BidiFlow.fromFlows(Flow[ByteString], ks.flow[ByteString]) - clientTls(scenario.leftClosing) - .atop(terminator) - .atop(serverTls(scenario.rightClosing).reversed) - .join(debug.via(scenario.flow)) - .via(debug) - } - - val inFlow = Flow[SslTlsInbound] - .collect { case SessionBytes(_, b) => b } - .scan(ByteString.empty)(_ ++ _) - .via(new Timeout(6.seconds)) - .dropWhile(_.size < scenario.output.size) - - val f = - Source(scenario.inputs) - .via(outFlow) - .via(inFlow) - .map(result => { - ks.shutdown(); result - }) - .runWith(Sink.last) - - Await.result(f, 8.second).utf8String should be(scenario.output.utf8String) - } - - "verify hostname" in { - def run(hostName: String): Future[pekko.Done] = { - val rhs = Flow[SslTlsInbound].map { - case SessionTruncated => SendBytes(ByteString.empty) - case SessionBytes(_, b) => SendBytes(b) - } - val clientTls = TLS(sslContext, None, cipherSuites, Client, EagerClose, Some((hostName, 80))) - val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs) - - Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore) - } - Await.result(run("pekko-remote"), 3.seconds) // CN=pekko-remote - val cause = intercept[Exception] { - Await.result(run("unknown.example.org"), 3.seconds) - } - - cause.getClass should ===(classOf[SSLHandshakeException]) // General SSLEngine problem - val rootCause = rootCauseOf(cause.getCause) - rootCause.getClass should ===(classOf[CertificateException]) - rootCause.getMessage should ===("No name matching unknown.example.org found") - } - } - - def rootCauseOf(e: Throwable): Throwable = { - if (JavaVersion.majorVersion >= 11) e - // Wrapped in extra 'General SSLEngine problem' (sometimes multiple) - // on 1.8.0-265 and before, but not 1.8.0-272 and later... - else if (e.isInstanceOf[SSLHandshakeException]) rootCauseOf(e.getCause) - else e - } - - "A SslTlsPlacebo" must { - - "pass through data" in { - val f = Source(1 to 3) - .map(b => SendBytes(ByteString(b.toByte))) - .via(TLSPlacebo().join(Flow.apply)) - .grouped(10) - .runWith(Sink.head) - val result = Await.result(f, 3.seconds) - result.map(_.bytes) should be((1 to 3).map(b => ByteString(b.toByte))) - result.map(_.session).foreach(s => s.getCipherSuite should be("SSL_NULL_WITH_NULL_NULL")) - } - - } - -} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala index 673629f2bf..2466937559 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala @@ -965,104 +965,6 @@ class TcpSpec extends StreamSpec(""" } - "TLS client and server convenience methods with deprecated SSLContext setup" should { - - "allow for TLS" in { - test() - } - - @nowarn("msg=deprecated") - def test(): Unit = { - // cert is valid until 2025, so if this tests starts failing after that you need to create a new one - val (sslContext, firstSession) = initSslMess() - val address = temporaryServerAddress() - - Tcp() - .bindAndHandleTls( - // just echo characters until we reach '\n', then complete stream - // also - byte is our framing - Flow[ByteString].mapConcat(_.utf8String.toList).takeWhile(_ != '\n').map(c => ByteString(c)), - address.getHostName, - address.getPort, - sslContext, - firstSession) - .futureValue - system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}") - - val connectionFlow = - Tcp().outgoingTlsConnection(address.getHostName, address.getPort, sslContext, firstSession) - - val chars = "hello\n".toList.map(_.toString) - val (connectionF, result) = - Source(chars) - .map(c => ByteString(c)) - .concat(Source.maybe) // do not complete it from our side - .viaMat(connectionFlow)(Keep.right) - .map(_.utf8String) - .toMat(Sink.fold("")(_ + _))(Keep.both) - .run() - - connectionF.futureValue - system.log.info(s"Client connected to ${address.getHostString}:${address.getPort}") - - result.futureValue(PatienceConfiguration.Timeout(10.seconds)) should ===("hello") - } - - @nowarn("msg=deprecated") - def initSslMess() = { - // #setting-up-ssl-context - import java.security.KeyStore - import javax.net.ssl._ - - import org.apache.pekko - import pekko.stream.TLSClientAuth - import pekko.stream.TLSProtocol - - import com.typesafe.sslconfig.pekko.PekkoSSLConfig - - val sslConfig = PekkoSSLConfig(system) - - // Don't hardcode your password in actual code - val password = "abcdef".toCharArray - - // trust store and keys in one keystore - val keyStore = KeyStore.getInstance("PKCS12") - keyStore.load(classOf[TcpSpec].getResourceAsStream("/tcp-spec-keystore.p12"), password) - - val tmf = TrustManagerFactory.getInstance("SunX509") - tmf.init(keyStore) - - val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") - keyManagerFactory.init(keyStore, password) - - // initial ssl context - val sslContext = SSLContext.getInstance("TLS") - sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom) - - // protocols - val defaultParams = sslContext.getDefaultSSLParameters - val defaultProtocols = defaultParams.getProtocols - val protocols = sslConfig.configureProtocols(defaultProtocols, sslConfig.config) - defaultParams.setProtocols(protocols) - - // ciphers - val defaultCiphers = defaultParams.getCipherSuites - val cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, sslConfig.config) - defaultParams.setCipherSuites(cipherSuites) - - val negotiateNewSession = TLSProtocol.NegotiateNewSession - .withCipherSuites(cipherSuites.toIndexedSeq: _*) - .withProtocols(protocols.toIndexedSeq: _*) - .withParameters(defaultParams) - .withClientAuth(TLSClientAuth.None) - - // #setting-up-ssl-context - - (sslContext, negotiateNewSession) - } - - } - def validateServerClientCommunication( testData: ByteString, serverConnection: ServerConnection, diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index ffa285b9f7..6b1862b9e1 100644 --- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -187,7 +187,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stag ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.TimerGraphStageLogic.schedulePeriodically") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.AbruptIOTerminationException") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.AbruptIOTerminationException$") -ProblemFilters.exclude[DirectMissingMethodProblem]("com.typesafe.sslconfig.pekko.PekkoSSLConfig.validateDefaultTrustManager") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.DelayOverflowStrategy.dropNew") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew$") diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-ssl-config.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-ssl-config.excludes new file mode 100644 index 0000000000..61c3f2ab7e --- /dev/null +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-ssl-config.excludes @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove ssl-config dependency and deprecated methods related to it +ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.DefaultSSLEngineConfigurator") +ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.PekkoSSLConfig") +ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.PekkoSSLConfig$") +ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.SSLEngineConfigurator") +ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.util.PekkoLoggerBridge") +ProblemFilters.exclude[MissingClassProblem]("com.typesafe.sslconfig.pekko.util.PekkoLoggerFactory") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.TLS.create") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingConnection") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingTlsConnection") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.javadsl.Tcp.bindTls") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.TLS.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.TLS.apply$*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.outgoingTlsConnection") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.outgoingTlsConnection$*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindTls") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindTls$*") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindAndHandleTls") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.scaladsl.Tcp.bindAndHandleTls$*") diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf index f334715bdc..305faeeacb 100644 --- a/stream/src/main/resources/reference.conf +++ b/stream/src/main/resources/reference.conf @@ -202,9 +202,3 @@ pekko { } } } - -# ssl configuration -# folded in from former ssl-config-pekko module -ssl-config { - logger = "com.typesafe.sslconfig.pekko.util.PekkoLoggerBridge" -} diff --git a/stream/src/main/scala/com/typesafe/sslconfig/pekko/PekkoSSLConfig.scala b/stream/src/main/scala/com/typesafe/sslconfig/pekko/PekkoSSLConfig.scala deleted file mode 100644 index 51f8009e16..0000000000 --- a/stream/src/main/scala/com/typesafe/sslconfig/pekko/PekkoSSLConfig.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. - */ - -package com.typesafe.sslconfig.pekko - -import java.util.Collections -import javax.net.ssl._ - -import org.apache.pekko -import pekko.actor._ -import pekko.annotation.InternalApi -import pekko.event.Logging - -import com.typesafe.sslconfig.pekko.util.PekkoLoggerFactory -import com.typesafe.sslconfig.ssl._ -import com.typesafe.sslconfig.util.LoggerFactory - -@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") -object PekkoSSLConfig extends ExtensionId[PekkoSSLConfig] with ExtensionIdProvider { - - //////////////////// EXTENSION SETUP /////////////////// - - override def get(system: ActorSystem): PekkoSSLConfig = super.get(system) - override def get(system: ClassicActorSystemProvider): PekkoSSLConfig = super.get(system) - def apply()(implicit system: ActorSystem): PekkoSSLConfig = super.apply(system) - - override def lookup = PekkoSSLConfig - - override def createExtension(system: ExtendedActorSystem): PekkoSSLConfig = - new PekkoSSLConfig(system, defaultSSLConfigSettings(system)) - - def defaultSSLConfigSettings(system: ActorSystem): SSLConfigSettings = { - val pekkoOverrides = system.settings.config.getConfig("pekko.ssl-config") - val defaults = system.settings.config.getConfig("ssl-config") - SSLConfigFactory.parse(pekkoOverrides.withFallback(defaults)) - } - -} - -@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") -final class PekkoSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSettings) extends Extension { - - private val mkLogger = new PekkoLoggerFactory(system) - - private val log = Logging(system, classOf[PekkoSSLConfig]) - log.debug("Initializing PekkoSSLConfig extension...") - - /** Can be used to modify the underlying config, most typically used to change a few values in the default config */ - def withSettings(c: SSLConfigSettings): PekkoSSLConfig = - new PekkoSSLConfig(system, c) - - /** - * Returns a new [[PekkoSSLConfig]] instance with the settings changed by the given function. - * Please note that the ActorSystem-wide extension always remains configured via typesafe config, - * custom ones can be created for special-handling specific connections - */ - def mapSettings(f: SSLConfigSettings => SSLConfigSettings): PekkoSSLConfig = - new PekkoSSLConfig(system, f(config)) - - /** - * Returns a new [[PekkoSSLConfig]] instance with the settings changed by the given function. - * Please note that the ActorSystem-wide extension always remains configured via typesafe config, - * custom ones can be created for special-handling specific connections - * - * Java API - */ - // Not same signature as mapSettings to allow latter deprecation of this once we hit Scala 2.12 - def convertSettings(f: java.util.function.Function[SSLConfigSettings, SSLConfigSettings]): PekkoSSLConfig = - new PekkoSSLConfig(system, f.apply(config)) - - val hostnameVerifier = buildHostnameVerifier(config) - - /** - * INTERNAL API - */ - @InternalApi def useJvmHostnameVerification: Boolean = - hostnameVerifier match { - case _: DefaultHostnameVerifier | _: NoopHostnameVerifier => true - case _ => false - } - - val sslEngineConfigurator = { - val sslContext = if (config.default) { - log.info("ssl-config.default is true, using the JDK's default SSLContext") - SSLContext.getDefault - } else { - // break out the static methods as much as we can... - val keyManagerFactory = buildKeyManagerFactory(config) - val trustManagerFactory = buildTrustManagerFactory(config) - new ConfigSSLContextBuilder(mkLogger, config, keyManagerFactory, trustManagerFactory).build() - } - - // protocols! - val defaultParams = sslContext.getDefaultSSLParameters - val defaultProtocols = defaultParams.getProtocols - val protocols = configureProtocols(defaultProtocols, config) - - // ciphers! - val defaultCiphers = defaultParams.getCipherSuites - val cipherSuites = configureCipherSuites(defaultCiphers, config) - - // apply "loose" settings - // !! SNI! - looseDisableSNI(defaultParams) - - new DefaultSSLEngineConfigurator(config, protocols, cipherSuites) - } - - ////////////////// CONFIGURING ////////////////////// - - def buildKeyManagerFactory(ssl: SSLConfigSettings): KeyManagerFactoryWrapper = { - val keyManagerAlgorithm = ssl.keyManagerConfig.algorithm - new DefaultKeyManagerFactoryWrapper(keyManagerAlgorithm) - } - - def buildTrustManagerFactory(ssl: SSLConfigSettings): TrustManagerFactoryWrapper = { - val trustManagerAlgorithm = ssl.trustManagerConfig.algorithm - new DefaultTrustManagerFactoryWrapper(trustManagerAlgorithm) - } - - def buildHostnameVerifier(conf: SSLConfigSettings): HostnameVerifier = { - conf ne null // @unused unavailable - val clazz: Class[HostnameVerifier] = - if (config.loose.disableHostnameVerification) - classOf[DisabledComplainingHostnameVerifier].asInstanceOf[Class[HostnameVerifier]] - else config.hostnameVerifierClass.asInstanceOf[Class[HostnameVerifier]] - - val v = system.dynamicAccess - .createInstanceFor[HostnameVerifier](clazz, Nil) - .orElse(system.dynamicAccess.createInstanceFor[HostnameVerifier](clazz, List(classOf[LoggerFactory] -> mkLogger))) - .getOrElse(throw new Exception("Unable to obtain hostname verifier for class: " + clazz)) - - log.debug("buildHostnameVerifier: created hostname verifier: {}", v) - v - } - - def configureProtocols(existingProtocols: Array[String], sslConfig: SSLConfigSettings): Array[String] = { - val definedProtocols = sslConfig.enabledProtocols match { - case Some(configuredProtocols) => - // If we are given a specific list of protocols, then return it in exactly that order, - // assuming that it's actually possible in the SSL context. - configuredProtocols.filter(existingProtocols.contains).toArray - - case None => - // Otherwise, we return the default protocols in the given list. - Protocols.recommendedProtocols.filter(existingProtocols.contains) - } - - definedProtocols - } - - def configureCipherSuites(existingCiphers: Array[String], sslConfig: SSLConfigSettings): Array[String] = { - val definedCiphers = sslConfig.enabledCipherSuites match { - case Some(configuredCiphers) => - // If we are given a specific list of ciphers, return it in that order. - configuredCiphers.filter(existingCiphers.contains(_)).toArray - - case None => - existingCiphers - } - - definedCiphers - } - - // LOOSE SETTINGS // - - private def looseDisableSNI(defaultParams: SSLParameters): Unit = if (config.loose.disableSNI) { - // this will be logged once for each PekkoSSLConfig - log.warning( - "You are using ssl-config.loose.disableSNI=true! " + - "It is strongly discouraged to disable Server Name Indication, as it is crucial to preventing man-in-the-middle attacks.") - - defaultParams.setServerNames(Collections.emptyList()) - defaultParams.setSNIMatchers(Collections.emptyList()) - } - -} diff --git a/stream/src/main/scala/com/typesafe/sslconfig/pekko/SSLEngineConfigurator.scala b/stream/src/main/scala/com/typesafe/sslconfig/pekko/SSLEngineConfigurator.scala deleted file mode 100644 index e636ae9227..0000000000 --- a/stream/src/main/scala/com/typesafe/sslconfig/pekko/SSLEngineConfigurator.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. - */ - -package com.typesafe.sslconfig.pekko - -import javax.net.ssl.{ SSLContext, SSLEngine } - -import com.typesafe.sslconfig.ssl.SSLConfigSettings - -/** - * Gives the chance to configure the SSLContext before it is going to be used. - * The passed in context will be already set in client mode and provided with hostInfo during initialization. - */ -@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") -trait SSLEngineConfigurator { - def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine -} - -@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") -final class DefaultSSLEngineConfigurator( - config: SSLConfigSettings, - enabledProtocols: Array[String], - enabledCipherSuites: Array[String]) - extends SSLEngineConfigurator { - config ne null // @unused unavailable - def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine = { - engine.setSSLParameters(sslContext.getDefaultSSLParameters) - engine.setEnabledProtocols(enabledProtocols) - engine.setEnabledCipherSuites(enabledCipherSuites) - engine - } -} diff --git a/stream/src/main/scala/com/typesafe/sslconfig/pekko/util/PekkoLoggerBridge.scala b/stream/src/main/scala/com/typesafe/sslconfig/pekko/util/PekkoLoggerBridge.scala deleted file mode 100644 index c9b2a6307f..0000000000 --- a/stream/src/main/scala/com/typesafe/sslconfig/pekko/util/PekkoLoggerBridge.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. - */ - -package com.typesafe.sslconfig.pekko.util - -import org.apache.pekko -import pekko.actor.ActorSystem -import pekko.event.{ DummyClassForStringSources, EventStream } -import pekko.event.Logging._ - -import com.typesafe.sslconfig.util.{ LoggerFactory, NoDepsLogger } - -final class PekkoLoggerFactory(system: ActorSystem) extends LoggerFactory { - override def apply(clazz: Class[_]): NoDepsLogger = new PekkoLoggerBridge(system.eventStream, clazz) - - override def apply(name: String): NoDepsLogger = - new PekkoLoggerBridge(system.eventStream, name, classOf[DummyClassForStringSources]) -} - -class PekkoLoggerBridge(bus: EventStream, logSource: String, logClass: Class[_]) extends NoDepsLogger { - def this(bus: EventStream, clazz: Class[_]) = this(bus, clazz.getCanonicalName, clazz) - - override def isDebugEnabled: Boolean = true - - override def debug(msg: String): Unit = bus.publish(Debug(logSource, logClass, msg)) - - override def info(msg: String): Unit = bus.publish(Info(logSource, logClass, msg)) - - override def warn(msg: String): Unit = bus.publish(Warning(logSource, logClass, msg)) - - override def error(msg: String): Unit = bus.publish(Error(logSource, logClass, msg)) - override def error(msg: String, throwable: Throwable): Unit = bus.publish(Error(logSource, logClass, msg)) - -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TLSActor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TLSActor.scala index 64a19e00a7..8bceeb2f33 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TLSActor.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TLSActor.scala @@ -42,8 +42,8 @@ import pekko.util.ByteString def props( maxInputBufferSize: Int, - createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 + createSSLEngine: () => SSLEngine, + verifySession: SSLSession => Try[Unit], closing: TLSClosing, tracing: Boolean = false): Props = Props(new TLSActor(maxInputBufferSize, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local) @@ -60,8 +60,8 @@ import pekko.util.ByteString */ @InternalApi private[stream] class TLSActor( maxInputBufferSize: Int, - createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 + createSSLEngine: () => SSLEngine, + verifySession: SSLSession => Try[Unit], closing: TLSClosing, tracing: Boolean) extends Actor @@ -170,7 +170,7 @@ import pekko.util.ByteString // The engine could also be instantiated in ActorMaterializerImpl but if creation fails // during materialization it would be worse than failing later on. val engine = - try createSSLEngine(context.system) + try createSSLEngine() catch { case NonFatal(ex) => fail(ex, closeTransport = true); throw ex } engine.beginHandshake() @@ -475,7 +475,7 @@ import pekko.util.ByteString if (tracing) log.debug("handshake finished") val session = engine.getSession - verifySession(context.system, session) match { + verifySession(session) match { case Success(()) => currentSession = session corkUser = false diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsModule.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsModule.scala index 5078ba01e3..4c9b1e1c9f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsModule.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsModule.scala @@ -19,7 +19,6 @@ import scala.util.Try import org.apache.pekko import pekko.NotUsed -import pekko.actor.ActorSystem import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.TLSProtocol._ @@ -37,8 +36,8 @@ import pekko.util.ByteString cipherOut: Outlet[ByteString], shape: BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound], attributes: Attributes, - createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 + createSSLEngine: () => SSLEngine, + verifySession: SSLSession => Try[Unit], closing: TLSClosing) extends AtomicModule[BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound], NotUsed] { @@ -56,8 +55,8 @@ import pekko.util.ByteString @InternalApi private[stream] object TlsModule { def apply( attributes: Attributes, - createSSLEngine: ActorSystem => SSLEngine, // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) => Try[Unit], // ActorSystem is only needed to support the PekkoSSLConfig legacy, see #21753 + createSSLEngine: () => SSLEngine, + verifySession: SSLSession => Try[Unit], closing: TLSClosing): TlsModule = { val name = attributes.nameOrDefault(s"StreamTls()") val cipherIn = Inlet[ByteString](s"$name.cipherIn") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala index ae2341a3d7..0955fba7ac 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala @@ -13,20 +13,16 @@ package org.apache.pekko.stream.javadsl -import java.util.Optional import java.util.function.{ Consumer, Supplier } -import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession } +import javax.net.ssl.{ SSLEngine, SSLSession } import scala.util.Try import org.apache.pekko -import pekko.{ japi, NotUsed } +import pekko.NotUsed import pekko.stream._ import pekko.stream.TLSProtocol._ import pekko.util.ByteString -import pekko.util.OptionConverters._ - -import com.typesafe.sslconfig.pekko.PekkoSSLConfig /** * Stream cipher support based upon JSSE. @@ -65,112 +61,6 @@ import com.typesafe.sslconfig.pekko.PekkoSSLConfig */ object TLS { - /** - * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * This method uses the default closing behavior or [[IgnoreComplete]]. - */ - @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def create( - sslContext: SSLContext, - sslConfig: Optional[PekkoSSLConfig], - firstSession: NegotiateNewSession, - role: TLSRole): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, sslConfig.toScala, firstSession, role)) - - /** - * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * This method uses the default closing behavior or [[IgnoreComplete]]. - */ - @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def create( - sslContext: SSLContext, - firstSession: NegotiateNewSession, - role: TLSRole): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, None, firstSession, role)) - - /** - * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * For a description of the `closing` parameter please refer to [[TLSClosing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. - */ - @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def create( - sslContext: SSLContext, - sslConfig: Optional[PekkoSSLConfig], - firstSession: NegotiateNewSession, - role: TLSRole, - hostInfo: Optional[japi.Pair[String, java.lang.Integer]], - closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new javadsl.BidiFlow( - scaladsl.TLS.apply( - sslContext, - sslConfig.toScala, - firstSession, - role, - closing, - hostInfo.toScala.map(e => (e.first, e.second)))) - - /** - * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]] in client mode. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * For a description of the `closing` parameter please refer to [[TLSClosing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. - */ - @deprecated("Use create that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def create( - sslContext: SSLContext, - firstSession: NegotiateNewSession, - role: TLSRole, - hostInfo: Optional[japi.Pair[String, java.lang.Integer]], - closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new javadsl.BidiFlow( - scaladsl.TLS.apply( - sslContext, - None, - firstSession, - role, - closing, - hostInfo.toScala.map(e => (e.first, e.second)))) - /** * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]]. This is a low-level interface. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala index ecd52148ce..d34b1b51a4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala @@ -19,11 +19,9 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.{ Function => JFunction } import java.util.function.Supplier -import javax.net.ssl.SSLContext import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession -import scala.annotation.nowarn import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success @@ -40,7 +38,6 @@ import pekko.io.Inet.SocketOption import pekko.stream.Materializer import pekko.stream.SystemMaterializer import pekko.stream.TLSClosing -import pekko.stream.TLSProtocol.NegotiateNewSession import pekko.stream.scaladsl import pekko.util.ByteString import pekko.util.FutureConverters._ @@ -238,42 +235,6 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { optionalDurationToScala(idleTimeout)) .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) - /** - * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. - * - * Note that the ByteString chunk boundaries are not retained across the network, - * to achieve application level chunks you have to introduce explicit framing in your streams, - * for example using the [[Framing]] operators. - * - * @param remoteAddress The remote address to connect to - * @param localAddress Optional local address for the connection - * @param options TCP options for the connections, see [[pekko.io.Tcp]] for details - * @param halfClose - * Controls whether the connection is kept open even after writing has been completed to the accepted - * TCP connections. - * If set to true, the connection will implement the TCP half-close mechanism, allowing the server to - * write to the connection even after the client has finished writing. The TCP socket is only closed - * after both the client and server finished writing. This setting is recommended for clients and - * therefore it is the default setting. - * If set to false, the connection will immediately closed once the client closes its write side, - * independently whether the server is still attempting to write. - */ - @deprecated("Use bind that takes a java.time.Duration parameter instead.", "Akka 2.6.0") - def outgoingConnection( - remoteAddress: InetSocketAddress, - localAddress: Optional[InetSocketAddress], - options: JIterable[SocketOption], - halfClose: Boolean, - connectTimeout: Duration, - idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = - outgoingConnection( - remoteAddress, - localAddress, - options, - halfClose, - durationToJavaOptional(connectTimeout), - durationToJavaOptional(idleTimeout)) - /** * Creates an [[Tcp.OutgoingConnection]] without specifying options. * It represents a prospective TCP client connection to the given endpoint. @@ -288,60 +249,6 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { .outgoingConnection(new InetSocketAddress(host, port)) .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) - /** - * Creates an [[Tcp.OutgoingConnection]] with TLS. - * The returned flow represents a TCP client connection to the given endpoint where all bytes in and - * out go through TLS. - * - * @see [[Tcp.outgoingConnection]] - */ - @deprecated( - "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def outgoingTlsConnection( - host: String, - port: Int, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = - Flow.fromGraph( - delegate - .outgoingTlsConnection(host, port, sslContext, negotiateNewSession) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) - - /** - * Creates an [[Tcp.OutgoingConnection]] with TLS. - * The returned flow represents a TCP client connection to the given endpoint where all bytes in and - * out go through TLS. - * - * @see [[Tcp.outgoingConnection]] - * - * Marked API-may-change to leave room for an improvement around the very long parameter list. - */ - @deprecated( - "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def outgoingTlsConnection( - remoteAddress: InetSocketAddress, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession, - localAddress: Optional[InetSocketAddress], - options: JIterable[SocketOption], - connectTimeout: Duration, - idleTimeout: Duration): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = - Flow.fromGraph( - delegate - .outgoingTlsConnection( - remoteAddress, - sslContext, - negotiateNewSession, - localAddress.toScala, - CollectionUtil.toSeq(options), - connectTimeout, - idleTimeout) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) - /** * Creates an [[Tcp.OutgoingConnection]] with TLS. * The returned flow represents a TCP client connection to the given endpoint where all bytes in and @@ -397,56 +304,6 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) } - /** - * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` - * where all incoming and outgoing bytes are passed through TLS. - * - * @see [[Tcp.bind]] - * Marked API-may-change to leave room for an improvement around the very long parameter list. - * - * Note: the half close parameter is currently ignored - */ - @deprecated( - "Use bindWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def bindTls( - interface: String, - port: Int, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession, - backlog: Int, - options: JIterable[SocketOption], - @nowarn // unused #26689 - halfClose: Boolean, - idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = - Source.fromGraph( - delegate - .bindTls(interface, port, sslContext, negotiateNewSession, backlog, CollectionUtil.toSeq(options), idleTimeout) - .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) - - /** - * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` - * where all incoming and outgoing bytes are passed through TLS. - * - * @see [[Tcp.bind]] - */ - @deprecated( - "Use bindWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def bindTls( - interface: String, - port: Int, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession): Source[IncomingConnection, CompletionStage[ServerBinding]] = - Source.fromGraph( - delegate - .bindTls(interface, port, sslContext, negotiateNewSession) - .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) - /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * where all incoming and outgoing bytes are passed through TLS. @@ -501,8 +358,4 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { private def optionalDurationToScala(duration: Optional[java.time.Duration]) = { if (duration.isPresent) duration.get.asScala else Duration.Inf } - - private def durationToJavaOptional(duration: Duration): Optional[java.time.Duration] = { - if (duration.isFinite) Optional.ofNullable(duration.asJava) else Optional.empty() - } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala index d502f7a1f7..88c2585d3d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala @@ -13,22 +13,17 @@ package org.apache.pekko.stream.scaladsl -import java.util.Collections -import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession } -import javax.net.ssl.SSLParameters +import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession } -import scala.util.{ Failure, Success, Try } +import scala.util.{ Success, Try } import org.apache.pekko import pekko.NotUsed -import pekko.actor.ActorSystem import pekko.stream._ import pekko.stream.TLSProtocol._ -import pekko.stream.impl.io.{ TlsModule, TlsUtils } +import pekko.stream.impl.io.TlsModule import pekko.util.ByteString -import com.typesafe.sslconfig.pekko.PekkoSSLConfig - /** * Stream cipher support based upon JSSE. * @@ -66,129 +61,6 @@ import com.typesafe.sslconfig.pekko.PekkoSSLConfig */ object TLS { - /** - * Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * For a description of the `closing` parameter please refer to [[TLSClosing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. - */ - @deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def apply( - sslContext: SSLContext, - sslConfig: Option[PekkoSSLConfig], - firstSession: NegotiateNewSession, - role: TLSRole, - closing: TLSClosing = IgnoreComplete, - hostInfo: Option[(String, Int)] = None) - : scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = { - def theSslConfig(system: ActorSystem): PekkoSSLConfig = - sslConfig.getOrElse(PekkoSSLConfig(system)) - - val createSSLEngine = { (system: ActorSystem) => - val config = theSslConfig(system) - - val engine = hostInfo match { - case Some((hostname, port)) if !config.config.loose.disableSNI => - sslContext.createSSLEngine(hostname, port) - case _ => sslContext.createSSLEngine() - } - - config.sslEngineConfigurator.configure(engine, sslContext) - engine.setUseClientMode(role == Client) - - val paramsWithSni = - if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) { - val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get) - // In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using - // `createSSLEngine(hostname, port)`. - // In Java 8, SNI is only enabled if the server names are added to the parameters. - // See https://github.com/akka/akka/issues/19287. - newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1))) - - firstSession.copy(sslParameters = Some(newParams)) - } else - firstSession - - val paramsWithHostnameVerification = if (hostInfo.isDefined && config.useJvmHostnameVerification) { - val newParams = paramsWithSni.sslParameters.map(TlsUtils.cloneParameters).getOrElse(new SSLParameters) - newParams.setEndpointIdentificationAlgorithm("HTTPS") - paramsWithSni.copy(sslParameters = Some(newParams)) - } else - paramsWithSni - - TlsUtils.applySessionParameters(engine, paramsWithHostnameVerification) - engine - } - def verifySession: (ActorSystem, SSLSession) => Try[Unit] = - hostInfo match { - case Some((hostname, _)) => { (system, session) => - val config = theSslConfig(system) - if (config.useJvmHostnameVerification || config.hostnameVerifier.verify(hostname, session)) - Success(()) - else - Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname")) - } - case None => (_, _) => Success(()) - } - - scaladsl.BidiFlow.fromGraph(TlsModule(Attributes.none, createSSLEngine, verifySession, closing)) - } - - /** - * Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - * - * For a description of the `closing` parameter please refer to [[TLSClosing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. - */ - @deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def apply( - sslContext: SSLContext, - firstSession: NegotiateNewSession, - role: TLSRole, - closing: TLSClosing, - hostInfo: Option[(String, Int)]) - : scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - apply(sslContext, None, firstSession, role, closing, hostInfo) - - /** - * Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. The - * SSLContext will be used to create an SSLEngine to which then the - * `firstSession` parameters are applied before initiating the first - * handshake. The `role` parameter determines the SSLEngine’s role; this is - * often the same as the underlying transport’s server or client role, but - * that is not a requirement and depends entirely on the application - * protocol. - */ - @deprecated("Use apply that takes a SSLEngine factory instead. Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def apply( - sslContext: SSLContext, - firstSession: NegotiateNewSession, - role: TLSRole): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - apply(sslContext, None, firstSession, role, IgnoreComplete, None) - /** * Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. * @@ -201,11 +73,11 @@ object TLS { * For a description of the `closing` parameter please refer to [[TLSClosing]]. */ def apply( - createSSLEngine: () => SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753 - verifySession: SSLSession => Try[Unit], // we don't offer the internal API that provides `ActorSystem` here, see #21753 + createSSLEngine: () => SSLEngine, + verifySession: SSLSession => Try[Unit], closing: TLSClosing): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = scaladsl.BidiFlow.fromGraph( - TlsModule(Attributes.none, _ => createSSLEngine(), (_, session) => verifySession(session), closing)) + TlsModule(Attributes.none, () => createSSLEngine(), session => verifySession(session), closing)) /** * Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]]. @@ -216,7 +88,7 @@ object TLS { * For a description of the `closing` parameter please refer to [[TLSClosing]]. */ def apply( - createSSLEngine: () => SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753 + createSSLEngine: () => SSLEngine, closing: TLSClosing): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = apply(createSSLEngine, _ => Success(()), closing) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Tcp.scala index 89f15efbac..e92a33cf3f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Tcp.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Tcp.scala @@ -15,7 +15,6 @@ package org.apache.pekko.stream.scaladsl import java.net.InetSocketAddress import java.util.concurrent.TimeoutException -import javax.net.ssl.SSLContext import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession @@ -38,7 +37,6 @@ import pekko.io.IO import pekko.io.Inet.SocketOption import pekko.stream._ import pekko.stream.Attributes.Attribute -import pekko.stream.TLSProtocol.NegotiateNewSession import pekko.stream.impl.fusing.GraphStages.detacher import pekko.stream.impl.io.ConnectionSourceStage import pekko.stream.impl.io.OutgoingConnectionStage @@ -254,58 +252,6 @@ final class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = outgoingConnection(InetSocketAddress.createUnresolved(host, port)) - /** - * Creates an [[Tcp.OutgoingConnection]] with TLS. - * The returned flow represents a TCP client connection to the given endpoint where all bytes in and - * out go through TLS. - * - * For more advanced use cases you can manually combine [[Tcp.outgoingConnection]] and [[TLS]] - * - * @param negotiateNewSession Details about what to require when negotiating the connection with the server - * @param sslContext Context containing details such as the trust and keystore - * - * @see [[Tcp.outgoingConnection]] - */ - @deprecated( - "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def outgoingTlsConnection( - host: String, - port: Int, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession): Flow[ByteString, ByteString, Future[OutgoingConnection]] = - outgoingTlsConnection(InetSocketAddress.createUnresolved(host, port), sslContext, negotiateNewSession) - - /** - * Creates an [[Tcp.OutgoingConnection]] with TLS. - * The returned flow represents a TCP client connection to the given endpoint where all bytes in and - * out go through TLS. - * - * @see [[Tcp.outgoingConnection]] - * @param negotiateNewSession Details about what to require when negotiating the connection with the server - * @param sslContext Context containing details such as the trust and keystore - */ - @deprecated( - "Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def outgoingTlsConnection( - remoteAddress: InetSocketAddress, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession, - localAddress: Option[InetSocketAddress] = None, - @nowarn // Traversable deprecated in 2.13 - options: immutable.Traversable[SocketOption] = Nil, - connectTimeout: Duration = Duration.Inf, - idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { - - val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout) - @nowarn("msg=deprecated") - val tls = TLS(sslContext, negotiateNewSession, TLSRole.client) - connection.join(tlsWrapping.atop(tls).reversed) - } - /** * Creates an [[Tcp.OutgoingConnection]] with TLS. * The returned flow represents a TCP client connection to the given endpoint where all bytes in and @@ -354,35 +300,6 @@ final class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { connection.join(tlsWrapping.atop(tls).reversed) } - /** - * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` - * where all incoming and outgoing bytes are passed through TLS. - * - * @param negotiateNewSession Details about what to require when negotiating the connection with the server - * @param sslContext Context containing details such as the trust and keystore - * @see [[Tcp.bind]] - */ - @deprecated( - "Use bindWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def bindTls( - interface: String, - port: Int, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession, - backlog: Int = defaultBacklog, - @nowarn // Traversable deprecated in 2.13 - options: immutable.Traversable[SocketOption] = Nil, - idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { - @nowarn("msg=deprecated") - val tls = tlsWrapping.atop(TLS(sslContext, negotiateNewSession, TLSRole.server)).reversed - - bind(interface, port, backlog, options, halfClose = false, idleTimeout).map { incomingConnection => - incomingConnection.copy(flow = incomingConnection.flow.join(tls)) - } - } - /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * where all incoming and outgoing bytes are passed through TLS. @@ -484,38 +401,6 @@ final class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { }) .run() } - - /** - * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` - * handling the incoming connections through TLS and then run using the provided Flow. - * - * @param negotiateNewSession Details about what to require when negotiating the connection with the server - * @param sslContext Context containing details such as the trust and keystore - * @see [[Tcp.bindAndHandle]] - * - * Marked API-may-change to leave room for an improvement around the very long parameter list. - */ - @deprecated( - "Use bindAndHandleWithTls that takes a SSLEngine factory instead. " + - "Setup the SSLEngine with needed parameters.", - "Akka 2.6.0") - def bindAndHandleTls( - handler: Flow[ByteString, ByteString, _], - interface: String, - port: Int, - sslContext: SSLContext, - negotiateNewSession: NegotiateNewSession, - backlog: Int = defaultBacklog, - @nowarn // Traversable deprecated in 2.13 - options: immutable.Traversable[SocketOption] = Nil, - idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = { - bindTls(interface, port, sslContext, negotiateNewSession, backlog, options, idleTimeout) - .to(Sink.foreach { (conn: IncomingConnection) => - conn.handleWith(handler) - }) - .run() - } - } final class TcpIdleTimeoutException(msg: String, @unused timeout: Duration)