离线模块总结

离线抽取

​ 一共11道题目,难点就一题,其他的几个都是可以用同一个方法解决的,先封装一个方法把mysql表抽取并设置临时表,然后写一个mysql统一到hive的方法,参数为sparkSession,ods表名还有增量字段名即可,基本都是根据modified_time抽取增量数据,要注意的点是有些表是没有数据的,所以对于没数据的表就直接全量抽取即可。如果直接使用封装的增量抽取方法会导致没有数据进去,因为max的时候取值是NULL,而NULL是比普通的字符型要大的,会导致where的时候数据无法进入。然后就是一个coupon_use表,这个比较难,ods表还是有数据的,要根据三个时间字段取最大值进行增量抽取,这题简单说一些解题思路吧,首先你会发现ods表内三个时间字段get_time、used_time、pay_time 中的pay_time和used_time会有null值,之前说过NULL值是最大的,所以按照平常的greatest(max()),取出来是NULL,所以select取值的时候一定要where设置时间字段不为null where pay_time <> ‘NULL’ and used_time <> ‘NULL’,同理在写入的时候,where的时候要加入不为NULL的条件。其他的就是正常的增量抽取,写入模式用append,如果报错就saveAsTable和insertInto都试试,离线抽取练多了就是送分的,非常稳定。抽完一定要对照mysql和hive的数据量,如果写错了,就删除对应分区。alter table tbName drop partition(分区=分区值);

离线清洗

​ 没记错的话清洗这边我花费了大概3个半小时才做完,难点和原因后面说,前三题可以公用一个方法解决,比较简单的和七月国赛以及四合服务器一样的要求,合并清洗,按字段分区,取modifid_time最新的数据,然后都设置ods的分区,一个开窗函数然后withColum还有一个UnionByName就能解决。然后是6 7 8 10 11是直接全量抽取添加四个字段即可。12题是三个表关联,直接三表join,因为答案只要条数,所以只需要直接放一个字段saveAsTable保存到一个表看条数截图即可。4 5 9这三题要从HBase和hive合并处理再append表中,HBase后面细说。

指标计算

​ 四题都挺简单的,因为前面浪费了很多时间其实这四题加起来花了1个小时不到就做完了。

​ 第一题是求每个省的支付转化率,只要group by province按省分组然后分别取状态为已下单和已付款的,求count(*),设置两个临时表t1 t2 然后join,用已付款/已下单,然后round保留3位小数,最后row_number()开窗排序就完成了。

​ 第二题是求商品的销售额和销量统计,这个简单,重点在于题目是说:已退款的订单号不计入统计。所以要fact_order_master表和fact_order_detail关联完成。fact_order_master表是有订单状态的,所以要根据master表找到订单号中没有已退款的状态的。如果找到,直接写sql语句了,如下:

1
2
3
4
5
6
7
8
9
with t1 as (
select order_sn
from dwd.fact_order_master
where order_status = '已退款')
select order_sn
from dwd.fact_order_master
where order_sn not in
(select order_sn from t1)
group by order_sn

建议先去弄懂master表的数据形式,他是有很多重复order_sn但是只有状态是不一样的,也就是说订单的每个状态,他都是一条数据。上面查询到的就是非已退款的订单号,然后直接和fact_order_detail关联按商品id分组求sum(coupon_cnt)和sum(coupon_cnt*product_price)最后排个序就行了。

​ 第三题是求连续三周,登录的用户数量。我的方法比较简单,直接看日历,题目要求是2022 8 10号,去看日历,最近三周是7.25-8.14然后求每周登录过的用户id,记得去重,设置临时表。join求公共id,然后count求有多少个用户就解决了。

​ 第四题题目好像是根据customer_level_inf表的等级,好像是5个等级,求各个等级的用户所消费的订单总额。解题挺简单的,先看表求订单总额和等级肯定要先看customer_level_inf和fact_order_master,然后你发现他们没有公共字段,所以要找能关联这两个表的dwd层的表,是dim_customer_info,然后就是基础的三表连接,(记得用left join或者right join,因为题目要求如果用户的level_id在customer_level_inf中没有名字的话,就以无会员等级填充)。直接按照level_name分组 sum(order_money)就好了,还要注意点是sum(order_money)因为太大了,记得用cast转换成Double。

踩坑点

