From bba1a8b779531650d306d50ff36a9ee847a2ea3c Mon Sep 17 00:00:00 2001 From: Santiago Date: Sat, 29 Nov 2025 11:27:04 -0500 Subject: [PATCH 1/3] docs(examples): initial scaffolding for large-messages module Signed-off-by: Santiago --- .../README.md | 77 +++ .../pom.xml | 232 +++++++ .../template.yaml | 59 ++ .../tools/README.md | 73 ++ .../tools/pom.xml | 110 +++ .../java/org/demo/kafka/avro/AvroProduct.java | 476 +++++++++++++ .../demo/kafka/protobuf/ProtobufProduct.java | 636 ++++++++++++++++++ .../protobuf/ProtobufProductOrBuilder.java | 36 + .../protobuf/ProtobufProductOuterClass.java | 63 ++ .../demo/kafka/tools/GenerateAvroSamples.java | 127 ++++ .../demo/kafka/tools/GenerateJsonSamples.java | 130 ++++ .../kafka/tools/GenerateProtobufSamples.java | 215 ++++++ 12 files changed, 2234 insertions(+) create mode 100644 examples/powertools-examples-large-messages/README.md create mode 100644 examples/powertools-examples-large-messages/pom.xml create mode 100644 examples/powertools-examples-large-messages/template.yaml create mode 100644 examples/powertools-examples-large-messages/tools/README.md create mode 100644 examples/powertools-examples-large-messages/tools/pom.xml create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java create mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java diff --git a/examples/powertools-examples-large-messages/README.md b/examples/powertools-examples-large-messages/README.md new file mode 100644 index 000000000..76cd81cb9 --- /dev/null +++ b/examples/powertools-examples-large-messages/README.md @@ -0,0 +1,77 @@ +# Powertools for AWS Lambda (Java) - Kafka Example + +This project demonstrates how to use Powertools for AWS Lambda (Java) to deserialize Kafka Lambda events directly into strongly typed Kafka ConsumerRecords using different serialization formats. + +## Overview + +The example showcases automatic deserialization of Kafka Lambda events into ConsumerRecords using three formats: +- JSON - Using standard JSON serialization +- Avro - Using Apache Avro schema-based serialization +- Protobuf - Using Google Protocol Buffers serialization + +Each format has its own Lambda function handler that demonstrates how to use the `@Deserialization` annotation with the appropriate `DeserializationType`, eliminating the need to handle complex deserialization logic manually. 
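For reference, a handler for the JSON case might look roughly like the sketch below. This is not the example's actual code (the handler classes referenced in `template.yaml`, such as `org.demo.kafka.JsonDeserializationFunction`, are not part of this patch); it only illustrates the `@Deserialization` annotation and `DeserializationType` usage described above. The `DeserializationType` constant name and the `Product` POJO are assumptions for illustration.

```java
// Minimal sketch, assuming the powertools-kafka @Deserialization API described above.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;

public class JsonDeserializationFunction
        implements RequestHandler<ConsumerRecords<String, JsonDeserializationFunction.Product>, String> {

    @Override
    // Assumed constant name; the other handlers would use the Avro/Protobuf equivalents.
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> records, Context context) {
        for (ConsumerRecord<String, Product> consumerRecord : records) {
            // The record value is already deserialized into the typed POJO.
            Product product = consumerRecord.value();
            // ... business logic ...
        }
        return "OK";
    }

    // Hypothetical POJO matching the sample data produced by the tools module.
    public static class Product {
        public int id;
        public String name;
        public double price;
    }
}
```

The Avro and Protobuf handlers would follow the same pattern, using `ConsumerRecords<String, AvroProduct>` or `ConsumerRecords<String, ProtobufProduct>` (the generated classes included under `tools/src/main/java`) together with the corresponding `DeserializationType` constant.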
+ +## Build and Deploy + +### Prerequisites +- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +- Java 11+ +- Maven + +### Build + +```bash +# Build the application +sam build +``` + +### Deploy + +```bash +# Deploy the application to AWS +sam deploy --guided +``` + +During the guided deployment, you'll be prompted to provide values for required parameters. After deployment, SAM will output the ARNs of the deployed Lambda functions. + +### Build with Different Serialization Formats + +The project includes Maven profiles to build with different serialization formats: + +```bash +# Build with JSON only (no Avro or Protobuf) +mvn clean package -P base + +# Build with Avro only +mvn clean package -P avro-only + +# Build with Protobuf only +mvn clean package -P protobuf-only + +# Build with all formats (default) +mvn clean package -P full +``` + +## Testing + +The `events` directory contains sample events for each serialization format: +- `kafka-json-event.json` - Sample event with JSON-serialized products +- `kafka-avro-event.json` - Sample event with Avro-serialized products +- `kafka-protobuf-event.json` - Sample event with Protobuf-serialized products + +You can use these events to test the Lambda functions: + +```bash +# Test the JSON deserialization function +sam local invoke JsonDeserializationFunction --event events/kafka-json-event.json + +# Test the Avro deserialization function +sam local invoke AvroDeserializationFunction --event events/kafka-avro-event.json + +# Test the Protobuf deserialization function +sam local invoke ProtobufDeserializationFunction --event events/kafka-protobuf-event.json +``` + +## Sample Generator Tool + +The project includes a tool to generate sample JSON, Avro, and Protobuf serialized data. See the [tools/README.md](tools/README.md) for more information. 
\ No newline at end of file diff --git a/examples/powertools-examples-large-messages/pom.xml b/examples/powertools-examples-large-messages/pom.xml new file mode 100644 index 000000000..f4b75722c --- /dev/null +++ b/examples/powertools-examples-large-messages/pom.xml @@ -0,0 +1,232 @@ + + 4.0.0 + software.amazon.lambda.examples + 2.8.0 + powertools-examples-kafka + jar + Powertools for AWS Lambda (Java) - Examples - Kafka + + + 11 + 11 + 1.9.20.1 + 1.12.1 + 4.33.0 + + + + + software.amazon.lambda + powertools-kafka + ${project.version} + + + org.apache.kafka + kafka-clients + 4.1.1 + + + org.apache.avro + avro + ${avro.version} + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + + + software.amazon.lambda + powertools-logging-log4j + ${project.version} + + + org.aspectj + aspectjrt + ${aspectj.version} + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 3.1.4 + + true + + + + org.apache.maven.plugins + maven-shade-plugin + 3.6.1 + + + package + + shade + + + false + + + + + + + + + org.apache.logging.log4j + log4j-transform-maven-shade-plugin-extensions + 0.2.0 + + + + + dev.aspectj + aspectj-maven-plugin + 1.14.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.target} + + + software.amazon.lambda + powertools-logging + + + + + + + compile + + + + + + org.aspectj + aspectjtools + ${aspectj.version} + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/avro/ + ${project.basedir}/src/main/java/ + String + + + + + + + io.github.ascopes + protobuf-maven-plugin + 3.10.3 + + + + generate + + generate-sources + + ${protobuf.version} + + ${project.basedir}/src/main/proto + + ${project.basedir}/src/main/java + false + + + + + + + + + + + base + + base + + + + + org.apache.avro + avro + ${avro.version} + provided + + + com.google.protobuf + protobuf-java + ${protobuf.version} + provided + + + + + + + avro-only + + avro-only + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + provided + + + + + + + protobuf-only + + protobuf-only + + + + org.apache.avro + avro + ${avro.version} + provided + + + + + + + full + + true + + + full + + + + diff --git a/examples/powertools-examples-large-messages/template.yaml b/examples/powertools-examples-large-messages/template.yaml new file mode 100644 index 000000000..509b13ca3 --- /dev/null +++ b/examples/powertools-examples-large-messages/template.yaml @@ -0,0 +1,59 @@ +AWSTemplateFormatVersion: "2010-09-09" +Transform: AWS::Serverless-2016-10-31 +Description: > + Kafka Deserialization example with Kafka Lambda ESM + +Globals: + Function: + Timeout: 20 + Runtime: java11 + MemorySize: 512 + Tracing: Active + +Resources: + JsonDeserializationFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: . + Handler: org.demo.kafka.JsonDeserializationFunction::handleRequest + Environment: + Variables: + JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1" + POWERTOOLS_LOG_LEVEL: DEBUG + POWERTOOLS_SERVICE_NAME: JsonDeserialization + POWERTOOLS_METRICS_NAMESPACE: JsonDeserializationFunction + + AvroDeserializationFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: . 
+ Handler: org.demo.kafka.AvroDeserializationFunction::handleRequest + Environment: + Variables: + JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1" + POWERTOOLS_LOG_LEVEL: DEBUG + POWERTOOLS_SERVICE_NAME: AvroDeserialization + POWERTOOLS_METRICS_NAMESPACE: AvroDeserializationFunction + + ProtobufDeserializationFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: . + Handler: org.demo.kafka.ProtobufDeserializationFunction::handleRequest + Environment: + Variables: + JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1" + POWERTOOLS_LOG_LEVEL: DEBUG + POWERTOOLS_SERVICE_NAME: ProtobufDeserialization + POWERTOOLS_METRICS_NAMESPACE: ProtobufDeserializationFunction + +Outputs: + JsonFunction: + Description: "Kafka JSON Lambda Function ARN" + Value: !GetAtt JsonDeserializationFunction.Arn + AvroFunction: + Description: "Kafka Avro Lambda Function ARN" + Value: !GetAtt AvroDeserializationFunction.Arn + ProtobufFunction: + Description: "Kafka Protobuf Lambda Function ARN" + Value: !GetAtt ProtobufDeserializationFunction.Arn diff --git a/examples/powertools-examples-large-messages/tools/README.md b/examples/powertools-examples-large-messages/tools/README.md new file mode 100644 index 000000000..02e8dde9b --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/README.md @@ -0,0 +1,73 @@ +# Kafka Sample Generator Tool + +This tool generates base64-encoded serialized products for testing the Kafka consumer functions with different serialization formats. + +## Supported Formats + +- **JSON**: Generates base64-encoded JSON serialized products +- **Avro**: Generates base64-encoded Avro serialized products +- **Protobuf**: Generates base64-encoded Protobuf serialized products + +## Usage + +Run the following Maven commands from this directory: + +```bash +# Generate Avro and Protobuf classes from schemas +mvn generate-sources + +# Compile the code +mvn compile +``` + +### Generate JSON Samples + +```bash +# Run the JSON sample generator +mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateJsonSamples" +``` + +The tool will output base64-encoded values for JSON products that can be used in `../events/kafka-json-event.json`. + +### Generate Avro Samples + +```bash +# Run the Avro sample generator +mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateAvroSamples" +``` + +The tool will output base64-encoded values for Avro products that can be used in `../events/kafka-avro-event.json`. + +### Generate Protobuf Samples + +```bash +# Run the Protobuf sample generator +mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples" +``` + +The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message-indexes to test different serialization scenarios. + +## Output + +Each generator produces: + +1. Three different products (Laptop, Smartphone, Headphones) +2. An integer key (42) and one entry with a nullish key to test for edge-cases +3. 
A complete sample event structure that can be used directly for testing + +The Protobuf generators additionally create samples with different Confluent message-index formats: +- Standard protobuf (no message indexes) +- Simple message index (single 0 byte) +- Complex message index (length-prefixed array) + +For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format). + +## Example + +After generating the samples, you can copy the output into the respective event files: + +- `../events/kafka-json-event.json` for JSON samples +- `../events/kafka-avro-event.json` for Avro samples +- `../events/kafka-protobuf-event.json` for Protobuf samples + +These event files can then be used to test the Lambda functions with the appropriate deserializer. diff --git a/examples/powertools-examples-large-messages/tools/pom.xml b/examples/powertools-examples-large-messages/tools/pom.xml new file mode 100644 index 000000000..e6f2654d1 --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/pom.xml @@ -0,0 +1,110 @@ + + + 4.0.0 + + software.amazon.lambda.examples + powertools-examples-kafka-tools + 2.0.0 + + + 11 + 11 + 1.12.0 + 4.31.0 + 4.0.0 + + + + + org.apache.avro + avro + ${avro.version} + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + + + com.fasterxml.jackson.core + jackson-databind + 2.19.0 + + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/../src/main/avro/ + ${project.basedir}/src/main/java/ + String + + + + + + io.github.ascopes + protobuf-maven-plugin + 3.10.2 + + + + generate + + generate-sources + + ${protobuf.version} + + ${project.basedir}/../src/main/proto + + ${project.basedir}/src/main/java + false + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + + generate-json-samples + + org.demo.kafka.tools.GenerateJsonSamples + + + + generate-avro-samples + + org.demo.kafka.tools.GenerateAvroSamples + + + + generate-protobuf-samples + + org.demo.kafka.tools.GenerateProtobufSamples + + + + + + + diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java new file mode 100644 index 000000000..fad7e2fbf --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java @@ -0,0 +1,476 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.demo.kafka.avro; + +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class AvroProduct extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -2929699301240218341L; + + + public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroProduct\",\"namespace\":\"org.demo.kafka.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"price\",\"type\":\"double\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder<>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder<>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this AvroProduct to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a AvroProduct from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a AvroProduct instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static AvroProduct fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private int id; + private java.lang.String name; + private double price; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public AvroProduct() {} + + /** + * All-args constructor. + * @param id The new value for id + * @param name The new value for name + * @param price The new value for price + */ + public AvroProduct(java.lang.Integer id, java.lang.String name, java.lang.Double price) { + this.id = id; + this.name = name; + this.price = price; + } + + @Override + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + + @Override + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + + // Used by DatumWriter. Applications should not call. + @Override + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return id; + case 1: return name; + case 2: return price; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. 
+ @Override + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: id = (java.lang.Integer)value$; break; + case 1: name = value$ != null ? value$.toString() : null; break; + case 2: price = (java.lang.Double)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'id' field. + * @return The value of the 'id' field. + */ + public int getId() { + return id; + } + + + /** + * Sets the value of the 'id' field. + * @param value the value to set. + */ + public void setId(int value) { + this.id = value; + } + + /** + * Gets the value of the 'name' field. + * @return The value of the 'name' field. + */ + public java.lang.String getName() { + return name; + } + + + /** + * Sets the value of the 'name' field. + * @param value the value to set. + */ + public void setName(java.lang.String value) { + this.name = value; + } + + /** + * Gets the value of the 'price' field. + * @return The value of the 'price' field. + */ + public double getPrice() { + return price; + } + + + /** + * Sets the value of the 'price' field. + * @param value the value to set. + */ + public void setPrice(double value) { + this.price = value; + } + + /** + * Creates a new AvroProduct RecordBuilder. + * @return A new AvroProduct RecordBuilder + */ + public static org.demo.kafka.avro.AvroProduct.Builder newBuilder() { + return new org.demo.kafka.avro.AvroProduct.Builder(); + } + + /** + * Creates a new AvroProduct RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new AvroProduct RecordBuilder + */ + public static org.demo.kafka.avro.AvroProduct.Builder newBuilder(org.demo.kafka.avro.AvroProduct.Builder other) { + if (other == null) { + return new org.demo.kafka.avro.AvroProduct.Builder(); + } else { + return new org.demo.kafka.avro.AvroProduct.Builder(other); + } + } + + /** + * Creates a new AvroProduct RecordBuilder by copying an existing AvroProduct instance. + * @param other The existing instance to copy. + * @return A new AvroProduct RecordBuilder + */ + public static org.demo.kafka.avro.AvroProduct.Builder newBuilder(org.demo.kafka.avro.AvroProduct other) { + if (other == null) { + return new org.demo.kafka.avro.AvroProduct.Builder(); + } else { + return new org.demo.kafka.avro.AvroProduct.Builder(other); + } + } + + /** + * RecordBuilder for AvroProduct instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private int id; + private java.lang.String name; + private double price; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. 
+ */ + private Builder(org.demo.kafka.avro.AvroProduct.Builder other) { + super(other); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.name)) { + this.name = data().deepCopy(fields()[1].schema(), other.name); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.price)) { + this.price = data().deepCopy(fields()[2].schema(), other.price); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + } + + /** + * Creates a Builder by copying an existing AvroProduct instance + * @param other The existing instance to copy. + */ + private Builder(org.demo.kafka.avro.AvroProduct other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.name)) { + this.name = data().deepCopy(fields()[1].schema(), other.name); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.price)) { + this.price = data().deepCopy(fields()[2].schema(), other.price); + fieldSetFlags()[2] = true; + } + } + + /** + * Gets the value of the 'id' field. + * @return The value. + */ + public int getId() { + return id; + } + + + /** + * Sets the value of the 'id' field. + * @param value The value of 'id'. + * @return This builder. + */ + public org.demo.kafka.avro.AvroProduct.Builder setId(int value) { + validate(fields()[0], value); + this.id = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'id' field has been set. + * @return True if the 'id' field has been set, false otherwise. + */ + public boolean hasId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'id' field. + * @return This builder. + */ + public org.demo.kafka.avro.AvroProduct.Builder clearId() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'name' field. + * @return The value. + */ + public java.lang.String getName() { + return name; + } + + + /** + * Sets the value of the 'name' field. + * @param value The value of 'name'. + * @return This builder. + */ + public org.demo.kafka.avro.AvroProduct.Builder setName(java.lang.String value) { + validate(fields()[1], value); + this.name = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'name' field has been set. + * @return True if the 'name' field has been set, false otherwise. + */ + public boolean hasName() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'name' field. + * @return This builder. + */ + public org.demo.kafka.avro.AvroProduct.Builder clearName() { + name = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'price' field. + * @return The value. + */ + public double getPrice() { + return price; + } + + + /** + * Sets the value of the 'price' field. + * @param value The value of 'price'. + * @return This builder. + */ + public org.demo.kafka.avro.AvroProduct.Builder setPrice(double value) { + validate(fields()[2], value); + this.price = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'price' field has been set. + * @return True if the 'price' field has been set, false otherwise. + */ + public boolean hasPrice() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'price' field. + * @return This builder. 
+ */ + public org.demo.kafka.avro.AvroProduct.Builder clearPrice() { + fieldSetFlags()[2] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public AvroProduct build() { + try { + AvroProduct record = new AvroProduct(); + record.id = fieldSetFlags()[0] ? this.id : (java.lang.Integer) defaultValue(fields()[0]); + record.name = fieldSetFlags()[1] ? this.name : (java.lang.String) defaultValue(fields()[1]); + record.price = fieldSetFlags()[2] ? this.price : (java.lang.Double) defaultValue(fields()[2]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeInt(this.id); + + out.writeString(this.name); + + out.writeDouble(this.price); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.id = in.readInt(); + + this.name = in.readString(); + + this.price = in.readDouble(); + + } else { + for (int i = 0; i < 3; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.id = in.readInt(); + break; + + case 1: + this.name = in.readString(); + break; + + case 2: + this.price = in.readDouble(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java new file mode 100644 index 000000000..6da9113fc --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java @@ -0,0 +1,636 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! 
+// NO CHECKED-IN PROTOBUF GENCODE +// source: ProtobufProduct.proto +// Protobuf Java Version: 4.31.0 + +package org.demo.kafka.protobuf; + +/** + * Protobuf type {@code org.demo.kafka.protobuf.ProtobufProduct} + */ +@com.google.protobuf.Generated +public final class ProtobufProduct extends + com.google.protobuf.GeneratedMessage implements + // @@protoc_insertion_point(message_implements:org.demo.kafka.protobuf.ProtobufProduct) + ProtobufProductOrBuilder { +private static final long serialVersionUID = 0L; + static { + com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( + com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, + /* major= */ 4, + /* minor= */ 31, + /* patch= */ 0, + /* suffix= */ "", + ProtobufProduct.class.getName()); + } + // Use ProtobufProduct.newBuilder() to construct. + private ProtobufProduct(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + } + private ProtobufProduct() { + name_ = ""; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.demo.kafka.protobuf.ProtobufProduct.class, org.demo.kafka.protobuf.ProtobufProduct.Builder.class); + } + + public static final int ID_FIELD_NUMBER = 1; + private int id_ = 0; + /** + * int32 id = 1; + * @return The id. + */ + @java.lang.Override + public int getId() { + return id_; + } + + public static final int NAME_FIELD_NUMBER = 2; + @SuppressWarnings("serial") + private volatile java.lang.Object name_ = ""; + /** + * string name = 2; + * @return The name. + */ + @java.lang.Override + public java.lang.String getName() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } + } + /** + * string name = 2; + * @return The bytes for name. + */ + @java.lang.Override + public com.google.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int PRICE_FIELD_NUMBER = 3; + private double price_ = 0D; + /** + * double price = 3; + * @return The price. 
+ */ + @java.lang.Override + public double getPrice() { + return price_; + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (id_ != 0) { + output.writeInt32(1, id_); + } + if (!com.google.protobuf.GeneratedMessage.isStringEmpty(name_)) { + com.google.protobuf.GeneratedMessage.writeString(output, 2, name_); + } + if (java.lang.Double.doubleToRawLongBits(price_) != 0) { + output.writeDouble(3, price_); + } + getUnknownFields().writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (id_ != 0) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, id_); + } + if (!com.google.protobuf.GeneratedMessage.isStringEmpty(name_)) { + size += com.google.protobuf.GeneratedMessage.computeStringSize(2, name_); + } + if (java.lang.Double.doubleToRawLongBits(price_) != 0) { + size += com.google.protobuf.CodedOutputStream + .computeDoubleSize(3, price_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.demo.kafka.protobuf.ProtobufProduct)) { + return super.equals(obj); + } + org.demo.kafka.protobuf.ProtobufProduct other = (org.demo.kafka.protobuf.ProtobufProduct) obj; + + if (getId() + != other.getId()) return false; + if (!getName() + .equals(other.getName())) return false; + if (java.lang.Double.doubleToLongBits(getPrice()) + != java.lang.Double.doubleToLongBits( + other.getPrice())) return false; + if (!getUnknownFields().equals(other.getUnknownFields())) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + hash = (37 * hash) + ID_FIELD_NUMBER; + hash = (53 * hash) + getId(); + hash = (37 * hash) + NAME_FIELD_NUMBER; + hash = (53 * hash) + getName().hashCode(); + hash = (37 * hash) + PRICE_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getPrice())); + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws 
com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input, extensionRegistry); + } + + public static org.demo.kafka.protobuf.ProtobufProduct parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseDelimitedWithIOException(PARSER, input); + } + + public static org.demo.kafka.protobuf.ProtobufProduct parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input); + } + public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(org.demo.kafka.protobuf.ProtobufProduct prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? 
new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code org.demo.kafka.protobuf.ProtobufProduct} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder implements + // @@protoc_insertion_point(builder_implements:org.demo.kafka.protobuf.ProtobufProduct) + org.demo.kafka.protobuf.ProtobufProductOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.demo.kafka.protobuf.ProtobufProduct.class, org.demo.kafka.protobuf.ProtobufProduct.Builder.class); + } + + // Construct using org.demo.kafka.protobuf.ProtobufProduct.newBuilder() + private Builder() { + + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + + } + @java.lang.Override + public Builder clear() { + super.clear(); + bitField0_ = 0; + id_ = 0; + name_ = ""; + price_ = 0D; + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; + } + + @java.lang.Override + public org.demo.kafka.protobuf.ProtobufProduct getDefaultInstanceForType() { + return org.demo.kafka.protobuf.ProtobufProduct.getDefaultInstance(); + } + + @java.lang.Override + public org.demo.kafka.protobuf.ProtobufProduct build() { + org.demo.kafka.protobuf.ProtobufProduct result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public org.demo.kafka.protobuf.ProtobufProduct buildPartial() { + org.demo.kafka.protobuf.ProtobufProduct result = new org.demo.kafka.protobuf.ProtobufProduct(this); + if (bitField0_ != 0) { buildPartial0(result); } + onBuilt(); + return result; + } + + private void buildPartial0(org.demo.kafka.protobuf.ProtobufProduct result) { + int from_bitField0_ = bitField0_; + if (((from_bitField0_ & 0x00000001) != 0)) { + result.id_ = id_; + } + if (((from_bitField0_ & 0x00000002) != 0)) { + result.name_ = name_; + } + if (((from_bitField0_ & 0x00000004) != 0)) { + result.price_ = price_; + } + } + + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.demo.kafka.protobuf.ProtobufProduct) { + return mergeFrom((org.demo.kafka.protobuf.ProtobufProduct)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.demo.kafka.protobuf.ProtobufProduct other) { + if (other == org.demo.kafka.protobuf.ProtobufProduct.getDefaultInstance()) return this; + if (other.getId() != 0) { + setId(other.getId()); + } + if (!other.getName().isEmpty()) { + name_ = other.name_; + bitField0_ |= 0x00000002; + onChanged(); + } + if (java.lang.Double.doubleToRawLongBits(other.getPrice()) != 0) { + setPrice(other.getPrice()); + } + 
this.mergeUnknownFields(other.getUnknownFields()); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 8: { + id_ = input.readInt32(); + bitField0_ |= 0x00000001; + break; + } // case 8 + case 18: { + name_ = input.readStringRequireUtf8(); + bitField0_ |= 0x00000002; + break; + } // case 18 + case 25: { + price_ = input.readDouble(); + bitField0_ |= 0x00000004; + break; + } // case 25 + default: { + if (!super.parseUnknownField(input, extensionRegistry, tag)) { + done = true; // was an endgroup tag + } + break; + } // default: + } // switch (tag) + } // while (!done) + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.unwrapIOException(); + } finally { + onChanged(); + } // finally + return this; + } + private int bitField0_; + + private int id_ ; + /** + * int32 id = 1; + * @return The id. + */ + @java.lang.Override + public int getId() { + return id_; + } + /** + * int32 id = 1; + * @param value The id to set. + * @return This builder for chaining. + */ + public Builder setId(int value) { + + id_ = value; + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + /** + * int32 id = 1; + * @return This builder for chaining. + */ + public Builder clearId() { + bitField0_ = (bitField0_ & ~0x00000001); + id_ = 0; + onChanged(); + return this; + } + + private java.lang.Object name_ = ""; + /** + * string name = 2; + * @return The name. + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * string name = 2; + * @return The bytes for name. + */ + public com.google.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * string name = 2; + * @param value The name to set. + * @return This builder for chaining. + */ + public Builder setName( + java.lang.String value) { + if (value == null) { throw new NullPointerException(); } + name_ = value; + bitField0_ |= 0x00000002; + onChanged(); + return this; + } + /** + * string name = 2; + * @return This builder for chaining. + */ + public Builder clearName() { + name_ = getDefaultInstance().getName(); + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + return this; + } + /** + * string name = 2; + * @param value The bytes for name to set. + * @return This builder for chaining. + */ + public Builder setNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { throw new NullPointerException(); } + checkByteStringIsUtf8(value); + name_ = value; + bitField0_ |= 0x00000002; + onChanged(); + return this; + } + + private double price_ ; + /** + * double price = 3; + * @return The price. 
+ */ + @java.lang.Override + public double getPrice() { + return price_; + } + /** + * double price = 3; + * @param value The price to set. + * @return This builder for chaining. + */ + public Builder setPrice(double value) { + + price_ = value; + bitField0_ |= 0x00000004; + onChanged(); + return this; + } + /** + * double price = 3; + * @return This builder for chaining. + */ + public Builder clearPrice() { + bitField0_ = (bitField0_ & ~0x00000004); + price_ = 0D; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:org.demo.kafka.protobuf.ProtobufProduct) + } + + // @@protoc_insertion_point(class_scope:org.demo.kafka.protobuf.ProtobufProduct) + private static final org.demo.kafka.protobuf.ProtobufProduct DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new org.demo.kafka.protobuf.ProtobufProduct(); + } + + public static org.demo.kafka.protobuf.ProtobufProduct getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + @java.lang.Override + public ProtobufProduct parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + Builder builder = newBuilder(); + try { + builder.mergeFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(builder.buildPartial()); + } catch (com.google.protobuf.UninitializedMessageException e) { + throw e.asInvalidProtocolBufferException().setUnfinishedMessage(builder.buildPartial()); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException(e) + .setUnfinishedMessage(builder.buildPartial()); + } + return builder.buildPartial(); + } + }; + + public static com.google.protobuf.Parser parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public org.demo.kafka.protobuf.ProtobufProduct getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + +} + diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java new file mode 100644 index 000000000..9c1518db3 --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java @@ -0,0 +1,36 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// NO CHECKED-IN PROTOBUF GENCODE +// source: ProtobufProduct.proto +// Protobuf Java Version: 4.31.0 + +package org.demo.kafka.protobuf; + +@com.google.protobuf.Generated +public interface ProtobufProductOrBuilder extends + // @@protoc_insertion_point(interface_extends:org.demo.kafka.protobuf.ProtobufProduct) + com.google.protobuf.MessageOrBuilder { + + /** + * int32 id = 1; + * @return The id. + */ + int getId(); + + /** + * string name = 2; + * @return The name. + */ + java.lang.String getName(); + /** + * string name = 2; + * @return The bytes for name. + */ + com.google.protobuf.ByteString + getNameBytes(); + + /** + * double price = 3; + * @return The price. 
+ */ + double getPrice(); +} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java new file mode 100644 index 000000000..6a99f35ec --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java @@ -0,0 +1,63 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// NO CHECKED-IN PROTOBUF GENCODE +// source: ProtobufProduct.proto +// Protobuf Java Version: 4.31.0 + +package org.demo.kafka.protobuf; + +@com.google.protobuf.Generated +public final class ProtobufProductOuterClass { + private ProtobufProductOuterClass() {} + static { + com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( + com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, + /* major= */ 4, + /* minor= */ 31, + /* patch= */ 0, + /* suffix= */ "", + ProtobufProductOuterClass.class.getName()); + } + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + static final com.google.protobuf.Descriptors.Descriptor + internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; + static final + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\025ProtobufProduct.proto\022\027org.demo.kafka." 
+ + "protobuf\":\n\017ProtobufProduct\022\n\n\002id\030\001 \001(\005\022" + + "\014\n\004name\030\002 \001(\t\022\r\n\005price\030\003 \001(\001B6\n\027org.demo" + + ".kafka.protobufB\031ProtobufProductOuterCla" + + "ssP\001b\006proto3" + }; + descriptor = com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }); + internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor, + new java.lang.String[] { "Id", "Name", "Price", }); + descriptor.resolveAllFeaturesImmutable(); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java new file mode 100644 index 000000000..e6f4d38fd --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java @@ -0,0 +1,127 @@ +package org.demo.kafka.tools; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Base64; + +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.demo.kafka.avro.AvroProduct; + +/** + * Utility class to generate base64-encoded Avro serialized products + * for use in test events. 
+ */ +public final class GenerateAvroSamples { + + private GenerateAvroSamples() { + // Utility class + } + + public static void main(String[] args) throws IOException { + // Create three different products + AvroProduct product1 = new AvroProduct(1001, "Laptop", 999.99); + AvroProduct product2 = new AvroProduct(1002, "Smartphone", 599.99); + AvroProduct product3 = new AvroProduct(1003, "Headphones", 149.99); + + // Serialize and encode each product + String encodedProduct1 = serializeAndEncode(product1); + String encodedProduct2 = serializeAndEncode(product2); + String encodedProduct3 = serializeAndEncode(product3); + + // Serialize and encode an integer key + String encodedKey = serializeAndEncodeInteger(42); + + // Print the results + System.out.println("Base64 encoded Avro products for use in kafka-avro-event.json:"); + System.out.println("\nProduct 1 (with key):"); + System.out.println("key: \"" + encodedKey + "\","); + System.out.println("value: \"" + encodedProduct1 + "\","); + + System.out.println("\nProduct 2 (with key):"); + System.out.println("key: \"" + encodedKey + "\","); + System.out.println("value: \"" + encodedProduct2 + "\","); + + System.out.println("\nProduct 3 (without key):"); + System.out.println("key: null,"); + System.out.println("value: \"" + encodedProduct3 + "\","); + + // Print a sample event structure + System.out.println("\nSample event structure:"); + printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3); + } + + private static String serializeAndEncode(AvroProduct product) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); + DatumWriter writer = new SpecificDatumWriter<>(AvroProduct.class); + + writer.write(product, encoder); + encoder.flush(); + + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + private static String serializeAndEncodeInteger(Integer value) throws IOException { + // For simple types like integers, we'll just convert to string and encode + return Base64.getEncoder().encodeToString(value.toString().getBytes()); + } + + private static void printSampleEvent(String key, String product1, String product2, String product3) { + System.out.println("{\n" + + " \"eventSource\": \"aws:kafka\",\n" + + " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" + + + " \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" + + + " \"records\": {\n" + + " \"mytopic-0\": [\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + product1 + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 16,\n" + + " \"timestamp\": 1545084650988,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + product2 + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + 
" \"offset\": 17,\n" + + " \"timestamp\": 1545084650989,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": null,\n" + + " \"value\": \"" + product3 + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"); + } +} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java new file mode 100644 index 000000000..d0ef7cb55 --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java @@ -0,0 +1,130 @@ +package org.demo.kafka.tools; + +import java.io.IOException; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Utility class to generate base64-encoded JSON serialized products + * for use in test events. + */ +public final class GenerateJsonSamples { + + private GenerateJsonSamples() { + // Utility class + } + + public static void main(String[] args) throws IOException { + // Create three different products + Map product1 = new HashMap<>(); + product1.put("id", 1001); + product1.put("name", "Laptop"); + product1.put("price", 999.99); + + Map product2 = new HashMap<>(); + product2.put("id", 1002); + product2.put("name", "Smartphone"); + product2.put("price", 599.99); + + Map product3 = new HashMap<>(); + product3.put("id", 1003); + product3.put("name", "Headphones"); + product3.put("price", 149.99); + + // Serialize and encode each product + String encodedProduct1 = serializeAndEncode(product1); + String encodedProduct2 = serializeAndEncode(product2); + String encodedProduct3 = serializeAndEncode(product3); + + // Serialize and encode an integer key + String encodedKey = serializeAndEncodeInteger(42); + + // Print the results + System.out.println("Base64 encoded JSON products for use in kafka-json-event.json:"); + System.out.println("\nProduct 1 (with key):"); + System.out.println("key: \"" + encodedKey + "\","); + System.out.println("value: \"" + encodedProduct1 + "\","); + + System.out.println("\nProduct 2 (with key):"); + System.out.println("key: \"" + encodedKey + "\","); + System.out.println("value: \"" + encodedProduct2 + "\","); + + System.out.println("\nProduct 3 (without key):"); + System.out.println("key: null,"); + System.out.println("value: \"" + encodedProduct3 + "\","); + + // Print a sample event structure + System.out.println("\nSample event structure:"); + printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3); + } + + private static String serializeAndEncode(Map product) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(product); + return Base64.getEncoder().encodeToString(json.getBytes()); + } + + private static String serializeAndEncodeInteger(Integer value) { + // For simple types like integers, we'll just convert to string and encode + return Base64.getEncoder().encodeToString(value.toString().getBytes()); + } + + private static void printSampleEvent(String key, String product1, String product2, String product3) { + System.out.println("{\n" + + " \"eventSource\": \"aws:kafka\",\n" + + " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" + + + " 
\"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" + + + " \"records\": {\n" + + " \"mytopic-0\": [\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + product1 + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + product2 + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": null,\n" + + " \"value\": \"" + product3 + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"); + } +} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java new file mode 100644 index 000000000..aa5f6e330 --- /dev/null +++ b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java @@ -0,0 +1,215 @@ +package org.demo.kafka.tools; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Base64; + +import org.apache.kafka.common.utils.ByteUtils; +import org.demo.kafka.protobuf.ProtobufProduct; + +import com.google.protobuf.CodedOutputStream; + +/** + * Utility class to generate base64-encoded Protobuf serialized products + * for use in test events. + */ +public final class GenerateProtobufSamples { + + private GenerateProtobufSamples() { + // Utility class + } + + public static void main(String[] args) throws IOException { + // Create a single product that will be used for all four scenarios + ProtobufProduct product = ProtobufProduct.newBuilder() + .setId(1001) + .setName("Laptop") + .setPrice(999.99) + .build(); + + // Create four different serializations of the same product + String standardProduct = serializeAndEncode(product); + String productWithConfluentSimpleIndex = serializeWithConfluentSimpleMessageIndex(product); + String productWithConfluentComplexIndex = serializeWithConfluentComplexMessageIndex(product); + String productWithGlueMagicByte = serializeWithGlueMagicByte(product); + + // Serialize and encode an integer key (same for all records) + String encodedKey = serializeAndEncodeInteger(42); + + // Print the results + System.out.println("Base64 encoded Protobuf products with different scenarios:"); + System.out.println("\n1. Plain Protobuf (no schema registry):"); + System.out.println("value: \"" + standardProduct + "\""); + + System.out.println("\n2. 
Confluent with Simple Message Index (optimized single 0):"); + System.out.println("value: \"" + productWithConfluentSimpleIndex + "\""); + + System.out.println("\n3. Confluent with Complex Message Index (array [1,0]):"); + System.out.println("value: \"" + productWithConfluentComplexIndex + "\""); + + System.out.println("\n4. Glue with Magic Byte:"); + System.out.println("value: \"" + productWithGlueMagicByte + "\""); + + // Print the merged event structure + System.out.println("\n" + "=".repeat(80)); + System.out.println("MERGED EVENT WITH ALL FOUR SCENARIOS"); + System.out.println("=".repeat(80)); + printSampleEvent(encodedKey, standardProduct, productWithConfluentSimpleIndex, productWithConfluentComplexIndex, + productWithGlueMagicByte); + } + + private static String serializeAndEncode(ProtobufProduct product) { + return Base64.getEncoder().encodeToString(product.toByteArray()); + } + + /** + * Serializes a protobuf product with a simple Confluent message index (optimized single 0). + * Format: [0][protobuf_data] + * + * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format} + */ + private static String serializeWithConfluentSimpleMessageIndex(ProtobufProduct product) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // Write optimized simple message index for Confluent (single 0 byte for [0]) + baos.write(0); + + // Write the protobuf data + baos.write(product.toByteArray()); + + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + /** + * Serializes a protobuf product with a complex Confluent message index (array [1,0]). + * Format: [2][1][0][protobuf_data] where 2 is the array length using varint encoding + * + * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format} + */ + private static String serializeWithConfluentComplexMessageIndex(ProtobufProduct product) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // Write complex message index array [1,0] using ByteUtils + ByteBuffer buffer = ByteBuffer.allocate(1024); + ByteUtils.writeVarint(2, buffer); // Array length + ByteUtils.writeVarint(1, buffer); // First index value + ByteUtils.writeVarint(0, buffer); // Second index value + + buffer.flip(); + byte[] indexData = new byte[buffer.remaining()]; + buffer.get(indexData); + baos.write(indexData); + + // Write the protobuf data + baos.write(product.toByteArray()); + + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + /** + * Serializes a protobuf product with Glue magic byte. 
+ * Format: [1][protobuf_data] where 1 is the magic byte + */ + private static String serializeWithGlueMagicByte(ProtobufProduct product) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos); + + // Write Glue magic byte (single UInt32) + codedOutput.writeUInt32NoTag(1); + + // Write the protobuf data + product.writeTo(codedOutput); + + codedOutput.flush(); + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + private static String serializeAndEncodeInteger(Integer value) { + // For simple types like integers, we'll just convert to string and encode + return Base64.getEncoder().encodeToString(value.toString().getBytes()); + } + + private static void printSampleEvent(String key, String standardProduct, String confluentSimpleProduct, + String confluentComplexProduct, String glueProduct) { + System.out.println("{\n" + + " \"eventSource\": \"aws:kafka\",\n" + + " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" + + + " \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" + + + " \"records\": {\n" + + " \"mytopic-0\": [\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + standardProduct + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 16,\n" + + " \"timestamp\": 1545084650988,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + confluentSimpleProduct + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ],\n" + + " \"valueSchemaMetadata\": {\n" + + " \"schemaId\": \"123\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 17,\n" + + " \"timestamp\": 1545084650989,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": null,\n" + + " \"value\": \"" + confluentComplexProduct + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ],\n" + + " \"valueSchemaMetadata\": {\n" + + " \"schemaId\": \"456\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 18,\n" + + " \"timestamp\": 1545084650990,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + glueProduct + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ],\n" + + " \"valueSchemaMetadata\": {\n" + + " \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"); + } +} From 9b2e860f7eea4d649a26b312a466732dcf8e9e7e Mon Sep 17 00:00:00 2001 From: Santiago Date: Sat, 29 Nov 2025 11:39:44 -0500 Subject: [PATCH 2/3] 
docs(examples): add SAM template with S3 and SQS resources Signed-off-by: Santiago --- examples/pom.xml | 1 + .../pom.xml | 22 ++--- .../template.yaml | 82 ++++++++++--------- 3 files changed, 50 insertions(+), 55 deletions(-) diff --git a/examples/pom.xml b/examples/pom.xml index 511cdd5a9..1fdbd85aa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -45,6 +45,7 @@ powertools-examples-batch powertools-examples-validation powertools-examples-cloudformation + powertools-examples-large-messages diff --git a/examples/powertools-examples-large-messages/pom.xml b/examples/powertools-examples-large-messages/pom.xml index f4b75722c..b96a6fd6b 100644 --- a/examples/powertools-examples-large-messages/pom.xml +++ b/examples/powertools-examples-large-messages/pom.xml @@ -3,9 +3,9 @@ 4.0.0 software.amazon.lambda.examples 2.8.0 - powertools-examples-kafka + powertools-examples-large-messages jar - Powertools for AWS Lambda (Java) - Examples - Kafka + Powertools for AWS Lambda (Java) - Examples - Large Messages 11 @@ -18,23 +18,13 @@ software.amazon.lambda - powertools-kafka + powertools-large-messages ${project.version} - org.apache.kafka - kafka-clients - 4.1.1 - - - org.apache.avro - avro - ${avro.version} - - - com.google.protobuf - protobuf-java - ${protobuf.version} + com.amazonaws + amazon-sqs-java-extended-client-lib + 2.1.1 diff --git a/examples/powertools-examples-large-messages/template.yaml b/examples/powertools-examples-large-messages/template.yaml index 509b13ca3..a55991271 100644 --- a/examples/powertools-examples-large-messages/template.yaml +++ b/examples/powertools-examples-large-messages/template.yaml @@ -1,59 +1,63 @@ AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Description: > - Kafka Deserialization example with Kafka Lambda ESM + Large Message Handling Example using SQS and S3 offloading Globals: Function: - Timeout: 20 + Timeout: 30 Runtime: java11 MemorySize: 512 Tracing: Active + Environment: + Variables: + JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1" + POWERTOOLS_LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: LargeMessageExample Resources: - JsonDeserializationFunction: - Type: AWS::Serverless::Function + LargeMessageBucket: + Type: AWS::S3::Bucket Properties: - CodeUri: . - Handler: org.demo.kafka.JsonDeserializationFunction::handleRequest - Environment: - Variables: - JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1" - POWERTOOLS_LOG_LEVEL: DEBUG - POWERTOOLS_SERVICE_NAME: JsonDeserialization - POWERTOOLS_METRICS_NAMESPACE: JsonDeserializationFunction - - AvroDeserializationFunction: - Type: AWS::Serverless::Function + LifecycleConfiguration: + Rules: + - Id: DeleteOldMessages + Status: Enabled + ExpirationInDays: 1 + MyLargeMessageQueue: + Type: AWS::SQS::Queue Properties: - CodeUri: . - Handler: org.demo.kafka.AvroDeserializationFunction::handleRequest - Environment: - Variables: - JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1" - POWERTOOLS_LOG_LEVEL: DEBUG - POWERTOOLS_SERVICE_NAME: AvroDeserialization - POWERTOOLS_METRICS_NAMESPACE: AvroDeserializationFunction - - ProtobufDeserializationFunction: + VisibilityTimeout: 30 + LargeMessageProcessingFunction: Type: AWS::Serverless::Function Properties: CodeUri: . 
-      Handler: org.demo.kafka.ProtobufDeserializationFunction::handleRequest
+      # IMPORTANT: Make sure this matches your Java package and handler class
+      Handler: helloworld.App::handleRequest
+
+      Policies:
+        - S3CrudPolicy:
+            BucketName: !Ref LargeMessageBucket
+        - SQSPollerPolicy:
+            QueueName: !GetAtt MyLargeMessageQueue.QueueName
+
       Environment:
         Variables:
-          JAVA_TOOL_OPTIONS: "-XX:+TieredCompilation -XX:TieredStopAtLevel=1"
-          POWERTOOLS_LOG_LEVEL: DEBUG
-          POWERTOOLS_SERVICE_NAME: ProtobufDeserialization
-          POWERTOOLS_METRICS_NAMESPACE: ProtobufDeserializationFunction
+          POWERTOOLS_LARGE_MESSAGES_BUCKET: !Ref LargeMessageBucket
+      Events:
+        SQSEvent:
+          Type: SQS
+          Properties:
+            Queue: !GetAtt MyLargeMessageQueue.Arn
+            BatchSize: 1
 
 Outputs:
-  JsonFunction:
-    Description: "Kafka JSON Lambda Function ARN"
-    Value: !GetAtt JsonDeserializationFunction.Arn
-  AvroFunction:
-    Description: "Kafka Avro Lambda Function ARN"
-    Value: !GetAtt AvroDeserializationFunction.Arn
-  ProtobufFunction:
-    Description: "Kafka Protobuf Lambda Function ARN"
-    Value: !GetAtt ProtobufDeserializationFunction.Arn
+  LargeMessageBucketName:
+    Description: "S3 Bucket for large payloads"
+    Value: !Ref LargeMessageBucket
+  QueueURL:
+    Description: "SQS Queue URL"
+    Value: !Ref MyLargeMessageQueue
+  FunctionArn:
+    Description: "Lambda Function ARN"
+    Value: !GetAtt LargeMessageProcessingFunction.Arn

From 0be4bd8ad6d38fd062f4217008c08aaa599e98bb Mon Sep 17 00:00:00 2001
From: Santiago
Date: Sat, 29 Nov 2025 14:45:28 -0500
Subject: [PATCH 3/3] docs(examples): complete large-messages example implementation

Signed-off-by: Santiago
---
 .../README.md                                 |  86 +--
 .../pom.xml                                   | 179 +----
 .../src/main/java/helloworld/App.java         |  47 ++
 .../src/main/resources/log4j2.xml             |  16 +
 .../template.yaml                             |   1 -
 .../tools/README.md                           |  73 --
 .../tools/pom.xml                             | 110 ---
 .../java/org/demo/kafka/avro/AvroProduct.java | 476 ------------
 .../demo/kafka/protobuf/ProtobufProduct.java  | 636 ------------------
 .../protobuf/ProtobufProductOrBuilder.java    |  36 -
 .../protobuf/ProtobufProductOuterClass.java   |  63 --
 .../demo/kafka/tools/GenerateAvroSamples.java | 127 ----
 .../demo/kafka/tools/GenerateJsonSamples.java | 130 ----
 .../kafka/tools/GenerateProtobufSamples.java  | 215 ------
 14 files changed, 116 insertions(+), 2079 deletions(-)
 create mode 100644 examples/powertools-examples-large-messages/src/main/java/helloworld/App.java
 create mode 100644 examples/powertools-examples-large-messages/src/main/resources/log4j2.xml
 delete mode 100644 examples/powertools-examples-large-messages/tools/README.md
 delete mode 100644 examples/powertools-examples-large-messages/tools/pom.xml
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java
 delete mode 100644 examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java
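Note (illustrative, not part of the patch): the SAM template above provisions `LargeMessageBucket` and `MyLargeMessageQueue`, and the README rewritten below mentions that a real producer would use the SQS Extended Client to offload large payloads. The following is a minimal producer sketch under those assumptions; it uses the `amazon-sqs-java-extended-client-lib` 2.x API declared in the example's `pom.xml`, and the queue URL and bucket name are placeholders that would come from the stack outputs `QueueURL` and `LargeMessageBucketName`.

```java
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class LargeMessageProducer {

    public static void main(String[] args) {
        // Placeholder values: in practice, read these from the stack outputs
        // (QueueURL and LargeMessageBucketName) after `sam deploy`.
        String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/MyLargeMessageQueue";
        String bucketName = "large-message-bucket-from-stack-output";

        S3Client s3 = S3Client.create();
        SqsClient sqs = SqsClient.create();

        // Bodies above the SQS 256 KB limit are written to the bucket and the
        // message body is replaced with an S3 pointer.
        ExtendedClientConfiguration config = new ExtendedClientConfiguration()
                .withPayloadSupportEnabled(s3, bucketName);

        try (SqsClient extendedSqs = new AmazonSQSExtendedClient(sqs, config)) {
            String largeBody = "x".repeat(300 * 1024); // ~300 KB, forces offloading to S3

            extendedSqs.sendMessage(SendMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .messageBody(largeBody)
                    .build());
        }
    }
}
```

The handler added in this patch then receives the pointer record, and the Powertools large-messages utility fetches the real body from S3 before the message is processed.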
diff --git a/examples/powertools-examples-large-messages/README.md b/examples/powertools-examples-large-messages/README.md
index 76cd81cb9..528ad2c5c 100644
--- a/examples/powertools-examples-large-messages/README.md
+++ b/examples/powertools-examples-large-messages/README.md
@@ -1,77 +1,29 @@
-# Powertools for AWS Lambda (Java) - Kafka Example
+# Powertools for AWS Lambda (Java) - Large Messages Example
 
-This project demonstrates how to use Powertools for AWS Lambda (Java) to deserialize Kafka Lambda events directly into strongly typed Kafka ConsumerRecords using different serialization formats.
+This project contains an example of a Lambda function using the **Large Messages** module of Powertools for AWS Lambda (Java). For more information on this module, please refer to the [documentation](https://docs.powertools.aws.dev/lambda-java/utilities/large_messages/).
 
-## Overview
+The example demonstrates an SQS listener that processes messages using the `LargeMessages` functional utility. It handles the retrieval of large payloads offloaded to S3 automatically.
 
-The example showcases automatic deserialization of Kafka Lambda events into ConsumerRecords using three formats:
-- JSON - Using standard JSON serialization
-- Avro - Using Apache Avro schema-based serialization
-- Protobuf - Using Google Protocol Buffers serialization
+## Deploy the sample application
 
-Each format has its own Lambda function handler that demonstrates how to use the `@Deserialization` annotation with the appropriate `DeserializationType`, eliminating the need to handle complex deserialization logic manually.
+This sample is based on the Serverless Application Model (SAM). To deploy it, check out the instructions for getting
+started with SAM in [the examples directory](../README.md).
 
-## Build and Deploy
+## Test the application
 
-### Prerequisites
-- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
-- Java 11+
-- Maven
+Since this function is triggered by an SQS queue, you can test it by sending a message to the queue created by the SAM template.
 
-### Build
+1. **Find your Queue URL:**
+   Run the following command (replacing `LargeMessageExample` with the name of your deployed stack):
+   ```bash
+   aws cloudformation describe-stacks --stack-name LargeMessageExample --query "Stacks[0].Outputs[?OutputKey=='QueueURL'].OutputValue" --output text
+   ```
 
-```bash
-# Build the application
-sam build
-```
+2. **Send a Test Message:**
+   Note: To test the actual "Large Message" functionality (payload offloading), you would typically use the SQS Extended Client in a producer application. However, you can verify the Lambda trigger with a standard message:
+   ```bash
+   aws sqs send-message --queue-url [YOUR_QUEUE_URL] --message-body '{"message": "Hello from CLI"}'
+   ```
 
-### Deploy
-
-```bash
-# Deploy the application to AWS
-sam deploy --guided
-```
-
-During the guided deployment, you'll be prompted to provide values for required parameters. After deployment, SAM will output the ARNs of the deployed Lambda functions.
- -### Build with Different Serialization Formats - -The project includes Maven profiles to build with different serialization formats: - -```bash -# Build with JSON only (no Avro or Protobuf) -mvn clean package -P base - -# Build with Avro only -mvn clean package -P avro-only - -# Build with Protobuf only -mvn clean package -P protobuf-only - -# Build with all formats (default) -mvn clean package -P full -``` - -## Testing - -The `events` directory contains sample events for each serialization format: -- `kafka-json-event.json` - Sample event with JSON-serialized products -- `kafka-avro-event.json` - Sample event with Avro-serialized products -- `kafka-protobuf-event.json` - Sample event with Protobuf-serialized products - -You can use these events to test the Lambda functions: - -```bash -# Test the JSON deserialization function -sam local invoke JsonDeserializationFunction --event events/kafka-json-event.json - -# Test the Avro deserialization function -sam local invoke AvroDeserializationFunction --event events/kafka-avro-event.json - -# Test the Protobuf deserialization function -sam local invoke ProtobufDeserializationFunction --event events/kafka-protobuf-event.json -``` - -## Sample Generator Tool - -The project includes a tool to generate sample JSON, Avro, and Protobuf serialized data. See the [tools/README.md](tools/README.md) for more information. \ No newline at end of file +3. **Verify Logs:** + Go to AWS CloudWatch Logs and check the Log Group for your function. You should see the processed message logged by the application. \ No newline at end of file diff --git a/examples/powertools-examples-large-messages/pom.xml b/examples/powertools-examples-large-messages/pom.xml index b96a6fd6b..9b2c92f0a 100644 --- a/examples/powertools-examples-large-messages/pom.xml +++ b/examples/powertools-examples-large-messages/pom.xml @@ -1,18 +1,18 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 + software.amazon.lambda.examples - 2.8.0 powertools-examples-large-messages - jar + 2.8.0 Powertools for AWS Lambda (Java) - Examples - Large Messages 11 11 - 1.9.20.1 - 1.12.1 - 4.33.0 + 2.21.1 + 1.9.21 + 2.1.1 @@ -21,13 +21,19 @@ powertools-large-messages ${project.version} + com.amazonaws amazon-sqs-java-extended-client-lib - 2.1.1 + ${amazon-sqs-java-extended-client-lib.version} + + + + com.amazonaws + aws-lambda-java-events + 3.11.4 - software.amazon.lambda powertools-logging-log4j @@ -42,7 +48,6 @@ - org.apache.maven.plugins maven-deploy-plugin @@ -51,37 +56,11 @@ true - - org.apache.maven.plugins - maven-shade-plugin - 3.6.1 - - - package - - shade - - - false - - - - - - - - - org.apache.logging.log4j - log4j-transform-maven-shade-plugin-extensions - 0.2.0 - - - + dev.aspectj aspectj-maven-plugin - 1.14.1 + 1.14 ${maven.compiler.source} ${maven.compiler.target} @@ -100,123 +79,35 @@ - - - org.aspectj - aspectjtools - ${aspectj.version} - - - - - org.apache.avro - avro-maven-plugin - ${avro.version} - - - generate-sources - - schema - - - ${project.basedir}/src/main/avro/ - ${project.basedir}/src/main/java/ - String - - - - - + - io.github.ascopes - protobuf-maven-plugin - 3.10.3 + org.apache.maven.plugins + maven-shade-plugin + 3.6.1 + package - generate + shade - generate-sources - ${protobuf.version} - - ${project.basedir}/src/main/proto - - ${project.basedir}/src/main/java - false + false + + + + + + org.apache.logging.log4j + log4j-transform-maven-shade-plugin-extensions + 0.1.0 + + - - - - - base - - base - - - - - 
org.apache.avro - avro - ${avro.version} - provided - - - com.google.protobuf - protobuf-java - ${protobuf.version} - provided - - - - - - - avro-only - - avro-only - - - - com.google.protobuf - protobuf-java - ${protobuf.version} - provided - - - - - - - protobuf-only - - protobuf-only - - - - org.apache.avro - avro - ${avro.version} - provided - - - - - - - full - - true - - - full - - - - + \ No newline at end of file diff --git a/examples/powertools-examples-large-messages/src/main/java/helloworld/App.java b/examples/powertools-examples-large-messages/src/main/java/helloworld/App.java new file mode 100644 index 000000000..275d0ad3c --- /dev/null +++ b/examples/powertools-examples-large-messages/src/main/java/helloworld/App.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package helloworld; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.lambda.powertools.largemessages.LargeMessages; + +/** + * Example handler showing how to use LargeMessageProcessor functionally. + * This approach gives you more control than the @LargeMessage annotation. + */ +public final class App implements RequestHandler { + + private static final Logger LOG = LogManager.getLogger(App.class); + + @Override + public String handleRequest(final SQSEvent event, final Context context) { + LOG.info("Received event with {} records", event.getRecords().size()); + + for (SQSMessage message : event.getRecords()) { + LargeMessages.processLargeMessage(message, (processedMessage) -> { + LOG.info("Processing message ID: {}", processedMessage.getMessageId()); + LOG.info("Processing body content: {}", processedMessage.getBody()); + return "Processed"; + }); + } + + return "SUCCESS"; + } +} diff --git a/examples/powertools-examples-large-messages/src/main/resources/log4j2.xml b/examples/powertools-examples-large-messages/src/main/resources/log4j2.xml new file mode 100644 index 000000000..5dede7b58 --- /dev/null +++ b/examples/powertools-examples-large-messages/src/main/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/examples/powertools-examples-large-messages/template.yaml b/examples/powertools-examples-large-messages/template.yaml index a55991271..4fc3368a6 100644 --- a/examples/powertools-examples-large-messages/template.yaml +++ b/examples/powertools-examples-large-messages/template.yaml @@ -32,7 +32,6 @@ Resources: Type: AWS::Serverless::Function Properties: CodeUri: . 
-      # IMPORTANT: Make sure this matches your Java package and handler class
       Handler: helloworld.App::handleRequest
 
       Policies:
diff --git a/examples/powertools-examples-large-messages/tools/README.md b/examples/powertools-examples-large-messages/tools/README.md
deleted file mode 100644
index 02e8dde9b..000000000
--- a/examples/powertools-examples-large-messages/tools/README.md
+++ /dev/null
@@ -1,73 +0,0 @@
-# Kafka Sample Generator Tool
-
-This tool generates base64-encoded serialized products for testing the Kafka consumer functions with different serialization formats.
-
-## Supported Formats
-
-- **JSON**: Generates base64-encoded JSON serialized products
-- **Avro**: Generates base64-encoded Avro serialized products
-- **Protobuf**: Generates base64-encoded Protobuf serialized products
-
-## Usage
-
-Run the following Maven commands from this directory:
-
-```bash
-# Generate Avro and Protobuf classes from schemas
-mvn generate-sources
-
-# Compile the code
-mvn compile
-```
-
-### Generate JSON Samples
-
-```bash
-# Run the JSON sample generator
-mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateJsonSamples"
-```
-
-The tool will output base64-encoded values for JSON products that can be used in `../events/kafka-json-event.json`.
-
-### Generate Avro Samples
-
-```bash
-# Run the Avro sample generator
-mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateAvroSamples"
-```
-
-The tool will output base64-encoded values for Avro products that can be used in `../events/kafka-avro-event.json`.
-
-### Generate Protobuf Samples
-
-```bash
-# Run the Protobuf sample generator
-mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
-```
-
-The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message-indexes to test different serialization scenarios.
-
-## Output
-
-Each generator produces:
-
-1. Three different products (Laptop, Smartphone, Headphones)
-2. An integer key (42) and one entry with a nullish key to test for edge-cases
-3. A complete sample event structure that can be used directly for testing
-
-The Protobuf generators additionally create samples with different Confluent message-index formats:
-- Standard protobuf (no message indexes)
-- Simple message index (single 0 byte)
-- Complex message index (length-prefixed array)
-
-For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
-
-## Example
-
-After generating the samples, you can copy the output into the respective event files:
-
-- `../events/kafka-json-event.json` for JSON samples
-- `../events/kafka-avro-event.json` for Avro samples
-- `../events/kafka-protobuf-event.json` for Protobuf samples
-
-These event files can then be used to test the Lambda functions with the appropriate deserializer.
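Note (illustrative, not part of the patch): the Javadoc on `App.java` added earlier in this patch contrasts the functional `LargeMessages` style with the `@LargeMessage` annotation. A minimal sketch of the annotation-based variant follows, under the assumption that the module's documented annotation and the AspectJ weaving configured in the example's `pom.xml` are used; the class name and logging below are illustrative only.

```java
package helloworld;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import software.amazon.lambda.powertools.largemessages.LargeMessage;

/**
 * Annotation-based alternative to the functional style in App.java.
 * The aspect replaces the S3 pointer in the record body with the real
 * payload before the annotated method runs.
 */
public class AnnotatedApp implements RequestHandler<SQSEvent, String> {

    @Override
    public String handleRequest(SQSEvent event, Context context) {
        event.getRecords().forEach(this::processMessage);
        return "SUCCESS";
    }

    @LargeMessage
    void processMessage(SQSMessage message) {
        // message.getBody() already contains the full payload fetched from S3
        System.out.println("Processed message " + message.getMessageId()
                + " with body size " + message.getBody().length());
    }
}
```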
diff --git a/examples/powertools-examples-large-messages/tools/pom.xml b/examples/powertools-examples-large-messages/tools/pom.xml deleted file mode 100644 index e6f2654d1..000000000 --- a/examples/powertools-examples-large-messages/tools/pom.xml +++ /dev/null @@ -1,110 +0,0 @@ - - - 4.0.0 - - software.amazon.lambda.examples - powertools-examples-kafka-tools - 2.0.0 - - - 11 - 11 - 1.12.0 - 4.31.0 - 4.0.0 - - - - - org.apache.avro - avro - ${avro.version} - - - com.google.protobuf - protobuf-java - ${protobuf.version} - - - org.apache.kafka - kafka-clients - ${kafka-clients.version} - - - com.fasterxml.jackson.core - jackson-databind - 2.19.0 - - - - - - - org.apache.avro - avro-maven-plugin - ${avro.version} - - - generate-sources - - schema - - - ${project.basedir}/../src/main/avro/ - ${project.basedir}/src/main/java/ - String - - - - - - io.github.ascopes - protobuf-maven-plugin - 3.10.2 - - - - generate - - generate-sources - - ${protobuf.version} - - ${project.basedir}/../src/main/proto - - ${project.basedir}/src/main/java - false - - - - - - org.codehaus.mojo - exec-maven-plugin - 3.1.0 - - - generate-json-samples - - org.demo.kafka.tools.GenerateJsonSamples - - - - generate-avro-samples - - org.demo.kafka.tools.GenerateAvroSamples - - - - generate-protobuf-samples - - org.demo.kafka.tools.GenerateProtobufSamples - - - - - - - diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java deleted file mode 100644 index fad7e2fbf..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/avro/AvroProduct.java +++ /dev/null @@ -1,476 +0,0 @@ -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.demo.kafka.avro; - -import org.apache.avro.specific.SpecificData; -import org.apache.avro.util.Utf8; -import org.apache.avro.message.BinaryMessageEncoder; -import org.apache.avro.message.BinaryMessageDecoder; -import org.apache.avro.message.SchemaStore; - -@org.apache.avro.specific.AvroGenerated -public class AvroProduct extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = -2929699301240218341L; - - - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroProduct\",\"namespace\":\"org.demo.kafka.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"price\",\"type\":\"double\"}]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - - private static final SpecificData MODEL$ = new SpecificData(); - - private static final BinaryMessageEncoder ENCODER = - new BinaryMessageEncoder<>(MODEL$, SCHEMA$); - - private static final BinaryMessageDecoder DECODER = - new BinaryMessageDecoder<>(MODEL$, SCHEMA$); - - /** - * Return the BinaryMessageEncoder instance used by this class. - * @return the message encoder used by this class - */ - public static BinaryMessageEncoder getEncoder() { - return ENCODER; - } - - /** - * Return the BinaryMessageDecoder instance used by this class. 
- * @return the message decoder used by this class - */ - public static BinaryMessageDecoder getDecoder() { - return DECODER; - } - - /** - * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. - * @param resolver a {@link SchemaStore} used to find schemas by fingerprint - * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore - */ - public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { - return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); - } - - /** - * Serializes this AvroProduct to a ByteBuffer. - * @return a buffer holding the serialized data for this instance - * @throws java.io.IOException if this instance could not be serialized - */ - public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { - return ENCODER.encode(this); - } - - /** - * Deserializes a AvroProduct from a ByteBuffer. - * @param b a byte buffer holding serialized data for an instance of this class - * @return a AvroProduct instance decoded from the given buffer - * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class - */ - public static AvroProduct fromByteBuffer( - java.nio.ByteBuffer b) throws java.io.IOException { - return DECODER.decode(b); - } - - private int id; - private java.lang.String name; - private double price; - - /** - * Default constructor. Note that this does not initialize fields - * to their default values from the schema. If that is desired then - * one should use newBuilder(). - */ - public AvroProduct() {} - - /** - * All-args constructor. - * @param id The new value for id - * @param name The new value for name - * @param price The new value for price - */ - public AvroProduct(java.lang.Integer id, java.lang.String name, java.lang.Double price) { - this.id = id; - this.name = name; - this.price = price; - } - - @Override - public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } - - @Override - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - - // Used by DatumWriter. Applications should not call. - @Override - public java.lang.Object get(int field$) { - switch (field$) { - case 0: return id; - case 1: return name; - case 2: return price; - default: throw new IndexOutOfBoundsException("Invalid index: " + field$); - } - } - - // Used by DatumReader. Applications should not call. - @Override - @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: id = (java.lang.Integer)value$; break; - case 1: name = value$ != null ? value$.toString() : null; break; - case 2: price = (java.lang.Double)value$; break; - default: throw new IndexOutOfBoundsException("Invalid index: " + field$); - } - } - - /** - * Gets the value of the 'id' field. - * @return The value of the 'id' field. - */ - public int getId() { - return id; - } - - - /** - * Sets the value of the 'id' field. - * @param value the value to set. - */ - public void setId(int value) { - this.id = value; - } - - /** - * Gets the value of the 'name' field. - * @return The value of the 'name' field. - */ - public java.lang.String getName() { - return name; - } - - - /** - * Sets the value of the 'name' field. - * @param value the value to set. - */ - public void setName(java.lang.String value) { - this.name = value; - } - - /** - * Gets the value of the 'price' field. - * @return The value of the 'price' field. 
- */ - public double getPrice() { - return price; - } - - - /** - * Sets the value of the 'price' field. - * @param value the value to set. - */ - public void setPrice(double value) { - this.price = value; - } - - /** - * Creates a new AvroProduct RecordBuilder. - * @return A new AvroProduct RecordBuilder - */ - public static org.demo.kafka.avro.AvroProduct.Builder newBuilder() { - return new org.demo.kafka.avro.AvroProduct.Builder(); - } - - /** - * Creates a new AvroProduct RecordBuilder by copying an existing Builder. - * @param other The existing builder to copy. - * @return A new AvroProduct RecordBuilder - */ - public static org.demo.kafka.avro.AvroProduct.Builder newBuilder(org.demo.kafka.avro.AvroProduct.Builder other) { - if (other == null) { - return new org.demo.kafka.avro.AvroProduct.Builder(); - } else { - return new org.demo.kafka.avro.AvroProduct.Builder(other); - } - } - - /** - * Creates a new AvroProduct RecordBuilder by copying an existing AvroProduct instance. - * @param other The existing instance to copy. - * @return A new AvroProduct RecordBuilder - */ - public static org.demo.kafka.avro.AvroProduct.Builder newBuilder(org.demo.kafka.avro.AvroProduct other) { - if (other == null) { - return new org.demo.kafka.avro.AvroProduct.Builder(); - } else { - return new org.demo.kafka.avro.AvroProduct.Builder(other); - } - } - - /** - * RecordBuilder for AvroProduct instances. - */ - @org.apache.avro.specific.AvroGenerated - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase - implements org.apache.avro.data.RecordBuilder { - - private int id; - private java.lang.String name; - private double price; - - /** Creates a new Builder */ - private Builder() { - super(SCHEMA$, MODEL$); - } - - /** - * Creates a Builder by copying an existing Builder. - * @param other The existing Builder to copy. - */ - private Builder(org.demo.kafka.avro.AvroProduct.Builder other) { - super(other); - if (isValidValue(fields()[0], other.id)) { - this.id = data().deepCopy(fields()[0].schema(), other.id); - fieldSetFlags()[0] = other.fieldSetFlags()[0]; - } - if (isValidValue(fields()[1], other.name)) { - this.name = data().deepCopy(fields()[1].schema(), other.name); - fieldSetFlags()[1] = other.fieldSetFlags()[1]; - } - if (isValidValue(fields()[2], other.price)) { - this.price = data().deepCopy(fields()[2].schema(), other.price); - fieldSetFlags()[2] = other.fieldSetFlags()[2]; - } - } - - /** - * Creates a Builder by copying an existing AvroProduct instance - * @param other The existing instance to copy. - */ - private Builder(org.demo.kafka.avro.AvroProduct other) { - super(SCHEMA$, MODEL$); - if (isValidValue(fields()[0], other.id)) { - this.id = data().deepCopy(fields()[0].schema(), other.id); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.name)) { - this.name = data().deepCopy(fields()[1].schema(), other.name); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.price)) { - this.price = data().deepCopy(fields()[2].schema(), other.price); - fieldSetFlags()[2] = true; - } - } - - /** - * Gets the value of the 'id' field. - * @return The value. - */ - public int getId() { - return id; - } - - - /** - * Sets the value of the 'id' field. - * @param value The value of 'id'. - * @return This builder. - */ - public org.demo.kafka.avro.AvroProduct.Builder setId(int value) { - validate(fields()[0], value); - this.id = value; - fieldSetFlags()[0] = true; - return this; - } - - /** - * Checks whether the 'id' field has been set. 
- * @return True if the 'id' field has been set, false otherwise. - */ - public boolean hasId() { - return fieldSetFlags()[0]; - } - - - /** - * Clears the value of the 'id' field. - * @return This builder. - */ - public org.demo.kafka.avro.AvroProduct.Builder clearId() { - fieldSetFlags()[0] = false; - return this; - } - - /** - * Gets the value of the 'name' field. - * @return The value. - */ - public java.lang.String getName() { - return name; - } - - - /** - * Sets the value of the 'name' field. - * @param value The value of 'name'. - * @return This builder. - */ - public org.demo.kafka.avro.AvroProduct.Builder setName(java.lang.String value) { - validate(fields()[1], value); - this.name = value; - fieldSetFlags()[1] = true; - return this; - } - - /** - * Checks whether the 'name' field has been set. - * @return True if the 'name' field has been set, false otherwise. - */ - public boolean hasName() { - return fieldSetFlags()[1]; - } - - - /** - * Clears the value of the 'name' field. - * @return This builder. - */ - public org.demo.kafka.avro.AvroProduct.Builder clearName() { - name = null; - fieldSetFlags()[1] = false; - return this; - } - - /** - * Gets the value of the 'price' field. - * @return The value. - */ - public double getPrice() { - return price; - } - - - /** - * Sets the value of the 'price' field. - * @param value The value of 'price'. - * @return This builder. - */ - public org.demo.kafka.avro.AvroProduct.Builder setPrice(double value) { - validate(fields()[2], value); - this.price = value; - fieldSetFlags()[2] = true; - return this; - } - - /** - * Checks whether the 'price' field has been set. - * @return True if the 'price' field has been set, false otherwise. - */ - public boolean hasPrice() { - return fieldSetFlags()[2]; - } - - - /** - * Clears the value of the 'price' field. - * @return This builder. - */ - public org.demo.kafka.avro.AvroProduct.Builder clearPrice() { - fieldSetFlags()[2] = false; - return this; - } - - @Override - @SuppressWarnings("unchecked") - public AvroProduct build() { - try { - AvroProduct record = new AvroProduct(); - record.id = fieldSetFlags()[0] ? this.id : (java.lang.Integer) defaultValue(fields()[0]); - record.name = fieldSetFlags()[1] ? this.name : (java.lang.String) defaultValue(fields()[1]); - record.price = fieldSetFlags()[2] ? 
this.price : (java.lang.Double) defaultValue(fields()[2]); - return record; - } catch (org.apache.avro.AvroMissingFieldException e) { - throw e; - } catch (java.lang.Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } - - @SuppressWarnings("unchecked") - private static final org.apache.avro.io.DatumWriter - WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); - - @Override public void writeExternal(java.io.ObjectOutput out) - throws java.io.IOException { - WRITER$.write(this, SpecificData.getEncoder(out)); - } - - @SuppressWarnings("unchecked") - private static final org.apache.avro.io.DatumReader - READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); - - @Override public void readExternal(java.io.ObjectInput in) - throws java.io.IOException { - READER$.read(this, SpecificData.getDecoder(in)); - } - - @Override protected boolean hasCustomCoders() { return true; } - - @Override public void customEncode(org.apache.avro.io.Encoder out) - throws java.io.IOException - { - out.writeInt(this.id); - - out.writeString(this.name); - - out.writeDouble(this.price); - - } - - @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) - throws java.io.IOException - { - org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); - if (fieldOrder == null) { - this.id = in.readInt(); - - this.name = in.readString(); - - this.price = in.readDouble(); - - } else { - for (int i = 0; i < 3; i++) { - switch (fieldOrder[i].pos()) { - case 0: - this.id = in.readInt(); - break; - - case 1: - this.name = in.readString(); - break; - - case 2: - this.price = in.readDouble(); - break; - - default: - throw new java.io.IOException("Corrupt ResolvingDecoder."); - } - } - } - } -} - - - - - - - - - - diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java deleted file mode 100644 index 6da9113fc..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java +++ /dev/null @@ -1,636 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// NO CHECKED-IN PROTOBUF GENCODE -// source: ProtobufProduct.proto -// Protobuf Java Version: 4.31.0 - -package org.demo.kafka.protobuf; - -/** - * Protobuf type {@code org.demo.kafka.protobuf.ProtobufProduct} - */ -@com.google.protobuf.Generated -public final class ProtobufProduct extends - com.google.protobuf.GeneratedMessage implements - // @@protoc_insertion_point(message_implements:org.demo.kafka.protobuf.ProtobufProduct) - ProtobufProductOrBuilder { -private static final long serialVersionUID = 0L; - static { - com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( - com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, - /* major= */ 4, - /* minor= */ 31, - /* patch= */ 0, - /* suffix= */ "", - ProtobufProduct.class.getName()); - } - // Use ProtobufProduct.newBuilder() to construct. 
- private ProtobufProduct(com.google.protobuf.GeneratedMessage.Builder builder) { - super(builder); - } - private ProtobufProduct() { - name_ = ""; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; - } - - @java.lang.Override - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.demo.kafka.protobuf.ProtobufProduct.class, org.demo.kafka.protobuf.ProtobufProduct.Builder.class); - } - - public static final int ID_FIELD_NUMBER = 1; - private int id_ = 0; - /** - * int32 id = 1; - * @return The id. - */ - @java.lang.Override - public int getId() { - return id_; - } - - public static final int NAME_FIELD_NUMBER = 2; - @SuppressWarnings("serial") - private volatile java.lang.Object name_ = ""; - /** - * string name = 2; - * @return The name. - */ - @java.lang.Override - public java.lang.String getName() { - java.lang.Object ref = name_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - name_ = s; - return s; - } - } - /** - * string name = 2; - * @return The bytes for name. - */ - @java.lang.Override - public com.google.protobuf.ByteString - getNameBytes() { - java.lang.Object ref = name_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - name_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - public static final int PRICE_FIELD_NUMBER = 3; - private double price_ = 0D; - /** - * double price = 3; - * @return The price. 
- */ - @java.lang.Override - public double getPrice() { - return price_; - } - - private byte memoizedIsInitialized = -1; - @java.lang.Override - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized == 1) return true; - if (isInitialized == 0) return false; - - memoizedIsInitialized = 1; - return true; - } - - @java.lang.Override - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - if (id_ != 0) { - output.writeInt32(1, id_); - } - if (!com.google.protobuf.GeneratedMessage.isStringEmpty(name_)) { - com.google.protobuf.GeneratedMessage.writeString(output, 2, name_); - } - if (java.lang.Double.doubleToRawLongBits(price_) != 0) { - output.writeDouble(3, price_); - } - getUnknownFields().writeTo(output); - } - - @java.lang.Override - public int getSerializedSize() { - int size = memoizedSize; - if (size != -1) return size; - - size = 0; - if (id_ != 0) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(1, id_); - } - if (!com.google.protobuf.GeneratedMessage.isStringEmpty(name_)) { - size += com.google.protobuf.GeneratedMessage.computeStringSize(2, name_); - } - if (java.lang.Double.doubleToRawLongBits(price_) != 0) { - size += com.google.protobuf.CodedOutputStream - .computeDoubleSize(3, price_); - } - size += getUnknownFields().getSerializedSize(); - memoizedSize = size; - return size; - } - - @java.lang.Override - public boolean equals(final java.lang.Object obj) { - if (obj == this) { - return true; - } - if (!(obj instanceof org.demo.kafka.protobuf.ProtobufProduct)) { - return super.equals(obj); - } - org.demo.kafka.protobuf.ProtobufProduct other = (org.demo.kafka.protobuf.ProtobufProduct) obj; - - if (getId() - != other.getId()) return false; - if (!getName() - .equals(other.getName())) return false; - if (java.lang.Double.doubleToLongBits(getPrice()) - != java.lang.Double.doubleToLongBits( - other.getPrice())) return false; - if (!getUnknownFields().equals(other.getUnknownFields())) return false; - return true; - } - - @java.lang.Override - public int hashCode() { - if (memoizedHashCode != 0) { - return memoizedHashCode; - } - int hash = 41; - hash = (19 * hash) + getDescriptor().hashCode(); - hash = (37 * hash) + ID_FIELD_NUMBER; - hash = (53 * hash) + getId(); - hash = (37 * hash) + NAME_FIELD_NUMBER; - hash = (53 * hash) + getName().hashCode(); - hash = (37 * hash) + PRICE_FIELD_NUMBER; - hash = (53 * hash) + com.google.protobuf.Internal.hashLong( - java.lang.Double.doubleToLongBits(getPrice())); - hash = (29 * hash) + getUnknownFields().hashCode(); - memoizedHashCode = hash; - return hash; - } - - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - java.nio.ByteBuffer data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - java.nio.ByteBuffer data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws 
com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom(java.io.InputStream input) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessage - .parseWithIOException(PARSER, input); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessage - .parseWithIOException(PARSER, input, extensionRegistry); - } - - public static org.demo.kafka.protobuf.ProtobufProduct parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessage - .parseDelimitedWithIOException(PARSER, input); - } - - public static org.demo.kafka.protobuf.ProtobufProduct parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessage - .parseDelimitedWithIOException(PARSER, input, extensionRegistry); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessage - .parseWithIOException(PARSER, input); - } - public static org.demo.kafka.protobuf.ProtobufProduct parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessage - .parseWithIOException(PARSER, input, extensionRegistry); - } - - @java.lang.Override - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder() { - return DEFAULT_INSTANCE.toBuilder(); - } - public static Builder newBuilder(org.demo.kafka.protobuf.ProtobufProduct prototype) { - return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); - } - @java.lang.Override - public Builder toBuilder() { - return this == DEFAULT_INSTANCE - ? 
new Builder() : new Builder().mergeFrom(this); - } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - /** - * Protobuf type {@code org.demo.kafka.protobuf.ProtobufProduct} - */ - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder implements - // @@protoc_insertion_point(builder_implements:org.demo.kafka.protobuf.ProtobufProduct) - org.demo.kafka.protobuf.ProtobufProductOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; - } - - @java.lang.Override - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.demo.kafka.protobuf.ProtobufProduct.class, org.demo.kafka.protobuf.ProtobufProduct.Builder.class); - } - - // Construct using org.demo.kafka.protobuf.ProtobufProduct.newBuilder() - private Builder() { - - } - - private Builder( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - super(parent); - - } - @java.lang.Override - public Builder clear() { - super.clear(); - bitField0_ = 0; - id_ = 0; - name_ = ""; - price_ = 0D; - return this; - } - - @java.lang.Override - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; - } - - @java.lang.Override - public org.demo.kafka.protobuf.ProtobufProduct getDefaultInstanceForType() { - return org.demo.kafka.protobuf.ProtobufProduct.getDefaultInstance(); - } - - @java.lang.Override - public org.demo.kafka.protobuf.ProtobufProduct build() { - org.demo.kafka.protobuf.ProtobufProduct result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - @java.lang.Override - public org.demo.kafka.protobuf.ProtobufProduct buildPartial() { - org.demo.kafka.protobuf.ProtobufProduct result = new org.demo.kafka.protobuf.ProtobufProduct(this); - if (bitField0_ != 0) { buildPartial0(result); } - onBuilt(); - return result; - } - - private void buildPartial0(org.demo.kafka.protobuf.ProtobufProduct result) { - int from_bitField0_ = bitField0_; - if (((from_bitField0_ & 0x00000001) != 0)) { - result.id_ = id_; - } - if (((from_bitField0_ & 0x00000002) != 0)) { - result.name_ = name_; - } - if (((from_bitField0_ & 0x00000004) != 0)) { - result.price_ = price_; - } - } - - @java.lang.Override - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof org.demo.kafka.protobuf.ProtobufProduct) { - return mergeFrom((org.demo.kafka.protobuf.ProtobufProduct)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(org.demo.kafka.protobuf.ProtobufProduct other) { - if (other == org.demo.kafka.protobuf.ProtobufProduct.getDefaultInstance()) return this; - if (other.getId() != 0) { - setId(other.getId()); - } - if (!other.getName().isEmpty()) { - name_ = other.name_; - bitField0_ |= 0x00000002; - onChanged(); - } - if (java.lang.Double.doubleToRawLongBits(other.getPrice()) != 0) { - setPrice(other.getPrice()); - } - 
this.mergeUnknownFields(other.getUnknownFields()); - onChanged(); - return this; - } - - @java.lang.Override - public final boolean isInitialized() { - return true; - } - - @java.lang.Override - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - if (extensionRegistry == null) { - throw new java.lang.NullPointerException(); - } - try { - boolean done = false; - while (!done) { - int tag = input.readTag(); - switch (tag) { - case 0: - done = true; - break; - case 8: { - id_ = input.readInt32(); - bitField0_ |= 0x00000001; - break; - } // case 8 - case 18: { - name_ = input.readStringRequireUtf8(); - bitField0_ |= 0x00000002; - break; - } // case 18 - case 25: { - price_ = input.readDouble(); - bitField0_ |= 0x00000004; - break; - } // case 25 - default: { - if (!super.parseUnknownField(input, extensionRegistry, tag)) { - done = true; // was an endgroup tag - } - break; - } // default: - } // switch (tag) - } // while (!done) - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - throw e.unwrapIOException(); - } finally { - onChanged(); - } // finally - return this; - } - private int bitField0_; - - private int id_ ; - /** - * int32 id = 1; - * @return The id. - */ - @java.lang.Override - public int getId() { - return id_; - } - /** - * int32 id = 1; - * @param value The id to set. - * @return This builder for chaining. - */ - public Builder setId(int value) { - - id_ = value; - bitField0_ |= 0x00000001; - onChanged(); - return this; - } - /** - * int32 id = 1; - * @return This builder for chaining. - */ - public Builder clearId() { - bitField0_ = (bitField0_ & ~0x00000001); - id_ = 0; - onChanged(); - return this; - } - - private java.lang.Object name_ = ""; - /** - * string name = 2; - * @return The name. - */ - public java.lang.String getName() { - java.lang.Object ref = name_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - name_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string name = 2; - * @return The bytes for name. - */ - public com.google.protobuf.ByteString - getNameBytes() { - java.lang.Object ref = name_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - name_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string name = 2; - * @param value The name to set. - * @return This builder for chaining. - */ - public Builder setName( - java.lang.String value) { - if (value == null) { throw new NullPointerException(); } - name_ = value; - bitField0_ |= 0x00000002; - onChanged(); - return this; - } - /** - * string name = 2; - * @return This builder for chaining. - */ - public Builder clearName() { - name_ = getDefaultInstance().getName(); - bitField0_ = (bitField0_ & ~0x00000002); - onChanged(); - return this; - } - /** - * string name = 2; - * @param value The bytes for name to set. - * @return This builder for chaining. - */ - public Builder setNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { throw new NullPointerException(); } - checkByteStringIsUtf8(value); - name_ = value; - bitField0_ |= 0x00000002; - onChanged(); - return this; - } - - private double price_ ; - /** - * double price = 3; - * @return The price. 
- */ - @java.lang.Override - public double getPrice() { - return price_; - } - /** - * double price = 3; - * @param value The price to set. - * @return This builder for chaining. - */ - public Builder setPrice(double value) { - - price_ = value; - bitField0_ |= 0x00000004; - onChanged(); - return this; - } - /** - * double price = 3; - * @return This builder for chaining. - */ - public Builder clearPrice() { - bitField0_ = (bitField0_ & ~0x00000004); - price_ = 0D; - onChanged(); - return this; - } - - // @@protoc_insertion_point(builder_scope:org.demo.kafka.protobuf.ProtobufProduct) - } - - // @@protoc_insertion_point(class_scope:org.demo.kafka.protobuf.ProtobufProduct) - private static final org.demo.kafka.protobuf.ProtobufProduct DEFAULT_INSTANCE; - static { - DEFAULT_INSTANCE = new org.demo.kafka.protobuf.ProtobufProduct(); - } - - public static org.demo.kafka.protobuf.ProtobufProduct getDefaultInstance() { - return DEFAULT_INSTANCE; - } - - private static final com.google.protobuf.Parser - PARSER = new com.google.protobuf.AbstractParser() { - @java.lang.Override - public ProtobufProduct parsePartialFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - Builder builder = newBuilder(); - try { - builder.mergeFrom(input, extensionRegistry); - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - throw e.setUnfinishedMessage(builder.buildPartial()); - } catch (com.google.protobuf.UninitializedMessageException e) { - throw e.asInvalidProtocolBufferException().setUnfinishedMessage(builder.buildPartial()); - } catch (java.io.IOException e) { - throw new com.google.protobuf.InvalidProtocolBufferException(e) - .setUnfinishedMessage(builder.buildPartial()); - } - return builder.buildPartial(); - } - }; - - public static com.google.protobuf.Parser parser() { - return PARSER; - } - - @java.lang.Override - public com.google.protobuf.Parser getParserForType() { - return PARSER; - } - - @java.lang.Override - public org.demo.kafka.protobuf.ProtobufProduct getDefaultInstanceForType() { - return DEFAULT_INSTANCE; - } - -} - diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java deleted file mode 100644 index 9c1518db3..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java +++ /dev/null @@ -1,36 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// NO CHECKED-IN PROTOBUF GENCODE -// source: ProtobufProduct.proto -// Protobuf Java Version: 4.31.0 - -package org.demo.kafka.protobuf; - -@com.google.protobuf.Generated -public interface ProtobufProductOrBuilder extends - // @@protoc_insertion_point(interface_extends:org.demo.kafka.protobuf.ProtobufProduct) - com.google.protobuf.MessageOrBuilder { - - /** - * int32 id = 1; - * @return The id. - */ - int getId(); - - /** - * string name = 2; - * @return The name. - */ - java.lang.String getName(); - /** - * string name = 2; - * @return The bytes for name. - */ - com.google.protobuf.ByteString - getNameBytes(); - - /** - * double price = 3; - * @return The price. 
- */ - double getPrice(); -} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java deleted file mode 100644 index 6a99f35ec..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java +++ /dev/null @@ -1,63 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// NO CHECKED-IN PROTOBUF GENCODE -// source: ProtobufProduct.proto -// Protobuf Java Version: 4.31.0 - -package org.demo.kafka.protobuf; - -@com.google.protobuf.Generated -public final class ProtobufProductOuterClass { - private ProtobufProductOuterClass() {} - static { - com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( - com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, - /* major= */ 4, - /* minor= */ 31, - /* patch= */ 0, - /* suffix= */ "", - ProtobufProductOuterClass.class.getName()); - } - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistryLite registry) { - } - - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistry registry) { - registerAllExtensions( - (com.google.protobuf.ExtensionRegistryLite) registry); - } - static final com.google.protobuf.Descriptors.Descriptor - internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; - static final - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable; - - public static com.google.protobuf.Descriptors.FileDescriptor - getDescriptor() { - return descriptor; - } - private static com.google.protobuf.Descriptors.FileDescriptor - descriptor; - static { - java.lang.String[] descriptorData = { - "\n\025ProtobufProduct.proto\022\027org.demo.kafka." 
+ - "protobuf\":\n\017ProtobufProduct\022\n\n\002id\030\001 \001(\005\022" + - "\014\n\004name\030\002 \001(\t\022\r\n\005price\030\003 \001(\001B6\n\027org.demo" + - ".kafka.protobufB\031ProtobufProductOuterCla" + - "ssP\001b\006proto3" - }; - descriptor = com.google.protobuf.Descriptors.FileDescriptor - .internalBuildGeneratedFileFrom(descriptorData, - new com.google.protobuf.Descriptors.FileDescriptor[] { - }); - internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor = - getDescriptor().getMessageTypes().get(0); - internal_static_org_demo_kafka_protobuf_ProtobufProduct_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor, - new java.lang.String[] { "Id", "Name", "Price", }); - descriptor.resolveAllFeaturesImmutable(); - } - - // @@protoc_insertion_point(outer_class_scope) -} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java deleted file mode 100644 index e6f4d38fd..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.demo.kafka.tools; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Base64; - -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificDatumWriter; -import org.demo.kafka.avro.AvroProduct; - -/** - * Utility class to generate base64-encoded Avro serialized products - * for use in test events. 
- */ -public final class GenerateAvroSamples { - - private GenerateAvroSamples() { - // Utility class - } - - public static void main(String[] args) throws IOException { - // Create three different products - AvroProduct product1 = new AvroProduct(1001, "Laptop", 999.99); - AvroProduct product2 = new AvroProduct(1002, "Smartphone", 599.99); - AvroProduct product3 = new AvroProduct(1003, "Headphones", 149.99); - - // Serialize and encode each product - String encodedProduct1 = serializeAndEncode(product1); - String encodedProduct2 = serializeAndEncode(product2); - String encodedProduct3 = serializeAndEncode(product3); - - // Serialize and encode an integer key - String encodedKey = serializeAndEncodeInteger(42); - - // Print the results - System.out.println("Base64 encoded Avro products for use in kafka-avro-event.json:"); - System.out.println("\nProduct 1 (with key):"); - System.out.println("key: \"" + encodedKey + "\","); - System.out.println("value: \"" + encodedProduct1 + "\","); - - System.out.println("\nProduct 2 (with key):"); - System.out.println("key: \"" + encodedKey + "\","); - System.out.println("value: \"" + encodedProduct2 + "\","); - - System.out.println("\nProduct 3 (without key):"); - System.out.println("key: null,"); - System.out.println("value: \"" + encodedProduct3 + "\","); - - // Print a sample event structure - System.out.println("\nSample event structure:"); - printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3); - } - - private static String serializeAndEncode(AvroProduct product) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); - DatumWriter writer = new SpecificDatumWriter<>(AvroProduct.class); - - writer.write(product, encoder); - encoder.flush(); - - return Base64.getEncoder().encodeToString(baos.toByteArray()); - } - - private static String serializeAndEncodeInteger(Integer value) throws IOException { - // For simple types like integers, we'll just convert to string and encode - return Base64.getEncoder().encodeToString(value.toString().getBytes()); - } - - private static void printSampleEvent(String key, String product1, String product2, String product3) { - System.out.println("{\n" + - " \"eventSource\": \"aws:kafka\",\n" + - " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" - + - " \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" - + - " \"records\": {\n" + - " \"mytopic-0\": [\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 15,\n" + - " \"timestamp\": 1545084650987,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + product1 + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 16,\n" + - " \"timestamp\": 1545084650988,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + product2 + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - 
" \"offset\": 17,\n" + - " \"timestamp\": 1545084650989,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": null,\n" + - " \"value\": \"" + product3 + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " }\n" + - " ]\n" + - " }\n" + - "}"); - } -} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java deleted file mode 100644 index d0ef7cb55..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java +++ /dev/null @@ -1,130 +0,0 @@ -package org.demo.kafka.tools; - -import java.io.IOException; -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; - -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * Utility class to generate base64-encoded JSON serialized products - * for use in test events. - */ -public final class GenerateJsonSamples { - - private GenerateJsonSamples() { - // Utility class - } - - public static void main(String[] args) throws IOException { - // Create three different products - Map product1 = new HashMap<>(); - product1.put("id", 1001); - product1.put("name", "Laptop"); - product1.put("price", 999.99); - - Map product2 = new HashMap<>(); - product2.put("id", 1002); - product2.put("name", "Smartphone"); - product2.put("price", 599.99); - - Map product3 = new HashMap<>(); - product3.put("id", 1003); - product3.put("name", "Headphones"); - product3.put("price", 149.99); - - // Serialize and encode each product - String encodedProduct1 = serializeAndEncode(product1); - String encodedProduct2 = serializeAndEncode(product2); - String encodedProduct3 = serializeAndEncode(product3); - - // Serialize and encode an integer key - String encodedKey = serializeAndEncodeInteger(42); - - // Print the results - System.out.println("Base64 encoded JSON products for use in kafka-json-event.json:"); - System.out.println("\nProduct 1 (with key):"); - System.out.println("key: \"" + encodedKey + "\","); - System.out.println("value: \"" + encodedProduct1 + "\","); - - System.out.println("\nProduct 2 (with key):"); - System.out.println("key: \"" + encodedKey + "\","); - System.out.println("value: \"" + encodedProduct2 + "\","); - - System.out.println("\nProduct 3 (without key):"); - System.out.println("key: null,"); - System.out.println("value: \"" + encodedProduct3 + "\","); - - // Print a sample event structure - System.out.println("\nSample event structure:"); - printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3); - } - - private static String serializeAndEncode(Map product) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - String json = mapper.writeValueAsString(product); - return Base64.getEncoder().encodeToString(json.getBytes()); - } - - private static String serializeAndEncodeInteger(Integer value) { - // For simple types like integers, we'll just convert to string and encode - return Base64.getEncoder().encodeToString(value.toString().getBytes()); - } - - private static void printSampleEvent(String key, String product1, String product2, String product3) { - System.out.println("{\n" + - " \"eventSource\": \"aws:kafka\",\n" + - " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" - + - " 
\"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" - + - " \"records\": {\n" + - " \"mytopic-0\": [\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 15,\n" + - " \"timestamp\": 1545084650987,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + product1 + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 15,\n" + - " \"timestamp\": 1545084650987,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + product2 + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 15,\n" + - " \"timestamp\": 1545084650987,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": null,\n" + - " \"value\": \"" + product3 + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " }\n" + - " ]\n" + - " }\n" + - "}"); - } -} diff --git a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java b/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java deleted file mode 100644 index aa5f6e330..000000000 --- a/examples/powertools-examples-large-messages/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java +++ /dev/null @@ -1,215 +0,0 @@ -package org.demo.kafka.tools; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Base64; - -import org.apache.kafka.common.utils.ByteUtils; -import org.demo.kafka.protobuf.ProtobufProduct; - -import com.google.protobuf.CodedOutputStream; - -/** - * Utility class to generate base64-encoded Protobuf serialized products - * for use in test events. - */ -public final class GenerateProtobufSamples { - - private GenerateProtobufSamples() { - // Utility class - } - - public static void main(String[] args) throws IOException { - // Create a single product that will be used for all four scenarios - ProtobufProduct product = ProtobufProduct.newBuilder() - .setId(1001) - .setName("Laptop") - .setPrice(999.99) - .build(); - - // Create four different serializations of the same product - String standardProduct = serializeAndEncode(product); - String productWithConfluentSimpleIndex = serializeWithConfluentSimpleMessageIndex(product); - String productWithConfluentComplexIndex = serializeWithConfluentComplexMessageIndex(product); - String productWithGlueMagicByte = serializeWithGlueMagicByte(product); - - // Serialize and encode an integer key (same for all records) - String encodedKey = serializeAndEncodeInteger(42); - - // Print the results - System.out.println("Base64 encoded Protobuf products with different scenarios:"); - System.out.println("\n1. Plain Protobuf (no schema registry):"); - System.out.println("value: \"" + standardProduct + "\""); - - System.out.println("\n2. 
Confluent with Simple Message Index (optimized single 0):"); - System.out.println("value: \"" + productWithConfluentSimpleIndex + "\""); - - System.out.println("\n3. Confluent with Complex Message Index (array [1,0]):"); - System.out.println("value: \"" + productWithConfluentComplexIndex + "\""); - - System.out.println("\n4. Glue with Magic Byte:"); - System.out.println("value: \"" + productWithGlueMagicByte + "\""); - - // Print the merged event structure - System.out.println("\n" + "=".repeat(80)); - System.out.println("MERGED EVENT WITH ALL FOUR SCENARIOS"); - System.out.println("=".repeat(80)); - printSampleEvent(encodedKey, standardProduct, productWithConfluentSimpleIndex, productWithConfluentComplexIndex, - productWithGlueMagicByte); - } - - private static String serializeAndEncode(ProtobufProduct product) { - return Base64.getEncoder().encodeToString(product.toByteArray()); - } - - /** - * Serializes a protobuf product with a simple Confluent message index (optimized single 0). - * Format: [0][protobuf_data] - * - * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format} - */ - private static String serializeWithConfluentSimpleMessageIndex(ProtobufProduct product) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - // Write optimized simple message index for Confluent (single 0 byte for [0]) - baos.write(0); - - // Write the protobuf data - baos.write(product.toByteArray()); - - return Base64.getEncoder().encodeToString(baos.toByteArray()); - } - - /** - * Serializes a protobuf product with a complex Confluent message index (array [1,0]). - * Format: [2][1][0][protobuf_data] where 2 is the array length using varint encoding - * - * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format} - */ - private static String serializeWithConfluentComplexMessageIndex(ProtobufProduct product) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - // Write complex message index array [1,0] using ByteUtils - ByteBuffer buffer = ByteBuffer.allocate(1024); - ByteUtils.writeVarint(2, buffer); // Array length - ByteUtils.writeVarint(1, buffer); // First index value - ByteUtils.writeVarint(0, buffer); // Second index value - - buffer.flip(); - byte[] indexData = new byte[buffer.remaining()]; - buffer.get(indexData); - baos.write(indexData); - - // Write the protobuf data - baos.write(product.toByteArray()); - - return Base64.getEncoder().encodeToString(baos.toByteArray()); - } - - /** - * Serializes a protobuf product with Glue magic byte. 
- * Format: [1][protobuf_data] where 1 is the magic byte - */ - private static String serializeWithGlueMagicByte(ProtobufProduct product) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos); - - // Write Glue magic byte (single UInt32) - codedOutput.writeUInt32NoTag(1); - - // Write the protobuf data - product.writeTo(codedOutput); - - codedOutput.flush(); - return Base64.getEncoder().encodeToString(baos.toByteArray()); - } - - private static String serializeAndEncodeInteger(Integer value) { - // For simple types like integers, we'll just convert to string and encode - return Base64.getEncoder().encodeToString(value.toString().getBytes()); - } - - private static void printSampleEvent(String key, String standardProduct, String confluentSimpleProduct, - String confluentComplexProduct, String glueProduct) { - System.out.println("{\n" + - " \"eventSource\": \"aws:kafka\",\n" + - " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" - + - " \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" - + - " \"records\": {\n" + - " \"mytopic-0\": [\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 15,\n" + - " \"timestamp\": 1545084650987,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + standardProduct + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ]\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 16,\n" + - " \"timestamp\": 1545084650988,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + confluentSimpleProduct + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ],\n" + - " \"valueSchemaMetadata\": {\n" + - " \"schemaId\": \"123\",\n" + - " \"dataFormat\": \"PROTOBUF\"\n" + - " }\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 17,\n" + - " \"timestamp\": 1545084650989,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": null,\n" + - " \"value\": \"" + confluentComplexProduct + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ],\n" + - " \"valueSchemaMetadata\": {\n" + - " \"schemaId\": \"456\",\n" + - " \"dataFormat\": \"PROTOBUF\"\n" + - " }\n" + - " },\n" + - " {\n" + - " \"topic\": \"mytopic\",\n" + - " \"partition\": 0,\n" + - " \"offset\": 18,\n" + - " \"timestamp\": 1545084650990,\n" + - " \"timestampType\": \"CREATE_TIME\",\n" + - " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + glueProduct + "\",\n" + - " \"headers\": [\n" + - " {\n" + - " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + - " }\n" + - " ],\n" + - " \"valueSchemaMetadata\": {\n" + - " \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" + - " \"dataFormat\": \"PROTOBUF\"\n" + - " }\n" + - " }\n" + - " ]\n" + - " }\n" + - "}"); - } -}
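The JSON and Avro generators above only exercise the write path. As a quick sanity check that the printed Base64 values are well-formed, a small reader can decode them back into objects. The sketch below is illustrative only and is not part of this patch: the class and method names are hypothetical, and it assumes the same Jackson and Avro dependencies already used by the tools module.

```java
package org.demo.kafka.tools;

import java.io.IOException;
import java.util.Base64;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.demo.kafka.avro.AvroProduct;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Hypothetical helper (not part of this patch) that decodes the Base64 values
 * printed by GenerateJsonSamples and GenerateAvroSamples back into objects.
 */
public final class DecodeSamples {

    private DecodeSamples() {
        // Utility class
    }

    static Map<String, Object> decodeJson(String base64Value) throws IOException {
        // Reverse of the JSON generator: Base64 -> JSON bytes -> Map
        byte[] json = Base64.getDecoder().decode(base64Value);
        return new ObjectMapper().readValue(json, new TypeReference<Map<String, Object>>() {
        });
    }

    static AvroProduct decodeAvro(String base64Value) throws IOException {
        // Reverse of the Avro generator: Base64 -> Avro binary -> AvroProduct
        byte[] bytes = Base64.getDecoder().decode(base64Value);
        DatumReader<AvroProduct> reader = new SpecificDatumReader<>(AvroProduct.class);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}
```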
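The three prefixed Protobuf variants are the trickiest part of the generator, because the value bytes are no longer a bare ProtobufProduct message. The sketch below shows the corresponding read path under the formats the generator writes; it is a minimal, hypothetical helper (not part of this patch), assuming the Confluent prefix was written with Kafka's zigzag varints (ByteUtils.writeVarint), so it is read back with readSInt32, while the Glue prefix is a plain varint written with writeUInt32NoTag and read back with readUInt32. After the prefix is consumed, the remaining bytes parse with ProtobufProduct.parseFrom.

```java
package org.demo.kafka.tools;

import java.io.IOException;
import java.util.Base64;

import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedInputStream;

/**
 * Hypothetical helper (not part of this patch) that strips the schema-registry
 * prefixes written by GenerateProtobufSamples and parses the remaining bytes.
 */
public final class DecodePrefixedProtobufSamples {

    private DecodePrefixedProtobufSamples() {
        // Utility class
    }

    static ProtobufProduct decodeConfluent(String base64Value) throws IOException {
        byte[] bytes = Base64.getDecoder().decode(base64Value);
        CodedInputStream in = CodedInputStream.newInstance(bytes);
        // First zigzag varint is the message-index array length; the optimized
        // single-0 encoding reads as length 0, so the loop is skipped entirely.
        int indexCount = in.readSInt32();
        for (int i = 0; i < indexCount; i++) {
            in.readSInt32(); // skip each message-index entry
        }
        return ProtobufProduct.parseFrom(in);
    }

    static ProtobufProduct decodeGlue(String base64Value) throws IOException {
        byte[] bytes = Base64.getDecoder().decode(base64Value);
        CodedInputStream in = CodedInputStream.newInstance(bytes);
        in.readUInt32(); // skip the single Glue magic-byte varint
        return ProtobufProduct.parseFrom(in);
    }
}
```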