Avrocheck
What?
A small library to generate random `GenericRecord`s from a given Avro schema using ScalaCheck. Available for both Scala 2.13 and 2.12.
Why?
First of all, we always want to make sure that our custom deserialisation code can use its reader schema to deserialise any message that was written with the writer schema.
Example schema:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "favourite_number",
"type": [
"int",
"null"
]
}
]
}
An example test to check this using avro4s would be something like:
import com.sksamuel.avro4s.{AvroName, AvroSchema, Decoder}
import org.apache.avro.Schema
/**
 * Application-side view of a user.
 *
 * Only `name` and the optional `favourite_number` are modelled; other fields present in the
 * writer schema (such as `age`) are not part of this case class.
 */
case class User(
  name: String,
  @AvroName("favourite_number") favouriteNumber: Option[Int]
)

object User {
  /** Avro schema derived from the case class by avro4s. */
  val schema: Schema = AvroSchema[User]

  /** avro4s decoder used to turn Avro values back into `User` instances. */
  val decoder: Decoder[User] = Decoder[User]
}
import com.sksamuel.avro4s.{AvroSchema, Decoder, DefaultFieldMapper, Encoder}
import org.scalacheck.{Arbitrary, Gen, Properties}
import org.scalacheck.Prop.forAll
import io.github.olib963.avrocheck._
import io.github.olib963.avrocheck.Implicits._
import scala.util.{Success, Try}
object SerdeProperty extends Properties("Serde") {

  // This test checks that avro4s can deserialise a User case class from a generic record created by its own autogenerated schema
  property("avro4s round trip") = forAll(Gen.resultOf(User.apply(_: String, _: Option[Int]))) { user =>
    val encoded = Encoder[User].encode(user, User.schema, DefaultFieldMapper)
    val decoded = User.decoder.decode(encoded, User.schema, DefaultFieldMapper)
    decoded == user
  }

  // The problem with this can be shown by creating a case class with invalid typing that does not match our schema
  case class InvalidUser(
    name: Boolean, // Name has the wrong type
    favouriteNumber: Option[Int] // favouriteNumber should be favourite_number
  )

  // and seeing that the same test would still pass: the schema used below is derived from InvalidUser itself,
  // so it can never disagree with the case class.
  // An explicit `apply` is passed rather than the companion object, matching the round trip above
  // and not relying on case-class companions extending FunctionN (which Scala 3 drops).
  property("avro4s invalid round trip") = forAll(Gen.resultOf(InvalidUser.apply(_: Boolean, _: Option[Int]))) { user =>
    val encoded = Encoder[InvalidUser].encode(user, AvroSchema[InvalidUser], DefaultFieldMapper)
    val decoded = Decoder[InvalidUser].decode(encoded, AvroSchema[InvalidUser], DefaultFieldMapper)
    decoded == user
  }

  // Using avrocheck we can instead check that a record created using the full writer schema is compatible with our case class.
  private val schema = schemaFromResource("user-schema.avsc")

  property("deserialises user messages") = forAll(genFromSchema(schema)) { record =>
    Try(User.decoder.decode(record, User.schema, DefaultFieldMapper)).isSuccess
  }

  // Or if you want to be more precise:
  property("deserialises user messages with correct values") = {
    val generator = for {
      name <- Arbitrary.arbString.arbitrary
      favNum <- Arbitrary.arbOption[Int].arbitrary
      // Convert the Option[Int] into a boxed Integer, or null when absent, to use as the "favourite_number" override
      favouriteNumberOrNull = favNum.map(Int.box).orNull
      // Notice that here we do not override age (or in the later schema favourite_colour) because these
      // values are of no interest to our application code
      overrides = overrideFields("name" -> name, "favourite_number" -> favouriteNumberOrNull)
      record <- genFromSchema(schema, overrides = overrides)
    } yield (record, User(name, favNum))

    forAll(generator) { case (record, user) =>
      Try(User.decoder.decode(record, User.schema, DefaultFieldMapper)) == Success(user)
    }
  }
}
Secondly, it can be useful to write high-level system property tests in terms of messages in and out. As an example, we will use the above schema to write a very simple application that signs users up with their favourite number. If their favourite number is negative, we give them a £10 sign-up bonus, and if it is between -2000 and -1001 (inclusive) we give them a double bonus.
import io.github.olib963.avrocheck._
import io.github.olib963.avrocheck.Implicits._
import org.apache.avro.Schema
import org.scalacheck.Prop.forAll
import org.scalacheck.{Gen, Properties}
import scala.util.Success
object ApplicationProperty extends Properties("My application") {
  val schema: Schema = schemaFromResource("user-schema.avsc")

