diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index f46f93e1b1..915c96d341 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -109,7 +109,7 @@ class TcpSpec extends StreamSpec(""" val tcpWriteProbe = new TcpWriteProbe() Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -127,7 +127,7 @@ class TcpSpec extends StreamSpec(""" val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).via(Tcp(system).outgoingConnection(server.address)).to(Sink.ignore).run() + Source(testInput).via(Tcp().outgoingConnection(server.address)).to(Sink.ignore).run() val serverConnection = server.waitAccept() serverConnection.read(256) @@ -143,7 +143,7 @@ class TcpSpec extends StreamSpec(""" val resultFuture = Source .fromPublisher(idle.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .runFold(ByteString.empty)((acc, in) => acc ++ in) val serverConnection = server.waitAccept() @@ -160,8 +160,8 @@ class TcpSpec extends StreamSpec(""" val tcpWriteProbe = new TcpWriteProbe() val future = Source .fromPublisher(tcpWriteProbe.publisherProbe) - .viaMat(Tcp(system) - .outgoingConnection(InetSocketAddress.createUnresolved("example.com", 666), connectTimeout = 1.second))( + .viaMat( + Tcp().outgoingConnection(InetSocketAddress.createUnresolved("example.com", 666), connectTimeout = 1.second))( Keep.right) .toMat(Sink.ignore)(Keep.left) .run() @@ -177,7 +177,7 @@ class TcpSpec extends StreamSpec(""" val tcpReadProbe = new TcpReadProbe() Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -211,7 +211,7 @@ class TcpSpec extends StreamSpec(""" val tcpReadProbe = new TcpReadProbe() Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -243,7 +243,7 @@ class TcpSpec extends StreamSpec(""" val tcpReadProbe = new TcpReadProbe() Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -279,7 +279,7 @@ class TcpSpec extends StreamSpec(""" val tcpReadProbe = new TcpReadProbe() Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -316,7 +316,7 @@ class TcpSpec extends StreamSpec(""" val tcpReadProbe = new TcpReadProbe() Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -350,7 +350,7 @@ class TcpSpec extends StreamSpec(""" Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -381,7 +381,7 @@ class TcpSpec extends StreamSpec(""" Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -413,7 +413,7 @@ class TcpSpec extends StreamSpec(""" Source .fromPublisher(tcpWriteProbe.publisherProbe) - .via(Tcp(system).outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .to(Sink.fromSubscriber(tcpReadProbe.subscriberProbe)) .run() val serverConnection = server.waitAccept() @@ -434,7 +434,7 @@ class TcpSpec extends StreamSpec(""" val tcpWriteProbe1 = new TcpWriteProbe() val tcpReadProbe2 = new TcpReadProbe() val tcpWriteProbe2 = new TcpWriteProbe() - val outgoingConnection = Tcp(system).outgoingConnection(server.address) + val outgoingConnection = Tcp().outgoingConnection(server.address) val conn1F = Source @@ -473,7 +473,7 @@ class TcpSpec extends StreamSpec(""" Flow.fromSinkAndSource(Sink.cancelled, Source.single(ByteString("Early response"))) val binding = - Tcp(system) + Tcp() .bind("127.0.0.1", 0, halfClose = true) .toMat(Sink.foreach { conn => conn.flow.join(writeButDontRead).run() @@ -482,7 +482,7 @@ class TcpSpec extends StreamSpec(""" .futureValue val result = Source.empty - .via(Tcp(system).outgoingConnection(binding.localAddress)) + .via(Tcp().outgoingConnection(binding.localAddress)) .toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.right) .run() @@ -496,7 +496,7 @@ class TcpSpec extends StreamSpec(""" Flow.fromSinkAndSourceMat(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right) val binding = - Tcp(system) + Tcp() .bind("127.0.0.1", 0, halfClose = false) .toMat(Sink.foreach { conn => conn.flow.join(writeButIgnoreRead).run() @@ -506,7 +506,7 @@ class TcpSpec extends StreamSpec(""" val (promise, result) = Source .maybe[ByteString] - .via(Tcp(system).outgoingConnection(binding.localAddress)) + .via(Tcp().outgoingConnection(binding.localAddress)) .toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.both) .run() @@ -520,7 +520,7 @@ class TcpSpec extends StreamSpec(""" val serverAddress = temporaryServerAddress() val binding = - Tcp(system) + Tcp() .bind(serverAddress.getHostString, serverAddress.getPort, halfClose = false) .toMat(Sink.foreach { conn => conn.flow.join(Flow[ByteString]).run() @@ -529,7 +529,7 @@ class TcpSpec extends StreamSpec(""" .futureValue val result = Source(immutable.Iterable.fill(1000)(ByteString(0))) - .via(Tcp(system).outgoingConnection(serverAddress, halfClose = true)) + .via(Tcp().outgoingConnection(serverAddress, halfClose = true)) .runFold(0)(_ + _.size) result.futureValue should ===(1000) @@ -620,7 +620,7 @@ class TcpSpec extends StreamSpec(""" "be able to implement echo" in { val serverAddress = temporaryServerAddress() val (bindingFuture, echoServerFinish) = - Tcp(system).bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run() + Tcp().bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run() // make sure that the server has bound to the socket val binding = bindingFuture.futureValue @@ -628,9 +628,7 @@ class TcpSpec extends StreamSpec(""" val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val resultFuture = - Source(testInput) - .via(Tcp(system).outgoingConnection(serverAddress)) - .runFold(ByteString.empty)((acc, in) => acc ++ in) + Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) => acc ++ in) binding.whenUnbound.value should be(None) resultFuture.futureValue should be(expectedOutput) @@ -642,13 +640,13 @@ class TcpSpec extends StreamSpec(""" "work with a chain of echoes" in { val serverAddress = temporaryServerAddress() val (bindingFuture, echoServerFinish) = - Tcp(system).bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run() + Tcp().bind(serverAddress.getHostString, serverAddress.getPort).toMat(echoHandler)(Keep.both).run() // make sure that the server has bound to the socket val binding = bindingFuture.futureValue binding.whenUnbound.value should be(None) - val echoConnection = Tcp(system).outgoingConnection(serverAddress) + val echoConnection = Tcp().outgoingConnection(serverAddress) val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) @@ -670,7 +668,7 @@ class TcpSpec extends StreamSpec(""" "bind and unbind correctly" in EventFilter[BindException](occurrences = 2).intercept { val address = temporaryServerAddress() val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() - val bind = Tcp(system).bind(address.getHostString, address.getPort) + val bind = Tcp().bind(address.getHostString, address.getPort) // Bind succeeded, we have a local address val binding1 = bind.to(Sink.fromSubscriber(probe1)).run().futureValue @@ -773,7 +771,7 @@ class TcpSpec extends StreamSpec(""" "handle single connection when connection flow is immediately cancelled" in { implicit val ec: ExecutionContext = system.dispatcher - val (bindingFuture, connection) = Tcp(system).bind("localhost", 0).toMat(Sink.head)(Keep.both).run() + val (bindingFuture, connection) = Tcp().bind("localhost", 0).toMat(Sink.head)(Keep.both).run() connection.map { c => c.handleWith(Flow[ByteString]) @@ -782,7 +780,7 @@ class TcpSpec extends StreamSpec(""" val binding = bindingFuture.futureValue val expected = ByteString("test") - val msg = Source.single(expected).via(Tcp(system).outgoingConnection(binding.localAddress)).runWith(Sink.head) + val msg = Source.single(expected).via(Tcp().outgoingConnection(binding.localAddress)).runWith(Sink.head) msg.futureValue shouldBe expected binding.unbind() @@ -818,13 +816,13 @@ class TcpSpec extends StreamSpec(""" .to(Sink.ignore) val serverBound = - Tcp(system).bind(address.getHostString, address.getPort).toMat(accept2ConnectionSink)(Keep.left).run() + Tcp().bind(address.getHostString, address.getPort).toMat(accept2ConnectionSink)(Keep.left).run() // make sure server has started serverBound.futureValue val firstProbe = TestPublisher.probe[ByteString]() - val firstResult = Source.fromPublisher(firstProbe).via(Tcp(system).outgoingConnection(address)).runWith(Sink.seq) + val firstResult = Source.fromPublisher(firstProbe).via(Tcp().outgoingConnection(address)).runWith(Sink.seq) // create the first connection and wait until the flow is running server side firstClientConnected.future.futureValue(Timeout(5.seconds)) @@ -832,7 +830,7 @@ class TcpSpec extends StreamSpec(""" firstProbe.sendNext(ByteString(23)) // then connect the second one, which will be ignored - val rejected = Source(List(ByteString(67))).via(Tcp(system).outgoingConnection(address)).runWith(Sink.seq) + val rejected = Source(List(ByteString(67))).via(Tcp().outgoingConnection(address)).runWith(Sink.seq) secondClientIgnored.future.futureValue // first connection should be fine @@ -849,12 +847,12 @@ class TcpSpec extends StreamSpec(""" try { val address = temporaryServerAddress() - val bindingFuture = Tcp(system).bindAndHandle(Flow[ByteString], address.getHostString, address.getPort) + val bindingFuture = Tcp().bindAndHandle(Flow[ByteString], address.getHostString, address.getPort) // Ensure server is running bindingFuture.futureValue // and is possible to communicate with - Source.single(ByteString(0)).via(Tcp(system).outgoingConnection(address)).runWith(Sink.ignore).futureValue + Source.single(ByteString(0)).via(Tcp().outgoingConnection(address)).runWith(Sink.ignore).futureValue sys2.terminate().futureValue @@ -865,7 +863,7 @@ class TcpSpec extends StreamSpec(""" "show host and port in bind exception message" in EventFilter[BindException](occurrences = 1).intercept { val (host, port) = temporaryServerHostnameAndPort() - val bind = Tcp(system).bind(host, port) + val bind = Tcp().bind(host, port) val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val binding1 = bind.to(Sink.fromSubscriber(probe1)).run().futureValue @@ -890,7 +888,7 @@ class TcpSpec extends StreamSpec(""" // cert is valid until 2025, so if this tests starts failing after that you need to create a new one val address = temporaryServerAddress() - Tcp(system) + Tcp() .bindAndHandleWithTls( // just echo characters until we reach '\n', then complete stream // also - byte is our framing @@ -902,7 +900,7 @@ class TcpSpec extends StreamSpec(""" system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}") val connectionFlow = - Tcp(system).outgoingConnectionWithTls(address, () => createSSLEngine(TLSRole.client)) + Tcp().outgoingConnectionWithTls(address, () => createSSLEngine(TLSRole.client)) val chars = "hello\n".toList.map(_.toString) val (connectionF, result) = @@ -976,7 +974,7 @@ class TcpSpec extends StreamSpec(""" val (sslContext, firstSession) = initSslMess() val address = temporaryServerAddress() - Tcp(system) + Tcp() .bindAndHandleTls( // just echo characters until we reach '\n', then complete stream // also - byte is our framing @@ -989,7 +987,7 @@ class TcpSpec extends StreamSpec(""" system.log.info(s"Server bound to ${address.getHostString}:${address.getPort}") val connectionFlow = - Tcp(system).outgoingTlsConnection(address.getHostName, address.getPort, sslContext, firstSession) + Tcp().outgoingTlsConnection(address.getHostName, address.getPort, sslContext, firstSession) val chars = "hello\n".toList.map(_.toString) val (connectionF, result) = diff --git a/akka-stream/src/main/mima-filters/2.6.19.backwards.excludes/31266-simplify-tcp-apply.excludes b/akka-stream/src/main/mima-filters/2.6.19.backwards.excludes/31266-simplify-tcp-apply.excludes new file mode 100644 index 0000000000..aafffee412 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.19.backwards.excludes/31266-simplify-tcp-apply.excludes @@ -0,0 +1,7 @@ +# removed internal classes from type hierarchy +ProblemFilters.exclude[MissingTypesProblem]("com.typesafe.sslconfig.akka.AkkaSSLConfig$") +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.scaladsl.Tcp$") + +# internal classes +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.AkkaSSLConfigExtensionIdApply") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.TcpImplicitExtensionIdApply") \ No newline at end of file diff --git a/akka-stream/src/main/scala-2.12/akka/stream/impl/ImplicitExtensionIdApply.scala b/akka-stream/src/main/scala-2.12/akka/stream/impl/ImplicitExtensionIdApply.scala deleted file mode 100644 index 849773e826..0000000000 --- a/akka-stream/src/main/scala-2.12/akka/stream/impl/ImplicitExtensionIdApply.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -package akka.stream.impl - -import akka.actor.ActorSystem -import akka.actor.ExtensionId -import akka.annotation.DoNotInherit -import akka.stream.scaladsl.Tcp -import com.typesafe.sslconfig.akka.AkkaSSLConfig - -import scala.annotation.nowarn - -/* - * Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define - * on Scala 2 and Scala 3 - */ - -/** - * Not for user extension - */ -@DoNotInherit -trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] { - def apply()(implicit system: ActorSystem): Tcp = super.apply(system) -} - -/** - * Not for user extension - */ -@DoNotInherit -@nowarn("msg=deprecated") -trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] { - def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) -} diff --git a/akka-stream/src/main/scala-2.13/akka/stream/impl/ImplicitExtensionIdApply.scala b/akka-stream/src/main/scala-2.13/akka/stream/impl/ImplicitExtensionIdApply.scala deleted file mode 100644 index 849773e826..0000000000 --- a/akka-stream/src/main/scala-2.13/akka/stream/impl/ImplicitExtensionIdApply.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -package akka.stream.impl - -import akka.actor.ActorSystem -import akka.actor.ExtensionId -import akka.annotation.DoNotInherit -import akka.stream.scaladsl.Tcp -import com.typesafe.sslconfig.akka.AkkaSSLConfig - -import scala.annotation.nowarn - -/* - * Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define - * on Scala 2 and Scala 3 - */ - -/** - * Not for user extension - */ -@DoNotInherit -trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] { - def apply()(implicit system: ActorSystem): Tcp = super.apply(system) -} - -/** - * Not for user extension - */ -@DoNotInherit -@nowarn("msg=deprecated") -trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] { - def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) -} diff --git a/akka-stream/src/main/scala-3/akka/stream/impl/ImplicitExtensionIdApply.scala b/akka-stream/src/main/scala-3/akka/stream/impl/ImplicitExtensionIdApply.scala deleted file mode 100644 index c119440448..0000000000 --- a/akka-stream/src/main/scala-3/akka/stream/impl/ImplicitExtensionIdApply.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -package akka.stream.impl - -import akka.actor.ActorSystem -import akka.actor.ExtensionId -import akka.annotation.DoNotInherit -import akka.stream.scaladsl.Tcp -import com.typesafe.sslconfig.akka.AkkaSSLConfig - -import scala.annotation.nowarn - -/* - * Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define - * on Scala 2 and Scala 3 - */ - -/** - * Not for user extension - */ -@DoNotInherit -trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] { - override def apply(implicit system: ActorSystem): Tcp = super.apply(system) -} - -/** - * Not for user extension - */ -@DoNotInherit -@nowarn("msg=deprecated") -trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] { - override def apply(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 6c4d54b1da..17acac9016 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -27,7 +27,6 @@ import akka.io.Inet.SocketOption import akka.stream._ import akka.stream.Attributes.Attribute import akka.stream.TLSProtocol.NegotiateNewSession -import akka.stream.impl.TcpImplicitExtensionIdApply import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.impl.io.ConnectionSourceStage import akka.stream.impl.io.OutgoingConnectionStage @@ -36,7 +35,7 @@ import akka.util.ByteString import akka.util.JavaDurationConverters._ import akka.util.unused -object Tcp extends ExtensionId[Tcp] with TcpImplicitExtensionIdApply with ExtensionIdProvider { +object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** * Represents a successful TCP server binding. @@ -77,6 +76,8 @@ object Tcp extends ExtensionId[Tcp] with TcpImplicitExtensionIdApply with Extens */ final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) + def apply()(implicit system: ActorSystem): Tcp = super.apply(system) + override def get(system: ActorSystem): Tcp = super.get(system) override def get(system: ClassicActorSystemProvider): Tcp = super.get(system) diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala index 74cba3e7fd..aa0cc02ab5 100644 --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala @@ -12,16 +12,16 @@ import com.typesafe.sslconfig.util.LoggerFactory import akka.actor._ import akka.annotation.InternalApi import akka.event.Logging -import akka.stream.impl.AkkaSSLConfigExtensionIdApply import scala.annotation.nowarn @deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0") -object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with AkkaSSLConfigExtensionIdApply with ExtensionIdProvider { +object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider { //////////////////// EXTENSION SETUP /////////////////// override def get(system: ActorSystem): AkkaSSLConfig = super.get(system) override def get(system: ClassicActorSystemProvider): AkkaSSLConfig = super.get(system) + def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) override def lookup = AkkaSSLConfig