diff --git a/.travis.yml b/.travis.yml index be5cc6a8b6..29ab92d056 100644 --- a/.travis.yml +++ b/.travis.yml @@ -38,7 +38,7 @@ jobs: - stage: scala3 name: scala3 # separate job since only a few modules compile with Scala 3 yet - script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-typed/compile + script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-typed/compile akka-stream/compile stages: - name: whitesource diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 1a6e4209c4..cdde9f87e0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -29,7 +29,8 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers { "lazyFutureFlow", // lazyCompletionStageFlow "futureFlow", // completionStageFlow "futureSink", // completionStageSink - "lazyFutureSink" // lazyCompletionStageSink + "lazyFutureSink", // lazyCompletionStageSink + "createGraph" // renamed/overload of create for getting type inference working in Scala 3 ) val javaIgnore = diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template index eb1313515a..74b40a3a89 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template @@ -22,7 +22,7 @@ private[stream] abstract class GraphCreate { */ def create[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M], block: function.Function2[GraphDSL.Builder[M], S1, S]): Graph[S, M] = - scaladsl.GraphDSL.create(g1) { b => s => block.apply(b.asJava, s) } + scaladsl.GraphDSL.createGraph(g1) { b => s => block.apply(b.asJava, s) } /** * Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s @@ -30,7 +30,7 @@ private[stream] abstract class GraphCreate { */ def create[S1 <: Shape, S2 <: Shape, S <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: function.Function2[M1, M2, M], block: function.Function3[GraphDSL.Builder[M], S1, S2, S]): Graph[S, M] = - scaladsl.GraphDSL.create(g1, g2)(combineMat.apply) { b => (s1, s2) => block.apply(b.asJava, s1, s2) } + scaladsl.GraphDSL.createGraph(g1, g2)(combineMat.apply) { b => (s1, s2) => block.apply(b.asJava, s1, s2) } [3..21#/** * Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s @@ -38,7 +38,7 @@ private[stream] abstract class GraphCreate { */ def create1[[#S1 <: Shape#], S <: Shape, [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: function.Function1[[#M1#], M], block: function.Function2[GraphDSL.Builder[M], [#S1#], S]): Graph[S, M] = - scaladsl.GraphDSL.create([#g1#])(combineMat.apply) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }# + scaladsl.GraphDSL.createGraph([#g1#])(combineMat.apply) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }# ] } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index c18e7418ec..dc61ba1885 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -21,6 +21,8 @@ trait GraphApply { /** * Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]] * along with the [[GraphDSL.Builder]] to the given create function. + * + * Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatiblity. Use createGraph instead. */ def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat] = { val builder = new GraphDSL.Builder @@ -30,12 +32,26 @@ trait GraphApply { createGraph(s, builder) } + /** + * Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]] + * along with the [[GraphDSL.Builder]] to the given create function. + */ + def createGraph[S <: Shape, S1 <: Shape, Mat](g1: Graph[S1, Mat])(buildBlock: GraphDSL.Builder[Mat] => S1 => S): Graph[S, Mat] = { + val builder = new GraphDSL.Builder + val s1 = builder.add(g1, Keep.right) + val s = buildBlock(builder)(s1) + + createGraph(s, builder) + } + [2..# /** * Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s * along with the [[GraphDSL.Builder]] to the given create function. + * + * Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatiblity. Use createGraph instead. */ def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = { val builder = new GraphDSL.Builder @@ -46,8 +62,22 @@ trait GraphApply { val s = buildBlock(builder)([#s1#]) createGraph(s, builder) - }# + } + /** + * Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s + * along with the [[GraphDSL.Builder]] to the given create function. + */ + def createGraph[S <: Shape, Mat, [#M1#], [#S1 <: Shape#]]([#g1: Graph[S1, M1]#])(combineMat: ([#M1#]) => Mat)(buildBlock: GraphDSL.Builder[Mat] => ([#S1#]) => S): Graph[S, Mat] = { + val builder = new GraphDSL.Builder + val curried = combineMat.curried + val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1)) + [2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))# + ] + val s = buildBlock(builder)([#s1#]) + + createGraph(s, builder) + }# ] private def createGraph[S <: Shape, Mat](shape: S, graphBuilder: GraphDSL.Builder[Mat]): Graph[S, Mat] = diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template index 9e2fcb2629..ea5cb4e6db 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipLatestWithApply.scala.template @@ -36,7 +36,7 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F private var willShutDown = false private val contextPropagation = ContextPropagation() - [#val inlet0 = new ZipLatestInlet(in0)# + [#private val inlet0 = new ZipLatestInlet(in0)# ] private var waitingForTuple = false private var staleTupleValues = true diff --git a/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/compile-on-scala-3.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/compile-on-scala-3.backwards.excludes new file mode 100644 index 0000000000..230b172737 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/compile-on-scala-3.backwards.excludes @@ -0,0 +1,8 @@ +# internal +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub#MergedSourceLogic.enqueue") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.BoundedBuffer#FixedQueue.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.BoundedBuffer#DynamicQueue.this") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.PhasedFusingActorMaterializer.makeLogger") + +# Effectively internal +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.MaterializerLoggingProvider.makeLogger") diff --git a/akka-stream/src/main/scala-2.12/akka/stream/impl/ImplicitExtensionIdApply.scala b/akka-stream/src/main/scala-2.12/akka/stream/impl/ImplicitExtensionIdApply.scala new file mode 100644 index 0000000000..b2669ad6af --- /dev/null +++ b/akka-stream/src/main/scala-2.12/akka/stream/impl/ImplicitExtensionIdApply.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.ActorSystem +import akka.actor.ExtensionId +import akka.annotation.DoNotInherit +import akka.stream.scaladsl.Tcp +import com.typesafe.sslconfig.akka.AkkaSSLConfig + +import scala.annotation.nowarn + +/* + * Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define + * on Scala 2 and Scala 3 + */ + +/** + * Not for user extension + */ +@DoNotInherit +trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] { + def apply()(implicit system: ActorSystem): Tcp = super.apply(system) +} + +/** + * Not for user extension + */ +@DoNotInherit +@nowarn("msg=deprecated") +trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] { + def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) +} diff --git a/akka-stream/src/main/scala-2.13/akka/stream/impl/ImplicitExtensionIdApply.scala b/akka-stream/src/main/scala-2.13/akka/stream/impl/ImplicitExtensionIdApply.scala new file mode 100644 index 0000000000..b2669ad6af --- /dev/null +++ b/akka-stream/src/main/scala-2.13/akka/stream/impl/ImplicitExtensionIdApply.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.ActorSystem +import akka.actor.ExtensionId +import akka.annotation.DoNotInherit +import akka.stream.scaladsl.Tcp +import com.typesafe.sslconfig.akka.AkkaSSLConfig + +import scala.annotation.nowarn + +/* + * Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define + * on Scala 2 and Scala 3 + */ + +/** + * Not for user extension + */ +@DoNotInherit +trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] { + def apply()(implicit system: ActorSystem): Tcp = super.apply(system) +} + +/** + * Not for user extension + */ +@DoNotInherit +@nowarn("msg=deprecated") +trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] { + def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) +} diff --git a/akka-stream/src/main/scala-3/akka/stream/impl/ImplicitExtensionIdApply.scala b/akka-stream/src/main/scala-3/akka/stream/impl/ImplicitExtensionIdApply.scala new file mode 100644 index 0000000000..115073e478 --- /dev/null +++ b/akka-stream/src/main/scala-3/akka/stream/impl/ImplicitExtensionIdApply.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.ActorSystem +import akka.actor.ExtensionId +import akka.annotation.DoNotInherit +import akka.stream.scaladsl.Tcp +import com.typesafe.sslconfig.akka.AkkaSSLConfig + +import scala.annotation.nowarn + +/* + * Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define + * on Scala 2 and Scala 3 + */ + +/** + * Not for user extension + */ +@DoNotInherit +trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] { + override def apply(implicit system: ActorSystem): Tcp = super.apply(system) +} + +/** + * Not for user extension + */ +@DoNotInherit +@nowarn("msg=deprecated") +trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] { + override def apply(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) +} diff --git a/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala b/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala index 3655ad4c5f..234241c11e 100644 --- a/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala +++ b/akka-stream/src/main/scala/akka/stream/MaterializerLoggingProvider.scala @@ -4,14 +4,15 @@ package akka.stream +import akka.annotation.DoNotInherit import akka.event.LoggingAdapter /** - * SPI intended only to be extended by custom [[Materializer]] implementations, - * that also want to provide operators they materialize with specialized [[akka.event.LoggingAdapter]] instances. + * Not for user extension */ +@DoNotInherit trait MaterializerLoggingProvider { this: Materializer => - def makeLogger(logSource: Class[_]): LoggingAdapter + def makeLogger(logSource: Class[Any]): LoggingAdapter } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 37125a795f..0437a18508 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -233,7 +233,7 @@ import akka.util.unused protected def downstreamRunning: Actor.Receive = { case SubscribePending => - subscribePending(exposedPublisher.takePendingSubscribers()) + subscribePending(exposedPublisher.takePendingSubscribers().asInstanceOf[Seq[Subscriber[Any]]]) case RequestMore(_, elements) => if (elements < 1) { error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala index 70dec14daa..ba6d6895f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala @@ -143,27 +143,42 @@ private[akka] object Buffer { */ @InternalApi private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] { + import BoundedBuffer._ + def used: Int = q.used + def isFull: Boolean = q.isFull + def isEmpty: Boolean = q.isEmpty + def nonEmpty: Boolean = q.nonEmpty def enqueue(elem: T): Unit = q.enqueue(elem) + def dequeue(): T = q.dequeue() def peek(): T = q.peek() + def clear(): Unit = q.clear() + def dropHead(): Unit = q.dropHead() + def dropTail(): Unit = q.dropTail() - private final class FixedQueue extends Buffer[T] { + private var q: Buffer[T] = new FixedQueue[T](capacity, newBuffer => q = newBuffer) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object BoundedBuffer { + private final class FixedQueue[T](override val capacity: Int, switchBuffer: Buffer[T] => Unit) extends Buffer[T] { import Buffer._ private val queue = new Array[AnyRef](FixedQueueSize) private var head = 0 private var tail = 0 - override def capacity = BoundedBuffer.this.capacity override def used = tail - head override def isFull = used == capacity override def isEmpty = tail == head @@ -171,11 +186,11 @@ private[akka] object Buffer { override def enqueue(elem: T): Unit = if (tail - head == FixedQueueSize) { - val queue = new DynamicQueue() + val queue = new DynamicQueue[T](capacity) while (nonEmpty) { queue.enqueue(dequeue()) } - q = queue + switchBuffer(queue) queue.enqueue(elem) } else { queue(tail & FixedQueueMask) = elem.asInstanceOf[AnyRef] @@ -203,8 +218,7 @@ private[akka] object Buffer { } } - private final class DynamicQueue() extends ju.LinkedList[T] with Buffer[T] { - override def capacity = BoundedBuffer.this.capacity + private final class DynamicQueue[T](override val capacity: Int) extends ju.LinkedList[T] with Buffer[T] { override def used = size override def isFull = size == capacity override def nonEmpty = !isEmpty() @@ -215,6 +229,4 @@ private[akka] object Buffer { override def dropHead(): Unit = remove() override def dropTail(): Unit = removeLast() } - - private var q: Buffer[T] = new FixedQueue } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 7d7a0eb495..b43fe90d64 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -610,7 +610,7 @@ private final case class SavedIslandData( } } - override def makeLogger(logSource: Class[_]): LoggingAdapter = + override def makeLogger(logSource: Class[Any]): LoggingAdapter = Logging(system, logSource) /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala index 3b672e5b6c..3830da0204 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala @@ -7,7 +7,7 @@ package akka.stream.impl import scala.annotation.tailrec import scala.util.control.NoStackTrace -import ResizableMultiReaderRingBuffer._ +import ResizableMultiReaderRingBuffer.{ Cursor, Cursors, NothingToReadException } import akka.annotation.InternalApi diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index eb19b20578..6150d4a01d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -299,8 +299,6 @@ import akka.util.ccompat._ require(maxConcurrentPulls > 0, "Max concurrent pulls must be greater than 0") - type Requested[E] = Promise[Option[E]] - val in = Inlet[T]("queueSink.in") override def initialAttributes = DefaultAttributes.queueSink override val shape: SinkShape[T] = SinkShape.of(in) @@ -309,14 +307,13 @@ import akka.util.ccompat._ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] { - type Received[E] = Try[Option[E]] val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") // Allocates one additional element to hold stream closed/failure indicators - val buffer: Buffer[Received[T]] = Buffer(maxBuffer + 1, inheritedAttributes) - val currentRequests: Buffer[Requested[T]] = Buffer(maxConcurrentPulls, inheritedAttributes) + val buffer: Buffer[Try[Option[T]]] = Buffer(maxBuffer + 1, inheritedAttributes) + val currentRequests: Buffer[Promise[Option[T]]] = Buffer(maxConcurrentPulls, inheritedAttributes) override def preStart(): Unit = { setKeepGoing(true) @@ -324,7 +321,7 @@ import akka.util.ccompat._ } private val callback = getAsyncCallback[Output[T]] { - case QueueSink.Pull(pullPromise) => + case QueueSink.Pull(pullPromise: Promise[Option[T]] @unchecked) => if (currentRequests.isFull) pullPromise.failure( new IllegalStateException(s"Too many concurrent pulls. Specified maximum is $maxConcurrentPulls. " + @@ -337,7 +334,7 @@ import akka.util.ccompat._ case QueueSink.Cancel => completeStage() } - def sendDownstream(promise: Requested[T]): Unit = { + def sendDownstream(promise: Promise[Option[T]]): Unit = { val e = buffer.dequeue() promise.complete(e) e match { @@ -445,17 +442,19 @@ import akka.util.ccompat._ @InternalApi private[akka] final class MutableCollectorState[T, R]( collector: java.util.stream.Collector[T, Any, R], accumulator: java.util.function.BiConsumer[Any, T], - val accumulated: Any) + _accumulated: Any) extends CollectorState[T, R] { + override def accumulated(): Any = _accumulated + override def update(elem: T): CollectorState[T, R] = { - accumulator.accept(accumulated, elem) + accumulator.accept(_accumulated, elem) this } override def finish(): R = { // only called if completed without elements - collector.finisher().apply(accumulated) + collector.finisher().apply(_accumulated) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index 1a5c4e0160..5c8e67a74b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -40,7 +40,7 @@ import akka.util.NanoTimeTokenBucket private val nanosBetweenTokens = per.toNanos / cost // 100 ms is a realistic minimum between tokens, otherwise the maximumBurst is adjusted // to be able to support higher rates - val effectiveMaximumBurst = + val effectiveMaximumBurst: Long = if (maximumBurst == Throttle.AutomaticMaximumBurst) math.max(1, ((100 * 1000 * 1000) / nanosBetweenTokens)) else maximumBurst require(!(mode == ThrottleMode.Enforcing && effectiveMaximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 0263a3014c..9a5f96710d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -109,7 +109,7 @@ import akka.stream.stage._ * when this accidentally leaks onto threads that are not stopped when this * class should be unloaded. */ - override def initialValue = new Array(1) + override def initialValue: Array[AnyRef] = new Array(1) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d93b060332..fc63e2aad5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -2085,6 +2085,7 @@ private[stream] object Collect { override def toString = s"Reduce.Logic(aggregator=$aggregator)" private var aggregator: T = _ + private val empty: T = aggregator private def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider @@ -2113,7 +2114,7 @@ private[stream] object Collect { decider(ex) match { case Supervision.Stop => failStage(ex) case Supervision.Restart => - aggregator = _: T + aggregator = empty setInitialInHandler() case _ => () diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 47c5184de7..e3f610398c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -425,7 +425,7 @@ import akka.util.ByteString case BUFFER_OVERFLOW => flushToUser() transportInChoppingBlock.putBack(transportInBuffer) - case s => fail(new IllegalStateException(s"unexpected status $s in doUnwrap()")) + case null => fail(new IllegalStateException(s"unexpected status 'null' in doUnwrap()")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index e9370ab89d..daa95bf5a0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -177,7 +177,7 @@ import akka.util.ByteString private def unbindCompleted(): Unit = { stageActor.unwatch(listener) - unbindPromise.trySuccess(Done) + unbindPromise.trySuccess(()) if (connectionFlowsAwaitingInitialization.get() == 0) completeStage() else scheduleOnce(BindShutdownTimer, bindShutdownTimeout) } @@ -192,7 +192,7 @@ import akka.util.ByteString override def postStop(): Unit = { // a bit unexpected to succeed here rather than fail with abrupt stage termination // but there was an existing test case covering this behavior - unbindPromise.trySuccess(Done) + unbindPromise.trySuccess(()) bindingPromise.tryFailure(new NoSuchElementException("Binding was unbound before it was completely finished")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala index 292760d09c..3b8b62a87c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefSettingsImpl.scala @@ -12,7 +12,7 @@ import akka.stream.StreamRefSettings /** INTERNAL API */ @InternalApi -private[akka] final case class StreamRefSettingsImpl private ( +private[akka] final case class StreamRefSettingsImpl( override val bufferCapacity: Int, override val demandRedeliveryInterval: FiniteDuration, override val subscriptionTimeout: FiniteDuration, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index 242d1e1795..da077524d2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -25,8 +25,8 @@ object BidiFlow { */ def fromGraph[I1, O1, I2, O2, M](g: Graph[BidiShape[I1, O1, I2, O2], M]): BidiFlow[I1, O1, I2, O2, M] = g match { - case bidi: BidiFlow[I1, O1, I2, O2, M] => bidi - case other => new BidiFlow(scaladsl.BidiFlow.fromGraph(other)) + case bidi: BidiFlow[I1, O1, I2, O2, M] @unchecked => bidi + case other => new BidiFlow(scaladsl.BidiFlow.fromGraph(other)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 71d843611e..257f10ec41 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -12,6 +12,8 @@ import java.util.concurrent.CompletionStage import akka.stream.{ javadsl, scaladsl, IOResult } import akka.util.ByteString import akka.util.ccompat.JavaConverters._ +import akka.stream.scaladsl.SinkToCompletionStage +import akka.stream.scaladsl.SourceToCompletionStage /** * Java API: Factories to create sinks and sources from files diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 008a75aca7..b00d260724 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -64,8 +64,8 @@ object Flow { */ def fromGraph[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] = g match { - case f: Flow[I, O, M] => f - case other => new Flow(scaladsl.Flow.fromGraph(other)) + case f: Flow[I, O, M] @unchecked => f + case other => new Flow(scaladsl.Flow.fromGraph(other)) } /** @@ -1679,9 +1679,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def recoverWith( clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = - recoverWith { + recoverWith({ case elem if clazz.isInstance(elem) => supplier.get() - } + }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) /** * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index fae4713823..4da302077e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -668,15 +668,15 @@ object GraphDSL extends GraphCreate { def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) - final class ForwardOps[T](out: Outlet[T]) { - def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { out ~> in; self } - def to(dst: SinkShape[_ >: T]): Builder[Mat] = { out ~> dst; self } - def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { out ~> j; self } - def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { out ~> j; self } - def via[U](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((out ~> f).outlet) - def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet) - def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet) - def out(): Outlet[T] = out + final class ForwardOps[T](_out: Outlet[T]) { + def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { _out ~> in; self } + def to(dst: SinkShape[_ >: T]): Builder[Mat] = { _out ~> dst; self } + def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { _out ~> j; self } + def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { _out ~> j; self } + def via[U](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((_out ~> f).outlet) + def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((_out ~> j).outlet) + def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((_out ~> j).outlet) + def out(): Outlet[T] = _out } final class ReverseOps[T](out: Inlet[T]) { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index edf298a9a7..28cfd492bc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -25,6 +25,7 @@ import akka.japi.function import akka.japi.function.Creator import akka.stream.{ javadsl, scaladsl, _ } import akka.stream.impl.LinearTraversalBuilder +import akka.stream.scaladsl.SinkToCompletionStage /** Java API */ object Sink { @@ -313,8 +314,8 @@ object Sink { */ def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] = g match { - case s: Sink[T, M] => s - case other => new Sink(scaladsl.Sink.fromGraph(other)) + case s: Sink[T, M] @unchecked => s + case other => new Sink(scaladsl.Sink.fromGraph(other)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 50af2d1f03..47d3219fac 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -622,7 +622,7 @@ object Source { */ def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match { - case s: Source[T, M] => s + case s: Source[T, M] @unchecked => s case s if s eq scaladsl.Source.empty => empty().asInstanceOf[Source[T, M]] case other => new Source(scaladsl.Source.fromGraph(other)) } @@ -2131,9 +2131,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def recoverWith( clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = - recoverWith { + recoverWith({ case elem if clazz.isInstance(elem) => supplier.get() - } + }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) /** * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after @@ -2195,7 +2195,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = recoverWithRetries(attempts, { case elem if clazz.isInstance(elem) => supplier.get() - }) + }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) /** * Transform each input element into an `Iterable` of output elements that is diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 70b833f118..a7e485b0c4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -17,6 +17,8 @@ import akka.japi.function import akka.stream.{ javadsl, scaladsl } import akka.stream.IOResult import akka.util.ByteString +import akka.stream.scaladsl.SinkToCompletionStage +import akka.stream.scaladsl.SourceToCompletionStage /** * Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 483cea1cc1..7e5cfd9e0e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -232,8 +232,8 @@ object BidiFlow { */ def fromGraph[I1, O1, I2, O2, Mat](graph: Graph[BidiShape[I1, O1, I2, O2], Mat]): BidiFlow[I1, O1, I2, O2, Mat] = graph match { - case bidi: BidiFlow[I1, O1, I2, O2, Mat] => bidi - case bidi: javadsl.BidiFlow[I1, O1, I2, O2, Mat] => bidi.asScala + case bidi: BidiFlow[I1, O1, I2, O2, Mat] => bidi + case bidi: javadsl.BidiFlow[I1, O1, I2, O2, Mat] @unchecked => bidi.asScala case other => new BidiFlow(other.traversalBuilder, other.shape) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 93c38029d8..875040a75a 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -379,7 +379,7 @@ object Flow { def fromGraph[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] = g match { case f: Flow[I, O, M] => f - case f: javadsl.Flow[I, O, M] => f.asScala + case f: javadsl.Flow[I, O, M] @unchecked => f.asScala case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] => // move these from the operator itself to make the returned source // behave as it is the operator with regards to attributes @@ -464,7 +464,7 @@ object Flow { */ def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])( combine: (M1, M2) => M): Flow[I, O, M] = - fromGraph(GraphDSL.create(sink, source)(combine) { _ => (in, out) => + fromGraph(GraphDSL.createGraph(sink, source)(combine) { _ => (in, out) => FlowShape(in.in, out.out) }) @@ -560,7 +560,7 @@ object Flow { def fromSinkAndSourceCoupledMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])( combine: (M1, M2) => M): Flow[I, O, M] = // format: OFF - Flow.fromGraph(GraphDSL.create(sink, source)(combine) { implicit b => (i, o) => + Flow.fromGraph(GraphDSL.createGraph(sink, source)(combine) { implicit b => (i, o) => import GraphDSL.Implicits._ val bidi = b.add(new CoupledTerminationBidi[I, O]) /* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */ @@ -2699,7 +2699,7 @@ trait FlowOps[+Out, +Mat] { } protected def zipGraph[U, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val zip = b.add(Zip[Out, U]()) r ~> zip.in1 FlowShape(zip.in0, zip.out) @@ -2725,7 +2725,7 @@ trait FlowOps[+Out, +Mat] { protected def zipLatestGraph[U, M]( that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val zip = b.add(ZipLatest[Out, U]()) r ~> zip.in1 FlowShape(zip.in0, zip.out) @@ -2748,7 +2748,7 @@ trait FlowOps[+Out, +Mat] { protected def zipWithGraph[Out2, Out3, M](that: Graph[SourceShape[Out2], M])( combine: (Out, Out2) => Out3): Graph[FlowShape[Out @uncheckedVariance, Out3], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val zip = b.add(ZipWith[Out, Out2, Out3](combine)) r ~> zip.in1 FlowShape(zip.in0, zip.out) @@ -2776,7 +2776,7 @@ trait FlowOps[+Out, +Mat] { protected def zipLatestWithGraph[Out2, Out3, M](that: Graph[SourceShape[Out2], M])( combine: (Out, Out2) => Out3): Graph[FlowShape[Out @uncheckedVariance, Out3], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val zip = b.add(ZipLatestWith[Out, Out2, Out3](combine)) r ~> zip.in1 FlowShape(zip.in0, zip.out) @@ -2858,7 +2858,7 @@ trait FlowOps[+Out, +Mat] { that: Graph[SourceShape[U], M], segmentSize: Int, eagerClose: Boolean = false): Graph[FlowShape[Out @uncheckedVariance, U], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val interleave = b.add(Interleave[U](2, segmentSize, eagerClose)) r ~> interleave.in(1) FlowShape(interleave.in(0), interleave.out) @@ -2882,7 +2882,7 @@ trait FlowOps[+Out, +Mat] { protected def mergeGraph[U >: Out, M]( that: Graph[SourceShape[U], M], eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(Merge[U](2, eagerComplete)) r ~> merge.in(1) FlowShape(merge.in(0), merge.out) @@ -2904,7 +2904,7 @@ trait FlowOps[+Out, +Mat] { protected def mergeLatestGraph[U >: Out, M]( that: Graph[SourceShape[U], M], eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(MergeLatest[U](2, eagerComplete)) r ~> merge.in(1) FlowShape(merge.in(0), merge.out) @@ -2929,7 +2929,7 @@ trait FlowOps[+Out, +Mat] { that: Graph[SourceShape[U], M], priority: Boolean, eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(MergePreferred[U](1, eagerComplete)) r ~> merge.in(if (priority) 0 else 1) FlowShape(merge.in(if (priority) 1 else 0), merge.out) @@ -2956,7 +2956,7 @@ trait FlowOps[+Out, +Mat] { leftPriority: Int, rightPriority: Int, eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(MergePrioritized[U](Seq(leftPriority, rightPriority), eagerComplete)) r ~> merge.in(1) FlowShape(merge.in(0), merge.out) @@ -2982,7 +2982,7 @@ trait FlowOps[+Out, +Mat] { protected def mergeSortedGraph[U >: Out, M](that: Graph[SourceShape[U], M])( implicit ord: Ordering[U]): Graph[FlowShape[Out @uncheckedVariance, U], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(new MergeSorted[U]) r ~> merge.in1 FlowShape(merge.in0, merge.out) @@ -3017,7 +3017,7 @@ trait FlowOps[+Out, +Mat] { protected def concatGraph[U >: Out, Mat2]( that: Graph[SourceShape[U], Mat2], detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(Concat[U](2, detached)) r ~> merge.in(1) FlowShape(merge.in(0), merge.out) @@ -3088,7 +3088,7 @@ trait FlowOps[+Out, +Mat] { protected def prependGraph[U >: Out, Mat2]( that: Graph[SourceShape[U], Mat2], detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => val merge = b.add(Concat[U](2, detached)) r ~> merge.in(0) FlowShape(merge.in(1), merge.out) @@ -3144,7 +3144,7 @@ trait FlowOps[+Out, +Mat] { protected def orElseGraph[U >: Out, Mat2]( secondary: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = - GraphDSL.create(secondary) { implicit b => secondary => + GraphDSL.createGraph(secondary) { implicit b => secondary => val orElse = b.add(OrElse[U]()) secondary ~> orElse.in(1) @@ -3200,7 +3200,7 @@ trait FlowOps[+Out, +Mat] { def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that)) protected def alsoToGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => import GraphDSL.Implicits._ val bcast = b.add(Broadcast[Out](2, eagerCancel = true)) bcast.out(1) ~> r @@ -3224,7 +3224,7 @@ trait FlowOps[+Out, +Mat] { protected def divertToGraph[M]( that: Graph[SinkShape[Out], M], when: Out => Boolean): Graph[FlowShape[Out @uncheckedVariance, Out], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => import GraphDSL.Implicits._ val partition = b.add(new Partition[Out](2, out => if (when(out)) 1 else 0, true)) partition.out(1) ~> r @@ -3250,7 +3250,7 @@ trait FlowOps[+Out, +Mat] { def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out] = via(wireTapGraph(that)) protected def wireTapGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = - GraphDSL.create(that) { implicit b => r => + GraphDSL.createGraph(that) { implicit b => r => import GraphDSL.Implicits._ val bcast = b.add(WireTap[Out]()) bcast.out1 ~> r diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala index ed7ce7c477..5b80a56c4e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -239,7 +239,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Boo def isDraining: Boolean = drainingEnabled && draining // External API - def enqueue(ev: Event): Unit = { + private[MergeHub] def enqueue(ev: Event): Unit = { queue.add(ev) /* * Simple volatile var is enough, there is no need for a CAS here. The first important thing to note diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 39b4a6f08c..7dae6a21ee 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -139,7 +139,7 @@ object Sink { def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] = g match { case s: Sink[T, M] => s - case s: javadsl.Sink[T, M] => s.asScala + case s: javadsl.Sink[T, M] @unchecked => s.asScala case g: GraphStageWithMaterializedValue[SinkShape[T], M] => // move these from the stage itself to make the returned source // behave as it is the stage with regards to attributes @@ -161,7 +161,7 @@ object Sink { def fromMaterializer[T, M](factory: (Materializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] = Flow .fromMaterializer({ (mat, attr) => - Flow.fromGraph(GraphDSL.create(factory(mat, attr)) { b => sink => + Flow.fromGraph(GraphDSL.createGraph(factory(mat, attr)) { b => sink => FlowShape(sink.in, b.materializedValue.outlet) }) }) @@ -579,7 +579,7 @@ object Sink { onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, - onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] = + onFailureMessage: (Throwable) => Any = Status.Failure.apply): Sink[T, NotUsed] = actorRefWithAck(ref, _ => identity, _ => onInitMessage, Some(ackMessage), onCompleteMessage, onFailureMessage) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 68349c0bc9..91b4ca1ead 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -315,7 +315,7 @@ object Source { */ def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match { case s: Source[T, M] => s - case s: javadsl.Source[T, M] => s.asScala + case s: javadsl.Source[T, M] @unchecked => s.asScala case g: GraphStageWithMaterializedValue[SourceShape[T], M] => // move these from the stage itself to make the returned source // behave as it is the stage with regards to attributes @@ -783,7 +783,7 @@ object Source { */ def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])( strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = { - val secondPartiallyCombined = GraphDSL.create(second) { implicit b => secondShape => + val secondPartiallyCombined = GraphDSL.createGraph(second) { implicit b => secondShape => import GraphDSL.Implicits._ val c = b.add(strategy(2)) secondShape ~> c.in(1) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index ed7d086662..67a37f8044 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -135,7 +135,7 @@ object StreamConverters { if (parallelism == 1) javaCollector[T, R](collectorFactory) else { Sink - .fromGraph(GraphDSL.create(Sink.head[R]) { implicit b => sink => + .fromGraph(GraphDSL.createGraph(Sink.head[R]) { implicit b => sink => import GraphDSL.Implicits._ val factory = collectorFactory.asInstanceOf[() => Collector[T, Any, R]] val balance = b.add(Balance[T](parallelism)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 693d0620b5..07c5b52fa3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -9,7 +9,6 @@ import java.util.concurrent.TimeoutException import javax.net.ssl.SSLContext import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.Duration @@ -17,9 +16,7 @@ import scala.concurrent.duration.FiniteDuration import scala.util.Success import scala.util.Try import scala.util.control.NoStackTrace - import scala.annotation.nowarn - import akka.Done import akka.NotUsed import akka.actor._ @@ -30,6 +27,7 @@ import akka.io.Inet.SocketOption import akka.stream._ import akka.stream.Attributes.Attribute import akka.stream.TLSProtocol.NegotiateNewSession +import akka.stream.impl.TcpImplicitExtensionIdApply import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.impl.io.ConnectionSourceStage import akka.stream.impl.io.OutgoingConnectionStage @@ -38,7 +36,7 @@ import akka.util.ByteString import akka.util.JavaDurationConverters._ import akka.util.unused -object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { +object Tcp extends ExtensionId[Tcp] with TcpImplicitExtensionIdApply with ExtensionIdProvider { /** * Represents a successful TCP server binding. @@ -79,8 +77,6 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { */ final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) - def apply()(implicit system: ActorSystem): Tcp = super.apply(system) - override def get(system: ActorSystem): Tcp = super.get(system) override def get(system: ClassicActorSystemProvider): Tcp = super.get(system) @@ -90,7 +86,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { // just wraps/unwraps the TLS byte events to provide ByteString, ByteString flows private val tlsWrapping: BidiFlow[ByteString, TLSProtocol.SendBytes, TLSProtocol.SslTlsInbound, ByteString, NotUsed] = - BidiFlow.fromFlows(Flow[ByteString].map(TLSProtocol.SendBytes), Flow[TLSProtocol.SslTlsInbound].collect { + BidiFlow.fromFlows(Flow[ByteString].map(TLSProtocol.SendBytes.apply), Flow[TLSProtocol.SslTlsInbound].collect { case sb: TLSProtocol.SessionBytes => sb.bytes // ignore other kinds of inbounds (currently only Truncated) }) diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 0817a2c2c7..3aae399e02 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -282,7 +282,7 @@ private[akka] object ConcurrentAsyncCallbackState { // stream is initialized and so no threads can just send events without any synchronization overhead case object Initialized extends State[Nothing] // Event with feedback promise - final case class Event[E](e: E, handlingPromise: Promise[Done]) + final case class Event[+E](e: E, handlingPromise: Promise[Done]) val NoPendingEvents = Pending[Nothing](Nil) } @@ -1243,7 +1243,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: // started - can just dispatch async message to interpreter onAsyncInput(event, promise) - case list @ Pending(l) => + case list @ Pending(l: List[Event[T]]) => // not started yet if (!currentState.compareAndSet(list, Pending[T](Event[T](event, promise) :: l))) invokeWithPromise(event, promise) diff --git a/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala b/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala index 5755b5d24a..682ed5107e 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/StageLogging.scala @@ -30,7 +30,7 @@ trait StageLogging { self: GraphStageLogic => if (_log eq null) { materializer match { case p: MaterializerLoggingProvider => - _log = p.makeLogger(logSource) + _log = p.makeLogger(logSource.asInstanceOf[Class[Any]]) case _ => _log = NoLogging } diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala index e814cb9845..bf454f07aa 100644 --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala @@ -8,23 +8,21 @@ import java.security.KeyStore import java.security.cert.CertPathValidatorException import java.util.Collections import javax.net.ssl._ - import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory import com.typesafe.sslconfig.ssl._ import com.typesafe.sslconfig.util.LoggerFactory - import akka.actor._ import akka.annotation.InternalApi import akka.event.Logging +import akka.stream.impl.AkkaSSLConfigExtensionIdApply @deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0") -object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider { +object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with AkkaSSLConfigExtensionIdApply with ExtensionIdProvider { //////////////////// EXTENSION SETUP /////////////////// override def get(system: ActorSystem): AkkaSSLConfig = super.get(system) override def get(system: ClassicActorSystemProvider): AkkaSSLConfig = super.get(system) - def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system) override def lookup = AkkaSSLConfig @@ -44,7 +42,7 @@ final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSett private val mkLogger = new AkkaLoggerFactory(system) - private val log = Logging(system, getClass) + private val log = Logging(system, classOf[AkkaSSLConfig]) log.debug("Initializing AkkaSSLConfig extension...") /** Can be used to modify the underlying config, most typically used to change a few values in the default config */ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bb0c4d211c..86a345c206 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,7 +29,7 @@ object Dependencies { val scala212Version = "2.12.14" val scala213Version = "2.13.5" - val scala3Version = "3.0.0" + val scala3Version = "3.0.1-RC1" val reactiveStreamsVersion = "1.0.3"