Merge pull request #16996 from akka/wip-16751-rs-rc3-patriknw

=str #16751 Update to reactive-streams 1.0-RC3
This commit is contained in:
Patrik Nordwall 2015-03-05 17:59:34 +01:00
commit ff2c7425bc
79 changed files with 862 additions and 414 deletions

View file

@ -56,11 +56,11 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
an[BindFailedException] shouldBe thrownBy { Await.result(binding.to(Sink(probe2)).run(), 3.seconds) }
probe2.expectErrorOrSubscriptionFollowedByError()
probe2.expectSubscriptionAndError()
val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
an[BindFailedException] shouldBe thrownBy { Await.result(binding.to(Sink(probe3)).run(), 3.seconds) }
probe3.expectErrorOrSubscriptionFollowedByError()
probe3.expectSubscriptionAndError()
// Now unbind the first
Await.result(b1.unbind(), 1.second)

View file

@ -24,6 +24,5 @@ class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerificat
ActorSubscriber(system.actorOf(props.withDispatcher("akka.test.stream-dispatcher")))
}
override def createHelperPublisher(elements: Long) =
createSimpleIntPublisher(elements)
override def createElement(element: Int): Int = element
}

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import org.testng.annotations.AfterClass
import akka.stream.testkit.AkkaSpec
import akka.event.Logging
import akka.testkit.TestEvent
import akka.testkit.EventFilter
import org.testng.annotations.BeforeClass
trait ActorSystemLifecycle {
private var _system: ActorSystem = _
final def system: ActorSystem = _system
def shutdownTimeout: FiniteDuration = 10.seconds
@BeforeClass
def createActorSystem(): Unit = {
_system = ActorSystem(Logging.simpleName(getClass), AkkaSpec.testConf)
_system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception")))
}
@AfterClass
def shutdownActorSystem(): Unit = {
try {
system.shutdown()
system.awaitTermination(shutdownTimeout)
} catch {
case _: TimeoutException
val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout,
system.asInstanceOf[ActorSystemImpl].printTree)
throw new RuntimeException(msg)
}
}
}

View file

@ -3,51 +3,32 @@
*/
package akka.stream.tck
import akka.event.Logging
import scala.collection.{ mutable, immutable }
import akka.actor.ActorSystem
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.EventFilter
import akka.testkit.TestEvent
import org.reactivestreams.{ Subscriber, Subscription, Processor, Publisher }
import org.reactivestreams.tck.IdentityProcessorVerification
import org.reactivestreams.tck.TestEnvironment
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
abstract class AkkaIdentityProcessorVerification[T](env: TestEnvironment, publisherShutdownTimeout: Long)
extends IdentityProcessorVerification[T](env, publisherShutdownTimeout)
with TestNGSuiteLike {
with TestNGSuiteLike with ActorSystemLifecycle {
system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception")))
def this(printlnDebug: Boolean) =
this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
}
def this(printlnDebug: Boolean) {
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
def this() = this(false)
override def createErrorStatePublisher(): Publisher[T] =
StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!"))
def createSimpleIntPublisher(elements: Long)(implicit mat: ActorFlowMaterializer): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue) 1 to Int.MaxValue
else 0 until elements.toInt
Source(iterable).runWith(Sink.publisher())
}
def processorFromFlow[T](flow: Flow[T, T, _])(implicit mat: ActorFlowMaterializer): Processor[T, T] = {
def processorFromFlow(flow: Flow[T, T, _])(implicit mat: ActorFlowMaterializer): Processor[T, T] = {
val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T](), Sink.publisher[T]())
new Processor[T, T] {
@ -61,4 +42,13 @@ abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env
/** By default Akka Publishers do not support Fanout! */
override def maxSupportedSubscribers: Long = 1L
override lazy val publisherExecutorService: ExecutorService =
Executors.newFixedThreadPool(3)
@AfterClass
def shutdownPublisherExecutorService(): Unit = {
publisherExecutorService.shutdown()
publisherExecutorService.awaitTermination(3, TimeUnit.SECONDS)
}
}

View file

@ -3,10 +3,9 @@
*/
package akka.stream.tck
import scala.collection.immutable
import akka.event.Logging
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.ActorFlowMaterializer
@ -16,31 +15,26 @@ import org.reactivestreams.Publisher
import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment }
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
import akka.actor.ActorSystemImpl
import java.util.concurrent.TimeoutException
abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
abstract class AkkaPublisherVerification[T](val env: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[T](env, publisherShutdownTimeout)
with TestNGSuiteLike {
with TestNGSuiteLike with ActorSystemLifecycle {
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
}
def this(printlnDebug: Boolean) =
this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
def this(printlnDebug: Boolean) {
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() = this(false)
def this() {
this(false)
}
implicit val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system).copy(maxInputBufferSize = 512))(system)
@AfterClass
def shutdownActorSystem(): Unit = {
system.shutdown()
system.awaitTermination(10.seconds)
}
implicit lazy val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system).copy(maxInputBufferSize = 512))(system)
override def createErrorStatePublisher(): Publisher[T] =
StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!"))
def iterable(elements: Long): immutable.Iterable[Int] =
if (elements > Int.MaxValue)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements.toInt
}

View file

@ -20,57 +20,28 @@ import org.reactivestreams.tck.TestEnvironment
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, env: TestEnvironment)
abstract class AkkaSubscriberBlackboxVerification[T](env: TestEnvironment)
extends SubscriberBlackboxVerification[T](env) with TestNGSuiteLike
with AkkaSubscriberVerificationLike {
with AkkaSubscriberVerificationLike with ActorSystemLifecycle {
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug))
}
def this(printlnDebug: Boolean) =
this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug))
def this(printlnDebug: Boolean) {
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
def this() = this(false)
}
abstract class AkkaSubscriberWhiteboxVerification[T](val system: ActorSystem, env: TestEnvironment)
abstract class AkkaSubscriberWhiteboxVerification[T](env: TestEnvironment)
extends SubscriberWhiteboxVerification[T](env) with TestNGSuiteLike
with AkkaSubscriberVerificationLike {
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug))
}
def this(printlnDebug: Boolean) =
this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug))
def this(printlnDebug: Boolean) {
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
def this() = this(false)
}
trait AkkaSubscriberVerificationLike {
implicit def system: ActorSystem
implicit val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system))
def createSimpleIntPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue) 1 to Int.MaxValue
else 0 until elements.toInt
Source(iterable).runWith(Sink.publisher())
}
@AfterClass
def shutdownActorSystem(): Unit = {
system.shutdown()
system.awaitTermination(10.seconds)
}
implicit lazy val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system))
}

View file

@ -9,9 +9,8 @@ import org.reactivestreams.Subscriber
class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
new BlackholeSubscriber[Int](2)
override def createSubscriber(): Subscriber[Int] = new BlackholeSubscriber[Int](2)
override def createHelperPublisher(elements: Long): Publisher[Int] = createSimpleIntPublisher(elements)
override def createElement(element: Int): Int = element
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
class ConcatTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher())
}
// FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4
override def optional_spec111_maySupportMultiSubscribe(): Unit = ()
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
import akka.stream.impl.EmptyPublisher
class EmptyPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = EmptyPublisher[Int]
override def maxElementsFromPublisher(): Long = 0
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
import akka.stream.FlattenStrategy
class FlattenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val s1 = Source(iterable(elements / 2))
val s2 = Source(iterable((elements + 1) / 2))
Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher())
}
// FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4
override def optional_spec111_maySupportMultiSubscribe(): Unit = ()
}

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl._
import org.reactivestreams.Subscriber
class FoldSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
Flow[Int].to(Sink.fold(0)(_ + _)).runWith(Source.subscriber())
override def createElement(element: Int): Int = element
}

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl._
import org.reactivestreams.Subscriber
class ForeachSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
Flow[Int].to(Sink.foreach { _ }).runWith(Source.subscriber())
override def createElement(element: Int): Int = element
}

