diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala index 644de1b8e8..2dc282d8a1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala @@ -20,7 +20,7 @@ class FlowTakeSpec extends StreamSpec with ScriptedTest { implicit val materializer = ActorMaterializer(settings) - muteDeadLetters(classOf[OnNext], OnComplete.getClass, classOf[RequestMore])() + muteDeadLetters(classOf[OnNext], OnComplete.getClass, classOf[RequestMore[_]])() "A Take" must { diff --git a/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes new file mode 100644 index 0000000000..ddb5a46860 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes @@ -0,0 +1,13 @@ +## removing compiler warnings #26673 +# unused +ProblemFilters.exclude[MissingClassProblem]("akka.stream.TransformerLike") +# internal +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.RequestMore$") +ProblemFilters.exclude[MissingFieldProblem]("akka.stream.impl.ReactiveStreamsCompliance#SpecViolation.serialVersionUID") +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.Cancel$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.streamref.StreamRefsMaster.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.OutgoingConnectionStage.$default$4") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.ConnectionSourceStage.options") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.io.ConnectionSourceStage.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.io.OutgoingConnectionStage.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub#MergedSourceLogic.this") diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 2b607fe132..03b036253c 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -6,6 +6,7 @@ package akka.stream import akka.actor.Cancellable import akka.annotation.InternalApi +import com.github.ghik.silencer.silent import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration @@ -22,6 +23,7 @@ import scala.concurrent.duration.FiniteDuration * * Once the SPI is final this notice will be removed. */ +@silent // deprecatedName(symbol) is deprecated but older Scala versions don't have a string signature, since "2.5.8" abstract class Materializer { /** diff --git a/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala b/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala index 2bae757242..3d63338963 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala @@ -10,7 +10,7 @@ class StreamTcpException(msg: String) extends RuntimeException(msg) with NoStack class BindFailedException extends StreamTcpException("bind failed") -@deprecated("BindFailedException object will never be thrown. Match on the class instead.") +@deprecated("BindFailedException object will never be thrown. Match on the class instead.", "2.4.19") case object BindFailedException extends BindFailedException class ConnectionException(msg: String) extends StreamTcpException(msg) diff --git a/akka-stream/src/main/scala/akka/stream/Transformer.scala b/akka-stream/src/main/scala/akka/stream/Transformer.scala deleted file mode 100644 index 564fc8d375..0000000000 --- a/akka-stream/src/main/scala/akka/stream/Transformer.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream - -import scala.collection.immutable - -private[akka] abstract class TransformerLike[-T, +U] { - - /** - * Invoked for each element to produce a (possibly empty) sequence of - * output elements. - */ - def onNext(element: T): immutable.Seq[U] - - /** - * Invoked after handing off the elements produced from one input element to the - * downstream subscribers to determine whether to end stream processing at this point; - * in that case the upstream subscription is canceled. - */ - def isComplete: Boolean = false - - /** - * Invoked before the Transformer terminates (either normal completion or after an onError) - * to produce a (possibly empty) sequence of elements in response to the - * end-of-stream event. - * - * This method is only called if [[#onError]] does not throw an exception. The default implementation - * of [[#onError]] throws the received cause forcing the failure to propagate downstream immediately. - * - * @param e Contains a non-empty option with the error causing the termination or an empty option - * if the Transformer was completed normally - */ - def onTermination(e: Option[Throwable]): immutable.Seq[U] = Nil - - /** - * Invoked when failure is signaled from upstream. If this method throws an exception, then onError is immediately - * propagated downstream. If this method completes normally then [[#onTermination]] is invoked as a final - * step, passing the original cause. - */ - def onError(cause: Throwable): Unit = throw cause - - /** - * Invoked after normal completion or failure. - */ - def cleanup(): Unit = () - -} diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index ec1570abf7..e0ef3c09f5 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -5,13 +5,16 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap + import akka.actor._ import akka.stream.impl.{ ReactiveStreamsCompliance, StreamSubscriptionTimeoutSupport } import org.reactivestreams.{ Publisher, Subscriber, Subscription } + import concurrent.duration.Duration import concurrent.duration.FiniteDuration import akka.stream.impl.CancelledSubscription import akka.stream.impl.ReactiveStreamsCompliance._ +import com.github.ghik.silencer.silent @deprecated( "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", @@ -405,6 +408,7 @@ trait ActorPublisher[T] extends Actor { /** * INTERNAL API */ +@silent private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] { import ActorPublisher.Internal._ diff --git a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala index aaf97513e4..53e60da05b 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala @@ -6,6 +6,8 @@ package akka.stream.extra import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Source +import com.github.ghik.silencer.silent + import scala.concurrent.duration.FiniteDuration /** @@ -18,11 +20,13 @@ object Implicits { * * See [[Timed]] */ + @deprecated("Moved to the akka/akka-stream-contrib project", "2.4.5") // overlooked this on the implicits when Timed was deprecated implicit class TimedSourceDsl[I, Mat](val source: Source[I, Mat]) extends AnyVal { /** * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. */ + @silent def timed[O, Mat2]( measuredOps: Source[I, Mat] => Source[O, Mat2], onComplete: FiniteDuration => Unit): Source[O, Mat2] = @@ -31,6 +35,7 @@ object Implicits { /** * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ + @silent def timedIntervalBetween(matching: I => Boolean, onInterval: FiniteDuration => Unit): Source[I, Mat] = Timed.timedIntervalBetween[I, Mat](source, matching, onInterval) } @@ -40,11 +45,13 @@ object Implicits { * * See [[Timed]] */ + @deprecated("Moved to the akka/akka-stream-contrib project", "2.4.5") // overlooked this on the implicits when Timed was deprecated implicit class TimedFlowDsl[I, O, Mat](val flow: Flow[I, O, Mat]) extends AnyVal { /** * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. */ + @silent def timed[Out, Mat2]( measuredOps: Flow[I, O, Mat] => Flow[I, Out, Mat2], onComplete: FiniteDuration => Unit): Flow[I, Out, Mat2] = @@ -53,6 +60,7 @@ object Implicits { /** * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ + @silent def timedIntervalBetween(matching: O => Boolean, onInterval: FiniteDuration => Unit): Flow[I, O, Mat] = Timed.timedIntervalBetween[I, O, Mat](flow, matching, onInterval) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 0a66ea2b02..346be46b3a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -11,6 +11,7 @@ import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext } import org.reactivestreams.{ Processor, Subscriber, Subscription } import akka.event.Logging +import akka.util.unused /** * INTERNAL API @@ -153,7 +154,7 @@ import akka.event.Logging case OnSubscribe(_) => throw new IllegalStateException("onSubscribe called after onError or onComplete") } - protected def inputOnError(e: Throwable): Unit = { + protected def inputOnError(@unused e: Throwable): Unit = { clear() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala index bdf23aa849..c6fdd6989e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala @@ -11,6 +11,7 @@ import akka.stream.actor.WatermarkRequestStrategy import akka.actor.Props import akka.actor.Terminated import akka.annotation.InternalApi +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -23,6 +24,7 @@ import akka.annotation.InternalApi /** * INTERNAL API */ +@silent @InternalApi private[akka] class ActorRefSinkActor( ref: ActorRef, highWatermark: Int, diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala index 6a8e611c47..5e4aed06f0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -11,8 +11,6 @@ import akka.stream._ import akka.stream.stage._ import akka.util.OptionVal -import scala.annotation.tailrec - private object ActorRefSource { private sealed trait ActorRefStage { def ref: ActorRef } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 3b0f2cff8c..e967ada36d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -8,6 +8,7 @@ import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage } +import akka.util.unused import org.reactivestreams.{ Subscriber, Subscription } /** @@ -120,7 +121,7 @@ import org.reactivestreams.{ Subscriber, Subscription } def onError(input: Int, e: Throwable): Unit - def onDepleted(input: Int): Unit = () + def onDepleted(@unused input: Int): Unit = () def onCompleteWhenNoInput(): Unit = () diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 1b47297843..c1c023c90b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -9,6 +9,7 @@ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } import scala.collection.immutable import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi } +import akka.util.unused import org.reactivestreams.Subscription /** @@ -188,7 +189,7 @@ import org.reactivestreams.Subscription enqueue(id, elem) } - def onCancel(output: Int): Unit = () + def onCancel(@unused output: Int): Unit = () def demandAvailableFor(id: Int) = new TransferState { override def isCompleted: Boolean = cancelled(id) || completed(id) || errored(id) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index 87f0447250..be29cd2f7b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -17,14 +17,14 @@ import akka.annotation.InternalApi /** * INTERNAL API */ -@InternalApi private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long) +@InternalApi private[akka] final case class RequestMore[T](subscription: ActorSubscription[T], demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ -@InternalApi private[akka] final case class Cancel(subscription: ActorSubscription[_]) +@InternalApi private[akka] final case class Cancel[T](subscription: ActorSubscription[T]) extends DeadLetterSuppression with NoSerializationVerificationNeeded diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 1e77477050..396bcc6491 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -13,6 +13,7 @@ import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import akka.event.Logging +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -98,6 +99,7 @@ import akka.event.Logging shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { + @silent override def create(context: MaterializationContext) = { val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index c6183b916a..7b667e5062 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -75,7 +75,6 @@ import org.reactivestreams.{ Subscriber, Subscription } final def requireNonNullSubscription(subscription: Subscription): Unit = if (subscription == null) throw subscriptionMustNotBeNullException - @SerialVersionUID(1L) sealed trait SpecViolation extends Throwable @SerialVersionUID(1L) @@ -90,7 +89,7 @@ import org.reactivestreams.{ Subscriber, Subscription } case other => try subscriber.onError(other) catch { - case NonFatal(t) => throw new SignalThrewException(subscriber + ".onError", t) + case NonFatal(t) => throw new SignalThrewException(s"${subscriber}.onError", t) } } @@ -98,21 +97,21 @@ import org.reactivestreams.{ Subscriber, Subscription } requireNonNullElement(element) try subscriber.onNext(element) catch { - case NonFatal(t) => throw new SignalThrewException(subscriber + ".onNext", t) + case NonFatal(t) => throw new SignalThrewException(s"${subscriber}.onNext", t) } } final def tryOnSubscribe[T](subscriber: Subscriber[T], subscription: Subscription): Unit = { try subscriber.onSubscribe(subscription) catch { - case NonFatal(t) => throw new SignalThrewException(subscriber + ".onSubscribe", t) + case NonFatal(t) => throw new SignalThrewException(s"${subscriber}.onSubscribe", t) } } final def tryOnComplete[T](subscriber: Subscriber[T]): Unit = { try subscriber.onComplete() catch { - case NonFatal(t) => throw new SignalThrewException(subscriber + ".onComplete", t) + case NonFatal(t) => throw new SignalThrewException(s"${subscriber}.onComplete", t) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index b71b20c4df..2e00baf329 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -15,6 +15,7 @@ import scala.language.existentials import scala.collection.immutable.Map.Map1 import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphStages.SingleSource +import akka.util.unused /** * INTERNAL API @@ -46,7 +47,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource Concat.normalizeConcat(this, that) } - def rewireFirstTo(relativeOffset: Int): Traversal = null + def rewireFirstTo(@unused relativeOffset: Int): Traversal = null } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala index 14fcbbb5a9..20fe06f011 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala @@ -107,11 +107,12 @@ import scala.util.control.NonFatal private def createResource(): Unit = { create().onComplete { resource => - createdCallback(resource).recover { + createdCallback(resource).failed.foreach { case _: StreamDetachedException => // stream stopped resource match { - case Success(r) => close(r) + case Success(r) => + close(r) case Failure(ex) => throw ex // failed to open but stream is stopped already } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 419c1c7057..1924b98930 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -63,7 +63,8 @@ import scala.util.control.NonFatal extends UpstreamBoundaryStageLogic[Any] with OutHandler { - final case class OnError(shell: GraphInterpreterShell, cause: Throwable) extends SimpleBoundaryEvent { + // can't be final because of SI-4440 + case class OnError(shell: GraphInterpreterShell, cause: Throwable) extends SimpleBoundaryEvent { override def execute(): Unit = { if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError port=$internalPortName") BatchingActorInputBoundary.this.onError(cause) @@ -71,7 +72,8 @@ import scala.util.control.NonFatal override def logic: GraphStageLogic = BatchingActorInputBoundary.this } - final case class OnComplete(shell: GraphInterpreterShell) extends SimpleBoundaryEvent { + // can't be final because of SI-4440 + case class OnComplete(shell: GraphInterpreterShell) extends SimpleBoundaryEvent { override def execute(): Unit = { if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete port=$internalPortName") BatchingActorInputBoundary.this.onComplete() @@ -79,7 +81,8 @@ import scala.util.control.NonFatal override def logic: GraphStageLogic = BatchingActorInputBoundary.this } - final case class OnNext(shell: GraphInterpreterShell, e: Any) extends SimpleBoundaryEvent { + // can't be final because of SI-4440 + case class OnNext(shell: GraphInterpreterShell, e: Any) extends SimpleBoundaryEvent { override def execute(): Unit = { if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e port=$internalPortName") BatchingActorInputBoundary.this.onNext(e) @@ -87,7 +90,8 @@ import scala.util.control.NonFatal override def logic: GraphStageLogic = BatchingActorInputBoundary.this } - final case class OnSubscribe(shell: GraphInterpreterShell, subscription: Subscription) extends SimpleBoundaryEvent { + // can't be final because of SI-4440 + case class OnSubscribe(shell: GraphInterpreterShell, subscription: Subscription) extends SimpleBoundaryEvent { override def execute(): Unit = { if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe port=$internalPortName") shell.subscribeArrived() @@ -464,7 +468,8 @@ import scala.util.control.NonFatal * @param promise Will be completed upon processing the event, or failed if processing the event throws * if the event isn't ever processed the promise (the operator stops) is failed elsewhere */ - final case class AsyncInput( + // can't be final because of SI-4440 + case class AsyncInput( shell: GraphInterpreterShell, logic: GraphStageLogic, evt: Any, @@ -484,7 +489,8 @@ import scala.util.control.NonFatal } } - final case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent { + // can't be final because of SI-4440 + case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent { override def execute(eventLimit: Int): Int = if (!waitingForShutdown) { if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") @@ -492,7 +498,8 @@ import scala.util.control.NonFatal } else eventLimit } - final case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent { + // can't be final because of SI-4440 + case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent { override def execute(eventLimit: Int): Int = { if (waitingForShutdown) { subscribesPending = 0 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 0f74918fab..d1c0c3cbda 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -18,19 +18,21 @@ import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl } import akka.stream.scaladsl.{ Flow, Keep, Source } import akka.stream.stage._ import akka.stream.{ Supervision, _ } + import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.{ Future, Promise } import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.{ Failure, Success, Try } - import akka.stream.ActorAttributes.SupervisionStrategy + import scala.concurrent.duration.{ FiniteDuration, _ } import scala.util.control.Exception.Catcher - import akka.stream.impl.Stages.DefaultAttributes import akka.util.OptionVal +import akka.util.unused +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -473,7 +475,7 @@ private[stream] object Collect { }) } - private def onRestart(t: Throwable): Unit = { + private def onRestart(): Unit = { current = zero elementHandled = false } @@ -498,7 +500,7 @@ private[stream] object Collect { case Supervision.Stop => failStage(t) case Supervision.Resume => safePull() case Supervision.Restart => - onRestart(t) + onRestart() safePull() } elementHandled = true @@ -531,7 +533,7 @@ private[stream] object Collect { case NonFatal(ex) => decider(ex) match { case Supervision.Stop => failStage(ex) - case Supervision.Restart => onRestart(ex) + case Supervision.Restart => onRestart() case Supervision.Resume => () } tryPull(in) @@ -629,7 +631,7 @@ private[stream] object Collect { private var aggregator: Out = zero private var aggregating: Future[Out] = Future.successful(aggregator) - private def onRestart(t: Throwable): Unit = { + private def onRestart(@unused t: Throwable): Unit = { aggregator = zero } @@ -1407,6 +1409,7 @@ private[stream] object Collect { private lazy val self = getStageActor { case (_, Terminated(`targetRef`)) => failStage(new WatchedActorTerminatedException("Watch", targetRef)) + case (_, _) => // keep the compiler happy (stage actor receive is total) } override def preStart(): Unit = { @@ -1886,7 +1889,7 @@ private[stream] object Collect { new GraphStageLogic(shape) with InHandler with OutHandler { self => override def toString = s"Reduce.Logic(aggregator=$aggregator)" - var aggregator: T = _ + private var aggregator: T = _ private def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider @@ -1905,6 +1908,7 @@ private[stream] object Collect { }) } + @silent // compiler complaining about aggregator = _: T override def onPush(): Unit = { val elem = grab(in) try { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 1353307b75..123b0c41d0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -13,6 +13,7 @@ import akka.annotation.InternalApi import akka.stream.{ AbruptIOTerminationException, IOResult } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString +import com.github.ghik.silencer.silent import scala.collection.JavaConverters._ import scala.concurrent.Promise @@ -33,6 +34,7 @@ import scala.util.{ Failure, Success, Try } } /** INTERNAL API */ +@silent @InternalApi private[akka] class FileSubscriber( f: Path, completionPromise: Promise[IOResult], diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index a704eb0928..dc9c14c29e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -17,6 +17,7 @@ import akka.stream.impl.{ ErrorPublisher, SourceModule } import akka.stream.stage._ import akka.stream.{ IOResult, _ } import akka.util.ByteString +import com.github.ghik.silencer.silent import org.reactivestreams.Publisher import scala.annotation.tailrec @@ -154,6 +155,7 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: val materializer = ActorMaterializerHelper.downcast(context.materializer) val ioResultPromise = Promise[IOResult]() + @silent val pub = try { val is = createInputStream() // can throw, i.e. FileNotFound diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index 57bcc2231e..e31e9d5132 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -12,6 +12,7 @@ import akka.annotation.InternalApi import akka.stream.actor.ActorPublisherMessage import akka.stream.IOResult import akka.util.ByteString +import com.github.ghik.silencer.silent import scala.concurrent.Promise import scala.util.{ Failure, Success } @@ -29,6 +30,7 @@ import scala.util.{ Failure, Success } } /** INTERNAL API */ +@silent @InternalApi private[akka] class InputStreamPublisher( is: InputStream, completionPromise: Promise[IOResult], @@ -44,9 +46,9 @@ import scala.util.{ Failure, Success } var readBytesTotal = 0L def receive = { - case ActorPublisherMessage.Request(elements) => readAndSignal() - case Continue => readAndSignal() - case ActorPublisherMessage.Cancel => context.stop(self) + case ActorPublisherMessage.Request(_) => readAndSignal() + case Continue => readAndSignal() + case ActorPublisherMessage.Cancel => context.stop(self) } def readAndSignal(): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 1eeadc1dfe..8e6b24388e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -171,7 +171,7 @@ private[stream] object InputStreamSinkStage { } catch { case ex: InterruptedException => throw new IOException(ex) } - case Some(data) => + case Some(_) => readBytes(a, begin, length) } } else -1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index eb28d132ba..a65d20fa62 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -12,6 +12,7 @@ import akka.annotation.InternalApi import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.stream.{ AbruptIOTerminationException, IOResult } import akka.util.ByteString +import com.github.ghik.silencer.silent import scala.concurrent.Promise import scala.util.{ Failure, Success } @@ -26,6 +27,7 @@ import scala.util.{ Failure, Success } } /** INTERNAL API */ +@silent @InternalApi private[akka] class OutputStreamSubscriber( os: OutputStream, completionPromise: Promise[IOResult], diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 18042dfd03..4ff900d201 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -237,14 +237,14 @@ import scala.util.{ Failure, Success, Try } val flushingOutbound = TransferPhase(outboundHalfClosed) { () => if (tracing) log.debug("flushingOutbound") try doWrap() - catch { case ex: SSLException => nextPhase(completedPhase) } + catch { case _: SSLException => nextPhase(completedPhase) } } val awaitingClose = TransferPhase(inputBunch.inputsAvailableFor(TransportIn) && engineInboundOpen) { () => if (tracing) log.debug("awaitingClose") transportInChoppingBlock.chopInto(transportInBuffer) try doUnwrap(ignoreOutput = true) - catch { case ex: SSLException => nextPhase(completedPhase) } + catch { case _: SSLException => nextPhase(completedPhase) } } val outboundClosed = TransferPhase(outboundHalfClosed || inbound) { () => @@ -253,7 +253,7 @@ import scala.util.{ Failure, Success, Try } if (continue && outboundHalfClosed.isReady) { if (tracing) log.debug("outboundClosed continue") try doWrap() - catch { case ex: SSLException => nextPhase(completedPhase) } + catch { case _: SSLException => nextPhase(completedPhase) } } } @@ -274,7 +274,7 @@ import scala.util.{ Failure, Success, Try } if (inputBunch.isDepleted(TransportIn) && transportInChoppingBlock.isEmpty) { if (tracing) log.debug("closing inbound") try engine.closeInbound() - catch { case ex: SSLException => outputBunch.enqueue(UserOut, SessionTruncated) } + catch { case _: SSLException => outputBunch.enqueue(UserOut, SessionTruncated) } lastHandshakeStatus = engine.getHandshakeStatus completeOrFlush() false diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 670cdcd91a..a147f98e66 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -34,7 +34,7 @@ import scala.concurrent.{ Future, Promise } val tcpManager: ActorRef, val endpoint: InetSocketAddress, val backlog: Int, - val options: immutable.Traversable[SocketOption], + val options: immutable.Iterable[SocketOption], val halfClose: Boolean, val idleTimeout: Duration, val bindShutdownTimeout: FiniteDuration, @@ -420,7 +420,7 @@ private[stream] object ConnectionSourceStage { manager: ActorRef, remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[SocketOption] = Nil, + options: immutable.Iterable[SocketOption] = Nil, halfClose: Boolean = true, connectTimeout: Duration = Duration.Inf, ioSettings: IOSettings) @@ -473,7 +473,7 @@ private[stream] object ConnectionSourceStage { val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = BidiFlow.fromFlows( Flow[ByteString].mapError { - case t: TimeoutException => + case _: TimeoutException => new TcpIdleTimeoutException( s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout", idleTimeout) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala index b9f285bdee..591c6cdaa6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala @@ -18,6 +18,8 @@ import akka.util.ByteString override def createLogic(attr: Attributes) = new DecompressorParsingLogic { override val inflater: Inflater = new Inflater(true) + private val crc32: CRC32 = new CRC32 + override def afterInflate: ParseStep[ByteString] = ReadTrailer override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = crc32.update(buffer, offset, length) @@ -46,7 +48,7 @@ import akka.util.ByteString ParseResult(None, inflating, acceptUpstreamFinish = false) } } - var crc32: CRC32 = new CRC32 + private def fail(msg: String) = throw new ZipException(msg) /** Reading the trailer */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index 0196e3591b..2128264a10 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -136,6 +136,8 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn } tryPull() + + case (_, _) => // keep the compiler happy (stage actor receive is total) } override def onPush(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 89a578405e..00be15872a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -196,6 +196,8 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio s"Received UNEXPECTED Terminated($ref) message! " + s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down.")) } + + case (_, _) => // keep the compiler happy (stage actor receive is total) } def tryPush(): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala index ef63f399e7..dd3f295f1e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/StreamRefsMaster.scala @@ -6,7 +6,6 @@ package akka.stream.impl.streamref import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.annotation.InternalApi -import akka.event.Logging import akka.stream.impl.SeqActorName /** INTERNAL API */ @@ -14,7 +13,7 @@ import akka.stream.impl.SeqActorName private[stream] object StreamRefsMaster extends ExtensionId[StreamRefsMaster] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): StreamRefsMaster = - new StreamRefsMaster(system) + new StreamRefsMaster override def lookup(): StreamRefsMaster.type = this @@ -23,9 +22,7 @@ private[stream] object StreamRefsMaster extends ExtensionId[StreamRefsMaster] wi /** INTERNAL API */ @InternalApi -private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extends Extension { - - private val log = Logging(system, getClass) +private[stream] final class StreamRefsMaster extends Extension { private[this] val sourceRefStageNames = SeqActorName("SourceRef") // "local target" private[this] val sinkRefStageNames = SeqActorName("SinkRef") // "remote sender" diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index 123b01c211..e1f5b2b8fc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -7,6 +7,7 @@ package akka.stream.javadsl import akka.NotUsed import akka.japi.function import akka.stream._ +import com.github.ghik.silencer.silent import scala.concurrent.duration.FiniteDuration @@ -108,6 +109,7 @@ object BidiFlow { * every second in one direction, but no elements are flowing in the other direction. I.e. this operator considers * the *joint* frequencies of the elements in both directions. */ + @silent def bidirectionalIdleTimeout[I, O](timeout: java.time.Duration): BidiFlow[I, I, O, O, NotUsed] = { import akka.util.JavaDurationConverters._ bidirectionalIdleTimeout(timeout.asScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6fe6c12e76..766daec987 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -21,6 +21,8 @@ import akka.actor.ActorRef import akka.dispatch.ExecutionContexts import akka.stream.impl.fusing.LazyFlow import akka.annotation.ApiMayChange +import akka.util.unused +import com.github.ghik.silencer.silent import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ @@ -49,7 +51,7 @@ object Flow { Flow.create[I]().map(f) /** Create a `Flow` which can process elements of type `T`. */ - def of[T](clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() + def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() /** * A graph with the shape of a flow logically is a flow, this method makes it so also in type. @@ -1130,6 +1132,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWithin(n: Int, d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = groupedWithin(n, d.asScala) @@ -1177,6 +1180,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWeightedWithin( maxWeight: Long, costFn: function.Function[Out, java.lang.Long], @@ -1238,6 +1242,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @silent def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = delay(of.asScala, strategy) @@ -1283,6 +1288,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def dropWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = dropWithin(d.asScala) @@ -1431,6 +1437,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels * */ + @silent def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recoverWith(pf)) @@ -1590,6 +1597,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * See also [[Flow.limit]], [[Flow.limitWeighted]] */ + @silent def takeWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = takeWithin(d.asScala) @@ -2686,6 +2694,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def initialTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = initialTimeout(timeout.asScala) @@ -2718,6 +2727,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def completionTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = completionTimeout(timeout.asScala) @@ -2752,6 +2762,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def idleTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = idleTimeout(timeout.asScala) @@ -2786,6 +2797,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def backpressureTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = backpressureTimeout(timeout.asScala) @@ -2828,6 +2840,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) @@ -3235,6 +3248,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @silent def initialDelay(delay: java.time.Duration): javadsl.Flow[In, Out, Mat] = initialDelay(delay.asScala) @@ -3407,7 +3421,7 @@ object RunnableGraph { def fromGraph[Mat](graph: Graph[ClosedShape, Mat]): RunnableGraph[Mat] = graph match { case r: RunnableGraph[Mat] => r - case other => new RunnableGraphAdapter[Mat](scaladsl.RunnableGraph.fromGraph(graph)) + case _ => new RunnableGraphAdapter[Mat](scaladsl.RunnableGraph.fromGraph(graph)) } /** INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index d7c22e6d65..c65126bdae 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -13,9 +13,8 @@ import akka.util.ConstantFun import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ -import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } -import akka.stream.Attributes -import akka.stream.impl.TraversalBuilder +import akka.stream.scaladsl.GenericGraph +import akka.util.unused /** * Merge several streams, taking elements as they arrive from input streams @@ -40,7 +39,7 @@ object Merge { /** * Create a new `Merge` operator with the specified output type. */ - def create[T](clazz: Class[T], inputPorts: Int): Graph[UniformFanInShape[T, T], NotUsed] = create(inputPorts) + def create[T](@unused clazz: Class[T], inputPorts: Int): Graph[UniformFanInShape[T, T], NotUsed] = create(inputPorts) /** * Create a new `Merge` operator with the specified output type. @@ -57,7 +56,10 @@ object Merge { * @param eagerComplete set to true in order to make this operator eagerly * finish as soon as one of its inputs completes */ - def create[T](clazz: Class[T], inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = + def create[T]( + @unused clazz: Class[T], + inputPorts: Int, + eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = create(inputPorts, eagerComplete) } @@ -85,7 +87,9 @@ object MergePreferred { /** * Create a new `MergePreferred` operator with the specified output type. */ - def create[T](clazz: Class[T], secondaryPorts: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], NotUsed] = + def create[T]( + @unused clazz: Class[T], + secondaryPorts: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], NotUsed] = create(secondaryPorts) /** @@ -106,7 +110,7 @@ object MergePreferred { * finish as soon as one of its inputs completes */ def create[T]( - clazz: Class[T], + @unused clazz: Class[T], secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], NotUsed] = create(secondaryPorts, eagerComplete) @@ -136,12 +140,12 @@ object MergePrioritized { * Create a new `MergePrioritized` operator with the specified output type. */ def create[T](priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] = - scaladsl.MergePrioritized(priorities) + scaladsl.MergePrioritized(priorities.toIndexedSeq) /** * Create a new `MergePrioritized` operator with the specified output type. */ - def create[T](clazz: Class[T], priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] = + def create[T](@unused clazz: Class[T], priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] = create(priorities) /** @@ -151,7 +155,7 @@ object MergePrioritized { * finish as soon as one of its inputs completes */ def create[T](priorities: Array[Int], eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = - scaladsl.MergePrioritized(priorities, eagerComplete = eagerComplete) + scaladsl.MergePrioritized(priorities.toIndexedSeq, eagerComplete = eagerComplete) /** * Create a new `MergePrioritized` operator with the specified output type. @@ -160,7 +164,7 @@ object MergePrioritized { * finish as soon as one of its inputs completes */ def create[T]( - clazz: Class[T], + @unused clazz: Class[T], priorities: Array[Int], eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = create(priorities, eagerComplete) @@ -202,7 +206,8 @@ object Broadcast { /** * Create a new `Broadcast` operator with the specified input type. */ - def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] = create(outputCount) + def create[T](@unused clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] = + create(outputCount) } @@ -230,7 +235,7 @@ object Partition { def create[T]( outputCount: Int, partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] = - new scaladsl.Partition(outputCount, partitioner.apply) + new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false) /** * Create a new `Partition` operator with the specified input type. @@ -253,10 +258,10 @@ object Partition { * @param partitioner function deciding which output each element will be targeted */ def create[T]( - clazz: Class[T], + @unused clazz: Class[T], outputCount: Int, partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] = - new scaladsl.Partition(outputCount, partitioner.apply) + new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false) /** * Create a new `Partition` operator with the specified input type. @@ -267,7 +272,7 @@ object Partition { * @param eagerCancel this operator cancels, when any (true) or all (false) of the downstreams cancel */ def create[T]( - clazz: Class[T], + @unused clazz: Class[T], outputCount: Int, partitioner: function.Function[T, Integer], eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = @@ -327,7 +332,7 @@ object Balance { * @param clazz a type hint for this method * @param outputCount number of output ports */ - def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] = + def create[T](@unused clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] = create(outputCount) /** @@ -338,7 +343,7 @@ object Balance { * @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element */ def create[T]( - clazz: Class[T], + @unused clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = create(outputCount, waitForAllDownstreams) @@ -352,7 +357,7 @@ object Balance { * @param eagerCancel if true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled. */ def create[T]( - clazz: Class[T], + @unused clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = @@ -479,7 +484,8 @@ object Unzip { /** * Creates a new `Unzip` operator with the specified output types. */ - def create[A, B](left: Class[A], right: Class[B]): Graph[FanOutShape2[A Pair B, A, B], NotUsed] = create[A, B]() + def create[A, B](@unused left: Class[A], @unused right: Class[B]): Graph[FanOutShape2[A Pair B, A, B], NotUsed] = + create[A, B]() } @@ -511,7 +517,7 @@ object Concat { /** * Create a new anonymous `Concat` operator with the specified input types. */ - def create[T](clazz: Class[T]): Graph[UniformFanInShape[T, T], NotUsed] = create() + def create[T](@unused clazz: Class[T]): Graph[UniformFanInShape[T, T], NotUsed] = create() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala index 8ed2a41fb7..e1f1d281dc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala @@ -9,6 +9,7 @@ import java.util.function.{ BiFunction, Supplier, ToLongBiFunction } import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange +import akka.util.unused /** * A MergeHub is a special streaming hub that is able to collect streamed elements from a dynamic set of @@ -33,7 +34,7 @@ object MergeHub { * @param clazz Type of elements this hub emits and consumes * @param perProducerBufferSize Buffer space used per producer. */ - def of[T](clazz: Class[T], perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = { + def of[T](@unused clazz: Class[T], perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = { akka.stream.scaladsl.MergeHub.source[T](perProducerBufferSize).mapMaterializedValue(_.asJava[T]).asJava } @@ -83,7 +84,7 @@ object BroadcastHub { * concurrent consumers can be in terms of element. If the buffer is full, the producer * is backpressured. Must be a power of two and less than 4096. */ - def of[T](clazz: Class[T], bufferSize: Int): Sink[T, Source[T, NotUsed]] = { + def of[T](@unused clazz: Class[T], bufferSize: Int): Sink[T, Source[T, NotUsed]] = { akka.stream.scaladsl.BroadcastHub.sink[T](bufferSize).mapMaterializedValue(_.asJava).asJava } @@ -133,7 +134,7 @@ object PartitionHub { * is backpressured. */ @ApiMayChange def ofStateful[T]( - clazz: Class[T], + @unused clazz: Class[T], partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]], startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] = { @@ -183,7 +184,7 @@ object PartitionHub { * is backpressured. */ @ApiMayChange def of[T]( - clazz: Class[T], + @unused clazz: Class[T], partitioner: BiFunction[Integer, T, Integer], startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala index 31bb038ddc..13a7ee2045 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartFlow.scala @@ -6,6 +6,7 @@ package akka.stream.javadsl import akka.NotUsed import akka.japi.function.Creator +import com.github.ghik.silencer.silent import scala.concurrent.duration.FiniteDuration @@ -78,6 +79,7 @@ object RestartFlow { * In order to skip this additional delay pass in `0`. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ + @silent def withBackoff[In, Out]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, @@ -152,6 +154,7 @@ object RestartFlow { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ + @silent def withBackoff[In, Out]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, @@ -227,6 +230,7 @@ object RestartFlow { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ + @silent def onFailuresWithBackoff[In, Out]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala index 07a9a385c8..c0f3b07824 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala @@ -6,6 +6,7 @@ package akka.stream.javadsl import akka.NotUsed import akka.japi.function.Creator +import com.github.ghik.silencer.silent import scala.concurrent.duration.FiniteDuration @@ -80,6 +81,7 @@ object RestartSink { * In order to skip this additional delay pass in `0`. * @param sinkFactory A factory for producing the [[Sink]] to wrap. */ + @silent def withBackoff[T]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, @@ -156,6 +158,7 @@ object RestartSink { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param sinkFactory A factory for producing the [[Sink]] to wrap. */ + @silent def withBackoff[T]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala index 9cc5b015da..2c9ad4bbc3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSource.scala @@ -6,6 +6,7 @@ package akka.stream.javadsl import akka.NotUsed import akka.japi.function.Creator +import com.github.ghik.silencer.silent import scala.concurrent.duration.FiniteDuration @@ -72,6 +73,7 @@ object RestartSource { * In order to skip this additional delay pass in `0`. * @param sourceFactory A factory for producing the [[Source]] to wrap. */ + @silent def withBackoff[T]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, @@ -142,6 +144,7 @@ object RestartSource { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param sourceFactory A factory for producing the [[Source]] to wrap. */ + @silent def withBackoff[T]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, @@ -206,6 +209,7 @@ object RestartSource { * @param sourceFactory A factory for producing the [[Source]] to wrap. * */ + @silent def onFailuresWithBackoff[T]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, @@ -274,6 +278,7 @@ object RestartSource { * @param sourceFactory A factory for producing the [[Source]] to wrap. * */ + @silent def onFailuresWithBackoff[T]( minBackoff: java.time.Duration, maxBackoff: java.time.Duration, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 6c36e10aa3..9bf1d1b821 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -28,6 +28,9 @@ import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture import java.util.function.Supplier +import akka.util.unused +import com.github.ghik.silencer.silent + import scala.compat.java8.FutureConverters._ import scala.reflect.ClassTag @@ -44,7 +47,7 @@ object Source { /** * Create a `Source` with no elements. The result is the same as calling `Source.empty()` */ - def empty[T](clazz: Class[T]): Source[T, NotUsed] = empty[T]() + def empty[T](@unused clazz: Class[T]): Source[T, NotUsed] = empty[T]() /** * Create a `Source` which materializes a [[java.util.concurrent.CompletableFuture]] which controls what element @@ -225,6 +228,7 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ + @silent def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] = Source.tick(initialDelay.asScala, interval.asScala, tick) @@ -1374,6 +1378,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' downstream cancels * */ + @silent def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = new Source(delegate.recoverWith(pf)) @@ -2048,6 +2053,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWithin(n: Int, d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = groupedWithin(n, d.asScala) @@ -2095,6 +2101,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWeightedWithin( maxWeight: Long, costFn: function.Function[Out, java.lang.Long], @@ -2156,6 +2163,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @silent def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] = delay(of.asScala, strategy) @@ -2201,6 +2209,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = dropWithin(d.asScala) @@ -2326,6 +2335,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels or timer fires */ + @silent def takeWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = takeWithin(d.asScala) @@ -2874,6 +2884,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = initialTimeout(timeout.asScala) @@ -2906,6 +2917,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = completionTimeout(timeout.asScala) @@ -2940,6 +2952,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = idleTimeout(timeout.asScala) @@ -2974,6 +2987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = backpressureTimeout(timeout.asScala) @@ -3016,6 +3030,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) @@ -3421,6 +3436,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @silent def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] = initialDelay(delay.asScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 79cba02e77..7cd9e19fa6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -6,13 +6,17 @@ package akka.stream.javadsl import java.io.{ InputStream, OutputStream } import java.util.stream.Collector + import akka.japi.function import akka.stream.{ javadsl, scaladsl } import akka.stream.IOResult import akka.util.ByteString + import scala.concurrent.duration.FiniteDuration import java.util.concurrent.CompletionStage + import akka.NotUsed +import com.github.ghik.silencer.silent /** * Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams @@ -108,6 +112,7 @@ object StreamConverters { * * @param readTimeout the max time the read operation on the materialized InputStream should block */ + @silent def asInputStream(readTimeout: java.time.Duration): Sink[ByteString, InputStream] = { import akka.util.JavaDurationConverters._ asInputStream(readTimeout.asScala) @@ -183,6 +188,7 @@ object StreamConverters { * * @param writeTimeout the max time the write operation on the materialized OutputStream should block */ + @silent def asOutputStream(writeTimeout: java.time.Duration): javadsl.Source[ByteString, OutputStream] = { import akka.util.JavaDurationConverters._ asOutputStream(writeTimeout.asScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index bacc95c24b..740ee8d762 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -20,6 +20,8 @@ import java.util.Comparator import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import com.github.ghik.silencer.silent + import scala.reflect.ClassTag object SubFlow { @@ -676,6 +678,7 @@ class SubFlow[In, Out, Mat]( * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWithin(n: Int, d: java.time.Duration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = groupedWithin(n, d.asScala) @@ -723,6 +726,7 @@ class SubFlow[In, Out, Mat]( * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWeightedWithin( maxWeight: Long, costFn: function.Function[Out, java.lang.Long], @@ -784,6 +788,7 @@ class SubFlow[In, Out, Mat]( * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @silent def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = delay(of.asScala, strategy) @@ -829,6 +834,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def dropWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = dropWithin(d.asScala) @@ -1043,6 +1049,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels or timer fires */ + @silent def takeWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = takeWithin(d.asScala) @@ -1629,6 +1636,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def initialTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = initialTimeout(timeout.asScala) @@ -1661,6 +1669,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def completionTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = completionTimeout(timeout.asScala) @@ -1695,6 +1704,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def idleTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = idleTimeout(timeout.asScala) @@ -1729,6 +1739,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def backpressureTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = backpressureTimeout(timeout.asScala) @@ -1771,6 +1782,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) @@ -2138,6 +2150,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def initialDelay(delay: java.time.Duration): SubFlow[In, Out, Mat] = initialDelay(delay.asScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 98aa6b79bb..dc1438f4ba 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -17,6 +17,8 @@ import scala.concurrent.duration.FiniteDuration import java.util.Comparator import java.util.concurrent.CompletionStage +import com.github.ghik.silencer.silent + import scala.compat.java8.FutureConverters._ import scala.reflect.ClassTag @@ -663,6 +665,7 @@ class SubSource[Out, Mat]( * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = groupedWithin(n, d.asScala) @@ -710,6 +713,7 @@ class SubSource[Out, Mat]( * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @silent def groupedWeightedWithin( maxWeight: Long, costFn: function.Function[Out, java.lang.Long], @@ -758,6 +762,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = dropWithin(d.asScala) @@ -874,6 +879,7 @@ class SubSource[Out, Mat]( * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @silent def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = delay(of.asScala, strategy) @@ -1023,6 +1029,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels or timer fires */ + @silent def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = takeWithin(d.asScala) @@ -1608,6 +1615,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = initialTimeout(timeout.asScala) @@ -1640,6 +1648,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = completionTimeout(timeout.asScala) @@ -1674,6 +1683,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = idleTimeout(timeout.asScala) @@ -1708,6 +1718,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = backpressureTimeout(timeout.asScala) @@ -1750,6 +1761,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) @@ -2113,6 +2125,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @silent def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] = initialDelay(delay.asScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index df3dcc5914..f829239e61 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -25,10 +25,11 @@ import akka.io.Inet.SocketOption import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage -import javax.net.ssl.SSLContext +import javax.net.ssl.SSLContext import akka.annotation.{ ApiMayChange, InternalApi } import akka.stream.TLSProtocol.NegotiateNewSession +import com.github.ghik.silencer.silent object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { @@ -272,6 +273,8 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * * @see [[Tcp.bind()]] * Marked API-may-change to leave room for an improvement around the very long parameter list. + * + * Note: the half close parameter is currently ignored */ @ApiMayChange def bindTls( @@ -281,6 +284,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { negotiateNewSession: NegotiateNewSession, backlog: Int, options: JIterable[SocketOption], + @silent // FIXME unused #26689 halfClose: Boolean, idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c95fbddc34..47579dd43b 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -387,7 +387,7 @@ object Flow { LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), noAttrStage.shape).withAttributes(attrs) - case other => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape) + case _ => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape) } /** @@ -444,7 +444,7 @@ object Flow { */ def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])( combine: (M1, M2) => M): Flow[I, O, M] = - fromGraph(GraphDSL.create(sink, source)(combine) { implicit b => (in, out) => + fromGraph(GraphDSL.create(sink, source)(combine) { _ => (in, out) => FlowShape(in.in, out.out) }) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 0896ef75aa..6277a28154 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -282,7 +282,7 @@ object Framing { doParse() } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) { // Found a match, mark start and end position and iterate if possible - indices += (previous, possibleMatchPos) + indices += (previous -> possibleMatchPos) nextPossibleMatch = possibleMatchPos + separatorBytes.size if (nextPossibleMatch == buffer.size || indices.isFull) { doParse() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index c999d54b34..afd9d1b3f2 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -784,7 +784,7 @@ final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val ea /** * Sets `eagerCancel` to `false`. */ - @deprecated("Use the constructor which also specifies the `eagerCancel` parameter") + @deprecated("Use the constructor which also specifies the `eagerCancel` parameter", "2.5.10") def this(outputPorts: Int, partitioner: T => Int) = this(outputPorts, partitioner, false) val in: Inlet[T] = Inlet[T]("Partition.in") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala index 1b1e56cd17..8e0ea405a6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -86,9 +86,10 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) def id: Long } - private final case class Element(id: Long, elem: T) extends Event - private final case class Register(id: Long, demandCallback: AsyncCallback[Long]) extends Event - private final case class Deregister(id: Long) extends Event + // these 3 can't be final because of SI-4440 + private case class Element(id: Long, elem: T) extends Event + private case class Register(id: Long, demandCallback: AsyncCallback[Long]) extends Event + private case class Deregister(id: Long) extends Event final class InputState(signalDemand: AsyncCallback[Long]) { private var untilNextDemandSignal = DemandThreshold @@ -105,9 +106,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) } - final class MergedSourceLogic(_shape: Shape, producerCount: AtomicLong) - extends GraphStageLogic(_shape) - with OutHandler { + final class MergedSourceLogic(_shape: Shape) extends GraphStageLogic(_shape) with OutHandler { /* * Basically all merged messages are shared in this queue. Individual buffer sizes are enforced by tracking * demand per producer in the 'demands' Map. One twist here is that the same queue contains control messages, @@ -226,7 +225,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Sink[T, NotUsed]) = { val idCounter = new AtomicLong() - val logic: MergedSourceLogic = new MergedSourceLogic(shape, idCounter) + val logic: MergedSourceLogic = new MergedSourceLogic(shape) val sink = new GraphStage[SinkShape[T]] { val in: Inlet[T] = Inlet("MergeHub.in") @@ -372,11 +371,11 @@ private[akka] class BroadcastHub[T](bufferSize: Int) private sealed trait HubEvent private object RegistrationPending extends HubEvent - private final case class UnRegister(id: Long, previousOffset: Int, finalOffset: Int) extends HubEvent - private final case class Advance(id: Long, previousOffset: Int) extends HubEvent - private final case class NeedWakeup(id: Long, previousOffset: Int, currentOffset: Int) extends HubEvent - - private final case class Consumer(id: Long, callback: AsyncCallback[ConsumerEvent]) + // these 4 next classes can't be final because of SI-4440 + private case class UnRegister(id: Long, previousOffset: Int, finalOffset: Int) extends HubEvent + private case class Advance(id: Long, previousOffset: Int) extends HubEvent + private case class NeedWakeup(id: Long, previousOffset: Int, currentOffset: Int) extends HubEvent + private case class Consumer(id: Long, callback: AsyncCallback[ConsumerEvent]) private object Completed @@ -614,8 +613,9 @@ private[akka] class BroadcastHub[T](bufferSize: Int) private sealed trait ConsumerEvent private object Wakeup extends ConsumerEvent - private final case class HubCompleted(failure: Option[Throwable]) extends ConsumerEvent - private final case class Initialize(offset: Int) extends ConsumerEvent + // these two can't be final because of SI-4440 + private case class HubCompleted(failure: Option[Throwable]) extends ConsumerEvent + private case class Initialize(offset: Int) extends ConsumerEvent override def createLogicAndMaterializedValue( inheritedAttributes: Attributes): (GraphStageLogic, Source[T, NotUsed]) = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala index ed72ae004d..c9b757c70d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala @@ -11,10 +11,10 @@ import akka.NotUsed * left (first) or only the right (second) of two input values. */ object Keep { - private val _left = (l: Any, r: Any) => l - private val _right = (l: Any, r: Any) => r + private val _left = (l: Any, _: Any) => l + private val _right = (_: Any, r: Any) => r private val _both = (l: Any, r: Any) => (l, r) - private val _none = (l: Any, r: Any) => NotUsed + private val _none = (_: Any, _: Any) => NotUsed def left[L, R]: (L, R) => L = _left.asInstanceOf[(L, R) => L] def right[L, R]: (L, R) => R = _right.asInstanceOf[(L, R) => R] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala index bc29f7abfd..4b7cf295c4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala @@ -248,7 +248,7 @@ trait ScalaSessionAPI { */ def peerCertificates: List[Certificate] = try Option(session.getPeerCertificates).map(_.toList).getOrElse(Nil) - catch { case e: SSLPeerUnverifiedException => Nil } + catch { case _: SSLPeerUnverifiedException => Nil } /** * Scala API: Extract the Principal that the peer engine presented during @@ -256,7 +256,7 @@ trait ScalaSessionAPI { */ def peerPrincipal: Option[Principal] = try Option(session.getPeerPrincipal) - catch { case e: SSLPeerUnverifiedException => None } + catch { case _: SSLPeerUnverifiedException => None } } object ScalaSessionAPI { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index a458623317..e4b48ff53c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -6,10 +6,10 @@ package akka.stream.scaladsl import java.net.InetSocketAddress import java.util.concurrent.TimeoutException + import javax.net.ssl.SSLContext import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession - import akka.actor._ import akka.annotation.{ ApiMayChange, InternalApi } import akka.io.Inet.SocketOption @@ -18,8 +18,10 @@ import akka.stream.TLSProtocol.NegotiateNewSession import akka.stream._ import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout } -import akka.util.ByteString +import akka.util.{ unused, ByteString } import akka.{ Done, NotUsed } +import com.github.ghik.silencer.silent + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -117,6 +119,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { interface: String, port: Int, backlog: Int = 100, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, halfClose: Boolean = false, idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = @@ -125,7 +128,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { IO(IoTcp)(system), new InetSocketAddress(interface, port), backlog, - options, + options.toList, halfClose, idleTimeout, bindShutdownTimeout, @@ -159,6 +162,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { interface: String, port: Int, backlog: Int = 100, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, halfClose: Boolean = false, idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = { @@ -192,6 +196,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { def outgoingConnection( remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, halfClose: Boolean = true, connectTimeout: Duration = Duration.Inf, @@ -203,7 +208,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { IO(IoTcp)(system), remoteAddress, localAddress, - options, + options.toList, halfClose, connectTimeout, settings.ioSettings)) @@ -263,6 +268,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { sslContext: SSLContext, negotiateNewSession: NegotiateNewSession, localAddress: Option[InetSocketAddress] = None, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, connectTimeout: Duration = Duration.Inf, idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { @@ -279,6 +285,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { remoteAddress: InetSocketAddress, createSSLEngine: () => SSLEngine, localAddress: Option[InetSocketAddress] = None, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, connectTimeout: Duration = Duration.Inf, idleTimeout: Duration = Duration.Inf, @@ -307,12 +314,12 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { sslContext: SSLContext, negotiateNewSession: NegotiateNewSession, backlog: Int = 100, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { - val tls = tlsWrapping.atop(TLS(sslContext, negotiateNewSession, TLSRole.server)).reversed - bind(interface, port, backlog, options, true, idleTimeout).map { incomingConnection => + bind(interface, port, backlog, options, halfClose = false, idleTimeout).map { incomingConnection => incomingConnection.copy(flow = incomingConnection.flow.join(tls)) } } @@ -325,6 +332,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { port: Int, createSSLEngine: () => SSLEngine, backlog: Int = 100, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf, verifySession: SSLSession => Try[Unit], @@ -355,6 +363,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { sslContext: SSLContext, negotiateNewSession: NegotiateNewSession, backlog: Int = 100, + @silent // Traversable deprecated in 2.13 options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = { bindTls(interface, port, sslContext, negotiateNewSession, backlog, options, idleTimeout) @@ -366,6 +375,6 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { } -final class TcpIdleTimeoutException(msg: String, timeout: Duration) +final class TcpIdleTimeoutException(msg: String, @unused timeout: Duration) extends TimeoutException(msg: String) with NoStackTrace // only used from a single stage diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 3f16ac3e2b..bed08d5db0 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -15,6 +15,7 @@ import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, Su import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } import akka.stream.scaladsl.GenericGraphWithChangedAttributes import akka.util.OptionVal +import akka.util.unused import akka.{ Done, NotUsed } import scala.annotation.tailrec @@ -37,14 +38,14 @@ import scala.concurrent.{ Future, Promise } abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { /** - * Grants eager access to materializer for special purposes. + * Grants access to the materializer before preStart of the graph stage logic is invoked. * * INTERNAL API */ @InternalApi private[akka] def createLogicAndMaterializedValue( inheritedAttributes: Attributes, - materializer: Materializer): (GraphStageLogic, M) = createLogicAndMaterializedValue(inheritedAttributes) + @unused materializer: Materializer): (GraphStageLogic, M) = createLogicAndMaterializedValue(inheritedAttributes) @throws(classOf[Exception]) def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) @@ -705,9 +706,9 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: setHandler(in, new Reading(in, n - pos, getHandler(in))((elem: T) => { result(pos) = elem pos += 1 - if (pos == n) andThen(result) - }, () => onClose(result.take(pos)))) - } else andThen(result) + if (pos == n) andThen(result.toSeq) + }, () => onClose(result.take(pos).toSeq))) + } else andThen(result.toSeq) } /** @@ -775,7 +776,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Caution: for n == 1 andThen is called after resetting the handler, for * other values it is called without resetting the handler. n MUST be positive. */ - private final class Reading[T](in: Inlet[T], private var n: Int, val previous: InHandler)( + // can't be final because of SI-4440 + private class Reading[T](in: Inlet[T], private var n: Int, val previous: InHandler)( andThen: T => Unit, onComplete: () => Unit) extends InHandler { @@ -923,7 +925,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: extends OutHandler { private var followUps: Emitting[T] = _ private var followUpsTail: Emitting[T] = _ - private def as[U] = this.asInstanceOf[Emitting[U]] protected def followUp(): Unit = { setHandler(out, previous) @@ -973,15 +974,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: next } - private def addFollowUps(e: Emitting[T]): Unit = - if (followUps == null) { - followUps = e.followUps - followUpsTail = e.followUpsTail - } else { - followUpsTail.followUps = e.followUps - followUpsTail = e.followUpsTail - } - /** * Dequeue `this` from the head of the queue, meaning that this object will * not be retained (setHandler will install the followUp). For this reason @@ -1104,11 +1096,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: sealed trait State // waiting for materialization completion or during dispatching of initially queued events - private final case class Pending(pendingEvents: List[Event]) extends State + // - can't be final because of SI-4440 + private case class Pending(pendingEvents: List[Event]) extends State // stream is initialized and so no threads can just send events without any synchronization overhead private case object Initialized extends State - // Event with feedback promise - private final case class Event(e: T, handlingPromise: Promise[Done]) + // Event with feedback promise - can't be final because of SI-4440 + private case class Event(e: T, handlingPromise: Promise[Done]) private[this] val NoPendingEvents = Pending(Nil) private[this] val currentState = new AtomicReference[State](NoPendingEvents) @@ -1540,7 +1533,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap * @param timerKey key of the scheduled timer */ @throws(classOf[Exception]) - protected def onTimer(timerKey: Any): Unit = () + protected def onTimer(@unused timerKey: Any): Unit = () // Internal hooks to avoid reliance on user calling super in postStop protected[stream] override def afterPostStop(): Unit = { diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala index 1e3af59d04..7cd7e1aa35 100644 --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala @@ -109,6 +109,7 @@ final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSett } def buildHostnameVerifier(conf: SSLConfigSettings): HostnameVerifier = { + conf ne null // @unused unavailable val clazz: Class[HostnameVerifier] = if (config.loose.disableHostnameVerification) classOf[DisabledComplainingHostnameVerifier].asInstanceOf[Class[HostnameVerifier]] diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala index 5ce3cb5a9e..5348d1579b 100644 --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/SSLEngineConfigurator.scala @@ -21,6 +21,7 @@ final class DefaultSSLEngineConfigurator( enabledProtocols: Array[String], enabledCipherSuites: Array[String]) extends SSLEngineConfigurator { + config ne null // @unused unavailable def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine = { engine.setSSLParameters(sslContext.getDefaultSSLParameters) engine.setEnabledProtocols(enabledProtocols) diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index b06fc103d7..5ab5848ffb 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -25,7 +25,8 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "akka-protobuf", "akka-stream-typed", "akka-cluster-typed", - "akka - cluster - tools") + "akka-cluster-tools", + "akka-stream") val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination") @@ -48,7 +49,7 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { silencerSettings ++ scoverageSettings ++ Seq( Compile / scalacOptions ++= ( - if (fatalWarningsFor(name.value)) Seq("-Xfatal-warnings") + if (!scalaVersion.value.startsWith("2.11") && fatalWarningsFor(name.value)) Seq("-Xfatal-warnings") else Seq.empty ), Test / scalacOptions --= testUndicipline, @@ -64,11 +65,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "-Yno-adapted-args") case Some((2, 12)) => disciplineScalacOptions - case Some((2, 11)) => - disciplineScalacOptions ++ Set("-language:existentials") -- Set( - "-Ywarn-extra-implicit", - "-Ywarn-unused:_", - "-Ypartial-unification") case _ => Nil }).toSeq,