=str fix Scala 3 signature for Tcp() / AkkaSSLConfig() (#31384)

* =str fix Scala 3 signature for Tcp() / AkkaSSLConfig()

This was broken for some reason in #30324 / 3a3e643e07
and not covered by tests.

Fixes #31266

* add mima
This commit is contained in:
Johannes Rudolph 2022-08-12 15:02:06 +02:00 committed by GitHub
parent 6e049efce3
commit 06eb8e29d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 148 deletions

View file

@ -109,7 +109,7 @@ class TcpSpec extends StreamSpec("""
val tcpWriteProbe = new TcpWriteProbe()
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -127,7 +127,7 @@ class TcpSpec extends StreamSpec("""
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).via(Tcp(system).outgoingConnection(server.address)).to(Sink.ignore).run()
Source(testInput).via(Tcp().outgoingConnection(server.address)).to(Sink.ignore).run()
val serverConnection = server.waitAccept()
serverConnection.read(256)
@ -143,7 +143,7 @@ class TcpSpec extends StreamSpec("""
val resultFuture =
Source
.fromPublisher(idle.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.runFold(ByteString.empty)((acc, in) => acc ++ in)
val serverConnection = server.waitAccept()
@ -160,8 +160,8 @@ class TcpSpec extends StreamSpec("""
val tcpWriteProbe = new TcpWriteProbe()
val future = Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.viaMat(Tcp(system)
.outgoingConnection(InetSocketAddress.createUnresolved("example.com", 666), connectTimeout = 1.second))(
.viaMat(
Tcp().outgoingConnection(InetSocketAddress.createUnresolved("example.com", 666), connectTimeout = 1.second))(
Keep.right)
.toMat(Sink.ignore)(Keep.left)
.run()
@ -177,7 +177,7 @@ class TcpSpec extends StreamSpec("""
val tcpReadProbe = new TcpReadProbe()
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -211,7 +211,7 @@ class TcpSpec extends StreamSpec("""
val tcpReadProbe = new TcpReadProbe()
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -243,7 +243,7 @@ class TcpSpec extends StreamSpec("""
val tcpReadProbe = new TcpReadProbe()
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -279,7 +279,7 @@ class TcpSpec extends StreamSpec("""
val tcpReadProbe = new TcpReadProbe()
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -316,7 +316,7 @@ class TcpSpec extends StreamSpec("""
val tcpReadProbe = new TcpReadProbe()
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -350,7 +350,7 @@ class TcpSpec extends StreamSpec("""
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -381,7 +381,7 @@ class TcpSpec extends StreamSpec("""
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -413,7 +413,7 @@ class TcpSpec extends StreamSpec("""
Source
.fromPublisher(tcpWriteProbe.publisherProbe)
.via(Tcp(system).outgoingConnection(server.address))
.via(Tcp().outgoingConnection(server.address))
.to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe))
.run()
val serverConnection = server.waitAccept()
@ -434,7 +434,7 @@ class TcpSpec extends StreamSpec("""
val tcpWriteProbe1 = new TcpWriteProbe()
val tcpReadProbe2 = new TcpReadProbe()
val tcpWriteProbe2 = new TcpWriteProbe()
val outgoingConnection = Tcp(system).outgoingConnection(server.address)
val outgoingConnection = Tcp().outgoingConnection(server.address)
val conn1F =
Source
@ -473,7 +473,7 @@ class TcpSpec extends StreamSpec("""
Flow.fromSinkAndSource(Sink.cancelled, Source.single(ByteString("Early response")))
val binding =
Tcp(system)
Tcp()
.bind("127.0.0.1", 0, halfClose = true)
.toMat(Sink.foreach { conn =>
conn.flow.join(writeButDontRead).run()
@ -482,7 +482,7 @@ class TcpSpec extends StreamSpec("""
.futureValue
val result = Source.empty
.via(Tcp(system).outgoingConnection(binding.localAddress))
.via(Tcp().outgoingConnection(binding.localAddress))
.toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.right)
.run()
@ -496,7 +496,7 @@ class TcpSpec extends StreamSpec("""
Flow.fromSinkAndSourceMat(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right)
val binding =
Tcp(system)
Tcp()
.bind("127.0.0.1", 0, halfClose = false)
.toMat(Sink.foreach { conn =>
conn.flow.join(writeButIgnoreRead).run()
@ -506,7 +506,7 @@ class TcpSpec extends StreamSpec("""
val (promise, result) = Source
.maybe[ByteString]
.via(Tcp(system).outgoingConnection(binding.localAddress))
.via(Tcp().outgoingConnection(binding.localAddress))
.toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.both)
.run()
@ -520,7 +520,7 @@ class TcpSpec extends StreamSpec("""
val serverAddress = temporaryServerAddress()
val binding =
Tcp(system)
Tcp()
.bind(serverAddress.getHostString, serverAddress.getPort, halfClose = false)
.toMat(Sink.foreach { conn =>
conn.flow.join(Flow[ByteString]).run()
@ -529,7 +529,7 @@ class TcpSpec extends StreamSpec("""
.futureValue
val result = Source(immutable.Iterable.fill(1000)(ByteString(0)))
.via(Tcp(system).outgoingConnection(serverAddress, halfClose = true))
.via(Tcp().outgoingConnection(serverAddress, halfClose = true))
.runFold(0)(_ + _.size)
result.futureValue should ===(1000)
@ -620,7 +620,7 @@ class TcpSpec extends StreamSpec("""
"be able to implement echo" in {
val serverAddress = temporaryServerAddress()
val (bindingFuture, echoServerFinish) =
Tcp(system).bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run()
Tcp().bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run()
// make sure that the server has bound to the socket
val binding = bindingFuture.futureValue
@ -628,9 +628,7 @@ class TcpSpec extends StreamSpec("""
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
val resultFuture =
Source(testInput)
.via(Tcp(system).outgoingConnection(serverAddress))
.runFold(ByteString.empty)((acc, in) => acc ++ in)
Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) => acc ++ in)
binding.whenUnbound.value should be(None)
resultFuture.futureValue should be(expectedOutput)
@ -642,13 +640,13 @@ class TcpSpec extends StreamSpec("""
"work with a chain of echoes" in {
val serverAddress = temporaryServerAddress()
val (bindingFuture, echoServerFinish) =
Tcp(system).bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run()
Tcp().bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run()
// make sure that the server has bound to the socket
val binding = bindingFuture.futureValue
binding.whenUnbound.value should be(None)
val echoConnection = Tcp(system).outgoingConnection(serverAddress)
val echoConnection = Tcp().outgoingConnection(serverAddress)
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
@ -670,7 +668,7 @@ class TcpSpec extends StreamSpec("""
"bind and unbind correctly" in EventFilter[BindException](occurrences = 2).intercept {
val address = temporaryServerAddress()
val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]()
val bind = Tcp(system).bind(address.getHostString, address.getPort)
val bind = Tcp().bind(address.getHostString, address.getPort)
// Bind succeeded, we have a local address
val binding1 = bind.to(Sink.fromSubscriber(probe1)).run().futureValue
@ -773,7 +771,7 @@ class TcpSpec extends StreamSpec("""
"handle single connection when connection flow is immediately cancelled" in {
implicit val ec: ExecutionContext = system.dispatcher
val (bindingFuture, connection) = Tcp(system).bind("localhost", 0).toMat(Sink.head)(Keep.both).run()
val (bindingFuture, connection) = Tcp().bind("localhost", 0).toMat(Sink.head)(Keep.both).run()
connection.map { c =>
c.handleWith(Flow[ByteString])
@ -782,7 +780,7 @@ class TcpSpec extends StreamSpec("""
val binding = bindingFuture.futureValue
val expected = ByteString("test")
val msg = Source.single(expected).via(Tcp(system).outgoingConnection(binding.localAddress)).runWith(Sink.head)
val msg = Source.single(expected).via(Tcp().outgoingConnection(binding.localAddress)).runWith(Sink.head)
msg.futureValue shouldBe expected
binding.unbind()
@ -818,13 +816,13 @@ class TcpSpec extends StreamSpec("""
.to(Sink.ignore)
val serverBound =
Tcp(system).bind(address.getHostString, address.getPort).toMat(accept2ConnectionSink)(Keep.left).run()
Tcp().bind(address.getHostString, address.getPort).toMat(accept2ConnectionSink)(Keep.left).run()
// make sure server has started
serverBound.futureValue
val firstProbe = TestPublisher.probe[ByteString]()
val firstResult = Source.fromPublisher(firstProbe).via(Tcp(system).outgoingConnection(address)).runWith(Sink.seq)
val firstResult = Source.fromPublisher(firstProbe).via(Tcp().outgoingConnection(address)).runWith(Sink.seq)
// create the first connection and wait until the flow is running server side
firstClientConnected.future.futureValue(Timeout(5.seconds))
@ -832,7 +830,7 @@ class TcpSpec extends StreamSpec("""
firstProbe.sendNext(ByteString(23))
// then connect the second one, which will be ignored
val rejected = Source(List(ByteString(67))).via(Tcp(system).outgoingConnection(address)).runWith(Sink.seq)
val rejected = Source(List(ByteString(67))).via(Tcp().outgoingConnection(address)).runWith(Sink.seq)
secondClientIgnored.future.futureValue
// first connection should be fine
@ -849,12 +847,12 @@ class TcpSpec extends StreamSpec("""
try {
val address = temporaryServerAddress()
val bindingFuture = Tcp(system).bindAndHandle(Flow[ByteString], address.getHostString, address.getPort)
val bindingFuture = Tcp().bindAndHandle(Flow[ByteString], address.getHostString, address.getPort)
// Ensure server is running
bindingFuture.futureValue
// and is possible to communicate with
Source.single(ByteString(0)).via(Tcp(system).outgoingConnection(address)).runWith(Sink.ignore).futureValue
Source.single(ByteString(0)).via(Tcp().outgoingConnection(address)).runWith(Sink.ignore).futureValue
sys2.terminate().futureValue
@ -865,7 +863,7 @@ class TcpSpec extends StreamSpec("""
"show host and port in bind exception message" in EventFilter[BindException](occurrences = 1).intercept {
val (host, port) = temporaryServerHostnameAndPort()
val bind = Tcp(system).bind(host, port)
val bind = Tcp().bind(host, port)
val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]()
val binding1 = bind.to(Sink.fromSubscriber(probe1)).run().futureValue
@ -890,7 +888,7 @@ class TcpSpec extends StreamSpec("""
// cert is valid until 2025, so if this tests starts failing after that you need to create a new one
val address = temporaryServerAddress()
Tcp(system)
Tcp()
.bindAndHandleWithTls(
// just echo characters until we reach '\n', then complete stream
// also - byte is our framing
@ -902,7 +900,7 @@ class TcpSpec extends StreamSpec("""
system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}")
val connectionFlow =
Tcp(system).outgoingConnectionWithTls(address, () => createSSLEngine(TLSRole.client))
Tcp().outgoingConnectionWithTls(address, () => createSSLEngine(TLSRole.client))
val chars = "hello\n".toList.map(_.toString)
val (connectionF, result) =
@ -976,7 +974,7 @@ class TcpSpec extends StreamSpec("""
val (sslContext, firstSession) = initSslMess()
val address = temporaryServerAddress()
Tcp(system)
Tcp()
.bindAndHandleTls(
// just echo characters until we reach '\n', then complete stream
// also - byte is our framing
@ -989,7 +987,7 @@ class TcpSpec extends StreamSpec("""
system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}")
val connectionFlow =
Tcp(system).outgoingTlsConnection(address.getHostName, address.getPort, sslContext, firstSession)
Tcp().outgoingTlsConnection(address.getHostName, address.getPort, sslContext, firstSession)
val chars = "hello\n".toList.map(_.toString)
val (connectionF, result) =

View file

@ -0,0 +1,7 @@
# removed internal classes from type hierarchy
ProblemFilters.exclude[MissingTypesProblem]("com.typesafe.sslconfig.akka.AkkaSSLConfig$")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.scaladsl.Tcp$")
# internal classes
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.AkkaSSLConfigExtensionIdApply")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.TcpImplicitExtensionIdApply")

View file

@ -1,35 +0,0 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.scaladsl.Tcp
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.nowarn
/*
* Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define
* on Scala 2 and Scala 3
*/
/**
* Not for user extension
*/
@DoNotInherit
trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] {
def apply()(implicit system: ActorSystem): Tcp = super.apply(system)
}
/**
* Not for user extension
*/
@DoNotInherit
@nowarn("msg=deprecated")
trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] {
def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
}

View file

@ -1,35 +0,0 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.scaladsl.Tcp
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.nowarn
/*
* Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define
* on Scala 2 and Scala 3
*/
/**
* Not for user extension
*/
@DoNotInherit
trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] {
def apply()(implicit system: ActorSystem): Tcp = super.apply(system)
}
/**
* Not for user extension
*/
@DoNotInherit
@nowarn("msg=deprecated")
trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] {
def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
}

View file

@ -1,35 +0,0 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.scaladsl.Tcp
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.nowarn
/*
* Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define
* on Scala 2 and Scala 3
*/
/**
* Not for user extension
*/
@DoNotInherit
trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] {
override def apply(implicit system: ActorSystem): Tcp = super.apply(system)
}
/**
* Not for user extension
*/
@DoNotInherit
@nowarn("msg=deprecated")
trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] {
override def apply(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
}

View file

@ -27,7 +27,6 @@ import akka.io.Inet.SocketOption
import akka.stream._
import akka.stream.Attributes.Attribute
import akka.stream.TLSProtocol.NegotiateNewSession
import akka.stream.impl.TcpImplicitExtensionIdApply
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.impl.io.ConnectionSourceStage
import akka.stream.impl.io.OutgoingConnectionStage
@ -36,7 +35,7 @@ import akka.util.ByteString
import akka.util.JavaDurationConverters._
import akka.util.unused
object Tcp extends ExtensionId[Tcp] with TcpImplicitExtensionIdApply with ExtensionIdProvider {
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
/**
* Represents a successful TCP server binding.
@ -77,6 +76,8 @@ object Tcp extends ExtensionId[Tcp] with TcpImplicitExtensionIdApply with Extens
*/
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
def apply()(implicit system: ActorSystem): Tcp = super.apply(system)
override def get(system: ActorSystem): Tcp = super.get(system)
override def get(system: ClassicActorSystemProvider): Tcp = super.get(system)

View file

@ -12,16 +12,16 @@ import com.typesafe.sslconfig.util.LoggerFactory
import akka.actor._
import akka.annotation.InternalApi
import akka.event.Logging
import akka.stream.impl.AkkaSSLConfigExtensionIdApply
import scala.annotation.nowarn
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0")
object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with AkkaSSLConfigExtensionIdApply with ExtensionIdProvider {
object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider {
//////////////////// EXTENSION SETUP ///////////////////
override def get(system: ActorSystem): AkkaSSLConfig = super.get(system)
override def get(system: ClassicActorSystemProvider): AkkaSSLConfig = super.get(system)
def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
override def lookup = AkkaSSLConfig