View file

@ -3,17 +3,13 @@
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import akka.stream.impl.Stages.Identity
import akka.stream.scaladsl.{ OperationAttributes, Flow }
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Processor, Publisher }
import org.reactivestreams.Processor
class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
@ -25,10 +21,6 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
Flow[Int].andThen(Identity()).withAttributes(OperationAttributes.name("identity")))
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = ActorFlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
override def createElement(element: Int): Int = element
}

View file

@ -19,4 +19,4 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] {
}
override def maxElementsFromPublisher(): Long = 1
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.impl.EmptyPublisher
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
class GroupByTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
if (elements == 0) EmptyPublisher[Int]
else {
val futureGroupSource =
Source(iterable(elements)).groupBy(elem "all").map { case (_, group) group }.runWith(Sink.head())
val groupSource = Await.result(futureGroupSource, 3.seconds)
groupSource.runWith(Sink.publisher())
}
// FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4
override def optional_spec111_maySupportMultiSubscribe(): Unit = ()
}

View file

@ -15,6 +15,5 @@ class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
new HeadSinkSubscriber[Int](Promise[Int]())
override def createHelperPublisher(elements: Long) =
createSimpleIntPublisher(elements)
override def createElement(element: Int): Int = element
}

View file

@ -10,18 +10,11 @@ import org.reactivestreams._
class IterablePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements.toInt
Source(iterable).runWith(Sink.publisher())
override def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements)).runWith(Sink.publisher())
}
override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue(): Unit = {
// FIXME: This test needs RC3
notVerified()
}
}
// FIXME #16983
override def required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(): Unit = ()
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
class LazyEmptySourceTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
Source.lazyEmpty[Int].runWith(Sink.publisher())
override def maxElementsFromPublisher(): Long = 0
}

View file

@ -3,8 +3,6 @@
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.scaladsl.{ Flow, OperationAttributes }
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Processor, Publisher }
@ -13,8 +11,6 @@ import scala.concurrent.Future
class MapAsyncTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
@ -25,10 +21,6 @@ class MapAsyncTest extends AkkaIdentityProcessorVerification[Int] {
Flow[Int].mapAsync(Future.successful).withAttributes(OperationAttributes.name("identity")))
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = ActorFlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
override def createElement(element: Int): Int = element
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.{ Flow, OperationAttributes }
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Processor, Publisher }
import scala.concurrent.Future
class MapAsyncUnorderedTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = ActorFlowMaterializer(settings)(system)
processorFromFlow(
Flow[Int].mapAsyncUnordered(Future.successful).withAttributes(OperationAttributes.name("identity")))
}
override def createElement(element: Int): Int = element
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Flow, OperationAttributes }
import org.reactivestreams.Processor
class MapTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
implicit val materializer = ActorFlowMaterializer()(system)
processorFromFlow(
Flow[Int].map(elem elem).withAttributes(OperationAttributes.name("identity")))
}
override def createElement(element: Int): Int = element
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
class PrefixAndTailTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) tail }.runWith(Sink.head())
val tailSource = Await.result(futureTailSource, 3.seconds)
tailSource.runWith(Sink.publisher())
}
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
class SingleElementSourceTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
Source.single(1).runWith(Sink.publisher())
override def maxElementsFromPublisher(): Long = 1
}

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.impl.EmptyPublisher
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
class SplitWhenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] =
if (elements == 0) EmptyPublisher[Int]
else {
val futureSource = Source(iterable(elements)).splitWhen(elem false).runWith(Sink.head())
val source = Await.result(futureSource, 3.seconds)
source.runWith(Sink.publisher())
}
}

View file

@ -14,13 +14,12 @@ class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
if (elements >= 10000)
0 until 10000 // this publisher is not intended to be used for large collections
else
0 until elements.toInt
Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher())
}
override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() = notVerified("RS TCK 1.0.0.M3 does not handle sync publishers well")
}
}

View file

@ -14,7 +14,6 @@ object Timeouts {
def publisherShutdownTimeoutMillis: Int = 1000
def defaultTimeoutMillis(implicit system: ActorSystem): Int =
500.millis.dilated(system).toMillis.toInt
def defaultTimeoutMillis: Int = 500
}

View file

@ -3,8 +3,6 @@
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import akka.stream.impl.ActorFlowMaterializerImpl
import akka.stream.impl.Stages.Identity
@ -15,8 +13,6 @@ import org.reactivestreams.{ Processor, Publisher }
class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
@ -31,10 +27,6 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
processorFromFlow(Flow[Int].transform(mkStage))
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = ActorFlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
override def createElement(element: Int): Int = element
}

View file

@ -113,6 +113,23 @@ object StreamTestKit {
def expectError(cause: Throwable): Unit = probe.expectMsg(OnError(cause))
def expectError(): Throwable = probe.expectMsgType[OnError].cause
def expectSubscriptionAndError(cause: Throwable): Unit = {
val sub = expectSubscription()
sub.request(1)
expectError(cause)
}
def expectSubscriptionAndError(): Throwable = {
val sub = expectSubscription()
sub.request(1)
expectError()
}
def expectSubscriptionAndComplete(): Unit = {
val sub = expectSubscription()
sub.request(1)
expectComplete()
}
def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I] = {
probe.fishForMessage(hint = s"OnNext($element) or ${cause.getClass.getName}") {
case OnNext(n) true
@ -123,28 +140,6 @@ object StreamTestKit {
}
}
def expectErrorOrSubscriptionFollowedByError(cause: Throwable): Unit = {
val t = expectErrorOrSubscriptionFollowedByError()
assert(t == cause, s"expected $cause, found $cause")
}
def expectErrorOrSubscriptionFollowedByError(): Throwable =
probe.expectMsgPF() {
case s: OnSubscribe
s.subscription.request(1)
expectError()
case OnError(cause) cause
}
def expectCompletedOrSubscriptionFollowedByComplete(): Unit = {
probe.expectMsgPF() {
case s: OnSubscribe
s.subscription.request(1)
expectComplete()
case OnComplete
}
}
def expectNoMsg(): Unit = probe.expectNoMsg()
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)

View file

@ -53,39 +53,39 @@ abstract class TwoStreamsSetup extends AkkaSpec {
def commonTests() = {
"work with two immediately completed publishers" in {
val subscriber = setup(completedPublisher, completedPublisher)
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
subscriber.expectSubscriptionAndComplete()
}
"work with two delayed completed publishers" in {
val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher)
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
subscriber.expectSubscriptionAndComplete()
}
"work with one immediately completed and one delayed completed publisher" in {
val subscriber = setup(completedPublisher, soonToCompletePublisher)
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
subscriber.expectSubscriptionAndComplete()
}
"work with two immediately failed publishers" in {
val subscriber = setup(failedPublisher, failedPublisher)
subscriber.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber.expectSubscriptionAndError(TestException)
}
"work with two delayed failed publishers" in {
val subscriber = setup(soonToFailPublisher, soonToFailPublisher)
subscriber.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber.expectSubscriptionAndError(TestException)
}
// Warning: The two test cases below are somewhat implementation specific and might fail if the implementation
// is changed. They are here to be an early warning though.
"work with one immediately failed and one delayed failed publisher (case 1)" in {
val subscriber = setup(soonToFailPublisher, failedPublisher)
subscriber.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber.expectSubscriptionAndError(TestException)
}
"work with one immediately failed and one delayed failed publisher (case 2)" in {
val subscriber = setup(failedPublisher, soonToFailPublisher)
subscriber.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber.expectSubscriptionAndError(TestException)
}
}

View file