  property("persists users with negative favourite numbers and gives them a bonus") = {
    val generator = for {
      name <- Gen.alphaNumStr
      // Any number in (-inf, -2001] or [-1000, -1].
      // Gen.negNum yields values <= -1, so subtracting 2000 gives values <= -2001 (subtracting 2001 would skip -2001).
      favNum <- Gen.oneOf(Gen.negNum[Int].map(_ - 2000), Gen.chooseNum[Int](-1000, -1))
      overrides = overrideFields("name" -> name, "favourite_number" -> favNum)
      message <- genFromSchema(schema, overrides = overrides)
    } yield (name, message)

    forAll(generator) { case (name, message) =>
      val result = Application.processUser(message)
      result == Success(PersistedWithBonus(name, 10))
    }
  }

  property("gives a double bonus if their favourite number is between -2000 and -1000") = {
    val generator = for {
      name <- Gen.alphaNumStr
      // Inclusive range [-2000, -1001]; -1000 itself only earns the single bonus (see the property above)
      favNum <- Gen.chooseNum[Int](-2000, -1001)
      overrides = overrideFields("name" -> name, "favourite_number" -> favNum)
      message <- genFromSchema(schema, overrides = overrides)
    } yield (name, message)

    forAll(generator) { case (name, message) =>
      val result = Application.processUser(message)
      result == Success(PersistedWithBonus(name, 20))
    }
  }

  property("persists users with a positive or no favourite number with no bonus") = {
    val generator = for {
      name <- Gen.alphaNumStr
      // null models "no favourite number" for the nullable union field
      favNum <- Gen.oneOf(Gen.const(null), Gen.posNum[Int])
      overrides = overrideFields("name" -> name, "favourite_number" -> favNum)
      message <- genFromSchema(schema, overrides = overrides)
    } yield (name, message)

    forAll(generator) { case (name, message) =>
      val result = Application.processUser(message)
      result == Success(Persisted(name))
    }
  }
}
Thanks to Avro's compatibility features, upstream producers should be able to make backwards-compatible changes without affecting your codebase. This is now easy to verify by simply updating the schema file, for example by adding the following field:
{
"name": "favourite_colour",
"type": [
"null",
"string"
],
"default": null
}
to the above schema, all of the example tests still pass.
How?
Import `io.github.olib963.avrocheck._` to get access to generation from Avro schemas. Configuration can be provided explicitly, or implicitly by importing `io.github.olib963.avrocheck.Implicits._`. There is also a utility function, `schemaFromResource`, to read schemas from a resource file. Currently, the schema you pass must be either a `RECORD` or a `UNION` of `RECORD`s.
To change the default generators used by the `Gen`, you can either pass the configuration explicitly or, if you are using the implicit configuration, provide an implicit `Arbitrary`.
package io.github.olib963.avrocheck.documentation
import io.github.olib963.avrocheck._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.scalacheck.Prop.forAll
import org.scalacheck.{Arbitrary, Gen, Properties}
object RecordGeneration extends Properties("generating random values from schema") {
  private val schema: Schema = schemaFromResource("user-schema.avsc")

  // ---- Explicit configuration ----

  property("My explicit test") = forAll(genFromSchema(schema)) {
    _.isInstanceOf[GenericRecord]
  }

  property("My explicit positive age test") = {
    // Only the Int generator is replaced; every other generator keeps its default
    val positiveIntConfig = Configuration.Default.copy(intGen = Gen.posNum[Int])
    forAll(genFromSchema(schema, positiveIntConfig)) { record =>
      record.get("age").asInstanceOf[Int] >= 0
    }
  }

  // ---- Implicit configuration ----
  import io.github.olib963.avrocheck.Implicits._

