Subscription timeouts not working #19980

This commit is contained in:
Johan Andrén 2019-07-05 08:19:46 +02:00 committed by GitHub
parent 62f639e054
commit 5fe12a3e9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 107 additions and 7 deletions

View file

@ -88,7 +88,16 @@ import org.reactivestreams.Subscriber
* subscription a VirtualProcessor would perform (and it also saves overhead).
*/
override def create(context: MaterializationContext): (AnyRef, Publisher[In]) = {
val proc = new VirtualPublisher[In]
context.materializer match {
case am: ActorMaterializer =>
if (am.settings.subscriptionTimeoutSettings.mode != StreamSubscriptionTimeoutTerminationMode.noop)
am.scheduleOnce(am.settings.subscriptionTimeoutSettings.timeout, new Runnable {
def run(): Unit = proc.onSubscriptionTimeout(am)
})
case _ => // not possible to setup timeout
}
(proc, proc)
}
@ -110,8 +119,8 @@ import org.reactivestreams.Subscriber
context,
FanoutProcessorImpl.props(context.effectiveAttributes, actorMaterializer.settings))
val fanoutProcessor = new ActorProcessor[In, In](impl)
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
(fanoutProcessor, fanoutProcessor)
}