@ -169,7 +169,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
ref ! Err("early err")
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectError.getMessage should be("early err")
s.expectSubscriptionAndError.getMessage should be("early err")
}
"drop onNext elements after cancel" in {
@ -225,7 +225,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
ref ! Complete
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectComplete
s.expectSubscriptionAndComplete
}
"only allow one subscriber" in {
@ -236,7 +236,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
s.expectSubscription
val s2 = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s2)
s2.expectError.getClass should be(classOf[IllegalStateException])
s2.expectSubscriptionAndError.getClass should be(classOf[IllegalStateException])
}
"signal onCompete when actor is stopped" in {
@ -325,7 +325,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
// now subscribers will already be rejected, while the actor could perform some clean-up
val sub = StreamTestKit.SubscriberProbe()
pub.subscribe(sub)
sub.expectError()
sub.expectSubscriptionAndError()
expectMsg("cleaned-up")
// termination is tiggered by user code
@ -343,6 +343,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
// subscribe right away, should cancel subscription-timeout
pub.subscribe(sub)
sub.expectSubscription()
expectNoMsg()
}

View file

@ -294,7 +294,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
val serverConnection = server.waitAccept()
serverConnection.abort()
tcpReadProbe.subscriberProbe.expectErrorOrSubscriptionFollowedByError()
tcpReadProbe.subscriberProbe.expectSubscriptionAndError()
tcpWriteProbe.tcpWriteSubscription.expectCancellation()
serverConnection.expectTerminated()
@ -408,11 +408,11 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
val probe2 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]()
val binding2F = bind.to(Sink(probe2)).run()
probe2.expectErrorOrSubscriptionFollowedByError(BindFailedException)
probe2.expectSubscriptionAndError(BindFailedException)
val probe3 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]()
val binding3F = bind.to(Sink(probe3)).run()
probe3.expectErrorOrSubscriptionFollowedByError()
probe3.expectSubscriptionAndError()
an[BindFailedException] shouldBe thrownBy { Await.result(binding2F, 1.second) }
an[BindFailedException] shouldBe thrownBy { Await.result(binding3F, 1.second) }

View file

@ -35,7 +35,7 @@ class FlowFromFutureSpec extends AkkaSpec {
val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectError(ex)
c.expectSubscriptionAndError(ex)
}
"produce one element when Future is completed" in {
@ -119,4 +119,4 @@ class FlowFromFutureSpec extends AkkaSpec {
c.expectNoMsg(200.millis)
}
}
}
}

View file

@ -142,7 +142,7 @@ class FlowGroupBySpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
subscriber.expectSubscriptionAndComplete()
}
"abort on onError from upstream" in {

View file

@ -56,7 +56,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
val p = createSource(immutable.Iterable.empty[Int]).runWith(Sink.publisher())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectCompletedOrSubscriptionFollowedByComplete()
c.expectSubscriptionAndComplete()
c.expectNoMsg(100.millis)
}
@ -167,7 +167,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
val p = createSource(iterable).runWith(Sink.publisher())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectErrorOrSubscriptionFollowedByError().getMessage should be("no good iterator")
c.expectSubscriptionAndError().getMessage should be("no good iterator")
c.expectNoMsg(100.millis)
}
@ -181,7 +181,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec {
val p = createSource(iterable).runWith(Sink.publisher())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectErrorOrSubscriptionFollowedByError().getMessage should be("no next")
c.expectSubscriptionAndError().getMessage should be("no next")
c.expectNoMsg(100.millis)
}
}

View file

@ -16,6 +16,7 @@ import akka.testkit.TestLatch
import akka.testkit.TestProbe
import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream.impl.ReactiveStreamsCompliance
class FlowMapAsyncSpec extends AkkaSpec {
@ -137,6 +138,25 @@ class FlowMapAsyncSpec extends AkkaSpec {
c.expectComplete()
}
"signal NPE when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b")).mapAsync(elem Future.successful(null)).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
}
"resume when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))(
_.mapAsync(elem if (elem == "b") Future.successful(null) else Future.successful(elem)))
.to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
for (elem List("a", "c")) c.expectNext(elem)
c.expectComplete()
}
"should handle cancel properly" in {
val pub = StreamTestKit.PublisherProbe[Int]()
val sub = StreamTestKit.SubscriberProbe[Int]()

View file

@ -17,6 +17,7 @@ import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.OnComplete
import akka.stream.impl.ReactiveStreamsCompliance
class FlowMapAsyncUnorderedSpec extends AkkaSpec {
@ -133,6 +134,25 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
c.probe.receiveN(5).toSet should be(expected)
}
"signal NPE when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b")).mapAsyncUnordered(elem Future.successful(null)).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
}
"resume when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))(
_.mapAsyncUnordered(elem if (elem == "b") Future.successful(null) else Future.successful(elem)))
.to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
for (elem List("a", "c")) c.expectNext(elem)
c.expectComplete()
}
"should handle cancel properly" in {
val pub = StreamTestKit.PublisherProbe[Int]()
val sub = StreamTestKit.SubscriberProbe[Int]()

View file

@ -33,7 +33,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
prefix should be(Nil)
val tailSubscriber = SubscriberProbe[Int]
tailFlow.to(Sink(tailSubscriber)).run()
tailSubscriber.expectComplete()
tailSubscriber.expectSubscriptionAndComplete()
}
"work on short input" in {
@ -43,7 +43,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
prefix should be(List(1, 2, 3))
val tailSubscriber = SubscriberProbe[Int]
tailFlow.to(Sink(tailSubscriber)).run()
tailSubscriber.expectComplete()
tailSubscriber.expectSubscriptionAndComplete()
}
"work on longer inputs" in {
@ -87,7 +87,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
tail.to(Sink(subscriber)).run()
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
subscriber.expectSubscriptionAndComplete()
}
"handle onError when no substream open" in {

View file

@ -510,7 +510,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val downstream2 = StreamTestKit.SubscriberProbe[String]()
publisher.subscribe(downstream2)
downstream2.expectError() should be(TestException)
downstream2.expectSubscriptionAndError() should be(TestException)
}
}
@ -524,7 +524,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val downstream2 = StreamTestKit.SubscriberProbe[Any]()
publisher.subscribe(downstream2)
// IllegalStateException shut down
downstream2.expectError().isInstanceOf[IllegalStateException] should be(true)
downstream2.expectSubscriptionAndError().isInstanceOf[IllegalStateException] should be(true)
}
}
}
@ -572,6 +572,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val downstream3 = StreamTestKit.SubscriberProbe[Any]()
publisher.subscribe(downstream3)
downstream3.expectSubscription()
// IllegalStateException terminated abruptly
checkError(downstream3)
} finally {

View file

@ -12,6 +12,7 @@ import scala.util.control.NoStackTrace
import scala.concurrent.Await
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.Supervision
import akka.stream.impl.ReactiveStreamsCompliance
class FlowSupervisionSpec extends AkkaSpec {
import OperationAttributes.supervisionStrategy
@ -38,5 +39,19 @@ class FlowSupervisionSpec extends AkkaSpec {
result should be(List(1, 2, 4, 5))
}
"complete stream with NPE failure when null is emitted" in {
intercept[NullPointerException] {
Await.result(Source(List("a", "b")).map(_ null).grouped(1000).runWith(Sink.head()), 3.seconds)
}.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
}
"resume stream when null is emitted" in {
val nullMap = Flow[String].map(elem if (elem == "b") null else elem)
.withAttributes(supervisionStrategy(Supervision.resumingDecider))
val result = Await.result(Source(List("a", "b", "c")).via(nullMap)
.grouped(1000).runWith(Sink.head()), 3.seconds)
result should be(List("a", "c"))
}
}
}

View file

