diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 55a0edd039..1a4b613160 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -194,6 +194,27 @@ class QueueSourceSpec extends AkkaSpec { queue.offer(1).onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } } + "not share future across materializations" in { + val source = Source.queue[String](1, OverflowStrategy.fail) + + val mat1subscriber = TestSubscriber.probe[String]() + val mat2subscriber = TestSubscriber.probe[String]() + val sourceQueue1 = source.to(Sink.fromSubscriber(mat1subscriber)).run() + val sourceQueue2 = source.to(Sink.fromSubscriber(mat2subscriber)).run() + + mat1subscriber.ensureSubscription() + mat2subscriber.ensureSubscription() + + mat1subscriber.request(1) + sourceQueue1.offer("hello") + mat1subscriber.expectNext("hello") + mat1subscriber.cancel() + sourceQueue1.watchCompletion pipeTo testActor + expectMsg(Done) + + sourceQueue2.watchCompletion().isCompleted should ===(false) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index c11a41476d..f11b8c494c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -20,9 +20,9 @@ final private[stream] class QueueSource[T](maxBuffer: Int, overflowStrategy: Ove val out = Outlet[T]("queueSource.out") override val shape: SourceShape[T] = SourceShape.of(out) - val completion = Promise[Done] override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val completion = Promise[Done] val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[(T, Offered)] { var buffer: Buffer[T] = _ var pendingOffer: Option[(T, Offered)] = None diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index ca6538f59a..3ac29119a7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -40,14 +40,14 @@ private[stream] class ConnectionSourceStage(val tcpManager: ActorRef, override def initialAttributes = Attributes.name("ConnectionSource") val shape: SourceShape[StreamTcp.IncomingConnection] = SourceShape(out) - private val connectionFlowsAwaitingInitialization = new AtomicLong() - // TODO: Timeout on bind override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[ServerBinding]) = { val bindingPromise = Promise[ServerBinding] val logic = new TimerGraphStageLogic(shape) { implicit def self: ActorRef = stageActor.ref + + val connectionFlowsAwaitingInitialization = new AtomicLong() var listener: ActorRef = _ var unbindPromise = Promise[Unit]() diff --git a/project/MiMa.scala b/project/MiMa.scala index 4f4d9fd896..adba135c98 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -680,7 +680,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.model.EntityStreamSizeException"), // #19849 content negotiation fixes - ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException") + ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"), + + // #20009 internal and shouldn't have been public + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion") ) ) }