  property("My implicit test") = forAll(genFromSchemaImplicit(schema)) {
    _.isInstanceOf[GenericRecord]
  }

  property("My implicit positive age test") = {
    // A local implicit Arbitrary[Int] customises Int generation under the implicit configuration
    implicit val onlyPositiveInts: Arbitrary[Int] = Arbitrary(Gen.posNum[Int])
    forAll(genFromSchemaImplicit(schema)) { record =>
      record.get("age").asInstanceOf[Int] >= 0
    }
  }
}
Logical Types
Logical types will automatically be generated using the following types:

- `timestamp-millis` → `java.time.Instant`
- `timestamp-micros` → `java.time.Instant`
- `time-millis` → `java.time.LocalTime`
- `time-micros` → `java.time.LocalTime`
- `date` → `java.time.LocalDate`
- `uuid` → `java.util.UUID`
- `decimal` → `scala.math.BigDecimal`
If you want to provide overrides or implicit `Arbitrary` instances for logical types, you must use these types.
If you don't want to go through the hassle of adding logical type conversions to your serialiser, you can set the configuration option `preserialiseLogicalTypes` to `true`, and the values will automatically be transformed into their underlying primitives.
package io.github.olib963.avrocheck.documentation
import java.time.LocalDate
import io.github.olib963.avrocheck._
import org.apache.avro.Schema
import org.scalacheck.Prop.forAll
import org.scalacheck.{Arbitrary, Gen, Properties}
object LogicalTypeConfiguration extends Properties("Logical type configuration") {
  // This schema has a field "date" with schema {"type": "int", "logicalType": "date"}
  val schema: Schema = schemaFromResource("record-with-logical-types.avsc")

  // Strictly positive day offsets, i.e. dates after 1970-01-01
  private val onlyDaysSinceEpoch = Gen.posNum[Int].map(days => LocalDate.ofEpochDay(days.toLong))
  // Ints of -2 or lower: if this Int generator were used for "date", the ">= 0" checks below would fail
  private val onlyDaysBeforeEpoch = Gen.negNum[Int].map(n => n - 1)

  private val overriddenConfig: Configuration =
    Configuration.Default.copy(localDateGen = onlyDaysSinceEpoch, intGen = onlyDaysBeforeEpoch)

  // Generates a LocalDate, not an Int
  property("Explicitly override date type") =
    forAll(genFromSchema(schema, overriddenConfig))(_.get("date").isInstanceOf[LocalDate])

  // Serialises the LocalDate to an Int for you, but the value still comes from the Gen[LocalDate], not the Gen[Int]
  property("Explicitly override date type preserialised") = {
    val preserialisedConfig = overriddenConfig.copy(preserialiseLogicalTypes = true)
    forAll(genFromSchema(schema, preserialisedConfig))(_.get("date").asInstanceOf[Int] >= 0)
  }

  // Using implicit configuration
  import io.github.olib963.avrocheck.Implicits._

  implicit val onlyDaysSinceEpochArb: Arbitrary[LocalDate] = Arbitrary(onlyDaysSinceEpoch)
  implicit val onlyDaysBeforeEpochArb: Arbitrary[Int] = Arbitrary(onlyDaysBeforeEpoch)

  // Generates a LocalDate, not an Int
  property("Implicitly override date type") =
    forAll(genFromSchemaImplicit(schema))(_.get("date").isInstanceOf[LocalDate])

  // Serialises the LocalDate to an Int for you, but the value still comes from the Gen[LocalDate], not the Gen[Int]
  property("Implicitly override date type preserialised") = {
    implicit val preserialise: PreserialiseLogicalTypes = true
    forAll(genFromSchemaImplicit(schema))(_.get("date").asInstanceOf[Int] >= 0)
  }
}
Overrides
If you want to customise the generation of your `GenericRecord` even further, you can provide an explicit or implicit `Overrides` object.
package io.github.olib963.avrocheck.documentation
import io.github.olib963.avrocheck.CollectionConverters._
import org.scalacheck.Prop.forAll
import org.scalacheck.{Arbitrary, Gen, Properties}
import io.github.olib963.avrocheck._
object OverrideConfiguration extends Properties("Overriding generation") {

  //****************************//
  //  General Record Overrides  //
  //****************************//