@ -97,10 +97,10 @@ class GraphConcatSpec extends TwoStreamsSetup {
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and first nonempty publisher" in {
@ -113,7 +113,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft
if (!errorSignalled) subscriber.expectErrorOrSubscriptionFollowedByError(TestException)
if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and second nonempty publisher" in {
@ -126,7 +126,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft
if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft
if (!errorSignalled) subscriber.expectErrorOrSubscriptionFollowedByError(TestException)
if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException)
}
"correctly handle async errors in secondary upstream" in {

View file

@ -580,7 +580,7 @@ class GraphFlexiMergeSpec extends AkkaSpec {
val s = SubscriberProbe[String]
p.subscribe(s)
s.expectErrorOrSubscriptionFollowedByError().getMessage should be("ERROR")
s.expectSubscriptionAndError().getMessage should be("ERROR")
}
"emit failure" in {

View file

@ -52,34 +52,34 @@ class GraphZipSpec extends TwoStreamsSetup {
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
val subscription2 = subscriber2.expectSubscriptionAndError(TestException)
}
}

View file

@ -73,34 +73,34 @@ class GraphZipWithSpec extends TwoStreamsSetup {
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
val subscription2 = subscriber2.expectSubscriptionAndError(TestException)
}
"zipWith a ETA expanded Person.apply (3 inputs)" in {

View file

@ -53,11 +53,11 @@ class SourceSpec extends AkkaSpec {
val p = Source.empty.runWith(Sink.publisher())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
c.expectSubscriptionAndComplete()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectComplete()
c2.expectSubscriptionAndComplete()
}
}
@ -67,11 +67,11 @@ class SourceSpec extends AkkaSpec {
val p = Source.failed(ex).runWith(Sink.publisher())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectError(ex)
c.expectSubscriptionAndError(ex)
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectError(ex)
c2.expectSubscriptionAndError(ex)
}
}
@ -166,4 +166,4 @@ class SourceSpec extends AkkaSpec {
}
}
}
}

View file

@ -56,7 +56,7 @@ class TickSourceSpec extends AkkaSpec {
p.subscribe(c1)
p.subscribe(c2)
val sub1 = c1.expectSubscription()
c2.expectError()
c2.expectSubscriptionAndError()
sub1.request(1)
c1.expectNext("tick")
c1.expectNoMsg(200.millis)

View file

@ -19,6 +19,8 @@ import akka.actor.UntypedActor
import concurrent.duration.Duration
import concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
import akka.stream.impl.CancelledSubscription
import akka.stream.impl.ReactiveStreamsCompliance._
object ActorPublisher {
@ -240,10 +242,9 @@ trait ActorPublisher[T] extends Actor {
super.aroundReceive(receive, msg)
} else {
demand += n
if (demand < 0 && lifecycleState == Active) // Long has overflown
onError(totalPendingDemandMustNotExceedLongMaxValueException)
else
super.aroundReceive(receive, msg)
if (demand < 0)
demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
super.aroundReceive(receive, msg)
}
case Subscribe(sub: Subscriber[_])
@ -253,11 +254,16 @@ trait ActorPublisher[T] extends Actor {
subscriber = sub
lifecycleState = Active
tryOnSubscribe(sub, new ActorPublisherSubscription(self))
case ErrorEmitted(cause) tryOnError(sub, cause)
case Completed tryOnComplete(sub)
case ErrorEmitted(cause)
tryOnSubscribe(sub, CancelledSubscription)
tryOnError(sub, cause)
case Completed
tryOnSubscribe(sub, CancelledSubscription)
tryOnComplete(sub)
case Active | Canceled
tryOnSubscribe(sub, CancelledSubscription)
tryOnError(sub,
if (subscriber eq sub) ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException
if (subscriber == sub) ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException
else ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException)
}
@ -337,8 +343,10 @@ private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publ
import ActorPublisher._
import ActorPublisher.Internal._
override def subscribe(sub: Subscriber[_ >: T]): Unit =
override def subscribe(sub: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(sub)
ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]])
}
}
/**

View file

@ -15,6 +15,7 @@ import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.UntypedActor
import akka.actor.DeadLetterSuppression
import akka.stream.impl.ReactiveStreamsCompliance
object ActorSubscriber {
@ -274,10 +275,19 @@ trait ActorSubscriber extends Actor {
*/
private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] {
import ActorSubscriberMessage._
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
impl ! OnError(cause)
}
override def onComplete(): Unit = impl ! OnComplete
override def onNext(element: T): Unit = impl ! OnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! ActorSubscriber.OnSubscribe(subscription)
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
impl ! OnNext(element)
}
override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
impl ! ActorSubscriber.OnSubscribe(subscription)
}
}
/**

View file

@ -28,10 +28,19 @@ private[akka] object ActorProcessor {
*/
private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl)
with Processor[I, O] {
override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s)
override def onError(t: Throwable): Unit = impl ! OnError(t)
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
impl ! OnSubscribe(s)
}
override def onError(t: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(t)
impl ! OnError(t)
}
override def onComplete(): Unit = impl ! OnComplete
override def onNext(t: I): Unit = impl ! OnNext(t)
override def onNext(elem: I): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
impl ! OnNext(elem)
}
}
/**
@ -162,11 +171,12 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
private val _subreceive = new SubReceive(waitingExposedPublisher)
def enqueueOutputElement(elem: Any): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
downstreamDemand -= 1
tryOnNext(subscriber, elem)
}
def complete(): Unit = {
override def complete(): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (exposedPublisher ne null) exposedPublisher.shutdown(None)
@ -174,7 +184,14 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
}
}
def cancel(e: Throwable): Unit = {
override def cancel(): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (exposedPublisher ne null) exposedPublisher.shutdown(None)
}
}
override def error(e: Throwable): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
@ -182,7 +199,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
}
}
def isClosed: Boolean = downstreamCompleted
override def isClosed: Boolean = downstreamCompleted
protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber)
@ -192,7 +209,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
subscriber = sub
tryOnSubscribe(subscriber, createSubscription())
} else
tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}"))
rejectAdditionalSubscriber(sub, s"${Logging.simpleName(this)}")
}
protected def waitingExposedPublisher: Actor.Receive = {
@ -208,14 +225,12 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
subscribePending(exposedPublisher.takePendingSubscribers())
case RequestMore(subscription, elements)
if (elements < 1) {
cancel(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else {
downstreamDemand += elements
if (downstreamDemand < 1) { // Long has overflown
cancel(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException)
}
pump.pump() // FIXME should this be called even on overflow, sounds like a bug to me
if (downstreamDemand < 1)
downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
pump.pump()
}
case Cancel(subscription)
downstreamCompleted = true
@ -255,11 +270,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali
protected def onError(e: Throwable): Unit = fail(e)
protected def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor
if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
primaryInputs.cancel()
primaryOutputs.cancel(e)
primaryOutputs.error(e)
context.stop(self)
}
@ -273,7 +287,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali
override def postStop(): Unit = {
primaryInputs.cancel()
primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly"))
primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly"))
}
override def postRestart(reason: Throwable): Unit = {

View file

@ -47,7 +47,8 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
protected val wakeUpMsg: Any = SubscribePending
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
@tailrec def doSubscribe(subscriber: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(subscriber)
@tailrec def doSubscribe(): Unit = {
val current = pendingSubscribers.get
if (current eq null)
reportSubscribeFailure(subscriber)
@ -55,11 +56,11 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
if (pendingSubscribers.compareAndSet(current, subscriber +: current))
impl ! wakeUpMsg
else
doSubscribe(subscriber) // CAS retry
doSubscribe() // CAS retry
}
}
doSubscribe(subscriber)
doSubscribe()
}
def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = {
@ -82,11 +83,11 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
try shutdownReason match {
case Some(e: SpecViolation) // ok, not allowed to call onError
case Some(e)
if (shutdownReason eq ActorPublisher.NormalShutdownReason)
(new RuntimeException("BOOM")).printStackTrace()
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnError(subscriber, e)
case None tryOnComplete(subscriber)
case None
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnComplete(subscriber)
} catch {
case _: SpecViolation // nothing to do
}

View file

@ -19,15 +19,20 @@ private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscribe
private val subscription: AtomicReference[Subscription] = new AtomicReference(null)
override def onSubscribe(sub: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(sub)
if (subscription.compareAndSet(null, sub)) requestMore()
else sub.cancel()
}
override def onError(cause: Throwable): Unit = ()
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
()
}
override def onComplete(): Unit = ()
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
requested -= 1
requestMore()
}

View file

@ -4,6 +4,7 @@
package akka.stream.impl
import org.reactivestreams.{ Subscriber, Publisher }
import org.reactivestreams.Subscription
/**
* INTERNAL API
@ -11,7 +12,11 @@ import org.reactivestreams.{ Subscriber, Publisher }
private[akka] case object EmptyPublisher extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try tryOnComplete(subscriber) catch {
try {
requireNonNullSubscriber(subscriber)
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnComplete(subscriber)
} catch {
case _: SpecViolation // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
@ -24,9 +29,33 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try tryOnError(subscriber, t) catch {
try {
requireNonNullSubscriber(subscriber)
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnError(subscriber, t)
} catch {
case _: SpecViolation // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = name
}
/**
* INTERNAL API
* This is only a legal subscription when it is immediately followed by
* a termination signal (onComplete, onError).
*/
private[akka] case object CancelledSubscription extends Subscription {
override def request(elements: Long): Unit = ()
override def cancel(): Unit = ()
}
/**
* INTERNAL API
*/
private[akka] case object NullSubscriber extends Subscriber[Any] {
def onComplete(): Unit = ()
def onError(cause: Throwable): Unit = ()
def onNext(elem: Any): Unit = ()
def onSubscribe(s: Subscription): Unit = ()
}

