diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala index 116224384d..f64c9bc567 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala @@ -463,7 +463,7 @@ class TraversalBuilderSpec extends AkkaSpec { //TODO: Dummy test cases just for smoke-testing. Should be removed. "foo" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) + implicit val mat = PhasedFusingActorMaterializer() import scala.concurrent.duration._ val graph = Source.repeat(1).take(10).toMat(Sink.fold(0)(_ + _))(Keep.right) @@ -472,7 +472,7 @@ class TraversalBuilderSpec extends AkkaSpec { } "islands 1" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) + implicit val mat = PhasedFusingActorMaterializer() val sub = TestSubscriber.probe[Int]() val graph = Source.repeat(1).take(10).toMat(Sink.asPublisher(false))(Keep.right) @@ -484,7 +484,7 @@ class TraversalBuilderSpec extends AkkaSpec { } "islands 2" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) + implicit val mat = PhasedFusingActorMaterializer() val pub = TestPublisher.probe[Int]() import scala.concurrent.duration._ @@ -503,7 +503,7 @@ class TraversalBuilderSpec extends AkkaSpec { } "islands 3" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) + implicit val mat = PhasedFusingActorMaterializer() val sub = TestSubscriber.probe[Int]() Source .repeat(1) @@ -516,7 +516,7 @@ class TraversalBuilderSpec extends AkkaSpec { } "islands 4" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) + implicit val mat = PhasedFusingActorMaterializer() val pub = TestPublisher.probe[Int]() import scala.concurrent.duration._ @@ -531,9 +531,9 @@ class TraversalBuilderSpec extends AkkaSpec { } "bidiflow1" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) - val flow1 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1)) - val flow2 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1)) + implicit val mat = PhasedFusingActorMaterializer() + val flow1 = Flow.fromGraph(fusing.Map((x: Int) ⇒ x + 1)) + val flow2 = Flow.fromGraph(fusing.Map((x: Int) ⇒ x + 1)) val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none) @@ -543,7 +543,7 @@ class TraversalBuilderSpec extends AkkaSpec { } "bidiflow reverse" in { - implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000)) + implicit val mat = PhasedFusingActorMaterializer() val flow1 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1)) val flow2 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1)) diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes index 2bebd64ae3..f2bc0a59c2 100644 --- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -2,3 +2,12 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$3") +# Various compiler warnings in streams #26399 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.PhasedFusingActorMaterializer.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ProcessorModulePhase.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.BoundedBuffer#DynamicQueue.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.TlsModulePhase.this") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$TickSourceCancellable") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.snapshot.MaterializerState.requestFromChild") diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 957f56aabe..e7a65a656e 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -626,7 +626,7 @@ final class IOSettings private (val tcpWriteBufferSize: Int) { def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value) - private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize): IOSettings = new IOSettings( + private def copy(tcpWriteBufferSize: Int): IOSettings = new IOSettings( tcpWriteBufferSize = tcpWriteBufferSize) override def equals(other: Any): Boolean = other match { diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index 82317e83c5..5732c798e9 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -166,7 +166,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { override def onPush(): Unit = { val elem = grab(in) if (matching(elem)) { - val d = updateInterval(elem) + val d = updateInterval() if (matched > 1) onInterval(d) @@ -176,7 +176,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { override def onPull(): Unit = pull(in) - private def updateInterval(in: T): FiniteDuration = { + private def updateInterval(): FiniteDuration = { matched += 1 val nowNanos = System.nanoTime() val d = nowNanos - prevNanos diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 8de2585045..f82dbcc698 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -151,7 +151,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa */ @InternalApi private[akka] object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = - Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) + Props(new StreamSupervisor(haveShutDown)).withDeploy(Deploy.local) .withDispatcher(settings.dispatcher) private[stream] val baseName = "StreamSupervisor" private val actorName = SeqActorName(baseName) @@ -173,7 +173,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa /** * INTERNAL API */ -@InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { +@InternalApi private[akka] class StreamSupervisor(haveShutDown: AtomicBoolean) extends Actor { import akka.stream.impl.StreamSupervisor._ override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 49b627b8d7..73b716bc36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -148,7 +148,7 @@ import akka.event.Logging } protected def completed: Actor.Receive = { - case OnSubscribe(subscription) ⇒ throw new IllegalStateException("onSubscribe called after onError or onComplete") + case OnSubscribe(_) ⇒ throw new IllegalStateException("onSubscribe called after onError or onComplete") } protected def inputOnError(e: Throwable): Unit = { @@ -229,7 +229,7 @@ import akka.event.Logging protected def downstreamRunning: Actor.Receive = { case SubscribePending ⇒ subscribePending(exposedPublisher.takePendingSubscribers()) - case RequestMore(subscription, elements) ⇒ + case RequestMore(_, elements) ⇒ if (elements < 1) { error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) } else { @@ -238,7 +238,7 @@ import akka.event.Logging downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded pump.pump() } - case Cancel(subscription) ⇒ + case Cancel(_) ⇒ downstreamCompleted = true exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException)) pump.pump() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index b50211c7bc..a3be9fffef 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -84,7 +84,7 @@ import org.reactivestreams.Subscription private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit = try shutdownReason match { - case Some(e: SpecViolation) ⇒ // ok, not allowed to call onError + case Some(_: SpecViolation) ⇒ // ok, not allowed to call onError case Some(e) ⇒ tryOnSubscribe(subscriber, CancelledSubscription) tryOnError(subscriber, e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala index b02c697ec4..25b32cafa7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala @@ -176,7 +176,7 @@ private[akka] object Buffer { override def enqueue(elem: T): Unit = if (tail - head == FixedQueueSize) { - val queue = new DynamicQueue(head) + val queue = new DynamicQueue() while (nonEmpty) { queue.enqueue(dequeue()) } @@ -208,7 +208,7 @@ private[akka] object Buffer { } } - private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] { + private final class DynamicQueue() extends ju.LinkedList[T] with Buffer[T] { override def capacity = BoundedBuffer.this.capacity override def used = size override def isFull = size == capacity diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala index 694179e0e3..ad422eb510 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -56,7 +56,6 @@ import scala.annotation.switch private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc) private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted - private var charsInObject = 0 private var completedObject = false private var inStringExpression = false private var isStartOfEscapeSequence = false @@ -140,7 +139,6 @@ import scala.annotation.switch depth -= 1 pos += 1 if (depth == 0) { - charsInObject = 0 completedObject = true } } else if (isWhitespace(input) && !inStringExpression) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index a014629612..5989fe1c4f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -57,16 +57,16 @@ import akka.util.OptionVal override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] + new ProcessorModulePhase().asInstanceOf[PhaseIsland[Any]] }, TlsModuleIslandTag → new Phase[Any] { def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - new TlsModulePhase(effectiveAttributes, materializer, islandName).asInstanceOf[PhaseIsland[Any]] + new TlsModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, GraphStageTag → DefaultPhase) - @InternalApi private[akka] def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = { + @InternalApi private[akka] def apply()(implicit context: ActorRefFactory): ActorMaterializer = { val haveShutDown = new AtomicBoolean(false) val system = actorSystemOf(context) val materializerSettings = ActorMaterializerSettings(system) @@ -848,7 +848,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff /** * INTERNAL API */ -@InternalApi private[akka] final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) +@InternalApi private[akka] final class ProcessorModulePhase() extends PhaseIsland[Processor[Any, Any]] { override def name: String = "ProcessorModulePhase" private[this] var processor: Processor[Any, Any] = _ @@ -876,7 +876,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff /** * INTERNAL API */ -@InternalApi private[akka] final class TlsModulePhase(attributes: Attributes, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { +@InternalApi private[akka] final class TlsModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { def name: String = "TlsModulePhase" var tlsActor: ActorRef = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 8dec5e188c..f2eadb9f63 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -14,7 +14,6 @@ import akka.stream.stage._ import akka.stream.scaladsl.SourceQueueWithComplete import scala.compat.java8.FutureConverters._ import scala.concurrent.{ Future, Promise } -import scala.util.control.NonFatal /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 00665016f7..6ac3b03286 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -275,7 +275,7 @@ import scala.util.control.NonFatal case s: Subscription ⇒ if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> EmptyPublisher") if (!compareAndSet(s, EmptyPublisher)) onComplete() - case b @ Both(s) ⇒ + case _@ Both(s) ⇒ if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert") set(Inert) tryOnComplete(s) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 0ee0109fc4..472c1f2df2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -10,9 +10,9 @@ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 } import akka.stream.scaladsl.Keep import akka.util.OptionVal + import scala.language.existentials import scala.collection.immutable.Map.Map1 - import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphStages.SingleSource @@ -277,7 +277,6 @@ import akka.stream.impl.fusing.GraphStages.SingleSource */ @InternalApi private[impl] def printTraversal(t: Traversal, indent: Int = 0): Unit = { var current: Traversal = t - var slot = 0 def prindent(s: String): Unit = println(" | " * indent + s) @@ -1019,7 +1018,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource islandTag = OptionVal.None // islandTag is reset for the new enclosing builder ) - case OptionVal.Some(composite) ⇒ + case OptionVal.Some(_) ⇒ /* * In this case we need to assemble as much as we can, and create a new "sandwich" of * beforeBuilder ~ pendingBuilder ~ traversalSoFar @@ -1089,8 +1088,8 @@ import akka.stream.impl.fusing.GraphStages.SingleSource */ override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder = this.islandTag match { - case OptionVal.Some(tag) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - case OptionVal.None ⇒ copy(islandTag = OptionVal.Some(islandTag)) + case OptionVal.Some(_) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + case OptionVal.None ⇒ copy(islandTag = OptionVal.Some(islandTag)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 0ddc37a1dc..9c58777df2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -316,7 +316,7 @@ import scala.util.control.NonFatal private def reportSubscribeFailure(subscriber: Subscriber[Any]): Unit = try shutdownReason match { - case OptionVal.Some(e: SpecViolation) ⇒ // ok, not allowed to call onError + case OptionVal.Some(_: SpecViolation) ⇒ // ok, not allowed to call onError case OptionVal.Some(e) ⇒ tryOnSubscribe(subscriber, CancelledSubscription) tryOnError(subscriber, e) @@ -348,7 +348,6 @@ import scala.util.control.NonFatal // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked) private var downstreamCompleted = false // when upstream failed before we got the exposed publisher - private var upstreamFailed: OptionVal[Throwable] = OptionVal.None private var upstreamCompleted: Boolean = false private def onNext(elem: Any): Unit = { @@ -369,7 +368,6 @@ import scala.util.control.NonFatal // No need to fail if had already been cancelled, or we closed earlier if (!(downstreamCompleted || upstreamCompleted)) { upstreamCompleted = true - upstreamFailed = OptionVal.Some(e) publisher.shutdown(Some(e)) if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 38b2b49076..cd7655e691 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -215,21 +215,6 @@ import scala.concurrent.{ Future, Promise } def monitor[T]: GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] = new MonitorFlow[T] - private object TickSource { - class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { - private val cancelPromise = Promise[Done]() - - def cancelFuture: Future[Done] = cancelPromise.future - - override def cancel(): Boolean = { - if (!isCancelled) cancelPromise.trySuccess(Done) - true - } - - override def isCancelled: Boolean = cancelled.get() - } - } - final class TickSource[T](val initialDelay: FiniteDuration, val interval: FiniteDuration, val tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { override val shape = SourceShape(Outlet[T]("TickSource.out")) @@ -440,7 +425,6 @@ import scala.concurrent.{ Future, Promise } (logic, promise.future) } - } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 49029e219c..8a96fb3a12 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1471,7 +1471,7 @@ private[stream] object Collect { override def genString(t: Materializer): String = { try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})" catch { - case ex: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName) + case _: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 9b350b12ba..132c4371ee 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -695,7 +695,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource else setCallback(callback) - case m: /* Materialized */ AsyncCallback[Command @unchecked] ⇒ + case _: /* Materialized */ AsyncCallback[Command @unchecked] ⇒ failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 9662a6fd64..5a5c6b817c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -84,7 +84,7 @@ import scala.util.{ Failure, Success, Try } } catch { case closingException: Exception ⇒ result match { case Success(ioResult) ⇒ - val statusWithClosingException = ioResult.status.transform(d ⇒ Failure(closingException), ex ⇒ Failure(closingException.initCause(ex))) + val statusWithClosingException = ioResult.status.transform(_ ⇒ Failure(closingException), ex ⇒ Failure(closingException.initCause(ex))) completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException)) case Failure(ex) ⇒ completionPromise.tryFailure(closingException.initCause(ex)) } diff --git a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala index 751a3b2ea2..0fcda7a721 100644 --- a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala +++ b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala @@ -52,7 +52,7 @@ object MaterializerState { /** INTERNAL API */ @InternalApi - private[akka] def requestFromChild(child: ActorRef)(implicit ec: ExecutionContext): Future[StreamSnapshot] = { + private[akka] def requestFromChild(child: ActorRef): Future[StreamSnapshot] = { // FIXME arbitrary timeout implicit val timeout: Timeout = 10.seconds (child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot] diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 9577343e2b..db5f653d5e 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -493,7 +493,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def grab[T](in: Inlet[T]): T = { val connection = conn(in) - val it = interpreter val elem = connection.slot // Fast path @@ -1146,8 +1145,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: // stage has stopped to fail incoming async callback invocations by being set to null private val asyncCallbacksInProgress = new AtomicReference[List[Promise[Done]]](Nil) - private def stopped = asyncCallbacksInProgress.get() == null - private var _stageActor: StageActor = _ final def stageActor: StageActor = _stageActor match { case null ⇒ throw StageActorRefNotInitializedException()