  // User schema we have used above in documentation
  private val userSchema = schemaFromResource("user-schema.avsc")

  property("Explicitly override primitive fields") = {
    val overrides = overrideFields( // Override fields in the record by name
      "name" -> constantOverride("oli"), // Always generate the string "oli" for "name"
      "favourite_number" -> generatorOverride(Gen.posNum[Int].map(_ + 1)) // Always generate a positive Int for "favourite_number"
    )
    forAll(genFromSchema(userSchema, overrides = overrides)) { record =>
      val namedOli = record.get("name") == "oli"
      val randomIntAge = record.get("age").isInstanceOf[Int] // "age" is not overridden, so it is still randomly generated
      val positiveFavouriteNumber = record.get("favourite_number").asInstanceOf[Int] > 0
      namedOli && randomIntAge && positiveFavouriteNumber
    }
  }

  property("Implicitly override primitive fields") = {
    import io.github.olib963.avrocheck.Implicits._
    // Implicitly infer the override type for each field
    implicit val overrides: Overrides = overrideFields(
      "name" -> "oli",
      "favourite_number" -> Gen.posNum[Int].map(_ + 1)
    )
    forAll(genFromSchemaImplicit(userSchema)) { record =>
      val namedOli = record.get("name") == "oli"
      val randomIntAge = record.get("age").isInstanceOf[Int]
      val positiveFavouriteNumber = record.get("favourite_number").asInstanceOf[Int] > 0
      namedOli && randomIntAge && positiveFavouriteNumber
    }
  }

  //*******************//
  //  Union Overrides  //
  //*******************//

  // Schema of two records named "Foo" and "Bar". "Foo" has an "int" field of type "int".
  private val unionSchema = schemaFromResource("union-of-records.avsc")

  property("Explicitly select a branch") = {
    val fooOverrides = overrideFields("int" -> constantOverride(10))
    val overrides = selectNamedUnion(
      "Foo", // Selecting the specific "Foo" branch
      overrides = fooOverrides // Within the "Foo" branch we are setting overrides
    )
    forAll(genFromSchema(unionSchema, overrides = overrides)) { record =>
      val correctSchema = record.getSchema.getName == "Foo"
      val always10 = record.get("int") == 10
      correctSchema && always10
    }
  }

  //*******************//
  //  Array Overrides  //
  //*******************//

  // Contains a field called "longArray" with schema {"type": "array", "items": "long"}
  private val compositeSchema = schemaFromResource("record-with-composites.avsc")

  property("Override array generation") = {
    val fiveOrTenPositiveLongs = arrayGenerationOverride(sizeGenerator = Gen.oneOf(5, 10), generatorOverride(Gen.posNum[Long]))
    val overrides = overrideFields("longArray" -> fiveOrTenPositiveLongs)
    forAll(genFromSchema(compositeSchema, overrides = overrides)) { r =>
      val array = toScala(r.get("longArray").asInstanceOf[java.util.List[Long]])
      val elementAssertion = array.forall(_ >= 0) // Array should only contain non negative longs
      val sizeAssertion = array.size == 5 || array.size == 10
      sizeAssertion && elementAssertion
    }
  }

  property("Explicitly override each element in an array") = {
    val positiveLongThenOne = arrayOverride(List(generatorOverride(Gen.posNum[Long]), constantOverride(1L)))
    val overrides = overrideFields("longArray" -> positiveLongThenOne)
    forAll(genFromSchema(compositeSchema, overrides = overrides)) { r =>
      val array = toScala(r.get("longArray").asInstanceOf[java.util.List[Long]])
      val firstElement = array.headOption
      val firstElementAssertion = firstElement.exists(_ >= 0) // First element of the array should be a non negative long
      // drop(1) rather than tail: tail throws on an empty collection, which would turn a failed
      // assertion into an exception instead of a plain `false`
      val secondElement = array.drop(1).headOption
      val secondElementAssertion = secondElement.contains(1L) // Second element of the array should be 1
      val sizeAssertion = array.size == 2
      sizeAssertion && firstElementAssertion && secondElementAssertion
    }
  }

  //*****************//
  //  Map Overrides  //
  //*****************//

