Merge pull request #24653 from shkoder/fix-trigger-in-digest-cookbook

fix Trigger references in the stream cookbook example for digest #24317
This commit is contained in:
Patrik Nordwall 2018-03-05 17:36:51 +01:00 committed by GitHub
commit d9249226c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 52 additions and 90 deletions

View file

@ -17,7 +17,6 @@ import org.junit.Test;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -39,12 +38,11 @@ public class RecipeDigest extends RecipeTest {
mat = null; mat = null;
} }
//#calculating-digest //#calculating-digest
class DigestCalculator extends GraphStage<FlowShape<ByteString, ByteString>> { class DigestCalculator extends GraphStage<FlowShape<ByteString, ByteString>> {
private final String algorithm; private final String algorithm;
public Inlet<ByteString> in = Inlet.<ByteString>create("DigestCalculator.in"); public Inlet<ByteString> in = Inlet.create("DigestCalculator.in");
public Outlet<ByteString> out = Outlet.<ByteString>create("DigestCalculator.out"); public Outlet<ByteString> out = Outlet.create("DigestCalculator.out");
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out); private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
public DigestCalculator(String algorithm) { public DigestCalculator(String algorithm) {
@ -70,20 +68,21 @@ public class RecipeDigest extends RecipeTest {
setHandler(out, new AbstractOutHandler() { setHandler(out, new AbstractOutHandler() {
@Override @Override
public void onPull() throws Exception { public void onPull() {
pull(in); pull(in);
} }
}); });
setHandler(in, new AbstractInHandler() { setHandler(in, new AbstractInHandler() {
@Override @Override
public void onPush() throws Exception { public void onPush() {
ByteString chunk = grab(in); ByteString chunk = grab(in);
digest.update(chunk.toArray()); digest.update(chunk.toArray());
pull(in); pull(in);
} }
@Override @Override
public void onUpstreamFinish() throws Exception { public void onUpstreamFinish() {
// If the stream is finished, we need to emit the digest // If the stream is finished, we need to emit the digest
// before completing // before completing
emit(out, ByteString.fromArray(digest.digest())); emit(out, ByteString.fromArray(digest.digest()));
@ -91,38 +90,32 @@ public class RecipeDigest extends RecipeTest {
} }
}); });
} }
}; };
} }
} }
//#calculating-digest //#calculating-digest
@Test @Test
public void work() throws Exception { public void work() throws Exception {
new TestKit(system) { new TestKit(system) {
{ {
Source<ByteString, NotUsed> data = Source.from(Arrays.asList( Source<ByteString, NotUsed> data = Source.single(ByteString.fromString("abc"));
ByteString.fromString("abcdbcdecdef"),
ByteString.fromString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")));
//#calculating-digest2 //#calculating-digest2
final Source<ByteString, NotUsed> digest = data final Source<ByteString, NotUsed> digest = data.via(new DigestCalculator("SHA-256"));
.via(new DigestCalculator("SHA-256"));
//#calculating-digest2 //#calculating-digest2
ByteString got = digest.runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS); ByteString got = digest.runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(ByteString.fromInts( assertEquals(ByteString.fromInts(
0x24, 0x8d, 0x6a, 0x61, 0xba, 0x78, 0x16, 0xbf,
0xd2, 0x06, 0x38, 0xb8, 0x8f, 0x01, 0xcf, 0xea,
0xe5, 0xc0, 0x26, 0x93, 0x41, 0x41, 0x40, 0xde,
0x0c, 0x3e, 0x60, 0x39, 0x5d, 0xae, 0x22, 0x23,
0xa3, 0x3c, 0xe4, 0x59, 0xb0, 0x03, 0x61, 0xa3,
0x64, 0xff, 0x21, 0x67, 0x96, 0x17, 0x7a, 0x9c,
0xf6, 0xec, 0xed, 0xd4, 0xb4, 0x10, 0xff, 0x61,
0x19, 0xdb, 0x06, 0xc1), got); 0xf2, 0x00, 0x15, 0xad), got);
} }
}; };
} }

View file

@ -33,10 +33,4 @@ public abstract class RecipeTest extends AbstractJavaTest {
return msg != null ? msg.hashCode() : 0; return msg != null ? msg.hashCode() : 0;
} }
} }
final class Trigger {
}
final class Job {
}
} }

