Merge pull request #20035 from johanandren/wip-20009-unsafe-graph-stages-part-II-the-internals-johanandren

=str #20009 State shared over multiple materializations eliminated
This commit is contained in:
Johan Andrén 2016-03-15 16:05:14 +01:00
commit 1e6c6dafff
4 changed files with 28 additions and 4 deletions

View file

@ -194,6 +194,27 @@ class QueueSourceSpec extends AkkaSpec {
queue.offer(1).onFailure { case e e.isInstanceOf[IllegalStateException] should ===(true) }
}
"not share future across materializations" in {
val source = Source.queue[String](1, OverflowStrategy.fail)
val mat1subscriber = TestSubscriber.probe[String]()
val mat2subscriber = TestSubscriber.probe[String]()
val sourceQueue1 = source.to(Sink.fromSubscriber(mat1subscriber)).run()
val sourceQueue2 = source.to(Sink.fromSubscriber(mat2subscriber)).run()
mat1subscriber.ensureSubscription()
mat2subscriber.ensureSubscription()
mat1subscriber.request(1)
sourceQueue1.offer("hello")
mat1subscriber.expectNext("hello")
mat1subscriber.cancel()
sourceQueue1.watchCompletion pipeTo testActor
expectMsg(Done)
sourceQueue2.watchCompletion().isCompleted should ===(false)
}
}
}

View file

@ -20,9 +20,9 @@ final private[stream] class QueueSource[T](maxBuffer: Int, overflowStrategy: Ove
val out = Outlet[T]("queueSource.out")
override val shape: SourceShape[T] = SourceShape.of(out)
val completion = Promise[Done]
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val completion = Promise[Done]
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[(T, Offered)] {
var buffer: Buffer[T] = _
var pendingOffer: Option[(T, Offered)] = None

View file

@ -40,14 +40,14 @@ private[stream] class ConnectionSourceStage(val tcpManager: ActorRef,
override def initialAttributes = Attributes.name("ConnectionSource")
val shape: SourceShape[StreamTcp.IncomingConnection] = SourceShape(out)
private val connectionFlowsAwaitingInitialization = new AtomicLong()
// TODO: Timeout on bind
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[ServerBinding]) = {
val bindingPromise = Promise[ServerBinding]
val logic = new TimerGraphStageLogic(shape) {
implicit def self: ActorRef = stageActor.ref
val connectionFlowsAwaitingInitialization = new AtomicLong()
var listener: ActorRef = _
var unbindPromise = Promise[Unit]()

View file

@ -680,7 +680,10 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.model.EntityStreamSizeException"),
// #19849 content negotiation fixes
ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException")
ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"),
// #20009 internal and shouldn't have been public
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion")
)
)
}