View file

@ -25,10 +25,19 @@ private[akka] object FanIn {
final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression
private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(id, cause)
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
impl ! OnError(id, cause)
}
override def onComplete(): Unit = impl ! OnComplete(id)
override def onNext(element: T): Unit = impl ! OnNext(id, element)
override def onSubscribe(subscription: Subscription): Unit = impl ! OnSubscribe(id, subscription)
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
impl ! OnNext(id, element)
}
override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
impl ! OnSubscribe(id, subscription)
}
}
abstract class InputBunch(inputCount: Int, bufferSize: Int, pump: Pump) {
@ -226,17 +235,16 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings,
override def pumpFailed(e: Throwable): Unit = fail(e)
protected def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor
if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
inputBunch.cancel()
primaryOutputs.cancel(e)
primaryOutputs.error(e)
context.stop(self)
}
override def postStop(): Unit = {
inputBunch.cancel()
primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly"))
primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly"))
}
override def postRestart(reason: Throwable): Unit = {

View file

@ -25,9 +25,7 @@ private[akka] object FanOut {
final case class SubstreamSubscribePending(id: Int) extends DeadLetterSuppression
class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription {
override def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else parent ! SubstreamRequestMore(id, elements)
override def request(elements: Long): Unit = parent ! SubstreamRequestMore(id, elements)
override def cancel(): Unit = parent ! SubstreamCancel(id)
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
}
@ -100,7 +98,7 @@ private[akka] object FanOut {
def error(output: Int, e: Throwable): Unit =
if (!errored(output) && !cancelled(output) && !completed(output)) {
outputs(output).cancel(e)
outputs(output).error(e)
errored(output) = true
unmarkOutput(output)
}
@ -217,9 +215,13 @@ private[akka] object FanOut {
}
case SubstreamRequestMore(id, demand)
if (marked(id) && !pending(id)) markedPending += 1
pending(id) = true
outputs(id).subreceive(RequestMore(null, demand))
if (demand < 1) // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError
error(id, ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
else {
if (marked(id) && !pending(id)) markedPending += 1
pending(id) = true
outputs(id).subreceive(RequestMore(null, demand))
}
case SubstreamCancel(id)
if (unmarkCancelled) {
unmarkOutput(id)
@ -256,7 +258,6 @@ private[akka] abstract class FanOut(val settings: ActorFlowMaterializerSettings,
override def pumpFailed(e: Throwable): Unit = fail(e)
protected def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor
if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
primaryInputs.cancel()

View file

@ -25,17 +25,20 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
override val subreceive = new SubReceive(waitingExposedPublisher)
def enqueueOutputElement(elem: Any): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
downstreamBufferSpace -= 1
pushToDownstream(elem)
}
def complete(): Unit =
override def complete(): Unit =
if (!downstreamCompleted) {
downstreamCompleted = true
completeDownstream()
}
def cancel(e: Throwable): Unit = {
override def cancel(): Unit = complete()
override def error(e: Throwable): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
abortDownstream(e)
@ -105,11 +108,10 @@ private[akka] class FanoutProcessorImpl(
}
override def fail(e: Throwable): Unit = {
// FIXME: escalate to supervisor
if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
primaryInputs.cancel()
primaryOutputs.cancel(e)
primaryOutputs.error(e)
// Stopping will happen after flush
}

View file

@ -121,7 +121,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
}
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers.contains(subscriber)) // FIXME this is not legal AFAICT, needs to check identity, not equality
if (subscribers.contains(subscriber))
rejectDuplicateSubscriber(subscriber)
else {
val subscription = new FutureSubscription(self)

View file

@ -141,7 +141,10 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
val future = f(elem)
submittedSeqNo += 1
val seqNo = submittedSeqNo
future.map(FutureElement(seqNo, _)).recover {
future.map { elem
ReactiveStreamsCompliance.requireNonNullElement(elem)
FutureElement(seqNo, elem)
}.recover {
case err: Throwable if decider(err) != Supervision.Stop
FutureElement(seqNo, RecoveredError(elem, err))
case err FutureFailure(err)

View file

@ -80,7 +80,10 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
try {
val future = f(elem)
inProgressCount += 1
future.map(FutureElement.apply).recover {
future.map { elem
ReactiveStreamsCompliance.requireNonNullElement(elem)
FutureElement(elem)
}.recover {
case err FutureFailure(elem, err)
}.pipeTo(self)
} catch {

View file

@ -17,11 +17,13 @@ private[stream] object ReactiveStreamsCompliance {
final val NumberOfElementsInRequestMustBePositiveMsg =
"The number of requested elements must be > 0 (see reactive-streams specification, rule 3.9)"
final val TotalPendingDemandMustNotExceedLongMaxValue =
"Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)"
final val SubscriberMustNotBeNullMsg = "Subscriber must not be null, rule 1.9"
final def totalPendingDemandMustNotExceedLongMaxValueException: Throwable =
new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue)
final val ExceptionMustNotBeNullMsg = "Exception must not be null, rule 2.13"
final val ElementMustNotBeNullMsg = "Element must not be null, rule 2.13"
final val SubscriptionMustNotBeNullMsg = "Subscription must not be null, rule 2.13"
final def numberOfElementsInRequestMustBePositiveException: Throwable =
new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg)
@ -29,18 +31,44 @@ private[stream] object ReactiveStreamsCompliance {
final def canNotSubscribeTheSameSubscriberMultipleTimesException: Throwable =
new IllegalStateException(CanNotSubscribeTheSameSubscriberMultipleTimes)
final def rejectDuplicateSubscriber[T](subscriber: Subscriber[T]): Unit =
final def subscriberMustNotBeNullException: Throwable =
new NullPointerException(SubscriberMustNotBeNullMsg)
final def exceptionMustNotBeNullException: Throwable =
new NullPointerException(ExceptionMustNotBeNullMsg)
final def elementMustNotBeNullException: Throwable =
new NullPointerException(ElementMustNotBeNullMsg)
final def subscriptionMustNotBeNullException: Throwable =
new NullPointerException(SubscriptionMustNotBeNullMsg)
final def rejectDuplicateSubscriber[T](subscriber: Subscriber[T]): Unit = {
// since it is already subscribed it has received the subscription first
// and we can emit onError immediately
tryOnError(subscriber, canNotSubscribeTheSameSubscriberMultipleTimesException)
}
final def rejectAdditionalSubscriber[T](subscriber: Subscriber[T], rejector: Publisher[T]): Unit =
final def rejectAdditionalSubscriber[T](subscriber: Subscriber[T], rejector: String): Unit = {
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnError(subscriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber"))
final def rejectDueToOverflow[T](subscriber: Subscriber[T]): Unit =
tryOnError(subscriber, totalPendingDemandMustNotExceedLongMaxValueException)
}
final def rejectDueToNonPositiveDemand[T](subscriber: Subscriber[T]): Unit =
tryOnError(subscriber, numberOfElementsInRequestMustBePositiveException)
final def requireNonNullSubscriber[T](subscriber: Subscriber[T]): Unit =
if (subscriber eq null) throw subscriberMustNotBeNullException
final def requireNonNullException(cause: Throwable): Unit =
if (cause eq null) throw exceptionMustNotBeNullException
final def requireNonNullElement[T](element: T): Unit =
if (element == null) throw elementMustNotBeNullException
final def requireNonNullSubscription(subscription: Subscription): Unit =
if (subscription == null) throw subscriptionMustNotBeNullException
@SerialVersionUID(1L)
sealed trait SpecViolation extends Throwable
@ -56,15 +84,18 @@ private[stream] object ReactiveStreamsCompliance {
}
}
final def tryOnNext[T](subscriber: Subscriber[T], element: T): Unit =
final def tryOnNext[T](subscriber: Subscriber[T], element: T): Unit = {
requireNonNullElement(element)
try subscriber.onNext(element) catch {
case NonFatal(t) throw new SignalThrewException(subscriber + ".onNext", t)
}
}
final def tryOnSubscribe[T](subscriber: Subscriber[T], subscription: Subscription): Unit =
final def tryOnSubscribe[T](subscriber: Subscriber[T], subscription: Subscription): Unit = {
try subscriber.onSubscribe(subscription) catch {
case NonFatal(t) throw new SignalThrewException(subscriber + ".onSubscribe", t)
}
}
final def tryOnComplete[T](subscriber: Subscriber[T]): Unit =
try subscriber.onComplete() catch {

View file

@ -82,12 +82,23 @@ object HeadSink {
/** INTERNAL API */
private[akka] class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] {
private val sub = new AtomicReference[Subscription]
override def onSubscribe(s: Subscription): Unit =
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
if (!sub.compareAndSet(null, s)) s.cancel()
else s.request(1)
}
override def onNext(elem: In): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
p.trySuccess(elem)
sub.get.cancel()
}
override def onError(t: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(t)
p.tryFailure(t)
}
override def onNext(t: In): Unit = { p.trySuccess(t); sub.get.cancel() }
override def onError(t: Throwable): Unit = p.tryFailure(t)
override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream"))
}

View file

@ -83,7 +83,7 @@ final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAtt
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { // FIXME Why does this have anything to do with Actors?
final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) =
future.value match {
case Some(Success(element))
@ -105,15 +105,12 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val p = Promise[Unit]()
// Not TCK verified as RC1 does not allow "empty publishers",
// reactive-streams on master now contains support for empty publishers.
// so we can enable it then, though it will require external completing of the promise
val pub = new Publisher[Unit] {
override def subscribe(s: Subscriber[_ >: Unit]) = {
requireNonNullSubscriber(s)
tryOnSubscribe(s, new Subscription {
override def request(n: Long): Unit = ()
override def cancel(): Unit = p.success(())
override def cancel(): Unit = p.trySuccess(())
})
p.future.onComplete {
case Success(_) tryOnComplete(s)
@ -136,7 +133,7 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { // FIXME Why does this have anything to do with Actors?
final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val cancelled = new AtomicBoolean(false)
@ -169,4 +166,4 @@ final class PropsSource[Out](props: Props, val attributes: OperationAttributes,
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new PropsSource[Out](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, shape)
}
}

View file

@ -89,4 +89,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSetti
super.completeSubstreamOutput(substream)
}
override def cancelSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.cancelSubstreamOutput(substream)
}
}

View file

@ -114,13 +114,11 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
// FIXME Replace with OperateAsync
final case class MapAsync(f: Any Future[Any], attributes: OperationAttributes = mapAsync) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
//FIXME Should be OperateUnorderedAsync
final case class MapAsyncUnordered(f: Any Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
@ -133,14 +131,12 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
//FIXME should be `n: Long`
final case class Take(n: Int, attributes: OperationAttributes = take) extends StageModule {
final case class Take(n: Long, attributes: OperationAttributes = take) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
//FIXME should be `n: Long`
final case class Drop(n: Int, attributes: OperationAttributes = drop) extends StageModule {
final case class Drop(n: Long, attributes: OperationAttributes = drop) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}

View file

@ -35,6 +35,7 @@ private[akka] object MultiStreamOutputProcessor {
case object Open extends PublisherState
final case class Attached(sub: Subscriber[Any]) extends PublisherState
case object Completed extends CompletedState
case object Cancelled extends CompletedState
final case class Failed(e: Throwable) extends CompletedState
}
@ -57,13 +58,21 @@ private[akka] object MultiStreamOutputProcessor {
pump.pump()
}
override def cancel(e: Throwable): Unit = {
override def error(e: Throwable): Unit = {
if (!downstreamCompleted) {
closePublisher(Failed(e))
downstreamCompleted = true
}
}
override def cancel(): Unit = {
if (!downstreamCompleted) {
closePublisher(Cancelled)
subscriber = NullSubscriber // FIXME unreference real subscriber, should not be needed after #16986
downstreamCompleted = true
}
}
override def complete(): Unit = {
if (!downstreamCompleted) {
closePublisher(Completed)
@ -82,18 +91,21 @@ private[akka] object MultiStreamOutputProcessor {
private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match {
case Completed tryOnComplete(s)
case Cancelled // nothing to do
case Failed(e: SpecViolation) // nothing to do
case Failed(e) tryOnError(s, e)
}
override def subscribe(s: Subscriber[_ >: Any]): Unit = {
requireNonNullSubscriber(s)
subscriptionTimeout.cancel()
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached
tryOnError(s, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber))
case _: Attached | Cancelled
rejectAdditionalSubscriber(s, "Substream publisher")
case c: CompletedState
tryOnSubscribe(s, CancelledSubscription)
closeSubscriber(s, c)
case Open
throw new IllegalStateException("Publisher cannot become open after being used before")
@ -106,7 +118,7 @@ private[akka] object MultiStreamOutputProcessor {
subscriber = s
tryOnSubscribe(subscriber, subscription)
} else
tryOnError(subscriber, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber))
rejectAdditionalSubscriber(s, "Substream publisher")
}
}
@ -133,10 +145,19 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
}
protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
completeSubstreamOutput(substream)
cancelSubstreamOutput(substream)
pump()
}
protected def cancelSubstreamOutput(substream: SubstreamKey): Unit = {
substreamOutputs.get(substream) match {
case Some(sub)
sub.cancel()
substreamOutputs -= substream
case _ // ignore, already completed...
}
}
protected def completeSubstreamOutput(substream: SubstreamKey): Unit = {
substreamOutputs.get(substream) match {
case Some(sub)
@ -147,7 +168,7 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
}
protected def failOutputs(e: Throwable): Unit = {
substreamOutputs.values foreach (_.cancel(e))
substreamOutputs.values foreach (_.error(e))
}
protected def finishOutputs(): Unit = {
@ -155,10 +176,15 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
}
val outputSubstreamManagement: Receive = {
case SubstreamRequestMore(key, demand) substreamOutputs.get(key) match {
case Some(sub) sub.enqueueOutputDemand(demand)
case _ // ignore...
}
case SubstreamRequestMore(key, demand)
substreamOutputs.get(key) match {
case Some(sub)
if (demand < 1) // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError
sub.error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
else
sub.enqueueOutputDemand(demand)
case _ // ignore...
}
case SubstreamSubscribe(key, subscriber) substreamOutputs.get(key) match {
case Some(sub) sub.attachSubscriber(subscriber)
case _ // ignore...
@ -167,12 +193,13 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
case Some(sub) if !sub.isAttached() subscriptionTimedOut(sub)
case _ // ignore...
}
case SubstreamCancel(key) invalidateSubstreamOutput(key)
case SubstreamCancel(key)
invalidateSubstreamOutput(key)
}
override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match {
case s: SubstreamOutput
s.cancel(cause)
s.error(cause)
s.attachSubscriber(CancelingSubscriber)
case _ // ignore
}
@ -205,10 +232,19 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorFlowMate
*/
private[akka] object TwoStreamInputProcessor {
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OtherStreamOnError(cause)
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
impl ! OtherStreamOnError(cause)
}
override def onComplete(): Unit = impl ! OtherStreamOnComplete
override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription)
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
impl ! OtherStreamOnNext(element)
}
override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
impl ! OtherStreamOnSubscribe(subscription)
}
}
case object OtherStreamOnComplete extends DeadLetterSuppression
@ -264,10 +300,19 @@ private[akka] object MultiStreamInputProcessor {
case class SubstreamKey(id: Long)
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause)
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
impl ! SubstreamOnError(key, cause)
}
override def onComplete(): Unit = impl ! SubstreamOnComplete(key)
override def onNext(element: T): Unit = impl ! SubstreamOnNext(key, element)
override def onSubscribe(subscription: Subscription): Unit = impl ! SubstreamStreamOnSubscribe(key, subscription)
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
impl ! SubstreamOnNext(key, element)
}
override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
impl ! SubstreamStreamOnSubscribe(key, subscription)
}
}
case class SubstreamOnComplete(key: SubstreamKey)

View file

@ -16,10 +16,19 @@ object StreamSubscriptionTimeoutSupport {
* A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks.
*/
final case object CancelingSubscriber extends Subscriber[Any] {
override def onSubscribe(s: Subscription): Unit = s.cancel()
override def onError(t: Throwable): Unit = ()
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
s.cancel()
}
override def onError(t: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(t)
()
}
override def onComplete(): Unit = ()
override def onNext(t: Any): Unit = ()
override def onNext(elem: Any): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
()
}
}
/**

View file

@ -113,40 +113,36 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
} else {
endOfStream match {
case eos @ (NotReached | Completed)
val demand = subscription.totalDemand + elements
//Check for overflow
if (demand < 1) {
try tryOnError(subscription.subscriber, totalPendingDemandMustNotExceedLongMaxValueException)
finally unregisterSubscriptionInternal(subscription)
} else {
subscription.totalDemand = demand
// returns Long.MinValue if the subscription is to be terminated
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long =
if (requested == 0) {
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
} else if (buffer.count(subscription) > 0) {
val goOn = try {
subscription.dispatch(buffer.read(subscription))
true
} catch {
case _: SpecViolation
unregisterSubscriptionInternal(subscription)
false
}
if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
else Long.MinValue
} else if (eos ne NotReached) Long.MinValue
else requested
val d = subscription.totalDemand + elements
// Long overflow, Reactive Streams Spec 3:17: effectively unbounded
val demand = if (d < 1) Long.MaxValue else d
subscription.totalDemand = demand
// returns Long.MinValue if the subscription is to be terminated
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long =
if (requested == 0) {
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
} else if (buffer.count(subscription) > 0) {
val goOn = try {
subscription.dispatch(buffer.read(subscription))
true
} catch {
case _: SpecViolation
unregisterSubscriptionInternal(subscription)
false
}
if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
else Long.MinValue
} else if (eos ne NotReached) Long.MinValue
else requested
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
case Long.MinValue
eos(subscription.subscriber)
unregisterSubscriptionInternal(subscription)
case x
subscription.totalDemand = x
requestFromUpstreamIfRequired()
}
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
case Long.MinValue
eos(subscription.subscriber)
unregisterSubscriptionInternal(subscription)
case x
subscription.totalDemand = x
requestFromUpstreamIfRequired()
}
case ErrorCompleted(_) // ignore, the Subscriber might not have seen our error event yet
}
@ -227,7 +223,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
* Register a new subscriber.
*/
protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match {
case NotReached if subscriptions.exists(_.subscriber eq subscriber) ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
case NotReached if subscriptions.exists(_.subscriber == subscriber) ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
case NotReached addSubscription(subscriber)
case Completed if buffer.nonEmpty addSubscription(subscriber)
case eos eos(subscriber)

View file

@ -10,6 +10,7 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.stream.impl.ReactiveStreamsCompliance._
/**
* INTERNAL API
@ -56,10 +57,9 @@ private[akka] object SynchronousIterablePublisher {
rejectDueToNonPositiveDemand(subscriber)
} else {
pendingDemand += elements
if (pendingDemand < 1) { // According to Reactive Streams Spec 3:17, if we overflow 2^63-1, we need to yield onError
cancel()
rejectDueToOverflow(subscriber)
} else if (!pushing) {
if (pendingDemand < 1)
pendingDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
if (!pushing) {
// According to Reactive Streams Spec 3:3, we must prevent unbounded recursion
try {
pushing = true
@ -113,8 +113,10 @@ private[akka] final class SynchronousIterablePublisher[T](
import SynchronousIterablePublisher.IteratorSubscription
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(subscriber)
IteratorSubscription(subscriber, try iterable.iterator catch { case NonFatal(t) Iterator.continually(throw t) })
}
override def toString: String = name
}

View file

@ -11,6 +11,7 @@ import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.actor.DeadLetterSuppression
import akka.event.Logging
/**
* INTERNAL API
@ -98,8 +99,8 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
handleFailure(numberOfElementsInRequestMustBePositiveException)
} else {
demand += elements
if (demand < 0) // Long has overflown, reactive-streams specification rule 3.17
handleFailure(totalPendingDemandMustNotExceedLongMaxValueException)
if (demand < 0)
demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
}
case Cancel
@ -116,7 +117,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
subscriber = s
tryOnSubscribe(s, subscription)
case _
rejectAdditionalSubscriber(s, exposedPublisher)
rejectAdditionalSubscriber(s, s"${Logging.simpleName(this)}")
}
override def postStop(): Unit = {

View file

@ -69,7 +69,8 @@ private[akka] trait Outputs {
def demandCount: Long = -1L
def complete(): Unit
def cancel(e: Throwable): Unit
def cancel(): Unit
def error(e: Throwable): Unit
def isClosed: Boolean
def isOpen: Boolean = !isClosed
}

View file

@ -218,7 +218,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
subscriber = sub
tryOnSubscribe(subscriber, new ActorSubscription(actor, subscriber))
} else
tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}"))
rejectAdditionalSubscriber(subscriber, s"${Logging.simpleName(this)}")
}
protected def waitingExposedPublisher: Actor.Receive = {
@ -238,11 +238,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
fail(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else {
downstreamDemand += elements
// Long has overflown
if (downstreamDemand < 0) {
enter().finish()
fail(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException)
} else if (upstreamWaiting) {
if (downstreamDemand < 0)
downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
if (upstreamWaiting) {
upstreamWaiting = false
enter().pull()
}

View file

@ -8,6 +8,7 @@ import scala.collection.breakOut
import scala.util.control.NonFatal
import akka.stream.stage._
import akka.stream.Supervision
import akka.stream.impl.ReactiveStreamsCompliance
// TODO:
// fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions)
@ -221,6 +222,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
def run(): Unit
override def push(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (currentOp.holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull")
currentOp.allowedToPush = false
elementInFlight = elem
@ -244,6 +246,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
def isFinishing: Boolean = currentOp.terminationPending
override def pushAndFinish(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
@ -271,6 +274,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def isHolding: Boolean = currentOp.holding
override def pushAndPull(elem: Any): FreeDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (!currentOp.holding) throw new IllegalStateException("Cannot pushAndPull without holding first")
currentOp.holding = false
fork(Pushing, elem)
@ -301,6 +305,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
override def pushAndFinish(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
elementInFlight = elem
state = PushFinish
null
@ -493,6 +498,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def advance(): Unit = ()
override def push(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
activeOpIndex = entryPoint
super.push(elem)
execute()
@ -528,6 +534,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
override def pushAndPull(elem: Any): FreeDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
activeOpIndex = entryPoint
super.pushAndPull(elem)
execute()

View file

@ -75,8 +75,8 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out],
/**
* INTERNAL API
*/
private[akka] final case class Take[T](count: Int) extends PushStage[T, T] {
private var left: Int = count
private[akka] final case class Take[T](count: Long) extends PushStage[T, T] {
private var left: Long = count
override def onPush(elem: T, ctx: Context[T]): Directive = {
left -= 1
@ -89,8 +89,8 @@ private[akka] final case class Take[T](count: Int) extends PushStage[T, T] {
/**
* INTERNAL API
*/
private[akka] final case class Drop[T](count: Int) extends PushStage[T, T] {
private var left: Int = count
private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
private var left: Long = count
override def onPush(elem: T, ctx: Context[T]): Directive =
if (left > 0) {
left -= 1

View file

@ -20,12 +20,16 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[
@volatile private var impl: Processor[I, O] = _
private val setVarFuture = implFuture.andThen { case Success(p) impl = p }
override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete {
case Success(x) tryOnSubscribe(x, s)
case Failure(_) s.cancel()
override def onSubscribe(s: Subscription): Unit = {
requireNonNullSubscription(s)
setVarFuture.onComplete {
case Success(x) tryOnSubscribe(x, s)
case Failure(_) s.cancel()
}
}
override def onError(t: Throwable): Unit = {
requireNonNullException(t)
if (impl eq null) setVarFuture.onSuccess { case p p.onError(t) }
else impl.onError(t)
}
@ -35,10 +39,16 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[
else impl.onComplete()
}
override def onNext(t: I): Unit = impl.onNext(t)
override def onNext(t: I): Unit = {
requireNonNullElement(t)
impl.onNext(t)
}
override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete {
case Success(x) x.subscribe(s)
case Failure(e) s.onError(e)
override def subscribe(s: Subscriber[_ >: O]): Unit = {
requireNonNullSubscriber(s)
setVarFuture.onComplete {
case Success(x) x.subscribe(s)
case Failure(e) s.onError(e)
}
}
}

View file

@ -133,10 +133,11 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
}
override def cancel(e: Throwable): Unit = {
override def error(e: Throwable): Unit = {
if (!closed && initialized) connection ! Abort
closed = true
}
override def complete(): Unit = {
if (!closed && initialized) {
closed = true
@ -144,10 +145,13 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
connection ! Close
else
connection ! ConfirmedClose
}
}
override def cancel(): Unit = complete()
override def enqueueOutputElement(elem: Any): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
connection ! Write(elem.asInstanceOf[ByteString], WriteAck)
pendingDemand = false
}
@ -217,9 +221,9 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
tcpInputs.cancel()
tcpOutputs.cancel(e)
tcpOutputs.error(e)
primaryInputs.cancel()
primaryOutputs.cancel(e)
primaryOutputs.error(e)
}
def tryShutdown(): Unit =

View file

@ -90,8 +90,10 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
val ex = BindFailedException
localAddressPromise.failure(ex)
unbindPromise.success(() Future.successful(()))
try tryOnError(flowSubscriber, ex)
finally fail(ex)
try {
tryOnSubscribe(flowSubscriber, CancelledSubscription)
tryOnError(flowSubscriber, ex)
} finally fail(ex)
}
def running: Receive = {
@ -159,6 +161,6 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage)
incomingConnections.cancel()
primaryOutputs.cancel(e)
primaryOutputs.error(e)
}
}

View file

@ -210,7 +210,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*/
def drop(n: Int): javadsl.Flow[In, Out, Mat] =
def drop(n: Long): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.drop(n))
/**
@ -228,7 +228,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*/
def take(n: Int): javadsl.Flow[In, Out, Mat] =
def take(n: Long): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.take(n))
/**

View file

@ -333,7 +333,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*/
def drop(n: Int): javadsl.Source[Out, Mat] =
def drop(n: Long): javadsl.Source[Out, Mat] =
new Source(delegate.drop(n))
/**
@ -350,7 +350,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
*
* @param n if `n` is zero or negative the stream will be completed without producing any elements.
*/
def take(n: Int): javadsl.Source[Out, Mat] =
def take(n: Long): javadsl.Source[Out, Mat] =
new Source(delegate.take(n))
/**

View file

@ -325,7 +325,7 @@ trait FlowOps[+Out, +Mat] {
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*/
def drop(n: Int): Repr[Out, Mat] = andThen(Drop(n))
def drop(n: Long): Repr[Out, Mat] = andThen(Drop(n))
/**
* Discard the elements received within the given duration at beginning of the stream.
@ -355,7 +355,7 @@ trait FlowOps[+Out, +Mat] {
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*/
def take(n: Int): Repr[Out, Mat] = andThen(Take(n))
def take(n: Long): Repr[Out, Mat] = andThen(Take(n))
/**
* Terminate processing (and cancel the upstream publisher) after the given
@ -556,4 +556,4 @@ private[stream] object FlowOps {
def completedTransformer[T]: TransformerLike[T, T] = CompletedTransformer.asInstanceOf[TransformerLike[T, T]]
def identityTransformer[T]: TransformerLike[T, T] = IdentityTransformer.asInstanceOf[TransformerLike[T, T]]
}
}

View file

@ -36,17 +36,6 @@ final case class OperationAttributes private (attributes: List[OperationAttribut
if ((this eq OperationAttributes.none) || (this eq node.attributes)) node
else node.withAttributes(attributes = this and node.attributes)
/**
* Filtering out name attributes is needed for Vertex.newInstance().
* However there is an ongoing discussion for removing this feature,
* after which this will not be needed anymore.
*
* https://github.com/akka/akka/issues/16392
*/
private[akka] def withoutName = this.copy( // FIXME should return OperationAttributes.none if empty
attributes = attributes.filterNot { // FIXME should return the same instance if didn't have any Name
case attr: Name true
})
}
object OperationAttributes {

View file

@ -21,6 +21,7 @@ import akka.io.Inet.SocketOption
import akka.io.Tcp
import akka.stream._
import akka.stream.impl._
import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.scaladsl._
import akka.util.ByteString
import org.reactivestreams.{ Publisher, Processor, Subscriber, Subscription }
@ -92,6 +93,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension {
val publisher = new Publisher[IncomingConnection] {
override def subscribe(s: Subscriber[_ >: IncomingConnection]): Unit = {
requireNonNullSubscriber(s)
manager ! StreamTcpManager.Bind(
localAddressPromise,
unbindPromise,