1、每次打jar包进去提交的时候,会经常报错,问题就是类型不匹配,他表设计的很多字段估计设置成不一样的类型,打个比方有个字段是叫BI_id,在ods中他是int,题目要你到dwd,然后在dwd中字段类型是string,这种导致每次报错了都要去修改代码加个类型转换,又因为自己本来表抽取很多是统一代码的,这样有类型问题的表,一次提交只能解决一个表,scp又很慢就很浪费时间,估计自己比赛打jar,然后spark-submit至少花费了一个小时。

2、清洗的时候要求你时间都要写成yyyy-MM-dd HH:mm:ss 有的字段时间是yyyyMMddHHmmsss的看起来就像数字,这个转换的时候有点麻烦,比如是create_time就要写成from_unixtime(unix_timestamp($“create_time”, “yyyyMMddHHmmss”), “yyyy-MM-dd HH:mm:ss”).cast(“timestamp”),这个导入的时候记得修改withColumn,然后你会神奇的发现会报错提醒的from_unixtime好像是方法不支持啥的,然后就要分析报错信息了,说要你set Spark.sql什么什么 To Le啥的记不清了,然后你在代码中把配置修改一下,比如sql(“set spark.sql.writeLegacyFormat=true”)(当然报错信息不是这个,只是打个比方)

3.HBase后面单独讲

难点HBase

​ 首先要先理解以下代码将hbase转换成dataFrame,但是你会发现都是string类型的。你会发现都是string类型的转换成df还如此麻烦,因为平常练习导入假数据都是string类型的,在你比赛取hbase的时候会发现表是给你建好的。平常我们查看表代码都是scan ‘fact_order_detail’,{FORMATTER => ‘toString’},比赛的时候我这样看表发现很多条会乱码,刚开始一直找不到原因,问技术,说是我自己查看方式不对,然后就思考了一下,FORMATTER应该是设置每个字段都是String类型查看,导致可能hbase建表的时候他不是String类型建立的,tostring的时候就会乱码,发现了但是对我查看还是没有什么帮助,因为hbase不了解,也不知道有什么其他能不乱码设置每个字段的代码。但是取的时候我是可以改一下的,比如order_detail_id他乱码就设置成int,只要将row.toString 改成row.toInt,转换成df 的时候就不乱码了,然后你跟着任务书上面提供的hbase表结构全部改好以后,会发现还是有一些转换成df的时候其实是不对的(但是没乱码,不细心肯定发现不了),master表中的order_id就是如此,思来想去,想起来order_id好像有的是9位的,众所周知scala和java 的关系,之前算法比赛的时候发现int是只能展8位字符的,所以我就把order_id转换成long,发现就行了,这肯定是比赛方故意设计的坑,还好发现了。然后后面HBase注意的点我写代码下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
case class OrderDetail(order_detail_id: String, order_sn: String, product_id: String, product_name: String, product_cnt: String, product_price: String, average_cost: String, weight: String, fee_money: String, w_id: String, create_time: String, modified_time: String, dwd_insert_user: String, dwd_insert_time: String, dwd_modify_user: String, dwd_modify_time: String, etl_date: String)
// hbase的配置对象,指定zk的ip端口号和表名
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "xueai:2181")
conf.set(TableInputFormat.INPUT_TABLE, "fact_order_detail")
// 用sc对象获取rdd
val frame = spark.sparkContext
.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
// 映射成样例类对象
.map {
case (_, row) =>
// 取值
val order_detail_id = Bytes.toString(row.getValue("info".getBytes, "order_detail_id".getBytes))
val order_sn = Bytes.toString(row.getValue("info".getBytes, "order_sn".getBytes))
val product_id = Bytes.toString(row.getValue("info".getBytes, "product_id".getBytes))
val product_name = Bytes.toString(row.getValue("info".getBytes, "product_name".getBytes))
val product_cnt = Bytes.toString(row.getValue("info".getBytes, "product_cnt".getBytes))
val product_price = Bytes.toString(row.getValue("info".getBytes, "product_price".getBytes))
val average_cost = Bytes.toString(row.getValue("info".getBytes, "average_cost".getBytes))
val weight = Bytes.toString(row.getValue("info".getBytes, "weight".getBytes))
val fee_money = Bytes.toString(row.getValue("info".getBytes, "fee_money".getBytes))
val w_id = Bytes.toString(row.getValue("info".getBytes, "w_id".getBytes))
val create_time = Bytes.toString(row.getValue("info".getBytes, "create_time".getBytes))
val modified_time = Bytes.toString(row.getValue("info".getBytes, "modified_time".getBytes))
val dwd_insert_user = Bytes.toString(row.getValue("info".getBytes, "dwd_insert_user".getBytes))
val dwd_insert_time = Bytes.toString(row.getValue("info".getBytes, "dwd_insert_time".getBytes))
val dwd_modify_user = Bytes.toString(row.getValue("info".getBytes, "dwd_modify_user".getBytes))
val dwd_modify_time = Bytes.toString(row.getValue("info".getBytes, "dwd_modify_time".getBytes))
val etl_date = Bytes.toString(row.getValue("info".getBytes, "etl_date".getBytes))
OrderDetail(order_detail_id: String, order_sn: String, product_id: String, product_name: String, product_cnt: String, product_price: String, average_cost: String, weight: String, fee_money: String, w_id: String, create_time: String, modified_time: String, dwd_insert_user: String, dwd_insert_time: String, dwd_modify_user: String, dwd_modify_time: String, etl_date: String)

}
// 转成dataframe
.toDF
val result = spark.table("ods.order_detail")

​ 题目要求HBase只取20221001的数据,然后要按照rowkey进行取值,rowkey的格式是随 机 数 ( 0-9 )+yyyyMMddHHmmssSSS(date 的格式)。问题最大的是之前从来没取过rowkey,冷静下来以后,分析的一下rdd中Map的使用,我设置了两个参数一个row,一个下划线,其实row就相当于是hbase的一行,scan查看limit=1的时候会显示的很多行其实都是列簇的不同字段,在hbase中就是一行,那么我感觉row.方法,肯定有一个能取到rowkey,我就在.里面找方法,找到一个getRow的方法,而且里面是不需要设置参数的,我感觉肯定是这个我就创建类的时候多创建一个rowkey:String,值就设置成这个getRow,转换成toDF的时候,展示发现果然是rowKey,然后就好解决了。在创建类的时候转换成df,用where(substring(rowkey,2,8)=20221001),取出中间的年月日,where成10月1号的就行了,再用drop把rowkey删除。到这里基本问题就解决的差不多了,还有几个要注意的点就是,hbase中时间数据都是yyyyMMddHHmmsss的,一定转换成yyyy-MM-dd HH:mm:ss方法也很简单在你toDF后面用withColumn的修改,还是用之前提到过的from_unixtime(unix_timestamp($“create_time”, “yyyyMMddHHmmss”), “yyyy-MM-dd HH:mm:ss”).cast(“timestamp”),完事以后和ods最新分区合并然后加四个字段设置分区,append导入就行了,导入表的时候一般情况还是会报一次错,就是类型的问题,看报错信息把该改类型的用cast改了就行了。

总结

​ 比赛的时候实力还是正常发挥了的,平时训练没数据,这比赛的时候出现的问题,其实比赛前一天复盘的时候还有预料到一些的,比如那个hbase,只不过因为hbase添加数据自己是不会设置类型的,也就没法实现练习。还好比赛的时候随机应变解决了最难的一个点,感觉这个地方除了主办方,能解决的,负责清洗模块的应该没不会超过3个,除非他们平常训练的时候hbase数据就是类型设置过的,hbase比赛队伍应该都是接触不多的,如果没技术人员给他们添加假数据是有类型的,个人应该也不会专门去弄类型,都会觉得这是无关紧要的问题。这个点解决不掉的话,其实是很要命的,因为两个最重要的order表都是要合并hbase的,而后面的指标计算都是要这两个表,可以说这两个表如果没搞好,指标计算一分都没有,虽然hbase只有4行,但是我感觉对一些结果还是有影响的,有些人是没管hbase了可能,直接ods表append进入dwd了。还有的估计是直接insert into 手动导入了,因为这两个点我都想过要不要投机取巧试一下。感觉第一个方式不安全,先是清洗部分没法截图了,然后就是可能对指标计算有影响的,但是好像指标计算截图都是取前几条,运气好的话可能没影响,自己确实也不敢赌。第二个自己查看的时候都是乱码,insert into的时候自然没法放值了,而且hive要是直接insert 会很卡,之前自己试过,可能还要报错,就果断放弃了。题目自己在最后两三分钟才刚好做完,还好坚持下来了,感觉自己的答案基本都是对的,最多3分的容错。最后结果只有84分,百分之百是不能信服的,除非综合分析和工位15分全扣了。这15确实不可控,扣不扣都是有猫腻的,也没办法。