=str: various minor cleanups

André Rüdiger 2015-08-01 00:13:14 +02:00 committed by 2beaucoup
parent 0c6ed6d83d
commit 6af9ced35c
46 changed files with 188 additions and 231 deletions

View file

@@ -5,6 +5,8 @@ package akka.stream.javadsl;
 import static org.junit.Assert.assertEquals;

 import java.util.Arrays;
+import java.util.Collections;
 import org.junit.Test;
 import akka.stream.Attributes;
@@ -22,7 +24,7 @@ public class AttributesTest {
         Arrays.asList(new Attributes.Name("a"), new Attributes.Name("b")),
         attributes.getAttributeList(Attributes.Name.class));
     assertEquals(
-        Arrays.asList(new Attributes.InputBuffer(1, 2)),
+        Collections.singletonList(new Attributes.InputBuffer(1, 2)),
         attributes.getAttributeList(Attributes.InputBuffer.class));
   }

View file

@@ -263,7 +263,7 @@ public class BidiFlowTest extends StreamTest {
     final Future<List<Long>> r = result.second();
     assertEquals((Integer) 1, Await.result(l, oneSec));
     assertEquals((Integer) 42, Await.result(m, oneSec));
-    final Long[] rr = Await.result(r, oneSec).toArray(new Long[0]);
+    final Long[] rr = Await.result(r, oneSec).toArray(new Long[2]);
     Arrays.sort(rr);
     assertArrayEquals(new Long[] { 3L, 12L }, rr);
   }

View file

@@ -3,8 +3,6 @@
  */
 package akka.stream.javadsl;

-import akka.actor.ActorRef;
-import akka.dispatch.japi;
 import akka.japi.Pair;
 import akka.pattern.Patterns;
 import akka.japi.tuple.Tuple4;
@@ -63,7 +61,7 @@ public class FlowGraphTest extends StreamTest {
     final Flow<String, String, BoxedUnit> f2 =
         Flow.of(String.class).transform(FlowGraphTest.this.<String> op()).named("f2");
     @SuppressWarnings("unused")
     final Flow<String, String, BoxedUnit> f3 =
         Flow.of(String.class).transform(FlowGraphTest.this.<String> op()).named("f3");
     final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c"));
@@ -282,6 +280,7 @@ public class FlowGraphTest extends StreamTest {
   @Test
   public void mustBeAbleToUseMatValue() throws Exception {
+    @SuppressWarnings("unused")
     final Source<Integer, BoxedUnit> in1 = Source.single(1);
     final TestProbe probe = TestProbe.apply(system);

View file

@@ -168,8 +168,8 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
      ref ! Err("wrong")
-      s.expectSubscription
-      s.expectError.getMessage should be("wrong")
+      s.expectSubscription()
+      s.expectError().getMessage should be("wrong")
    }

    "not terminate after signalling onError" in {
@@ -177,10 +177,10 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val ref = system.actorOf(testPublisherProps(probe.ref))
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      s.expectSubscription
+      s.expectSubscription()
      probe.watch(ref)
      ref ! Err("wrong")
-      s.expectError.getMessage should be("wrong")
+      s.expectError().getMessage should be("wrong")
      probe.expectNoMsg(200.millis)
    }
@@ -189,10 +189,10 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val ref = system.actorOf(testPublisherProps(probe.ref))
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      s.expectSubscription
+      s.expectSubscription()
      probe.watch(ref)
      ref ! ErrThenStop("wrong")
-      s.expectError.getMessage should be("wrong")
+      s.expectError().getMessage should be("wrong")
      probe.expectTerminated(ref, 3.seconds)
    }
@@ -202,7 +202,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      ref ! Err("early err")
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      s.expectSubscriptionAndError.getMessage should be("early err")
+      s.expectSubscriptionAndError().getMessage should be("early err")
    }

    "drop onNext elements after cancel" in {
@@ -246,7 +246,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      ref ! Produce("elem-1")
      ref ! Complete
      s.expectNext("elem-1")
-      s.expectComplete
+      s.expectComplete()
    }

    "not terminate after signalling onComplete" in {
@@ -254,14 +254,14 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val ref = system.actorOf(testPublisherProps(probe.ref))
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      val sub = s.expectSubscription
+      val sub = s.expectSubscription()
      sub.request(3)
      probe.expectMsg(TotalDemand(3))
      probe.watch(ref)
      ref ! Produce("elem-1")
      ref ! Complete
      s.expectNext("elem-1")
-      s.expectComplete
+      s.expectComplete()
      probe.expectNoMsg(200.millis)
    }
@@ -270,14 +270,14 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val ref = system.actorOf(testPublisherProps(probe.ref))
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      val sub = s.expectSubscription
+      val sub = s.expectSubscription()
      sub.request(3)
      probe.expectMsg(TotalDemand(3))
      probe.watch(ref)
      ref ! Produce("elem-1")
      ref ! CompleteThenStop
      s.expectNext("elem-1")
-      s.expectComplete
+      s.expectComplete()
      probe.expectTerminated(ref, 3.seconds)
    }
@@ -287,7 +287,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      ref ! Complete
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      s.expectSubscriptionAndComplete
+      s.expectSubscriptionAndComplete()
    }

    "only allow one subscriber" in {
@@ -295,10 +295,10 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val ref = system.actorOf(testPublisherProps(probe.ref))
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      s.expectSubscription
+      s.expectSubscription()
      val s2 = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s2)
-      s2.expectSubscriptionAndError.getClass should be(classOf[IllegalStateException])
+      s2.expectSubscriptionAndError().getClass should be(classOf[IllegalStateException])
    }

    "signal onCompete when actor is stopped" in {
@@ -306,9 +306,9 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
      val ref = system.actorOf(testPublisherProps(probe.ref))
      val s = TestSubscriber.manualProbe[String]()
      ActorPublisher[String](ref).subscribe(s)
-      s.expectSubscription
+      s.expectSubscription()
      ref ! PoisonPill
-      s.expectComplete
+      s.expectComplete()
    }

    "work together with Flow and ActorSubscriber" in {

View file

@@ -212,7 +212,7 @@ class StreamLayoutSpec extends AkkaSpec {
      def getAllAtomic(module: Module): Set[Module] = {
        val (atomics, composites) = module.subModules.partition(_.isAtomic)
-        atomics ++ composites.map(getAllAtomic).flatten
+        atomics ++ composites.flatMap(getAllAtomic)
      }

      val allAtomic = getAllAtomic(topLevel)

View file

@@ -67,7 +67,7 @@ class FramingSpec extends AkkaSpec {
      .named("lineFraming")

    def completeTestSequences(delimiter: ByteString): immutable.Iterable[ByteString] =
-      for (prefix ← 0 until delimiter.size; s ← baseTestSequences)
+      for (prefix ← delimiter.indices; s ← baseTestSequences)
        yield delimiter.take(prefix) ++ s

    "work with various delimiters and test sequences" in {

View file

@@ -340,8 +340,6 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
    }

    "properly full-close if requested" in assertAllStagesStopped {
-      import system.dispatcher
-
      val serverAddress = temporaryServerAddress()
      val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] =
        Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right)
@@ -362,8 +360,6 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
    }

    "Echo should work even if server is in full close mode" in {
-      import system.dispatcher
-
      val serverAddress = temporaryServerAddress()
      val binding =

View file

@@ -1,6 +1,5 @@
 package akka.stream.io

-import java.net.InetSocketAddress
 import java.security.KeyStore
 import java.security.SecureRandom
 import java.util.concurrent.TimeoutException
@@ -15,7 +14,6 @@ import akka.actor.ActorSystem
 import akka.pattern.{ after ⇒ later }
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl._
-import akka.stream.scaladsl.FlowGraph.Implicits._
 import akka.stream.stage._
 import akka.stream.testkit._
 import akka.stream.testkit.Utils._
@@ -60,7 +58,7 @@ object TlsSpec {
      private var last: ByteString = _

      override def preStart(ctx: AsyncContext[ByteString, Unit]) = {
-        val cb = ctx.getAsyncCallback()
+        val cb = ctx.getAsyncCallback
        system.scheduler.scheduleOnce(duration)(cb.invoke(()))(system.dispatcher)
      }

View file

@@ -45,7 +45,7 @@ class FlowAppendSpec extends AkkaSpec with River {
 trait River { self: Matchers ⇒

-  val elements = (1 to 10)
+  val elements = 1 to 10
  val otherFlow = Flow[Int].map(_.toString)

  def riverOf[T](flowConstructor: Subscriber[T] ⇒ Unit)(implicit system: ActorSystem) = {

View file

@@ -79,7 +79,7 @@ class FlowExpandSpec extends AkkaSpec {
        .expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i))
        .runFold(Set.empty[Int])(_ + _)

-      Await.result(future, 10.seconds) should contain theSameElementsAs ((1 to 100).toSet)
+      Await.result(future, 10.seconds) should contain theSameElementsAs (1 to 100).toSet
    }

    "backpressure publisher when subscriber is slower" in {

View file

@@ -6,7 +6,7 @@ package akka.stream.scaladsl
 import scala.concurrent.Await
 import scala.util.control.NoStackTrace
-import akka.stream.{ OverflowStrategy, ActorMaterializer }
+import akka.stream.ActorMaterializer
 import akka.stream.testkit.AkkaSpec
 import akka.stream.testkit.Utils._
 import scala.concurrent.duration._
@@ -16,7 +16,7 @@ class FlowFoldSpec extends AkkaSpec {
  "A Fold" must {

    val input = 1 to 100
-    val expected = input.fold(0)(_ + _)
+    val expected = input.sum
    val inputSource = Source(input).filter(_ ⇒ true).map(identity)
    val foldSource = inputSource.fold[Int](0)(_ + _).filter(_ ⇒ true).map(identity)
    val foldFlow = Flow[Int].filter(_ ⇒ true).map(identity).fold(0)(_ + _).filter(_ ⇒ true).map(identity)

View file

@@ -39,11 +39,11 @@ class FlowForeachSpec extends AkkaSpec {
      Source(p).runForeach(testActor ! _) onFailure {
        case ex ⇒ testActor ! ex
      }
-      val proc = p.expectSubscription
+      val proc = p.expectSubscription()
      proc.expectRequest()
-      val ex = new RuntimeException("ex") with NoStackTrace
-      proc.sendError(ex)
-      expectMsg(ex)
+      val rte = new RuntimeException("ex") with NoStackTrace
+      proc.sendError(rte)
+      expectMsg(rte)
    }

    "complete future with failure when function throws" in assertAllStagesStopped {

View file

@@ -28,7 +28,7 @@ class FlowGroupedSpec extends AkkaSpec with ScriptedTest {
    "group with rest" in {
      val testLen = random.nextInt(1, 16)
-      def script = Script((TestConfig.RandomTestRange.map { _ ⇒ randomTest(testLen) } :+ randomTest(1)): _*)
+      def script = Script(TestConfig.RandomTestRange.map { _ ⇒ randomTest(testLen) } :+ randomTest(1): _*)
      TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.grouped(testLen)))
    }

View file

@@ -39,7 +39,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec {
      c.expectNext(1)
      c.expectNoMsg(100.millis)
      sub.request(2)
-      c.expectError.getMessage should be("not two")
+      c.expectError().getMessage should be("not two")
      sub.request(2)
      c.expectNoMsg(100.millis)
    }

View file

@@ -28,7 +28,7 @@ object FlowMapAsyncSpec {
    override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
      val future = f(elem)
-      val cb = ctx.getAsyncCallback()
+      val cb = ctx.getAsyncCallback
      future.onComplete(cb.invoke)
      ctx.holdUpstream()
    }
@@ -132,7 +132,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
      }).to(Sink(c)).run()
      val sub = c.expectSubscription()
      sub.request(10)
-      c.expectError.getMessage should be("err1")
+      c.expectError().getMessage should be("err1")
      latch.countDown()
    }
@@ -151,7 +151,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
        to(Sink(c)).run()
      val sub = c.expectSubscription()
      sub.request(10)
-      c.expectError.getMessage should be("err2")
+      c.expectError().getMessage should be("err2")
      latch.countDown()
    }
@@ -216,7 +216,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
      val p = Source(List("a", "b")).mapAsync(4)(elem ⇒ Future.successful(null)).to(Sink(c)).run()
      val sub = c.expectSubscription()
      sub.request(10)
-      c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
+      c.expectError().getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
    }

    "resume when future is completed with null" in {

View file

@@ -78,8 +78,8 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
      //FIXME: Flow has no simple toString anymore
      pending
      val n = "Uppercase reverser"
-      val f1 = Flow[String].map(_.toLowerCase())
-      val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase())
+      val f1 = Flow[String].map(_.toLowerCase)
+      val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase)
      f1.via(f2).toString should include(n)
} }

View file

@@ -160,12 +160,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
      source.subscribe(flowIn)
-      val sub1 = c1.expectSubscription
+      val sub1 = c1.expectSubscription()
      sub1.request(3)
      c1.expectNext("1")
      c1.expectNext("2")
      c1.expectNext("3")
-      c1.expectComplete
+      c1.expectComplete()
    }

    "materialize into Publisher/Subscriber and transformation processor" in {
@@ -174,7 +174,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      val c1 = TestSubscriber.manualProbe[String]()
      flowOut.subscribe(c1)
-      val sub1 = c1.expectSubscription
+      val sub1 = c1.expectSubscription()
      sub1.request(3)
      c1.expectNoMsg(200.millis)
@@ -184,7 +184,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      c1.expectNext("1")
      c1.expectNext("2")
      c1.expectNext("3")
-      c1.expectComplete
+      c1.expectComplete()
    }

    "materialize into Publisher/Subscriber and multiple transformation processors" in {
@@ -193,7 +193,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      val c1 = TestSubscriber.manualProbe[String]()
      flowOut.subscribe(c1)
-      val sub1 = c1.expectSubscription
+      val sub1 = c1.expectSubscription()
      sub1.request(3)
      c1.expectNoMsg(200.millis)
@@ -203,7 +203,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      c1.expectNext("elem-1")
      c1.expectNext("elem-2")
      c1.expectNext("elem-3")
-      c1.expectComplete
+      c1.expectComplete()
    }

    "subscribe Subscriber" in {
@@ -213,12 +213,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
      Source(publisher).to(sink).run()
-      val sub1 = c1.expectSubscription
+      val sub1 = c1.expectSubscription()
      sub1.request(3)
      c1.expectNext("1")
      c1.expectNext("2")
      c1.expectNext("3")
-      c1.expectComplete
+      c1.expectComplete()
    }

    "perform transformation operation" in {
@@ -239,12 +239,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
      Source(publisher).to(sink).run()
-      val sub1 = c1.expectSubscription
+      val sub1 = c1.expectSubscription()
      sub1.request(3)
      c1.expectNext("1")
      c1.expectNext("2")
      c1.expectNext("3")
-      c1.expectComplete
+      c1.expectComplete()
    }

    "be materializable several times with fanout publisher" in assertAllStagesStopped {
@@ -258,26 +258,26 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
      p2.subscribe(s2)
      p2.subscribe(s3)
-      val sub1 = s1.expectSubscription
-      val sub2 = s2.expectSubscription
-      val sub3 = s3.expectSubscription
+      val sub1 = s1.expectSubscription()
+      val sub2 = s2.expectSubscription()
+      val sub3 = s3.expectSubscription()
      sub1.request(3)
      s1.expectNext("1")
      s1.expectNext("2")
      s1.expectNext("3")
-      s1.expectComplete
+      s1.expectComplete()
      sub2.request(3)
      sub3.request(3)
      s2.expectNext("1")
      s2.expectNext("2")
      s2.expectNext("3")
-      s2.expectComplete
+      s2.expectComplete()
      s3.expectNext("1")
      s3.expectNext("2")
      s3.expectNext("3")
-      s3.expectComplete
+      s3.expectComplete()
    }

    "be covariant" in {

View file

@@ -27,7 +27,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
    "yield the first value" in assertAllStagesStopped {
      val p = TestPublisher.manualProbe[Int]()
      val f: Future[Int] = Source(p).map(identity).runWith(Sink.head)
-      val proc = p.expectSubscription
+      val proc = p.expectSubscription()
      proc.expectRequest()
      proc.sendNext(42)
      Await.result(f, 100.millis) should be(42)
@@ -41,7 +41,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
      val (subscriber, future) = s.toMat(f)(Keep.both).run()
      p.subscribe(subscriber)
-      val proc = p.expectSubscription
+      val proc = p.expectSubscription()
      proc.expectRequest()
      proc.sendNext(42)
      Await.result(future, 100.millis) should be(42)
@@ -51,7 +51,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
    "yield the first error" in assertAllStagesStopped {
      val p = TestPublisher.manualProbe[Int]()
      val f = Source(p).runWith(Sink.head)
-      val proc = p.expectSubscription
+      val proc = p.expectSubscription()
      proc.expectRequest()
      val ex = new RuntimeException("ex")
      proc.sendError(ex)
@@ -62,12 +62,12 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
    "yield NoSuchElementExcption for empty stream" in assertAllStagesStopped {
      val p = TestPublisher.manualProbe[Int]()
      val f = Source(p).runWith(Sink.head)
-      val proc = p.expectSubscription
+      val proc = p.expectSubscription()
      proc.expectRequest()
      proc.sendComplete()
      Await.ready(f, 100.millis)
      f.value.get match {
-        case Failure(e: NoSuchElementException) ⇒ e.getMessage() should be("empty stream")
+        case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("empty stream")
        case x ⇒ fail("expected NoSuchElementException, got " + x)
      }
    }

View file

@@ -174,7 +174,7 @@ trait ActorPublisher[T] extends Actor {
   * otherwise `onNext` will throw `IllegalStateException`.
   */
  def onNext(element: T): Unit = lifecycleState match {
-    case Active | PreSubscriber ⇒
+    case Active | PreSubscriber | CompleteThenStop ⇒
      if (demand > 0) {
        demand -= 1
        tryOnNext(subscriber, element)
@@ -193,7 +193,7 @@ trait ActorPublisher[T] extends Actor {
   * call [[#onNext]], [[#onError]] and [[#onComplete]].
   */
  def onComplete(): Unit = lifecycleState match {
-    case Active | PreSubscriber ⇒
+    case Active | PreSubscriber | CompleteThenStop ⇒
      lifecycleState = Completed
      if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
        try tryOnComplete(subscriber) finally subscriber = null
@@ -226,8 +226,8 @@ trait ActorPublisher[T] extends Actor {
   * call [[#onNext]], [[#onError]] and [[#onComplete]].
   */
  def onError(cause: Throwable): Unit = lifecycleState match {
-    case Active | PreSubscriber ⇒
-      lifecycleState = ErrorEmitted(cause, false)
+    case Active | PreSubscriber | CompleteThenStop ⇒
+      lifecycleState = ErrorEmitted(cause, stop = false)
      if (subscriber ne null) // otherwise onError will be called when the subscription arrives
        try tryOnError(subscriber, cause) finally subscriber = null
    case _: ErrorEmitted ⇒

View file

@@ -310,7 +310,7 @@ private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Sub
 private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider {
  override def get(system: ActorSystem): ActorSubscriberState = super.get(system)
-  override def lookup = ActorSubscriberState
+  override def lookup() = ActorSubscriberState
  override def createExtension(system: ExtendedActorSystem): ActorSubscriberState =
    new ActorSubscriberState

View file

@@ -15,12 +15,9 @@ import akka.stream.impl.Junctions._
 import akka.stream.impl.StreamLayout.Module
 import akka.stream.impl.fusing.ActorInterpreter
 import akka.stream.impl.io.SslTlsCipherActor
-import akka.stream.scaladsl._
 import akka.stream._
-import akka.stream.io._
 import akka.stream.io.SslTls.TlsModule
 import akka.stream.stage.Stage
-import akka.util.ByteString
 import org.reactivestreams._
 import scala.concurrent.{ Await, ExecutionContextExecutor }
@@ -28,17 +25,14 @@ import scala.concurrent.{ Await, ExecutionContextExecutor }
 /**
  * INTERNAL API
  */
-private[akka] case class ActorMaterializerImpl(
-  val system: ActorSystem,
-  override val settings: ActorMaterializerSettings,
-  dispatchers: Dispatchers,
-  val supervisor: ActorRef,
-  val haveShutDown: AtomicBoolean,
-  flowNameCounter: AtomicLong,
-  namePrefix: String,
-  optimizations: Optimizations)
-  extends ActorMaterializer {
-  import ActorMaterializerImpl._
+private[akka] case class ActorMaterializerImpl(val system: ActorSystem,
+                                               override val settings: ActorMaterializerSettings,
+                                               dispatchers: Dispatchers,
+                                               val supervisor: ActorRef,
+                                               val haveShutDown: AtomicBoolean,
+                                               flowNameCounter: AtomicLong,
+                                               namePrefix: String,
+                                               optimizations: Optimizations) extends ActorMaterializer {
  import akka.stream.impl.Stages._

  override def shutdown(): Unit =
@@ -103,7 +97,7 @@ private[akka] case class ActorMaterializerImpl(
      case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here
        val es = effectiveSettings(effectiveAttributes)
        val props =
-          SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing, tls.hostInfo)
+          SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tls.role, tls.closing, tls.hostInfo)
        val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
        def factory(id: Int) = new ActorPublisher[Any](impl) {
          override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
@@ -174,10 +168,10 @@ private[akka] case class ActorMaterializerImpl(
        (FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets)
      case BroadcastModule(shape, eagerCancel, _) ⇒
-        (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)
+        (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.length), shape.in, shape.outArray.toSeq)
      case BalanceModule(shape, waitForDownstreams, _) ⇒
-        (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
+        (Balance.props(effectiveSettings, shape.outArray.length, waitForDownstreams), shape.in, shape.outArray.toSeq)
      case unzip: UnzipWithModule ⇒
        (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
@@ -237,7 +231,7 @@ private[akka] case class ActorMaterializerImpl(
 */
 private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider {
  override def get(system: ActorSystem): FlowNameCounter = super.get(system)
-  override def lookup = FlowNameCounter
+  override def lookup() = FlowNameCounter
  override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter
 }
@@ -259,13 +253,13 @@ private[akka] object StreamSupervisor {
    extends DeadLetterSuppression with NoSerializationVerificationNeeded

  /** Testing purpose */
-  final case object GetChildren
+  case object GetChildren
  /** Testing purpose */
  final case class Children(children: Set[ActorRef])
  /** Testing purpose */
-  final case object StopChildren
+  case object StopChildren
  /** Testing purpose */
-  final case object StoppedChildren
+  case object StoppedChildren
 }

 private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
@@ -291,7 +285,6 @@ private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveSh
 */
 private[akka] object ActorProcessorFactory {
  import akka.stream.impl.Stages._
-  import ActorMaterializerImpl._

  def props(materializer: ActorMaterializer, op: StageModule, parentAttributes: Attributes): (Props, Any) = {
    val att = parentAttributes and op.attributes
@@ -309,17 +302,17 @@ private[akka] object ActorProcessorFactory {
      case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider))
      case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider))
      case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider))
-      case Recover(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Recover(pf)), materializer, att), ())
-      case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ())
-      case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ())
-      case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ())
-      case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ())
-      case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ())
-      case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ())
-      case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ())
-      case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ())
-      case Sliding(n, step, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Sliding(n, step)), materializer, att), ())
-      case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ())
+      case Recover(pf, _) ⇒ interp(fusing.Recover(pf))
+      case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider))
+      case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f))
+      case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider))
+      case Buffer(n, s, _) ⇒ interp(fusing.Buffer(n, s))
+      case MapConcat(f, _) ⇒ interp(fusing.MapConcat(f, settings.supervisionDecider))
+      case MapAsync(p, f, _) ⇒ interp(fusing.MapAsync(p, f, settings.supervisionDecider))
+      case MapAsyncUnordered(p, f, _) ⇒ interp(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider))
+      case Grouped(n, _) ⇒ interp(fusing.Grouped(n))
+      case Sliding(n, step, _) ⇒ interp(fusing.Sliding(n, step))
+      case Log(n, e, l, _) ⇒ interp(fusing.Log(n, e, l))
      case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ())
      case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ())
      case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ())

View file

@@ -3,7 +3,6 @@
 */
 package akka.stream.impl

-import java.util.Arrays
 import akka.actor._
 import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings }
 import akka.stream.actor.ActorSubscriber.OnSubscribe
@@ -101,7 +100,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
  override def isClosed: Boolean = upstreamCompleted

  private def clear(): Unit = {
-    Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
+    java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
    inputBufferElements = 0
  }

View file

@@ -9,8 +9,6 @@ import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber }
 import akka.stream.scaladsl.FlexiMerge.MergeLogic
 import org.reactivestreams.{ Subscription, Subscriber }
-import scala.collection.immutable
-
 /**
  * INTERNAL API
  */
@@ -105,7 +103,7 @@ private[akka] object FanIn {
    def cancel(input: Int) =
      if (!cancelled(input)) {
        inputs(input).cancel()
-        cancelled(input, true)
+        cancelled(input, on = true)
        unmarkInput(input)
      }
@@ -117,7 +115,7 @@ private[akka] object FanIn {
      if (!marked(input)) {
        if (depleted(input)) markedDepleted += 1
        if (pending(input)) markedPending += 1
-        marked(input, true)
+        marked(input, on = true)
        markCount += 1
      }
    }
@@ -126,7 +124,7 @@ private[akka] object FanIn {
      if (marked(input)) {
        if (depleted(input)) markedDepleted -= 1
        if (pending(input)) markedPending -= 1
-        marked(input, false)
+        marked(input, on = false)
        markCount -= 1
      }
    }
@@ -171,11 +169,11 @@ private[akka] object FanIn {
      val elem = input.dequeueInputElement()
      if (!input.inputsAvailable) {
        if (marked(id)) markedPending -= 1
-        pending(id, false)
+        pending(id, on = false)
      }
      if (input.inputsDepleted) {
        if (marked(id)) markedDepleted += 1
-        depleted(id, true)
+        depleted(id, on = true)
        onDepleted(id)
      }
      elem
@@ -202,7 +200,7 @@ private[akka] object FanIn {
    }

    val AnyOfMarkedInputs = new TransferState {
-      override def isCompleted: Boolean = (markedDepleted == markCount && markedPending == 0)
+      override def isCompleted: Boolean = markedDepleted == markCount && markedPending == 0
      override def isReady: Boolean = markedPending > 0
    }
@@ -222,15 +220,15 @@ private[akka] object FanIn {
        inputs(id).subreceive(ActorSubscriber.OnSubscribe(subscription))
      case OnNext(id, elem) ⇒
        if (marked(id) && !pending(id)) markedPending += 1
-        pending(id, true)
+        pending(id, on = true)
        inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem))
      case OnComplete(id) ⇒
        if (!pending(id)) {
          if (marked(id) && !depleted(id)) markedDepleted += 1
-          depleted(id, true)
+          depleted(id, on = true)
          onDepleted(id)
        }
-        completed(id, true)
+        completed(id, on = true)
        inputs(id).subreceive(ActorSubscriberMessage.OnComplete)
      case OnError(id, e) ⇒
        onError(id, e)

View file

@@ -42,9 +42,9 @@ private[akka] class FlexiMergeImpl[T, S <: Shape](
  }

  override protected val inputBunch = new FanIn.InputBunch(inputCount, settings.maxInputBufferSize, this) {
-    override def onError(input: Int, e: Throwable): Unit = {
+    override def onError(input: Int, t: Throwable): Unit = {
      changeBehavior(
-        try completion.onUpstreamFailure(ctx, inputMapping(input), e)
+        try completion.onUpstreamFailure(ctx, inputMapping(input), t)
        catch {
          case NonFatal(e) ⇒ fail(e); mergeLogic.SameState
        })

View file

@@ -51,9 +51,9 @@ private[akka] class FlexiRouteImpl[T, S <: Shape](_settings: ActorMaterializerSe
  }

  override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) {
-    override def onError(e: Throwable): Unit = {
-      try completion.onUpstreamFailure(ctx, e) catch { case NonFatal(e) ⇒ fail(e) }
-      fail(e)
+    override def onError(t: Throwable): Unit = {
+      try completion.onUpstreamFailure(ctx, t) catch { case NonFatal(e) ⇒ fail(e) }
+      fail(t)
    }

    override def onComplete(): Unit = {

View file

@@ -10,7 +10,7 @@ import akka.stream._
 import org.reactivestreams._
 import scala.annotation.unchecked.uncheckedVariance
 import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ Promise }
+import scala.concurrent.Promise
 import scala.util.{ Failure, Success }
/** /**

View file

@@ -3,14 +3,13 @@
 */
 package akka.stream.impl

-import akka.event.{ LoggingAdapter, Logging }
+import akka.event.LoggingAdapter
 import akka.stream.impl.SplitDecision.SplitDecision
 import akka.stream.impl.StreamLayout._
 import akka.stream.{ OverflowStrategy, TimerTransformer, Attributes }
 import akka.stream.Attributes._
 import akka.stream.stage.Stage
 import org.reactivestreams.Processor
-import akka.event.Logging.simpleName
 import scala.collection.immutable
 import scala.concurrent.Future

View file

@@ -3,7 +3,7 @@
 */
 package akka.stream.impl

-import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference }
+import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
 import akka.stream.impl.MaterializerSession.MaterializationPanic
 import akka.stream.impl.StreamLayout.Module
 import akka.stream.scaladsl.Keep
@@ -58,7 +58,8 @@ private[akka] object StreamLayout {
      if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}"
      val (allIn, dupIn, allOut, dupOut) =
        subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) {
-          case ((ai, di, ao, doo), m) ⇒ (ai ++ m.inPorts, di ++ ai.intersect(m.inPorts), ao ++ m.outPorts, doo ++ ao.intersect(m.outPorts))
+          case ((ai, di, ao, doo), sm) ⇒
+            (ai ++ sm.inPorts, di ++ ai.intersect(sm.inPorts), ao ++ sm.outPorts, doo ++ ao.intersect(sm.outPorts))
        }
      if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
      if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
@@ -73,7 +74,7 @@ private[akka] object StreamLayout {
        n match {
          case Ignore ⇒ Set.empty
          case Transform(f, dep) ⇒ atomics(dep)
-          case Atomic(m) ⇒ Set(m)
+          case Atomic(module) ⇒ Set(module)
          case Combine(f, left, right) ⇒ atomics(left) ++ atomics(right)
        }
      val atomic = atomics(materializedValueComputation)

View file

@@ -8,7 +8,6 @@ import akka.actor._
 import akka.stream.ActorMaterializerSettings
 import org.reactivestreams.{ Publisher, Subscriber, Subscription }
 import scala.collection.mutable
-import scala.concurrent.duration.FiniteDuration

 /**
  * INTERNAL API
@@ -48,7 +47,7 @@ private[akka] object MultiStreamOutputProcessor {
    override def subreceive: SubReceive =
      throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")

-    def isAttached() = state.get().isInstanceOf[Attached]
+    def isAttached = state.get().isInstanceOf[Attached]

    def enqueueOutputDemand(demand: Long): Unit = {
      downstreamDemand += demand
@@ -188,8 +187,8 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
      case _ ⇒ // ignore...
    }
    case SubstreamSubscriptionTimeout(key) ⇒ substreamOutputs.get(key) match {
-      case Some(sub) if !sub.isAttached() ⇒ subscriptionTimedOut(sub)
+      case Some(sub) if !sub.isAttached ⇒ subscriptionTimedOut(sub)
      case _ ⇒ // ignore...
    }
    case SubstreamCancel(key) ⇒
      invalidateSubstreamOutput(key)

View file

@@ -15,7 +15,7 @@ object StreamSubscriptionTimeoutSupport {
  /**
   * A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks.
   */
-  final case object CancelingSubscriber extends Subscriber[Any] {
+  case object CancelingSubscriber extends Subscriber[Any] {
    override def onSubscribe(s: Subscription): Unit = {
      ReactiveStreamsCompliance.requireNonNullSubscription(s)
      s.cancel()
@@ -37,7 +37,7 @@ object StreamSubscriptionTimeoutSupport {
   * Subscription timeout which does not start any scheduled events and always returns `true`.
   * This specialized implementation is to be used for "noop" timeout mode.
   */
-  final case object NoopSubscriptionTimeout extends Cancellable {
+  case object NoopSubscriptionTimeout extends Cancellable {
    override def cancel() = true
    override def isCancelled = true
  }
@@ -79,11 +79,11 @@ private[akka] trait StreamSubscriptionTimeoutSupport {
    target match {
      case p: Processor[_, _] ⇒
        log.debug("Cancelling {} Processor's publisher and subscriber sides (after {} ms)", p, millis)
-        handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace)
+        handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline ($millis) ms") with NoStackTrace)

      case p: Publisher[_] ⇒
        log.debug("Cancelling {} (after: {} ms)", p, millis)
-        handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher (${p}) you are trying to subscribe to has been shut-down " +
+        handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher ($p) you are trying to subscribe to has been shut-down " +
          s"because exceeding it's subscription-timeout.") with NoStackTrace)
    }
  }

View file

@@ -173,7 +173,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
          val element = buffer.read(head)
          head.dispatch(element)
          head.totalDemand -= 1
-          dispatch(tail, true)
+          dispatch(tail, sent = true)
        } else dispatch(tail, sent)
      case _ ⇒ sent
    }

View file

@@ -122,7 +122,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
  }

  override def postStop(): Unit = {
-    tickTask.foreach(_.cancel)
+    tickTask.foreach(_.cancel())
    cancelled.set(true)
    if (exposedPublisher ne null)
      exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)

View file

@@ -3,7 +3,6 @@
 */
 package akka.stream.impl

-import java.util.LinkedList
 import akka.stream.ActorMaterializerSettings
 import akka.stream.TimerTransformer
 import scala.util.control.NonFatal
@@ -46,7 +45,7 @@ private[akka] class TimerTransformerProcessorsImpl(
  }

  val schedulerInputs: Inputs = new DefaultInputTransferStates {
-    val queue = new LinkedList[Any]
+    val queue = new java.util.LinkedList[Any]

    override def dequeueInputElement(): Any = queue.removeFirst()
@@ -76,6 +75,11 @@ private[akka] class TimerTransformerProcessorsImpl(
    def isCompleted = false
  }

+  private val terminate = TransferPhase(Always) { () ⇒
+    emits = transformer.onTermination(errorEvent)
+    emitAndThen(completedPhase)
+  }
+
  private val running: TransferPhase = TransferPhase(RunningCondition) { () ⇒
    if (primaryInputs.inputsDepleted || (transformer.isComplete && !schedulerInputs.inputsAvailable)) {
      nextPhase(terminate)
@@ -89,11 +93,6 @@ private[akka] class TimerTransformerProcessorsImpl(
    }
  }

-  private val terminate = TransferPhase(Always) { () ⇒
-    emits = transformer.onTermination(errorEvent)
-    emitAndThen(completedPhase)
-  }
-
  override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)"
 }

View file

@@ -3,20 +3,17 @@
 */
 package akka.stream.impl.fusing

-import java.util.Arrays
 import akka.actor._
 import akka.stream.impl.ReactiveStreamsCompliance._
 import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, Attributes, ActorMaterializer }
 import akka.stream.actor.ActorSubscriber.OnSubscribe
 import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
 import akka.stream.impl._
-import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationFailure, InitializationSuccessful }
+import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationSuccessful }
 import akka.stream.stage._
 import org.reactivestreams.{ Subscriber, Subscription }
 import akka.event.{ Logging, LoggingAdapter }
-import scala.util.control.NonFatal

 /**
  * INTERNAL API
  */
@@ -101,7 +98,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
  }

  private def clear(): Unit = {
-    Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
+    java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
    inputBufferElements = 0
  }

View file

@@ -21,7 +21,7 @@ private[akka] object OneBoundedInterpreter {
  /** INTERNAL API */
  private[akka] sealed trait InitializationStatus
  /** INTERNAL API */
-  private[akka] final case object InitializationSuccessful extends InitializationStatus
+  private[akka] case object InitializationSuccessful extends InitializationStatus
  /** INTERNAL API */
  private[akka] final case class InitializationFailed(failures: immutable.Seq[InitializationFailure]) extends InitializationStatus {
    // exceptions are reverse ordered here, below methods help to avoid confusion when used from the outside
@@ -197,7 +197,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
  private var lastOpFailing: Int = -1

  private def pipeName(op: UntypedOp): String = {
-    val o = (op: AbstractStage[_, _, _, _, _, _])
+    val o = op: AbstractStage[_, _, _, _, _, _]
    (o match {
      case Finished ⇒ "finished"
      case _: BoundaryStage ⇒ "boundary"
@@ -219,7 +219,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
  private def calculateJumpBacks: Array[Int] = {
    val table = Array.ofDim[Int](pipeline.length)
    var nextJumpBack = -1
-    for (pos ← 0 until pipeline.length) {
+    for (pos ← pipeline.indices) {
      table(pos) = nextJumpBack
      if (!pipeline(pos).isInstanceOf[PushStage[_, _]]) nextJumpBack = pos
    }
@@ -310,7 +310,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
        null
      }

-      override def getAsyncCallback(): AsyncCallback[Any] = {
+      override def getAsyncCallback: AsyncCallback[Any] = {
        val current = currentOp.asInstanceOf[AsyncStage[Any, Any, Any]]
        val context = current.context // avoid concurrent access (to avoid @volatile)
        new AsyncCallback[Any] {

View file

@ -6,7 +6,6 @@ package akka.stream.impl.fusing
import akka.event.Logging.LogLevel import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.LogLevels import akka.stream.Attributes.LogLevels
import akka.stream.Supervision.Resume
import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.{ Supervision, _ } import akka.stream.{ Supervision, _ }
@ -68,7 +67,7 @@ private[akka] final case class DropWhile[T](p: T ⇒ Boolean, decider: Supervisi
override def decide(t: Throwable): Supervision.Directive = decider(t) override def decide(t: Throwable): Supervision.Directive = decider(t)
} }
private[akka] final object Collect { private[akka] object Collect {
// Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once, // Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once,
// and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not. // and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not.
// Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458 // Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
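The comment above hints at how Collect avoids evaluating the guard twice; a stand-alone sketch of that applyOrElse pattern (names and signatures here are illustrative, not the actual stage code) could look like this:

  object CollectSketch {
    // Shared sentinel: invoking it just returns itself, so applyOrElse hands the
    // sentinel back whenever the partial function did not match.
    val NotApplied: Any ⇒ Any = _ ⇒ NotApplied

    // The guard of `pf` runs exactly once per element.
    def collectOne[A, B](pf: PartialFunction[A, B], elem: A): Option[B] =
      pf.applyOrElse(elem, NotApplied) match {
        case NotApplied ⇒ None
        case result     ⇒ Some(result.asInstanceOf[B])
      }
  }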
@ -336,12 +335,12 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = { override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (ctx.isFinishing) { if (ctx.isFinishing) {
val elem = buffer.dequeue().asInstanceOf[T] val elem = buffer.dequeue()
if (buffer.isEmpty) ctx.pushAndFinish(elem) if (buffer.isEmpty) ctx.pushAndFinish(elem)
else ctx.push(elem) else ctx.push(elem)
} else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue().asInstanceOf[T]) } else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue())
else if (buffer.isEmpty) ctx.holdDownstream() else if (buffer.isEmpty) ctx.holdDownstream()
else ctx.push(buffer.dequeue().asInstanceOf[T]) else ctx.push(buffer.dequeue())
} }
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective =
@ -350,37 +349,31 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = {
overflowStrategy match { overflowStrategy match {
case DropHead ⇒ { (ctx, elem) ⇒ case DropHead ⇒ (ctx, elem) ⇒
if (buffer.isFull) buffer.dropHead() if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem) buffer.enqueue(elem)
ctx.pull() ctx.pull()
} case DropTail ⇒ (ctx, elem) ⇒
case DropTail ⇒ { (ctx, elem) ⇒
if (buffer.isFull) buffer.dropTail() if (buffer.isFull) buffer.dropTail()
buffer.enqueue(elem) buffer.enqueue(elem)
ctx.pull() ctx.pull()
} case DropBuffer ⇒ (ctx, elem) ⇒
case DropBuffer ⇒ { (ctx, elem) ⇒
if (buffer.isFull) buffer.clear() if (buffer.isFull) buffer.clear()
buffer.enqueue(elem) buffer.enqueue(elem)
ctx.pull() ctx.pull()
} case DropNew ⇒ (ctx, elem) ⇒
case DropNew ⇒ { (ctx, elem) ⇒
if (!buffer.isFull) buffer.enqueue(elem) if (!buffer.isFull) buffer.enqueue(elem)
ctx.pull() ctx.pull()
} case Backpressure ⇒ (ctx, elem) ⇒
case Backpressure ⇒ { (ctx, elem) ⇒
buffer.enqueue(elem) buffer.enqueue(elem)
if (buffer.isFull) ctx.holdUpstream() if (buffer.isFull) ctx.holdUpstream()
else ctx.pull() else ctx.pull()
} case Fail ⇒ (ctx, elem) ⇒
case Fail ⇒ { (ctx, elem) ⇒
if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else { else {
buffer.enqueue(elem) buffer.enqueue(elem)
ctx.pull() ctx.pull()
} }
}
} }
} }
} }
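For readers skimming the strategy cases above, a simplified stand-alone sketch of how such overflow strategies can map onto a bounded queue (the strategy names mirror the ones above, but this is not the actual Buffer stage) might read:

  import scala.collection.mutable

  sealed trait Strategy
  case object DropHeadStrategy extends Strategy
  case object DropNewStrategy extends Strategy
  case object FailStrategy extends Strategy

  final class BoundedBuffer[T](max: Int, strategy: Strategy) {
    private val q = mutable.Queue.empty[T]

    // Returns false only when FailStrategy rejects an element on overflow.
    def offer(elem: T): Boolean = strategy match {
      case DropHeadStrategy ⇒              // evict the oldest element, then accept
        if (q.size == max) q.dequeue()
        q.enqueue(elem); true
      case DropNewStrategy ⇒               // silently drop the new element when full
        if (q.size < max) q.enqueue(elem)
        true
      case FailStrategy ⇒                  // report overflow to the caller
        if (q.size == max) false
        else { q.enqueue(elem); true }
    }
  }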
@ -478,9 +471,9 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol
else ctx.absorbTermination() else ctx.absorbTermination()
} }
final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop override def decide(t: Throwable): Supervision.Directive = Supervision.Stop
final override def restart(): Expand[In, Out, Seed] = override def restart(): Expand[In, Out, Seed] =
throw new UnsupportedOperationException("Expand doesn't support restart") throw new UnsupportedOperationException("Expand doesn't support restart")
} }
@ -505,7 +498,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism) private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism)
override def preStart(ctx: AsyncContext[Out, Notification]): Unit = { override def preStart(ctx: AsyncContext[Out, Notification]): Unit = {
callback = ctx.getAsyncCallback() callback = ctx.getAsyncCallback
} }
override def decide(ex: Throwable) = decider(ex) override def decide(ex: Throwable) = decider(ex)
@ -554,7 +547,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
else ctx.ignore() else ctx.ignore()
} else ctx.fail(ex) } else ctx.fail(ex)
case (idx, s: Success[_]) ⇒ case (idx, s: Success[_]) ⇒
val ex = try { val exception = try {
ReactiveStreamsCompliance.requireNonNullElement(s.value) ReactiveStreamsCompliance.requireNonNullElement(s.value)
elemsInFlight.put(idx, s) elemsInFlight.put(idx, s)
null: Exception null: Exception
@ -565,7 +558,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
null: Exception null: Exception
} else ex } else ex
} }
if (ex != null) ctx.fail(ex) if (exception != null) ctx.fail(exception)
else if (ctx.isHoldingDownstream) rec() else if (ctx.isHoldingDownstream) rec()
else ctx.ignore() else ctx.ignore()
} }
@ -589,7 +582,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
private def todo = inFlight + buffer.used private def todo = inFlight + buffer.used
override def preStart(ctx: AsyncContext[Out, Try[Out]]): Unit = override def preStart(ctx: AsyncContext[Out, Try[Out]]): Unit =
callback = ctx.getAsyncCallback() callback = ctx.getAsyncCallback
override def decide(ex: Throwable) = decider(ex) override def decide(ex: Throwable) = decider(ex)

View file

@ -4,8 +4,6 @@
package akka.stream.impl.io package akka.stream.impl.io
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.security.Principal
import java.security.cert.Certificate
import javax.net.ssl.SSLEngineResult.HandshakeStatus import javax.net.ssl.SSLEngineResult.HandshakeStatus
import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.HandshakeStatus._
import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl.SSLEngineResult.Status._
@ -16,13 +14,8 @@ import akka.stream.impl.FanIn.InputBunch
import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl.FanOut.OutputBunch
import akka.stream.impl._ import akka.stream.impl._
import akka.util.ByteString import akka.util.ByteString
import akka.util.ByteStringBuilder
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable
import akka.stream.io._ import akka.stream.io._
import akka.event.LoggingReceive
/** /**
* INTERNAL API. * INTERNAL API.
@ -32,11 +25,11 @@ private[akka] object SslTlsCipherActor {
def props(settings: ActorMaterializerSettings, def props(settings: ActorMaterializerSettings,
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
tracing: Boolean,
role: Role, role: Role,
closing: Closing, closing: Closing,
hostInfo: Option[(String, Int)]): Props = hostInfo: Option[(String, Int)],
Props(new SslTlsCipherActor(settings, sslContext, firstSession, tracing, role, closing, hostInfo)).withDeploy(Deploy.local) tracing: Boolean = false): Props =
Props(new SslTlsCipherActor(settings, sslContext, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local)
final val TransportIn = 0 final val TransportIn = 0
final val TransportOut = 0 final val TransportOut = 0
@ -49,8 +42,8 @@ private[akka] object SslTlsCipherActor {
* INTERNAL API. * INTERNAL API.
*/ */
private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslContext: SSLContext, private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslContext: SSLContext,
firstSession: NegotiateNewSession, tracing: Boolean, firstSession: NegotiateNewSession, role: Role, closing: Closing,
role: Role, closing: Closing, hostInfo: Option[(String, Int)]) hostInfo: Option[(String, Int)], tracing: Boolean)
extends Actor with ActorLogging with Pump { extends Actor with ActorLogging with Pump {
import SslTlsCipherActor._ import SslTlsCipherActor._
@ -113,7 +106,7 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
* not know that we are runnable. * not know that we are runnable.
*/ */
def putBack(b: ByteBuffer): Unit = def putBack(b: ByteBuffer): Unit =
if (b.hasRemaining()) { if (b.hasRemaining) {
if (tracing) log.debug(s"putting back ${b.remaining} bytes into $name") if (tracing) log.debug(s"putting back ${b.remaining} bytes into $name")
val bs = ByteString(b) val bs = ByteString(b)
if (bs.nonEmpty) buffer = bs ++ buffer if (bs.nonEmpty) buffer = bs ++ buffer
@ -156,12 +149,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
e e
} }
var currentSession = engine.getSession var currentSession = engine.getSession
var currentSessionParameters = firstSession applySessionParameters(firstSession)
applySessionParameters()
def applySessionParameters(): Unit = { def applySessionParameters(params: NegotiateNewSession): Unit = {
val csp = currentSessionParameters import params._
import csp._
enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray))
enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray))
clientAuth match { clientAuth match {
@ -175,11 +166,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
lastHandshakeStatus = engine.getHandshakeStatus lastHandshakeStatus = engine.getHandshakeStatus
} }
def setNewSessionParameters(n: NegotiateNewSession): Unit = { def setNewSessionParameters(params: NegotiateNewSession): Unit = {
if (tracing) log.debug(s"applying $n") if (tracing) log.debug(s"applying $params")
currentSession.invalidate() currentSession.invalidate()
currentSessionParameters = n applySessionParameters(params)
applySessionParameters()
corkUser = true corkUser = true
} }
@ -280,7 +270,7 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
} }
def completeOrFlush(): Unit = def completeOrFlush(): Unit =
if (engine.isOutboundDone()) nextPhase(completedPhase) if (engine.isOutboundDone) nextPhase(completedPhase)
else nextPhase(flushingOutbound) else nextPhase(flushingOutbound)
private def doInbound(isOutboundClosed: Boolean, inboundState: TransferState): Boolean = private def doInbound(isOutboundClosed: Boolean, inboundState: TransferState): Boolean =
@ -370,7 +360,7 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
userInChoppingBlock.putBack(userInBuffer) userInChoppingBlock.putBack(userInBuffer)
case CLOSED ⇒ case CLOSED ⇒
flushToTransport() flushToTransport()
if (engine.isInboundDone()) nextPhase(completedPhase) if (engine.isInboundDone) nextPhase(completedPhase)
else nextPhase(awaitingClose) else nextPhase(awaitingClose)
case s ⇒ fail(new IllegalStateException(s"unexpected status $s in doWrap()")) case s ⇒ fail(new IllegalStateException(s"unexpected status $s in doWrap()"))
} }
@ -392,12 +382,12 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
handshakeFinished() handshakeFinished()
transportInChoppingBlock.putBack(transportInBuffer) transportInChoppingBlock.putBack(transportInBuffer)
case _ ⇒ case _ ⇒
if (transportInBuffer.hasRemaining()) doUnwrap() if (transportInBuffer.hasRemaining) doUnwrap()
else flushToUser() else flushToUser()
} }
case CLOSED ⇒ case CLOSED ⇒
flushToUser() flushToUser()
if (engine.isOutboundDone()) nextPhase(completedPhase) if (engine.isOutboundDone) nextPhase(completedPhase)
else nextPhase(flushingOutbound) else nextPhase(flushingOutbound)
case BUFFER_UNDERFLOW ⇒ case BUFFER_UNDERFLOW ⇒
flushToUser() flushToUser()
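Two things change in this file: tracing moves to a defaulted last parameter of props, and the negotiated session parameters are passed into applySessionParameters instead of being kept in a var. A tiny sketch of the same default-parameter move (class and parameter names are made up for illustration, not the actual SslTlsCipherActor API) would be:

  object PropsSketch {
    final case class Config(role: String, closing: String, tracing: Boolean)

    // `tracing` last and defaulted: callers that never trace need no change,
    // and the rarely used flag no longer sits in the middle of the argument list.
    def props(role: String, closing: String, tracing: Boolean = false): Config =
      Config(role, closing, tracing)
  }

  // Call sites stay short, and tracing reads clearly where it is needed:
  //   PropsSketch.props("client", "eager")
  //   PropsSketch.props("server", "lazy", tracing = true)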

View file

@ -5,13 +5,12 @@ package akka.stream.impl.io
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.io.{ IO, Tcp } import akka.io.{ IO, Tcp }
import akka.stream.impl.io.StreamTcpManager.ExposedProcessor
import scala.concurrent.Promise import scala.concurrent.Promise
import akka.actor._ import akka.actor._
import akka.util.ByteString import akka.util.ByteString
import akka.io.Tcp._ import akka.io.Tcp._
import akka.stream.{ AbruptTerminationException, StreamSubscriptionTimeoutSettings, ActorMaterializerSettings, StreamTcpException } import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, StreamTcpException }
import org.reactivestreams.{ Publisher, Processor } import org.reactivestreams.Processor
import akka.stream.impl._ import akka.stream.impl._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -248,7 +247,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti
case SubscriptionTimeout ⇒ case SubscriptionTimeout ⇒
val millis = settings.subscriptionTimeoutSettings.timeout.toMillis val millis = settings.subscriptionTimeoutSettings.timeout.toMillis
if (!primaryOutputs.isSubscribed) { if (!primaryOutputs.isSubscribed) {
fail(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace) fail(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline ($millis) ms") with NoStackTrace)
context.stop(self) context.stop(self)
} }
} }
@ -305,7 +304,6 @@ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[B
_halfClose: Boolean, _halfClose: Boolean,
val connectCmd: Connect, _settings: ActorMaterializerSettings) val connectCmd: Connect, _settings: ActorMaterializerSettings)
extends TcpStreamActor(_settings, _halfClose) { extends TcpStreamActor(_settings, _halfClose) {
import TcpStreamActor._
import context.system import context.system
val initSteps = new SubReceive(waitingExposedProcessor) val initSteps = new SubReceive(waitingExposedProcessor)

View file

@ -121,7 +121,7 @@ object SslTls {
override def withAttributes(att: Attributes): Module = copy(attributes = att) override def withAttributes(att: Attributes): Module = copy(attributes = att)
override def carbonCopy: Module = { override def carbonCopy: Module = {
val mod = TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) val mod = TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo)
if (plainIn == shape.inlets(0)) mod if (plainIn == shape.inlets.head) mod
else mod.replaceShape(mod.shape.asInstanceOf[BidiShape[_, _, _, _]].reversed) else mod.replaceShape(mod.shape.asInstanceOf[BidiShape[_, _, _, _]].reversed)
} }
@ -158,7 +158,7 @@ object SslTlsPlacebo {
scaladsl.BidiFlow() { implicit b ⇒ scaladsl.BidiFlow() { implicit b ⇒
// this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL // this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL
val session = SSLContext.getDefault.createSSLEngine.getSession val session = SSLContext.getDefault.createSSLEngine.getSession
val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(b) ⇒ b }) val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) ⇒ bytes })
val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _))) val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _)))
BidiShape(top, bottom) BidiShape(top, bottom)
} }

View file

@ -5,18 +5,16 @@ package akka.stream.javadsl
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream._ import akka.stream._
import akka.japi.{ Util, Pair } import akka.japi.Pair
import akka.japi.function import akka.japi.function
import akka.stream.impl.Stages.Recover
import akka.stream.scaladsl import akka.stream.scaladsl
import akka.stream.scaladsl.{ Keep, Sink, Source } import org.reactivestreams.Processor
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.stage.Stage import akka.stream.stage.Stage
import akka.stream.impl.{ Stages, StreamLayout } import akka.stream.impl.StreamLayout
object Flow { object Flow {

View file

@ -150,12 +150,14 @@ object Balance {
/** /**
* Create a new `Balance` vertex with the specified input type. * Create a new `Balance` vertex with the specified input type.
*/ */
def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, false) def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] =
create(outputCount, waitForAllDownstreams = false)
/** /**
* Create a new `Balance` vertex with the specified input type. * Create a new `Balance` vertex with the specified input type.
*/ */
def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount) def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] =
create(outputCount)
/** /**
* Create a new `Balance` vertex with the specified input type. * Create a new `Balance` vertex with the specified input type.
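The reformatted Balance factory above spells the boolean out as waitForAllDownstreams = false; a small sketch of why a named argument helps for boolean parameters (the function here is hypothetical, not the javadsl API) is:

  object NamedArgSketch {
    def createBalance(outputCount: Int, waitForAllDownstreams: Boolean): String =
      s"Balance(outputCount=$outputCount, waitForAllDownstreams=$waitForAllDownstreams)"

    // A bare `false` says nothing at the call site; the named form documents itself.
    val terse = createBalance(3, false)
    val clear = createBalance(3, waitForAllDownstreams = false)
  }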

View file

@ -4,11 +4,8 @@
package akka.stream.javadsl package akka.stream.javadsl
import akka.japi.function import akka.japi.function
import akka.stream.scaladsl
import akka.japi.Pair import akka.japi.Pair
import scala.runtime.BoxedUnit
object Keep { object Keep {
private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l } private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l }
private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r } private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r }
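Keep caches a single Function2 instance and reuses it for all element types; a minimal sketch of that caching-plus-cast trick (using plain scala.Function2 rather than akka.japi.function, so names and types here are illustrative) might be:

  object KeepSketch {
    private val _left: (Any, Any) ⇒ Any = (l, _) ⇒ l
    private val _right: (Any, Any) ⇒ Any = (_, r) ⇒ r

    // One cached instance serves every type combination; the cast is safe because
    // the function never inspects its arguments, it only returns one of them.
    def left[L, R]: (L, R) ⇒ L = _left.asInstanceOf[(L, R) ⇒ L]
    def right[L, R]: (L, R) ⇒ R = _right.asInstanceOf[(L, R) ⇒ R]
  }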

View file

@ -36,7 +36,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
* *
* The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed. * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
*/ */
def unbind(): Future[Unit] = delegate.unbind def unbind(): Future[Unit] = delegate.unbind()
} }
/** /**

View file

@ -214,9 +214,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
val outs = copy.shape.outlets val outs = copy.shape.outlets
new Flow(module new Flow(module
.compose(copy, combine) .compose(copy, combine)
.wire(shape.outlet, ins(0)) .wire(shape.outlet, ins.head)
.wire(outs(1), shape.inlet) .wire(outs(1), shape.inlet)
.replaceShape(FlowShape(ins(1), outs(0)))) .replaceShape(FlowShape(ins(1), outs.head)))
} }
/** /**
@ -365,7 +365,7 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module)
def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
override def withAttributes(attr: Attributes): RunnableGraph[Mat] = override def withAttributes(attr: Attributes): RunnableGraph[Mat] =
new RunnableGraph(module.withAttributes(attr).nest) new RunnableGraph(module.withAttributes(attr).nest())
override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name))

View file

@ -3,8 +3,7 @@
*/ */
package akka.stream.stage package akka.stream.stage
import akka.event.{ Logging, LogSource } import akka.stream.{ Materializer, Attributes, Supervision }
import akka.stream.{ ActorMaterializer, Materializer, Attributes, Supervision }
/** /**
* General interface for stream transformation. * General interface for stream transformation.
@ -34,10 +33,10 @@ sealed trait Stage[-In, Out]
private[stream] object AbstractStage { private[stream] object AbstractStage {
final val UpstreamBall = 1 final val UpstreamBall = 1
final val DownstreamBall = 2 final val DownstreamBall = 2
final val BothBalls = UpstreamBall | DownstreamBall
final val BothBallsAndNoTerminationPending = UpstreamBall | DownstreamBall | NoTerminationPending
final val PrecedingWasPull = 0x4000 final val PrecedingWasPull = 0x4000
final val NoTerminationPending = 0x8000 final val NoTerminationPending = 0x8000
final val BothBalls = UpstreamBall | DownstreamBall
final val BothBallsAndNoTerminationPending = UpstreamBall | DownstreamBall | NoTerminationPending
} }
abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] { abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] {
@ -655,7 +654,7 @@ trait AsyncContext[Out, Ext] extends DetachedContext[Out] {
* *
* This object can be cached and reused within the same [[AsyncStage]]. * This object can be cached and reused within the same [[AsyncStage]].
*/ */
def getAsyncCallback(): AsyncCallback[Ext] def getAsyncCallback: AsyncCallback[Ext]
/** /**
* In response to an asynchronous notification an [[AsyncStage]] may choose * In response to an asynchronous notification an [[AsyncStage]] may choose
* to neither push nor pull nor terminate, which is represented as this * to neither push nor pull nor terminate, which is represented as this