Template
Study date:
Project tasks
BUG
Daily summary
```
{% tabs 分栏%}
<!-- tab BUG1@fas fa-bomb -->
Any content (support inline tags too).
<!-- endtab -->
<!-- tab BUG2@fas fa-bomb -->
Any content (support inline tags too).
<!-- endtab -->
{% endtabs %}
```
Python composite recommendation score algorithm
Study date: 6.1
Project tasks
Composite score calculation
Default recommendation score. Since I have not yet decided which fields the recommendation should be based on, I first gathered every field related to a scenic spot and weighted them to compute a reasonable rating coefficient, which is then stored in the spot table.
```python
import os
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from commentDmTest.DmConnect import *

os.environ["PYSPARK_PYTHON"] = "E:\\software\\anaconda3\\envs\\pyspark\\python.exe"

conn = connDm()
cur = conn.cursor()
cur.execute("""
    with t1 as (select SPOT_ID, round(avg(FEEL_SCORE), 3) as avgScore
                from COMMENT_INFO_NLP
                group by SPOT_ID)
    select SPOT_NAME, GRADE, HOT, t2.SPOT_ID,
           replace(SUM, substring(SUM, locate('条', SUM)), '') as sum,
           avgScore
    from SPOT_INFO t2
    join t1 on t1.SPOT_ID = t2.SPOT_ID
""")
spot_info_df = pd.DataFrame(cur.fetchall(),
                            columns=['SPOT_NAME', 'GRADE', 'HOT', 'SPOT_ID', 'SUM', 'AVG'])

# median comment count, used as a fallback when SUM is missing
cur.execute("""
    select MEDIAN(convert(int, t1.sum)) avgCommentSum
    from (select replace(SUM, substring(SUM, locate('条', SUM)), '') as sum from SPOT_INFO) t1
""")
avg_comment_sum = cur.fetchall()[0][0]

spot_info_df['SUM'] = spot_info_df['SUM'].apply(pd.to_numeric)
max_comment_sum = spot_info_df['SUM'].max()


def process_row(r):
    # weighted rating: 20% grade, 40% hot, 20% comment count, 20% average sentiment score
    grade_count = r.GRADE if r.GRADE else 0.0
    hot_count = r.HOT if r.HOT else 0.0
    sum_count = r.SUM if r.SUM else avg_comment_sum
    avg_count = r.AVG if r.AVG else 0.0
    grade_score = 0.2 * grade_count / 5 * 100 if grade_count <= 5 else 20.0
    hit_score = 0.4 * hot_count / 10 * 100 if hot_count <= 10 else 40.0
    sum_score = 0.2 * sum_count / max_comment_sum * 100 if sum_count <= max_comment_sum else 20
    avg_score = 0.2 * avg_count / 5 * 100 if avg_count <= 5 else 20.0
    rating = round(grade_score, 2) + round(hit_score, 2) + round(sum_score, 2) + round(avg_score, 2)
    return r.SPOT_ID, r.SPOT_NAME, round(rating, 2)


spark = connSpark()
schema = StructType([
    StructField("SPOT_NAME", StringType(), True),
    StructField("GRADE", DoubleType(), True),
    StructField("HOT", DoubleType(), True),
    StructField("SPOT_ID", IntegerType(), True),
    StructField("SUM", DoubleType(), True),
    StructField("AVG", DoubleType(), True),
])
spot_info_df_spark = spark.createDataFrame(spot_info_df, schema=schema)
spot_info_df_spark.printSchema()

rating_rdd = spot_info_df_spark.rdd.map(process_row)
rating_rdd = rating_rdd.map(lambda x: (int(x[0]), str(x[1]), float(x[2])))
rating_df = rating_rdd.toDF(["spot_id_rat", "spot_name_rat", "RATING"])

spot_into = dmRead(spark, "SPOT_INFO")
need_col: list = spot_into.toPandas().columns.values.tolist()
need_col.append("RATING")
spot_final = rating_df.join(spot_into, rating_df.spot_id_rat == spot_into.SPOT_ID) \
    .selectExpr(need_col)
dmWrite(spot_final, "SPOT_INFO")
```
Finding comments that match a keyword
Query method
```python
def keyWordFindComment(spot_id, comment_key_word):
    cur.execute("""
        select spot_id, evaluation, evaluation_grade, evaluation_time, feel_score, feel
        from COMMENT_INFO_NLP
        where SPOT_ID={} and EVALUATION like '%{}%'
    """.format(spot_id, comment_key_word))
    list_test = cur.fetchall()
    df_test = pd.DataFrame(list_test,
                           columns=["spot_id", "evaluation", "evaluation_grade",
                                    "evaluation_time", "feel_score", "feel"])
    return eval(df_test.to_json(orient='records', force_ascii=False).replace("null", "'未填'"))
```
API endpoint
```python
@app.route('/spot/comment/<spot_id>', methods=['POST', 'GET'])
def spot_comments_high_word_find(spot_id):
    if request.method != 'POST':
        return json.dumps({'msg': '查询失败,请使用POST传参', 'code': 500, 'data': 'null'},
                          ensure_ascii=False)
    keyWord = request.args.to_dict()
    data = dmToDfpkuseg.keyWordFindComment(eval(spot_id), keyWord.get('keyWord'))
    if len(data) == 0:
        return json.dumps({'msg': '查询失败,没有该景区关键字对应的评论', 'code': 500, 'data': 'null'},
                          ensure_ascii=False)
    return json.dumps({'msg': '查询成功', 'code': 200, 'data': data}, ensure_ascii=False)
```
BUG
A Flask issue: a single server.py apparently cannot hold too many of my view functions. I previously had two handler names and they worked fine, but now they don't. After testing it seems that at most two handlers coexist, and only if one takes a parameter and the other doesn't; two parameterized handlers are not accepted, even when the number and types of parameters differ. I still don't know the real cause.
When connecting to the DM database from PySpark I got an error that looked like a missing driver. After searching, it turned out to be much like connecting Spark to MySQL: the driver jar has to sit on Spark's classpath. Putting DM's JDBC jar into PySpark's jars directory solved the problem.
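For reference, a minimal sketch of what a working read looks like once the DM jar is in PySpark's jars directory. The driver class name dm.jdbc.driver.DmDriver, the jdbc:dm:// URL, host and credentials below are assumptions for illustration, not the project's actual dmRead/dmWrite helpers:

```python
from pyspark.sql import SparkSession

# Assumes the DM JDBC jar has already been copied into $SPARK_HOME/jars (or pyspark/jars).
spark = SparkSession.builder.appName("dm-jdbc-demo").getOrCreate()

spot_df = (spark.read.format("jdbc")
           .option("url", "jdbc:dm://127.0.0.1:5236")        # assumed host/port
           .option("driver", "dm.jdbc.driver.DmDriver")      # assumed DM driver class
           .option("dbtable", "SPOT_INFO")
           .option("user", "SYSDBA")
           .option("password", "SYSDBA")
           .load())
spot_df.show(5)
```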
Daily summary
I got quite a lot done today and laid out the plan for the next few days: learn prediction and see whether I can extend it to solve the recommendation part; I also still need to think about what filter conditions the recommendation needs. Today there were two main tasks: finding comments by keyword, and applying the weighted algorithm to compute rating, the spot's composite recommendation score, and storing it in the database. I originally planned to expose it as an API, but the Spark-backed endpoint responded far too slowly, so I wrote the result straight to the database. If Spark endpoints stay this slow, I may later drop Spark for the recommendation algorithm and use pandas and NumPy instead.
Consolidating the data-cleaning methods
Study date: 6.2
Project tasks
Background
Some missing values had not been filled yet; some can only be filled in Python, others have to be filled in Spark, and each part used to write straight to the database once finished, so a full cleaning run meant running several jars. I plan to use Spark temporary tables so the whole run needs only two or three jars.
Import
Steps and approach: first load all the provided files into one table. The code was also heavily optimized; the preparation work now fits in a single Scala program. The txt files used to be handled by Java, but Scala can call the Java main method, so everything can live in one file.
finalClearOne
```scala
package clearDf

import clearDf.Utils.{dmWriterOverwrite, getSpark}
import clearDf.fileClearTest.{csvToDf, xlsxToDf}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}

object finalClearOne {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = getSpark
    import spark.implicits._
    import spark.sql
    sql("set spark.executor.processTreeMetrics=true")

    val frame2: DataFrame = csvToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\2.csv")
    val frame3: DataFrame = xlsxToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\3.xlsx")
    val frame4: DataFrame = csvToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\4.csv")

    val frameSmallFill: DataFrame = frame2.unionByName(frame3)
      .withColumn("GRADE", round(substring($"GRADE", 1, 2).cast(IntegerType) / 20, 1))
      .withColumn("PHONE", lit(""))
      .withColumn("INTRO", lit(""))
      .withColumn("HOT", lit(""))
      .withColumn("COMMENT_GRADE", lit(""))
      .withColumn("COMMENT_TIME", lit(""))

    val frameAllData: DataFrame = frame4.unionByName(frameSmallFill)
    dmWriterOverwrite(frameAllData, "U7U7.All_TEMP")

    import clearTxt.txtClearArrayList
    txtClearArrayList.txtIntoDmData.main()
  }
}
```
BUG
No hard-to-solve bugs today.
Daily summary
Since the server platform is said to be released next week, and to handle the new HDFS data it may provide, I plan to consolidate my earlier data-processing code into one jar with the methods parameterized to be generic, so that running it on the platform only takes two spark-submits. Nothing difficult; later the parts that extract columns into the consolidated table and join the city table to attach IDs will be more tedious to reorganize, and I need to get the code straight.
Consolidating all cleaning methods
Study date: 6.5
Project tasks
Task
Split ALL_TEMP (built from the csv and txt files) into separate tables; first ran it against DM on my own machine, then migrated it to the server. Then used machine learning to predict the missing data.
Code
All the table-splitting code is consolidated here, and Spark ML fills missing values in the HOT column. For the machine-learning part I used a random forest: based on the mean of HOT plus the corresponding GRADE and SUM columns, it infers a plausible HOT value.
```scala
package clearDf

import clearDf.Utils.{dmRead, dmWriterOverwrite, getSpark}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, SparkSession}

object finalClearTwo {

  val spark: SparkSession = getSpark
  import spark.implicits._
  import spark.sql

  def main(args: Array[String]): Unit = {
    finalClearOne.main(args)

    val all_data: DataFrame = dmRead(spark, "ALL_TEMP")
      .withColumn("SPOT", regexp_replace($"SPOT", "\\?", ""))
      .where("SPOT <> 'null'")

    val spot_info: DataFrame = spotInfoClear(all_data)
    val comment_info: DataFrame = commentWithSpot(all_data, spot_info)
    println(comment_info.count())

    val city_temp: DataFrame = cityInfoClear(spot_info)
    dmWriterOverwrite(city_temp, "city_info_temp")

    val spot_info_temp: DataFrame = spotWithCityId(spot_info, city_temp)

    val create_temp_city: DataFrame = city_temp
      .withColumn("PROVINCE_NAME", lit(""))
      .where("CITY_ID=0")
    dmWriterOverwrite(create_temp_city, "CITY_TEMP")

    import clearTxt.jsTest.jsonToDataTest
    jsonToDataTest.provinceNameIntoCity()

    val city_temp_two: DataFrame = dmRead(spark, "CITY_TEMP")
    val province_data: DataFrame = createToProvince(city_temp_two)
    dmWriterOverwrite(province_data, "PROVINCE_INFO")

    val city_info_final: DataFrame = cityJoinProvince(city_temp_two, province_data)
    dmWriterOverwrite(city_info_final, "CITY_INFO")

    val spot_Prediction: DataFrame = spotPredictionFinalTable(spot_info_temp)
    dmWriterOverwrite(spot_Prediction, "spot_info_temp")
  }

  def spotPredictionFinalTable(spot_info_temp: DataFrame): DataFrame = {
    val data: DataFrame = spot_info_temp.selectExpr("SPOT_ID", "GRADE", "HOT", "SUM")
      .withColumn("GRADE", $"GRADE".cast(DoubleType))
      .withColumn("HOT", $"HOT".cast(DoubleType))
      .withColumn("SUM", regexp_extract($"SUM", "[^条]+()", 0).cast(DoubleType))
    val data2: DataFrame = spot_info_temp
      .withColumn("HOT", $"HOT".cast(DoubleType))

    val meanValues: Double = data.agg(round(avg(col("HOT")), 1)).head().getDouble(0)
    val filledData: DataFrame = data.na.fill(meanValues, Seq("HOT"))

    val assembler: VectorAssembler = new VectorAssembler()
      .setInputCols(Array("GRADE", "SUM"))
      .setOutputCol("features")
    val assembledData: DataFrame = assembler.transform(filledData)

    val rf: RandomForestRegressor = new RandomForestRegressor()
      .setLabelCol("HOT")
      .setFeaturesCol("features")
    val model: RandomForestRegressionModel = rf.fit(assembledData)

    val predictedData: DataFrame = data.na.fill(meanValues, Seq("HOT"))
    val assembledPredictedData: DataFrame = assembler.transform(predictedData)
    val filledPredictedData: DataFrame = model.transform(assembledPredictedData)
      .withColumn("prediction", round(col("prediction"), 1))

    val predict: DataFrame = filledPredictedData.selectExpr("SPOT_ID", "prediction")
      .withColumnRenamed("SPOT_ID", "id")

    data2.join(predict, predict("id") === data2("SPOT_ID"))
      .withColumn("HOT", when(col("HOT").isNull, col("prediction")).otherwise(col("hot")))
      .withColumn("HOT", round($"HOT", 1))
      .withColumn("GRADE", round($"GRADE", 1))
      .drop("id", "prediction")
  }

  def cityJoinProvince(city_data: DataFrame, province_data: DataFrame): DataFrame = {
    val city_temp: DataFrame = city_data.join(province_data, city_data("PROVINCE_NAME") === province_data("NAME"))
      .selectExpr("CITY_NAME", "CITY_ID", "PROVINCE_ID")
    city_temp
  }

  def createToProvince(city_data: DataFrame): DataFrame = {
    val province_data: DataFrame = city_data
      .selectExpr("PROVINCE_NAME")
      .distinct()
      .withColumnRenamed("PROVINCE_NAME", "NAME")
      .withColumn("PROVINCE_ID", row_number() over Window.orderBy($"NAME"))
    province_data
  }

  def spotInfoClear(all_data: DataFrame): DataFrame = {
    val SPOT_ID_ORDER: WindowSpec = Window.orderBy($"SPOT_NAME".desc)
    val spot_info_temp: DataFrame = all_data
      .withColumnRenamed("SPOT", "SPOT_NAME")
      .drop("COMMENT")
      .drop("COMMENT_GRADE")
      .drop("COMMENT_TIME")
      .distinct()
      .withColumn("SPOT_ID", row_number() over SPOT_ID_ORDER)
      .where("SPOT_ID<>9")
      .withColumn("SPOT_ID", row_number() over SPOT_ID_ORDER)
    spot_info_temp
  }

  def commentWithSpot(all_data: DataFrame, spot_data: DataFrame): DataFrame = {
    val comment_info_temp: DataFrame = all_data.join(spot_data, all_data("SPOT") === spot_data("SPOT_NAME"))
      .selectExpr("SPOT_ID", "COMMENT", "COMMENT_GRADE", "COMMENT_TIME")
      .withColumnRenamed("COMMENT", "EVALUATION")
      .withColumnRenamed("COMMENT_GRADE", "EVALUATION_GRADE")
      .withColumnRenamed("COMMENT_TIME", "EVALUATION_TIME")
      .where("EVALUATION<>'NULL'").distinct()
      .withColumn("EVALUATION_ID", row_number() over Window.orderBy($"SPOT_ID"))
    comment_info_temp
  }

  def cityInfoClear(spot_info: DataFrame): DataFrame = {
    spot_info.createOrReplaceTempView("spot_info")
    val city_final_frame: DataFrame = sql(
      """
        |select
        |if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^市]{2}()+市)"),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2)) as CITY_NAME
        |from
        |spot_info
        |""".stripMargin)
      .distinct()
      .withColumn("CITY_ID", row_number() over Window.orderBy($"CITY_NAME".desc))
    city_final_frame
  }

  def spotWithCityId(spot_info: DataFrame, city_temp: DataFrame): DataFrame = {
    spot_info.createOrReplaceTempView("spot_info")
    city_temp.createOrReplaceTempView("city_info")
    val spot_test: DataFrame = sql(
      s"""
        |with t1 as (
        |select
        |*,
        |if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^市]{2}()+市)"),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2)) as NAME
        |from
        |spot_info)
        |select
        |t1.*,
        |ci.CITY_ID
        |from
        |t1
        |join city_info ci on ci.CITY_NAME = t1.NAME
        |""".stripMargin).drop("NAME")
    spot_test
  }
}
```
Filling HOT with a random forest
This part took me an afternoon plus an evening to figure out.
```scala
def spotPredictionFinalTable(spot_info_temp: DataFrame): DataFrame = {
  val data: DataFrame = spot_info_temp.selectExpr("SPOT_ID", "GRADE", "HOT", "SUM")
    .withColumn("GRADE", $"GRADE".cast(DoubleType))
    .withColumn("HOT", $"HOT".cast(DoubleType))
    .withColumn("SUM", regexp_extract($"SUM", "[^条]+()", 0).cast(DoubleType))
  val data2: DataFrame = spot_info_temp
    .withColumn("HOT", $"HOT".cast(DoubleType))

  val meanValues: Double = data.agg(round(avg(col("HOT")), 1)).head().getDouble(0)
  val filledData: DataFrame = data.na.fill(meanValues, Seq("HOT"))

  val assembler: VectorAssembler = new VectorAssembler()
    .setInputCols(Array("GRADE", "SUM"))
    .setOutputCol("features")
  val assembledData: DataFrame = assembler.transform(filledData)

  val rf: RandomForestRegressor = new RandomForestRegressor()
    .setLabelCol("HOT")
    .setFeaturesCol("features")
  val model: RandomForestRegressionModel = rf.fit(assembledData)

  val predictedData: DataFrame = data.na.fill(meanValues, Seq("HOT"))
  val assembledPredictedData: DataFrame = assembler.transform(predictedData)
  val filledPredictedData: DataFrame = model.transform(assembledPredictedData)
    .withColumn("prediction", round(col("prediction"), 1))

  val predict: DataFrame = filledPredictedData.selectExpr("SPOT_ID", "prediction")
    .withColumnRenamed("SPOT_ID", "id")

  data2.join(predict, predict("id") === data2("SPOT_ID"))
    .withColumn("HOT", when(col("HOT").isNull, col("prediction")).otherwise(col("hot")))
    .withColumn("HOT", round($"HOT", 1))
    .withColumn("GRADE", round($"GRADE", 1))
    .drop("id", "prediction")
}
```
BUG
There were quite a few errors on the prediction side, but they were mostly my own mistakes when configuring the model, misreading column names; all "column xxx does not exist" type errors. After fixing them step by step, the prediction results look acceptable.
Daily summary
Today's work was still consolidation and predictive filling. I originally planned to fill with the median or the mean, but a flat row of identical values looks bad when the data is displayed, so I used prediction instead. The prediction part is hard to grasp; simply put, it uses the known data that influences the missing value as supporting data for the model and computes the missing HOT. The principle is regression: when you rate something, the data also reflects your habits, it's a two-way relationship, and a random forest exploits that to infer the value back from the data.
Python comment-table optimization
Study date: 6.6
Project tasks
Background
Many GRADE values in the comment table are missing. Since I had already run sentiment analysis on the text, those missing values can be filled from it. Then there are the IP location and the timestamp: I found a very handy library, Faker, which can randomly generate provinces and addresses. Because the provinces in the other data are two characters without the 省 or 自治州 suffixes, I post-processed the output a little using recursion.
Code
```python
import random
import pandas as pd
import radar
from faker import Faker
from finalMain.DmConnect import connDm
from snownlp import SnowNLP
from snownlp import sentiment
import os
from sqlalchemy import create_engine


def provinceRandom() -> str:
    f = Faker('zh_CN')
    province = f.province()
    if province.find("省") != -1 or province.find("市") != -1:
        return province[0:-1]
    return provinceRandom()


def provinceList():
    province_list = []
    while len(province_list) <= 26:
        province = provinceRandom()
        if province not in province_list:
            province_list.append(province)
    return province_list


def scoreSnow(sentiments):
    if 0.9 <= sentiments <= 1:
        feelMood = "超棒"
    elif 0.8 <= sentiments <= 0.9:
        feelMood = "满意"
    elif 0.5 <= sentiments <= 0.8:
        feelMood = "不错"
    elif 0.2 <= sentiments <= 0.5:
        feelMood = "一般"
    else:
        feelMood = "不佳"
    return feelMood


def intoDm(data, tableName):
    conn_url = 'dm+dmPython://SYSDBA:SYSDBA@47.120.9.247:5236/'
    engine = create_engine(conn_url)
    data.to_sql(name=tableName, con=engine, if_exists='append', index=False)


def commentDataFinalIntoDm():
    data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'snownlpDrill/sentiment.marshal')
    sentiment.load(data_path)
    conn = connDm()
    cur = conn.cursor()
    cur.execute("select * from COMMENT_TEMP")
    comment_list: list = cur.fetchall()
    comment_df = pd.DataFrame(comment_list,
                              columns=["SPOT_ID", "EVALUATION", "EVALUATION_GRADE",
                                       "EVALUATION_TIME", "EVALUATION_ID"])
    comment_df['FEEL_SCORE'] = ''
    comment_df['FEEL'] = ''
    province_list = provinceList()
    for index in list(comment_df.index):
        comment_values = comment_df['EVALUATION']
        comment_value = comment_values.loc[index]
        if comment_df['EVALUATION_TIME'].loc[index] == "":
            random_index = random.randrange(len(province_list))
            comment_df.iloc[index - 1, 3] = str(radar.random_date("2020-09-13", "2023-02-02").date()) + \
                                            "IP属地: " + province_list[random_index]
        score = round(SnowNLP(comment_value).sentiments, 2)
        feelScore = round(score * 5, 1)
        comment_df.iloc[index - 1, 5] = feelScore
        feel = scoreSnow(score)
        comment_df.iloc[index - 1, 6] = feel
        if comment_df['EVALUATION_GRADE'].loc[index] == "":
            comment_df.iloc[index - 1, 2] = str(int(round(feelScore, 0))) + "分 " + feel
    intoDm(comment_df, "COMMENT_INFO")


commentDataFinalIntoDm()
```
Data processing result
Cases where the prediction is inaccurate are still being worked on, but that is not the current focus.
BUG
I didn't find the root cause; most likely my median value was a string. This part originally filled the SUM column with the median, but I later found that every row already has SUM, so I simply commented it out.
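For reference, a minimal sketch of the suspected fix, with made-up values: cast the "…条" comment-count strings to numbers before taking the median, so the fill value is numeric rather than a string:

```python
import pandas as pd

# hypothetical raw comment counts as they appear in the source data
raw = pd.Series(["1234条", "56条", None, "789条"])
counts = pd.to_numeric(raw.str.replace("条", "", regex=False), errors="coerce")
median_count = counts.median()            # numeric median, safe to use as a fill value
filled = counts.fillna(median_count)
print(filled)
```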
Daily summary
Today's work was mainly optimization, followed by repeated testing: starting from empty tables, one run of the Spark code plus one run of the Python code now cleans the data and loads it into the DM database. The main takeaway was discovering a very handy class for filling in random values: Python's Faker library can generate a lot of realistic personal information, codes and similar data, with adjustable formats. Quite nice.
Submitting with spark-submit
Study date: 6.7
Project tasks
Started Hadoop on my own machine and tried submitting with Spark to speed up the cleaning run. It takes roughly one minute per run; the problem is that the pipeline uses Java code in the middle, and that part doesn't run on the distributed platform, so it takes longer, and Spark startup also costs time.
I also installed a pseudo-distributed Hadoop on my machine, which took a whole morning; the afternoon went to testing. Cleaning the sample data now takes about two minutes: one jar run and one Python run. I won't write up the Hadoop installation steps, just a link: http://t.csdn.cn/SuQmN. Finding Hadoop 3.2.2 took some time, and the matching Windows winutils.exe is also needed.
BUG
When submitting the jar with spark-submit, some of the data read in was garbled. On closer inspection it was all in the later rows, about 3,000 of them, i.e. the part cleaned by Java. Looking at that Java code, the text was already garbled when the txt files were read; at first I thought it only became garbled on database insert and even changed the URL, but it was an IO-stream problem. Adding the encoding when reading the file fixed it: reader = new InputStreamReader(file, StandardCharsets.UTF_8);
On the Java side there was also an error saying city.json could not be found. After building the jar I never worked out how the relative path for reading the file should be written, so I used an absolute path for now and will change it once it's on the server.
Daily summary
The platform was only handed out today and I haven't figured it out yet, so for now I tested whether my cleaning code runs properly when packaged as a jar. In the morning I set up the Hadoop and Spark environment; in the afternoon I dealt with small details like paths and encodings after building the jar. The main thing now is learning to use the platform. Cleaning the sample data is basically fine; it all depends on the HDFS data, and if it differs a lot it will be a problem.
Cleaning the txt folder
Study date: 6.8
Project tasks
Background
This is about cleaning speed: previously 10k rows already took a minute to clean, and the data has now grown to millions of rows. My old approach definitely won't work, especially the Java txt handling, which would probably blow up memory, so it has to move to pandas; the old Spark code has to be scrapped and redone as pure Python cleaning.
Cleaning txt
Since the txt files were what I considered the hardest, I first tested whether Python handles them well and how the results look.
```python
import random
import pandas as pd
import re
import os
import radar
from faker import Faker
from hdfs3 import HDFileSystem
import glob
from snownlp import SnowNLP


def scoreSnow(sentiments):
    if 0.9 <= sentiments <= 1:
        feelMood = "超棒"
    elif 0.8 <= sentiments <= 0.9:
        feelMood = "满意"
    elif 0.5 <= sentiments <= 0.8:
        feelMood = "不错"
    elif 0.2 <= sentiments <= 0.5:
        feelMood = "一般"
    else:
        feelMood = "不佳"
    return feelMood


def provinceRandom() -> str:
    f = Faker('zh_CN')
    province = f.province()
    if province.find("省") != -1 or province.find("市") != -1:
        return province[0:-1]
    return provinceRandom()


def provinceList():
    province_list = []
    while len(province_list) <= 26:
        province = provinceRandom()
        if province not in province_list:
            province_list.append(province)
    return province_list


# leftover helper from the hdfs3 attempt; not called in __main__ below
def hdfsFileRead():
    hdfs = HDFileSystem(host='hadoopb-namenode.damengb-zone.svc', port=9000)
    file_list = hdfs.ls('/data/txt')
    hdfsConfig = hdfsFileRead()
    hdfs = hdfsConfig[0]
    file_list = hdfsConfig[1]
    for f in file_list:
        with hdfs.open(f) as file:
            text = file.read()
    return hdfs, file_list


def LoadFileRead():
    folder_path = r'/data/txt'
    file_names = os.listdir(folder_path)
    return file_names


def clearSpot():
    spot_df = df[df.columns[0:11]].drop_duplicates()
    spot_df['rating'] = 4 * spot_df['GRADE'].astype(float) + 4 * spot_df['HOT'].astype(float)


if __name__ == '__main__':
    listDf = []
    folder_path = r'../data/txt'
    for file_path in glob.glob(os.path.join(folder_path, '*.txt')):
        with open(file_path, 'r', encoding="utf-8") as f:
            text = f.read()
        # slice out each [TAG] section of the fixed txt layout
        spot = text[text.find('[SPOT]') + 7:text.find('[LOCATION]')].strip()
        location = text[text.find('[LOCATION]') + 11:text.find('[OPENTIME]')].strip()
        opentime = text[text.find('[OPENTIME]') + 10:text.find('[PHONE]')].strip()
        phone = text[text.find('[PHONE]') + 7:text.find('[INTRO]')].strip()
        intro = text[text.find('[INTRO]') + 7:text.find('[GRADE]')].strip()
        grade = text[text.find('[GRADE]') + 7:text.find('[HOT]')].strip()
        hot = text[text.find('[HOT]') + 5:text.find('[SUM]')].strip()
        sum_ = text[text.find('[SUM]') + 5:text.find('[COMMENT]')].strip()
        comment = text[text.find('[COMMENT]') + 10:text.find('[COMMENT_GRADE]')].strip()
        comment_grade = text[text.find('[COMMENT_GRADE]') + 15:text.find('[COMMENT_TIME]')].strip()
        comment_time = text[text.find('[COMMENT_TIME]') + 14:].strip()
        data = {
            'SPOT_NAME': spot,
            'LEVEL': '',
            'LOCATION': location,
            'OPENTIME': opentime,
            'PHONE': phone,
            'INTRO': intro,
            'NOTICE': '',
            'ST': '',
            'GRADE': grade,
            'HOT': hot,
            'QTY': sum_,
            'EVALUATION': comment,
            'EVALUATION_GRADE': comment_grade,
            'EVALUATION_TIME': comment_time,
            'EVALUATION_NAME': '',
            'FEEL_SCORE': '',
            'FEEL': ''
        }
        listDf.append(data)
    df = pd.DataFrame(listDf)
    df['ST'] = df['INTRO'].apply(lambda x: re.search(r'服务设施([\s\S]*)', x, re.S).group(1))
    df['NOTICE'] = df['INTRO'].apply(lambda x: re.search(r'优待政策\s*(.+?)\s*服务设施', x, re.S).group(1))
    df['INTRO'] = df['INTRO'].apply(lambda x: re.search(r'介绍\s*(.+?)\s*全文', x, re.S).group(1))
    df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
```
Conclusion
Pandas works surprisingly well here: it is fast, and it can use prediction models directly, without the round trip of Scala finishing a table and Python reading it back to fill values. Reading the txt files plus some processing takes about a second; loading into DM later should also take well under a minute.
BUG
Reading threw an error right away, and the message clearly pointed to an encoding problem. A quick search showed that for file handling in Python you just set the encoding in the outer open() call: with open(file_path, 'r', encoding="utf-8").
Daily summary
Today brought bad news: basically all the code I worked so hard on before is now scrapped, because the HDFS data is quite large and, more importantly, the formats changed. I'm not going to force the old approach; cleaning shouldn't have involved three languages in the first place, it slowed things down and made execution awkward, needing several runs. I didn't even dare benchmark it: 10k rows already took that long and now it's millions, so I'm switching to Python. I first tested the hardest part, the txt files, and they turned out easy to handle; the method is a bit clumsy, but the txt format is fixed, everything lands in a DataFrame, and after further processing it can be loaded into DM.
Cleaning the csv folder
Study date: 6.9
Project tasks
Sorting out the data
The csv and xlsx fields in the newly provided data have changed, so the fields need careful analysis. A lot of backend endpoints are already written, so no major changes are possible, though some newly provided fields still need to be added. The plan is 14 columns for the spot table and 7 for the comment table, not much change from before; the later DataFrames will be shaped into this layout. Looking at the HDFS data there are roughly four kinds: two csv variants with slightly different fields, one xlsx variant where some fields exist and some don't (the missing ones will be predicted), and txt, which is the smallest. The plan: predict and fill whatever can be predicted; for fields that can't be predicted and are missing, either drop them or don't display them.
csv cleaning code
All data is first shaped into the full set of required fields, added up front. The first priority is to attach the spot IDs correctly; I'm not sure yet whether a group by will do it and will look into it tomorrow (see the small ID-assignment sketch after the code below).
```python
import os
import pandas as pd
from faker import Faker

f = Faker('zh_CN')


def readCsvToDf(folder_path):
    all_files = os.listdir(folder_path)
    csv_files = [file for file in all_files if file.endswith('.csv')]
    dfs = []
    for file in csv_files:
        file_path = os.path.join(folder_path, file)
        df = pd.read_csv(file_path)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True)


def test(x):
    stringApply = ''.join(str(x).split()).strip('\n')
    return stringApply


def clearAllDf(merged_df):
    merged_df.rename(columns={'SPOT': 'SPOT_NAME', 'CO_TIME': 'EVALUATION_TIME',
                              'CO_NAME': 'EVALUATION_NAME', 'COMMENT': 'EVALUATION'}, inplace=True)
    merged_df_copy = merged_df.applymap(test).drop(columns=['TGA'])
    merged_df_copy['SPOT_NAME'] = merged_df_copy['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
    merged_df_copy.insert(9, 'RATING', '')
    merged_df_copy.insert(9, 'CITY_ID', '')
    merged_df_copy.insert(9, 'HOT', '')
    return merged_df_copy


def clearSpotDf(allDf):
    allDf.insert(3, 'PHONE', [f.phone_number() for _ in range(allDf.iloc[:, 0].size)])


if __name__ == '__main__':
    folder_qnecsv_path = "../data/csv/qnecsv"
    merged_df = readCsvToDf(folder_qnecsv_path)
    allDf = clearAllDf(merged_df)
```
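On the ID question above, a minimal sketch (with made-up spot names) of attaching a numeric SPOT_ID per unique spot name; an explicit group by isn't strictly required, though groupby().ngroup() would also work:

```python
import pandas as pd

allDf = pd.DataFrame({'SPOT_NAME': ['西湖', '西湖', '苏州园林']})

# map every distinct spot name to a stable integer id
spot_id_dict = {name: i + 1 for i, name in enumerate(allDf['SPOT_NAME'].unique())}
allDf['SPOT_ID'] = allDf['SPOT_NAME'].map(spot_id_dict)

# equivalent one-liner using a groupby:
# allDf['SPOT_ID'] = allDf.groupby('SPOT_NAME', sort=False).ngroup() + 1
print(allDf)
```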
BUG
While using pandas, the following warning kept appearing. It doesn't affect the program, but it's annoying to look at.
```
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead
```
After searching, this happens because my DataFrame was assigned from another DataFrame (a = b) and I then modified a; it's better to modify b directly. There are three solutions, listed below with a small example after the list.
Solutions:
Create a new DataFrame and operate on that.
Use .copy() when copying a DataFrame.
Use .loc to assign values.
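A minimal illustration of fixes 2 and 3, with a made-up DataFrame:

```python
import pandas as pd

df = pd.DataFrame({'A': [1, -2, 3], 'B': [0, 0, 0]})

sub = df[df['A'] > 0].copy()   # 2) take an explicit copy before modifying the slice
sub['B'] = 1                   #    no SettingWithCopyWarning on the copy

df.loc[df['A'] > 0, 'B'] = 1   # 3) or assign through .loc on the original in one step
print(df)
```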
Daily summary
Today the plan was to get each file type read into a DataFrame and clean one folder into the four tables I need, i.e. four DataFrames; the other folders should then be much the same. The approach is clear now: attach the IDs first and the rest is easy, since the missing-value filling code is already written and only the HOT filling needs to be redone, switching from the Spark ML algorithm to a pandas-based prediction.
```python
import requests
import pandas as pd


def get_province(city_name):
    ak = '你的百度地图API key'
    url = 'http://api.map.baidu.com/geocoder?address={}&output=json&ak={}'.format(city_name, ak)
    res = requests.get(url).json()
    if res['status'] == 0:
        province = res['result']['addressComponent']['province']
    else:
        province = None
    return province


cities = ['金华市', '苏州市', '舟山市', '武汉市', '桐乡市', '杭州市', '无锡市', '扬州市', '恩施市']
province = [get_province(city) for city in cities]
df = pd.DataFrame({'city': cities, 'province': province})
print(df)
```
Reading HDFS files
Study date: 6.12
Project tasks
Since hdfs3 could not be used, I switched to the hdfs library, which accesses HDFS over WebHDFS. Considering that reading local data should be much faster than going through HDFS, the first step is simply downloading everything; afterwards the processing only needs local paths.
```python
import os
from hdfs import Client, InsecureClient

print("启动")
client = InsecureClient('http://hadoopb-namenode.damengb-zone.svc:9870')
hdfs_csv_path = "/data/csv/qnecsv"
csv_data_list = client.list(hdfs_path=hdfs_csv_path)
local_path = "/home/PyCode/data/hdfscsv/qnecsv"
print(len(csv_data_list))


def download_folder(client, hdfs_path, local_path):
    if client.status(hdfs_path)['type'] == 'FILE':
        client.download(hdfs_path, local_path)
    elif client.status(hdfs_path)['type'] == 'DIRECTORY':
        if not os.path.exists(local_path):
            os.makedirs(local_path)
        for file in client.list(hdfs_path):
            sub_hdfs_path = hdfs_path + '/' + file
            sub_local_path = local_path + '/' + file
            download_folder(client, sub_hdfs_path, sub_local_path)


download_folder(client, "/data/csv/qnecsv", "/home/PyCode/data/hdfscsv/qnecsv")
download_folder(client, "/data/csv/xccsv", "/home/PyCode/data/hdfscsv/xccsv")
download_folder(client, "/data/txt", "/home/PyCode/data/hdfstxt")
download_folder(client, "/data/excel", "/home/PyCode/data/hdfsexcel")
```
BUG
With hdfs3 it kept complaining that the libhdfs3.so dependency could not be found, even though it was definitely installed and I even pointed to it via sys; nothing helped. After a whole morning without a fix, I switched to hdfs.
Daily summary
Spent half the day on this and barely wrote any code. Ever since the platform accounts were handed out I haven't fully figured the platform out. Today I wanted to pull the HDFS data: the HDFS address they provided pinged fine, so in theory it should have been reachable, but port 9000 kept throwing web-related errors. I tested against the HDFS on my own VM and that worked; searching suggested checking the Hadoop config, but the data is provided by the competition organizers and we can't touch their machines. In the end I just tried other ports and found one that worked.
Optimizing pandas performance
Study date: 6.13
Project tasks
After pulling the data from HDFS it turned out to be very large; whenever I processed it, memory was exceeded and the process killed itself, so optimization became unavoidable. The comment-processing code really wasn't great: it used a for loop, which can easily be a hundred times slower.
Optimized code
```python
from snownlp import SnowNLP
import numpy as np


def clearSpotComment(allInfoDf, province_list):
    print("评论表开始处理")
    comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
    comment_df.insert(2, 'EVALUATION_GRADE', '')
    comment_df['FEEL_SCORE'] = ''
    comment_df['FEEL'] = ''
    order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME',
             'EVALUATION_NAME', 'FEEL_SCORE', 'FEEL']
    comment_df = comment_df[order]
    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x.replace("/", "-"))
    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(
        lambda x: x + " IP属地: " + np.random.choice(province_list) if "IP" not in x else x)

    def calculate_scores(row):
        if row.EVALUATION == "用户未点评。系统默认好评。":
            score = 1.0
        else:
            score = round(SnowNLP(row.EVALUATION).sentiments, 2)
        feelScore = round(score * 5, 1)
        feel = scoreSnow(score)
        if row.EVALUATION_GRADE == "":
            row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel
        row.FEEL_SCORE = feelScore
        row.FEEL = feel
        return row

    comment_df = comment_df.apply(calculate_scores, axis=1)
    print("评论表结束处理")
    return comment_df
```
A summary of how to optimize pandas performance.
1. Choose appropriate data types
Choosing the right data type can save memory and improve performance.
For example:
```python
DataFrame['column_name'] = DataFrame['column_name'].astype('category')
```
For numeric data, prefer 'int' types; depending on the value range you can choose 'int64', 'int32', 'int16' or 'int8'.
For categorical data, the 'category' type can save a lot of memory.
For time series data, use pandas' own 'datetime64[ns]' type.
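A small sketch with made-up data showing how the dtype choice shows up in memory usage:

```python
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'hot': np.random.randint(0, 10, 100_000),                    # small integers
    'city': np.random.choice(['杭州', '苏州', '武汉'], 100_000),  # few distinct labels
})
print(df.memory_usage(deep=True).sum())

df['hot'] = df['hot'].astype('int8')        # values fit comfortably in int8
df['city'] = df['city'].astype('category')  # repeated labels -> category
print(df.memory_usage(deep=True).sum())     # noticeably smaller
```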
2. Use vectorized operations and avoid loops where possible
pandas is built on NumPy, which heavily optimizes vector operations, so prefer vectorized operations over Python for loops.
```python
# slow: row-by-row loop
for idx in df.index:
    df.loc[idx, 'column_name'] = df.loc[idx, 'column_name'] + 1

# fast: vectorized
df['column_name'] = df['column_name'] + 1
```
3. Use the inplace parameter to avoid copies
For operations that can modify the original DataFrame, such as drop and sort_values, pass inplace=True so the operation happens on the original data instead of creating a new copy.
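A short example with made-up column names:

```python
import pandas as pd

df = pd.DataFrame({'GRADE': [4.5, 3.9], 'TGA': ['a', 'b']})
df.drop(columns=['TGA'], inplace=True)                     # modifies df itself
df.sort_values(by='GRADE', ascending=False, inplace=True)  # no new copy bound
print(df)
```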
4. Avoid chained indexing
Avoid chained operations such as df[df['A']>0]['B'] = 1; they return a copy of the DataFrame rather than a view, so the assignment never reaches the original data.
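A short example with a made-up DataFrame, assigning through .loc instead of the chained form:

```python
import pandas as pd

df = pd.DataFrame({'A': [1, -1, 2], 'B': [0, 0, 0]})
df.loc[df['A'] > 0, 'B'] = 1   # instead of df[df['A'] > 0]['B'] = 1, which writes to a copy
print(df)
```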
5. Use .isin()
When you need to keep the rows whose column value belongs to a list, .isin() is the most efficient way.
```python
chosen_ones = df[df.column_name.isin(list_of_values)]
```
6. Use .loc, .at and .iat
For accessing and modifying data, loc and iloc are faster than the generic approaches, and at and iat provide an even faster path for single elements.
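A short example with a made-up DataFrame:

```python
import pandas as pd

df = pd.DataFrame({'GRADE': [4.5, 3.9]}, index=['a', 'b'])
df.loc['a', 'GRADE'] = 4.8     # label-based assignment
print(df.at['a', 'GRADE'])     # fast scalar access by label
print(df.iat[1, 0])            # fast scalar access by position
```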
7. For large datasets, prefer fillna() and interpolate()
For large datasets, filling missing values with fillna or interpolate is much faster than dropping rows with dropna.
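A short example with a made-up Series:

```python
import pandas as pd
import numpy as np

s = pd.Series([1.0, np.nan, 3.0, np.nan, 5.0])
print(s.fillna(s.mean()))    # fill with a statistic
print(s.interpolate())       # fill by linear interpolation between neighbours
```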
BUG
No difficult bugs.
Daily summary
For pandas performance optimization the main directions are reducing memory consumption and increasing computational efficiency: choosing suitable dtypes, avoiding unnecessary loops, and making good use of the functions pandas provides are all effective strategies. Understanding the data types before writing code and choosing the best processing approach both help. I optimized for a long time today before the runtime came down enough to run successfully. Next is testing the import into DM, which means setting up the environment again.
Testing the DM import on the platform
Study date: 6.14
Project tasks
Wrote up a full procedure for importing the data into DM, starting from platform setup.
Unifying the input for the four tables
Reshape all the files into the same layout, so that the later table-splitting code works for all of them and only needs to run once.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 import osimport reimport timeimport globimport jsonimport randomimport timeimport osimport radarfrom snownlp import SnowNLPimport numpy as npimport pandas as pdimport arrowfrom faker import FakerfakerCn = Faker('zh_CN' ) order = ['SPOT_NAME' , 'LEVEL' , 'LOCATION' , 'GRADE' , 'QTY' , 'INTRO' , 'OPENTIME' , 'NOTICE' , 'ST' , 'HOT' , 'CITY_ID' , 'RATING' , 'SPOT_ID' , 'EVALUATION_NAME' , 'EVALUATION_TIME' , 'EVALUATION' ] dataType = {'SPOT_NAME' : str , 'LEVEL' : str , 'LOCATION' : str , 'GRADE' : float , 'QTY' : str , 'INTRO' : str , 'OPENTIME' : str , 'NOTICE' : str , 'ST' : str , 'HOT' : float , 'CITY_ID' : str , 'RATING' : float , 'SPOT_ID' : str , 'EVALUATION_NAME' : str , 'EVALUATION_TIME' : str , 'EVALUATION' : str } def clearToAllDf (merged_df ): merged_df_copy = merged_df.copy() merged_df_copy = merged_df_copy spot_names = merged_df_copy['SPOT_NAME' ].unique() spot_id_dict = {name: i + 1 for i, name in enumerate (spot_names)} merged_df_copy['SPOT_ID' ] = merged_df_copy['SPOT_NAME' ].apply(lambda x: spot_id_dict[x]) return merged_df_copy def csvqne (folder_path ): all_files = os.listdir(folder_path) dfs = [] for file in all_files: file_path = os.path.join(folder_path, file) df = pd.read_csv(file_path) df.dropna(subset=['COMMENT' ], inplace=True ) df.rename(columns={'SPOT' : 'SPOT_NAME' , 'CO_TIME' : 'EVALUATION_TIME' , 'CO_NAME' : 'EVALUATION_NAME' , 'COMMENT' : 'EVALUATION' }, inplace=True ) df = df.drop(columns=['TGA' ]).reindex(columns=order) df['SPOT_NAME' ] = df['SPOT_NAME' ].apply(lambda x: x.replace("?" , "" )) dfs.append(df) return dfs def csvxc (folder_path ): all_files = os.listdir(folder_path) dfs = [] for file in all_files: file_path = os.path.join(folder_path, file) df = pd.read_csv(file_path) df.dropna(subset=['COMMENT' ], inplace=True ) df.rename(columns={'SPOT' : 'SPOT_NAME' , 'CO_TIME' : 'EVALUATION_TIME' , 'CO_NAME' : 'EVALUATION_NAME' , 'COMMENT' : 'EVALUATION' }, inplace=True ) df = df.reindex(columns=order) df['SPOT_NAME' ] = df['SPOT_NAME' ].apply(lambda x: x.replace("?" , "" )) dfs.append(df) return dfs def xlsx (folder_path ): all_files = os.listdir(folder_path) dfs = [] for file in all_files: file_path = os.path.join(folder_path, file) df = pd.read_excel(file_path, engine='openpyxl' ) df.dropna(subset=['COMMENT' ], inplace=True ) df.rename(columns={'SPOT' : 'SPOT_NAME' , 'CO_TIME' : 'EVALUATION_TIME' , 'CO_NAME' : 'EVALUATION_NAME' , 'COMMENT' : 'EVALUATION' }, inplace=True ) df = df.reindex(columns=order) df['SPOT_NAME' ] = df['SPOT_NAME' ].apply(lambda x: x.replace("?" 
, "" )) dfs.append(df) return dfs def txt (folder_path ): listDf = [] for file_path in glob.glob(os.path.join(folder_path, '*.txt' )): with open (file_path, 'r' , encoding="utf-8" ) as f: text = f.read() spot = text[text.find('[SPOT]' ) + 7 :text.find('[LOCATION]' )].strip() location = text[text.find('[LOCATION]' ) + 11 :text.find('[OPENTIME]' )].strip() opentime = text[text.find('[OPENTIME]' ) + 10 :text.find('[PHONE]' )].strip() phone = text[text.find('[PHONE]' ) + 7 :text.find('[INTRO]' )].strip() intro = text[text.find('[INTRO]' ) + 7 :text.find('[GRADE]' )].strip() grade = text[text.find('[GRADE]' ) + 7 :text.find('[HOT]' )].strip() hot = text[text.find('[HOT]' ) + 5 :text.find('[SUM]' )].strip() sum_ = text[text.find('[SUM]' ) + 5 :text.find('[COMMENT]' )].strip() comment = text[text.find('[COMMENT]' ) + 10 :text.find('[COMMENT_GRADE]' )].strip() comment_grade = text[text.find('[COMMENT_GRADE]' ) + 15 :text.find('[COMMENT_TIME]' )].strip() comment_time = text[text.find('[COMMENT_TIME]' ) + 14 :].strip() data = { 'SPOT_NAME' : spot, 'LEVEL' : np.nan, 'LOCATION' : location, 'OPENTIME' : opentime, 'PHONE' : phone, 'INTRO' : intro, 'NOTICE' : np.nan, 'ST' : np.nan, 'GRADE' : grade, 'HOT' : hot, 'QTY' : sum_, 'EVALUATION' : comment, 'EVALUATION_GRADE' : comment_grade, 'EVALUATION_TIME' : comment_time, 'EVALUATION_NAME' : fakerCn.user_name() } listDf.append(data) df = pd.DataFrame(listDf).reindex(columns=order) df['SPOT_NAME' ] = df['SPOT_NAME' ].apply(lambda x: x.replace("?" , "" )) df['ST' ] = df['INTRO' ].apply(lambda x: re.search(r'服务设施([\s\S]*)' , x, re.S).group(1 )) df['NOTICE' ] = df['INTRO' ].apply(lambda x: re.search(r'优待政策\s*(.+?)\s*服务设施' , x, re.S).group(1 )) df['INTRO' ] = df['INTRO' ].apply(lambda x: re.search(r'介绍\s*(.+?)\s*全文' , x, re.S).group(1 )) return df def dataTopkl (): csvqne_list = csvqne("../data/hdfscsv/qnecsv" ) csvxc_list = csvxc("../data/hdfscsv/xccsv" ) excel_list = xlsx('../data/hdfsexcel' ) txt_list = [txt(r'../data/hdfstxt' ).astype(dataType)] all_list = csvxc_list + csvqne_list + excel_list + txt_list all_df = pd.concat(all_list, ignore_index=True ) all_df.to_pickle('allTemp.pkl' ) if __name__ == '__main__' : a = time.time() dataTopkl() a1 = time.time() finalTime = arrow.get(a1 - a).format ('mm分:ss秒:SSS毫秒' ) print ("执行速度" , finalTime)
BUG
When importing the data into DM: there are roughly four file formats, each slightly different. For the introduction field, one table splits it into three columns, which really is the best layout; the three-column split was already written for the txt files, but it can't be applied to the xlsx files because many of those fields simply don't exist there. There's no good solution yet, so for the rows that can't be processed the other two columns stay empty for now; I'll think of something later.
Daily summary
Today I tried loading data into the database. Installing DM's own dependencies took quite a while, and in the evening I wrote down how to reinstall the Linux system in case it's needed again. Most of the day went to installing dependencies, because the platform throttles outbound traffic and both yum and pip are very slow. I also sketched the layout of the consolidated table and worked out the import order: comments first, then the incomplete spot table, because if the spot table has problems I can re-read just the incomplete version instead of re-importing all 500k rows, which is much faster.
Server reinstall; importing the comment table into DM
Study date: 6.15
Project tasks
Background
Running the comment table today was far too slow. top showed CPU and memory both at 100%; it turned out the machine was capped at one core. That's unusable, so I stopped, changed it to 8 GB and 4 cores, but on restart the platform image had been wiped again, which is infuriating, and the reinstall took another whole morning.
Server reinstall
Following yesterday's steps it went smoothly; the Python version was changed to 3.8.8, and for dependencies the main one is pandas 1.3.5.
Importing the comment table into DM
Importing the DataFrame into DM now works. Worth noting: because the data volume is fairly large, to_sql cannot insert everything at once, so I import in batches, which takes about a minute; without batching it simply errors out.
Code:
```python
import time
import warnings
from sqlalchemy import create_engine
import numpy as np
import pandas as pd
import radar
from snownlp import SnowNLP
import arrow
from faker import Faker

fakerCn = Faker('zh_CN')
warnings.filterwarnings("ignore", message="Could not import the lzma module.")


def scoreSnow(sentiments):
    if 0.9 <= sentiments <= 1:
        feelMood = "超棒"
    elif 0.8 <= sentiments <= 0.9:
        feelMood = "满意"
    elif 0.5 <= sentiments <= 0.8:
        feelMood = "不错"
    elif 0.2 <= sentiments <= 0.5:
        feelMood = "一般"
    else:
        feelMood = "不佳"
    return feelMood


def clearSpotComment(allInfoDf, province_list):
    print("评论表开始处理")
    comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
    comment_df.insert(2, 'EVALUATION_GRADE', '')
    comment_df['FEEL_SCORE'] = ''
    comment_df['FEEL'] = ''
    order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME',
             'EVALUATION_NAME', 'FEEL_SCORE', 'FEEL']
    comment_df = comment_df[order]
    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x.replace("/", "-"))
    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(
        lambda x: str(radar.random_date("2020-09-13", "2023-02-02").date()) if x == "" else (
            x + "IP属地: " + np.random.choice(province_list) if "IP" not in x else x))

    def calculate_scores(row):
        row.EVALUATION = ''.join(str(row.EVALUATION).split())
        if (row.EVALUATION == "用户未点评。系统默认好评。") or (type(row.EVALUATION) != str):
            score = 1.0
        else:
            score = round(SnowNLP(row.EVALUATION).sentiments, 2)
        feelScore = round(score * 5, 1)
        feel = scoreSnow(score)
        if row.EVALUATION_GRADE == "":
            row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel
        row.FEEL_SCORE = feelScore
        row.FEEL = feel
        return row

    comment_df = comment_df.apply(calculate_scores, axis=1)
    print("评论表结束处理")
    return comment_df


def provinceRandom() -> str:
    province = fakerCn.province()
    if province.find("省") != -1 or province.find("市") != -1:
        return province[0:-1]
    return provinceRandom()


def provinceList():
    province_list = []
    while len(province_list) <= 33:
        province = fakerCn.province()[0:2]
        if province not in province_list:
            province_list.append(province)
    return province_list


def intoDm(data, tableName):
    conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@47.120.9.247:5237'
    engine = create_engine(conn_url)
    data.to_sql(name=tableName, con=engine, if_exists='append', index=False)


if __name__ == '__main__':
    a = time.time()
    provinces = provinceList()
    provinces.remove("黑龙")
    provinces.append("黑龙江")
    print("省份筛选完成")
    allDf = pd.read_pickle('allClearData.pkl')
    comment_temp_df = clearSpotComment(allDf, provinces)
    comment_temp_df.to_pickle('commentFinalData.pkl')
    path = 'commentFinalData.pkl'
    comment_temp = pd.read_pickle(path)
    for col in comment_temp.select_dtypes(include=[object]):
        comment_temp[col] = comment_temp[col].astype('string')
    chunkSize = 1000
    for i in range(0, len(comment_temp), chunkSize):
        data_chunk = comment_temp[i:i + chunkSize]
        intoDm(data_chunk, 'comment_info')
    a1 = time.time()
    finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
    print("执行速度", finalTime)
```
BUG
Python kept stopping by itself for no obvious reason. I suspected memory exhaustion, but the server has 12 GB and my program should use at most about 4 GB. When I later checked the running processes with top, many python3 tasks were still running and never stopped, occupying a lot of memory; today memory was so exhausted that even print failed. Killing them fixed it.
```bash
pkill -9 python3
echo 3 > /proc/sys/vm/drop_caches
```
The processes looked like this
Daily summary
Today I spent some time finishing the most troublesome part, the comment table. It's troublesome because it needs the most techniques: some values can only be filled with AI, data mining and NLP, and it has by far the most rows. Since my strategy processes all 450k rows at once, it is quite slow, but the result is good: the comment table has roughly seven fields, every NaN is filled, and sentiment analysis has been run on the text. Barring surprises, the new HDFS data should be fully cleaned tomorrow.
Data cleaning finished
Study date: 6.16
Project tasks
Right now the work is split across four files; later they will be merged into one .py file. Intermediate results are saved with to_pickle, the fastest format for pandas to reload, so the data lives on disk instead of hogging memory, which both solves the out-of-memory problem and speeds things up. The code is long, so it isn't included in this note; it will go on the blog and be made public after the competition.
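A minimal sketch of that pickle hand-off between stages; the stage function and data below are placeholders, only the allClearData.pkl file name comes from the actual pipeline:

```python
import pandas as pd

def stage_one() -> pd.DataFrame:
    # placeholder for the real cleaning step
    return pd.DataFrame({'SPOT_NAME': ['西湖'], 'GRADE': [4.7]})

stage_one().to_pickle('allClearData.pkl')   # stage 1 writes its result to disk

df = pd.read_pickle('allClearData.pkl')     # stage 2 restarts from the pickle, not from memory
print(df.head())
```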
Filling missing LEVEL values
Used a regression model. It's not very accurate, but for rating spots as something like 4A it's more than good enough.
```python
level_dict = {'3A景区': 1, '4A景区': 2, '5A景区': 3}
df['LEVEL'] = df['LEVEL'].map(level_dict)
df['LEVEL'].fillna(-1, inplace=True)

predict_data = df[df['LEVEL'] == -1]

# simple data augmentation: resample the labelled rows several times
listData = []
for _ in range(10):
    listData.append(df[df['LEVEL'] != -1].sample(frac=0.8))
enhance = pd.concat(listData, ignore_index=True)

lr = LinearRegression()
train_data = enhance.sample(frac=0.8)
test_data = df[df['LEVEL'] != -1]
lr.fit(train_data[['GRADE', 'QTY', 'HOT', 'RATING']].values, train_data['LEVEL'].values)
print('模型得分: ', lr.score(test_data[['GRADE', 'QTY', 'HOT', 'RATING']].values, test_data['LEVEL'].values))

prediction = lr.predict(predict_data[['GRADE', 'QTY', 'HOT', 'RATING']].values)
prediction[prediction > 3] = 3
prediction[prediction < 1] = 1
predict_data['LEVEL'] = prediction.round(0)
df.update(predict_data)

level_dict = {1: '3A景区', 2: '4A景区', 3: '5A景区'}
df['LEVEL'] = df['LEVEL'].map(level_dict)
```
BUG
After changing the column types from text to varchar, importing the spot data failed again; the string type was clearly too short to hold the introduction fields. Switching those columns back to text and keeping the rest as varchar fixed it.
Daily summary
Today I re-imported the spot table plus the city and province tables, and created backup files to guard against data loss, since everything is on the platform now; a later cleaning run should take only about 20 seconds. Tomorrow I still need to tidy up the code package and add the missing comments, because the documentation has to describe how the cleaning runs, so it's worth organizing. As for speed, the comment part is just too big and I don't have a good optimization plan, so it stays as is for now; I'll revisit it if there's time.
Changes to the spot-comment endpoints
Study date: 6.19
Project tasks
Add an endpoint for all comments of a spot
This one is simple: query all comments of a spot by spot_id.
```python
@app.route('/spot/commentAll/<spot_id>', methods=['GET'])
def spot_comments_from_id(spot_id):
    data = dmToDfpkuseg.spotAllComment(eval(spot_id))
    return json.dumps({'msg': '查询成功', 'code': 200, 'data': data}, ensure_ascii=False)


def spotAllComment(spot_id):
    cur.execute("""
        select spot_id, evaluation_name, evaluation, evaluation_grade, evaluation_time, feel_score, feel
        from COMMENT_INFO
        where SPOT_ID={}
    """.format(spot_id))
    list_test = cur.fetchall()
    df_test = pd.DataFrame(list_test,
                           columns=["spot_id", "evaluation_name", "evaluation", "evaluation_grade",
                                    "evaluation_time", "feel_score", "feel"])
    return eval(df_test.to_json(orient='records', force_ascii=False).replace("null", "'未填'"))
```
Modify the spot keyword endpoint
Because fields were added and some unwanted words had to go into the stop-word list, this was reworked: stop words were added and the model was retrained.
```python
def keyWordFindComment(spot_id, comment_key_word):
    cur.execute("""
        select spot_id, evaluation_name, evaluation, evaluation_grade, evaluation_time, feel_score, feel
        from COMMENT_INFO
        where SPOT_ID={} and EVALUATION like '%{}%'
    """.format(spot_id, comment_key_word))
    list_test = cur.fetchall()
    df_test = pd.DataFrame(list_test,
                           columns=["spot_id", "evaluation_name", "evaluation", "evaluation_grade",
                                    "evaluation_time", "feel_score", "feel"])
    return eval(df_test.to_json(orient='records', force_ascii=False).replace("null", "'未填'"))
```
BUG
While running the Python code the server reported an error that looked like a connection problem, which I had seen before. Testing with a database client showed the database was unreachable; on the database server the service had died. Starting it in the foreground resolved the error.
Daily summary
Today was mainly modifying and adding the comment endpoints. Starting the Flask app in Python really is slow because it loads everything before serving, so startup takes me seven or eight minutes, mostly because of the data volume. I haven't optimized it since the calls themselves are fast once it's up; the word-segmentation results still need gradual tuning.
Code consolidation and adding comments
Study date: 6.20
Project tasks
Code consolidation
The consolidated layout is as follows. The cleaning still runs in two steps, otherwise memory blows up: first run alldataToPkl, which saves all the HDFS data locally and merges it into a single allClearData.pkl; then allDataClearTableIntoDm splits all of it into the four tables.
Code
The code is long; the blog has an expandable code panel, so for readability it is included there directly.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 import osimport jsonimport timeimport warningsfrom sqlalchemy import create_engineimport numpy as npimport pandas as pdimport radarfrom snownlp import SnowNLP, sentimentimport arrowfrom faker import Fakerfrom sklearn.linear_model import LinearRegressionfakerCn = Faker('zh_CN' ) pd.set_option('mode.chained_assignment' , None ) warnings.filterwarnings("ignore" , message="Could not import the lzma module." ) sentiment.load("sentiment.marshal" ) def scoreSnow (sentiments ): if 0.9 <= sentiments <= 1 : feelMood = "超棒" elif 0.8 <= sentiments <= 0.9 : feelMood = "满意" elif 0.5 <= sentiments <= 0.8 : feelMood = "不错" elif 0.2 <= sentiments <= 0.5 : feelMood = "一般" else : feelMood = "不佳" return feelMood def clearSpotComment (allInfoDf, province_list ): print ("评论表开始处理" ) comment_df = allInfoDf[allInfoDf.columns[12 :]].copy() comment_df['FEEL_SCORE' ] = '' comment_df['FEEL' ] = '' order = ['SPOT_ID' , 'EVALUATION' , 'EVALUATION_GRADE' , 'EVALUATION_TIME' , 'EVALUATION_NAME' , 'FEEL_SCORE' , 'FEEL' ] comment_df = comment_df[order] comment_df['EVALUATION_TIME' ] = comment_df['EVALUATION_TIME' ].apply(lambda x: x.replace("/" , "-" )) comment_df['EVALUATION_TIME' ] = comment_df['EVALUATION_TIME' ].apply( lambda x: str (radar.random_date("2020-09-13" , "2023-02-02" ).date()) if x == "" else ( x + "IP属地: " + np.random.choice(province_list) if "IP" not in x else x)) def calculate_scores (row ): row.EVALUATION = '' .join(str (row.EVALUATION).split()) if (row.EVALUATION == "用户未点评。系统默认好评。" ) or (type (row.EVALUATION) != str ): score = 1.0 else : score = round (SnowNLP(row.EVALUATION).sentiments, 2 ) feelScore = round (score * 5 , 1 ) feel = scoreSnow(score) if pd.isna(row.EVALUATION_GRADE): print (row.EVALUATION_GRADE) row.EVALUATION_GRADE = str (int (round (feelScore, 0 ))) + "分 " + feel row.FEEL_SCORE = feelScore row.FEEL = feel return row comment_df = comment_df.apply(calculate_scores, axis=1 ) print ("评论表结束处理" ) return comment_df def provinceList (): province_set = set () while len (province_set) <= 33 : province = fakerCn.province()[0 :2 ] if province == "黑龙" : province = "黑龙江" province_set.add(province) return list (province_set) def intoDm (data, tableName ): conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@47.120.9.36:5236' engine = create_engine(conn_url) data.to_sql(name=tableName, con=engine, if_exists='append' , index=False ) def cityFindProvince (cityName ): with open ("../data/city.json" , 
"r" , encoding="utf-8" ) as f: content: dict = json.load(f) for province_name, city_name_datas in content.items(): for city_name_data in city_name_datas: for city_name, county_name_list in city_name_data.items(): if city_name == cityName: return province_name else : for county_name in county_name_list: if county_name == cityName: return province_name return "未知省" def clearSpotProvince (cityWithProvinceDf ): cityWithProvinceDf['PROVINCE_NAME' ] = cityWithProvinceDf['CITY_NAME' ].apply(cityFindProvince) province_dict = {name: rank + 1 for rank, name in enumerate (cityWithProvinceDf['PROVINCE_NAME' ].unique())} cityWithProvinceDf['PROVINCE_ID' ] = cityWithProvinceDf['PROVINCE_NAME' ].apply(lambda x: province_dict[x]) province_temp_df = cityWithProvinceDf.copy()[['PROVINCE_NAME' , 'PROVINCE_ID' ]].drop_duplicates() cityWithProvinceDf.drop(columns=['PROVINCE_NAME' ], inplace=True ) return province_temp_df def clearSpotDf (allInfoDf ): spot_df = allInfoDf[['SPOT_NAME' , 'LEVEL' , 'LOCATION' , 'GRADE' , 'QTY' , 'INTRO' , 'OPENTIME' , 'NOTICE' , 'ST' , 'HOT' , 'CITY_ID' , 'RATING' , 'SPOT_ID' ]].drop_duplicates().copy() spot_df.insert(3 , 'PHONE' , [fakerCn.phone_number() for _ in range (spot_df.iloc[:, 0 ].size)]) return spot_df def clearSpotCity (spotWithCityDf ): spotWithCityDf['CITY_NAME' ] = \ spotWithCityDf['LOCATION' ].str .extract('([^市]{2}()+市)' )[ 0 ].fillna('未知市' ) city_id_dict = {city_name: rank + 1 for rank, city_name in enumerate (spotWithCityDf['CITY_NAME' ].unique())} spotWithCityDf['CITY_ID' ] = spotWithCityDf['CITY_NAME' ].apply(lambda x: city_id_dict[x]) city_temp_df = spotWithCityDf.copy()[['CITY_NAME' , 'CITY_ID' ]].drop_duplicates() spotWithCityDf.drop(columns=['CITY_NAME' ], inplace=True ) return city_temp_df def keep_row_with_less_missing_values_and_less_qty (group ): missing_values = group.isnull().sum (axis=1 ) qty_sum = group['QTY' ] combined = pd.DataFrame({'Missing' : missing_values, 'QTY' : qty_sum}) combined_sorted = combined.sort_values(by=['Missing' , 'QTY' ]) return group.loc[combined_sorted.index[0 ]] def clearSpotFinal (df ): try : df['QTY' ] = df['QTY' ].str [:-3 ].astype(int ) except ValueError: print ("评论数字段包含一些无法转换为数字的值!" 
) df['HOT' ].fillna(-1 , inplace=True ) predict_data = df[df['HOT' ] == -1 ] listData = [] for _ in range (10 ): listData.append(df[df['HOT' ] != -1 ].sample(frac=0.8 )) enhance = pd.concat(listData, ignore_index=True ) lr = LinearRegression() train_data = enhance.sample(frac=0.8 ) test_data = df[df['HOT' ] != -1 ] lr.fit(train_data[['GRADE' , 'QTY' ]].values, train_data['HOT' ].values) print ('模型得分: ' , lr.score(test_data[['GRADE' , 'QTY' ]].values, test_data['HOT' ].values)) prediction = lr.predict(predict_data[['GRADE' , 'QTY' ]].values) prediction[prediction > 10 ] = 10 prediction[prediction < 0 ] = 0 predict_data['HOT' ] = prediction.round (1 ) df.update(predict_data) duplicated_mask = df.duplicated(subset='SPOT_NAME' , keep=False ) duplicated_rows = df[duplicated_mask] new_rows = duplicated_rows.groupby('SPOT_NAME' ).apply(keep_row_with_less_missing_values_and_less_qty) non_duplicated_rows = df[~duplicated_mask] result = pd.concat([non_duplicated_rows, new_rows]) max_comment_sum = result['QTY' ].max () result['RATING' ] = round ((result['GRADE' ] * 6 ) + (result['HOT' ] * 4 ) + (result['QTY' ] / max_comment_sum * 30 ), 2 ) result.to_pickle('allClearDataSpot.pkl' ) df = pd.read_pickle('allClearDataSpot.pkl' ) level_dict = {'3A景区' : 1 , '4A景区' : 2 , '5A景区' : 3 } df['LEVEL' ] = df['LEVEL' ].map (level_dict) df['LEVEL' ].fillna(-1 , inplace=True ) predict_data = df[df['LEVEL' ] == -1 ] listData = [] for _ in range (10 ): listData.append(df[df['LEVEL' ] != -1 ].sample(frac=0.8 )) enhance = pd.concat(listData, ignore_index=True ) lr = LinearRegression() train_data = enhance.sample(frac=0.8 ) test_data = df[df['LEVEL' ] != -1 ] lr.fit(train_data[['GRADE' , 'QTY' , 'HOT' , 'RATING' ]].values, train_data['LEVEL' ].values) print ('模型得分: ' , lr.score(test_data[['GRADE' , 'QTY' , 'HOT' , 'RATING' ]].values, test_data['LEVEL' ].values)) prediction = lr.predict(predict_data[['GRADE' , 'QTY' , 'HOT' , 'RATING' ]].values) prediction[prediction > 3 ] = 3 prediction[prediction < 1 ] = 1 predict_data['LEVEL' ] = prediction.round (0 ) df.update(predict_data) level_dict = {1 : '3A景区' , 2 : '4A景区' , 3 : '5A景区' } df['LEVEL' ] = df['LEVEL' ].map (level_dict) df.to_pickle('spotFinalData.pkl' ) df[df.select_dtypes(include=['object' ]).columns] = df.select_dtypes(include=['object' ]).astype('string' ) if __name__ == '__main__' : a = time.time() province_city = pd.read_csv("../data/GT_CITYS.csv" ,encoding = 'gbk' ) intoDm(province_city,"GT_CITYS" ) provinces = provinceList() print ("省份筛选完成" ) allDf = pd.read_pickle('allClearData.pkl' ) comment_temp_df = clearSpotComment(allDf, provinces) path = 'commentFinalData.pkl' comment_temp_df.to_pickle(path) print ("开始导入" ) comment_temp = pd.read_pickle(path) comment_temp[comment_temp.select_dtypes(include=['object' ]).columns] = comment_temp.select_dtypes(include=['object' ]).astype('string' ) for col in comment_temp.select_dtypes(include=[object ]): comment_temp[col] = comment_temp[col].astype('string' ) chunkSize = 5200 for i in range (0 , len (comment_temp), chunkSize): data_chunk = comment_temp[i:i + chunkSize] intoDm(data_chunk, 'comment_info' ) spot_info = clearSpotDf(allDf) spot_info.to_pickle('allClearSpot.pkl' ) city_df = clearSpotCity(spot_info) intoDm(city_df, 'city_info' ) province_df = clearSpotProvince(city_df) intoDm(province_df, 'province_info' ) spot_final_info = pd.read_pickle('allClearSpot.pkl' ).reset_index().drop(columns=['index' ]) clearSpotFinal(spot_final_info) intoDm(spot_final_info, "spot_info" ) a1 = time.time() finalTime = arrow.get(a1 - 
a).format ('mm分:ss秒:SSS毫秒' ) print ("执行速度" , finalTime)
BUG
An encoding error was reported; it's solved now. The problem was the PyCharm interpreter: at first I ran the code with the Python on Windows and it always threw this error, then I wondered whether the UTF-8 setting chosen when the interpreter was configured prevented it from handling some GBK data, so I switched to the platform's Python interpreter and it finally ran successfully.
Daily summary
Today's work was tidying up the cleaning and API code. The requirement is at least 25% comments, which was actually already met. Some parts of the code are redundant, so I refactored and wrapped them, and the API is kept running with nohup. Cleaning still takes two runs; depending on performance later it could be merged into one. Processing the comment table is really slow: it alone takes 20 minutes while everything else adds up to under a minute. I'll see whether it can be optimized when writing the documentation.