View file

@ -2,9 +2,8 @@ package docs.stream.cookbook
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.stream.scaladsl.{ Keep, Sink, Source } import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Graph, SourceShape }
import akka.testkit.TimingTest import akka.testkit.TimingTest
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }

View file

@ -3,16 +3,10 @@
*/ */
package docs.stream.cookbook package docs.stream.cookbook
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream
import akka.stream.impl.io.compression.GzipCompressor
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.util.ByteString import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -1,12 +1,5 @@
package docs.stream.cookbook package docs.stream.cookbook
import java.security.MessageDigest
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -16,28 +9,32 @@ class RecipeDigest extends RecipeSpec {
"work" in { "work" in {
val data = Source(List(
ByteString("abcdbcdecdef"),
ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")))
//#calculating-digest //#calculating-digest
import java.security.MessageDigest
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import akka.stream.stage._ import akka.stream.stage._
val data: Source[ByteString, NotUsed] = Source.single(ByteString("abc"))
class DigestCalculator(algorithm: String) extends GraphStage[FlowShape[ByteString, ByteString]] { class DigestCalculator(algorithm: String) extends GraphStage[FlowShape[ByteString, ByteString]] {
val in = Inlet[ByteString]("DigestCalculator.in") val in = Inlet[ByteString]("DigestCalculator.in")
val out = Outlet[ByteString]("DigestCalculator.out") val out = Outlet[ByteString]("DigestCalculator.out")
override val shape = FlowShape.of(in, out) override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val digest = MessageDigest.getInstance(algorithm) private val digest = MessageDigest.getInstance(algorithm)
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
override def onPull(): Trigger = { override def onPull(): Unit = pull(in)
pull(in)
}
}) })
setHandler(in, new InHandler { setHandler(in, new InHandler {
override def onPush(): Trigger = { override def onPush(): Unit = {
val chunk = grab(in) val chunk = grab(in)
digest.update(chunk.toArray) digest.update(chunk.toArray)
pull(in) pull(in)
@ -48,25 +45,22 @@ class RecipeDigest extends RecipeSpec {
completeStage() completeStage()
} }
}) })
} }
} }
val digest: Source[ByteString, NotUsed] = data.via(new DigestCalculator("SHA-256")) val digest: Source[ByteString, NotUsed] = data.via(new DigestCalculator("SHA-256"))
//#calculating-digest //#calculating-digest
Await.result(digest.runWith(Sink.head), 3.seconds) should be( Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString( ByteString(
0x24, 0x8d, 0x6a, 0x61, 0xba, 0x78, 0x16, 0xbf,
0xd2, 0x06, 0x38, 0xb8, 0x8f, 0x01, 0xcf, 0xea,
0xe5, 0xc0, 0x26, 0x93, 0x41, 0x41, 0x40, 0xde,
0x0c, 0x3e, 0x60, 0x39, 0x5d, 0xae, 0x22, 0x23,
0xa3, 0x3c, 0xe4, 0x59, 0xb0, 0x03, 0x61, 0xa3,
0x64, 0xff, 0x21, 0x67, 0x96, 0x17, 0x7a, 0x9c,
0xf6, 0xec, 0xed, 0xd4, 0xb4, 0x10, 0xff, 0x61,
0x19, 0xdb, 0x06, 0xc1)) 0xf2, 0x00, 0x15, 0xad))
} }
} }
} }

View file

