模版

学习日期:

所学内容概述

BUG

日总结

分栏标签外挂

{% tabs 分栏%}
<!-- tab test1@fas fa-bomb -->
Any content (support inline tags too).
<!-- endtab -->
<!-- tab test2@fas fa-bomb -->
Any content (support inline tags too).
<!-- endtab -->
{% endtabs %}

四合服务器测试

学习日期: 4.3

所学内容概述

四合服务器测试

使用基本的spark测试是否能连接到服务器, 使用spark提交代码是否能连接到hive以及mysql
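
下面是一个按这个思路整理的最小连通性测试示意(其中的ip、库名、账号密码是按后面任务书里的环境假设的),spark-submit提交后能列出hive的库、读到mysql的表,就说明连接正常:

import org.apache.spark.sql.SparkSession

object ConnectTest {
  def main(args: Array[String]): Unit = {
    // 最小连通性测试:能列出hive的库、读到mysql的表,说明spark提交环境可用
    val spark = SparkSession.builder().appName("connectTest").enableHiveSupport().getOrCreate()
    spark.sql("show databases").show()
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.23.69/shtd_store?useSSL=false") // ip、库名为假设值
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")      // 账号密码为假设值
      .option("password", "123456")
      .option("dbtable", "user_info")
      .load()
      .show(5)
    spark.stop()
  }
}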

BUG

IDEA直接跑出现如下的报错。四合服务器似乎有两个ip地址,一个局域网的、一个直连的,服务器里配的hosts映射是局域网的;刚开始自己电脑上配置的是直连的,报了如下的错误,后来把自己电脑的映射全改成服务器的局域网ip,还是报错;打包放到服务器上用spark提交则是好的。也不知道是什么问题,因为在自己电脑上局域网的ip是ping不通的,暂时未能解决。

2023-04-03 20-47-56屏幕截图

日总结

今天服务器刚到,上午在修改路由以及连交换机什么的,下午很晚服务器才能使用,而且没有外网,题也没练成,还出现了bug。这个bug之前遇到过,但是这台服务器集群里每台机子都有两个不一样的ip,有直连的和局域网的,所以也不知道hosts映射应该配哪一个,等明天再解决。

蓝桥杯备赛算法

学习日期: 4.4

所学内容概述

//手搓冒泡排序
public static int[] sortForMyself(int[] nums){
for (int i=0;i<nums.length-1;i++){
for (int j = 0; j < nums.length - 1 - i; j++) {
if (nums[j] > nums[j+1]){
int temp = nums[j];
nums[j] = nums[j+1];
nums[j+1] = temp;
}
}
}
return nums;
}

//蓝桥杯Java2019 C 4
public static void java1904(){
int i = 1;
int count = 0;
while (count!=2023){//第2023个
i++;
if (primeMath(i)){
count++;
}
}
System.out.println(i);

}


//判断质数
public static boolean primeMath(int num){
for (int i=2;i<=num/2;i++){
if (num % i == 0){
return false;
}
}
return true;
}
//蓝桥杯Java2019 C 三
public static void java1903(){
String n = "0100110001010001";
HashSet<String> res = new HashSet<>();
for (int i=0;i<n.length();i++){
for (int j = i+1;j<=n.length();j++){
res.add(n.substring(i,j));
}
}
System.out.println(res.size());

}
//蓝桥杯Java2018 C 二
public static void java1802() {
int a = 0;
while (true){
a++;
int n = a;
if (n % 5 == 1){
n -= 1;
n = n/5*4;
if (n % 5 == 2){
n -= 2;
n = n/5*4;
if (n % 5 == 3){
n-=3;
n = n/5*4;
if (n % 5 == 4){
n-=4;
n = n/5*4;
if (n % 5 == 0 && n>0){
break;
}
}
}
}
}
}
System.out.println(a);
}

//蓝桥杯Java2018 C 一
public static void java1801(){
int n = 108;
int res = 0;
int num=1;
int day=0;
while (res < n){
res+=num;
num+=2;
day++;
}
System.out.println(day);
}

//蓝桥杯Java2019C组第二题
public static void java1902(){
int a = 2019;
int b = 324;
int count = 0;
while (true){
if (a > b){
count++;
a -= b;
}else if(a < b){
count++;
b -= a;
}
else{
count++;
break;
}
}
System.out.println(count);
}
//蓝桥杯Java2015 C 二
public static void java1502(){
int count = 0;
for (int i=1;i<=10000;i++){
long j = (long) i *i*i; //数字的立方
long k = (i+"").length(); //算出该数字的长度 方便取余
if (j%Math.pow(10,k) == i){
count++;
System.out.println(j+"___"+i);
}
}
System.out.println(count);
}
//蓝桥杯Java2019C组第一题
public static void java1901(){
int count = 0;
for(int i=1;i<2020;i++){
String j = i + "";
char[] chars = j.toCharArray();
for (char k : chars){
if (k=='2' || k=='0' || k=='1' || k=='9'){
count += i;
break;
}
}
}
System.out.println(count);
}

BUG

解决了昨天的bug,今天报错变成了 Permission denied: user=anonymous, access=EXECUTE, inode="/tmp"。我把本地映射全部设置成直连的,报错信息变了;又去启动了hiveserver2,还是报错,看信息好像是 /tmp 的权限问题。先把本地的 /tmp 目录 chmod 777,没什么用;然后去服务器把hdfs上面的 /tmp 权限设置成777,就正常了。

img

服务器正常配置好以后,只用hive的时候发现,每次在hive cli里使用聚合函数就会卡住。搜csdn发现也没有什么统一的解决办法,说得最多的是统一集群时间,但是自己不知道怎么查看整个集群的时间;看日志的时候发现bigdata1主机好像比其他两台机子慢了1秒,在想这样也会有问题吗。如果要统一集群时间的话,需要联网执行相关命令,又因为服务器没网,这个BUG暂时无法解决,只能等四合老师回应了。

image-20230405000456143

日总结

今天上午依旧在测试服务器机子是否可用性,解决了昨天bug,然后又新出来了bug,解决完以后,卡在hive的bug,问题描述在上面也有,上午得知无法解决,下午就去练蓝桥杯的题目了,从基础的算法开始了,做了七八题吧难度都挺简单的,基本都是C组前面的几题还没有上难度。后面花一天练一下贪心算法和二分感觉问题应该就不会很大了。

7月任务一

学习日期: 4.5

所学内容概述

任务一任务完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package gz07.subject1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import java.sql.Timestamp

object dataExtract {
def main(args: Array[String]): Unit = {
//TODO 连接Hive
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstict")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.getOrCreate()
import spark.sql
val date = "20230404"
//TODO 将mysql表抽取并创建临时表为mysql_xxx
mysqlExtract(spark, "user_info")
mysqlExtract(spark, "sku_info")
mysqlExtract(spark, "base_province")
mysqlExtract(spark, "base_region")
mysqlExtract(spark, "order_info")
mysqlExtract(spark, "order_detail")
//TODO 导入hive
hiveExtract1(spark, "user_info")
hiveExtract2(spark, "sku_info")
hiveExtract3(spark, "base_province")
hiveExtract3(spark, "base_region")
hiveExtract1(spark, "order_info")
hiveExtract2(spark, "order_detail")


//TODO 根据题目要求 发现有三套差不多的数据处理方式,封装为方法
def hiveExtract1(sparkSession: SparkSession, hiveName: String): Unit = {
//TODO 两列最大时间
val maxTime: Timestamp = sql(s"select greatest(max(operate_time),max(create_time)) from ods.$hiveName").first().getAs[Timestamp](0)
//TODO 根据题意只要operate_time或者create_time任意一个大于最大时间即可导入
sparkSession.sql(
s"""
|insert into table ods.$hiveName partition (etl_date=$date)
|select * from mysql_$hiveName
|where operate_time > '$maxTime' or create_time > '$maxTime'
|""".stripMargin)
}

def hiveExtract2(sparkSession: SparkSession, hiveName: String): Unit = {
//TODO 和方法一差不多 只需要去create_time
val maxTime: Timestamp = sql(s"select max(create_time) from ods.$hiveName").first().getAs[Timestamp](0)
sparkSession.sql(
s"""
|insert into table ods.$hiveName partition (etl_date=$date)
|select * from mysql_$hiveName
|where create_time > '$maxTime'
|""".stripMargin)
}

def hiveExtract3(sparkSession: SparkSession, hiveName: String): Unit = {
//TODO 取id
val maxId: Long = sql(s"select max(id) from ods.$hiveName").first().getAs[Long](0)
//TODO 添加一列数据并做类型转换Hive3导入需要类型匹配
sparkSession.sql(
s"""
|insert into table ods.$hiveName partition (etl_date=$date)
|select *,cast(date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss') as timestamp)
|from mysql_$hiveName
|where id > '$maxId'
|""".stripMargin)
}

//TODO 将mysql表抽入spark中临时表 临时表名为mysql_xxx
def mysqlExtract(sparkSession: SparkSession, mysqlName: String): Unit = {
val user = "root"
val password = "123456"
val url = "jdbc:mysql://192.168.23.69/shtd_store?useSSL=false"
val driver = "com.mysql.jdbc.Driver"
sparkSession.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createOrReplaceTempView(s"mysql_$mysqlName")
}
}
}

package gz07.subject1

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object dataClear {
def main(args: Array[String]): Unit = {
//TODO 连接Hive
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t1").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstict")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.getOrCreate()
//TODO 导入SparkSQL
import spark.sql

//TODO 静态分区日期 yyyyMMdd

val date = "20230404"
odsUserClear(spark,"ods.user_info","dwd.dim_user_info")
//TODO 下面三个用一个方法即可 要求都一样
odsSkuClear(spark,"ods.sku_info","dwd.dim_sku_info")
odsSkuClear(spark,"ods.base_province","dwd.dim_province")
odsSkuClear(spark,"ods.base_region","dwd.dim_region")
sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from ods.order_info
|""".stripMargin)
.withColumn("operate_time",coalesce(col("operate_time"),col("create_time")).cast("timestamp"))
.withColumn("etl_date",date_format(col("create_time"),"yyyyMMdd"))
.write.format("Hive").mode("append").partitionBy("etl_date").saveAsTable("dwd.fact_order_info")

sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from ods.order_detail
|where etl_date=$date
|""".stripMargin)
.withColumn("etl_date", date_format(col("create_time"), "yyyyMMdd"))
.write.format("Hive").mode("append").partitionBy("etl_date").saveAsTable("dwd.fact_order_detail")

def odsUserClear(sparkSession: SparkSession, odsName: String, dwdName: String): Unit = {
//TODO ods取最新分区添加四列并删除etl_date方便后续union
val ods_data: DataFrame = sparkSession.sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from $odsName
|where etl_date = $date
|""".stripMargin)
.drop("etl_date")
//TODO 找到dwd最新分区
val newEtl_Date: String = sparkSession.sql(s"select max(etl_date) from $dwdName").first().getString(0)
//TODO 抽取dwd最新分区并删除etl_date
val dwd_data: DataFrame = sparkSession.sql(
s"""
|select * from $dwdName
|where etl_date = $newEtl_Date
|""".stripMargin)
.drop("etl_date")
//TODO 两表拼接并对operate_time做非null处理 添加etl_date并设置date值
ods_data.union(dwd_data)
.withColumn("operate_time", coalesce(col("operate_time"), col("create_time")))
.withColumn("etl_date", lit(date))
.createOrReplaceTempView("union_data")
//TODO 窗口函数取operate_time最新值设置dwd_modify_time为当前时间并类型转换为timestamp
sparkSession.sql(
"""
|select *,row_number() over(partition by id order by operate_time desc) as time_rank
|from union_data
|""".stripMargin)
.where("time_rank=1")
.drop("time_rank")
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.write.mode("overwrite").partitionBy("etl_date").saveAsTable("temptable")
//TODO hive表不能边读取边写入,需要先放入spark临时表,再导入
sparkSession.table("temptable")
.write.mode("append")
.insertInto(dwdName)
sparkSession.sql("drop table temptable")
}
//TODO 和上面方法一样无非是按照create_time取最新
def odsSkuClear(sparkSession: SparkSession, odsName: String, dwdName: String): Unit = {
val ods_data: DataFrame = sparkSession.sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from $odsName
|where etl_date = $date
|""".stripMargin)
.drop("etl_date")
val newEtl_Date: String = sparkSession.sql(s"select max(etl_date) from $dwdName").first().getString(0)

val dwd_data: DataFrame = sparkSession.sql(
s"""
|select * from $dwdName
|where etl_date = $newEtl_Date
|""".stripMargin)
.drop("etl_date")


ods_data.union(dwd_data)
.withColumn("etl_date", lit(date))
.createOrReplaceTempView("union_data")

sparkSession.sql(
"""
|select *,row_number() over(partition by id order by create_time desc) as time_rank
|from union_data
|""".stripMargin)
.where("time_rank=1")
.drop("time_rank")
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.write.mode("overwrite").partitionBy("etl_date").saveAsTable("temptable")

sparkSession.table("temptable")
.write.mode("append")
.insertInto(dwdName)
sparkSession.sql("drop table temptable")
}
}
}
package gz07.subject1

import org.apache.spark.sql.{DataFrame, SparkSession}

object dataMath {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t3").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstict")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.getOrCreate()
import spark.sql
//TODO t1
val t1: DataFrame = sql(
"""
|with t1 as (
|select
|province_id,
|final_total_amount,
|substring(etl_date,1,4) as year,
|substring(etl_date,5,2) as month
|from dwd.fact_order_info foi
|where etl_date = '20200426'), t2 as (
|select
|province_id,
|sum(final_total_amount) as total_amount,
|count(1) as total_count,
|year,
|month
|from t1
|group by province_id,year,month)
|select
|province_id,
|dp.name as province_name,
|dr.id as region_id,
|dr.region_name as region_name,
|total_amount,
|total_count,
|row_number() over(partition by year,month,dr.id order by total_amount desc) as sequence,
|year,month
|from t2
|left join dwd.dim_province dp on dp.id = t2.province_id
|left join dwd.dim_region dr on dr.id = dp.region_id
|""".stripMargin)
t1.show()
t1.write.format("Hive").mode("overwrite")
.saveAsTable("dws.province_consumption_day_aggr")

//TODO t2
sql(
"""
|with t1 as (
|select
|region_id,
|sum(total_amount)/sum(total_count) as regionavgconsumption
|from dws.province_consumption_day_aggr
|group by region_id), t2 as (
|select
|province_id as provinceid,
|province_name as provincename,
|total_amount/total_count as provinceavgconsumption,
|t1.region_id as regionid,
|region_name as regionname,
|t1.regionavgconsumption as regionavgconsumption
|from dws.province_consumption_day_aggr pcda
|left join t1 on pcda.region_id = t1.region_id)
|select t2.*,
|case
| when provinceavgconsumption>regionavgconsumption then '高'
| when provinceavgconsumption=regionavgconsumption then '相同'
| when provinceavgconsumption<regionavgconsumption then '低' end
|from t2
|""".stripMargin).createOrReplaceTempView("t2TableTemp")
mysqlTempTable(spark,"provinceavgcmpregion")
sql(
"""
|insert overwrite table mysql_provinceavgcmpregion
|select * from t2TableTemp
|""".stripMargin)

//TODO t3
sql(
"""
|select
|region_id,
|region_name,
|province_id,
|province_name,
|total_amount,
|row_number() over(partition by region_id order by total_amount desc) as threeprovince
|from dws.province_consumption_day_aggr
|""".stripMargin).where("threeprovince<=3").drop("threeprovince").createOrReplaceTempView("t3Temp")
sql(
"""
|select
|region_id,
|region_name,
|concat_ws(',',collect_set(cast(province_id as string))) as provinceids,
|concat_ws(',',collect_set(province_name)) as provincenames,
|concat_ws(',',collect_set(cast(total_amount as string))) as provinceamount
|from
|t3Temp
|group by region_id,region_name
|""".stripMargin).createOrReplaceTempView("t333")
mysqlTempTable(spark,"regiontopthree")
sql(
"""
|insert overwrite table mysql_regiontopthree
|select * from t333
|""".stripMargin)
spark.stop()

def mysqlTempTable(sparkSession: SparkSession, mysqlName: String): Unit = {
val url = "jdbc:mysql://192.168.23.69/shtd_result?useSSL = false"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
sparkSession.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createTempView(s"mysql_$mysqlName")
}
}
}

BUG

img

如上报错,看信息是HiveFileFormat,想起来之前自己写过 format("Hive"),在saveAsTable的时候一直不知道加了有什么用,把这个加上以后就没有这个报错信息了。

img

如上报错信息是在存表的时候出现的,看报错信息里的cast是类型转换,后面翻译出来是string到timestamp,感觉应该是说String类型的数据不能放进timestamp类型的列。代码修改:如果是用sql加的数据,用 cast(xx as timestamp) 做类型转换是比较常规的;如果是用withColumn添加的数据,需要在第二个参数的表达式后面加 .cast("timestamp")。
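
下面是按这个结论整理的一个小示意(列名沿用任务书里的dwd_insert_time、dwd_modify_time,只演示两种写法,不是完整的清洗代码):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CastDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("castDemo").getOrCreate()
    // 写法一:sql里直接cast成timestamp
    val df = spark.sql(
      """
        |select cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time
        |""".stripMargin)
    // 写法二:withColumn添加的列,在表达式后面补 .cast("timestamp")
    val df2 = df.withColumn("dwd_modify_time",
      date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
    df2.printSchema() // 两列都应是timestamp类型
    spark.stop()
  }
}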

日总结

今天在做7月任务书,是用服务器做的,虽然和小白一样都是新版本,但还是遇到了不少BUG,应该是服务器上用了三台机子组成集群的原因,出现了集群相关的BUG,好在看报错日志顺利解决了。hive cli的问题还是没解决,不行的话只能用spark跑出结果,然后用vim伪造hive cli的截图了。

处理hive cli的BUG

学习日期: 4.6

所学内容概述

今天备战蓝桥杯的主要内容是递归,难点在回溯算法的使用,还没有完全弄懂;虽然有模板可以套,但是因为采用的是递归,时间损耗很大,大概能解决前5题难度的问题。

算法代码

public static void zuhe4(int step) {
if (step==4){
for (int re : res) {
System.out.print(re);
}
System.out.println();
return;
}
for (int i=1;i<5;i++){
count++;
if (!bool[i-1]){
res[step] = i;
bool[i-1] = true;
zuhe4(step+1);
bool[i-1]=false;
}
}
}
//输入两个数字求最大公约数
public static void maxTogetherNumber(){
Scanner scanner = new Scanner(System.in);
int a = scanner.nextInt();
int b = scanner.nextInt();
while (b!=0){
int c=a%b;
a=b;
b=c;
}
System.out.println(a);
}
//Java 2020 C 3递归算法 1 5 13 25 41
public static int se(int num){
if (num == 1){
return 1;
}
return se(num-1) + 4*(num-1);
}
//Java 2020 C 3递归算法 1 5 13 25 41 循环版
public static void seFor(int num){
int res = 0;
for (int i=num;i>0;i--){
if (i == 1){
res +=1;
break;
}else {
res += (i-1)*4;
}
}
System.out.println(res);
}
//输入数字判断是第几个斐波那契数字是什么
public static int Fn(int num){
if (num==1 || num==2){
return 1;
}
return Fn(num-1)+Fn(num-2);
}

等边三角形题目

static int num [] = new int[10];
static int count = 0;
static boolean bool[] = new boolean[10];
public static void main(String[] args) {
//在此输入您的代码...
dfs(0);
System.out.print(count/6);
}
private static void dfs(int step) {
// TODO Auto-generated method stub
if (step == 9) {
if (num[0]+num[1]+num[2]+num[3] == num[4]+num[5]+num[6]+num[3] &&
num[0]+num[1]+num[2]+num[3]==num[6]+num[7]+num[8]+num[9]) {
count++;
}
return;
}
for(int i=0;i<9;i++) {
if (!bool[i]) {
bool[i] = true;
num[step] = i;
dfs(step+1);
bool[i] = false;
}
}
}

回溯算法

回溯算法是一种深度优先搜索算法,所以深搜的特点回溯算法都有。一、它是一种递归算法。二、它是一种暴力算法。三、本质是穷举,穷举所有可能,然后找出我们想要的答案

void backtracking(路径,选择列表,结果集...) {
if (终止条件) {
存放结果操作;
return;
}

for (i = start; i <= n && (剪枝操作); i++) { // 剪枝操作不强制要求有
处理当前节点;
backtracking(路径,选择列表,结果集...); // 递归
状态重置,撤销处理结果
}
}

BUG

解决一旦使用hive cli就会报错的BUG,经过长达一天半的研究,只需要在yarn-site.xml中加入如下的配置即可,如果比赛环境也是如此的话,那真的太难为选手了。

<property>
<name>yarn.application.classpath</name>
<value>
/opt/module/hadoop-3.1.3/etc/hadoop,
/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*,
/opt/module/hadoop-3.1.3/share/hadoop/common/*,
/opt/module/hadoop-3.1.3/share/hadoop/hdfs,
/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*,
/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*,
/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*,
/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*,
/opt/module/hadoop-3.1.3/share/hadoop/yarn,
/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*,
/opt/module/hadoop-3.1.3/share/hadoop/yarn/*
</value>
</property>

日总结

今天上午的时间把hive cli的bug解决了,就是不知道比赛的时候该怎么办,这个配置文件只能先背下来了。如果比赛要求禁止修改环境的话,就把xml文件修改完,再弄回去好了。然后今天蓝桥杯备赛内容学了不擅长的递归,用到递归的经典算法回溯,也算是暴力的一种吧,其实用for循环也能做出来。还是后面的贪心和二分比较重要。

7月任务书四(改良版)

学习日期: 4.7 /4.10

所学内容概述

package Julysubject.subject4

import org.apache.spark.sql.SparkSession

import java.sql.Timestamp

object dataExtract {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
import spark.sql
val date = "20230408"
mysqlTempTable(spark,"user_info")
mysqlTempTable(spark,"sku_info")
mysqlTempTable(spark,"order_info")
mysqlTempTable(spark,"order_detail")
mysqlTempTable(spark,"base_province")
mysqlTempTable(spark,"base_region")
hiveExtract1(spark,"user_info")
hiveExtract2(spark,"sku_info")
hiveExtract3(spark,"base_province")
hiveExtract3(spark,"base_region")
hiveExtract1(spark,"order_info")
hiveExtract2(spark,"order_detail")
def hiveExtract1(sparkSession: SparkSession,hiveName:String): Unit = {
val timestamp: Timestamp = sparkSession.sql(s"select greatest(max(create_time),max(operate_time)) from ods.$hiveName")
.first().getTimestamp(0)
sparkSession.sql(
s"""
|insert into table ods.$hiveName partition (etl_date=$date)
|select * from mysql_$hiveName
|where create_time > '$timestamp' or operate_time > '$timestamp'
|""".stripMargin)
}
def hiveExtract2(sparkSession: SparkSession,hiveName:String): Unit = {
val timestamp: Timestamp = sparkSession.sql(s"select max(create_time) from ods.$hiveName")
.first().getTimestamp(0)
sparkSession.sql(
s"""
|insert into table ods.$hiveName partition (etl_date=$date)
|select * from mysql_$hiveName
|where create_time > '$timestamp'
|""".stripMargin)
}

def hiveExtract3(sparkSession: SparkSession, hiveName: String): Unit = {
val value: Long = sparkSession.sql(s"select max(id) from ods.$hiveName")
.first().getLong(0)
sparkSession.sql(
s"""
|insert into table ods.$hiveName partition (etl_date=$date)
|select *,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as create_time from mysql_$hiveName
|where id > '$value'
|""".stripMargin)
}
def mysqlTempTable(sparkSession: SparkSession,mysqlName:String): Unit = {
sparkSession.read.format("jdbc")
.option("url","jdbc:mysql://192.168.23.69/shtd_store?useSSL=false")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","123456")
.option("dbtable",mysqlName).load().createOrReplaceTempView(s"mysql_$mysqlName")
}
}
}

package Julysubject.subject4

import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object dataClear {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
val date = "20230408"

dwdClear_1(spark,"user_info","dim_user_info")
dwdClear_2(spark,"sku_info","dim_sku_info")
dwdClear_2(spark,"base_province","dim_province")
dwdClear_2(spark,"base_region","dim_region")
dwdClear_3(spark,"order_info","fact_order_info")
dwdClear_4(spark,"order_detail","fact_order_detail")


def dwdClear_1(sparkSession: SparkSession,odsHiveName:String,dwdHiveName:String): Unit = {
val odaData: DataFrame = sparkSession.sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from ods.$odsHiveName
|where etl_date = $date
|""".stripMargin).drop("etl_date")
val maxEtlDate: Any = sparkSession.sql(s"select max(etl_date) from dwd.$dwdHiveName").first().get(0)
val dwdData: DataFrame = sparkSession.sql(
s"""
|select * from dwd.$dwdHiveName
|where etl_date=$maxEtlDate
|""".stripMargin).drop("etl_date")
//窗口函数 insert和modify按题目要求取值
val w1: WindowSpec = Window.partitionBy("id")
val w2: WindowSpec = Window.partitionBy("id").orderBy(col("operate_time").desc)
dwdData.union(odaData)
.withColumn("rw",row_number().over(w2))
.withColumn("dwd_insert_time",min(col("dwd_insert_time")).over(w1))
.withColumn("dwd_modify_time",max(col("dwd_modify_time")).over(w1))
.where("rw = 1").drop("rw")
.withColumn("etl_date",lit(date))
.write.format("hive").insertInto(s"dwd.$dwdHiveName")
}
def dwdClear_2(sparkSession: SparkSession,odsHiveName:String,dwdHiveName:String): Unit = {
val odsData: DataFrame = sparkSession.sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from ods.$odsHiveName
|where etl_date = $date
|""".stripMargin).drop("etl_date")
val NewDate: String = sparkSession.sql(s"select max(etl_date) from dwd.$dwdHiveName").first().getString(0)
val dwdData: DataFrame = sparkSession.sql(
s"""
|select * from dwd.$dwdHiveName
|where etl_date=$NewDate
|""".stripMargin).drop("etl_date")

val w1: WindowSpec = Window.partitionBy("id")
val w2: WindowSpec = Window.partitionBy("id").orderBy(col("create_time").desc)
odsData.union(dwdData)
.withColumn("rw",row_number().over(w2))
.withColumn("dwd_insert_time",min(col("dwd_insert_time")).over(w1))
.withColumn("dwd_modify_time",max(col("dwd_modify_time")).over(w1))
.where("rw=1").drop("rw")
.withColumn("etl_date",lit(date))
.write.format("hive").insertInto(s"dwd.$dwdHiveName")
}
def dwdClear_3(sparkSession: SparkSession,odsHiveName:String,dwdHiveName:String): Unit = {
sparkSession.sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from ods.$odsHiveName
|where etl_date=$date
|""".stripMargin).drop("eil_date")
.withColumn("etl_date",date_format(col("create_time"),"yyyyMMdd"))
.withColumn("operate_time",coalesce(col("operate_time"),col("create_time")))
.write.format("hive").mode("append").partitionBy("etl_date").saveAsTable(s"dwd.$dwdHiveName")
}

def dwdClear_4(sparkSession: SparkSession, odsHiveName: String, dwdHiveName: String): Unit = {
sparkSession.sql(
s"""
|select *,
|'user1' as dwd_insert_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_insert_time,
|'user1' as dwd_modify_user,cast(date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as timestamp) as dwd_modify_time
|from ods.$odsHiveName
|where etl_date=$date
|""".stripMargin).drop("etl_date")
.withColumn("etl_date", date_format(col("create_time"), "yyyyMMdd"))
.write.format("hive").mode("append").partitionBy("etl_date").saveAsTable(s"dwd.$dwdHiveName")
}
}
}

package Julysubject.subject4

import org.apache.spark.sql.{DataFrame, SparkSession}

object dataMath {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t3").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
import spark.sql
val date = "20230408"
val t1: DataFrame = sql(
s"""
|WITH t1 AS(
|SELECT
|dp.id AS province_id,
|dp.name AS province_name,
|dr.id AS region_id,
|dr.region_name AS region_name,
|foi.final_total_amount,
|cast(substring(foi.etl_date,1,4) as int) AS year,
|cast(substring(foi.etl_date,5,2) as int) AS month
|FROM
|dwd.fact_order_info foi
|JOIN dwd.dim_province dp
|ON dp.id = foi.province_id and dp.etl_date=20230408
|JOIN dwd.dim_region dr
|ON dr.id = cast(dp.region_id as bigint) and dr.etl_date=20230408)
|SELECT
|province_id,
|province_name,
|region_id,
|region_name,
|SUM(final_total_amount) as total_amount,
|COUNT(final_total_amount) as total_count,
|row_number() over(PARTITION BY year,month,region_id ORDER BY SUM(final_total_amount) DESC) AS sequence,
|year,
|month
|FROM t1
|GROUP BY
|year,month,province_id,province_name,region_id,region_name
|""".stripMargin)
t1.write.mode("overwrite").saveAsTable("dws.province_consumption_day_aggr")
sql(
s"""
|WITH t1 AS(
|SELECT
|fod.sku_id AS topquantityid,
|fod.sku_name AS topquantityname,
|SUM(fod.sku_num) AS topquantity
|FROM
|dwd.fact_order_detail fod
|JOIN dwd.fact_order_info foi
|ON foi.id = fod.order_id
|GROUP BY
|sku_id,sku_name)
|select *,
|row_number() over(order by topquantity desc) AS sequence
|from t1
|limit 10
|""".stripMargin).createOrReplaceTempView("pq")

sql(
s"""
|WITH t1 AS(
|SELECT
|fod.sku_id AS toppriceid,
|fod.sku_name AS toppricename,
|SUM(final_total_amount) AS topprice
|FROM
|dwd.fact_order_detail fod
|JOIN dwd.fact_order_info foi
|ON foi.id = fod.order_id
|GROUP BY
|sku_id,sku_name)
|select *,
|row_number() over(order by topprice desc) AS sequence
|from t1
|limit 10
|""".stripMargin).createOrReplaceTempView("pp")

sql(
"""
|SELECT
|pq.topquantityid,
|pq.topquantityname,
|pq.topquantity,
|pp.*
|FROM
|pq
|JOIN pp
|ON pp.sequence = pq.sequence
|""".stripMargin).createOrReplaceTempView("t2")
mysqlTempTable(spark,"topten")
sql(
"""
|INSERT OVERWRITE TABLE mysql_topten
|SELECT * FROM t2
|""".stripMargin)
sql(
s"""
|WITH t1 AS(
|SELECT
|dp.id AS provinceid,
|dp.name AS provincename,
|dr.id AS regionid,
|dr.region_name AS regionname,
|percentile(cast(final_total_amount as bigint),0.5) AS provincemedian
|FROM
|dwd.fact_order_info foi
|JOIN dwd.dim_province dp
|ON dp.id = foi.province_id and dp.etl_date=20230408
|JOIN dwd.dim_region dr
|ON dr.id = cast(dp.region_id as bigint) and dr.etl_date=20230408
|GROUP BY
|dp.id,dp.name,dr.id,dr.region_name),
|t2 AS(
|SELECT
|dr.id AS regionid,
|dr.region_name AS regionname,
|percentile(cast(final_total_amount as bigint),0.5) AS regionmedian
|FROM
|dwd.fact_order_info foi
|JOIN dwd.dim_province dp
|ON dp.id = foi.province_id and dp.etl_date=20230408
|JOIN dwd.dim_region dr
|ON dr.id = cast(dp.region_id as bigint) and dr.etl_date=20230408
|GROUP BY
|dr.id,dr.region_name)
|SELECT t1.*,t2.regionmedian
|FROM
|t1
|JOIN t2 ON t2.regionid = t1.regionid
|""".stripMargin).createOrReplaceTempView("t3")
mysqlTempTable(spark,"nationmedian")
sql(
"""
|insert overwrite table mysql_nationmedian
|select * from t3
|""".stripMargin)

def mysqlTempTable(sparkSession: SparkSession, mysqlName: String): Unit = {
sparkSession.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.23.69/shtd_result?useSSL=false&characterEncoding=utf8&useUnicode=true")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", mysqlName).load().createOrReplaceTempView(s"mysql_$mysqlName")
}
}
}

BUG

又报出类型不匹配的问题。我就很奇怪,我在sql中已经用cast做了类型转换,而且同样的sql语句在其他表能用,到这张表就说没转换,写入表的语句也是一样的。后面把insertInto换成saveAsTable,不报错了,但是表里没有数据;然后想起来这是动态分区,是不是动态分区的原因,把partitionBy加到mode后面,数据也写进去了,报错解决。动态分区需要用saveAsTable,至于为什么insertInto不行,那就是底层源码的问题了。
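
按这个结论整理的动态分区写入示意(库表名dwd.fact_order_info_demo是假设的,只对比两种API的用法):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DynamicPartitionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dpDemo")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .getOrCreate()
    import spark.implicits._

    // 示例数据,etl_date作为分区列
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
      .withColumn("etl_date", lit("20230408"))

    // 写法一:目标分区表已存在时用insertInto,按列位置写入,不带partitionBy
    // df.write.format("hive").mode("append").insertInto("dwd.fact_order_info_demo")

    // 写法二:partitionBy + saveAsTable,由spark按etl_date做动态分区写入
    df.write.format("hive").mode("append")
      .partitionBy("etl_date")
      .saveAsTable("dwd.fact_order_info_demo")

    spark.stop()
  }
}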

image-20230410195414461

日总结

这两天做了7月任务书四,周六做完以后,周一跟其他学校的交流了一下,主要改的是清洗部分的代码,对合并以后insert和modify时间的取值进行了讨论和调整。遇到了一个奇怪的BUG,解决了但是不知道为什么insertInto不行,就不纠结了。

修复ClickHouse的BUG

学习日期: 4.11

所学内容概述

修BUG的一天

BUG

这是今天修改任务书四指标计算部分、做表连接时出现的警告,对结果没什么影响,但是警告看着很烦。找原因的时候发现,region和province两个表关联用的公共字段是region_id,但是region表里该字段是bigint,province表里是string,又是一个坑,在on后面也用cast转换一下类型就行了。
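
下面是按这个修法写的连接示意(表名取自前面任务书的代码,只演示在on条件里做cast):

import org.apache.spark.sql.SparkSession

object JoinCastDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("joinCast")
      .enableHiveSupport().getOrCreate()
    // region表的id是bigint,province表的region_id是string,join前先cast统一类型
    spark.sql(
      """
        |select dp.id as province_id, dp.name as province_name,
        |       dr.id as region_id, dr.region_name
        |from dwd.dim_province dp
        |join dwd.dim_region dr
        |  on dr.id = cast(dp.region_id as bigint)
        |""".stripMargin).show()
    spark.stop()
  }
}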

image-20230411091845904

自己服务器上的clickhouse起不来,换了启动命令还是不行,把机子重置了几次就好了,用 clickhouse-client -h bigdata1 --port 9001 --password 123456 能连上,没找到原因。

image-20230411194348070

日总结

今天修了一天的bug,上午把之前7月任务书四改良了一下,后面做任务书的时候就可以按照现在这样来写,这几天把7月的五个任务书都摸透了,指标计算的15题,下午想用clickhouse,结果一下午都没弄好,最后搞好了也没找到原因。

spark集成clickHouse

学习日期: 4.12

所学内容概述

准备ClickHouse测试数据

创建一个名为test的数据库,并在该数据库中创建一个名为visit的表,用于跟踪网站访问时长。

1)先运行以下命令,启动一个客户端会话:

目前服务器的clickhouse只有这条命令启动成功过。

$ clickhouse-client -h bigdata1 --multiline --port 9001 --password 123456

2)通过执行以下命令创建test数据库:

bigdata1 :) CREATE DATABASE test;

3)确认要使用的数据库test:

bigdata1 :) USE test;

4)运行下面这个命令创建visits表:

bigdata1 :) CREATE TABLE visits (
id UInt64,
duration Float64,
url String,
created DateTime
) ENGINE = MergeTree()
PRIMARY KEY id
ORDER BY id;

5)通过运行以下语句将几行示例网站访问数据插入到刚创建的visits表中:

bigdata1 :) INSERT INTO visits VALUES (1, 10.5, 'http://example.com', '2019-01-01 00:01:01');
bigdata1 :) INSERT INTO visits VALUES (2, 40.2, 'http://example1.com', '2019-01-03 10:01:01');
bigdata1 :) INSERT INTO visits VALUES (3, 13, 'http://example2.com', '2019-01-03 12:01:01');
bigdata1 :) INSERT INTO visits VALUES (4, 2, 'http://example3.com', '2019-01-04 02:01:01');

6)查询数据:

bigdata1 :) SELECT * FROM visits;

spark使用clickHouse的jdbc驱动

有三个版本的jdbc

  • 0.3.1及之前的版本:驱动程序为ru.yandex.clickhouse.ClickHouseDriver。
  • 0.3.2版本:驱动程序同时支持ru.yandex.clickhouse.ClickHouseDriver和com.clickhouse.jdbc.ClickHouseDriver两种。
  • 0.4.x:驱动程序为com.clickhouse.jdbc.ClickHouseDriver。

因为自己的Maven只能装0.3.2的,所以自己就只使用了0.3.2版本。

pom.xml导入配置

<!--clickhouse-->
<!-- 连接ClickHouse需要驱动包-->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2</version>
<!-- 去除与Spark 冲突的包 -->
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>

在Spark中执行简单查询。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ClickhouseSparkTest {

def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("clickhouse test")
.getOrCreate()
testCk1(spark) // 官方jdbc(0.3.2)
}

/** 官方连接方式:JDBC Driver */
// 使用官方的jdbc(0.3.2版本)
def testCk1(spark:SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序(已被弃用,从0.4.0开始会被移除)
val ckUrl = "jdbc:clickhouse://192.168.23.49:8123" // 数据库连接url
val ckUser = "default"
val ckPassword = ""

val df = spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "test.visits")
.load
df.show
}
}

插入数据(重点)

这里直接拿user_info导入clickhouse

import org.apache.spark.sql.SparkSession

object clickhouseTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
// 创建SparkSession实例
testCk1(spark)
spark.sql("select * from ods.user_info").show()
//clickhouse需要有该表存在,亲测使用saveAsTable不行。会提示表不存在
spark.sql(
"""
|select * from ods.user_info
|""".stripMargin).write.format("hive").mode("append").insertInto("test1")
spark.sql("select * from test1")

/** 官方连接方式:JDBC Driver */
// 使用官方的jdbc(0.3.2版本)
def testCk1(spark: SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序(已被弃用,从0.4.0开始会被移除)
val ckUrl = "jdbc:clickhouse://192.168.23.49:8123/ods" // 数据库连接url
val ckUser = "default"
val ckPassword = "123456"
//建立clickhouse临时表方便加入
spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "user_info")
.option("createTableOptions","engine=log()")
.load().createOrReplaceTempView("test1")
}
}
}

BUG

这条报错也不知道啥意思,看到个ENGINE,发现别人集成的时候有一条配置 .option("createTableOptions","engine=log()"),加上以后报错就没了。

image-20230412200456401

这个报错验证了很多次,发现只要往clickhouse导数据的时候mode选overwrite就会报错,不管是用saveAsTable还是insertInto。赛规要求的是导入数据,使用append追加也比较符合要求,就没再去修这个了。
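
按今天的结论整理的一个写入clickhouse的示意(地址、账号、表名沿用前面的配置,引擎子句是假设的):用append追加,并通过createTableOptions指定建表时的引擎:

import org.apache.spark.sql.{SaveMode, SparkSession}

object ClickHouseWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ckWrite")
      .enableHiveSupport().getOrCreate()

    // 要写入的结果(这里直接拿dws层的聚合结果举例)
    val df = spark.table("dws.province_consumption_day_aggr")

    df.write
      .format("jdbc")
      .mode(SaveMode.Append) // overwrite在clickhouse上会报错,按赛规用append追加
      .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
      .option("url", "jdbc:clickhouse://192.168.23.49:8123")
      .option("user", "default")
      .option("password", "123456")
      .option("dbtable", "dws.province_consumption_day_aggr2")
      .option("createTableOptions", "ENGINE = Log") // 表不存在时建表用的引擎子句(假设用Log引擎)
      .save()

    spark.stop()
  }
}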

image-20230412201141174

日总结

今天上午在修clickhouse昨天起不来的bug,后面重置了几次发现没问题了,也不知道原因。下午试着用spark集成clickhouse,除了Maven导入依赖的时候浪费了很久,其他还是挺顺利的。clickhouse和mysql不一样,用saveAsTable是不会自动创建表的,所以如果要导入的话,必须先在clickhouse建好表。

操作hbase

学习日期: 4.13

所学内容概述

指标计算导入clickhouse

  • Clickhouse建表语句

  • 和hive以及mysql不一样,字段类型不同,而且建表的时候必须要有 ENGINE = MergeTree() 之类的引擎,还要设置主键或者排序键,不然会报错。

    create table dws.province_consumption_day_aggr2 (
    province_id UInt64,
    province_name String,
    region_id UInt64,
    region_name String,
    total_amount Float64,
    total_count UInt64,
    sequence UInt64,
    year String,
    month String
    ) ENGINE = MergeTree() --必须加
    PRIMARY KEY province_id;
package JulySubjectTestNew.subject5

import org.apache.spark.sql.{DataFrame, SparkSession}

object dataMath {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t3")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true").getOrCreate()
import spark.sql
val t1: DataFrame = sql(
s"""
|WITH t1 AS(
|SELECT
|dp.id AS province_id,
|dp.name AS province_name,
|dr.id AS region_id,
|dr.region_name AS region_name,
|foi.final_total_amount,
|cast(substring(foi.etl_date,1,4) as int) AS year,
|cast(substring(foi.etl_date,5,2) as int) AS month
|FROM
|dwd.fact_order_info foi
|JOIN dwd.dim_province dp
|ON dp.id = foi.province_id and dp.etl_date=20230412
|JOIN dwd.dim_region dr
|ON dr.id = cast(dp.region_id as bigint) and dr.etl_date=20230412)
|SELECT
|province_id,
|province_name,
|region_id,
|region_name,
|SUM(final_total_amount) as total_amount,
|COUNT(final_total_amount) as total_count,
|row_number() over(PARTITION BY year,month,region_id ORDER BY SUM(final_total_amount) DESC) AS sequence,
|year,
|month
|FROM t1
|GROUP BY
|year,month,province_id,province_name,region_id,region_name
|""".stripMargin)
t1.show()
testCk1(spark)
t1.write.format("hive").mode("append").insertInto("test1")


def testCk1(spark: SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序(已被弃用,从0.4.0开始会被移除)
val ckUrl = "jdbc:clickhouse://192.168.23.49:8123" // 数据库连接url
val ckUser = "default"
val ckPassword = "123456"

spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "dws.province_consumption_day_aggr2")
.option("createTableOptions", "engine=log()")
.load().createOrReplaceTempView("test1")
}
}
}

写入Hbase

需要建立hbase表

建表语句理解:第一个字段称为表名,我的理解更像关系型数据库的库名,第二个字段叫做列族,我觉得就像普通数据库的表名了,添加的时候key相当于列名,value相当于值

create 'panniuspark_user' ,'cf'
package com.hainiu.sparkhbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkHbaseTablePuts {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkHbaseTablePuts")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(20 until 30, 2)
// 一个分区创建一个hbase连接,批量写入,效率高
rdd.foreachPartition(it =>{
// 把每个Int 转成 Put对象
val puts: Iterator[Put] = it.map(f => {
// 创建Put对象
val put: Put = new Put(Bytes.toBytes(s"spark_puts_${f}"))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(s"${f}"))
put
})
val hbaseConf: Configuration = HBaseConfiguration.create()
var conn: Connection = null
var table: HTable = null
try{
// 创建hbase连接
conn = ConnectionFactory.createConnection(hbaseConf)
// 创建表操作对象
table = conn.getTable(TableName.valueOf("panniu:spark_user")).asInstanceOf[HTable]
// 通过隐式转换,将scala的List转成javaList
import scala.collection.convert.wrapAsJava.seqAsJavaList
// 一个分区的数据批量写入
table.put(puts.toList)
}catch {
case e:Exception => e.printStackTrace()
}finally {
table.close()
conn.close()
}
})
}
}

BUG

spark写入数据到hbase的报错,没找到原因。看样子似乎是依赖版本的问题,也不知道是新了还是老了,索性就换其他方法写入了。

image-20230415112613172

日总结

今天主要是想办法操作hbase,一直在搜spark操作hbase的方法。因为转换成df的话,用rdd算子做起来方便很多,也不需要重复写建表语句,但是网上的方法实在太多,而且难以理解,大多只是简单地插入编造的数据,这样显然是不行的,所以找了一天也没找到比较好的,改天再试下别的使用方法。

hive外部表关联hbase以及指标计算

学习日期: 4.15

所学内容概述

任务书5和任务书3的指标计算

package JulySubjectTestNew.subject3

import org.apache.spark.sql.{DataFrame, SparkSession}

object dataMath {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t3").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
t1(spark)
t2(spark)
t3(spark)
spark.stop()
def t3(sparkSession: SparkSession): Unit = {
sparkSession.sql(
"""
|select
|region_id,
|region_name,
|province_id,
|province_name,
|total_amount,
|row_number() over(partition by region_id order by total_amount desc) as rk
|from
|dws.province_consumption_day_aggr dd1
|""".stripMargin).where("rk<=3").drop("rk").createOrReplaceTempView("test3")
val t3Data: DataFrame = sparkSession.sql(
"""
|select
|region_id as regionid,
|region_name as regionname,
|concat_ws(',',collect_set(cast(province_id as string))) as provinceids,
|concat_ws(',',collect_set(cast(province_name as string))) as provincenames,
|concat_ws(',',collect_set(cast(total_amount as string))) as provinceamount
|from
|test3
|group by region_id,region_name
|""".stripMargin)
t3Data.show()
t3Data.write.saveAsTable("dws.regiontopthree")
}
def t2(sparkSession: SparkSession): Unit = {
val t2Data: DataFrame = sparkSession.sql(
"""
|with t1 as (
|select
|province_id,
|province_name,
|round((total_amount/total_count),2) as provinceavgconsumption,
|region_id,
|region_name
|from
|dws.province_consumption_day_aggr dd1),
|t2 as (
|select region_id,
|round(sum(total_amount)/sum(total_count),2) as regionavgconsumption
|from
|dws.province_consumption_day_aggr dd2
|group by region_id)
|select t1.*,t2.regionavgconsumption,
|(case
|when t1.provinceavgconsumption>t2.regionavgconsumption then '高'
|when t1.provinceavgconsumption<t2.regionavgconsumption then '低'
|else '相同' end
|) as comparison
|from
|t1
|join t2 on t2.region_id = t1.region_id
|""".stripMargin)
t2Data.show()
t2Data.write.mode("overwrite").saveAsTable("dws.provinceavgcmpregion")
}
def t1(sparkSession: SparkSession): Unit = {
val t1: DataFrame = sparkSession.sql(
"""
|with t1 as (
|select
|dp.id as province_id,
|dp.name as province_name,
|dr.id as region_id,
|dr.region_name as region_name,
|final_total_amount,
|cast(substring(foi.etl_date,1,4) as int) as year,
|cast(substring(foi.etl_date,5,2) as int)as month
|from dwd.fact_order_info foi
|join dwd.dim_province dp on dp.id = foi.province_id
|join dwd.dim_region dr on dr.id = dp.region_id)
|select
|province_id,
|province_name,
|region_id,
|region_name,
|sum(t1.final_total_amount) as total_amount,
|count(*) as total_count,
|row_number() over(partition by year,month,region_id order by sum(t1.final_total_amount) desc) as sequence,
|year,month
|from
|t1
|group by year,month,province_id,province_name,region_id,region_name
|""".stripMargin)
t1.show()
t1.write.mode("overwrite").saveAsTable("dws.province_consumption_day_aggr")
}
}
}
package JulySubjectTestNew.subject5

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object dataMath {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t3")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true").getOrCreate()
import spark.sql
// val t1: DataFrame = sql(
// s"""
// |WITH t1 AS(
// |SELECT
// |dp.id AS province_id,
// |dp.name AS province_name,
// |dr.id AS region_id,
// |dr.region_name AS region_name,
// |foi.final_total_amount,
// |cast(substring(foi.etl_date,1,4) as int) AS year,
// |cast(substring(foi.etl_date,5,2) as int) AS month
// |FROM
// |dwd.fact_order_info foi
// |JOIN dwd.dim_province dp
// |ON dp.id = foi.province_id and dp.etl_date=20230412
// |JOIN dwd.dim_region dr
// |ON dr.id = cast(dp.region_id as bigint) and dr.etl_date=20230412)
// |SELECT
// |province_id,
// |province_name,
// |region_id,
// |region_name,
// |SUM(final_total_amount) as total_amount,
// |COUNT(final_total_amount) as total_count,
// |row_number() over(PARTITION BY year,month,region_id ORDER BY SUM(final_total_amount) DESC) AS sequence,
// |year,
// |month
// |FROM t1
// |GROUP BY
// |year,month,province_id,province_name,region_id,region_name
// |""".stripMargin)
// t1.show()
// testCk1(spark)
// t1.write.format("hive").mode("append").insertInto("test1")
// val purchaseduser: Long = sql(
// """
// |select count(distinct(user_id)) from dwd.fact_order_info
// |""".stripMargin).first().getLong(0)
// val repurchaseduser: Long = sql(
// """
// |with t1 as (
// |select user_id,cast (date_format(create_time,"dd") as int) as day from dwd.fact_order_info)
// |select count(a.user_id)
// |from t1 a
// |join t1 b on a.user_id = b.user_id and a.day + 1 = b.day
// |""".stripMargin).first().getLong(0)
// //concat(round($l2/$l1,3)*100,'%') as repurchaserate
// val t2Data: DataFrame = sql(
// s"""
// |select
// |$purchaseduser as purchaseduser,
// |$repurchaseduser as repurchaseduser,
// |concat(round($repurchaseduser/$purchaseduser,3)*100,'%') as repurchaserate
// |""".stripMargin)
// mysqlExtract(spark,"userrepurchasedrate")
// t2Data.show()
// t2Data.write.format("hive").insertInto("mysql_userrepurchasedrate")
val dataTemp: DataFrame = sql(
"""
|select
|dp.name as province_name,
|count(*) as Amount
|from
|dwd.fact_order_info foi
|join dwd.dim_province dp on dp.id = foi.province_id
|group by dp.name
|order by Amount desc
|""".stripMargin)
dataTemp.show(false)
val data: Seq[(String, Long)] = dataTemp.rdd.map((x: Row) => (x.getString(0), x.getLong(1))).collect().toList
print(data)
import spark.implicits._
val dum: DataFrame = Seq("1").toDF("dum")
data.foldLeft(dum)((dum: DataFrame, d: (String, Long)) => dum.withColumn(d._1,lit(d._2)))
.drop("dum").show(false)

def mysqlExtract(sparkSession: SparkSession, mysqlName: String): Unit = {
val url = "jdbc:mysql://192.168.23.49/shtd_result"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
sparkSession.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createOrReplaceTempView(s"mysql_$mysqlName")
}
def testCk1(spark: SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序(已被弃用,从0.4.0开始会被移除)
val ckUrl = "jdbc:clickhouse://192.168.23.49:8123" // 数据库连接url
val ckUser = "default"
val ckPassword = "123456"

spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "dws.province_consumption_day_aggr2")
.option("createTableOptions", "engine=log()")
.load().createOrReplaceTempView("test1")
}
}
}

BUG

BUG1

使用如下建表语句的时候,发现hive表中必须有一列映射到 :key(即HBase的rowkey),否则就会报错。

create external table students(
id string,
name string,
school string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with
serdeproperties ("hbase.columns.mapping" = ":key,info1:name,info2:school")
tblproperties ("hbase.table.name" = "test:student");

select * from students;

日总结

今天上午把hbase和hive的关联做完了,既能用外部表连接hbase,也可以建立hbase和hive的关联表。区别是:外部表相当于hive只是一个查看器,删除hive表,hbase的数据依旧存在;关联表就相当于镜子,不管改变两者中的任何一个,另一个都会跟着变。下午做了6道指标计算题,任务3和任务5的指标计算还是算难的,所以花了一些时间。

数据导入以及hbase抽取df测试

学习日期: 4.20

所学内容概述

将数据导入hbase

  • 抽取ods部分数据到hbase
package mock

object MockHBase {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t1")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
mock_master()
mock_detail()

def mock_master(): Unit = {
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "192.168.23.49:2181")
conf.set(TableOutputFormat.OUTPUT_TABLE, "fact_order_master")
val job = new JobConf(conf)
job.setOutputFormat(classOf[TableOutputFormat])
val frame = spark.table("ods.order_master")
.sample(0.7)
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", lit("2022-11-01 01:01:01") cast "timestamp")
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", lit("2022-11-01 01:01:01") cast "timestamp")
frame
.rdd
.map {
row =>
val rowKey = new Date().getTime + "_" + random().formatted("%.5f")
val put = new Put(rowKey.getBytes)
put.addColumn("info".getBytes, "order_id".getBytes, row.getLong(0).toString.getBytes)
put.addColumn("info".getBytes, "order_sn".getBytes, row.getString(1).getBytes)
put.addColumn("info".getBytes, "customer_id".getBytes, row.getLong(2).toString.getBytes)
put.addColumn("info".getBytes, "shipping_user".getBytes, row.getString(3).getBytes)
put.addColumn("info".getBytes, "province".getBytes, row.getString(4).getBytes)
put.addColumn("info".getBytes, "city".getBytes, row.getString(5).getBytes)
put.addColumn("info".getBytes, "address".getBytes, row.getString(6).getBytes)
put.addColumn("info".getBytes, "order_source".getBytes, row.getInt(7).toString.getBytes)
put.addColumn("info".getBytes, "payment_method".getBytes, row.getInt(8).toString.getBytes)
put.addColumn("info".getBytes, "order_money".getBytes, row.getDecimal(9).toString.getBytes)
put.addColumn("info".getBytes, "district_money".getBytes, row.getDecimal(10).toString.getBytes)
put.addColumn("info".getBytes, "shipping_money".getBytes, row.getDecimal(11).toString.getBytes)
put.addColumn("info".getBytes, "payment_money".getBytes, row.getDecimal(12).toString.getBytes)
put.addColumn("info".getBytes, "shipping_comp_name".getBytes, row.getString(13).getBytes)
put.addColumn("info".getBytes, "shipping_sn".getBytes, row.getString(14).getBytes)
put.addColumn("info".getBytes, "create_time".getBytes, row.getString(15).getBytes)
put.addColumn("info".getBytes, "shipping_time".getBytes, row.getString(16).getBytes)
put.addColumn("info".getBytes, "pay_time".getBytes, row.getString(17).getBytes)
put.addColumn("info".getBytes, "receive_time".getBytes, row.getString(18).getBytes)
put.addColumn("info".getBytes, "order_status".getBytes, row.getString(19).getBytes)
put.addColumn("info".getBytes, "order_point".getBytes, row.getLong(20).toString.getBytes)
put.addColumn("info".getBytes, "invoice_title".getBytes, row.getString(21).getBytes)
put.addColumn("info".getBytes, "modified_time".getBytes, row.getTimestamp(22).toString.getBytes)
put.addColumn("info".getBytes, "etl_date".getBytes, row.getString(23).getBytes)
put.addColumn("info".getBytes, "dwd_insert_user".getBytes, row.getString(24).getBytes)
put.addColumn("info".getBytes, "dwd_insert_time".getBytes, row.getTimestamp(25).toString.getBytes)
put.addColumn("info".getBytes, "dwd_modify_user".getBytes, row.getString(26).getBytes)
put.addColumn("info".getBytes, "dwd_modify_time".getBytes, row.getTimestamp(27).toString.getBytes)
(new ImmutableBytesWritable, put)
}.saveAsHadoopDataset(job)
}

def mock_detail(): Unit = {
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "192.168.23.49:2181")
conf.set(TableOutputFormat.OUTPUT_TABLE, "fact_order_detail")
val job = new JobConf(conf)
job.setOutputFormat(classOf[TableOutputFormat])
val frame = spark.table("ods.order_detail")
.sample(0.7)
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", lit("2022-11-01 01:01:01") cast "timestamp")
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", lit("2022-11-01 01:01:01") cast "timestamp")
frame
.rdd
.map {
row =>
val rowKey = new Date().getTime + "_" + random().formatted("%.5f")
val put = new Put(rowKey.getBytes)
put.addColumn("info".getBytes, "order_detail_id".getBytes, row.getLong(0).toString.getBytes)
put.addColumn("info".getBytes, "order_sn".getBytes, row.getString(1).getBytes)
put.addColumn("info".getBytes, "product_id".getBytes, row.getLong(2).toString.getBytes)
put.addColumn("info".getBytes, "product_name".getBytes, row.getString(3).getBytes)
put.addColumn("info".getBytes, "product_cnt".getBytes, row.getInt(4).toString.getBytes)
put.addColumn("info".getBytes, "product_price".getBytes, row.getDecimal(5).toString.getBytes)
put.addColumn("info".getBytes, "average_cost".getBytes, row.getDecimal(6).toString.getBytes)
put.addColumn("info".getBytes, "weight".getBytes, row.getDouble(7).toString.getBytes)
put.addColumn("info".getBytes, "fee_money".getBytes, row.getDecimal(8).toString.getBytes)
put.addColumn("info".getBytes, "w_id".getBytes, row.getLong(9).toString.getBytes)
put.addColumn("info".getBytes, "create_time".getBytes, row.getString(10).getBytes)
put.addColumn("info".getBytes, "modified_time".getBytes, row.getTimestamp(11).toString.getBytes)
put.addColumn("info".getBytes, "etl_date".getBytes, row.getString(12).getBytes)
put.addColumn("info".getBytes, "dwd_insert_user".getBytes, row.getString(13).getBytes)
put.addColumn("info".getBytes, "dwd_insert_time".getBytes, row.getTimestamp(14).toString.getBytes)
put.addColumn("info".getBytes, "dwd_modify_user".getBytes, row.getString(15).getBytes)
put.addColumn("info".getBytes, "dwd_modify_time".getBytes, row.getTimestamp(16).toString.getBytes)
(new ImmutableBytesWritable, put)
}.saveAsHadoopDataset(job)
}
}
}

抽取hbase数据并转换dataframe

这种办法其实很麻烦,但是也没有什么更好的办法了,外部表据说数据会丢失一部分,所以保险起见还是用这种方法吧。首先要创建一个样例类,字段要和想要抽成的df一一对应,然后以INPUT_TABLE模式连接hbase,把每个字段一个一个取出来,用map放进样例类,再通过隐式转换toDF转换成dataframe类型,就能操作了。

private case class OrderDetail(order_detail_id: String, order_sn: String, product_id: String, product_name: String, product_cnt: String, product_price: String, average_cost: String, weight: String, fee_money: String, w_id: String, create_time: String, modified_time: String, dwd_insert_user: String, dwd_insert_time: String, dwd_modify_user: String, dwd_modify_time: String, etl_date: String)

def fact_order_detail(sparkSession: SparkSession,odsName:String,dwdName:String): Unit = {
import sparkSession.implicits._
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum","192.168.23.49:8123")
conf.set(TableInputFormat.INPUT_TABLE,dwdName)
val frame = sparkSession.sparkContext
.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
.map {
case (_, row) =>
OrderDetail(
Bytes.toString(row.getValue("info".getBytes, "order_detail_id".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "order_sn".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_id".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_name".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_cnt".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_price".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "average_cost".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "weight".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "fee_money".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "w_id".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "create_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "modified_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_insert_user".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_insert_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_modify_user".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_modify_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "etl_date".getBytes)): String
)
}.toDF
frame.show()
}
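
Since every column above is read with the same Bytes.toString(row.getValue("info".getBytes, ...)) pattern, a small helper can cut most of the typing. This is only a sketch under my own naming (HBaseColumns and getStr are not part of the task-sheet code), written for the info column family used above:

```scala
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object HBaseColumns {
  // Read one string column from a Result; Bytes.toString returns null when the cell is missing.
  def getStr(row: Result, qualifier: String, family: String = "info"): String =
    Bytes.toString(row.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
}

// Inside the map above, each field then shrinks to one short call, e.g.:
//   OrderDetail(
//     HBaseColumns.getStr(row, "order_detail_id"),
//     HBaseColumns.getStr(row, "order_sn"),
//     ...
//   )
```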

BUG

No bugs today.

Daily summary

Today's main task was to summarize what I learned in Changsha. This way of moving data into and out of HBase is usable in the competition: I used to rely on external tables, which is more convenient, but with larger volumes data really does get lost, so this approach is safer. I have typed it out many times and have more or less learned it; I can write it from memory, though I have never finished the whole thing in one sitting because there are simply too many fields.

Provincial-competition task-sheet mock run

Study date: 4.22

Summary of what was learned

The extraction part is not hard: two methods cover basically all 11 tables.

package huNanSubject

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object dataExtract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("t1")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()

mysqlExtract(spark, "product_info")
mysqlExtract(spark, "order_master")
mysqlExtract(spark, "order_detail")
mysqlExtract(spark, "customer_inf")
mysqlExtract(spark, "coupon_info")
mysqlExtract(spark, "product_browse")
mysqlExtract(spark, "coupon_use")
mysqlExtract(spark, "customer_login_log")
mysqlExtract(spark, "order_cart")
mysqlExtract(spark, "customer_addr")
mysqlExtract(spark, "customer_level_inf")
spark.sql("show databases").show()

hiveExtract(spark, "coupon_use", "get_time")
hiveExtract(spark, "customer_inf", "modified_time")
hiveExtract(spark, "order_master", "modified_time")
hiveExtract(spark, "order_detail", "modified_time")
hiveExtract(spark, "coupon_info", "modified_time")
hiveExtract(spark, "product_browse", "modified_time")
hiveExtract(spark, "product_info", "modified_time")
hiveExtract(spark, "customer_login_log", "login_time")
hiveExtract(spark, "order_cart", "modified_time")
hiveExtract(spark, "customer_addr", "modified_time")
hiveExtract(spark, "customer_level_inf", "modified_time")
}

def hiveExtract(sparkSession: SparkSession, hiveName: String, maxTime: String): Unit = {
import sparkSession.sql
val timestamp = sql(s"select cast(max($maxTime) as timestamp) from ods.$hiveName").first().getTimestamp(0)
println(timestamp)
sql(
s"""
|select * from mysql_$hiveName
|""".stripMargin)
// .where(s"$maxTime>'$timestamp'")
.withColumn("etl_date", lit("20230420"))
.write.format("hive").partitionBy("etl_date").mode("append").saveAsTable(s"ods.$hiveName")
}

def mysqlExtract(sparkSession: SparkSession, mysqlName: String): Unit = {
val url = "jdbc:mysql://192.168.23.49/ds_db01?useSSL=false"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
sparkSession.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createOrReplaceTempView(s"mysql_$mysqlName")
}
}

The cleaning part is the hardest and most time-consuming of the three blocks in this task sheet: data has to be merged from both HBase and the ods layer, some tables incremental and some full, so it eats a lot of time. The HBase side is especially tedious because every single field has to be written out by hand.

package huNanSubject

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object dataClear {
private case class OrderMaster(order_id: String, order_sn: String, customer_id: String, shipping_user: String, province: String, city: String, address: String, order_source: String, payment_method: String, order_money: String, district_money: String, shipping_money: String, payment_money: String, shipping_comp_name: String, shipping_sn: String, create_time: String, shipping_time: String, pay_time: String, receive_time: String, order_status: String, order_point: String, invoice_title: String, modified_time: String, dwd_insert_user: String, dwd_insert_time: String, dwd_modify_user: String, dwd_modify_time: String,etl_date: String)
private case class OrderDetail(order_detail_id: String, order_sn: String, product_id: String, product_name: String, product_cnt: String, product_price: String, average_cost: String, weight: String, fee_money: String, w_id: String, create_time: String, modified_time: String, dwd_insert_user: String, dwd_insert_time: String, dwd_modify_user: String, dwd_modify_time: String, etl_date: String)
val date = "20230420"
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("t1")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
hiveClear1(spark,"customer_inf","dim_customer_inf","customer_id")
hiveClear1(spark,"coupon_info","dim_coupon_info","coupon_id")
hiveClear1(spark,"product_info","dim_product_info","product_core")
hiveClear2(spark,"coupon_use","fact_coupon_use")
hiveClear2(spark,"customer_login_log","log_customer_login")
hiveClear2(spark,"order_cart","fact_order_cart")
hiveClear2(spark,"product_browse","log_product_browse")
hiveClear2(spark,"customer_level_inf","dim_customer_level_inf")
hiveClear2(spark,"customer_addr","dim_customer_addr")
hiveClear3(spark,"order_master","fact_order_master")
hiveClear4(spark,"order_detail","fact_order_detail")
}

def hiveClear1(sparkSession: SparkSession, odsName: String, dwdName: String, partitionKey: String): Unit = {
import sparkSession.sql
val odsData = sql(
s"""
|select * from ods.$odsName
|""".stripMargin)
.where(s"etl_date = $date")
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.drop("etl_date")
.withColumn("etl_date", lit(date))

odsData.show()
val etl_date: String = sql(s"select max(etl_date) from dwd.$dwdName").first().getString(0)
val dwdData: Dataset[Row] = sql(s"select * from dwd.$dwdName").where(s"etl_date=$etl_date")
val w1 = Window.partitionBy(partitionKey)
val w2 = w1.orderBy(col("modified_time").desc)
odsData.union(dwdData)
.withColumn("rk", row_number().over(w2))
.withColumn("dwd_insert_time", min(col("dwd_insert_time")).over(w1))
.withColumn("dwd_modify_time", max(col("dwd_modify_time")).over(w1))
.where("rk=1").drop("rk")
.write.format("hive").mode("append").insertInto(s"dwd.$dwdName")
}

def hiveClear2(sparkSession: SparkSession, odsName: String, dwdName: String): Unit = {
import sparkSession.sql
sql(
s"""
|select * from ods.$odsName
|""".stripMargin)
.where(s"etl_date = $date")
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.drop("etl_date")
.withColumn("etl_date", lit(date))
.write.format("hive").mode("append").insertInto(s"dwd.$dwdName")
}

def hiveClear4(sparkSession: SparkSession,odsName:String,dwdName:String): Unit = {
import sparkSession.implicits._
val conf: Configuration = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum","bigdata1:2181")
conf.set(TableInputFormat.INPUT_TABLE,dwdName)
val frame: DataFrame = sparkSession.sparkContext
.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
.map {
case (_, row) =>
//read each column value from the HBase Result
OrderDetail(
Bytes.toString(row.getValue("info".getBytes, "order_detail_id".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "order_sn".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_id".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_name".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_cnt".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "product_price".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "average_cost".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "weight".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "fee_money".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "w_id".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "create_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "modified_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_insert_user".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_insert_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_modify_user".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "dwd_modify_time".getBytes)): String,
Bytes.toString(row.getValue("info".getBytes, "etl_date".getBytes)): String
)
}.toDF
frame.show()

val d1: DataFrame = sparkSession.table(s"ods.$odsName")
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.where(s"etl_date=$date").drop("etl_date").withColumn("etl_date", lit(date))
.union(frame)
// cleaned column: derive etl_date from create_time
.withColumn("etl_date", from_unixtime(unix_timestamp($"create_time", "yyyyMMddHHmmss"), "yyyyMMdd"))
d1.show()
d1.write.format("hive").saveAsTable(s"dwd.$dwdName")
}

def hiveClear3(sparkSession: SparkSession,odsName:String,dwdName:String): Unit = {
import sparkSession.implicits._
val conf: Configuration = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum","bigdata1:2181")
conf.set(TableInputFormat.INPUT_TABLE,dwdName)

val frame = sparkSession.sparkContext
.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
.map{
case (_,row) =>
val order_id: String = Bytes.toString(row.getValue("info".getBytes,"order_id".getBytes))
val order_sn = Bytes.toString(row.getValue("info".getBytes, "order_sn".getBytes))
val customer_id = Bytes.toString(row.getValue("info".getBytes, "customer_id".getBytes))
val shipping_user = Bytes.toString(row.getValue("info".getBytes, "shipping_user".getBytes))
val province = Bytes.toString(row.getValue("info".getBytes, "province".getBytes))
val city = Bytes.toString(row.getValue("info".getBytes, "city".getBytes))
val address = Bytes.toString(row.getValue("info".getBytes, "address".getBytes))
val order_source = Bytes.toString(row.getValue("info".getBytes, "order_source".getBytes))
val payment_method = Bytes.toString(row.getValue("info".getBytes, "payment_method".getBytes))
val order_money = Bytes.toString(row.getValue("info".getBytes, "order_money".getBytes))
val district_money = Bytes.toString(row.getValue("info".getBytes, "district_money".getBytes))
val shipping_money = Bytes.toString(row.getValue("info".getBytes, "shipping_money".getBytes))
val payment_money = Bytes.toString(row.getValue("info".getBytes, "payment_money".getBytes))
val shipping_comp_name = Bytes.toString(row.getValue("info".getBytes, "shipping_comp_name".getBytes))
val shipping_sn = Bytes.toString(row.getValue("info".getBytes, "shipping_sn".getBytes))
val create_time = Bytes.toString(row.getValue("info".getBytes, "create_time".getBytes))
val shipping_time = Bytes.toString(row.getValue("info".getBytes, "shipping_time".getBytes))
val pay_time = Bytes.toString(row.getValue("info".getBytes, "pay_time".getBytes))
val receive_time = Bytes.toString(row.getValue("info".getBytes, "receive_time".getBytes))
val order_status = Bytes.toString(row.getValue("info".getBytes, "order_status".getBytes))
val order_point = Bytes.toString(row.getValue("info".getBytes, "order_point".getBytes))
val invoice_title = Bytes.toString(row.getValue("info".getBytes, "invoice_title".getBytes))
val modified_time = Bytes.toString(row.getValue("info".getBytes, "modified_time".getBytes))
val dwd_insert_user = Bytes.toString(row.getValue("info".getBytes, "dwd_insert_user".getBytes))
val dwd_insert_time = Bytes.toString(row.getValue("info".getBytes, "dwd_insert_time".getBytes))
val dwd_modify_user = Bytes.toString(row.getValue("info".getBytes, "dwd_modify_user".getBytes))
val dwd_modify_time = Bytes.toString(row.getValue("info".getBytes, "dwd_modify_time".getBytes))
val etl_date = Bytes.toString(row.getValue("info".getBytes, "etl_date".getBytes))
OrderMaster(order_id: String, order_sn: String, customer_id: String, shipping_user: String, province: String, city: String, address: String, order_source: String, payment_method: String, order_money: String, district_money: String, shipping_money: String, payment_money: String, shipping_comp_name: String, shipping_sn: String, create_time: String, shipping_time: String, pay_time: String, receive_time: String, order_status: String, order_point: String, invoice_title: String, modified_time: String, dwd_insert_user: String, dwd_insert_time: String, dwd_modify_user: String, dwd_modify_time: String,etl_date: String)
}.toDF
frame.show()

sparkSession.table(s"ods.$odsName")
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
.where(s"etl_date=$date").drop("etl_date").withColumn("etl_date",lit(date))
.union(frame)
.where(length(col("city"))<=8)
// cleaned columns: re-derive etl_date and normalize the time columns to yyyy-MM-dd HH:mm:ss
.withColumn("etl_date",from_unixtime(unix_timestamp($"create_time", "yyyyMMddHHmmss"), "yyyyMMdd"))
.withColumn("create_time", from_unixtime(unix_timestamp($"create_time", "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("shipping_time", from_unixtime(unix_timestamp($"shipping_time", "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("pay_time", from_unixtime(unix_timestamp($"pay_time", "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("receive_time", from_unixtime(unix_timestamp($"receive_time", "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"))
.write.format("hive").saveAsTable(s"dwd.$dwdName")
}
}
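
One caveat about the union calls in hiveClear3/hiveClear4: Dataset.union matches columns by position, so the ods-side DataFrame and the HBase-derived case-class DataFrame must list their columns in exactly the same order. When both sides share column names, unionByName (available since Spark 2.3) is a safer drop-in; a minimal sketch with hypothetical DataFrame names:

```scala
// odsSideDf and hbaseSideDf are placeholders for the two DataFrames merged above.
// unionByName resolves columns by name instead of position, so a reordered schema
// cannot silently shift values into the wrong columns.
val merged = odsSideDf.unionByName(hbaseSideDf)
```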

The three metric-calculation problems are all fairly easy; together they took only about 40 minutes. The one thing to watch is which order_status values each metric is supposed to count.

package huNanSubject

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import java.util.Properties

object dataMath {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("t1")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.writeLegacyFormat", "true")
.getOrCreate()
data3(spark)
data2(spark)
data1(spark)

def data3(sparkSession: SparkSession): Unit = {
//three consecutive weeks: 2022-07-25 through 2022-08-14
import sparkSession.sql
sql(
"""
|with t1 as (
|select
|login_id,
|login_time
|from
|dwd.log_customer_login lcl
|where
|date_format(login_time,"yyyyMMdd") <= 20220731
|and
|date_format(login_time,"yyyyMMdd") >= 20220725),
|t2 as (
|select
|login_id,
|login_time
|from
|dwd.log_customer_login lcl
|where
|date_format(login_time,"yyyyMMdd") <= 20220807
|and
|date_format(login_time,"yyyyMMdd") >= 20220801
|),t3 as (
|select
|login_id,
|login_time
|from
|dwd.log_customer_login lcl
|where
|date_format(login_time,"yyyyMMdd") <= 20220814
|and
|date_format(login_time,"yyyyMMdd") >= 20220808
|),t4 as (
|select t1.login_id,
|t1.login_time,
|t2.login_time,
|t3.login_time
|from
|t1
|join t2 on t1.login_id = t2.login_id
|join t3 on t2.login_id = t3.login_id)
|select count(*) as active_total from t4
|""".stripMargin).createOrReplaceTempView("temp")
val frame = sql(
"""
|select
|'2022-08-10' as end_date,
|temp.active_total as active_total,
|'2022-07-25_2022-08-14' as date_range
|from
|temp
|""".stripMargin)
exportToClickHouse(frame,"ds_result.continuous_3week")
}

def data2(sparkSession: SparkSession): Unit = {
import sparkSession.sql
val t2Frame = sql(
"""
|with t1 as (
|select
|fod.product_id as product_id,
|sum(fom.order_money) as sales_amount,
|count(*) as product_totalcnt
|from
|dwd.fact_order_master fom
|join dwd.fact_order_detail fod on fom.order_sn = fod.order_sn
|where order_status <> '已退款'
|group by fod.product_id)
|select *,
|row_number() over(order by sales_amount desc) as sales_rank
|from t1
|""".stripMargin)
t2Frame.show()
exportToClickHouse(t2Frame,"ds_result.sales_amount_rank")
}

def data1(sparkSession: SparkSession): Unit = {
import sparkSession.implicits._
import sparkSession.sql
sql(
"""
|select * from dwd.fact_order_master
|""".stripMargin).distinct.createOrReplaceTempView("temp")
val t1Frame = sql(
"""
|with t1 as (
|select
|province,
|count(1) as creat_order
|from temp
|where order_status = '已下单'
|group by province),t2 as (
|select
|province,
|count(1) as payment
|from temp
|where order_status <> '已下单' and order_status <> '已退款'
|group by province
|),t3 as (
|select
|t1.province as province,
|t1.creat_order as creat_order,
|t2.payment as payment,
|round(t1.creat_order/t2.payment,3) as payCVR
|from
|t1
|join t2 on t1.province = t2.province)
|select
|province,creat_order,payment,payCVR,
|row_number() over(order by payCVR desc) as ranking
|from
|t3
|""".stripMargin)
t1Frame.show()
exportToClickHouse(t1Frame,"ds_result.payment_cvr")

}

def exportToClickHouse(dataFrame: DataFrame, tableName: String): Unit = {
val c_pro = new Properties()
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // driver class (noted as deprecated, to be removed from 0.4.0 onwards)
val ckUrl = "jdbc:clickhouse://192.168.23.49:8123" // database connection URL
val ckUser = "default"
val password = "123456"
c_pro.setProperty("user", ckUser)
c_pro.setProperty("driver", ckDriver)
c_pro.setProperty("password", password)
c_pro.setProperty("createTableOptions", "ENGINE=Log()")
dataFrame.write.format("hive").mode("overwrite").jdbc(ckUrl, tableName, c_pro)
}
}
}

BUG

Basically no bugs, apart from everything running painfully slowly.

Daily summary

Today I worked through the task sheet. In total it took nearly 5 hours, roughly 3 of which were spent loading mock data, because neither HBase nor Hive had any data and I did not know yesterday which tables would be needed today, so I had to load them on the spot. That brought its own problems: mismatched types, different numbers of columns, and so on. My estimate is that in the actual competition I could finish this set in about 3 hours. The HBase part still needs more practice.

Writing to ClickHouse without creating the table first

Study date: 4.24

Summary of what was learned

A way to load data into ClickHouse without creating the table first: write directly over JDBC. Apart from the URL, the basic connection settings go into a Properties object; the jdbc method needs no pre-created table, it creates one in ClickHouse automatically, with the column names taken from the DataFrame.

package huNanSubject

import org.apache.spark.sql.DataFrame

import java.util.Properties

object EtlUtil {
def extractToClickhouse(dataFrame: DataFrame,TableName:String): Unit = {
val c_url = "jdbc:clickhouse://192.168.23.49:8123"
val c_user = "default"
val c_password = "123456"
val c_driver = "com.clickhouse.jdbc.ClickHouseDriver"
val properties = new Properties()
properties.setProperty("user",c_user)
properties.setProperty("password",c_password)
properties.setProperty("driver",c_driver)
dataFrame.write.format("hive").mode("overwrite").jdbc(c_url,TableName,properties)
}
}
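
A usage sketch, assuming an existing SparkSession named spark and Hive tables from the task sheet; ds_result.demo_table is a made-up target name, not one of the required result tables:

```scala
// Any result DataFrame works; a per-province count stands in for a real metric here.
val resultDf = spark.table("dwd.fact_order_master")
  .groupBy("province")
  .count()

// No CREATE TABLE is run beforehand: per the note above, the jdbc write creates
// ds_result.demo_table in ClickHouse using the DataFrame's column names.
EtlUtil.extractToClickhouse(resultDf, "ds_result.demo_table")
```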

BUG

Code previously written on Ubuntu failed immediately when run on my own machine. It turned out to be a Scala version mismatch: the pom.xml declares 2.12 while my machine has 2.11, which produced the token error.

image-20230422201528750

Daily summary

Today I reviewed the task sheet again and moved the ClickHouse export into a utility class; the MySQL export could probably go into the same class too, though I am not sure whether writing to MySQL without pre-creating the table works the same way. For extracting from MySQL into Hive I will stick with registering a temporary view, since I am familiar with it and the load rarely errors out. In the competition, putting these methods into one utility class and simply calling the wrapped methods when writing should save a lot of time.

Redoing the Hunan task sheet

Study date: 4.26

Summary of what was learned

Redid the task sheet from scratch and revised some of the problems.

BUG

Hit the error below; it roughly translates to "nullable values cannot be written into a non-null field". I never deliberately marked those four columns as NOT NULL, so it was odd to see this on write. After some thought I changed the save call from write.format("hive").partitionBy("etl_date").mode("append").saveAsTable("dwd.dim_customer_inf") to write.format("hive").insertInto("dwd.dim_customer_inf"). When the table already exists and its partitions are set up, the two write paths give the same result, so this looks like a quirk of saveAsTable.
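
For reference, a side-by-side sketch of the two write paths, assuming dwd.dim_customer_inf already exists and is partitioned by etl_date; cleanedDf stands for the cleaned DataFrame being written:

```scala
// Path 1: saveAsTable on the existing partitioned table - the call that raised
// the nullable-into-non-null error here.
cleanedDf.write.format("hive")
  .partitionBy("etl_date")
  .mode("append")
  .saveAsTable("dwd.dim_customer_inf")

// Path 2: insertInto - matches columns by position against the existing table
// definition (partition column included) and wrote the same rows without the error.
cleanedDf.write.format("hive")
  .insertInto("dwd.dim_customer_inf")
```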

image-20230424155440477

Daily summary

Went through the task sheet once more today, changing the October 1 date (for which my data had nothing) to August 22, and found that even the table with the fewest modified_time matches had 800-plus rows, so I only queried 10. Then I practiced the HBase extraction again; because every field has to be typed one by one, the HBase part alone took ten-odd minutes, which is a real time sink, but with 6 hours in the competition it is still manageable. I estimate I can finish everything, screenshots included, within 4 hours.

Pre-competition check

Study date: 4.27

Summary of what was learned

Before the competition I went back over the task sheets I have done. If the HBase tables are provided ready-made at the competition, the types will probably need converting, or I can create an extra table of my own in which every column is a String.
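
If the provided HBase data does need converting, one option is to keep the all-String case class (as in the earlier extraction code) and cast after toDF. A sketch; the target types here, especially decimal(10,2), are my own assumptions rather than the official schema:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Cast the all-String columns coming out of HBase back to working types.
def castOrderDetail(allStringDf: DataFrame): DataFrame =
  allStringDf
    .withColumn("order_detail_id", col("order_detail_id").cast("long"))
    .withColumn("product_cnt", col("product_cnt").cast("int"))
    .withColumn("product_price", col("product_price").cast("decimal(10,2)")) // assumed precision
    .withColumn("weight", col("weight").cast("double"))
    .withColumn("modified_time", col("modified_time").cast("timestamp"))
```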

BUG

This small bug is not a big problem: when no GROUP BY is used, a bare count(*) just returns 0. It can be worked around by grouping on the primary key and then taking max(count(*)), but it can also be fixed directly with a configuration setting; https://www.cnblogs.com/muyue123/p/14371799.html explains the approach.

set hive.compute.query.using.stats=false;

image-20230504084912254

Daily summary

Tonight I ran through the earlier extraction and cleaning parts again and reinforced the configurations I need to memorize: the driver class and URL for ClickHouse and MySQL, the config values that need changing, and another pass over the YARN settings. Hoping to perform well tomorrow.