Use Apache Kafka® Streams with Aiven for Apache Kafka®
Apache Kafka® Streams and its Streams API allow streaming data through the heart of Apache Kafka: the brokers.
As the data, a key-value structure, enters the brokers, it is serialized by the streams API to a byte array. The opposite happens when data exits the brokers, where the streams API de-serializes it back to the original key-value structure.
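The round trip described above can be illustrated without a broker. The following is a minimal sketch in plain Java, assuming both the key and the value are UTF-8 strings. The class and method names are hypothetical, and this is not how the Streams API is implemented internally; it only mimics what a string serializer and deserializer pair does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class KeyValueRoundTrip {

    // Turn a string into the byte array that would be handed to the brokers.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Turn a byte array read from the brokers back into the original string.
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // An assumed key-value record: a song identifier and its genre.
        Map.Entry<String, String> record = Map.entry("song-1", "punk");

        byte[] keyBytes = serialize(record.getKey());
        byte[] valueBytes = serialize(record.getValue());

        // Both parts survive the round trip unchanged.
        System.out.println(deserialize(keyBytes) + " -> " + deserialize(valueBytes));
    }
}
```

With Avro, as used later in this article, the byte layout additionally depends on a schema, which is why the schema registry is needed.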
Apache Kafka streams also allows data transformation in real-time with the output feeding another, transformed, stream of data. This makes Kafka streams a powerful tool for the variety of use cases it can address in the world of real-time data processing and analysis.
This article explores how to run a Kafka Streams application with Aiven for Apache Kafka and the schema registry functionality offered by Karapace.
Prerequisites
To use Kafka Streams, you need an Aiven for Apache Kafka service up and running with Schema Registry (Karapace) enabled.
Enabling Schema Registry (Karapace) is needed to make use of the schema registry features necessary for dealing with messages in Avro format. If you plan to use JSON format, enabling Schema Registry (Karapace) might not be necessary.
Two topics named song-feed and play-events should be created.
For the example, collect the following information about the Aiven for Apache Kafka service:
- APACHE_KAFKA_HOST: The hostname of the Apache Kafka service
- APACHE_KAFKA_PORT: The port of the Apache Kafka service
- SCHEMA_REGISTRY_PORT: The schema registry port of the Apache Kafka service, only needed when using Avro as data format
- SCHEMA_REGISTRY_USER: The schema registry username, only needed when using Avro as data format
- SCHEMA_REGISTRY_PASSWORD: The schema registry user password, only needed when using Avro as data format
If you're using Aiven for Apache Kafka, the above details are available in the Aiven console service Overview tab or via the dedicated avn service get command with the Aiven CLI.
The following example assumes you have Apache Maven already installed.
Use Kafka streams with Aiven for Apache Kafka - KafkaMusicExample
The following example shows how to customise the KafkaMusicExample
available in the dedicated
repository to
work with Aiven for Apache Kafka.
- Download the kafka-streams-examples sources from GitHub
git clone https://github.com/confluentinc/kafka-streams-examples.git
- Build the packages using Maven
cd kafka-streams-examples/
mvn -DskipTests=true clean package
Set up the truststore and keystore
Create a Java keystore and truststore for the Aiven for Apache Kafka service. The following example assumes:
- The keystore is available at KEYSTORE_PATH/client.keystore.p12
- The truststore is available at TRUSTSTORE_PATH/client.truststore.jks
- For simplicity, the same secret (password) is used for both the keystore and the truststore, and is shown as KEY_TRUST_SECRET
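Before wiring the stores into the applications, it can help to confirm that both files open with the chosen secret. The following is a minimal sketch using only the JDK's java.security.KeyStore class. The StoreCheck class and its countEntries method are hypothetical names, and the command-line arguments stand in for the KEYSTORE_PATH, TRUSTSTORE_PATH and KEY_TRUST_SECRET placeholders. It assumes Java 11 or later.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical helper: checks that a store can be opened with the given secret.
final class StoreCheck {

    // Loads a store of the given type ("PKCS12" or "JKS") and returns its number of entries.
    static int countEntries(Path path, String type, char[] secret)
            throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(path)) {
            // Throws IOException if the secret is wrong or the file is not a valid store.
            store.load(in, secret);
        }
        return store.size();
    }

    public static void main(String[] args) throws Exception {
        // Assumed usage: java StoreCheck KEYSTORE_PATH TRUSTSTORE_PATH KEY_TRUST_SECRET
        char[] secret = args[2].toCharArray();
        System.out.println("keystore entries: "
                + countEntries(Path.of(args[0], "client.keystore.p12"), "PKCS12", secret));
        System.out.println("truststore entries: "
                + countEntries(Path.of(args[1], "client.truststore.jks"), "JKS", secret));
    }
}
```

If either call fails with an IOException, the path or the secret is the first thing to check before looking at the Kafka client configuration.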
Customizing the Java applications
The KafkaMusicExample example in the repository consists of two classes under the src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic folder:
- KafkaMusicExampleDriver.java: an Apache Kafka producer writing messages to a topic named song-feed
- KafkaMusicExample.java: a Kafka Streams application reading from the song-feed topic and calculating aggregated metrics
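To make "aggregated metrics" concrete: the REST endpoints queried at the end of this article expose top-five charts. The sketch below shows that idea with plain Java collections rather than the Streams API. It assumes play events can be reduced to a finite list of song titles, and every name in it is hypothetical. The real application keeps such counts up to date continuously as events arrive.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for a "top N songs" aggregation.
final class TopSongsSketch {

    // Counts plays per song and returns the n most played titles, most played first.
    // Ties are broken alphabetically so the result is deterministic.
    static List<String> topN(List<String> playedSongs, int n) {
        Map<String, Long> counts = playedSongs.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed sample data: each element represents one play event.
        List<String> plays = List.of("A", "B", "A", "C", "B", "A");
        System.out.println(topN(plays, 2));
    }
}
```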
To have the two applications working with Aiven for Apache Kafka we need to customise the files to use the right endpoints.
Customize KafkaMusicExampleDriver.java
- Add the following imports
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;
- After the KafkaMusicExampleDriver class declaration, add the following two lines to set the DEFAULT_BOOTSTRAP_SERVERS and DEFAULT_SCHEMA_REGISTRY_URL endpoints, replacing the APACHE_KAFKA_HOST, APACHE_KAFKA_PORT and SCHEMA_REGISTRY_PORT placeholders
private static final String DEFAULT_BOOTSTRAP_SERVERS = "APACHE_KAFKA_HOST:APACHE_KAFKA_PORT";
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT";
- Within the main function, replace the bootstrapServers and schemaRegistryUrl default values
final String bootstrapServers = args.length > 1 ? args[1] : DEFAULT_BOOTSTRAP_SERVERS;
final String schemaRegistryUrl = args.length > 2 ? args[2] : DEFAULT_SCHEMA_REGISTRY_URL;
- Within the main function, after the line
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
define the keystore and truststore locations and secrets for the SSL connection, replacing the placeholders KEYSTORE_PATH, TRUSTSTORE_PATH and KEY_TRUST_SECRET with the values set when creating the keystore and truststore.
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "TRUSTSTORE_PATH/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "KEYSTORE_PATH/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
- Within the main function, replace the line
final Map<String, String> serdeConfig = Collections.singletonMap(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
with the following, which creates and configures the required SpecificAvroSerdes and passes the schema registry username and password. Substitute the SCHEMA_REGISTRY_USER and SCHEMA_REGISTRY_PASSWORD placeholders.
final Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD");
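Hard-coding credentials is convenient for a demo but easy to leak once the file is committed. As a hedged alternative, the serde configuration above could be built from values supplied at runtime. The sketch below uses the literal property names that the AbstractKafkaSchemaSerDeConfig constants stand for (schema.registry.url, basic.auth.credentials.source, basic.auth.user.info), so it compiles without the Confluent libraries. The SchemaRegistrySettings class, and the idea of reading environment variables named after this article's placeholders, are assumptions and not part of the example repository.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the schema registry serde configuration from runtime values.
final class SchemaRegistrySettings {

    // Returns the configuration for a schema registry protected by basic authentication.
    static Map<String, String> serdeConfig(String host, String port, String user, String password) {
        Map<String, String> config = new HashMap<>();
        config.put("schema.registry.url", "https://" + host + ":" + port);
        config.put("basic.auth.credentials.source", "USER_INFO");
        config.put("basic.auth.user.info", user + ":" + password);
        return config;
    }

    public static void main(String[] args) {
        // Assumption: the environment variables are named after the article's placeholders.
        Map<String, String> env = System.getenv();
        Map<String, String> config = serdeConfig(
                env.get("APACHE_KAFKA_HOST"),
                env.get("SCHEMA_REGISTRY_PORT"),
                env.get("SCHEMA_REGISTRY_USER"),
                env.get("SCHEMA_REGISTRY_PASSWORD"));

        // Print only the keys so the password never reaches the terminal.
        System.out.println(config.keySet());
    }
}
```

The returned map has the same shape as the serdeConfig map in the step above, so it could be passed wherever that map is used.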
Customize KafkaMusicExample.java
Similar changes need to be performed in the file
KafkaMusicExample.java, replacing the placeholders with the connection
parameters fetched in the
prerequisite phase.
- Add the following imports
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
- Change the DEFAULT_BOOTSTRAP_SERVERS and DEFAULT_SCHEMA_REGISTRY_URL endpoints, replacing the APACHE_KAFKA_HOST, APACHE_KAFKA_PORT and SCHEMA_REGISTRY_PORT placeholders
private static final String DEFAULT_BOOTSTRAP_SERVERS = "APACHE_KAFKA_HOST:APACHE_KAFKA_PORT";
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT";
- Replace the bootstrapServers and schemaRegistryUrl default values
final String bootstrapServers = args.length > 1 ? args[1] : DEFAULT_BOOTSTRAP_SERVERS;
final String schemaRegistryUrl = args.length > 2 ? args[2] : DEFAULT_SCHEMA_REGISTRY_URL;
- Within the main function, replace the line
final KafkaStreams streams = new KafkaStreams(
buildTopology(singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)),
streamsConfig(bootstrapServers, restEndpointPort, "/tmp/kafka-streams", restEndpointHostname)
);
with the following, which creates and configures the required SpecificAvroSerdes and passes the schema registry username and password. Substitute the SCHEMA_REGISTRY_USER and SCHEMA_REGISTRY_PASSWORD placeholders.
final Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD");
final KafkaStreams streams = new KafkaStreams(
buildTopology(serdeConfig),
streamsConfig(bootstrapServers, restEndpointPort, "/tmp/kafka-streams", restEndpointHostname)
);
- Within the streamsConfig static function, after the line
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
define the keystore and truststore locations and secrets for the SSL connection, replacing the placeholders KEYSTORE_PATH, TRUSTSTORE_PATH and KEY_TRUST_SECRET with the values set when creating the keystore and truststore.
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "TRUSTSTORE_PATH/client.truststore.jks");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "KEYSTORE_PATH/client.keystore.p12");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
streamsConfiguration.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
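The same seven SSL settings are added to both the producer and the Streams configuration. One way to avoid maintaining them twice, sketched here as an assumption rather than as something the example repository provides, is a small helper that returns them as a Properties object. The string keys are the property names behind the CommonClientConfigs and SslConfigs constants used above, which keeps the sketch free of Kafka dependencies. AivenSslSettings is a hypothetical name.

```java
import java.util.Properties;

// Hypothetical helper: collects the SSL settings shared by both applications.
final class AivenSslSettings {

    // Builds the SSL client settings from the store locations and the shared secret.
    static Properties sslProperties(String keystorePath, String truststorePath, String secret) {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath + "/client.truststore.jks");
        props.put("ssl.truststore.password", secret);
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", keystorePath + "/client.keystore.p12");
        props.put("ssl.keystore.password", secret);
        props.put("ssl.key.password", secret);
        return props;
    }

    public static void main(String[] args) {
        // Assumed usage: java AivenSslSettings KEYSTORE_PATH TRUSTSTORE_PATH KEY_TRUST_SECRET
        Properties ssl = sslProperties(args[0], args[1], args[2]);

        // Print only the setting names, never the secret.
        System.out.println(ssl.stringPropertyNames());
    }
}
```

In each application, a single putAll call on the existing configuration object with the helper's result could then replace the seven individual put calls.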
Build the applications
From the main kafka-streams-examples folder, execute the following
Maven command to build the applications:
mvn -DskipTests=true clean package
The above command should create a jar file named
kafka-streams-examples-<VERSION>-standalone.jar under the target
folder, where <VERSION> depends on the repository release number. When
using the 7.0.0 release the file name would be
kafka-streams-examples-7.0.0-standalone.jar.
Run the applications
To run the applications in parallel, have two terminal
sessions pointing at the main kafka-streams-examples folder.
From the first terminal session you can start the
KafkaMusicExampleDriver producer with:
java -cp ./target/kafka-streams-examples-7.0.0-standalone.jar \
io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver
Check the target folder of your project and change the version in the command if necessary.
From the second terminal session you can start the KafkaMusicExample
Kafka streams application with:
java -cp ./target/kafka-streams-examples-7.0.0-standalone.jar \
io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample 7070
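The argument 7070 is the port on which the KafkaMusicExample application exposes its own REST endpoint on your machine. If you choose a different port, use the same value in the curl commands that query the results.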
Check the produced data
The results of the running applications are available by running the
following curl commands (and optionally jq to beautify the JSON
output):
- Get the latest top five across all genres
curl http://localhost:7070/kafka-music/charts/top-five | jq
- Get the latest top five for the genre punk
curl http://localhost:7070/kafka-music/charts/genre/punk | jq
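The same endpoints can also be queried from Java instead of curl. The following is a minimal sketch using the JDK's java.net.http client (Java 11 or later). It assumes the KafkaMusicExample application is running locally with its REST endpoint on port 7070, as in the commands above. ChartsClient and its methods are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the REST endpoint exposed by the running example.
final class ChartsClient {

    // Builds the URI for a chart such as "top-five" or "genre/punk".
    static URI chartUri(int port, String chart) {
        return URI.create("http://localhost:" + port + "/kafka-music/charts/" + chart);
    }

    // Sends a GET request and returns the raw JSON body as a string.
    static String fetch(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Requires the KafkaMusicExample application to be running on port 7070.
        System.out.println(fetch(chartUri(7070, "top-five")));
        System.out.println(fetch(chartUri(7070, "genre/punk")));
    }
}
```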
More information for further customisations is available in the source GitHub repository.