@ -4,9 +4,6 @@ import akka.stream.{ ClosedShape, OverflowStrategy }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit._ import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.Await
class RecipeDroppyBroadcast extends RecipeSpec { class RecipeDroppyBroadcast extends RecipeSpec {
"Recipe for a droppy broadcast" must { "Recipe for a droppy broadcast" must {

View file

@ -3,7 +3,6 @@ package docs.stream.cookbook
import akka.NotUsed import akka.NotUsed
import akka.stream.scaladsl.{ Sink, Source } import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -91,7 +91,7 @@ class RecipeHold extends RecipeSpec {
.run() .run()
val subscription = sub.expectSubscription() val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
subscription.request(1) subscription.request(1)
sub.expectNext(0) sub.expectNext(0)
@ -121,10 +121,10 @@ class RecipeHold extends RecipeSpec {
source.via(new HoldWithWait).to(sink).run() source.via(new HoldWithWait).to(sink).run()
val subscription = sub.expectSubscription() val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
subscription.request(1) subscription.request(1)
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(1) pub.sendNext(1)
sub.expectNext(1) sub.expectNext(1)

View file

@ -1,7 +1,6 @@
package docs.stream.cookbook package docs.stream.cookbook
import akka.NotUsed import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.util.ByteString import akka.util.ByteString

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
class RecipeManualTrigger extends RecipeSpec { class RecipeManualTrigger extends RecipeSpec {
"Recipe for triggering a stream manually" must { "Recipe for triggering a stream manually" must {
type Trigger = Unit
"work" in { "work" in {
@ -31,17 +32,17 @@ class RecipeManualTrigger extends RecipeSpec {
graph.run() graph.run()
sub.expectSubscription().request(1000) sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(()) pub.sendNext(())
sub.expectNext("1") sub.expectNext("1")
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(()) pub.sendNext(())
pub.sendNext(()) pub.sendNext(())
sub.expectNext("2") sub.expectNext("2")
sub.expectNext("3") sub.expectNext("3")
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(()) pub.sendNext(())
sub.expectNext("4") sub.expectNext("4")
@ -71,17 +72,17 @@ class RecipeManualTrigger extends RecipeSpec {
graph.run() graph.run()
sub.expectSubscription().request(1000) sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(()) pub.sendNext(())
sub.expectNext("1") sub.expectNext("1")
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(()) pub.sendNext(())
pub.sendNext(()) pub.sendNext(())
sub.expectNext("2") sub.expectNext("2")
sub.expectNext("3") sub.expectNext("3")
sub.expectNoMsg(100.millis) sub.expectNoMessage(100.millis)
pub.sendNext(()) pub.sendNext(())
sub.expectNext("4") sub.expectNext("4")

View file

@ -4,7 +4,6 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.util.ByteString import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -1,11 +1,9 @@
package docs.stream.cookbook package docs.stream.cookbook
import akka.NotUsed import akka.NotUsed
import akka.stream.{ Graph, FlowShape, Inlet, Outlet, Attributes, OverflowStrategy }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import scala.concurrent.{ Await, Future } import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.stream.stage.{ GraphStage, GraphStageLogic }
class RecipeReduceByKey extends RecipeSpec { class RecipeReduceByKey extends RecipeSpec {

View file

@ -5,8 +5,6 @@ package docs.stream.cookbook
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import scala.concurrent.Future import scala.concurrent.Future
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
class RecipeSeq extends RecipeSpec { class RecipeSeq extends RecipeSpec {

View file

@ -7,7 +7,5 @@ trait RecipeSpec extends AkkaSpec {
implicit val m = ActorMaterializer() implicit val m = ActorMaterializer()
type Message = String type Message = String
type Trigger = Unit
type Job = String
} }

View file

@ -3,7 +3,6 @@ package docs.stream.cookbook
import akka.NotUsed import akka.NotUsed
import akka.stream.FlowShape import akka.stream.FlowShape
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.testkit.TestProbe
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._