Pull Kafka messages and save them to a Hive table in PySpark

Introduction

This note comes from a Camus task that consumes Kafka messages and saves them to a Hive table, which seemed to miss some data occasionally. Though the problem may have been solved by adding more Kafka partitions and increasing the size of each partition, here we log a way to pull Kafka messages and save them to a Hive table using PySpark and Spark Streaming, which should be more flexible than Camus.

Consuming the Kafka messages

Configuring the dependency

First we have to add the dependency org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:<version> when submitting the PySpark job. The standard way seems to be:

pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

But we run the job in an offline environment, and using --packages seems to pull all the dependencies from the internet, so we use the --jars option instead:

spark-submit \
    --jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
    --num-executors 20 --executor-memory 9g \
    --executor-cores 8 --driver-memory 3g \
    --conf "spark.pyspark.python=/usr/bin/python3.6" \
    --conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
    script.py

Setting up the Spark Streaming job

As in a normal PySpark job, we import all the necessary stuff and set up the sparkSession, sparkContext, sqlContext and streamingContext:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

sparkSession = SparkSession.builder \
                           .master('yarn') \
                           .appName("test_pyspark_streaming") \
                           .config("hive.exec.dynamic.partition", "true") \
                           .config("hive.exec.dynamic.partition.mode", "nonstrict") \
                           .enableHiveSupport() \
                           .getOrCreate()
sc = sparkSession.sparkContext
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, 60)

Here the second parameter of pyspark.streaming.StreamingContext means that we use a micro-batch duration of 60 seconds.

And then we create the pyspark.streaming.dstream.DStream object with the pyspark.streaming.kafka.KafkaUtils.createDirectStream API:

brokers = """
10.0.0.1:9092
10.0.0.2:9092
10.0.0.3:9092
10.0.0.4:9092
"""

def brokers_list(brokers):
    return ",".join(b for b in brokers.split("\n") if b and "#" not in b)


def create_direct_stream(brokers):
    return KafkaUtils.createDirectStream(
      ssc,
      ['topic_name'], 
      {
        "auto.offset.reset": "largest", # maybe later we can consume from smallest
        "bootstrap.servers": brokers_list(brokers)
      },
      valueDecoder=lambda x: x # raw data
      # , fromOffsets=fromOffset
    )

dstream = create_direct_stream(brokers)

The option valueDecoder is for consuming the Kafka messages as raw binary data (in fact we use a private codec).

Here we set the option auto.offset.reset to largest to pull from the largest offset. There are some other options in KafkaUtils that can help us pull from the smallest offset, pull from a specified start offset as a DStream, or pull a specified range of offsets as an RDD (which is helpful for treating Kafka messages as an offline data source).
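
As a rough sketch of the last two options (the partition numbers and offsets below are made up for illustration, not taken from the real job), pulling from explicit start offsets as a DStream and pulling a fixed offset range as an RDD look like this:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, OffsetRange

kafka_params = {"bootstrap.servers": brokers_list(brokers)}

# resume a DStream from explicit per-partition offsets (hypothetical values)
from_offsets = {
    TopicAndPartition('topic_name', 0): 4039119466,
    TopicAndPartition('topic_name', 1): 14786654,
}
dstream_from_offsets = KafkaUtils.createDirectStream(
    ssc, ['topic_name'], kafka_params,
    fromOffsets=from_offsets,
    valueDecoder=lambda x: x)

# pull a fixed offset range as a plain RDD, treating kafka as an offline source
offset_ranges = [OffsetRange('topic_name', 0, 4039119466, 4039121238)]
offline_rdd = KafkaUtils.createRDD(
    sc, kafka_params, offset_ranges,
    valueDecoder=lambda x: x)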

Then with the dstream object we can easily take and print some messages:

dstream.pprint()

pprint is a simple API defined on DStream:

def pprint(self, num=10):
    """
    Print the first num elements of each RDD generated in this DStream.

    @param num: the number of elements from the first will be printed.
    """
    def takeAndPrint(time, rdd):
        taken = rdd.take(num + 1)
        print("-------------------------------------------")
        print("Time: %s" % time)
        print("-------------------------------------------")
        for record in taken[:num]:
            print(record)
        if len(taken) > num:
            print("...")
        print("")

    self.foreachRDD(takeAndPrint)

Saving the messages to a Hive table

Normally a Hive table stores quite a large amount of data in each partition. Since we set the micro-batch duration to 60s, we first save the data as lots of quite small intermediate files, then load them into a Hive table and merge them into some large files there.

Saving the Kafka messages to HDFS ORC files

For saving the records in Kafka to HDFS files, we convert each Kafka message to a pyspark.sql.Row and use the API SparkSession.createDataFrame to create a DataFrame. Then we write the DataFrame to a path in HDFS.

from datetime import datetime

# LOG_BASE_PATH is an HDFS base path for the intermediate files, defined elsewhere
StreamRecord = Row('kafka_time', 'job_time', 'value')

def save_log(time, rdd):
    # time is the batch time, passed in by foreachRDD as a datetime object
    if rdd.isEmpty():
        return
    def row_with_timeinfo(x):
        return StreamRecord(time, datetime.now(), x)

    date = str(time.date())
    hour = time.hour
    orc_path = f"{LOG_BASE_PATH}/d={date}/h={hour}"

    rdd = rdd.mapValues(row_with_timeinfo)
    # only need the value, we add time info to it
    rdd = rdd.values()
    sqlContext.createDataFrame(rdd) \
        .write \
        .option("orc.compress", "snappy") \
        .mode('append') \
        .orc(orc_path)
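
To actually run this, the function has to be attached to the stream and the streaming context has to be started. A minimal sketch, assuming the dstream and ssc objects created earlier:

# call save_log for every micro-batch; foreachRDD passes (batch_time, rdd)
dstream.foreachRDD(save_log)

ssc.start()
ssc.awaitTermination()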

In this way lots of small ORC files will be created. HDFS is designed for storing fairly large files rather than many tiny ones, so we need to merge all the files generated in this step into some large files.

Loading the ORC files into a Hive partitioned table

For loading data (ORC files) into a target Hive partitioned table, just do:

load data inpath '' overwrite into table db.example_table partition (partition_col_name='partition_value');

It will move all the ORC files (unchanged) into the target table. For more information see 1.
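
If we want to issue the same statement from the PySpark job itself instead of the Hive CLI, a small sketch looks like this (the path, table and partition values are placeholders, not the real ones):

# hypothetical source path and partition values, just to illustrate the statement
load_sql = """
    load data inpath '/user/me/kafka_records/d=2021-03-11/h=23'
    overwrite into table db.kafka_test_tmp
    partition (d='2021-03-11', h='23')
"""
sparkSession.sql(load_sql)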

Then we can merge the ORC files by running Spark SQL:

insert overwrite table db.kafka_test PARTITION (d='2021-03-11', h='23') 
select
    kafka_time,
    job_time,
    key,
    born_time,
    body
from db.kafka_test_tmp where d='2021-03-11' and h='23'

After this step the ORC files will be merged.

Loading the ORC files into a Hive partitioned table using a PySpark job directly (not working yet)

Rather than loading data into a temporary path first and then selecting it into the target table, we also tried using a Spark job to do it directly. Though it works on some small datasets, it does not work on a large dataset (it causes OOM problems). This approach should be possible in theory, but I have not yet tuned the implementation.

#!/usr/bin/env bash
# encoding=utf-8

# Shell commands follow
# Next line is bilingual: it starts a comment in Python, and is a no-op in shell
""":"
# exec replaces the current shell process with spark-submit;
# we copy this file to a .py name first, because spark-submit
# only treats files ending in .py as Python scripts

cp ${BASH_SOURCE[0]} ${BASH_SOURCE[0]}.py
exec spark-submit \
    --num-executors 10 --executor-memory 18g \
    --executor-cores 8 --driver-memory 3g \
    --conf "spark.pyspark.python=/usr/bin/python3.6" \
    --conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
    ${BASH_SOURCE[0]}.py
":"""
# Previous line is bilingual: it ends a comment in Python, and is a no-op in shell
# Shell commands end here
import requests
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from functools import reduce

d = "2021-03-11"
h = "23"

sparkSession = SparkSession.builder \
      .master('yarn') \
      .appName("load_orc_to_partittion_table") \
      .config("hive.exec.dynamic.partition", "true") \
      .config("hive.exec.dynamic.partition.mode", "nonstrict") \
      .enableHiveSupport() \
      .getOrCreate()

hadoop = sparkSession._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(f"/user/me/kafka_records/d={d}/h={h}")
files = [str(f.getPath()) for f in fs.get(conf).listStatus(path)]
files = [i for i in files if "part-" in i]

orc_df_list = [sparkSession.read.orc(i) for i in files]
df = reduce(lambda x, y: x.union(y), orc_df_list)

temp_view_name = f"kafka_records_{d}_{h}".replace("-", "")
df.createTempView(temp_view_name)
sparkSession.sql(f"insert overwrite table db.kafka_test partition(d='{d}', h='{h}') select * from {temp_view_name}")

Offset management

In the previous section, each time we start the PySpark streaming job we consume the records from the largest offset. In fact we should do some offset management.

By using KafkaRDD#offsetRanges() (mentioned in kafka.html), we can save the offsets to HDFS too. Here we use the gzip codec rather than snappy, because python-snappy seems to use a different format from Hadoop's snappy codec2.

StreamRecord = Row('kafka_time', 'job_time', 'value')

def save_log(time, rdd):
    if rdd.isEmpty():
        return
    def row_with_timeinfo(x):
        return StreamRecord(time, datetime.now(), x)

    date = str(time.date())
    hour = time.hour
    orc_path = f"{LOG_BASE_PATH}/d={date}/h={hour}"

    offsetRanges = sparkSession.sparkContext.parallelize(rdd.offsetRanges(), 1)
    offset_file = f"{orc_path}/offset_{str(rdd.id())}"

    rdd = rdd.mapValues(row_with_timeinfo)
    # only need the value, we add time info to it
    rdd = rdd.values()
    # snappy did not compress this data much, so we dropped the
    # .option("orc.compress", "snappy") setting used earlier
    sqlContext.createDataFrame(rdd) \
        .write \
        .mode('append') \
        .orc(orc_path)

    offsetRanges.saveAsTextFile(offset_file, "org.apache.hadoop.io.compress.GzipCodec")

The offset data we get looks something like this (compressed with gzip):

OffsetRange(topic: topic_name, partition: 2, range: [4039119466 -> 4039121238]
OffsetRange(topic: topic_name, partition: 28, range: [14786654 -> 14788067]
OffsetRange(topic: topic_name, partition: 7, range: [4038951906 -> 4038953542]
OffsetRange(topic: topic_name, partition: 8, range: [4039455572 -> 4039457035]
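
To later resume from these stored offsets (the next step mentioned below), a rough, untested sketch of parsing them back into a fromOffsets dict could look like this; the regular expression simply matches the OffsetRange repr shown above:

import re

from pyspark.streaming.kafka import TopicAndPartition

OFFSET_RE = re.compile(
    r"OffsetRange\(topic: (\S+), partition: (\d+), range: \[(\d+) -> (\d+)\]")

def load_from_offsets(offset_file):
    """Parse saved OffsetRange lines and resume each partition at its untilOffset."""
    from_offsets = {}
    for line in sc.textFile(offset_file).collect():
        m = OFFSET_RE.match(line)
        if not m:
            continue
        topic, partition, _, until = m.groups()
        from_offsets[TopicAndPartition(topic, int(partition))] = int(until)
    return from_offsets

# from_offsets can then be passed to KafkaUtils.createDirectStream(..., fromOffsets=from_offsets)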

What's next

Currently this is still something of a prototype, and it still seems more complex than the Camus solution, though it does give us the possibility of merging our online job and offline job into one workflow. In the next step we may manage to pull from the offsetRanges we stored, handle error cases and deploy the code.


Footnotes


  1. https://stackoverflow.com/questions/13148187/hive-loading-in-partitioned-table

  2. https://stackoverflow.com/questions/29871209/pyspark-how-to-load-compressed-snappy-file