Save binary data to HDFS from a Spark job using Avro


Introduction

Since Hive does not support byte arrays as a column type, we save the data from the Spark job to HDFS directly using Avro.

Some references are linked inline in the sections below.

Code in Spark Job

First we add the dependency:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>4.0.0</version>
</dependency>

According to the SQL Data Sources: Avro documentation, this Databricks dependency is no longer needed since Spark 2.4.0, where Spark ships its own spark-avro module.
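
If you are on Spark 2.4.0 or later, the DataFrame-level API of Spark's own spark-avro module can be used instead. The following is only a minimal sketch (paths are placeholders, and it assumes a SparkSession named session as in the snippets below); we do not use this route here because we need the AvroKey/AvroValue key-value layout:

// e.g. spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 ...
// imports: org.apache.spark.sql.Dataset, org.apache.spark.sql.Row
final Dataset<Row> df = session.read().format("avro").load("hdfs://.../some-input");  // placeholder path
df.write().format("avro").save("hdfs://.../some-output");                             // placeholder path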

Then in the Spark job we set up the Avro output schemas:

// imports: org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema, org.apache.avro.mapreduce.AvroJob
// final SparkSession session = ...
final Job job = Job.getInstance(session.sparkContext().hadoopConfiguration());
// key: the geohash string; value: our FeatureWithGeohash record described by data.avsc
AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setOutputValueSchema(job, new Schema.Parser().parse(MoreResources.toStringUTF8Unchecked("/data.avsc")));

Here data.avsc describes a fairly complicated record from our domain model.

Its content is as follows:

{
  "name": "FeatureWithGeohash",
  "type": "record",
  "fields": [
    {
      "name": "geohash",
      "type": "string"
    },
    {
      "name": "details",
      "type": [
        {
          "type": "array",
          "items": {
            "name": "FeatureDetail",
            "type": "record",
            "fields": [
              {
                "name": "error",
                "type": [
                  "string",
                  "null"
                ]
              },
              {
                "name": "feature",
                "type": [
                  "bytes",
                  "null"
                ]
              },
              {
                "name": "detail",
                "type": {
                  "type": "array",
                  "items": [
                    "int",
                    "string",
                    "null",
                    "long"
                  ]
                }
              }
            ]
          }
        },
        "null"
      ]
    }
  ]
}

We can see that it is a record named FeatureWithGeohash, which has a field named details that is a nullable array of records. For how to define the embedded record type, refer to how-to-create-schema-containing-list-of-objects-using-avro. Note that for the detail field of the FeatureDetail record we use an array type, even though in the Spark job its actual type is Row; we changed it to an array because of the problems described in Serializing Spark Row Type to Avro Directly (Did Not Work) below.
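
While developing, it helps to double-check the nested unions. Here is a small standalone sketch (not part of the job; the class name SchemaSanityCheck is ours and it assumes data.avsc sits in the working directory) that parses the schema and prints the fields of the embedded FeatureDetail record:

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class SchemaSanityCheck {
    public static void main(String[] args) throws IOException {
        // Parse data.avsc, take the array branch of the ["array<FeatureDetail>", "null"] union on "details",
        // then print each field of the embedded FeatureDetail record with its type.
        Schema schema = new Schema.Parser().parse(new File("data.avsc"));
        Schema detailsUnion = schema.getField("details").schema();
        Schema featureDetail = detailsUnion.getTypes().get(0).getElementType();
        for (Schema.Field field : featureDetail.getFields()) {
            System.out.println(field.name() + " -> " + field.schema());
        }
    }
}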

The data.avsc schema definition has the same structure as the POJO classes we use:

public class FeatureWithGeohash {
    private String geohash;

    private List<FeatureDetail> details;

    public String getGeohash() {
        return geohash;
    }

    public void setGeohash(String geohash) {
        this.geohash = geohash;
    }

    public List<FeatureDetail> getDetails() {
        return details;
    }

    public void setDetails(List<FeatureDetail> details) {
        this.details = details;
    }
}

public class FeatureDetail {

    private List<Object> detail;

    private ByteBuffer feature;

    private String error;

    public List<Object> getDetail() {
        return detail;
    }

    public void setDetail(List<Object> detail) {
        this.detail = detail;
    }

    public ByteBuffer getFeature() {
        return feature;
    }

    public void setFeature(ByteBuffer feature) {
        this.feature = feature;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }
}
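
To make the mapping concrete, here is a small sketch with made-up sample values (the geohash and detail values are hypothetical) showing how these classes are populated: the raw bytes go into feature as a ByteBuffer, and the mixed row values go into detail, which must stick to the int/string/long/null union from the schema:

// imports: java.nio.ByteBuffer, java.util.Arrays, java.util.Collections
FeatureDetail detail = new FeatureDetail();
detail.setError(null);                                                           // no error for this entry
detail.setFeature(ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03}));                // the binary feature payload
detail.setDetail(Arrays.<Object>asList(42, "some-picture-id", 1234567890123L));  // sample int/string/long values

FeatureWithGeohash record = new FeatureWithGeohash();
record.setGeohash("wx4g0e");                                                     // hypothetical geohash
record.setDetails(Collections.singletonList(detail));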

And in the Spark job, we build a JavaPairRDD so that we can use the Hadoop output API:

featuresWithSameGeohashRDD.mapToPair(new PairFunction<FeatureWithGeohash, AvroKey<String>, AvroValue<FeatureWithGeohash>>() {
        @Override
        public Tuple2<AvroKey<String>, AvroValue<FeatureWithGeohash>> call(FeatureWithGeohash featureWithGeohash) throws Exception {
            return new Tuple2<>(new AvroKey<>(featureWithGeohash.getGeohash()), new AvroValue<>(featureWithGeohash));
        }
    }).saveAsNewAPIHadoopFile(
        "hdfs://...",
        AvroKey.class,
        AvroValue.class,
        AvroKeyValueOutputFormat.class,
        job.getConfiguration()
        );

Generally, each partition produces one output file.
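
For completeness, here is a sketch of how the same files could be read back in another Spark job with the matching AvroKeyValueInputFormat. This is not part of the original job; the readJob and restored names are ours, and the path placeholder and MoreResources helper are reused from above:

// imports: org.apache.avro.mapreduce.AvroKeyValueInputFormat, org.apache.spark.api.java.JavaSparkContext, org.apache.spark.api.java.JavaPairRDD
final Job readJob = Job.getInstance(session.sparkContext().hadoopConfiguration());
AvroJob.setInputKeySchema(readJob, Schema.create(Schema.Type.STRING));
AvroJob.setInputValueSchema(readJob, new Schema.Parser().parse(MoreResources.toStringUTF8Unchecked("/data.avsc")));

JavaPairRDD<AvroKey, AvroValue> restored = JavaSparkContext
        .fromSparkContext(session.sparkContext())
        .newAPIHadoopFile(
                "hdfs://...",
                AvroKeyValueInputFormat.class,
                AvroKey.class,
                AvroValue.class,
                readJob.getConfiguration());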

Serializing Spark Row Type to Avro Directly (Did Not Work)

We tried to use the Row type directly in the POJO:

public class FeatureDetail {

    private Row detail;

    // other definitions ...

}

and generated the schema for the Row field detail using:

SchemaBuilder.RecordBuilder<Schema> rb = SchemaBuilder.record("testrecord").namespace("test_namespace");
Schema sc = SchemaConverters.convertStructToAvro(dataset.schema(), rb, "test_namespace");
System.out.println(sc);

But with this approach we got an error like:

org.apache.avro.SchemaParseException: Illegal character in: bitmap$0

This is probably because Spark's Row type carries internal metadata (fields like bitmap$0, whose $ character is not legal in Avro names) that Avro cannot handle, so we converted the field to List<Object>. The downside is that we cannot describe the row's structure precisely in our Avro schema definition.
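
A sketch of that conversion is shown below (row and featureDetail are assumed to be in scope inside the mapping function; only values covered by the int/string/long/null union may be copied):

// imports: java.util.ArrayList, java.util.List, org.apache.spark.sql.Row
List<Object> values = new ArrayList<>(row.length());
for (int i = 0; i < row.length(); i++) {
    // Copy the raw cell values; they must stay within the union declared in data.avsc.
    values.add(row.isNullAt(i) ? null : row.get(i));
}
featureDetail.setDetail(values);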

Parsing the data with Python

To read the data in Python, install the required libraries:

pip install avro
pip install python-snappy

Then read it like this:

from avro.datafile import DataFileReader
from avro.io import DatumReader

with open("part-r-00000.avro", "rb") as f:
    reader = DataFileReader(f, DatumReader())
    data = list(reader)

For parsing we can do:

for d in data:
    pictures = [str(dd["detail"][0]) for dd in d["value"]["details"]]
    pictures_str = ", ".join(pictures)
    if any(dd["error"] for dd in d["value"]["details"]):
        print(f"{d['key']} has no data for {len(pictures)} pictures: {pictures_str}")
    else:
        print(f"{d['key']} has got data for {len(pictures)} pictures: {pictures_str}")