Save Kafka data to Hive with PySpark Spark Streaming: a real case

Preface

This is a hands-on application of "pull kafka message and save to hive table in pyspark", so most of the code matches that post. There are three parts:

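Save offsets
A Spark Streaming job written in PySpark records the offsets of the Kafka topic to HDFS (once every five minutes)
Pull messages in batches
A PySpark job uses the createRDD API to pull the messages from 30 minutes ago to 20 minutes ago, converts them to a DataFrame, and stores them in Hive
Delete expired offsets
A script that periodically deletes expired offsets from HDFS, so that large numbers of small files do not burden the Hadoop NameNode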

Saving Kafka message offsets to HDFS

Saving offsets to HDFS mainly relies on the createDirectStream API:

def create_direct_stream(brokers):
    return KafkaUtils.createDirectStream(
        ssc,
        ['<topic_name>'],
        {
            "auto.offset.reset": "largest", # smallest or largest
            "bootstrap.servers": brokers_list(brokers)
        },
        valueDecoder=lambda x: x # raw binary data, because our Kafka messages are not encoded strings
        # valueDecoder=decode_kafka
        # , fromOffsets=fromOffset
    )

KafkaUtils#createDirectStream returns a DStream. Each RDD (resilient distributed dataset) in that stream carries the corresponding Kafka offset ranges:

kafka_message_stream = create_direct_stream(brokers)

def takeAndPrint(time, rdd):
    """ for test """
    taken = rdd.take(1)
    print("########################")
    print("Time: %s" % time)
    print(rdd.offsetRanges())
    print("########################")
    # for record in taken:
    #     print(str(record))

kafka_message_stream.foreachRDD(takeAndPrint)

As shown above, rdd.offsetRanges() returns the offset range for each partition of the Kafka topic, roughly in this form:

OffsetRange(topic: topic_name, partition: 2, range: [4039119466 -> 4039121238])
OffsetRange(topic: topic_name, partition: 28, range: [14786654 -> 14788067])
OffsetRange(topic: topic_name, partition: 7, range: [4038951906 -> 4038953542])
OffsetRange(topic: topic_name, partition: 8, range: [4039455572 -> 4039457035])

For the OffsetRange type, see the API pyspark.streaming.kafka.OffsetRange
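To make the printed form concrete, here is a minimal sketch that parses such lines back into tuples and counts the messages in each range. It runs without Spark. The names `OFFSET_RANGE_PATTERN`, `parse_offset_range` and `message_count` are made up for this illustration and are not part of the job. It relies on `fromOffset` being inclusive and `untilOffset` being exclusive.

```python
import re

# Pattern for the printed form shown above, e.g.
# "OffsetRange(topic: topic_name, partition: 2, range: [4039119466 -> 4039121238])"
OFFSET_RANGE_PATTERN = re.compile(
    r"OffsetRange\(topic: (\S+), partition: (\d+), range: \[(\d+) -> (\d+)\]\)"
)


def parse_offset_range(line):
    """Return (topic, partition, from_offset, until_offset), or None if the line does not match."""
    m = OFFSET_RANGE_PATTERN.match(line.strip())
    if m is None:
        return None
    topic, partition, from_offset, until_offset = m.groups()
    return topic, int(partition), int(from_offset), int(until_offset)


def message_count(from_offset, until_offset):
    # fromOffset is inclusive and untilOffset is exclusive, so the difference is the count.
    return until_offset - from_offset


printed = """
OffsetRange(topic: topic_name, partition: 2, range: [4039119466 -> 4039121238])
OffsetRange(topic: topic_name, partition: 28, range: [14786654 -> 14788067])
"""
parsed = [r for r in map(parse_offset_range, printed.splitlines()) if r is not None]
for topic, partition, from_offset, until_offset in parsed:
    print(partition, message_count(from_offset, until_offset))
```

In the job itself this parsing is unnecessary, because the OffsetRange objects expose topic, partition, fromOffset and untilOffset directly.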

To save the offsets to HDFS, convert the offset range list above into an RDD and then call the relevant Spark API:


sparkSession = SparkSession.builder \
        .master('yarn') \
        .appName("kafka_message_save_offset_as_checkpoint") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .enableHiveSupport() \
        .getOrCreate()

def saveOffsets(time, rdd):
    if rdd.id() >= 2: return
    date = str(time.date())
    hour = time.hour

    offsetRanges = sparkSession.sparkContext.parallelize(
        [(time, o.topic, o.partition, o.fromOffset, o.untilOffset)
         for o in rdd.offsetRanges()], 1)
    offset_file = f"/user/foo/kafka_message/d={date}/h={hour}/offset_{time.timestamp()}"

    offsetRanges.saveAsTextFile(offset_file, "org.apache.hadoop.io.compress.GzipCodec")

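Because we only need to record the offsets and do not process the messages themselves, only the offsetRanges of the RDD with id 1 are handled here (saveOffsets returns early once rdd.id() >= 2). Spark's parallelize API turns the offset range list into an RDD, and the RDD's saveAsTextFile then writes it to HDFS. See the API documentation for SparkContext.parallelize and RDD.saveAsTextFile.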

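Because the files saved to HDFS are gzip-compressed, they have to be decompressed when the offset information is read back below. For more on this, see: compression algorithms in Hadoop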
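The write and read-back can be tried locally without Spark or HDFS. The sketch below is only an approximation under stated assumptions: it imitates saveAsTextFile by writing str() of each tuple, one per line, into a gzip file, and then reads the file back the way the batch job does later. The file name, the record values, and the helpers `write_offsets` and `read_offsets` are invented for the example.

```python
import datetime
import gzip
import os
import tempfile

# One record per partition, shaped like the tuples built in saveOffsets:
# (time, topic, partition, fromOffset, untilOffset). Values are made up.
records = [
    (datetime.datetime(2021, 3, 10, 17, 5), "topic_name", 2, 100, 250),
    (datetime.datetime(2021, 3, 10, 17, 5), "topic_name", 7, 40, 90),
]


def write_offsets(path, rows):
    # Imitates saveAsTextFile: one str(element) per line, gzip-compressed.
    with gzip.open(path, "wt", encoding="utf-8") as out:
        for row in rows:
            out.write(str(row) + "\n")


def read_offsets(path):
    rows = []
    with gzip.open(path, "rt", encoding="utf-8") as src:
        for line in src:
            line = line.strip()
            if line:
                # Each line contains "datetime.datetime(...)", so the datetime module
                # has to be visible to eval under that name. eval is tolerable here
                # only because the job itself wrote these files.
                rows.append(eval(line, {"datetime": datetime}))
    return rows


with tempfile.TemporaryDirectory() as tmp:
    part_file = os.path.join(tmp, "part-00000.gz")
    write_offsets(part_file, records)
    restored = read_offsets(part_file)

print(restored == records)
```

This also shows why the reading script needs `import datetime` (the module) rather than `from datetime import datetime`: the stored text spells out `datetime.datetime(...)`.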

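Once the offsets have been recorded, the current Spark Streaming job can end. This is done with StreamingContext's awaitTermination method, called with a timeout so that the wait returns after at most 60 seconds and the script runs to its end: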

ssc.start()
ssc.awaitTermination(timeout=60)

For the API, see: StreamingContext#awaitTermination

The complete job script is below (it uses a trick described in "run python script as shell script more than sheban", which makes it easier to run a Python script directly from the shell):

#!/usr/bin/env bash
# encoding=utf-8

# Shell commands follow
# Next line is bilingual: it starts a comment in Python, and is a no-op in shell
""":"
# exec replace current process
# zeus run shell script with source command
# and spark-submit only treat py file as pyscript
download[hdfs:///zeus/hdfs-upload-dir/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar-20210310-170734.jar spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar]

cp ${BASH_SOURCE[0]} ${BASH_SOURCE[0]}.py
exec spark-submit \
    --jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
    --num-executors 2 --executor-memory 2g \
    --executor-cores 2 --driver-memory 1g \
    --conf "spark.pyspark.python=/usr/bin/python3.6" \
    --conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
    ${BASH_SOURCE[0]}.py
":"""
# Previous line is bilingual: it ends a comment in Python, and is a no-op in shell
# Shell commands end here

# Python script follows
#!/usr/bin/env python
# encoding: utf-8

import json
import uuid
import requests
from datetime import datetime
import base64 as b64
import gzip, zlib
from collections import namedtuple
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

def brokers_list(brokers):
    return ",".join(b for b in brokers.split("\n") if b and "#" not in b)


sparkSession = SparkSession.builder \
    .master('yarn') \
    .appName("kafka_message_save_offset_as_checkpoint") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

sc = sparkSession.sparkContext

sqlContext = HiveContext(sc)

ssc = StreamingContext(sc, 1)
brokers = """
10.0.0.1:9092
10.0.0.2:9092
10.0.0.3:9092
10.0.0.4:9092
"""

def create_direct_stream(brokers):
    return KafkaUtils.createDirectStream(
        ssc,
        ['<topic_name>'],
        {
            "auto.offset.reset": "largest", # maybe later we can consume from smallest
            "bootstrap.servers": brokers_list(brokers)
        },
        valueDecoder=lambda x: x # raw data
        # valueDecoder=decode_kafka
        # , fromOffsets=fromOffset
    )

kafka_message_stream = create_direct_stream(brokers)


def saveOffsets(time, rdd):
    if rdd.id() >= 2: return
    date = str(time.date())
    hour = time.hour

    offsetRanges = sparkSession.sparkContext.parallelize(
        [(time, o.topic, o.partition, o.fromOffset, o.untilOffset)
         for o in rdd.offsetRanges()], 1)
    offset_file = f"/user/foo/kafka_message/d={date}/h={hour}/offset_{time.timestamp()}"

    offsetRanges.saveAsTextFile(offset_file, "org.apache.hadoop.io.compress.GzipCodec")


def takeAndPrint(time, rdd):
    """ for test """
    taken = rdd.take(1)
    print("########################")
    print("Time: %s" % time)
    print(rdd.offsetRanges())
    print("########################")
    # for record in taken:
    #     print(str(record))

kafka_message_stream.foreachRDD(saveOffsets)

ssc.start()
ssc.awaitTermination(timeout=60)
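The bilingual header of the script above can be checked from Python's side. In bash, `""":"` is an empty string followed by the string `:`, which together form the no-op command `:`. In Python the same characters open a triple-quoted string. The sketch below parses a stripped-down stand-in for the script and shows that Python sees the whole shell part as a single string literal. The echo and exit lines are placeholders for the real spark-submit call.

```python
import ast

# A stripped-down version of the header used above. The shell part here is a
# placeholder (an echo); the real script calls spark-submit instead.
POLYGLOT_SOURCE = '''#!/usr/bin/env bash
""":"
echo "shell part runs here"
exit 0
":"""
import sys
print("python part", sys.version_info.major)
'''

tree = ast.parse(POLYGLOT_SOURCE)
# Python reads everything between the two triple quotes as one string literal.
# It becomes the module docstring, so Python never executes the shell commands.
shell_part = ast.get_docstring(tree, clean=False)
print(shell_part)
```

In the real script the shell part ends with exec, so bash never reaches the Python lines. Python in turn skips the shell lines because they sit inside that string.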

Pulling Kafka messages in batches into Hive with PySpark

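As mentioned earlier, the Kafka topic's offsets are saved to HDFS every five minutes. To pull the messages within given offset ranges into Hive with the KafkaUtils#createRDD API, first write a helper function that finds the offset range for a given time range: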

def get_current_offsets_range(start_time, end_time):
    """
    get the current offset ranges that we should pull from kafka
    """
    subprocess.call("hadoop fs -copyToLocal hdfs://ns//user/foo/kafka_message kafka_message".split(" "))
    offsets = dict()
    for root, dirs, files in os.walk("kafka_message"):
        for f in files:
            if not f.endswith(".gz"):
                continue
            with gzip.open(os.path.join(root, f)) as f:
                offsets_text = f.read().decode("utf-8")
                for i in offsets_text.split("\n"):
                    if not i:
                        continue
                    offset_time, topic, partition, from_offset, to_offset = eval(i)
                    if partition not in offsets:
                        if offset_time < start_time:
                            offsets[partition] = (from_offset, None)
                        if offset_time > end_time:
                            offsets[partition] = (None, to_offset)
                    else:
                        old_from_offset, old_to_offset = offsets[partition]
                        if offset_time < start_time:
                            if not old_from_offset or from_offset > old_from_offset:
                                old_from_offset = from_offset
                        if offset_time > end_time:
                            if not old_to_offset or to_offset < old_to_offset:
                                old_to_offset = to_offset
                        offsets[partition] = (old_from_offset, old_to_offset)
    return offsets

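This helper downloads the HDFS directory that stores the offset ranges to the local disk, then walks every file to pick the smallest offset range that covers the requested time range.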
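The selection rule can be isolated from the file handling. The following is a sketch of the same idea over in-memory records, not the helper itself. For each partition it takes the largest from-offset recorded before the start time and the smallest until-offset recorded after the end time. It differs from the helper in two ways, both assumptions of this sketch: it tests for presence rather than truthiness, so an offset of 0 is handled, and it drops partitions for which only one bound was found. The name `select_covering_ranges` and the sample records are made up.

```python
import datetime


def select_covering_ranges(records, start_time, end_time):
    """Pick, per partition, the tightest recorded offsets that enclose [start_time, end_time].

    records: iterable of (time, topic, partition, from_offset, until_offset).
    Returns {partition: (from_offset, until_offset)} for partitions with both bounds.
    """
    lower = {}  # partition -> largest from_offset recorded before start_time
    upper = {}  # partition -> smallest until_offset recorded after end_time
    for time, _topic, partition, from_offset, until_offset in records:
        if time < start_time:
            if partition not in lower or from_offset > lower[partition]:
                lower[partition] = from_offset
        elif time > end_time:
            if partition not in upper or until_offset < upper[partition]:
                upper[partition] = until_offset
    # Keep only partitions where both a lower and an upper bound were found.
    return {p: (lower[p], upper[p]) for p in lower if p in upper}


def at(minute):
    return datetime.datetime(2021, 3, 10, 17, minute)


records = [
    (at(0), "topic_name", 0, 100, 110),
    (at(5), "topic_name", 0, 600, 610),
    (at(10), "topic_name", 0, 1100, 1110),
    (at(15), "topic_name", 0, 1600, 1610),
    (at(20), "topic_name", 0, 2100, 2110),
    (at(25), "topic_name", 0, 2600, 2610),
    (at(0), "topic_name", 1, 50, 60),  # partition 1 has no record after the window
]
ranges = select_covering_ranges(records, at(7), at(18))
print(ranges)
```

Dropping incomplete partitions avoids passing None as an offset to OffsetRange. Whether to skip such a partition or fail the run is a choice that depends on the job.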

With this helper and the KafkaUtils#createRDD API, the messages in the corresponding range can be pulled:

job_time = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S.%f")
print(job_time)
start_time = job_time - datetime.timedelta(minutes=20)
end_time = start_time + datetime.timedelta(minutes=10)
delta = datetime.timedelta(minutes=3)
offsets = get_current_offsets_range(start_time-delta, end_time+delta)
print(offsets)

pre_hourly_rdd = KafkaUtils.createRDD(
    sc, {"metadata.broker.list": brokers_list(brokers)},
    [OffsetRange("<topic_name>", p, f, t) for p, (f, t) in offsets.items()]
    ,valueDecoder=lambda x: x
)

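Here, every 20 minutes, the job pulls the messages from 30 minutes ago to 20 minutes ago (in the code this is the 10-minute window from job_time - 20 minutes to job_time - 10 minutes, where job_time is the time passed in by the scheduler). To reduce the effect of the gap between event time and processing time, a 3-minute tolerance is added to both the start time and the end time.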
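The window arithmetic is easy to check on its own. This sketch repeats the computation from the snippet above for a sample job time. The function name `pull_window`, its parameter names, and the sample timestamp are invented for the example.

```python
import datetime


def pull_window(job_time_text, lag_minutes=20, width_minutes=10, tolerance_minutes=3):
    """Return (start, end, widened_start, widened_end) for one run."""
    job_time = datetime.datetime.strptime(job_time_text, "%Y-%m-%d %H:%M:%S.%f")
    start = job_time - datetime.timedelta(minutes=lag_minutes)
    end = start + datetime.timedelta(minutes=width_minutes)
    tolerance = datetime.timedelta(minutes=tolerance_minutes)
    # The widened bounds are used to look up offsets; the exact bounds filter by event time.
    return start, end, start - tolerance, end + tolerance


start, end, widened_start, widened_end = pull_window("2021-03-10 17:00:00.000")
print(start, end, widened_start, widened_end)
```

The widened bounds only decide which offsets are fetched. The exact bounds still decide which messages end up in the partition, so the tolerance does not duplicate rows across windows.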

Decode the pulled RDD, filter it by event time, and save it to the table:

sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame(pre_hourly_rdd.map(lambda x:decode_kafka(x[1])).filter(lambda x: start_time <= x.born_time < end_time))

temp_view_name = f"temp_table_for_kafka_message_{int(start_time.timestamp())}"
df.createTempView(temp_view_name)
sparkSession.sql(f"""
insert overwrite table mydb.my_kafka_message partition(d='{start_time.date()}', h='{start_time.hour}', idx='{int(start_time.minute)}') 
select * from {temp_view_name}
""")

Here decode_kafka is a task-specific function that decodes a Kafka message; replace it with whatever function fits your case. The decoded records are then filtered by event time.
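As a stand-in for the map and filter on the RDD, the sketch below applies the same two steps to a plain list. The decoder is hypothetical: it assumes a UTF-8 JSON body with a `born_time` field, which is probably not what the real messages look like. Only the field name `born_time` and the half-open comparison come from the code above.

```python
import datetime
import json
from collections import namedtuple

# Hypothetical record type; only born_time is taken from the text above.
Message = namedtuple("Message", ["born_time", "payload"])


def decode_message(raw_bytes):
    """Hypothetical decoder: assumes a UTF-8 JSON body with an ISO 'born_time' field."""
    body = json.loads(raw_bytes.decode("utf-8"))
    born_time = datetime.datetime.strptime(body["born_time"], "%Y-%m-%dT%H:%M:%S")
    return Message(born_time, body["payload"])


def in_window(message, start_time, end_time):
    # Half-open interval, so a message on a boundary lands in exactly one window.
    return start_time <= message.born_time < end_time


start_time = datetime.datetime(2021, 3, 10, 16, 40)
end_time = datetime.datetime(2021, 3, 10, 16, 50)
raw_values = [
    b'{"born_time": "2021-03-10T16:39:59", "payload": "too early"}',
    b'{"born_time": "2021-03-10T16:40:00", "payload": "first"}',
    b'{"born_time": "2021-03-10T16:49:59", "payload": "last"}',
    b'{"born_time": "2021-03-10T16:50:00", "payload": "next window"}',
]
kept = [m for m in map(decode_message, raw_values) if in_window(m, start_time, end_time)]
# Partition values in the same form as the insert statement uses.
partition_spec = (str(start_time.date()), str(start_time.hour), str(start_time.minute))
print([m.payload for m in kept], partition_spec)
```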

The complete script:

#!/usr/bin/env bash
# encoding=utf-8

# Shell commands follow
# Next line is bilingual: it starts a comment in Python, and is a no-op in shell
""":"
# exec replace current process
# zeus run shell script with source command
# and spark-submit only treat py file as pyscript
download[hdfs:///zeus/hdfs-upload-dir/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar-20210310-170734.jar spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar]

cp ${BASH_SOURCE[0]} ${BASH_SOURCE[0]}.py
exec spark-submit \
    --jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
    --num-executors 20 --executor-memory 9g \
    --executor-cores 8 --driver-memory 3g \
    --conf "spark.pyspark.python=/usr/bin/python3.6" \
    --conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
    ${BASH_SOURCE[0]}.py "${zdt.format("yyyy-MM-dd HH:mm:ss.SSS")}"
":"""
# Previous line is bilingual: it ends a comment in Python, and is a no-op in shell
# Shell commands end here

# Python script follows
#!/usr/bin/env python
# encoding: utf-8

import re
import os
import json
import uuid
import requests
import datetime
import base64 as b64
import gzip, zlib
from collections import namedtuple
import subprocess
import sys
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, OffsetRange
from pyspark.streaming import StreamingContext


def get_current_offsets_range(start_time, end_time):
    """
    get the current offset ranges that we should pull from kafka
    """
    subprocess.call("hadoop fs -copyToLocal hdfs://ns//user/foo/kafka_message kafka_message".split(" "))
    # offset_pattern = re.compile(r".+partition: (\d+), range: \[(\d+) -> (\d+)\]")
    offsets = dict()
    for root, dirs, files in os.walk("kafka_message"):
        for f in files:
            if not f.endswith(".gz"):
                continue
            with gzip.open(os.path.join(root, f)) as f:
                offsets_text = f.read().decode("utf-8")
                for i in offsets_text.split("\n"):
                    if not i:
                        continue
                    offset_time, topic, partition, from_offset, to_offset = eval(i)
                    if partition not in offsets:
                        if offset_time < start_time:
                            offsets[partition] = (from_offset, None)
                        if offset_time > end_time:
                            offsets[partition] = (None, to_offset)
                    else:
                        old_from_offset, old_to_offset = offsets[partition]
                        if offset_time < start_time:
                            if not old_from_offset or from_offset > old_from_offset:
                                old_from_offset = from_offset
                        if offset_time > end_time:
                            if not old_to_offset or to_offset < old_to_offset:
                                old_to_offset = to_offset
                        offsets[partition] = (old_from_offset, old_to_offset)
    return offsets

def brokers_list(brokers):
    return ",".join(b for b in brokers.split("\n") if b and "#" not in b)


sparkSession = SparkSession.builder \
        .master('yarn') \
        .appName("save_kafka_message_to_hive") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .enableHiveSupport() \
        .getOrCreate()

sc = sparkSession.sparkContext

brokers = """
10.0.0.1:9092
10.0.0.2:9092
10.0.0.3:9092
10.0.0.4:9092
"""

job_time = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S.%f")
print(job_time)
start_time = job_time - datetime.timedelta(minutes=20)
end_time = start_time + datetime.timedelta(minutes=10)
delta = datetime.timedelta(minutes=3)
offsets = get_current_offsets_range(start_time-delta, end_time+delta)
print(offsets)

pre_hourly_rdd = KafkaUtils.createRDD(
    sc, {"metadata.broker.list": brokers_list(brokers)},
    [OffsetRange("<topic_name>", p, f, t) for p, (f, t) in offsets.items()]
    ,valueDecoder=lambda x: x
)
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame(pre_hourly_rdd.map(lambda x:decode_kafka(x[1])).filter(lambda x: start_time <= x.born_time < end_time))

temp_view_name = f"temp_table_for_kafka_message_{int(start_time.timestamp())}"
df.createTempView(temp_view_name)
sparkSession.sql(f"""
insert overwrite table mydb.my_kafka_message partition(d='{start_time.date()}', h='{start_time.hour}', idx='{int(start_time.minute)}') 
select * from {temp_view_name}
""")

Deleting expired offsets

A simple shell script is enough (currently it runs every six hours and deletes all offsets older than six hours)

# delete empty directory
for d in $(hadoop fs -du hdfs://ns//user/foo/kafka_message\
       |perl -ne 'print if /^0\s/'\
       |awk '{print $3}'|perl -pe 's/^hdfs:\/\/ns\/user\/foo\/kafka_message\/d=//')
do
echo "hdfs://ns//user/foo/kafka_message/d=$d"
# why strip the prefix and then add it back before removing?
# just to be safe: only paths under the checkpoint directory can be deleted
hadoop fs -rm -r "hdfs://ns//user/foo/kafka_message/d=$d"
done


# remove outdated checkpoint
for p in $(for d in $(hadoop fs -ls hdfs://ns//user/foo/kafka_message|awk '{print $8}')
do
    dir_file=$(hadoop fs -ls $d | awk '{print $8}')
    echo $dir_file
done | python -c '
import re
import sys
from datetime import datetime, timedelta
for file_full_name in sys.stdin.read().split():
   file_full_name = file_full_name.strip()
   file_path_prefix = "hdfs://ns/user/foo/kafka_message/"
   checkpoint_file_pattern = re.compile(file_path_prefix+"d=(.+)/h=(.+)")
   m = checkpoint_file_pattern.match(file_full_name)
   now = datetime.now()
   if m:
      d, h = m.groups()
      checkpoint_time = datetime.strptime("{} {}".format(d, h), "%Y-%m-%d %H")
      if checkpoint_time < now - timedelta(hours = 6):
         # TODO maybe a little dangerous
         assert file_path_prefix and " " not in file_path_prefix, file_path_prefix
         assert d and not " " in d, d
         assert h and not " " in h, h
         print("d={}/h={}".format(d, h))
')
do
echo "$p"
# the path prefix is hard-coded again here so that, even if the filter above
# goes badly wrong (which should not happen),
# we never delete a path outside the checkpoint directory

hadoop fs -rm -r "hdfs://ns/user/foo/kafka_message/$p"
done
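The expiry test embedded in the shell script can also be written as a small standalone function, which is easier to try out before pointing it at HDFS. This is a sketch, not the script's exact logic: the pattern is deliberately stricter than the `(.+)` groups used above, and the names `CHECKPOINT_DIR_PATTERN` and `is_expired` and the sample paths are invented.

```python
import re
from datetime import datetime, timedelta

CHECKPOINT_DIR_PATTERN = re.compile(r"d=(\d{4}-\d{2}-\d{2})/h=(\d{1,2})$")


def is_expired(path, now, max_age=timedelta(hours=6)):
    """True if path ends in d=<date>/h=<hour> and that hour is older than max_age."""
    m = CHECKPOINT_DIR_PATTERN.search(path)
    if m is None:
        # Anything that does not look like a checkpoint directory is never deleted.
        return False
    day, hour = m.groups()
    checkpoint_time = datetime.strptime("{} {}".format(day, hour), "%Y-%m-%d %H")
    return checkpoint_time < now - max_age


now = datetime(2021, 3, 10, 18, 30)
paths = [
    "hdfs://ns/user/foo/kafka_message/d=2021-03-10/h=9",
    "hdfs://ns/user/foo/kafka_message/d=2021-03-10/h=13",
    "hdfs://ns/user/foo/kafka_message/d=2021-03-10",
]
print([p for p in paths if is_expired(p, now)])
```

Passing `now` in as an argument keeps the function deterministic, so the cutoff can be tested with fixed timestamps.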

Summary

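This approach is mainly meant to replace camus (https://github.com/LinkedInAttic/camus, https://github.com/apache/gobblin), because the former does data synchronization in a declarative style and I am not yet familiar with its API or implementation.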

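That said, this implementation is not necessarily better than camus. It may only offer somewhat more control and extensibility, which lets it be reused for other tasks.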

On usability so far:

Some issues that may currently exist:

Checking the Hive table data recorded in this exercise against the data recorded in ClickHouse:

In [33]: data["diff"].describe()
Out[33]:
count    119.000000
mean      -0.005155
std        0.065469
min       -0.611382
25%       -0.000151
50%        0.003264
75%        0.009399
max        0.030129
Name: diff, dtype: float64

The error per hourly partition is roughly between 0.5% and 1%. Possible causes of this error:

The proportion of messages actually lost should be smaller than this error figure.
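How the diff column was computed is not stated above. One plausible reading, and this is only an assumption, is a relative difference per partition with the ClickHouse count as the baseline. The sketch below shows that calculation on invented counts; the function name and all the numbers are made up for illustration and are not measurements.

```python
import statistics


def relative_diffs(hive_counts, clickhouse_counts):
    """Relative difference per partition key, using the ClickHouse count as the baseline."""
    diffs = {}
    for key, expected in clickhouse_counts.items():
        if key in hive_counts and expected:
            diffs[key] = (hive_counts[key] - expected) / expected
    return diffs


# Invented counts per (d, h) partition, for illustration only.
clickhouse_counts = {("2021-03-10", 16): 1000, ("2021-03-10", 17): 2000}
hive_counts = {("2021-03-10", 16): 1005, ("2021-03-10", 17): 1990}

diffs = relative_diffs(hive_counts, clickhouse_counts)
print(diffs, statistics.mean(list(diffs.values())))
```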