  // Contains a field called "stringMap" with schema {"type": "map", "values": "string"}
  private val mapSchema = schemaFromResource("record-with-composites.avsc")

  property("Override map generation") = {
    val oneToFourSixLetteredStringsWithNumericKeys = mapGenerationOverride(
      sizeGenerator = Gen.choose(1, 4),
      keyGenerator = Gen.numStr,
      generatorOverride(Gen.listOfN(6, Arbitrary.arbChar.arbitrary).map(_.mkString))
    )
    val overrides = overrideFields("stringMap" -> oneToFourSixLetteredStringsWithNumericKeys)
    forAll(genFromSchema(mapSchema, overrides = overrides)) { r =>
      val map = toScalaMap(r.get("stringMap").asInstanceOf[java.util.Map[String, String]])
      val valueAssertion = map.values.forall(_.length == 6) // All values should have length 6
      val keyAssertion = map.keys.forall(_.toCharArray.forall(Character.isDigit)) // All keys should be numeric strings
      val sizeAssertion = map.size >= 1 && map.size <= 4
      sizeAssertion && keyAssertion && valueAssertion
    }
  }

  property("Explicitly override each entry in the map") = {
    val alphaStringThenBaz = mapOverride(Map(
      "foo" -> generatorOverride(Gen.alphaStr),
      "bar" -> constantOverride("baz")
    ))
    val overrides = overrideFields("stringMap" -> alphaStringThenBaz)
    forAll(genFromSchema(mapSchema, overrides = overrides)) { r =>
      val map = toScalaMap(r.get("stringMap").asInstanceOf[java.util.Map[String, String]])
      // The key foo should map to a string that only contains letters
      val fooAssertion = map.get("foo").exists(_.forall(Character.isLetter))
      // The key bar should map to "baz"
      val barAssertion = map.get("bar").contains("baz")
      val sizeAssertion = map.size == 2
      sizeAssertion && fooAssertion && barAssertion
    }
  }
}
Confluent Stack Warning
If you are using this library to run tests that integrate with Kafka and the Confluent stack, you should be aware of the following:
Schema Registry with Unions
If you are generating messages that are a `UNION` of `RECORD`s at the top level and you are using Schema Registry, you will want the union schema to be registered for your topic. This means you cannot simply serialise the `GenericRecord`; instead, you will need to do this:
package io.github.olib963.avrocheck.documentation
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer, NonRecordContainer}
import io.github.olib963.avrocheck._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.scalacheck.Prop.forAll
import org.scalacheck.{Gen, Properties}
object SchemaRegistrySerialisation extends Properties("Confluent stack test") {
  // Schema of two records named "Foo" and "Bar"
  private val unionSchema = schemaFromResource("union-of-records.avsc")
  private val gen: Gen[GenericRecord] = genFromSchema(unionSchema)

  property("serialises with correct schema") = forAll(gen) { genericRecord =>
    val registry = new MockSchemaRegistryClient()
    val serialiserConfig = Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8080",
      AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS -> true
    )
    val serialiser = new KafkaAvroSerializer(registry, CollectionConverters.toJava(serialiserConfig))

    val incorrectlySerialisedTopic = "wrong-topic"
    val correctlySerialisedTopic = "right-topic"

    // NOT what you want: this registers the schema of the union branch the record belongs to,
    // not the union as a whole.
    serialiser.serialize(incorrectlySerialisedTopic, genericRecord)
    // What you want: wrapping the record in a NonRecordContainer registers the union schema for the topic.
    serialiser.serialize(correctlySerialisedTopic, new NonRecordContainer(unionSchema, genericRecord))

    (registry.latestSchemaForTopic(incorrectlySerialisedTopic) != unionSchema) &&
      (registry.latestSchemaForTopic(correctlySerialisedTopic) == unionSchema)
  }

  /** Looks up the latest schema registered under the `<topic>-value` subject and parses it. */
  implicit class SchemaRegistryOps(schemaRegistryClient: SchemaRegistryClient) {
    def latestSchemaForTopic(topicName: String): Schema = {
      val subject = s"$topicName-value"
      val rawSchema = schemaRegistryClient.getLatestSchemaMetadata(subject).getSchema
      new Schema.Parser().parse(rawSchema)
    }
  }
}