1 2 3 4 5 6 7 8 {% tabs 分栏%} <!-- tab BUG1@fas fa-bomb --> Any content (support inline tags too). <!-- endtab --> <!-- tab BUG2@fas fa-bomb --> Any content (support inline tags too). <!-- endtab --> {% endtabs %}
学习日期: 6.1
import math
import os

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from commentDmTest.DmConnect import *

os.environ["PYSPARK_PYTHON"] = "E:\\software\\anaconda3\\envs\\pyspark\\python.exe"

conn = connDm()
cur = conn.cursor()

# One row per spot that has comments: its facts plus the average FEEL_SCORE.
# SUM is text; everything from the first '条' onwards is removed in SQL.
cur.execute("""
    with t1 as (select SPOT_ID, round(avg(FEEL_SCORE), 3) as avgScore
                from COMMENT_INFO_NLP
                group by SPOT_ID)
    select SPOT_NAME,
           GRADE,
           HOT,
           t2.SPOT_ID,
           replace(SUM, substring(SUM, locate('条', SUM)), '') as sum,
           avgScore
    from SPOT_INFO t2
             join t1 on t1.SPOT_ID = t2.SPOT_ID
""")
spot_info_df = pd.DataFrame(cur.fetchall(),
                            columns=['SPOT_NAME', 'GRADE', 'HOT', 'SPOT_ID', 'SUM', 'AVG'])

# Median comment count over all spots (the variable keeps its historical
# "avg" name); it is the fallback for spots without a usable count.
cur.execute("""
    select MEDIAN(convert(int, t1.sum)) avgCommentSum
    from (select replace(SUM, substring(SUM, locate('条', SUM)), '') as sum
          from SPOT_INFO) t1
""")
_median = cur.fetchall()[0][0]
# The driver may hand back a Decimal (TODO confirm); Decimal * float raises,
# so normalise to float once here.
avg_comment_sum = float(_median) if _median is not None else 0.0

spot_info_df['SUM'] = spot_info_df['SUM'].apply(pd.to_numeric)
max_comment_sum = float(spot_info_df['SUM'].max())


def _is_missing(value):
    """Return True for None and for float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def _value_or(value, fallback):
    """Return ``float(value)``, or ``fallback`` when value is None, NaN or 0."""
    if _is_missing(value) or not value:
        return fallback
    return float(value)


def _capped_share(value, cap, weight):
    """Scale ``value / cap`` to ``weight`` points, saturating at ``weight``."""
    if _is_missing(cap) or cap <= 0:
        return 0.0
    if value > cap:
        return float(weight)
    return weight * value / cap


def process_row(r):
    """Compute the composite rating (0-100) of one spot row.

    Weights: GRADE 20 points (cap 5), HOT 40 points (cap 10), comment count
    20 points (cap = largest count in the data set), average sentiment score
    20 points (cap 5).  Each part is rounded to 2 decimals before summing.

    Missing values (None, NaN, 0) count as 0, except the comment count, which
    falls back to the median count.  Previously a NaN failed the ``<= cap``
    comparison and was awarded the full weight.

    Returns:
        tuple: ``(SPOT_ID, SPOT_NAME, rating)``.
    """
    grade_count = _value_or(r.GRADE, 0.0)
    hot_count = _value_or(r.HOT, 0.0)
    sum_count = _value_or(r.SUM, avg_comment_sum)
    avg_count = _value_or(r.AVG, 0.0)

    grade_score = _capped_share(grade_count, 5, 20)
    hot_score = _capped_share(hot_count, 10, 40)
    sum_score = _capped_share(sum_count, max_comment_sum, 20)
    avg_score = _capped_share(avg_count, 5, 20)

    rating = round(grade_score, 2) + round(hot_score, 2) + round(sum_score, 2) + round(avg_score, 2)
    return r.SPOT_ID, r.SPOT_NAME, round(rating, 2)


spark = connSpark()
schema = StructType([
    StructField("SPOT_NAME", StringType(), True),
    StructField("GRADE", DoubleType(), True),
    StructField("HOT", DoubleType(), True),
    StructField("SPOT_ID", IntegerType(), True),
    StructField("SUM", DoubleType(), True),
    StructField("AVG", DoubleType(), True),
])
spot_info_df_spark = spark.createDataFrame(spot_info_df, schema=schema)
spot_info_df_spark.printSchema()

rating_rdd = spot_info_df_spark.rdd.map(process_row)
rating_rdd = rating_rdd.map(lambda x: (int(x[0]), str(x[1]), float(x[2])))
rating_df = rating_rdd.toDF(["spot_id_rat", "spot_name_rat", "RATING"])

spot_into = dmRead(spark, "SPOT_INFO")
# Column names come from the schema; no need to collect the table to pandas.
need_col: list = list(spot_into.columns)
need_col.append("RATING")
spot_final = rating_df.join(spot_into, rating_df.spot_id_rat == spot_into.SPOT_ID) \
    .selectExpr(need_col)
# NOTE(review): this writes to the table spot_into was read from; whether
# dmWrite overwrites or appends is not visible here.
dmWrite(spot_final, "SPOT_INFO")
def keyWordFindComment(spot_id, comment_key_word):
    """Return the comments of one spot whose text contains a keyword.

    Args:
        spot_id: Value matched against ``COMMENT_INFO_NLP.SPOT_ID``.
        comment_key_word: Substring searched for in ``EVALUATION``.

    Returns:
        list[dict]: One dict per matching comment, with SQL NULLs replaced by
        the placeholder ``'未填'``.
    """
    import json

    columns = ["spot_id", "evaluation", "evaluation_grade",
               "evaluation_time", "feel_score", "feel"]
    # Bind both values as parameters; formatting them into the SQL text
    # allowed SQL injection through the keyword.
    cur.execute(
        """
        select spot_id, evaluation, evaluation_grade, evaluation_time, feel_score, feel
        from COMMENT_INFO_NLP
        where SPOT_ID = ?
          and EVALUATION like ?
        """,
        (spot_id, "%{}%".format(comment_key_word)),
    )
    df_test = pd.DataFrame(cur.fetchall(), columns=columns)

    # Parse the JSON with json.loads rather than eval(): eval executed text
    # coming from the database and corrupted any comment containing "null".
    records = json.loads(df_test.to_json(orient='records', force_ascii=False))
    return [
        {key: ('未填' if value is None else value) for key, value in record.items()}
        for record in records
    ]
@app.route('/spot/comment/<spot_id>', methods=['POST', 'GET'])
def spot_comments_high_word_find(spot_id):
    """Search the comments of a spot for the ``keyWord`` query parameter.

    Only POST is served; GET is routed here solely to return an explanatory
    error.  The response is always a JSON string with ``msg``, ``code`` and
    ``data`` fields.
    """
    if request.method != 'POST':
        return json.dumps({'msg': '查询失败,请使用POST传参', 'code': 500, 'data': 'null'},
                          ensure_ascii=False)

    # The id is attacker-controlled URL text: parse it as an integer instead
    # of eval()-ing it, which executed arbitrary Python from the URL.
    try:
        spot_key = int(spot_id)
    except ValueError:
        return json.dumps({'msg': '查询失败,景区id必须是整数', 'code': 500, 'data': 'null'},
                          ensure_ascii=False)

    keyWord = request.args.to_dict()
    key_word = keyWord.get('keyWord')
    if not key_word:
        # Without this guard a missing parameter searched for the text "None".
        return json.dumps({'msg': '查询失败,缺少keyWord参数', 'code': 500, 'data': 'null'},
                          ensure_ascii=False)

    data = dmToDfpkuseg.keyWordFindComment(spot_key, key_word)
    if not data:
        return json.dumps({'msg': '查询失败,没有该景区关键字对应的评论', 'code': 500, 'data': 'null'},
                          ensure_ascii=False)
    return json.dumps({'msg': '查询成功', 'code': 200, 'data': data}, ensure_ascii=False)
学习日期: 6.2
package clearDf

import clearDf.Utils.{dmWriterOverwrite, getSpark}
import clearDf.fileClearTest.{csvToDf, xlsxToDf}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}

/** First cleaning stage: merges the csv/xlsx sample files into one staging table. */
object finalClearOne {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = getSpark
    import spark.implicits._
    import spark.sql
    import clearTxt.txtClearArrayList

    sql(" set spark.executor.processTreeMetrics=true")

    val csvSmall: DataFrame = csvToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\2.csv")
    val xlsxSmall: DataFrame = xlsxToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\3.xlsx")
    val csvFull: DataFrame = csvToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\4.csv")

    // GRADE: first two characters as an integer, divided by 20, one decimal.
    val rescaled: DataFrame = csvSmall
      .unionByName(xlsxSmall)
      .withColumn("GRADE", round(substring(col("GRADE"), 1, 2).cast(IntegerType) / 20, 1))

    // Set these columns to empty strings so unionByName with file 4 finds them.
    val blankColumns = Seq("PHONE", "INTRO", "HOT", "COMMENT_GRADE", "COMMENT_TIME")
    val frameSmallFill: DataFrame =
      blankColumns.foldLeft(rescaled)((acc, name) => acc.withColumn(name, lit("")))

    val frameAllData: DataFrame = csvFull.unionByName(frameSmallFill)
    dmWriterOverwrite(frameAllData, "U7U7.All_TEMP")

    txtClearArrayList.txtIntoDmData.main()
  }
}
学习日期: 6.5
package clearDf

import clearDf.Utils.{dmRead, dmWriterOverwrite, getSpark}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Second cleaning stage: derives the spot, comment, city and province tables. */
object finalClearTwo {
  val spark: SparkSession = getSpark

  import spark.implicits._
  import spark.sql

  /** SQL expression extracting a city name from LOCATION; shared by
    * cityInfoClear and spotWithCityId so both always agree. */
  private val cityNameExpr: String =
    """if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^市]{2}()+市)"),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2))"""

  def main(args: Array[String]): Unit = {
    finalClearOne.main(args)

    // Strip '?' from spot names and drop rows whose name is the text 'null'.
    val all_data: DataFrame = dmRead(spark, "ALL_TEMP")
      .withColumn("SPOT", regexp_replace($"SPOT", "\\?", ""))
      .where("SPOT <> 'null'")

    val spot_info: DataFrame = spotInfoClear(all_data)
    val comment_info: DataFrame = commentWithSpot(all_data, spot_info)
    println(comment_info.count())

    val city_temp: DataFrame = cityInfoClear(spot_info)
    dmWriterOverwrite(city_temp, "city_info_temp")
    val spot_info_temp: DataFrame = spotWithCityId(spot_info, city_temp)

    // CITY_ID comes from row_number(), which starts at 1, so this filter keeps
    // no rows: the write only (re)creates an empty CITY_TEMP with the schema.
    val create_temp_city: DataFrame = city_temp
      .withColumn("PROVINCE_NAME", lit(""))
      .where("CITY_ID=0")
    dmWriterOverwrite(create_temp_city, "CITY_TEMP")

    // NOTE(review): presumably fills CITY_TEMP with province names - confirm.
    import clearTxt.jsTest.jsonToDataTest
    jsonToDataTest.provinceNameIntoCity()

    val city_temp_two: DataFrame = dmRead(spark, "CITY_TEMP")
    val province_data: DataFrame = createToProvince(city_temp_two)
    dmWriterOverwrite(province_data, "PROVINCE_INFO")
    val city_info_final: DataFrame = cityJoinProvince(city_temp_two, province_data)
    dmWriterOverwrite(city_info_final, "CITY_INFO")

    val spot_Prediction: DataFrame = spotPredictionFinalTable(spot_info_temp)
    dmWriterOverwrite(spot_Prediction, "spot_info_temp")
  }

  /** Fills missing HOT values with a random-forest prediction from GRADE and SUM.
    *
    * The model is fitted only on rows that have a real HOT value.  Previously
    * missing labels were replaced by the column mean before fitting, so the
    * model was partly trained to reproduce that mean.
    *
    * @param spot_info_temp spot table with SPOT_ID, GRADE, HOT and SUM columns
    * @return the input rows with HOT filled and HOT/GRADE rounded to 1 decimal
    */
  def spotPredictionFinalTable(spot_info_temp: DataFrame): DataFrame = {
    val features: DataFrame = spot_info_temp.selectExpr("SPOT_ID", "GRADE", "HOT", "SUM")
      .withColumn("GRADE", $"GRADE".cast(DoubleType))
      .withColumn("HOT", $"HOT".cast(DoubleType))
      // Keep the text before the first '条' and read it as a number.
      .withColumn("SUM", regexp_extract($"SUM", "[^条]+()", 0).cast(DoubleType))
    val typedSpots: DataFrame = spot_info_temp
      .withColumn("HOT", $"HOT".cast(DoubleType))

    val assembler: VectorAssembler = new VectorAssembler()
      .setInputCols(Array("GRADE", "SUM"))
      .setOutputCol("features")
    val rf: RandomForestRegressor = new RandomForestRegressor()
      .setLabelCol("HOT")
      .setFeaturesCol("features")

    // Train on observed labels only.
    val labelled: DataFrame = assembler.transform(features.where($"HOT".isNotNull))
    val model: RandomForestRegressionModel = rf.fit(labelled)

    // Score every row; the prediction is used only where HOT is null.
    val predict: DataFrame = model.transform(assembler.transform(features))
      .withColumn("prediction", round(col("prediction"), 1))
      .selectExpr("SPOT_ID", "prediction")
      .withColumnRenamed("SPOT_ID", "id")

    typedSpots.join(predict, predict("id") === typedSpots("SPOT_ID"))
      .withColumn("HOT", when(col("HOT").isNull, col("prediction")).otherwise(col("HOT")))
      .withColumn("HOT", round($"HOT", 1))
      .withColumn("GRADE", round($"GRADE", 1))
      .drop("id", "prediction")
  }

  /** Attaches PROVINCE_ID to each city by matching PROVINCE_NAME to NAME. */
  def cityJoinProvince(city_data: DataFrame, province_data: DataFrame): DataFrame = {
    city_data
      .join(province_data, city_data("PROVINCE_NAME") === province_data("NAME"))
      .selectExpr("CITY_NAME", "CITY_ID", "PROVINCE_ID")
  }

  /** Builds the province table: distinct names numbered in name order. */
  def createToProvince(city_data: DataFrame): DataFrame = {
    city_data
      .selectExpr("PROVINCE_NAME")
      .distinct()
      .withColumnRenamed("PROVINCE_NAME", "NAME")
      .withColumn("PROVINCE_ID", row_number() over Window.orderBy($"NAME"))
  }

  /** Builds the spot table: drops the comment columns, de-duplicates, numbers rows. */
  def spotInfoClear(all_data: DataFrame): DataFrame = {
    val SPOT_ID_ORDER: WindowSpec = Window.orderBy($"SPOT_NAME".desc)
    all_data
      .withColumnRenamed("SPOT", "SPOT_NAME")
      .drop("COMMENT")
      .drop("COMMENT_GRADE")
      .drop("COMMENT_TIME")
      .distinct()
      .withColumn("SPOT_ID", row_number() over SPOT_ID_ORDER)
      // NOTE(review): removes the 9th row in descending name order and then
      // renumbers; the reason for excluding that row is not visible here.
      .where("SPOT_ID<>9")
      .withColumn("SPOT_ID", row_number() over SPOT_ID_ORDER)
  }

  /** Builds the comment table keyed by SPOT_ID, with a running EVALUATION_ID. */
  def commentWithSpot(all_data: DataFrame, spot_data: DataFrame): DataFrame = {
    all_data
      .join(spot_data, all_data("SPOT") === spot_data("SPOT_NAME"))
      .selectExpr("SPOT_ID", "COMMENT", "COMMENT_GRADE", "COMMENT_TIME")
      .withColumnRenamed("COMMENT", "EVALUATION")
      .withColumnRenamed("COMMENT_GRADE", "EVALUATION_GRADE")
      .withColumnRenamed("COMMENT_TIME", "EVALUATION_TIME")
      .where("EVALUATION<>'NULL'")
      .distinct()
      .withColumn("EVALUATION_ID", row_number() over Window.orderBy($"SPOT_ID"))
  }

  /** Extracts the distinct city names from the spot locations and numbers them. */
  def cityInfoClear(spot_info: DataFrame): DataFrame = {
    spot_info.createOrReplaceTempView("spot_info")
    sql(
      s"""
         |select
         |$cityNameExpr as CITY_NAME
         |from
         |spot_info
         |""".stripMargin)
      .distinct()
      .withColumn("CITY_ID", row_number() over Window.orderBy($"CITY_NAME".desc))
  }

  /** Adds CITY_ID to every spot by re-deriving its city name and joining on it. */
  def spotWithCityId(spot_info: DataFrame, city_temp: DataFrame): DataFrame = {
    spot_info.createOrReplaceTempView("spot_info")
    city_temp.createOrReplaceTempView("city_info")
    sql(
      s"""
         |with t1 as (
         |select
         |*,
         |$cityNameExpr as NAME
         |from
         |spot_info)
         |select
         |t1.*,
         |ci.CITY_ID
         |from
         |t1
         |join city_info ci on ci.CITY_NAME = t1.NAME
         |""".stripMargin).drop("NAME")
  }
}
/** Fills missing HOT values with a random-forest prediction from GRADE and SUM.
  *
  * The model is fitted only on rows that have a real HOT value.  Previously
  * missing labels were replaced by the column mean before fitting, so the
  * model was partly trained to reproduce that mean.
  *
  * @param spot_info_temp spot table with SPOT_ID, GRADE, HOT and SUM columns
  * @return the input rows with HOT filled and HOT/GRADE rounded to 1 decimal
  */
def spotPredictionFinalTable(spot_info_temp: DataFrame): DataFrame = {
  val features: DataFrame = spot_info_temp.selectExpr("SPOT_ID", "GRADE", "HOT", "SUM")
    .withColumn("GRADE", $"GRADE".cast(DoubleType))
    .withColumn("HOT", $"HOT".cast(DoubleType))
    // Keep the text before the first '条' and read it as a number.
    .withColumn("SUM", regexp_extract($"SUM", "[^条]+()", 0).cast(DoubleType))
  val typedSpots: DataFrame = spot_info_temp
    .withColumn("HOT", $"HOT".cast(DoubleType))

  val assembler: VectorAssembler = new VectorAssembler()
    .setInputCols(Array("GRADE", "SUM"))
    .setOutputCol("features")
  val rf: RandomForestRegressor = new RandomForestRegressor()
    .setLabelCol("HOT")
    .setFeaturesCol("features")

  // Train on observed labels only.
  val labelled: DataFrame = assembler.transform(features.where($"HOT".isNotNull))
  val model: RandomForestRegressionModel = rf.fit(labelled)

  // Score every row; the prediction is used only where HOT is null.
  val predict: DataFrame = model.transform(assembler.transform(features))
    .withColumn("prediction", round(col("prediction"), 1))
    .selectExpr("SPOT_ID", "prediction")
    .withColumnRenamed("SPOT_ID", "id")

  typedSpots.join(predict, predict("id") === typedSpots("SPOT_ID"))
    .withColumn("HOT", when(col("HOT").isNull, col("prediction")).otherwise(col("HOT")))
    .withColumn("HOT", round($"HOT", 1))
    .withColumn("GRADE", round($"GRADE", 1))
    .drop("id", "prediction")
}
学习日期: 6.6
import os
import random

import pandas as pd
import radar
from faker import Faker
from finalMain.DmConnect import connDm
from snownlp import SnowNLP
from snownlp import sentiment
from sqlalchemy import create_engine

# One shared generator: building a Faker for every draw is needlessly slow.
_FAKER = Faker('zh_CN')

# Lower bounds of the sentiment bands, checked from the top down.
_FEEL_LEVELS = ((0.9, "超棒"), (0.8, "满意"), (0.5, "不错"), (0.2, "一般"))


def provinceRandom() -> str:
    """Return a random region name containing '省' or '市', minus its last character.

    Names without either character are drawn again.  This is now a loop, not
    unbounded recursion.
    """
    while True:
        province = _FAKER.province()
        if "省" in province or "市" in province:
            return province[0:-1]


def provinceList():
    """Collect 27 distinct names from ``provinceRandom``."""
    province_list = []
    while len(province_list) <= 26:
        province = provinceRandom()
        if province not in province_list:
            province_list.append(province)
    return province_list


def scoreSnow(sentiments):
    """Map a sentiment probability in [0, 1] to a mood label.

    Values outside [0.2, 1] (including NaN) map to ``"不佳"``.
    """
    if sentiments <= 1:
        for floor, label in _FEEL_LEVELS:
            if sentiments >= floor:
                return label
    return "不佳"


def intoDm(data, tableName):
    """Append a DataFrame to the DM table ``tableName``."""
    conn_url = 'dm+dmPython://SYSDBA:SYSDBA@'
    engine = create_engine(conn_url)
    data.to_sql(name=tableName, con=engine, if_exists='append', index=False)


def commentDataFinalIntoDm():
    """Score every comment in COMMENT_TEMP and append the result to COMMENT_INFO.

    For each comment this fills a missing time with a random date and a random
    "IP属地", computes FEEL_SCORE (sentiment * 5) and FEEL (mood label), and
    derives a missing grade from them.
    """
    data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'snownlpDrill/sentiment.marshal')
    sentiment.load(data_path)

    conn = connDm()
    try:
        cur = conn.cursor()
        cur.execute("select * from COMMENT_TEMP")
        comment_list: list = cur.fetchall()
    finally:
        # The connection is only needed for this one read.
        conn.close()

    comment_df = pd.DataFrame(comment_list,
                              columns=["SPOT_ID", "EVALUATION", "EVALUATION_GRADE", "EVALUATION_TIME",
                                       "EVALUATION_ID"])
    comment_df['FEEL_SCORE'] = ''
    comment_df['FEEL'] = ''
    province_list = provinceList()

    # Read and write the SAME row by label.  The old code read row ``index``
    # but wrote ``iloc[index - 1]``, so every result landed one row early and
    # row 0's result wrapped around to the last row.
    for index in comment_df.index:
        comment_value = comment_df.at[index, 'EVALUATION']
        if comment_df.at[index, 'EVALUATION_TIME'] == "":
            comment_df.at[index, 'EVALUATION_TIME'] = (
                str(radar.random_date("2020-09-13", "2023-02-02").date())
                + "IP属地: " + random.choice(province_list)
            )
        score = round(SnowNLP(comment_value).sentiments, 2)
        feelScore = round(score * 5, 1)
        feel = scoreSnow(score)
        comment_df.at[index, 'FEEL_SCORE'] = feelScore
        comment_df.at[index, 'FEEL'] = feel
        if comment_df.at[index, 'EVALUATION_GRADE'] == "":
            comment_df.at[index, 'EVALUATION_GRADE'] = str(int(round(feelScore, 0))) + "分 " + feel

    intoDm(comment_df, "COMMENT_INFO")


commentDataFinalIntoDm()
今日任务主要是优化,然后测试了多次,从空表开始,直接执行一次spark代码、一次py代码,就能将数据处理好并存入dm的数据库中了。今天主要的收获是发现了一个很好用的、能填充随机参数的类:python faker库能随机生成很多需要的个人信息或者编码之类的数据,而且格式可以调,还是很不错的。
学习日期: 6.7
使用spark-submit提交jar包读取时发现有部分乱码,仔细看了看都是在后面的数据,好像就是3000条,那就是java洗的那部分。去找java那部分代码的格式问题,发现读txt文件的时候就已经乱码了。起初还以为是导入数据库才乱码,还修改了url,结果是IO流的问题:`reader = new InputStreamReader(file, StandardCharsets.UTF_8);`,读文件的时候加上编码格式就行了。
学习日期: 6.8
import random
import pandas as pd
import re
import os
import radar
from faker import Faker
from hdfs3 import HDFileSystem
import glob
from snownlp import SnowNLP

# Lower bounds of the sentiment bands, checked from the top down.
_FEEL_LEVELS = ((0.9, "超棒"), (0.8, "满意"), (0.5, "不错"), (0.2, "一般"))

# (output key, start tag, characters skipped from the tag start, end tag).
# The skip counts are the ones the parser has always used; for [SPOT],
# [LOCATION] and [COMMENT] they are one more than the tag length.
_TXT_FIELDS = (
    ('SPOT_NAME', '[SPOT]', 7, '[LOCATION]'),
    ('LOCATION', '[LOCATION]', 11, '[OPENTIME]'),
    ('OPENTIME', '[OPENTIME]', 10, '[PHONE]'),
    ('PHONE', '[PHONE]', 7, '[INTRO]'),
    ('INTRO', '[INTRO]', 7, '[GRADE]'),
    ('GRADE', '[GRADE]', 7, '[HOT]'),
    ('HOT', '[HOT]', 5, '[SUM]'),
    ('QTY', '[SUM]', 5, '[COMMENT]'),
    ('EVALUATION', '[COMMENT]', 10, '[COMMENT_GRADE]'),
    ('EVALUATION_GRADE', '[COMMENT_GRADE]', 15, '[COMMENT_TIME]'),
    ('EVALUATION_TIME', '[COMMENT_TIME]', 14, None),
)


def scoreSnow(sentiments):
    """Map a sentiment probability in [0, 1] to a mood label.

    Values outside [0.2, 1] (including NaN) map to ``"不佳"``.
    """
    if sentiments <= 1:
        for floor, label in _FEEL_LEVELS:
            if sentiments >= floor:
                return label
    return "不佳"


def provinceRandom() -> str:
    """Return a random region name containing '省' or '市', minus its last character."""
    f = Faker('zh_CN')
    while True:
        province = f.province()
        if "省" in province or "市" in province:
            return province[0:-1]


def provinceList():
    """Collect 27 distinct names from ``provinceRandom``."""
    province_list = []
    while len(province_list) <= 26:
        province = provinceRandom()
        if province not in province_list:
            province_list.append(province)
    return province_list


def hdfsFileRead():
    """Connect to HDFS and list ``/data/txt``.

    The previous body called ``hdfsFileRead()`` from inside itself without a
    base case, so it always ended in ``RecursionError``.  It then read each
    file only to discard the content; that loop is dropped too.

    Returns:
        tuple: ``(hdfs, file_list)``.
    """
    hdfs = HDFileSystem(host='hadoopb-namenode.damengb-zone.svc', port=9000)
    file_list = hdfs.ls('/data/txt')
    return hdfs, file_list


def LoadFileRead():
    """Return the entry names of the local folder ``/data/txt``."""
    folder_path = r'/data/txt'
    file_names = os.listdir(folder_path)
    return file_names


def clearSpot():
    """Return the de-duplicated spot columns of the global ``df`` with a ``rating``.

    ``rating`` is ``4 * GRADE + 4 * HOT``.  The frame used to be computed and
    then thrown away; it is now returned.
    """
    spot_df = df[df.columns[0:11]].drop_duplicates()
    spot_df['rating'] = 4 * spot_df['GRADE'].astype(float) + 4 * spot_df['HOT'].astype(float)
    return spot_df


def _between(text, tag, skip, end_tag):
    """Return the stripped text after ``tag`` up to ``end_tag``.

    Returns '' when ``tag`` is absent, and reads to the end of the text when
    ``end_tag`` is absent.  A ``find()`` result of -1 used to be sliced with
    directly, yielding unrelated text.
    """
    start = text.find(tag)
    if start == -1:
        return ''
    end = text.find(end_tag) if end_tag is not None else -1
    body = text[start + skip:] if end == -1 else text[start + skip:end]
    return body.strip()


def _first_group(pattern, text):
    """Return group 1 of ``pattern`` in ``text`` (DOTALL), or '' when no match."""
    match = re.search(pattern, text, re.S)
    return match.group(1) if match else ''


def _parse_spot_text(text):
    """Turn the content of one tagged txt file into a row dict."""
    data = {
        'SPOT_NAME': '', 'LEVEL': '', 'LOCATION': '', 'OPENTIME': '', 'PHONE': '',
        'INTRO': '', 'NOTICE': '', 'ST': '', 'GRADE': '', 'HOT': '', 'QTY': '',
        'EVALUATION': '', 'EVALUATION_GRADE': '', 'EVALUATION_TIME': '',
        'EVALUATION_NAME': '', 'FEEL_SCORE': '', 'FEEL': '',
    }
    for key, tag, skip, end_tag in _TXT_FIELDS:
        data[key] = _between(text, tag, skip, end_tag)
    return data


if __name__ == '__main__':
    listDf = []
    folder_path = r'../data/txt'
    for file_path in glob.glob(os.path.join(folder_path, '*.txt')):
        with open(file_path, 'r', encoding="utf-8") as f:
            text = f.read()
        listDf.append(_parse_spot_text(text))

    df = pd.DataFrame(listDf)
    # ST and NOTICE are cut out of the raw INTRO, so they must be derived
    # before INTRO itself is narrowed down.
    df['ST'] = df['INTRO'].apply(lambda x: _first_group(r'服务设施([\s\S]*)', x))
    df['NOTICE'] = df['INTRO'].apply(lambda x: _first_group(r'优待政策\s*(.+?)\s*服务设施', x))
    df['INTRO'] = df['INTRO'].apply(lambda x: _first_group(r'介绍\s*(.+?)\s*全文', x))
    df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
读取的时候直接报错,一看信息就是编码的问题。去搜了一下python处理文件的编码问题,只要在外层读取的open方法中设置encoding就好了:`with open(file_path, 'r', encoding="utf-8")`。
学习日期: 6.9
都先把数据处理成所有需要的字段了,都先添加上 ,然后首要任务,关联好景区id,不知道能不能group by分组,明天再研究一下。
import os
import pandas as pd
from faker import Faker

f = Faker('zh_CN')


def readCsvToDf(folder_path):
    """Read every ``*.csv`` entry of ``folder_path`` into one concatenated frame."""
    frames = [
        pd.read_csv(os.path.join(folder_path, entry))
        for entry in os.listdir(folder_path)
        if entry.endswith('.csv')
    ]
    return pd.concat(frames, ignore_index=True)


def test(x):
    """Return ``str(x)`` with every whitespace character removed."""
    # split() with no argument already removes newlines, so nothing is left
    # for a trailing strip to do.
    pieces = str(x).split()
    return ''.join(pieces)


def clearAllDf(merged_df):
    """Normalise the merged csv frame.

    Renames the source columns in place on ``merged_df``, removes whitespace
    from every cell, drops ``TGA``, strips '?' from spot names and inserts
    empty ``HOT``, ``CITY_ID`` and ``RATING`` columns at positions 9-11.
    """
    renames = {
        'SPOT': 'SPOT_NAME',
        'CO_TIME': 'EVALUATION_TIME',
        'CO_NAME': 'EVALUATION_NAME',
        'COMMENT': 'EVALUATION',
    }
    merged_df.rename(columns=renames, inplace=True)

    cleaned = merged_df.applymap(test)
    cleaned = cleaned.drop(columns=['TGA'])
    cleaned['SPOT_NAME'] = cleaned['SPOT_NAME'].apply(lambda name: name.replace("?", ""))

    # Each insert goes to position 9, pushing the previous one right, so the
    # final column order is HOT, CITY_ID, RATING.
    for column in ('RATING', 'CITY_ID', 'HOT'):
        cleaned.insert(9, column, '')
    return cleaned


def clearSpotDf(allDf):
    """Insert a ``PHONE`` column of random phone numbers at position 3, in place."""
    row_count = allDf.shape[0]
    phones = [f.phone_number() for _ in range(row_count)]
    allDf.insert(3, 'PHONE', phones)


if __name__ == '__main__':
    folder_qnecsv_path = "../data/csv/qnecsv"
    merged_df = readCsvToDf(folder_qnecsv_path)
    allDf = clearAllDf(merged_df)
在使用pandas 的时候,出现如下的警告。虽然不会影响程序的正常运行,但是看着就很烦。
1 2 A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead
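搜索以后发现,是因为我的dataframe是从其他dataframe切片或筛选得到的,也就是 a = b[...],我又对a进行了修改,pandas无法确定a是b的视图还是副本,所以给出警告。这样不太好,最好直接对b进行修改。常见的解决方案有三个:一是取子集时显式调用 .copy();二是用 .loc[行, 列] = 值 直接在原dataframe上赋值;三是不经过中间变量,直接对b进行修改。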
import pandas as pd
import requests


def get_province(city_name):
    """Look up the province of a city through the Baidu Maps geocoder.

    Args:
        city_name: City name, e.g. ``'杭州市'``.

    Returns:
        The province name, or ``None`` when the service reports a non-zero
        status or the response carries no province.
    """
    ak = '你的百度地图API key'
    url = 'http://api.map.baidu.com/geocoder'
    # Pass the query as ``params`` so requests URL-encodes the (Chinese) city
    # name, and set a timeout so a stalled connection cannot hang the script.
    params = {'address': city_name, 'output': 'json', 'ak': ak}
    res = requests.get(url, params=params, timeout=10).json()

    if res.get('status') != 0:
        return None
    component = (res.get('result') or {}).get('addressComponent') or {}
    return component.get('province')


cities = ['金华市', '苏州市', '舟山市', '武汉市', '桐乡市', '杭州市', '无锡市', '扬州市', '恩施市']
province = [get_province(city) for city in cities]
df = pd.DataFrame({'city': cities, 'province': province})
print(df)
学习日期: 6.12
import os
from hdfs import Client, InsecureClient

print("启动")
client = InsecureClient('http://hadoopb-namenode.damengb-zone.svc:9870')
hdfs_csv_path = "/data/csv/qnecsv"
csv_data_list = client.list(hdfs_path=hdfs_csv_path)
local_path = "/home/PyCode/data/hdfscsv/qnecsv"
print(len(csv_data_list))


def download_folder(client, hdfs_path, local_path, overwrite=False):
    """Recursively copy an HDFS file or directory to the local file system.

    Args:
        client: HDFS client providing ``status``, ``list`` and ``download``.
        hdfs_path: Source path on HDFS.
        local_path: Destination path on the local machine.
        overwrite: Passed to ``client.download``; set it to True to replace
            files left over from an earlier run.  Defaults to False, which is
            the client's own default.
    """
    # Ask for the status once; each call is a round trip to the namenode.
    kind = client.status(hdfs_path)['type']
    if kind == 'FILE':
        client.download(hdfs_path, local_path, overwrite=overwrite)
    elif kind == 'DIRECTORY':
        os.makedirs(local_path, exist_ok=True)
        for file in client.list(hdfs_path):
            sub_hdfs_path = hdfs_path + '/' + file
            sub_local_path = local_path + '/' + file
            download_folder(client, sub_hdfs_path, sub_local_path, overwrite)


download_folder(client, "/data/csv/qnecsv", "/home/PyCode/data/hdfscsv/qnecsv")
download_folder(client, "/data/csv/xccsv", "/home/PyCode/data/hdfscsv/xccsv")
download_folder(client, "/data/txt", "/home/PyCode/data/hdfstxt")
download_folder(client, "/data/excel", "/home/PyCode/data/hdfsexcel")
学习日期: 6.13
from snownlp import SnowNLP
import numpy as np


def clearSpotComment(allInfoDf, province_list):
    """Build the comment table from the merged frame.

    Takes the columns from position 12 onwards, normalises the time text,
    appends a random "IP属地" where none is present, and fills FEEL_SCORE,
    FEEL and (when empty) EVALUATION_GRADE from the sentiment of each comment.

    Args:
        allInfoDf: Merged frame; the comment columns start at position 12.
        province_list: Names to draw the random "IP属地" from.

    Returns:
        The comment frame with its columns in the fixed output order.
    """
    print("评论表开始处理")

    comments = allInfoDf[allInfoDf.columns[12:]].copy()
    comments.insert(2, 'EVALUATION_GRADE', '')
    comments['FEEL_SCORE'] = ''
    comments['FEEL'] = ''
    order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME', 'EVALUATION_NAME', 'FEEL_SCORE',
             'FEEL']
    comments = comments[order]

    def dashed(stamp):
        # Use '-' as the date separator.
        return stamp.replace("/", "-")

    def with_ip(stamp):
        # Leave entries that already mention an IP untouched.
        if "IP" in stamp:
            return stamp
        return stamp + " IP属地: " + np.random.choice(province_list)

    comments['EVALUATION_TIME'] = comments['EVALUATION_TIME'].apply(dashed)
    comments['EVALUATION_TIME'] = comments['EVALUATION_TIME'].apply(with_ip)

    def calculate_scores(row):
        # The placeholder text for unrated visits counts as fully positive.
        if row.EVALUATION == "用户未点评。系统默认好评。":
            score = 1.0
        else:
            score = round(SnowNLP(row.EVALUATION).sentiments, 2)
        feelScore = round(score * 5, 1)
        feel = scoreSnow(score)
        if row.EVALUATION_GRADE == "":
            row['EVALUATION_GRADE'] = str(int(round(feelScore, 0))) + "分 " + feel
        row['FEEL_SCORE'] = feelScore
        row['FEEL'] = feel
        return row

    comments = comments.apply(calculate_scores, axis=1)

    print("评论表结束处理")
    return comments
1. 选择合适的数据类型
DataFrame['column_name'] = DataFrame['column_name'].astype('category')
对于数值型数据,尽量使用’int’类型,根据数值大小的不同,还可以选择’int64’, ‘int32’, ‘int16’, ‘int8’。
2. 利用向量化操作,尽可能避免循环
# Slow: a Python-level loop that updates one cell per iteration.
for idx in df.index:
    df.loc[idx, 'column_name'] = df.loc[idx, 'column_name'] + 1

# Fast: the equivalent vectorised form - use it INSTEAD of the loop above
# (running both adds 2 to every value).
df['column_name'] = df['column_name'] + 1
3. 使用inplace 参数可以避免复制
对于一些可以改变原始DataFrame对象的操作,例如:drop, sort_values。 可以设置参数’inplace=True’,来保证操作在原数据上进行,避免创建新的复制品。
4. 避免链式操作
尽量避免使用链式索引赋值,如 `df[df['A']>0]['B'] = 1`,这样的操作可能作用在DataFrame的副本而非视图上,对副本的修改不会反映到原始数据上;应改写为 `df.loc[df['A']>0, 'B'] = 1`。
5. 使用.isin()
# Keep only the rows whose column_name value appears in list_of_values.
keep_mask = df.column_name.isin(list_of_values)
chosen_ones = df[keep_mask]
6. 使用 .loc[]、.at[]、.iat[]
对数据的访问和修改,使用 loc 和 iloc 索引器通常比链式的普通下标更快也更安全;at 和 iat 则为访问单个元素提供了更快的方法。
7. 对于大数据集,优选fillna()和interpolate()
对于Pandas的性能优化, 主要的方向是减少内存消耗和增加计算效率,通过选择合适的数据类型,避免不必要的循环,合理使用Pandas提供的函数和方法,都是实现优化的有效策略。在编写代码前,明确了解数据的类型,选择最佳的数据处理方式,都可以帮助优化Pandas的性能。今天优化了很久,才将时间缩减到能成功运行。后面测试导入dm了,又要重新配置环境。
学习日期: 6.14
import os
import re
import time
import glob
import json
import random
import radar
from snownlp import SnowNLP
import numpy as np
import pandas as pd
import arrow
from faker import Faker

fakerCn = Faker('zh_CN')
order = ['SPOT_NAME', 'LEVEL', 'LOCATION', 'GRADE', 'QTY', 'INTRO', 'OPENTIME', 'NOTICE', 'ST', 'HOT', 'CITY_ID',
         'RATING', 'SPOT_ID', 'EVALUATION_NAME', 'EVALUATION_TIME', 'EVALUATION']
dataType = {'SPOT_NAME': str, 'LEVEL': str, 'LOCATION': str, 'GRADE': float, 'QTY': str, 'INTRO': str,
            'OPENTIME': str, 'NOTICE': str, 'ST': str, 'HOT': float, 'CITY_ID': str, 'RATING': float,
            'SPOT_ID': str, 'EVALUATION_NAME': str, 'EVALUATION_TIME': str, 'EVALUATION': str}

# Source column names -> unified schema names.
_RENAMES = {'SPOT': 'SPOT_NAME', 'CO_TIME': 'EVALUATION_TIME', 'CO_NAME': 'EVALUATION_NAME',
            'COMMENT': 'EVALUATION'}

# (output key, start tag, characters skipped from the tag start, end tag).
# The skip counts are the ones the parser has always used; for [SPOT],
# [LOCATION] and [COMMENT] they are one more than the tag length.
_TXT_FIELDS = (
    ('SPOT_NAME', '[SPOT]', 7, '[LOCATION]'),
    ('LOCATION', '[LOCATION]', 11, '[OPENTIME]'),
    ('OPENTIME', '[OPENTIME]', 10, '[PHONE]'),
    ('PHONE', '[PHONE]', 7, '[INTRO]'),
    ('INTRO', '[INTRO]', 7, '[GRADE]'),
    ('GRADE', '[GRADE]', 7, '[HOT]'),
    ('HOT', '[HOT]', 5, '[SUM]'),
    ('QTY', '[SUM]', 5, '[COMMENT]'),
    ('EVALUATION', '[COMMENT]', 10, '[COMMENT_GRADE]'),
    ('EVALUATION_GRADE', '[COMMENT_GRADE]', 15, '[COMMENT_TIME]'),
    ('EVALUATION_TIME', '[COMMENT_TIME]', 14, None),
)


def clearToAllDf(merged_df):
    """Return a copy of ``merged_df`` with a 1-based ``SPOT_ID`` per distinct name.

    Ids follow the order in which spot names first appear.
    """
    merged_df_copy = merged_df.copy()
    spot_names = merged_df_copy['SPOT_NAME'].unique()
    spot_id_dict = {name: i + 1 for i, name in enumerate(spot_names)}
    merged_df_copy['SPOT_ID'] = merged_df_copy['SPOT_NAME'].map(spot_id_dict)
    return merged_df_copy


def _load_folder(folder_path, reader):
    """Read every entry of ``folder_path`` with ``reader`` into the unified schema.

    Rows without a COMMENT are dropped, columns are renamed and reindexed to
    ``order`` (discarding any other column, e.g. ``TGA``), and '?' is removed
    from spot names.  Entries are processed in sorted order because
    ``os.listdir`` order is arbitrary and would otherwise change the row order
    from run to run.
    """
    frames = []
    for file_name in sorted(os.listdir(folder_path)):
        frame = reader(os.path.join(folder_path, file_name))
        frame = frame.dropna(subset=['COMMENT']).rename(columns=_RENAMES)
        frame = frame.reindex(columns=order)
        # .str.replace leaves missing names as NaN instead of raising.
        frame['SPOT_NAME'] = frame['SPOT_NAME'].str.replace("?", "", regex=False)
        frames.append(frame)
    return frames


def csvqne(folder_path):
    """Load the csv files under ``folder_path`` as a list of frames."""
    return _load_folder(folder_path, pd.read_csv)


def csvxc(folder_path):
    """Load the csv files under ``folder_path`` as a list of frames."""
    return _load_folder(folder_path, pd.read_csv)


def xlsx(folder_path):
    """Load the Excel files under ``folder_path`` (openpyxl) as a list of frames."""
    return _load_folder(folder_path, lambda path: pd.read_excel(path, engine='openpyxl'))


def _between(text, tag, skip, end_tag):
    """Return the stripped text after ``tag`` up to ``end_tag``.

    Returns '' when ``tag`` is absent, and reads to the end of the text when
    ``end_tag`` is absent.
    """
    start = text.find(tag)
    if start == -1:
        return ''
    end = text.find(end_tag) if end_tag is not None else -1
    body = text[start + skip:] if end == -1 else text[start + skip:end]
    return body.strip()


def _first_group(pattern, text):
    """Return group 1 of ``pattern`` in ``text`` (DOTALL), or '' when no match."""
    match = re.search(pattern, text, re.S)
    return match.group(1) if match else ''


def txt(folder_path):
    """Parse every ``*.txt`` file under ``folder_path`` into one frame.

    Each file is one tagged record.  ``EVALUATION_NAME`` is a generated user
    name.  ``ST``, ``NOTICE`` and ``INTRO`` are cut out of the raw intro text.
    """
    listDf = []
    for file_path in sorted(glob.glob(os.path.join(folder_path, '*.txt'))):
        with open(file_path, 'r', encoding="utf-8") as f:
            text = f.read()
        data = {'LEVEL': np.nan, 'NOTICE': np.nan, 'ST': np.nan,
                'EVALUATION_NAME': fakerCn.user_name()}
        for key, tag, skip, end_tag in _TXT_FIELDS:
            data[key] = _between(text, tag, skip, end_tag)
        listDf.append(data)

    df = pd.DataFrame(listDf).reindex(columns=order)
    df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
    # ST and NOTICE must be derived before INTRO itself is narrowed down.
    df['ST'] = df['INTRO'].apply(lambda x: _first_group(r'服务设施([\s\S]*)', x))
    df['NOTICE'] = df['INTRO'].apply(lambda x: _first_group(r'优待政策\s*(.+?)\s*服务设施', x))
    df['INTRO'] = df['INTRO'].apply(lambda x: _first_group(r'介绍\s*(.+?)\s*全文', x))
    return df


def dataTopkl():
    """Merge all four sources into one frame and pickle it as ``allTemp.pkl``."""
    csvqne_list = csvqne("../data/hdfscsv/qnecsv")
    csvxc_list = csvxc("../data/hdfscsv/xccsv")
    excel_list = xlsx('../data/hdfsexcel')
    txt_list = [txt(r'../data/hdfstxt').astype(dataType)]
    all_list = csvxc_list + csvqne_list + excel_list + txt_list
    all_df = pd.concat(all_list, ignore_index=True)
    all_df.to_pickle('allTemp.pkl')


if __name__ == '__main__':
    a = time.time()
    dataTopkl()
    a1 = time.time()
    # The elapsed seconds are formatted as if they were a timestamp.
    finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
    print("执行速度", finalTime)
服务器重装 评论表导入dm
学习日期: 6.15
今天跑评论表的时候发现实在是太慢了,检查top运行进程的时候发现,CPU和内存都是在100%状态,后面检查机器发现,最大限制在了1核,那怎么玩,就先暂停了,修改成了8G 4核,但是重新启动的时候,发现平台镜像又清空了,真受不了一点,又花了一上午重装。
按照昨天的步骤基本没有问题,版本换成了3.8.8,安装依赖的时候主要pandas1.3.5 就行了。
测试dataframe导入dm没有问题了。值得注意的是,因为数据量有点大,to_sql一次进不去,就分批导入了,大概需要1分钟;如果不分批会直接报错。
import functools
import re
import time
import warnings
from sqlalchemy import create_engine
import numpy as np
import pandas as pd
import radar
from snownlp import SnowNLP
import arrow
from faker import Faker

fakerCn = Faker('zh_CN')
warnings.filterwarnings("ignore", message="Could not import the lzma module.")

# Lower bounds of the sentiment bands, checked from the top down.
_FEEL_LEVELS = ((0.9, "超棒"), (0.8, "满意"), (0.5, "不错"), (0.2, "一般"))

# Administrative suffix (with optional ethnic qualifier) at the end of a name.
_REGION_SUFFIX = re.compile(r'(?:壮族|回族|维吾尔)?(?:自治区|特别行政区|省|市)$')

# Placeholder comment text that counts as fully positive.
_DEFAULT_PRAISE = "用户未点评。系统默认好评。"


def scoreSnow(sentiments):
    """Map a sentiment probability in [0, 1] to a mood label.

    Values outside [0.2, 1] (including NaN) map to ``"不佳"``.
    """
    if sentiments <= 1:
        for floor, label in _FEEL_LEVELS:
            if sentiments >= floor:
                return label
    return "不佳"


def _normalise_time(value, province_list):
    """Return the stored form of one EVALUATION_TIME value.

    Empty or missing -> a random date.  Otherwise '/' becomes '-', and a
    random "IP属地" is appended unless the text already mentions an IP.
    """
    if pd.isna(value) or value == "":
        return str(radar.random_date("2020-09-13", "2023-02-02").date())
    text = str(value).replace("/", "-")
    if "IP" in text:
        return text
    return text + "IP属地: " + np.random.choice(province_list)


def clearSpotComment(allInfoDf, province_list):
    """Build the scored comment table from the merged frame.

    Args:
        allInfoDf: Merged frame; the comment columns start at position 12.
        province_list: Names to draw the random "IP属地" from.

    Returns:
        Frame with SPOT_ID, EVALUATION, EVALUATION_GRADE, EVALUATION_TIME,
        EVALUATION_NAME, FEEL_SCORE and FEEL.
    """
    print("评论表开始处理")
    comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
    comment_df.insert(2, 'EVALUATION_GRADE', '')
    comment_df['FEEL_SCORE'] = ''
    comment_df['FEEL'] = ''
    order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME', 'EVALUATION_NAME', 'FEEL_SCORE',
             'FEEL']
    comment_df = comment_df[order]
    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(
        lambda x: _normalise_time(x, province_list))

    def calculate_scores(row):
        # Decide "missing" BEFORE stringifying.  The old check ran after
        # str(), so it could never fire and NaN was scored as the text "nan".
        missing = pd.isna(row.EVALUATION)
        row.EVALUATION = '' if missing else ''.join(str(row.EVALUATION).split())
        if missing or row.EVALUATION in ("", _DEFAULT_PRAISE):
            score = 1.0
        else:
            score = round(SnowNLP(row.EVALUATION).sentiments, 2)
        feelScore = round(score * 5, 1)
        feel = scoreSnow(score)
        if row.EVALUATION_GRADE == "":
            row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel
        row.FEEL_SCORE = feelScore
        row.FEEL = feel
        return row

    comment_df = comment_df.apply(calculate_scores, axis=1)
    print("评论表结束处理")
    return comment_df


def provinceRandom() -> str:
    """Return a random region name containing '省' or '市', minus its last character."""
    while True:
        province = fakerCn.province()
        if "省" in province or "市" in province:
            return province[0:-1]


def provinceList(target=34, max_draws=5000):
    """Collect distinct short region names by sampling Faker.

    The administrative suffix is stripped with a pattern rather than by taking
    the first two characters, so three-character names are kept whole (the
    prefix cut produced "内蒙" and "黑龙").

    Args:
        target: Stop once this many distinct names are collected.
        max_draws: Upper bound on samples, so the loop always terminates even
            if fewer than ``target`` names exist.

    Returns:
        list[str]: Distinct names in first-seen order.
    """
    province_list = []
    seen = set()
    for _ in range(max_draws):
        name = _REGION_SUFFIX.sub('', fakerCn.province())
        if name not in seen:
            seen.add(name)
            province_list.append(name)
            if len(province_list) >= target:
                break
    return province_list


@functools.lru_cache(maxsize=None)
def _dm_engine(conn_url):
    """Return one shared SQLAlchemy engine per connection URL."""
    return create_engine(conn_url)


def intoDm(data, tableName):
    """Append a DataFrame to the DM table ``tableName``."""
    conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@'
    # Reuse the engine: this runs once per chunk, and a new engine each time
    # means a new connection pool each time.
    data.to_sql(name=tableName, con=_dm_engine(conn_url), if_exists='append', index=False)


if __name__ == '__main__':
    a = time.time()
    # provinceList already returns full short names, so no manual patching of
    # "黑龙" -> "黑龙江" is needed any more.
    provinces = provinceList()
    print("省份筛选完成")

    allDf = pd.read_pickle('allClearData.pkl')
    comment_temp_df = clearSpotComment(allDf, provinces)
    comment_temp_df.to_pickle('commentFinalData.pkl')

    path = 'commentFinalData.pkl'
    comment_temp = pd.read_pickle(path)
    for col in comment_temp.select_dtypes(include=[object]):
        comment_temp[col] = comment_temp[col].astype('string')

    # Insert in batches of 1000 rows.
    chunkSize = 1000
    for i in range(0, len(comment_temp), chunkSize):
        data_chunk = comment_temp.iloc[i:i + chunkSize]
        intoDm(data_chunk, 'comment_info')

    a1 = time.time()
    # The elapsed seconds are formatted as if they were a timestamp.
    finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
    print("执行速度", finalTime)
# Force-kill every python3 process (SIGKILL: no cleanup handlers run).
pkill -9 python3
# Write dirty pages to disk first; drop_caches only frees clean cache objects.
sync
# Ask the kernel to drop the page cache plus dentries and inodes (needs root).
echo 3 > /proc/sys/vm/drop_caches
学习日期: 6.16
# Fill in missing scenic-spot levels with a linear regression on the
# numeric columns, then map the result back to the textual labels.
level_dict = {'3A景区': 1, '4A景区': 2, '5A景区': 3}
df['LEVEL'] = df['LEVEL'].map(level_dict)
# -1 marks rows whose level has to be predicted. Assign the result instead
# of calling fillna(inplace=True) on the column selection, which does not
# reliably write back to df on recent pandas versions.
df['LEVEL'] = df['LEVEL'].fillna(-1)

feature_cols = ['GRADE', 'QTY', 'HOT', 'RATING']
# Copy so the predicted column is written to an independent frame rather
# than to a slice of df.
predict_data = df[df['LEVEL'] == -1].copy()

# Build the training pool from ten 80% resamples of the labelled rows.
listData = []
for _ in range(10):
    listData.append(df[df['LEVEL'] != -1].sample(frac=0.8))
enhance = pd.concat(listData, ignore_index=True)

lr = LinearRegression()
train_data = enhance.sample(frac=0.8)
# NOTE: the score is computed on the labelled rows the training pool was
# drawn from, so it is not a held-out estimate.
test_data = df[df['LEVEL'] != -1]
lr.fit(train_data[feature_cols].values, train_data['LEVEL'].values)
print('模型得分: ', lr.score(test_data[feature_cols].values, test_data['LEVEL'].values))

# Predicting on zero rows raises inside scikit-learn, so skip the step when
# every row already has a level.
if not predict_data.empty:
    prediction = lr.predict(predict_data[feature_cols].values)
    # Clamp to the valid ordinal range 1..3 before rounding.
    prediction[prediction > 3] = 3
    prediction[prediction < 1] = 1
    predict_data['LEVEL'] = prediction.round(0)
    df.update(predict_data)

level_dict = {1: '3A景区', 2: '4A景区', 3: '5A景区'}
df['LEVEL'] = df['LEVEL'].map(level_dict)
学习日期: 6.19
@app.route('/spot/commentAll/<spot_id>', methods=['GET'])
def spot_comments_from_id(spot_id):
    """Return every comment of one scenic spot as a JSON string.

    The path parameter used to be passed through ``eval()``, which executes
    arbitrary expressions supplied in the URL. It is now parsed as an
    integer and rejected otherwise.
    """
    try:
        # assumes SPOT_ID is an integer key — TODO confirm against the schema
        spot_key = int(spot_id)
    except ValueError:
        return json.dumps({'msg': '参数错误', 'code': 400, 'data': []}, ensure_ascii=False)
    data = dmToDfpkuseg.spotAllComment(spot_key)
    return json.dumps({'msg': '查询成功', 'code': 200, 'data': data}, ensure_ascii=False)


def spotAllComment(spot_id):
    """Fetch all comments of ``spot_id`` as a list of dicts.

    Missing values are returned as '未填'.
    """
    # Local import: the surrounding module's import list is not visible here.
    import json

    # Bind the id as a parameter instead of formatting it into the SQL text.
    # assumes `cur` is a dmPython cursor (qmark paramstyle) — TODO confirm
    cur.execute("""
            select spot_id, evaluation_name,evaluation, evaluation_grade, evaluation_time, feel_score, feel
            from COMMENT_INFO
            where SPOT_ID=?
            """, (spot_id,))
    list_test = cur.fetchall()
    df_test = pd.DataFrame(list_test, columns=["spot_id", "evaluation_name", "evaluation", "evaluation_grade",
                                               "evaluation_time", "feel_score", "feel"])
    # Parse the JSON text instead of eval()-ing it. eval would execute
    # comment text as Python, breaks on true/false, and the old textual
    # replace also rewrote the substring "null" inside comment bodies.
    records = json.loads(df_test.to_json(orient='records', force_ascii=False))
    return [{key: ('未填' if value is None else value) for key, value in record.items()}
            for record in records]
def keyWordFindComment(spot_id, comment_key_word):
    """Fetch the comments of ``spot_id`` whose text contains ``comment_key_word``.

    Returns a list of dicts; missing values are returned as '未填'.
    """
    # Local import: the surrounding module's import list is not visible here.
    import json

    # Both values are bound as parameters. Formatting the keyword straight
    # into the LIKE pattern allowed SQL injection.
    # assumes `cur` is a dmPython cursor (qmark paramstyle) — TODO confirm
    cur.execute("""
            select spot_id, evaluation_name,evaluation, evaluation_grade, evaluation_time, feel_score, feel
            from COMMENT_INFO
            where SPOT_ID=? and EVALUATION like ?
            """, (spot_id, '%{}%'.format(comment_key_word)))
    list_test = cur.fetchall()
    df_test = pd.DataFrame(list_test,
                           columns=["spot_id", "evaluation_name", "evaluation", "evaluation_grade",
                                    "evaluation_time", "feel_score", "feel"])
    # Parse the JSON text instead of eval()-ing it. eval would execute
    # comment text as Python, and the old textual replace also rewrote the
    # substring "null" inside comment bodies.
    records = json.loads(df_test.to_json(orient='records', force_ascii=False))
    return [{key: ('未填' if value is None else value) for key, value in record.items()}
            for record in records]
学习日期: 6.20
# Full cleaning pipeline: comments, cities, provinces and scenic spots are
# cleaned from the merged scrape result and appended to the DM database.
import os
import json
import time
import warnings
from sqlalchemy import create_engine
import numpy as np
import pandas as pd
import radar
from snownlp import SnowNLP, sentiment
import arrow
from faker import Faker
from sklearn.linear_model import LinearRegression

fakerCn = Faker('zh_CN')
pd.set_option('mode.chained_assignment', None)
warnings.filterwarnings("ignore", message="Could not import the lzma module.")
sentiment.load("sentiment.marshal")

# One engine per connection URL, reused across all uploads.
_ENGINES = {}
# Parsed content of ../data/city.json, loaded on first use.
_CITY_TREE = None


def scoreSnow(sentiments):
    """Map a sentiment value to its mood label.

    Bands are checked from the highest down; anything outside 0.2..1
    (including NaN) falls through to the lowest label.
    """
    if 0.9 <= sentiments <= 1:
        return "超棒"
    if 0.8 <= sentiments <= 0.9:
        return "满意"
    if 0.5 <= sentiments <= 0.8:
        return "不错"
    if 0.2 <= sentiments <= 0.5:
        return "一般"
    return "不佳"


def clearSpotComment(allInfoDf, province_list):
    """Build the cleaned comment table from the merged scrape result.

    Args:
        allInfoDf: merged DataFrame; the comment columns are taken from
            column position 12 onward.
        province_list: province names used to fabricate a missing
            "IP属地" suffix on the evaluation time.

    Returns:
        DataFrame with the columns SPOT_ID, EVALUATION, EVALUATION_GRADE,
        EVALUATION_TIME, EVALUATION_NAME, FEEL_SCORE and FEEL.
    """
    print("评论表开始处理")
    comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
    comment_df['FEEL_SCORE'] = ''
    comment_df['FEEL'] = ''
    order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME',
             'EVALUATION_NAME', 'FEEL_SCORE', 'FEEL']
    comment_df = comment_df[order]

    def fill_time(value):
        # Empty timestamps get a random date; timestamps without an IP
        # location get a randomly chosen province appended.
        if value == "":
            return str(radar.random_date("2020-09-13", "2023-02-02").date())
        if "IP" not in value:
            return value + "IP属地: " + np.random.choice(province_list)
        return value

    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x.replace("/", "-"))
    comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(fill_time)

    def calculate_scores(row):
        raw_text = row.EVALUATION
        # Strip all whitespace from the stored comment text.
        row.EVALUATION = ''.join(str(raw_text).split())
        # The type check must look at the raw value: after str() everything
        # is a string, which previously sent missing values ("nan") through
        # the sentiment model instead of the default-positive branch.
        if (not isinstance(raw_text, str)) or row.EVALUATION == "用户未点评。系统默认好评。":
            score = 1.0
        else:
            score = round(SnowNLP(row.EVALUATION).sentiments, 2)
        feelScore = round(score * 5, 1)
        feel = scoreSnow(score)
        # A missing grade is derived from the sentiment score. The per-row
        # debug print of the missing value was removed.
        if pd.isna(row.EVALUATION_GRADE):
            row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel
        row.FEEL_SCORE = feelScore
        row.FEEL = feel
        return row

    comment_df = comment_df.apply(calculate_scores, axis=1)
    print("评论表结束处理")
    return comment_df


def provinceList():
    """Collect 34 distinct province names (two-character prefixes) from Faker."""
    province_set = set()
    while len(province_set) <= 33:
        province = fakerCn.province()[0:2]
        # The two-character cut truncates this three-character name.
        if province == "黑龙":
            province = "黑龙江"
        province_set.add(province)
    return list(province_set)


def intoDm(data, tableName):
    """Append ``data`` to table ``tableName`` through SQLAlchemy."""
    # NOTE(review): credentials are hard-coded in the URL; consider moving
    # them to configuration.
    conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@'
    engine = _ENGINES.get(conn_url)
    if engine is None:
        engine = _ENGINES[conn_url] = create_engine(conn_url)
    data.to_sql(name=tableName, con=engine, if_exists='append', index=False)


def _load_city_tree():
    """Load ../data/city.json once and reuse the parsed content."""
    global _CITY_TREE
    if _CITY_TREE is None:
        with open("../data/city.json", "r", encoding="utf-8") as f:
            _CITY_TREE = json.load(f)
    return _CITY_TREE


def cityFindProvince(cityName):
    """Return the province whose city or county list contains ``cityName``.

    Returns "未知省" when no entry matches. The JSON file is parsed once
    instead of on every call; this function is applied per row.
    """
    content: dict = _load_city_tree()
    for province_name, city_name_datas in content.items():
        for city_name_data in city_name_datas:
            for city_name, county_name_list in city_name_data.items():
                if city_name == cityName:
                    return province_name
                if any(county_name == cityName for county_name in county_name_list):
                    return province_name
    return "未知省"


def clearSpotProvince(cityWithProvinceDf):
    """Attach PROVINCE_ID to the city frame and return the province table.

    ``cityWithProvinceDf`` is modified in place: it gains PROVINCE_ID, and
    the temporary PROVINCE_NAME column is dropped again before returning.
    """
    cityWithProvinceDf['PROVINCE_NAME'] = cityWithProvinceDf['CITY_NAME'].apply(cityFindProvince)
    # Ids are 1-based and follow first appearance order.
    province_dict = {name: rank + 1 for rank, name in enumerate(cityWithProvinceDf['PROVINCE_NAME'].unique())}
    cityWithProvinceDf['PROVINCE_ID'] = cityWithProvinceDf['PROVINCE_NAME'].apply(lambda x: province_dict[x])
    province_temp_df = cityWithProvinceDf.copy()[['PROVINCE_NAME', 'PROVINCE_ID']].drop_duplicates()
    cityWithProvinceDf.drop(columns=['PROVINCE_NAME'], inplace=True)
    return province_temp_df


def clearSpotDf(allInfoDf):
    """Extract the de-duplicated scenic-spot columns and add a fake PHONE column."""
    spot_columns = ['SPOT_NAME', 'LEVEL', 'LOCATION', 'GRADE', 'QTY', 'INTRO', 'OPENTIME',
                    'NOTICE', 'ST', 'HOT', 'CITY_ID', 'RATING', 'SPOT_ID']
    spot_df = allInfoDf[spot_columns].drop_duplicates().copy()
    spot_df.insert(3, 'PHONE', [fakerCn.phone_number() for _ in range(spot_df.iloc[:, 0].size)])
    return spot_df


def clearSpotCity(spotWithCityDf):
    """Derive CITY_ID from LOCATION and return the city table.

    ``spotWithCityDf`` is modified in place: CITY_ID is overwritten, and the
    temporary CITY_NAME column is dropped again before returning.
    """
    # Rows where the pattern finds no "...市" match become '未知市'.
    spotWithCityDf['CITY_NAME'] = \
        spotWithCityDf['LOCATION'].str.extract('([^市]{2}()+市)')[0].fillna('未知市')
    # Ids are 1-based and follow first appearance order.
    city_id_dict = {city_name: rank + 1 for rank, city_name in enumerate(spotWithCityDf['CITY_NAME'].unique())}
    spotWithCityDf['CITY_ID'] = spotWithCityDf['CITY_NAME'].apply(lambda x: city_id_dict[x])
    city_temp_df = spotWithCityDf.copy()[['CITY_NAME', 'CITY_ID']].drop_duplicates()
    spotWithCityDf.drop(columns=['CITY_NAME'], inplace=True)
    return city_temp_df


def keep_row_with_less_missing_values_and_less_qty(group):
    """Return the row of ``group`` with the fewest missing values, then the smallest QTY."""
    missing_values = group.isnull().sum(axis=1)
    qty_sum = group['QTY']
    combined = pd.DataFrame({'Missing': missing_values, 'QTY': qty_sum})
    combined_sorted = combined.sort_values(by=['Missing', 'QTY'])
    return group.loc[combined_sorted.index[0]]


def _fill_missing_by_regression(df, target, features, lower, upper, decimals):
    """Predict rows whose ``target`` equals the -1 sentinel and write them into ``df``."""
    known = df[df[target] != -1]
    # Copy so the predicted column is written to an independent frame.
    unknown = df[df[target] == -1].copy()

    # Training pool: ten 80% resamples of the labelled rows, sampled again at 80%.
    listData = []
    for _ in range(10):
        listData.append(known.sample(frac=0.8))
    enhance = pd.concat(listData, ignore_index=True)
    train_data = enhance.sample(frac=0.8)

    lr = LinearRegression()
    lr.fit(train_data[features].values, train_data[target].values)
    # NOTE: scored on the labelled rows the training pool was drawn from,
    # so this is not a held-out estimate.
    print('模型得分: ', lr.score(known[features].values, known[target].values))

    # Predicting on zero rows raises inside scikit-learn.
    if unknown.empty:
        return
    prediction = lr.predict(unknown[features].values)
    prediction[prediction > upper] = upper
    prediction[prediction < lower] = lower
    unknown[target] = prediction.round(decimals)
    df.update(unknown)


def clearSpotFinal(df):
    """Finish the scenic-spot table and return the cleaned frame.

    Steps: convert QTY to int, predict missing HOT values, keep one row per
    duplicated SPOT_NAME, compute RATING, predict missing LEVEL values, and
    convert object columns to the pandas string dtype. Intermediate and
    final results are written to 'allClearDataSpot.pkl' and
    'spotFinalData.pkl'.

    The input frame is modified in place only for QTY and HOT. The original
    returned nothing, so callers kept the un-deduplicated input; the cleaned
    frame is now returned.
    """
    try:
        # Drop the trailing three characters of the QTY text before converting.
        df['QTY'] = df['QTY'].str[:-3].astype(int)
    except ValueError:
        print("评论数字段包含一些无法转换为数字的值!")
    # -1 marks rows that need a prediction. Assign the result rather than
    # using a chained inplace fillna.
    df['HOT'] = df['HOT'].fillna(-1)
    _fill_missing_by_regression(df, 'HOT', ['GRADE', 'QTY'], 0, 10, 1)

    # Collapse rows sharing a SPOT_NAME to the most complete one.
    duplicated_mask = df.duplicated(subset='SPOT_NAME', keep=False)
    duplicated_rows = df[duplicated_mask]
    new_rows = duplicated_rows.groupby('SPOT_NAME').apply(keep_row_with_less_missing_values_and_less_qty)
    non_duplicated_rows = df[~duplicated_mask]
    result = pd.concat([non_duplicated_rows, new_rows])

    max_comment_sum = result['QTY'].max()
    result['RATING'] = round(
        (result['GRADE'] * 6) + (result['HOT'] * 4) + (result['QTY'] / max_comment_sum * 30), 2)
    result.to_pickle('allClearDataSpot.pkl')

    df = pd.read_pickle('allClearDataSpot.pkl')
    level_dict = {'3A景区': 1, '4A景区': 2, '5A景区': 3}
    df['LEVEL'] = df['LEVEL'].map(level_dict)
    df['LEVEL'] = df['LEVEL'].fillna(-1)
    _fill_missing_by_regression(df, 'LEVEL', ['GRADE', 'QTY', 'HOT', 'RATING'], 1, 3, 0)
    level_dict = {1: '3A景区', 2: '4A景区', 3: '5A景区'}
    df['LEVEL'] = df['LEVEL'].map(level_dict)
    df.to_pickle('spotFinalData.pkl')

    object_cols = df.select_dtypes(include=['object']).columns
    df[object_cols] = df[object_cols].astype('string')
    return df


if __name__ == '__main__':
    a = time.time()
    province_city = pd.read_csv("../data/GT_CITYS.csv", encoding='gbk')
    intoDm(province_city, "GT_CITYS")
    provinces = provinceList()
    print("省份筛选完成")
    allDf = pd.read_pickle('allClearData.pkl')
    comment_temp_df = clearSpotComment(allDf, provinces)
    path = 'commentFinalData.pkl'
    comment_temp_df.to_pickle(path)
    print("开始导入")
    comment_temp = pd.read_pickle(path)
    # Convert object columns to the pandas string dtype before the upload.
    # The original ran this conversion twice; the second pass was a no-op.
    object_columns = comment_temp.select_dtypes(include=['object']).columns
    comment_temp[object_columns] = comment_temp[object_columns].astype('string')
    chunkSize = 5200
    for i in range(0, len(comment_temp), chunkSize):
        data_chunk = comment_temp[i:i + chunkSize]
        intoDm(data_chunk, 'comment_info')
    spot_info = clearSpotDf(allDf)
    spot_info.to_pickle('allClearSpot.pkl')
    city_df = clearSpotCity(spot_info)
    # NOTE(review): city_info is uploaded before clearSpotProvince adds
    # PROVINCE_ID to city_df — confirm that the table should not carry it.
    intoDm(city_df, 'city_info')
    province_df = clearSpotProvince(city_df)
    intoDm(province_df, 'province_info')
    spot_final_info = pd.read_pickle('allClearSpot.pkl').reset_index().drop(columns=['index'])
    # Upload the cleaned frame returned by clearSpotFinal, not the raw input.
    spot_final_info = clearSpotFinal(spot_final_info)
    intoDm(spot_final_info, "spot_info")
    a1 = time.time()
    finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
    print("执行速度", finalTime)
今日的任务是梳理清洗部分以及接口部分的代码。要求注释占比达到 25%,之前其实已经达到了;但代码有些部分比较冗余,需要修改并封装一下。接口用 nohup 挂载运行。清洗目前需要跑两次,视后面的性能情况,可以合并到一个脚本里。处理评论表实在是太慢了:单单评论表就需要 20 分钟,其他部分加起来不超过一分钟。后期写文档的时候看看能不能优化。