Kafka, Avro Serialization, and the Schema Registry

Confluent Schema Registry stores Avro schemas for Kafka producers and consumers. The Schema Registry provides a RESTful interface for managing Avro schemas and keeps a versioned history of each schema. It also supports checking schema compatibility for Kafka.

You can configure compatibility settings to support the evolution of schemas using Avro. The Kafka Avro serialization project provides serializers. Kafka producers and consumers that use Kafka Avro serialization handle schema management and the serialization of records using Avro and the Schema Registry. When using the Confluent Schema Registry, producers don’t have to send the schema, just the schema ID, which is unique. The consumer uses the schema ID to look up the full schema from the Confluent Schema Registry if it’s not already cached. Because only the schema ID travels with each record or batch of records, messages are smaller and serialization is faster.

If you have never used Avro before, please read Avro Introduction for Big Data and Data Streams.

This article is going to cover what the Schema Registry is and why you should use it with Kafka. We’ll drill down into understanding Avro schema evolution and setting up and using Schema Registry with Kafka Avro Serializers. We’ll show how to manage Avro Schemas with the REST interface of the Schema Registry and then how to write serializer-based producers and deserializer-based consumers for Kafka.

The Kafka producer creates a record/message that is an Avro record. The record contains a schema ID and data. The Kafka Avro Serializer registers the schema if needed and then serializes the data along with the schema ID. The Kafka Avro Serializer keeps a cache of schemas registered with the Schema Registry and their schema IDs.
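As a rough illustration of what "a schema ID and data" looks like on the wire: the Confluent serializers prefix each payload with a magic byte (0), followed by the 4-byte schema ID in big-endian order, followed by the Avro-encoded bytes. The sketch below frames and parses such a message using only the JDK. The class WireFormatSketch and its methods are hypothetical helpers for illustration, and the payload is placeholder bytes rather than real Avro output.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper that illustrates the framing; not part of the Confluent library.
final class WireFormatSketch {

    static final byte MAGIC_BYTE = 0x0;

    // Prefix an already-serialized payload with the magic byte and the schema ID.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)      // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Read the schema ID back out of a framed message.
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    // Return the bytes that follow the 5-byte header.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String... args) {
        byte[] message = frame(3, new byte[] {10, 20, 30});
        System.out.println("schema id = " + schemaId(message));
        System.out.println("payload length = " + payload(message).length);
    }
}
```

The header costs five bytes per message no matter how large the schema is, which is where the space saving comes from.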

Consumers receive payloads and deserialize them with Kafka Avro Deserializers, which use the Confluent Schema Registry. The Deserializer looks up the full schema from the cache or Schema Registry based on ID.
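The "cache first, registry second" lookup can be pictured with a small stand-in. This is only a sketch of the idea, not the deserializer's actual code: SchemaCacheSketch is a hypothetical class, schemas are plain strings, and the registry call is replaced by a function passed in by the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical stand-in for a deserializer-side schema cache.
final class SchemaCacheSketch {

    private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();
    private final IntFunction<String> registryLookup;
    private final AtomicInteger registryCalls = new AtomicInteger();

    // registryLookup stands in for an HTTP call to the Schema Registry.
    SchemaCacheSketch(IntFunction<String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    // Return the cached schema, going to the "registry" only on a cache miss.
    String schemaFor(int schemaId) {
        return schemasById.computeIfAbsent(schemaId, id -> {
            registryCalls.incrementAndGet();
            return registryLookup.apply(id);
        });
    }

    int registryCalls() {
        return registryCalls.get();
    }

    public static void main(String... args) {
        SchemaCacheSketch cache = new SchemaCacheSketch(id -> "schema-for-id-" + id);
        cache.schemaFor(3);
        cache.schemaFor(3); // served from the cache
        System.out.println("registry calls: " + cache.registryCalls());
    }
}
```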

Why Schema Registry?

The consumer’s schema could differ from the producer’s. The consumer schema is what the consumer is expecting the record/message to conform to. With the Schema Registry, a compatibility check is performed, and if the two schemas don’t match but are compatible, then the payload transformation happens via Avro Schema Evolution. Kafka records can have a key and a value and both can have a schema.

Schema Registry Operations

The Schema Registry can store schemas for keys and values of Kafka records. It can also list schemas by subject. It can list all versions of a subject (schema). It can retrieve a schema by version or ID. It can get the latest version of a schema. Importantly, the Schema Registry can check to see if a schema is compatible with a certain version. There is a compatibility level (i.e. backward, forward, full, none) setting for the Schema Registry and an individual subject. You can manage schemas via a REST API with the Schema Registry.

Schema Registry Schema Compatibility Settings

Backward compatibility refers to data written with an older schema that is readable with a newer schema. Forward compatibility means data written with a newer schema is readable with old schemas. Full compatibility means a new version of a schema is backward- and forward-compatible. The “none” status disables schema validation and it is not recommended. If you set the level to “none,” then Schema Registry just stores the schema and it will not be validated for compatibility.

Schema Registry Configuration

The schema compatibility checks can be configured globally or per subject.

The compatibility value will be one of the following:

  • None: Don’t check for schema compatibility.
  • Forward: Check to make sure the last schema version is forward-compatible with new schemas.
  • Backward (default): Make sure the new schema is backward-compatible with the latest.
  • Full: Make sure the new schema is both forward- and backward-compatible with the latest registered schema.
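To make the difference between these levels concrete, here is a deliberately simplified model that looks only at added and removed fields. It is an assumption-laden sketch, not the Schema Registry's or Avro's real check, which also considers type promotion, unions, aliases, and more. A schema is modeled as a map from field name to "has a default," and CompatibilitySketch is a hypothetical class.

```java
import java.util.Map;

// Toy model of compatibility checking; real Avro rules cover far more cases.
final class CompatibilitySketch {

    // A reader can read a writer's data if every field the writer lacks has a default.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean missingFromWriter = !writer.containsKey(field.getKey());
            if (missingFromWriter && !field.getValue()) {
                return false; // nothing can supply a value for this field
            }
        }
        return true;
    }

    // Backward: the new schema can read data written with the latest schema.
    static boolean backward(Map<String, Boolean> latest, Map<String, Boolean> candidate) {
        return canRead(candidate, latest);
    }

    // Forward: the latest schema can read data written with the new schema.
    static boolean forward(Map<String, Boolean> latest, Map<String, Boolean> candidate) {
        return canRead(latest, candidate);
    }

    // Full: both directions hold.
    static boolean full(Map<String, Boolean> latest, Map<String, Boolean> candidate) {
        return backward(latest, candidate) && forward(latest, candidate);
    }

    public static void main(String... args) {
        Map<String, Boolean> v1 = Map.of("firstName", false, "lastName", false);
        Map<String, Boolean> withDefault = Map.of("firstName", false, "lastName", false, "age", true);
        Map<String, Boolean> noDefault = Map.of("firstName", false, "lastName", false, "age", false);

        System.out.println("age with default, backward: " + backward(v1, withDefault));
        System.out.println("age without default, backward: " + backward(v1, noDefault));
        System.out.println("age without default, forward: " + forward(v1, noDefault));
    }
}
```

In this model, adding age without a default is still forward-compatible (an old reader simply ignores it) but not backward-compatible, because a new reader has no value to fill in for old records.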

Schema Evolution

If an Avro schema is changed after data has been written to a store using an older version of that schema, then Avro might do a schema evolution when you try to read that data.

From Kafka’s perspective, schema evolution happens only during deserialization at the consumer (read). If the consumer’s schema is different from the producer’s schema, then the value or key is automatically modified during deserialization to conform to the consumer’s read schema if possible.

Avro schema evolution is an automatic transformation of Avro schemas between the consumer schema version and what schema the producer put into the Kafka log. When the consumer schema is not identical to the producer schema used to serialize the Kafka record, a data transformation is performed on the Kafka record’s key or value. If the schemas match, then there is no need to do a transformation.

Allowed Modification During Schema Evolution

You can add a field with a default to a schema. You can remove a field that had a default value. You can change a field’s order attribute. You can change a field’s default value to another value or add a default value to a field that did not have one. You can remove or add a field alias (keep in mind that this could break some consumers that depend on the alias). You can change a type to a union that contains the original type. If you limit yourself to the changes above, then Avro’s schema evolution can still resolve the data when it is read with an older version of the schema.

Rules of the Road for Modifying Schemas

If you want to make your schema evolvable, then follow these guidelines. Provide a default value for fields in your schema, as this allows you to delete the field later. Never change a field’s data type. When adding a new field to your schema, you have to provide a default value for the field. Don’t rename an existing field (use aliases instead).

Let’s use an example to talk about this. The following example is from our Avro tutorial.

{"namespace": "com.cloudurable.phonebook",
  "type": "record",
  "name": "Employee",
  "doc" : "Represents an Employee at a company",
  "fields": [
    {"name": "firstName", "type": "string", "doc": "The persons given name"},
    {"name": "nickName", "type": ["null", "string"], "default" : null},
    {"name": "lastName", "type": "string"},
    {"name": "age",  "type": "int", "default": -1},
    {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
    {"name": "phoneNumber",  "type":
    [ "null",
      { "type": "record",   "name": "PhoneNumber",
        "fields": [
          {"name": "areaCode", "type": "string"},
          {"name": "countryCode", "type": "string", "default" : ""},
          {"name": "prefix", "type": "string"},
          {"name": "number", "type": "string"}
        ]
      }
    ]
    },
    {"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status",
      "symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]}
    }
  ]
}

Avro Schema Evolution Scenario

Let’s say our Employee record did not have an age in version 1 of the schema, and then later, we decided to add an age field with a default value of -1. Now, let’s say we have a producer using version 2 of the schema with age and a consumer using version 1 with no age.

The producer uses version 2 of the Employee schema, creates a com.cloudurable.phonebook.Employee record, sets the age field to 42, and then sends it to the Kafka topic new-employees. The consumer consumes records from new-employees using version 1 of the Employee schema. Since the consumer is using version 1 of the schema, the age field gets removed during deserialization.

The same consumer modifies some records and then writes the record to a NoSQL store. When the consumer does this, the age field is missing from the record that it writes to the NoSQL store. Another client using version 2 of the schema, which has the age, reads the record from the NoSQL store. The age field is missing from the record because the Consumer wrote it with version 1, thus the client reads the record and the age is set to default value of -1.

If you added the age and it was not optional, i.e. the age field did not have a default, then the Schema Registry could reject the schema and the producer could never add it to the Kafka log.
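The round trip in this scenario can be walked through with a toy model. This is a sketch under simplifying assumptions, not how Avro actually resolves schemas: records and schemas are plain maps, a reader schema maps each field name to its default (or to a NO_DEFAULT marker), and EvolutionSketch with its methods is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of reading a record with a different schema than it was written with.
final class EvolutionSketch {

    // Marker for fields that have no default value.
    static final Object NO_DEFAULT = new Object();

    // Keep only the reader's fields; fill in defaults for fields the writer did not send.
    static Map<String, Object> read(Map<String, Object> written, Map<String, Object> readerSchema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerSchema.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));
            } else if (field.getValue() != NO_DEFAULT) {
                result.put(name, field.getValue());
            } else {
                throw new IllegalStateException("No value or default for field " + name);
            }
        }
        return result;
    }

    // Version 1 of the simplified Employee schema: no age field.
    static Map<String, Object> schemaV1() {
        Map<String, Object> schema = new LinkedHashMap<>();
        schema.put("firstName", NO_DEFAULT);
        schema.put("lastName", NO_DEFAULT);
        return schema;
    }

    // Version 2 adds age with a default of -1.
    static Map<String, Object> schemaV2() {
        Map<String, Object> schema = schemaV1();
        schema.put("age", -1);
        return schema;
    }

    public static void main(String... args) {
        Map<String, Object> produced = Map.of("firstName", "Bob", "lastName", "Jones", "age", 42);
        Map<String, Object> seenByV1Consumer = read(produced, schemaV1());
        Map<String, Object> seenByV2Client = read(seenByV1Consumer, schemaV2());
        System.out.println(seenByV1Consumer);
        System.out.println(seenByV2Client);
    }
}
```

The version 1 consumer never sees age, so the value 42 is lost once it rewrites the record, and the version 2 client gets the default of -1 back. If age had no default in schemaV2, the second read would throw instead.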

Using the Schema Registry REST API

Recall that the Schema Registry allows you to manage schemas using the following operations:

  • Store schemas for keys and values of Kafka records
  • List schemas by subject
  • List all versions of a subject (schema)
  • Retrieve a schema by version
  • Retrieve a schema by ID
  • Retrieve the latest version of a schema
  • Perform compatibility checks
  • Set compatibility level globally

Recall that all of this is available via a REST API with the Schema Registry.

To post a new schema, you could do the following:

Posting a new schema:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": …}"}' \
    http://localhost:8081/subjects/Employee/versions

To list all of the schemas:

curl -X GET http://localhost:8081/subjects

If you have a good HTTP client, you can basically perform all of the above operations via the REST interface for the Schema Registry. I wrote a little example to do this so I could understand the Schema Registry a little better using the OkHttp client from Square (com.squareup.okhttp3:okhttp:3.7.0+) as follows:

Using REST endpoints to try out all of the Schema Registry options:

package com.cloudurable.kafka.schema;

import okhttp3.*;

import java.io.IOException;

public class SchemaMain {

    private final static MediaType SCHEMA_CONTENT =
            MediaType.parse("application/vnd.schemaregistry.v1+json");

    private final static String EMPLOYEE_SCHEMA = "{\n" +
            "  \"schema\": \"" +
            "  {" +
            "    \\\"namespace\\\": \\\"com.cloudurable.phonebook\\\"," +
            "    \\\"type\\\": \\\"record\\\"," +
            "    \\\"name\\\": \\\"Employee\\\"," +
            "    \\\"fields\\\": [" +
            "        {\\\"name\\\": \\\"fName\\\", \\\"type\\\": \\\"string\\\"}," +
            "        {\\\"name\\\": \\\"lName\\\", \\\"type\\\": \\\"string\\\"}," +
            "        {\\\"name\\\": \\\"age\\\",  \\\"type\\\": \\\"int\\\"}," +
            "        {\\\"name\\\": \\\"phoneNumber\\\",  \\\"type\\\": \\\"string\\\"}" +
            "    ]" +
            "  }\"" +
            "}";

    public static void main(String... args) throws IOException {

        System.out.println(EMPLOYEE_SCHEMA);

        final OkHttpClient client = new OkHttpClient();

        //POST A NEW SCHEMA
        Request request = new Request.Builder()
                .post(RequestBody.create(SCHEMA_CONTENT, EMPLOYEE_SCHEMA))
                .url("http://localhost:8081/subjects/Employee/versions")
                .build();

        String output = client.newCall(request).execute().body().string();
        System.out.println(output);

        //LIST ALL SCHEMAS
        request = new Request.Builder()
                .url("http://localhost:8081/subjects")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        //SHOW ALL VERSIONS OF EMPLOYEE
        request = new Request.Builder()
                .url("http://localhost:8081/subjects/Employee/versions/")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        //SHOW VERSION 2 OF EMPLOYEE
        request = new Request.Builder()
                .url("http://localhost:8081/subjects/Employee/versions/2")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        //SHOW THE SCHEMA WITH ID 3
        request = new Request.Builder()
                .url("http://localhost:8081/schemas/ids/3")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        //SHOW THE LATEST VERSION OF EMPLOYEE
        request = new Request.Builder()
                .url("http://localhost:8081/subjects/Employee/versions/latest")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);



        //CHECK IF SCHEMA IS REGISTERED
        request = new Request.Builder()
                .post(RequestBody.create(SCHEMA_CONTENT, EMPLOYEE_SCHEMA))
                .url("http://localhost:8081/subjects/Employee")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        //TEST COMPATIBILITY
        request = new Request.Builder()
                .post(RequestBody.create(SCHEMA_CONTENT, EMPLOYEE_SCHEMA))
                .url("http://localhost:8081/compatibility/subjects/Employee/versions/latest")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        // TOP LEVEL CONFIG
        request = new Request.Builder()
                .url("http://localhost:8081/config")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);


        // SET TOP LEVEL CONFIG
        // VALUES are none, backward, forward and full
        request = new Request.Builder()
                .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"none\"}"))
                .url("http://localhost:8081/config")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);

        // SET CONFIG FOR EMPLOYEE
        // VALUES are none, backward, forward and full
        request = new Request.Builder()
                .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"backward\"}"))
                .url("http://localhost:8081/config/Employee")
                .build();

        output = client.newCall(request).execute().body().string();
        System.out.println(output);



    }
}

I suggest running the example, trying to force incompatible schemas to the Schema Registry, and noting the behavior for the various compatibility settings.
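The hand-escaped EMPLOYEE_SCHEMA string in the example is easy to get wrong, because the Avro schema has to be embedded as a JSON string inside the request body. One possible alternative, sketched here under the assumption of Java 11 or later, is to escape the schema programmatically and build the request with the JDK's java.net.http classes. RegistryRequestSketch and its methods are hypothetical, the escaping only handles backslashes, quotes, and common whitespace escapes, and the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; builds a registration request without sending it.
final class RegistryRequestSketch {

    // Wrap an Avro schema (itself JSON) as the string value of a "schema" property.
    static String toRequestBody(String avroSchemaJson) {
        String escaped = avroSchemaJson
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    // Build a POST to /subjects/{subject}/versions, the same endpoint used above.
    static HttpRequest registerRequest(String baseUrl, String subject, String avroSchemaJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(toRequestBody(avroSchemaJson)))
                .build();
    }

    public static void main(String... args) {
        String schema = "{\"type\": \"record\", \"name\": \"Employee\", \"fields\": []}";
        HttpRequest request = registerRequest("http://localhost:8081", "Employee", schema);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toRequestBody(schema));
    }
}
```

Sending the request would additionally need an HttpClient and a running Schema Registry.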

Running Schema Registry:

$ cat ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
debug=false

~/tools/confluent-3.2.1/bin/schema-registry-start  ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties

Writing Consumers and Producers That Use Kafka Avro Serializers and the Schema Registry

Now, let’s cover writing consumers and producers that use Kafka Avro Serializers, which in turn use the Schema Registry and Avro.

We will need to start up the Schema Registry server pointing to our ZooKeeper cluster. Then, we will need to import the Kafka Avro Serializer and Avro JARs into our Gradle project. You will then need to configure the producer to use Schema Registry and the KafkaAvroSerializer. To write the consumer, you will need to configure it to use Schema Registry and to use the KafkaAvroDeserializer.

Here is our build file, which shows the Avro JAR files and such that we need.

Gradle build file for Kafka Avro Serializer examples:

plugins {
    id "com.commercehub.gradle.plugin.avro" version "0.9.0"
}

group 'cloudurable'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8

dependencies {
    compile "org.apache.avro:avro:1.8.1"
    compile 'com.squareup.okhttp3:okhttp:3.7.0'
    testCompile 'junit:junit:4.11'
    compile 'org.apache.kafka:kafka-clients:0.10.2.0'
    compile 'io.confluent:kafka-avro-serializer:3.2.1'
}
repositories {
    jcenter()
    mavenCentral()
    maven {
        url "http://packages.confluent.io/maven/"
    }
}
avro {
    createSetters = false
    fieldVisibility = "PRIVATE"
}

Notice that we include the Kafka Avro Serializer lib (io.confluent:kafka-avro-serializer:3.2.1) and the Avro lib (org.apache.avro:avro:1.8.1).

To learn more about the Gradle Avro plugin, please read this article on using Avro.

Writing a Producer

Next, let’s write the producer as follows.

Producer that uses Kafka Avro Serialization and Schema Registry:

package com.cloudurable.kafka.schema;

import com.cloudurable.phonebook.Employee;
import com.cloudurable.phonebook.PhoneNumber;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Properties;
import java.util.stream.IntStream;

public class AvroProducer {

    private static Producer<Long, Employee> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class.getName());

        // Configure the KafkaAvroSerializer.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class.getName());

        // Schema Registry location.
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081");

        return new KafkaProducer<>(props);
    }

    private final static String TOPIC = "new-employees";

    public static void main(String... args) {

        Producer<Long, Employee> producer = createProducer();

        Employee bob = Employee.newBuilder().setAge(35)
                .setFirstName("Bob")
                .setLastName("Jones")
                .setPhoneNumber(
                        PhoneNumber.newBuilder()
                                .setAreaCode("301")
                                .setCountryCode("1")
                                .setPrefix("555")
                                .setNumber("1234")
                                .build())
                .build();

        IntStream.range(1, 100).forEach(index->{
            producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));

        });

        producer.flush();
        producer.close();
    }

}

Notice that we configure the Schema Registry and the KafkaAvroSerializer as part of the producer setup.

// Configure the KafkaAvroSerializer.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaAvroSerializer.class.getName());

// Schema Registry location.
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");

Then, we use the producer as expected.

Writing a Consumer

Next, we have to write the consumer.

Consumer that uses Kafka Avro Serialization and Schema Registry:

package com.cloudurable.kafka.schema;

import com.cloudurable.phonebook.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC = "new-employees";

    private static Consumer<Long, Employee> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());

        //Use Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaAvroDeserializer.class.getName());  //<----------------------

        //Use Specific Record or else you get Avro GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");


        //Schema registry location.
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081"); //<----- Run Schema Registry on 8081


        return new KafkaConsumer<>(props);
    }





    public static void main(String... args) {

        final Consumer<Long, Employee> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(TOPIC));

        IntStream.range(1, 100).forEach(index -> {

            final ConsumerRecords<Long, Employee> records =
                    consumer.poll(100);

            if (records.count() == 0) {
                System.out.println("None found");
            } else records.forEach(record -> {

                Employee employeeRecord = record.value();

                System.out.printf("%s %d %d %s \n", record.topic(),
                        record.partition(), record.offset(), employeeRecord);
            });
        });
    }


}

Notice that just like with the producer, we have to tell the consumer where to find the Registry, and we have to configure the Kafka Avro Deserializer.

Configuring Schema Registry for the consumer:

//Use Kafka Avro Deserializer.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        KafkaAvroDeserializer.class.getName());

//Use Specific Record or else you get Avro GenericRecord.
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

//Schema registry location.
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"); //<----- Run Schema Registry on 8081

An additional step is that we have to tell it to use the generated version of the Employee object. If we did not, then it would use the Avro GenericRecord instead of our generated Employee object, which is a SpecificRecord. To learn more about using GenericRecord and generating code from Avro, read the Avro Kafka tutorial as it has examples of both.

To run the above example, you need to start up Kafka and ZooKeeper. To learn how to do this if you have not done it before, see this Kafka tutorial. Essentially, Kafka and ZooKeeper each come with a startup script and a default configuration file, just like the Schema Registry. You pass the default configuration to each startup script, and Kafka runs locally on your machine.

Running ZooKeeper and Kafka:

kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &

kafka/bin/kafka-server-start.sh kafka/config/server.properties

Conclusion

Confluent provides Schema Registry to manage Avro Schemas for Kafka consumers and producers. Avro provides schema migration, which is necessary for streaming and big data architectures.

The Schema Registry uses compatibility checks to see whether the producer’s and consumer’s schemas are compatible, and Avro performs schema evolution during deserialization if needed. You use KafkaAvroSerializer from the producer and point to the Schema Registry. You use KafkaAvroDeserializer from the consumer and point to the Schema Registry.

Credits : https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry
