save kafka data to hive with pyspark spark streaming: a real case
Preface
This is a hands-on version of pull kafka message and save to hive table in pyspark, so the code is basically the same as there. It has three basic parts:
- Saving offsets
  - A Spark Streaming job written in PySpark records the Kafka topic offsets to HDFS (once every five minutes)
- Pulling messages in batch
  - A PySpark job uses the createRDD API to pull the messages from 30 minutes ago to 20 minutes ago, converts them into a DataFrame, and writes them into Hive
- Deleting expired offsets
  - A scheduled script deletes expired offsets from HDFS, so that a large number of small files does not eat into the Hadoop NameNode
Saving Kafka message offsets to HDFS
Saving offsets to HDFS mainly relies on the createDirectStream API:
def create_direct_stream(brokers):
    return KafkaUtils.createDirectStream(
        ssc,
        ['<topic_name>'],
        {
            "auto.offset.reset": "largest",  # smallest or largest
            "bootstrap.servers": brokers_list(brokers)
        },
        valueDecoder=lambda x: x  # raw binary data, since our kafka messages are not plain encoded strings
        # valueDecoder=decode_kafka
        # , fromOffsets=fromOffset
    )
KafkaUtils#createDirectStream returns a direct input stream (DStream); each resilient distributed dataset (RDD) in that stream carries the corresponding Kafka offset ranges:
kafka_message_stream = create_direct_stream(brokers)

def takeAndPrint(time, rdd):
    """ for test """
    taken = rdd.take(1)
    print("########################")
    print("Time: %s" % time)
    print(rdd.offsetRanges())
    print("########################")
    # for record in taken:
    #     print(str(record))

kafka_message_stream.foreachRDD(takeAndPrint)
As shown above, rdd.offsetRanges() returns the offset range of each partition of the Kafka topic, roughly in this form:
OffsetRange(topic: topic_name, partition: 2, range: [4039119466 -> 4039121238])
OffsetRange(topic: topic_name, partition: 28, range: [14786654 -> 14788067])
OffsetRange(topic: topic_name, partition: 7, range: [4038951906 -> 4038953542])
OffsetRange(topic: topic_name, partition: 8, range: [4039455572 -> 4039457035])
For the OffsetRange type, see the API docs for pyspark.streaming.kafka.OffsetRange.
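As a quick, minimal illustration (not from the original job), an OffsetRange can also be constructed by hand; the values below are copied from the sample output above:
from pyspark.streaming.kafka import OffsetRange

# OffsetRange(topic, partition, fromOffset, untilOffset)
o = OffsetRange("topic_name", 2, 4039119466, 4039121238)
print(o.topic, o.partition, o.fromOffset, o.untilOffset)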
To save the offsets to HDFS, the offset range list above can be turned into an RDD and then written out with the usual Spark APIs:
sparkSession = SparkSession.builder \
    .master('yarn') \
    .appName("kafka_message_save_offset_as_checkpoint") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()
def saveOffsets(time, rdd):
    if rdd.id() >= 2: return
    date = str(time.date())
    hour = time.hour
    offsetRanges = sparkSession.sparkContext.parallelize(
        [(time, o.topic, o.partition, o.fromOffset, o.untilOffset)
         for o in rdd.offsetRanges()], 1)
    offset_file = f"/user/foo/kafka_message/d={date}/h={hour}/offset_{time.timestamp()}"
    offsetRanges.saveAsTextFile(offset_file, "org.apache.hadoop.io.compress.GzipCodec")
Since we only need to record the offsets here rather than actually process the messages, only the offset ranges of the very first batch RDD (id < 2) are handled. The offset range list is turned into an RDD with Spark's parallelize API and then written to HDFS with the RDD's saveAsTextFile. See the APIs:
- https://spark.apache.org/docs/latest/rdd-programming-guide.html
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.saveAsTextFile.html
Because the files saved to HDFS here are gzip-compressed, they have to be decompressed when the offset information is read back below. For more background see: compression codecs in Hadoop.
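For example, once one of the saved part files has been copied to the local disk, it can be inspected like this (a minimal sketch; the local file name is hypothetical, and each line is the repr of one (time, topic, partition, fromOffset, untilOffset) tuple):
import gzip

with gzip.open("part-00000.gz") as fh:  # hypothetical local copy of one saved part file
    for line in fh.read().decode("utf-8").splitlines():
        if line:
            print(line)
            # e.g. (datetime.datetime(2021, 3, 10, 12, 0), 'topic_name', 2, 4039119466, 4039121238)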
After the offsets have been recorded, the current Spark Streaming job can be stopped; this is done with StreamingContext's awaitTermination method:
ssc.start()
ssc.awaitTermination(timeout=60)
See the API docs: StreamingContext#awaitTermination.
The complete job script is below (it uses a trick described in run python script as shell script more than sheban, which makes it easier to run the Python script directly as a shell script):
#!/usr/bin/env bash
# encoding=utf-8
# Shell commands follow
# Next line is bilingual: it starts a comment in Python, and is a no-op in shell
""":"
# exec replaces the current process
# zeus runs shell scripts with the source command,
# and spark-submit only treats .py files as Python scripts
download[hdfs:///zeus/hdfs-upload-dir/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar-20210310-170734.jar spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar]
cp ${BASH_SOURCE[0]} ${BASH_SOURCE[0]}.py
exec spark-submit \
--jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
--num-executors 2 --executor-memory 2g \
--executor-cores 2 --driver-memory 1g \
--conf "spark.pyspark.python=/usr/bin/python3.6" \
--conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
${BASH_SOURCE[0]}.py
":"""
# Previous line is bilingual: it ends a comment in Python, and is a no-op in shell
# Shell commands end here
# Python script follows
#!/usr/bin/env python
# encoding: utf-8
import json
import uuid
import requests
from datetime import datetime
import base64 as b64
import gzip, zlib
from collections import namedtuple
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
def brokers_list(brokers):
    return ",".join(b for b in brokers.split("\n") if b and "#" not in b)
sparkSession = SparkSession.builder \
    .master('yarn') \
    .appName("kafka_message_save_offset_as_checkpoint") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()
sc = sparkSession.sparkContext
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, 1)
brokers = """
10.0.0.1:9092
10.0.0.2:9092
10.0.0.3:9092
10.0.0.4:9092
"""
def create_direct_stream(brokers):
    return KafkaUtils.createDirectStream(
        ssc,
        ['<topic_name>'],
        {
            "auto.offset.reset": "largest",  # maybe later we can consume from smallest
            "bootstrap.servers": brokers_list(brokers)
        },
        valueDecoder=lambda x: x  # raw data
        # valueDecoder=decode_kafka
        # , fromOffsets=fromOffset
    )
kafka_message_stream = create_direct_stream(brokers)
def saveOffsets(time, rdd):
    if rdd.id() >= 2: return
    date = str(time.date())
    hour = time.hour
    offsetRanges = sparkSession.sparkContext.parallelize(
        [(time, o.topic, o.partition, o.fromOffset, o.untilOffset)
         for o in rdd.offsetRanges()], 1)
    offset_file = f"/user/foo/kafka_message/d={date}/h={hour}/offset_{time.timestamp()}"
    offsetRanges.saveAsTextFile(offset_file, "org.apache.hadoop.io.compress.GzipCodec")
def takeAndPrint(time, rdd):
    """ for test """
    taken = rdd.take(1)
    print("########################")
    print("Time: %s" % time)
    print(rdd.offsetRanges())
    print("########################")
    # for record in taken:
    #     print(str(record))
kafka_message_stream.foreachRDD(saveOffsets)
ssc.start()
ssc.awaitTermination(timeout=60)
Pulling Kafka messages into Hive in batch with PySpark
As mentioned above, the Kafka topic offsets are now saved to HDFS every five minutes. To pull the messages within a given range into Hive in batch with the KafkaUtils#createRDD API, we first write a helper function that works out the offset ranges for a given time range:
def get_current_offsets_range(start_time, end_time):
    """
    get the current offset ranges that we should pull from kafka
    """
    subprocess.call("hadoop fs -copyToLocal hdfs://ns//user/foo/kafka_message kafka_message".split(" "))
    offsets = dict()
    for root, dirs, files in os.walk("kafka_message"):
        for f in files:
            if not f.endswith(".gz"):
                continue
            with gzip.open(os.path.join(root, f)) as f:
                offsets_text = f.read().decode("utf-8")
            for i in offsets_text.split("\n"):
                if not i:
                    continue
                offset_time, topic, partition, from_offset, to_offset = eval(i)
                if partition not in offsets:
                    if offset_time < start_time:
                        offsets[partition] = (from_offset, None)
                    if offset_time > end_time:
                        offsets[partition] = (None, to_offset)
                else:
                    old_from_offset, old_to_offset = offsets[partition]
                    if offset_time < start_time:
                        if not old_from_offset or from_offset > old_from_offset:
                            old_from_offset = from_offset
                    if offset_time > end_time:
                        if not old_to_offset or to_offset < old_to_offset:
                            old_to_offset = to_offset
                    offsets[partition] = (old_from_offset, old_to_offset)
    return offsets
This helper downloads the HDFS directory holding the offset ranges to the local disk, then walks through all the files to pick, for each partition, the smallest offset range that still covers the target time range.
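Its return value is a dict mapping each partition to a (from_offset, until_offset) pair, roughly of this shape (illustrative values copied from the sample output earlier):
# {partition: (from_offset, until_offset)}
offsets = {
    2: (4039119466, 4039121238),
    28: (14786654, 14788067),
}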
With this helper and the KafkaUtils#createRDD API, the messages in the corresponding range can be pulled:
job_time = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S.%f")
print(job_time)
start_time = job_time - datetime.timedelta(minutes=20)
end_time = start_time + datetime.timedelta(minutes=10)
delta = datetime.timedelta(minutes=3)
offsets = get_current_offsets_range(start_time-delta, end_time+delta)
print(offsets)
pre_hourly_rdd = KafkaUtils.createRDD(
    sc, {"metadata.broker.list": brokers_list(brokers)},
    [OffsetRange("<topic_name>", p, f, t) for p, (f, t) in offsets.items()],
    valueDecoder=lambda x: x
)
The job runs every 20 minutes and pulls the messages from 30 minutes ago up to 20 minutes ago; to reduce the impact of the gap between event time and processing time, a 3-minute tolerance is added on both the start time and the end time.
The pulled RDD is then decoded, filtered by event time, and written into the table:
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame(pre_hourly_rdd.map(lambda x:decode_kafka(x[1])).filter(lambda x: start_time <= x.born_time < end_time))
temp_view_name = f"temp_table_for_kafka_message_{int(start_time.timestamp())}"
df.createTempView(temp_view_name)
sparkSession.sql(f"""
insert overwrite table mydb.my_kafka_message partition(d='{start_time.date()}', h='{start_time.hour}', idx='{int(start_time.minute)}')
select * from {temp_view_name}
""")
Here decode_kafka is a task-specific function that decodes the Kafka messages; swap in whatever decoder your own messages need. The decoded records are then filtered by event time.
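Purely as an illustration of the shape such a decoder might take (this is a sketch, not the original implementation: it assumes the message value is a gzip-compressed JSON document, and every field name except born_time is hypothetical):
from collections import namedtuple
from datetime import datetime
import gzip
import json

# hypothetical record type; the filter above only requires a born_time field
KafkaRecord = namedtuple("KafkaRecord", ["born_time", "payload"])

def decode_kafka(raw_value):
    # raw_value is the raw bytes handed over by valueDecoder=lambda x: x
    text = gzip.decompress(raw_value).decode("utf-8")  # assumption: gzip-compressed JSON
    doc = json.loads(text)
    return KafkaRecord(
        born_time=datetime.fromtimestamp(doc["born_time"] / 1000.0),  # assumption: epoch millis
        payload=json.dumps(doc),
    )
The only hard requirement is that the returned objects can be turned into Rows by createDataFrame and expose a born_time comparable with start_time and end_time.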
The complete script:
#!/usr/bin/env bash
# encoding=utf-8
# Shell commands follow
# Next line is bilingual: it starts a comment in Python, and is a no-op in shell
""":"
# exec replaces the current process
# zeus runs shell scripts with the source command,
# and spark-submit only treats .py files as Python scripts
download[hdfs:///zeus/hdfs-upload-dir/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar-20210310-170734.jar spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar]
cp ${BASH_SOURCE[0]} ${BASH_SOURCE[0]}.py
exec spark-submit \
--jars spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
--num-executors 20 --executor-memory 9g \
--executor-cores 8 --driver-memory 3g \
--conf "spark.pyspark.python=/usr/bin/python3.6" \
--conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
${BASH_SOURCE[0]}.py "${zdt.format("yyyy-MM-dd HH:mm:ss.SSS")}"
":"""
# Previous line is bilingual: it ends a comment in Python, and is a no-op in shell
# Shell commands end here
# Python script follows
#!/usr/bin/env python
# encoding: utf-8
import re
import os
import json
import uuid
import requests
import datetime
import base64 as b64
import gzip, zlib
from collections import namedtuple
import subprocess
import sys
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, OffsetRange
from pyspark.streaming import StreamingContext
def get_current_offsets_range(start_time, end_time):
    """
    get the current offset ranges that we should pull from kafka
    """
    subprocess.call("hadoop fs -copyToLocal hdfs://ns//user/foo/kafka_message kafka_message".split(" "))
    # offset_pattern = re.compile(r".+partition: (\d+), range: \[(\d+) -> (\d+)\]")
    offsets = dict()
    for root, dirs, files in os.walk("kafka_message"):
        for f in files:
            if not f.endswith(".gz"):
                continue
            with gzip.open(os.path.join(root, f)) as f:
                offsets_text = f.read().decode("utf-8")
            for i in offsets_text.split("\n"):
                if not i:
                    continue
                offset_time, topic, partition, from_offset, to_offset = eval(i)
                if partition not in offsets:
                    if offset_time < start_time:
                        offsets[partition] = (from_offset, None)
                    if offset_time > end_time:
                        offsets[partition] = (None, to_offset)
                else:
                    old_from_offset, old_to_offset = offsets[partition]
                    if offset_time < start_time:
                        if not old_from_offset or from_offset > old_from_offset:
                            old_from_offset = from_offset
                    if offset_time > end_time:
                        if not old_to_offset or to_offset < old_to_offset:
                            old_to_offset = to_offset
                    offsets[partition] = (old_from_offset, old_to_offset)
    return offsets
def brokers_list(brokers):
    return ",".join(b for b in brokers.split("\n") if b and "#" not in b)
sparkSession = SparkSession.builder \
    .master('yarn') \
    .appName("save_kafka_message_to_hive") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()
sc = sparkSession.sparkContext
brokers = """
10.0.0.1:9092
10.0.0.2:9092
10.0.0.3:9092
10.0.0.4:9092
"""
job_time = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S.%f")
print(job_time)
start_time = job_time - datetime.timedelta(minutes=20)
end_time = start_time + datetime.timedelta(minutes=10)
delta = datetime.timedelta(minutes=3)
offsets = get_current_offsets_range(start_time-delta, end_time+delta)
print(offsets)
pre_hourly_rdd = KafkaUtils.createRDD(
    sc, {"metadata.broker.list": brokers_list(brokers)},
    [OffsetRange("<topic_name>", p, f, t) for p, (f, t) in offsets.items()],
    valueDecoder=lambda x: x
)
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame(pre_hourly_rdd.map(lambda x:decode_kafka(x[1])).filter(lambda x: start_time <= x.born_time < end_time))
temp_view_name = f"temp_table_for_kafka_message_{int(start_time.timestamp())}"
df.createTempView(temp_view_name)
sparkSession.sql(f"""
insert overwrite table mydb.my_kafka_message partition(d='{start_time.date()}', h='{start_time.hour}', idx='{int(start_time.minute)}')
select * from {temp_view_name}
""")
Deleting expired offsets
A simple shell script is enough (it currently runs every six hours and deletes all offsets older than six hours):
# delete empty directories
for d in $(hadoop fs -du hdfs://ns//user/foo/kafka_message\
    |perl -ne 'print if /^0\s/'\
    |awk '{print $3}'|perl -pe 's/^hdfs:\/\/ns\/user\/foo\/kafka_message\/d=//')
do
    echo "hdfs://ns//user/foo/kafka_message/d=$d"
    # why strip the path prefix above and then add it back here when removing?
    # just to be safe
    hadoop fs -rm -r "hdfs://ns//user/foo/kafka_message/d=$d"
done
# remove outdated checkpoints
for p in $(for d in $(hadoop fs -ls hdfs://ns//user/foo/kafka_message|awk '{print $8}')
do
    dir_file=$(hadoop fs -ls $d | awk '{print $8}')
    echo $dir_file
done | python -c '
import re
import sys
from datetime import datetime, timedelta

for file_full_name in sys.stdin.read().split():
    file_full_name = file_full_name.strip()
    file_path_prefix = "hdfs://ns/user/foo/kafka_message/"
    checkpoint_file_pattern = re.compile(file_path_prefix + "d=(.+)/h=(.+)")
    m = checkpoint_file_pattern.match(file_full_name)
    now = datetime.now()
    if m:
        d, h = m.groups()
        checkpoint_time = datetime.strptime("{} {}".format(d, h), "%Y-%m-%d %H")
        if checkpoint_time < now - timedelta(hours=6):
            # TODO maybe a little dangerous
            assert file_path_prefix and " " not in file_path_prefix, file_path_prefix
            assert d and " " not in d, d
            assert h and " " not in h, h
            print("d={}/h={}".format(d, h))
')
do
    echo "$p"
    # the full path prefix is spelled out again here so that even in a badly wrong case,
    # which I do not expect to happen, we will not delete the wrong path
    hadoop fs -rm -r "hdfs://ns/user/foo/kafka_message/$p"
done
Summary
This approach mainly exists as a replacement for camus (https://github.com/LinkedInAttic/camus, https://github.com/apache/gobblin), since camus does the data synchronization in a declarative way and I am not yet familiar with its API or implementation.
That said, this implementation is not necessarily better than camus; it is probably just easier to control and to extend, and the pieces can be reused for some other tasks.
On availability so far:
- A certain amount of job scheduling failures can be tolerated
- Because each time window is consumed by an independent job, it can cope, to some extent, with a fairly large growth in message volume
Some possible problems at the moment:
- Because of how event time and processing time are handled, this implementation wastes some work: as mentioned above, a wider range of Kafka messages is actually pulled, and the messages outside the event-time window are simply thrown away
- Another implementation would be to save the messages directly inside createDirectStream, as discussed in pull kafka message and save to hive table in pyspark. That might perform better, and the real-time and offline jobs could be merged into one, but it should be harder to implement and its availability may be worse. It would probably also need a KV store (such as redis or hbase) as an intermediate buffer; using HDFS for that would again produce a large number of small files that consume the NameNode and DataNodes
- Storing the Kafka topic offsets still goes through PySpark's Kafka API; the plain Kafka consumer API could be used directly instead (see the sketch below)
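For instance, a minimal sketch of reading the current offsets with the third-party kafka-python package (an assumption; it is not part of PySpark, and the broker address and topic name are placeholders):
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="10.0.0.1:9092")  # placeholder broker
partitions = consumer.partitions_for_topic("<topic_name>") or set()
tps = [TopicPartition("<topic_name>", p) for p in partitions]
begin_offsets = consumer.beginning_offsets(tps)  # earliest offset per partition
end_offsets = consumer.end_offsets(tps)          # latest offset per partition
for tp in tps:
    print(tp.partition, begin_offsets[tp], end_offsets[tp])
These per-partition offsets could then be written to HDFS in the same tuple format as above.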
Validating the Hive table data recorded here against the data recorded in ClickHouse:
In [33]: data["diff"].describe()
Out[33]:
count 119.000000
mean -0.005155
std 0.065469
min -0.611382
25% -0.000151
50% 0.003264
75% 0.009399
max 0.030129
Name: diff, dtype: float64
The error for each hourly partition is roughly between 0.5% and 1%. Possible causes of this error:
- The time ranges of the hourly partitions are not perfectly aligned
- Messages lost during consumption
The proportion of genuinely lost messages should actually be smaller than this error.