diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 997f3f55a8..78de8df9b1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -739,7 +739,7 @@ private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] object WrappedSubscription { sealed trait SubscriptionState { def demand: Long } case object PassThrough extends SubscriptionState { override def demand: Long = 0 } - final case class Buffering(demand: Long) extends SubscriptionState + case class Buffering(demand: Long) extends SubscriptionState val NoBufferedDemand = Buffering(0) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index fbae436eff..9b3653beeb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -49,7 +49,9 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) val downstreamStatus = new AtomicReference[DownstreamStatus](Ok) - val logic = new GraphStageLogic(shape) with CallbackWrapper[(AdapterToStageMessage, Promise[Unit])] { + final class OutputStreamSourceLogic extends GraphStageLogic(shape) + with CallbackWrapper[(AdapterToStageMessage, Promise[Unit])] { + var flush: Option[Promise[Unit]] = None var close: Option[Promise[Unit]] = None @@ -148,6 +150,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration } } + val logic = new OutputStreamSourceLogic (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index aea652bb03..ac9c044db0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -146,6 +146,9 @@ private[stream] class TLSActor( private val transportInChoppingBlock = new ChoppingBlock(TransportIn, "TransportIn") transportInChoppingBlock.prepare(transportInBuffer) + var lastHandshakeStatus: HandshakeStatus = null + var corkUser = true + // The engine could also be instantiated in ActorMaterializerImpl but if creation fails // during materialization it would be worse than failing later on. val engine = @@ -189,9 +192,6 @@ private[stream] class TLSActor( * representing the Engine. */ - var lastHandshakeStatus: HandshakeStatus = _ - var corkUser = true - val engineNeedsWrap = new TransferState { def isReady = lastHandshakeStatus == NEED_WRAP def isCompleted = engine.isOutboundDone @@ -482,4 +482,4 @@ private[stream] object TlsUtils { newParameters.setWantClientAuth(old.getWantClientAuth) newParameters } -} \ No newline at end of file +}