模版
学习日期:
所学内容概述
BUG
日总结
1 2 3 4 5 6 7 8 {% tabs 分栏%} <!-- tab BUG1@fas fa-bomb --> Any content (support inline tags too). <!-- endtab --> <!-- tab BUG2@fas fa-bomb --> Any content (support inline tags too). <!-- endtab --> {% endtabs %}
csv导入DM8
学习日期: 5.4
所学内容概述
今天试着把csv文件读取到dm8数据库中,所实现的步骤如下:
连接dm8数据库并尝试读取库中的表
1 2 3 4 5 6 <dependency > <groupId > com.dameng</groupId > <artifactId > Dm8JdbcDriver18</artifactId > <version > 8.1.0.157</version > </dependency >
1 2 3 val spark: SparkSession = SparkSession .builder().master("local[*]" ).appName("test" ) .config("spark.sql.shuffle.partitions" ,"500" ) .getOrCreate()
连接到dm8并读取表 和之前连接clickhouse是差不多的改了driver和url而已
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 case class JDBC_CONN (url:String ,conn:Properties )def jdbc_Conn (): JDBC_CONN ={ val url = "jdbc:dm://47.120.9.247:5236?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&rewriteBatchedStatements=true" val driver = "dm.jdbc.driver.DmDriver" val user = "SYSDBA" val password = "SYSDBA" val properties = new Properties () properties.put("driver" ,driver) properties.put("user" ,user) properties.put("password" ,password) JDBC_CONN (url,properties) } def dmRead (sparkSession: SparkSession ,TableName :String ): DataFrame ={ val jdbcConn: JDBC_CONN = jdbc_Conn() sparkSession.read.jdbc(jdbcConn.url, "CC." +TableName ,jdbcConn.conn) }
读取csv文件转换为df
这个比较简单网上很多步骤的,难点在编码转换的问题,下面的BUG也都列出来了
1 2 3 4 5 6 7 8 9 10 11 val fileData: DataFrame = spark .read .format(typeData) .option("timestampFormat" , "yyyy/MM/dd HH:mm:ss ZZ" ) .option("header" , "true" ) .option("multiLine" ,value = true ) .option("encoding" ,"GBK" ) .load(s"file:///$filePath " ) fileData.show()
df导入dm8
1 2 3 4 5 6 def dmWriter (dataFrame: DataFrame , TableName :String ): Unit ={ val jdbcconn: JDBC_CONN = jdbc_Conn() dataFrame.write.option("isolationLevel" ,"READ_COMMITTED" ).mode("overwrite" ) .jdbc(jdbcconn.url,"CC." +TableName ,jdbcconn.conn) }
遇到的问题
一个就是编码的问题
还有一个是url一般在端口后面是指定数据库的,但是dm8似乎不行,我在5236后面跟/CC,读取CC表的时候,还是会显示视图不存在,要读到CC库里面的表要CC.表名
BUG
今天的BUG的编码格式的连环BUG全部列在下面了
BUG1
spark读取csv文件的时候中文会乱码,搜索后发现中软杯提供的csv文件的编码不是utf-8的,而自己IDEA默认显示的是utf-8。是编码不统一,在load文件的时候将编码修改成gbk就行了。.option(“encoding”,“gbk”)
BUG2
上面的BUG修改完以后变成了这样,发现好像是换行的问题,因为INTRO字段是有换行的,而且数据挺多的,所以需要加允许换行的配置应该,去搜了一下.option(“multiLine”,value = true) //换行
BUG3
换行以后又发现乱码了,不换行乱码,换行就乱码很奇怪,然后去搜问题,发现是我虽然修改了编码为gbk,但是项目的编码确实utf8,导致乱码,解决步骤如下:
IDEA的File中setting的File Encodings将编码统一为GBK,然后右下角的utf-8也改为GBK,问题解决了。
日总结
今天的任务主要是把csv文件用spark能试着导入dm8,还没尝试处理,先把数据能放进去我觉得是第一步,然后后面涉及到清洗的,再慢慢搞了,csv文件导入还是挺方便的直接根据你列自动导入到数据库,而且字段都分好了。修编码的bug花费了点时间,现在的思路就是先把四个样数据先全部放数据库中,然后再用spark把字段处理以后放其他数据库。
txt和xlsx导入DM8
学习日期: 5.5
所学内容概述
将提供的xlsx和txt文件夹导入数据库
XLSX转DF
刚开始以为是和csv差不多修改一下format参数即可,毕竟csv和xlsx自己都是用execl打开的就看起来一样,但是经过自己查找资料发现,xlsx是属于xlsx文件,但是csv不算,只是能用xlsx打开,并帮你格式处理好。
然后就出现了报错,网上读取spark读取xlsx的方法有很多,经过一上午的测试才找到合适的方法。
方法如下:
首先导入能操作execl的依赖,注意自己的scala和spark的版本,scala版本是多少下面execl_的版本就要对应好,然后下面的version,去官网看,一般11版本的是0.12 12版本的是0.14,自己Maven中spark的依赖,一定要和自己电脑装的spark对应好。
1 2 3 4 5 <dependency > <groupId > com.crealytics</groupId > <artifactId > spark-excel_2.12</artifactId > <version > 0.14.0</version > </dependency >
然后直接转df就好了,和csv操作时候,还是有很多参数不同的,自行分辨一下,主要添加了.option(“useHeader”, “true”)因为execl表嘛,都是有列名的。读xlsx的时候就不需用编码转换了,好像只有csv是需要统一gbk的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def xlsxToDf (sparkSession: SparkSession , filePath: String ): DataFrame = { val fileData: DataFrame = sparkSession .read .format("com.crealytics.spark.excel" ) .option("header" , "true" ) .option("path" , filePath) .option("useHeader" , "true" ) .option("multiLine" , value = true ) .load() fileData.show() fileData }
TXT转DF
测试了一下发现只能文件读取,而且转dataFrame不太好做到,只能单独的取出key和value就花费了我接下来剩下的所有时间,思路理清楚了打算用map,按照[和],切割成一个二维集合,然后 内层集合中0是key,1是value,思路清晰以后,发现rdd代码map操作的时候,如果要map进去,外层的rdd会变成一个集合,自动将换行成集合的一个索引值。RDD[String]处理的话,正则就用不了了,因为是一行的,那只能用split按照左右括号切割两次,但是这样拿出来的key和value不是同时的,那也就要将key和value存集合,然后再试着直接value加入数据库,始终无法转成df,spark直接将字符串传数据库也不太好,所以今天是没解决的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def txtToDf (sparkSession: SparkSession ): Unit ={ import sparkSession.implicits._ val textFile: RDD [String ] = sparkSession.sparkContext.textFile("E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\1\\0099511525-7779-4759-b423-796c1dcebb87.txt" ) textFile.toDF().show() val value: RDD [Array [String ]] = textFile.map { x: String => x.split('[') } val strings: Array [String ] = value.collect()(0 ) println(strings.mkString("Array(" , ", " , ")" )) val value1: RDD [Array [Array [String ]]] = value.map { x: Array [String ] => x.map { y: String => y.split("]" ) } } val array: Array [Array [Array [String ]]] = value1.collect() }
BUG
看这个报错的信息是VM内存超出了,之前李昊跟我说过自己电脑跑太多磁盘会超,然后就跑不了,只能把跟IDEA有关的进程全部杀了重新开一次,清一下磁盘缓存。
这个报错是说xlsx无法处理,去搜了以后发现是需要加依赖,然后format里面的参数也是不一样的。
我一模一样跟别人步骤来出现了这样的报错,发现如果读取xlsx文件是需要加一个依赖,但是一旦加了那个依赖,我自己原本对csv读取的程序都会报错,仔细看了一下,感觉是版本问题,他是scala2.11的,按照官方文档修改了版本以后就出现了下面的报错,网上也没有相同的案例,我还是怀疑是依赖的问题,然后我看之前学习spark的视频发现他说spark3.0就对应好scala2.12就行,不然可能会有问题,我就看了自己电脑的spark版本,发现是3.0的,装的scala是2.12.0,但是我Maven依赖不是这样,我就把依赖版本全部改了,报错解决,顺利读取到xlsx
日总结
今天的任务本来以为很快能完成的,毕竟之前csv放数据库中已经操作好了,以为只是随便调一下参数就行了,但是从简单的xlsx开始发现就出现了各种报错,最后搞好了以后,发现txt的那个文件夹竟然也是数据,就想先试着一个txt文件能不能转df然后导入数据库中,写了一天发现难以实现,请教了高人指点,大数据老师也做不出来,另寻它法了。
TXT导入DM8
学习日期: 5.7
所学内容概述
因为这个的难点还是很多的,捋一下自己的解决思路
解决思路
首先自己肯定是打算用spark抽txt转dataFrame的,还是无法实现,具体原因在昨天的
TXT转DF 中已经将清楚。
然后就想其他办法,我的第二种思路是使用java的IO流,将txt读取,封装成实体类,然后通过scala调用封装好的实体类转成toDF,再导入数据库。问题如下:
Java用IO将除了要作为字段的[xx]的value读取出来以后,封装到实体类中,但是封装是运行java的,在scala中读取该类依然是没数据。因为java运行完毕,实体类相当于生命周期就被杀死了。
scala可以调用java.main来执行java,但是后面用scala读java的类的时候还是null,如此只能另寻它法了。
第三种思路也挺清晰的:读取文件夹获取文件夹内每个文件,将txt文件一行一行的读,读要自己需要的作为字段名的那一行,将下一行之前收集的value添加到集合中,以此如此,最后跳出循环以后将最后一行也放入集合,这样自己所需要的字段全部就取出来了,封装到了集合中,然后每个txt一个集合,最后封装到一个集合,也就是该方法是将文件夹收集为二维集合。循环获取内层集合,根据索引作为字段写sql,最后执行导入就行了。
第四种思路就是:直接用Java将txt文件封装到类中,也是对上面方法的一种优化,一个txt封装一个类,通过反射自动根据key,调用对应的set方法将value封装,封装完成的对象放list中。问题如下:
正则不太熟,写的正则刚开始是匹配"\[(.*?)]"匹配[内的字段],作为key,然后在达到下面的value全部收集,前面500个文件还正常,但是后面报错,发现是评论中有email表情,他刚好是两个[]里面,就会报错,因为反射匹配不到。
上面a的是能解决的,在正则匹配的觉得有没有find 的同时,加入equals匹配自己需要的字段,解决以后导入表和第三种使用集合是差不多一样的。
读取文件夹封装为二维集合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package clearTxt.txtClearArrayList;import java.sql.Statement;import java.util.ArrayList;public class txtIntoDmData { public static void main (String[] args) throws Exception { Statement state = ConDm.connectDm(); ArrayList<ArrayList<String>> arrayLists = readFileArray.readFileToArray(); System.out.println(arrayLists.size()); int count = 0 ; for (ArrayList<String> arrayList : arrayLists) { String sql_insert1 = "insert into CC.TXT(SPOT, LOCATION, OPENTIME, PHONE, INTRO, GRADE, HOT, SUM, \"COMMENT\", COMMENT_GRADE, COMMENT_TIME) " + "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)" ; String sql = String.format(sql_insert1, "'" + arrayList.get(0 ) + "'" , "'" + arrayList.get(1 ) + "'" , "'" + arrayList.get(2 ) + "'" , "'" + arrayList.get(3 ) + "'" , "'" + arrayList.get(4 ) + "'" , "'" + arrayList.get(5 ) + "'" , "'" + arrayList.get(6 ) + "'" , "'" + arrayList.get(7 ) + "'" , "'" + arrayList.get(8 ) + "'" , "'" + arrayList.get(9 ) + "'" , "'" + arrayList.get(10 ) + "'" ); state.execute(sql); count++; } System.out.println("插入成功,插入了" + count + "条数据" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package clearTxt.txtClearArrayList;import java.io.*;import java.util.ArrayList;public class readFileArray { public static ArrayList<ArrayList<String>> readFileToArray () throws IOException { File fileList = new File ("E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\1" ); String[] fileNameList = fileList.list(); String path = fileList.getPath(); assert fileNameList != null ; BufferedReader buffReader = null ; InputStreamReader reader = null ; ArrayList<ArrayList<String>> arrayLists = new ArrayList <>(); for (String fileName : fileNameList) { ArrayList<String> list = new ArrayList <>(); InputStream file = new FileInputStream (path + "\\" + fileName); reader = new InputStreamReader (file); buffReader = new BufferedReader (reader); StringBuilder txtContent = new StringBuilder (); String strTmp = "" ; while ((strTmp = buffReader.readLine()) != null ) { if (strTmp.equals("[SPOT]" ) || strTmp.equals("[LOCATION]" ) || strTmp.equals("[OPENTIME]" ) || strTmp.equals("[PHONE]" ) || strTmp.equals("[INTRO]" ) || strTmp.equals("[GRADE]" ) || strTmp.equals("[HOT]" ) || strTmp.equals("[SUM]" ) || strTmp.equals("[COMMENT]" ) || strTmp.equals("[COMMENT_GRADE]" ) || strTmp.equals("[COMMENT_TIME]" )) { if (!txtContent.toString().equals("" )) { list.add(txtContent.toString()); txtContent = new StringBuilder (); } } else { if (strTmp.equals("" )) { txtContent.append("\n\n" ); } else { txtContent.append(strTmp); } } } list.add(txtContent.toString()); arrayLists.add(list); } assert reader != null ; reader.close(); buffReader.close(); return arrayLists; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package clearTxt.txtClearArrayList;import java.sql.Connection ;import java.sql.DriverManager ;import java.sql.Statement ;public class ConDm { public static Statement connectDm() throws Exception { Connection con; String cname = "dm.jdbc.driver.DmDriver" ; String url = "jdbc:dm://47.120.9.247:5236" ; String userid = "SYSDBA" ; String pwd = "SYSDBA" ; Statement state; Class .forName(cname); con = DriverManager .getConnection(url, userid, pwd); state = con.createStatement(); return state; } }
读取文件夹封装为实体类集合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package clearTxt.txtClearClass;import clearTxt.txtClearArrayList.ConDm;import java.sql.Statement;import java.util.List;public class txtIntoDmDataPlus { public static void main (String[] args) throws Exception{ Statement state = ConDm.connectDm(); int count=0 ; List<TxtSpot> txtSpots = TestFileClass.readFileToClass(); System.out.println("一共" +txtSpots.size()+"条数据" ); for (TxtSpot txtSpot:txtSpots){ String sql_insert1 = "insert into CC.TXT_copy1(SPOT, LOCATION, OPENTIME, PHONE, INTRO, GRADE, HOT, SUM, \"COMMENT\", COMMENT_GRADE, COMMENT_TIME) " + "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)" ; String sql = String.format(sql_insert1, "'" + txtSpot.getSPOT() + "'" , "'" + txtSpot.getLOCATION() + "'" , "'" + txtSpot.getOPENTIME() + "'" , "'" + txtSpot.getPHONE() + "'" , "'" + txtSpot.getINTRO() + "'" , "'" + txtSpot.getGRADE() + "'" , "'" + txtSpot.getHOT() + "'" , "'" + txtSpot.getSUM() + "'" , "'" + txtSpot.getCOMMENT() + "'" , "'" + txtSpot.getCOMMENT_GRADE() + "'" , "'" + txtSpot.getCOMMENT_TIME() + "'" ); state.execute(sql); count++; } System.out.println("插入成功,插入了" + count + "条数据" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 package clearTxt.txtClearClass;import org.apache.commons.lang.StringUtils;import org.junit.Test;import java.io.*;import java.lang.reflect.Field;import java.util.ArrayList;import java.util.List;import java.util.regex.Matcher;import java.util.regex.Pattern;public class TestFileClass { @Test public void test () { try { System.out.println(readFileToClass().size()); System.out.println(StringUtils.isNotBlank("" )); } catch (Exception e) { e.printStackTrace(); } } public static List<TxtSpot> readFileToClass () throws Exception { List<TxtSpot> list = new ArrayList <>(); File file = new File ("E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\1" ); String[] fileNameArr = file.list(); String path = file.getPath(); String pattern = "\\[(.*?)]" ; Pattern regex = Pattern.compile(pattern); assert fileNameArr != null ; BufferedReader buffReader = null ; InputStreamReader reader = null ; for (String fileName : fileNameArr) { TxtSpot txtSpot = new TxtSpot (); String key = "" ; String value = "" ; InputStream fileInputStream = new FileInputStream (path + "\\" + fileName); reader = new InputStreamReader (fileInputStream); buffReader = new BufferedReader (reader); String strTmp; while ((strTmp = buffReader.readLine()) != null ) { Matcher matcher = regex.matcher(strTmp); if (matcher.find() && (strTmp.equals("[SPOT]" ) || strTmp.equals("[LOCATION]" ) || strTmp.equals("[OPENTIME]" ) || strTmp.equals("[PHONE]" ) || strTmp.equals("[INTRO]" ) || strTmp.equals("[GRADE]" ) || strTmp.equals("[HOT]" ) || strTmp.equals("[SUM]" ) || strTmp.equals("[COMMENT]" ) || strTmp.equals("[COMMENT_GRADE]" ) || strTmp.equals("[COMMENT_TIME]" ))) { if (StringUtils.isNotBlank(key)) { try { setTxtSpot(key, value, txtSpot); } catch (Exception e) { e.printStackTrace(); } } key = matcher.group(1 ); value = "" ; } else { if (StringUtils.isNotBlank(strTmp)) { value += strTmp; } else { value += "\n\n" + strTmp; } } } if (StringUtils.isNotBlank(key)) { setTxtSpot(key, value, txtSpot); } list.add(txtSpot); } assert reader != null ; reader.close(); buffReader.close(); return list; } public static void setTxtSpot (String key, String value, TxtSpot txtSpot) throws Exception { Field field = txtSpot.getClass().getDeclaredField(key); field.setAccessible(true ); field.set(txtSpot, value); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package clearTxt.txtClearClass;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;@Data @NoArgsConstructor @AllArgsConstructor @Builder public class TxtSpot { private String SPOT; private String LOCATION; private String OPENTIME; private String PHONE; private String INTRO; private String GRADE; private String HOT; private String SUM; private String COMMENT; private String COMMENT_GRADE; private String COMMENT_TIME; }
BUG
使用反射根据key,往对象中设置属性的时候报错了,看报错信息这里,是set的问题,仔细看了一下反射的使用,set的时候第一个参数需要放对象,后面是要设置的属性。
这个报错是因为我正则匹配取的是[]中间的值,但是没想到有点评论有email,是在[]中间的,读取出来以后因为没注意所以反射就匹配不到了,导致报错,在反射之前或者再外层的key值,用if筛选好。
日总结
今天的任务多倒是不多,就是有点难,万万没想到自己是用java洗进数据库中的,随便复习了一下java 的io流,以及反射,spark确实如果操作这个文件还是有点难的,不如直接用java来得方便,一个正则或者过滤就能拿到自己需要的value,还是克服了很多困难才搞好,好在弄好了。明后两天就能把数据关联好,数据这块就差不多了。
二次清洗数据
学习日期: 5.8
所学内容概述
IDEA集成DM8
这样idea相当于可以可视化dm8了,操作的时候会方便很多,用java写数据库相关字段的时候还会补全。
创建Driver
添加dm8驱动在DM安装目录下面有 然后修改Driver 应用即可
最后就是配置了
景区评论城市表拆分并字段关联
需要的统一属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val spark: SparkSession = getSparkimport spark.sqlimport spark.implicits._sql(" set spark.executor.processTreeMetrics=false" ) val TXTData : DataFrame = dmRead(spark, "CC.TXT" ) val CSV2 : DataFrame = dmRead(spark, "CC.CSV2" ) val CSV4 : DataFrame = dmRead(spark, "CC.CSV4" ) val XLSX3 : DataFrame = dmRead(spark, "CC.XLSX3" ) val SPOT_INFO : DataFrame = dmRead(spark, "SPOT_INFO" ) val smallData: DataFrame = CSV2 .unionByName(XLSX3 )val bigData: DataFrame = TXTData .unionByName(CSV4 )val SPOT_ID_ORDER : WindowSpec = Window .orderBy($"GRADE" .desc) val CITY_ID : WindowSpec = Window .orderBy($"NAME" .desc) SPOT_INFO .createOrReplaceTempView("spot_info" ) smallData.createOrReplaceTempView("smallData" ) bigData.createOrReplaceTempView("bigData" )
清洗入景区表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def dataToSpot (): Unit ={ val smallDataToDm: DataFrame = sql( """ |select |SPOT as SPOT_NAME, |LOCATION, |OPENTIME, |NULL as PHONE, |NULL as INTRO, |round((cast (substring(GRADE,1,2) as int) / 20),1) as GRADE, |NULL as HOT, |NULL as SUM |from |smallData |""" .stripMargin).distinct() val bigDataToDm: DataFrame = bigData .withColumn("SPOT" , regexp_replace($"SPOT" , "\\?" , "" )) .where("SPOT <> 'null'" ) .withColumnRenamed("SPOT" , "SPOT_NAME" ) .drop("COMMENT" ) .drop("COMMENT_GRADE" ) .drop("COMMENT_TIME" ) .distinct() val finalFrameSpot: DataFrame = bigDataToDm.unionByName(smallDataToDm) .withColumn("SPOT_ID" , row_number() over SPOT_ID_ORDER ) dmWriter(finalFrameSpot,"SPOT_INFO" ) }
清洗入评价表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def dataToComment (): Unit ={ val smallTempData: DataFrame = sql( """ |select |SPOT, |COMMENT as EVALUATION, |NULL as EVALUATION_GRADE, |NULL as EVALUATION_TIME |from |smallData |""" .stripMargin) val bigTempData: DataFrame = sql( """ |select |SPOT, |COMMENT as EVALUATION, |COMMENT_GRADE as EVALUATION_GRADE, |COMMENT_TIME as EVALUATION_TIME |from |bigData |""" .stripMargin) bigTempData.unionByName(smallTempData).createOrReplaceTempView("bigAndSmallTable" ) val comment_final_frame: DataFrame = sql( """ |with t1 as ( |select |SPOT_NAME, |SPOT_ID |from |spot_info |) |select |SPOT_ID, |EVALUATION, |EVALUATION_GRADE, |EVALUATION_TIME, |row_number() over(order by SPOT_ID) as EVALUATION_ID |from |bigAndSmallTable a |join |t1 on t1.SPOT_NAME = a.SPOT |where EVALUATION <> 'null' |""" .stripMargin) dmWriter(comment_final_frame,"COMMENT_INFO" ) }
清洗入城市表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def dataToCity (): Unit ={ val city_final_frame: DataFrame = sql( """ |select |if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^省]+省)?([^市]+市)", 2),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2)) as NAME |from |spot_info |""" .stripMargin) .distinct() .withColumn("CITY_ID" , row_number() over CITY_ID ) dmWriter(city_final_frame,"CITY" ) }
景区关联城市id
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def spotJoinCITY (): Unit ={ dmRead(spark,"CITY" ).createOrReplaceTempView("city_info" ) val final_spot_test: DataFrame = sql( """ |with t1 as ( |select |*, |if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^省]+省)?([^市]+市)", 2),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2)) as NAME |from |spot_info) |select |t1.*, |ci.CITY_ID |from |t1 |join city_info ci on ci.NAME = t1.NAME |""" .stripMargin).drop("NAME" ) dmWriter(final_spot_test,"SPOT_INFO" ) }
BUG
今天的报错差不多只有这一个需要搜索解决的,搜索的结果是spark3.0的问题,两个选择,要么降低spark版本,我觉得太麻烦了,就选择了修改配置,sql(" set spark.executor.processTreeMetrics=false") ,需要这个警告会卡住不动。所以解决了一下。
日总结
今天的状态极佳,原定三天左右时间完成二次清洗的,没想到今天一天就完成了,梳理好思路,再敲代码,基本没什么报错,之前比赛的时候写sql都会经常报错,今天的难点其实在关联字段上面,一定要梳理清除,我的办法就是建表,先找出两个表有关的字段,然后join,取出新表需要的所有字段包括关联字段。所以之前从样数据中抽到数据库的表一定是不能动的,在拆分表的时候是要用的,后面如果还要建表的话可能还会用到,然后就是在这几天的清洗过程中发现正则对于处理异常数据,包括找某个点的字段的时候还是非常好用的,抽空还是需要去看看正则表达式的使用,也是程序员的必备技能。
云服务器维修重装
学习日期: 5.9
所学内容概述
重装原因
达梦数据库一直突然显示6001报错,然后服务也一直起不来,说是表空间损坏了,修了很久也没弄好。直接重装云服务器以及各种配置,因为过程比较多,就不一一放上来了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Starting DmServiceDMSERVER: ./DmServiceDMSERVER: line 413: 2834 Floating point exception(core dumped) eval exec "\"$EXEC_PROG_FULL_PATH \"" "$STARTUP_PARAMS " -noconsole $TMP_START_MODE > "$SERVICE_LOG_FILE " 2>&1 [ FAILED ] file dm.key not found, use default license! version info: develop DM Database Server x64 V8 1-2-38-21.07.09-143359-10018-ENT startup... Normal of FAST Normal of DEFAULT Normal of RECYCLE Normal of KEEP Normal of ROLL Database mode = 0, oguid = 0 License will expire on 2022-07-09 file lsn: 24388 ndct db load finished
BUG
BUG1
问题起源,开始连接失败,自己去找原因,各式各样的改配置改东改西,全部都没有用。因为linux只有命令行,命令行对服务启动和关闭又会报其他各种错误,最后实在受不了,直接将阿里云服务器重置了。
BUG2
重装服务器以后,之前部署的网页发现js好像是没读取到,点击的时候也没有跳转,去宝塔上面看发现php那块自己设置的是静态,因为服务器没装php,把php装上,配置好网站,就正常显示了。
日总结
本来打算将自己阿里云服务器装好nginx和redis等,一些部署需要用到的东西,装完测试自己之前做过的瑞吉外卖部署上去以后,自己想看一下dm8数据库的时候报错了,修了半天没修好,只能把整个服务器重装了,部署的话还是在工作室服务器或者换一台服务器部署好了。
数据重新放dm(修改)
学习日期: 5.10
所学内容概述
之前的代码自己又修改了一遍,因为懒得建表,所以都是先直接overwrite跑一遍,自动建表,然后把ddl复制改一下类型,再建最终表append进去。后面的自己的方法基本没看,注意的地方就是临时表的表名。
1 2 3 4 5 6 7 8 9 def main (args: Array [String ]): Unit = { dataToSpot() dmWriter(dmRead(spark,"SPOT_INFO_TEMP_TEST" ).withColumn("SPOT_ID" ,row_number() over SPOT_ID_ORDER ),"SPOT_INFO_TEMP" ) dataToComment() dataToCity() spotJoinCITY() dmWriter(dmRead(spark,"SPOT_INFO_FINAL" ),"SPOT_INFO" ) }
BUG
只有不值得记得一些小bug,比如dm抽取的时候,抽出表了,这种看报错信息一下就看出来了,就不记录了,总的来说今天没有什么BUG。
有一个小警告是之前出现过的,本来我加了配置设置已经没了,但是今天重新导入的时候又报错了。但是不影响使用以及数据的导入。
日总结
今天上午把dm8重装了,用镜像装好以后,发现使用docker装的话方便很多,后面docker还是值得花点时间去学习一下的,好像安装都会很方便的。然后下午就把数据给导进去了,算把之前自己写的方法回忆了一下,然后进行了少部分的修改,能更好的贴合图表的数据。
分析数据梳理展示图表
学习日期: 5.11
所学内容概述
旅游分析
所能实现的表图
首页展示 二维图
最热景区(按照景区表的HOT)
星级景区(按照景区表的GRADE 5星为最高)
点评最多(按照景区表的SUM)
热门城市
热门省份
最近景区(根据访问ip确定位置)
口碑排行优(根据评论 判断情绪加权算法)
口碑排行劣(根据评论 判断情绪加权算法)
首页展示 三维图
x轴 省份名 y轴 景区 z轴 点评数
x轴 省份名 y轴 景区 z轴 热门数
x轴 省份名 y轴 景区 z轴 点评数
x轴 省份名 y轴 城市 z轴 热度排行(城市所有景区热度相加)
基本二维图加省份就能做三维图,二选一
季节推荐景区(x轴 季节 y轴 景区 z轴 推荐程度)
季节最热门景区(x轴 季节 y轴 景区 z轴 该季度总评论数)
评论区
评论词云 百分比圆图(点击查看高频词的评论)
智能推荐
点进去就有的(综合各个字段 根据推荐算法 排出最值得游玩的5大景区 展示推荐指数)
按照用户要求 推荐出景区 可做表格 可做轮播图
BUG
无
日总结
今天得把所能呈现的表都要分析清楚了,根据字段,把一般景区的表都先分析出来,可添加三维的,算是没分析完 ,因为有部分表在制作过程中可能还是需要去掉的,所以最好能分析出25-30张图表,后面再筛选,晚上学习了一下如何画原型图,页面展示成什么样子,又架构了一下,明天把原型图试着画一下
地址判断省份 | 绘制原型图
学习日期: 5.12
所学内容概述
需求:根据城市名判断所在省份包括地级市
实现:通过js前端完成
注意点:要理解provinces.js文件的数据结构,然后就是基本的for循环和if判断,就要搞清楚逻辑,地级市那边是如果没在直接的省份下没找到,就跑到对应市下面的县级市和区去找,找以后返回对应的省份。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 <template> <div> <input type="text" v-model="city"> <button @click="a()">搜索</button> <h1>{{city}}属于:{{ province }}</h1> </div> </template> <script> import provinces from '../hooks/provinces' import {ref} from 'vue' export default { name:'City', setup(){ let city = ref('') let province = ref('') function a() { console.log(provinces) provinces.forEach((item,index) => { item.city.forEach((itt, idx) => { console.log(item.name) if (itt.name == city.value) { console.log(city.value+'位于省份:', item.name) province.value = item.name } else{ for(let i=0;i<itt.districtAndCounty.length;i++){ if (itt.districtAndCounty[i] == city.value){ console.log(city.value+'位于省份:', item.name) province.value = item.name break } } } }) }) } return{ a,city,province } } } </script>
province.js有点多,我直接放阿里云上面了,链接如下:
https://u7img.oss-cn-hangzhou.aliyuncs.com/provinces.js
成功效果图
绘制原型图
大致基本构造如下,样式什么都是需要修改的。
BUG
BUG1
报错如图所示,虽然有报错,但是其实是正常显示的,控制台这样输出一个红色雀氏不太好看,仔细分析了一下可能是在我判断length的数组还没创建的时候,他调用了这个,会导致出现undefined,在使用length之前用if判断一下是不是空即可,换成foreach也是一样的,之前foreach的时候也有这个报错。
日总结
今天上午的任务是把之前的原型图按照比赛要求,进行了修改,紧紧地将分析两个关键字和项目扣住,根据学长的建议画了原型图,这两天先出一版初稿。下午想到如果能根据城市分析出对应的省份,那对于项目来说能增加很多表,能也多几个页面。所以就去研究了一下,发现基本都是在集合中,字段匹配的方法,考虑到前端画表的问题,先使用js实现了,如果后面不太方便的话,还是要用其他语言多一个省份表比较好。
Java城市表添加省份字段
学习日期: 5.13
所学内容概述
起因
考虑到旅游页面设计的很多图表都是根距离以及省份来的,如果前端根据城市名找出对应的省份的话,每次都是四层循环,而且效率很低,有些地方没法break,没法优化算法,所以就直接打算清洗到数据库中好了,直接放城市表中,也不加省份id,连接表了,省份表如果就两个字段确实没必要。
清洗思路
根据之前js的方法,就是把所有省份和城市,县城封装到一个类似于集合的地方,就比如前端处理的时候是放js中的,那个js格式和json很想,我想java能不能读json文件呢,应该是可以的,就把之前存放city的js文件处理成json的,放IEDA中了,那只要弄清楚结构,一层一层拨开就行了json文件如下:https://u7img.oss-cn-hangzhou.aliyuncs.com/city.json
准备工作
还是需要导入对json处理的Maven依赖的。
1 2 3 4 5 <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.35</version > </dependency >
代码如下(有注解)
基本每句都有注解,懂json结构,尝试过下面代码debug的,应该还是很清晰的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 package clearTxt.jsTest;import clearTxt.txtClearArrayList.ConDm;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import org.junit.Test;import java.io.*;import java.sql.ResultSet;import java.sql.Statement;import java.util.ArrayList;import java.util.List;import java.util.Set;public class jsonToDataTest { @Test public void testProvince () { String city = "苍南县" ; String s = cityGetProvince(city); System.out.println(city+":" +s); } public static void main (String[] args) throws Exception { provinceNameIntoCity(); } public static void provinceNameIntoCity () throws Exception { Statement statementGet = ConDm.connectDm(); String sql = "select NAME from CITY" ; ResultSet rs = statementGet.executeQuery(sql); Statement statementInto = ConDm.connectDm(); int count = 1 ; while (rs.next()){ String cityName = rs.getString("NAME" ); String provinceName = cityGetProvince(cityName); System.out.println(cityName + "——————" + provinceName); String sqlInto = "insert into CITY_INFO(NAME,CITY_ID,PROVINCE_NAME) values (%s,%d,%s)" ; String sqlFinalInto = String.format(sqlInto, "'" +cityName+"'" , count, "'" +provinceName+"'" ); statementInto.execute(sqlFinalInto); count++; } System.out.println("添加完成——一共" +count+"条数据" ); } public static String cityGetProvince (String cityName) { String jsonData = getStr(new File ("src/main/resources/city.json" )); JSONObject parse = (JSONObject) JSONObject.parse(jsonData); assert parse != null ; Set<String> provinces = parse.keySet(); for (String province : provinces) { JSONArray provinceArray = parse.getJSONArray(province); for (Object value : provinceArray) { JSONObject cityArray = (JSONObject) value; Set<String> cityArrays = cityArray.keySet(); List<String> list = new ArrayList <>(cityArrays); String city = list.get(0 ); if (cityName.equals(city)) { return province; } JSONArray countys = cityArray.getJSONArray(city); for (Object o : countys) { String county = (String) o; if (cityName.equals(county)) { return province; } } } } return "未找到" ; } public static String getStr (File jsonFile) { String jsonStr = "" ; try { FileReader fileReader = new FileReader (jsonFile); Reader reader = new InputStreamReader (new FileInputStream (jsonFile),"utf-8" ); int ch = 0 ; StringBuffer sb = new StringBuffer (); while ((ch = reader.read()) != -1 ) { sb.append((char ) ch); } fileReader.close(); reader.close(); jsonStr = sb.toString(); return jsonStr; } catch (IOException e) { e.printStackTrace(); return null ; } } }
BUG
正式写的时候,没遇到什么值得记的bug,多次测试敲出来的,所以我觉得有些报错不算bug。
日总结
考虑到该旅牛网压力最大的就是前端人员了,所以一些后端能处理的逻辑,尽量我们给他们处理好,所以就打算把省份信息直接放到数据库中,这样后端写接口也会方便,前端也方便,直接使用就行了。还是花了一番功夫的,刚开始试着直接用java调用js文件,发现网上没什么教程,而且return的值他们基本都是很简单的string,而不是这种复杂的好几层的集合,所以就想到存成json,应该处理就方便点了。花了下午的时间把方法写好,晚上把city表完善了一下。
城市表修改
学习日期: 5.15
所学内容概述
起因
之前将城市表绑定了省份名,这样会造成一个不好的情况就是前端router就会出现中文了,那是非常不专业不美观的,所以打算创建一个省份表并给城市表绑定上省份id
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def cityJoinProvince (): Unit ={ val province_data: DataFrame = dmRead(spark, "PROVINCE_INFO" ).withColumnRenamed("NAME" ,"NAME1" ) val city_data: DataFrame = dmRead(spark, "CITY_INFO" ) val city_temp: DataFrame = city_data.join(province_data, $"PROVINCE_NAME" === $"NAME1" ) .selectExpr("NAME" , "CITY_ID" , "PROVINCE_ID" ) dmWriterOverwrite(city_temp,"CITY_TEMP" ) } def createToProvince (): Unit ={ val province_data: DataFrame = dmRead(spark, "CITY_INFO" ) .selectExpr("PROVINCE_NAME" ) .distinct() .withColumnRenamed("PROVINCE_NAME" , "NAME" ) .withColumn("PROVINCE_ID" , row_number() over Window .orderBy($"NAME" )) dmWriterAppend(province_data,"PROVINCE_INFO" ) }
BUG
想使用spark中是join连接表的时候报错了,看情况报了两个Name的错,感觉是像之前join的时候两个表都有NAME字段,果然如此,然后就先把随便一个表的NAME临时改个名就行了。或者province_data(“NAME”)指定一下是哪个df 的列名
日总结
今天只有下午的学习时间,所以就打算先把数据再弄的规范点,基本没什么问题,之前解决过类似的问题,但是直接都是使用createTableTemp创建临时表的进行表连接的,那样代码不是很好看现在想用spark 的join试一下,发现出了一点问题,但是解决还是顺利的。
python的flask使用
学习日期: 5.16
所学内容概述
起因
因为推荐的话,学长说是py比scala快很多,所以打算用python,因为如果是用scala的spark的话,可以用SpringBoot直接调用scala的main方法达到一个,给接口返回数据的作用,但是如果用py的话应该不太行了,就打算用python写接口,就找到了轻量性的web框架
实现
直接导入flask就行了然后上面写一个@标签运行run就行了,方便很多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from flask import Flaskapp = Flask(__name__) @app.route('/' ) def hello_world (): users = ["User1" , "User2" , "User3" , "User4" , "User5" ] items = ["西湖" , "武当山风景区" , "横店影视城" , "普陀山风景区" , "灵山大佛" , "黄鹤楼" ] datasets = [ [1 , 0 , 1 , 1 , 0 , 1 ], [1 , 0 , 0 , 1 , 1 , 0 ], [1 , 0 , 1 , 0 , 0 , 1 ], [0 , 1 , 0 , 1 , 1 , 0 ], [1 , 1 , 1 , 0 , 1 , 0 ], ] import pandas as pd df = pd.DataFrame(datasets, columns=items, index=users) json_data = df.to_json(orient='records' , force_ascii=False ) return json_data if __name__ == '__main__' : app.run()
BUG
接口写好以后,访问了以后报错了,推测是类型问题,因为我直接返回了dataframe,一般前端是要json的,但是我推荐处理一般都是dataframe或者其他的,所以试着转成json返回试试。df.to_json(orient=‘records’, force_ascii=False)解决
日总结
今天本来是学了上午学了相似度计算,后面感觉似乎数据格式要求有点太死板了,然后去看了看别的,没有什么好的教程,只能慢慢测试了,因为py后面我推荐也是要写接口的,就试着使用python返回接口路由,没想到这么简单,不像java要那么麻烦,这轻量级的web用着就是舒服。
推荐算法——相似度
学习日期: 5.17
所学内容概述
起因
因为打算做一个,如果用户收藏该景区推荐出其他收藏该景区的用户也去过哪些地方。这样的话需要计算相似度,目前比较好的是杰卡德和皮尔逊算法来计算相似度,具体如何使用还是得看需求。
杰卡德相似度
如下代码都有注解,spark学过以后学这个也不算太难。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import pandas as pdusers = ["User1" , "User2" , "User3" , "User4" , "User5" ] items = ["西湖" , "武当山风景区" , "横店影视城" , "普陀山风景区" , "灵山大佛" , "黄鹤楼" ] datasets = [ [1 , 0 , 1 , 1 , 0 , 1 ], [1 , 0 , 0 , 1 , 1 , 0 ], [1 , 0 , 1 , 0 , 0 , 1 ], [0 , 1 , 0 , 1 , 1 , 0 ], [1 , 1 , 1 , 0 , 1 , 0 ], ] print ()df = pd.DataFrame(datasets, columns=items, index=users,dtype=bool ) print (df)from sklearn.metrics import jaccard_scoreprint (jaccard_score(df["西湖" ], df["横店影视城" ]))from sklearn.metrics.pairwise import pairwise_distancesuser_similar = 1 - pairwise_distances(df.values, metric='jaccard' ) user_similar = pd.DataFrame(user_similar, columns=users, index=users) print ("用户之间的两两相似度: " )print (user_similar)item_similar = 1 - pairwise_distances(df.T.values, metric='jaccard' ) item_similar = pd.DataFrame(item_similar, columns=items, index=items) print ("景区之间的两两相似度: " )print (item_similar)topTwo_users = {} for i in user_similar.index: _df = user_similar.loc[i].drop([i]) _df_sorted = _df.sort_values(ascending=False ) topTwo_users[i] = list (_df_sorted.index[:2 ]) print (topTwo_users)import numpy as nprs_results = {} for user,sim_users in topTwo_users.items(): rs_result = set () for sim_user in sim_users: rs_result = rs_result.union(set (df.loc[sim_user].replace(False ,np.nan).dropna().index)) rs_result -= set (df.loc[user].replace(False ,np.nan).dropna().index) rs_results[user] = rs_result
BUG
BUG1
这里ix我只有在df那边使用过,报错了,去搜了一下说是ix被弃用了换成了loc,该方法的返回值是一个Seriers其实就是把这一行的数据全部展示,结构有点像python集合(java的字典),几列就几个集合,里面一个是列名一个是参数。改成loc就没报错了。
BUG2
这个去搜了一下说是scikit-learn从0.20.1不再自动修改numpy语法。所以需要养成更加规范的书写习惯解决。所以是计算值的话就要加上df.values,user_similar = 1 - pairwise_distances(df.values, metric=‘jaccard’)问题解决。报错前是没values的。
BUG3
这是一个警告,对结果是没影响的,但是每次运行就报警告不太舒服,去搜了一下,解决:设置dtype=bool,比如 df = pd.DataFrame(datasets, columns=items, index=users, dtype=bool)。因为自己的值设置是0 和 1,好像是说 0 1的话要当成bool类型的,所以要提前设置dtype=bool
BUG4
这个问题有点难的,其实是看他的pairwise_distances的原理,是直接计算行之间杰卡德相似度,所以像我是一个5*6的矩阵,我刚开始是df.values,那他只有5行,在我转成dataframe的时候,我又将行和列要设置成items,但是我的items列表是6个数据,所以导致数据超出,无法创建dataframe,但是我又想要items之间的杰卡德相似度,items又是列名,那只能进行转置,python转置还是很方便的直接dataframe.T就行了,也就是行转列,之间的数据是不会变的。
日总结
上午学的算法,主要的任务是弄懂python中的dataframe的一些基本方法,然后试着用假数据推荐出了相似用户给出的推荐景区,感觉还不错,后面搞清除需求,推荐相似用户的景区应该也是挺好实现的。上午才只学了杰卡德没学皮尔逊原因主要是bug实在是有点多,遭不住,修完就一个小时了,然后又一句一句去理解代码,因此一上午就学了一个杰卡德。明天学习进度应该可以快起来了。
推测评分案例
学习日期:5.18
所学内容概述
任务概述
有一个用户对电影的评分表,根据所有用户对电影的评分,有些用户对有的电影是没评分的,我们要根据该用户和其他用户的相似度,再根据其他用户对该电影的评分,推测出如果该用户对该电影评分,会是多少分。该算法是基于皮尔逊相似系数以及用户之间的相似度推测的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import pandas as pdimport numpy as npDATA_PATH = "../datasets/ml-latest-small/ratings.csv" dtype = {"userId" : np.int32, "movieId" : np.int32, "rating" : np.float32} ratings = pd.read_csv(DATA_PATH, dtype=dtype, usecols=range (3 )) ratings_matrix = ratings.pivot_table(index=["userId" ], columns=["movieId" ], values="rating" ) similarityCorr = ratings_matrix.T.corr() similar_users = similarityCorr[1 ].drop([1 ]).dropna() similar_users = similar_users.where(similar_users>0 ).dropna() ids = set (ratings_matrix[3 ].dropna().index) & set (similar_users.index) final_similar_user = similar_users.loc[list (ids)] sum_up = 0 sum_down = 0 for sim_uid,similarity in final_similar_user.iteritems(): sim_user_rating_for_item = ratings_matrix.loc[sim_uid][3 ] sum_up += sim_user_rating_for_item * similarity sum_down += similarity final_rating_for_one = sum_up/sum_down
几种相似度异同
杰卡德相似度&余弦相似度&皮尔逊相关系数
余弦相似度
度量的是两个向量之间的夹角, 用夹角的余弦值来度量相似的情况
两个向量的夹角为0是,余弦值为1, 当夹角为90度是余弦值为0,为180度是余弦值为-1
余弦相似度在度量文本相似度, 用户相似度 物品相似度的时候较为常用
余弦相似度的特点, 与向量长度无关,余弦相似度计算要对向量长度归一化, 两个向量只要方向一致,无论程度强弱, 都可以视为’相似’
皮尔逊相关系数Pearson
实际上也是一种余弦相似度, 不过先对向量做了中心化, 向量a b 各自减去向量的均值后, 再计算余弦相似度
皮尔逊相似度计算结果在-1,1之间 -1表示负相关, 1表示正相关
度量两个变量是不是同增同减
皮尔逊相关系数度量的是两个变量的变化趋势是否一致, 不适合计算布尔值向量之间的相关度
杰卡德相似度 Jaccard
两个集合的交集元素个数在并集中所占的比例, 非常适用于布尔向量表示
分子是两个布尔向量做点积计算, 得到的就是交集元素的个数
分母是两个布尔向量做或运算, 再求元素和
总结:余弦相似度适合用户评分数据(实数值), 杰卡德相似度适用于隐式反馈数据(0,1布尔值)(是否收藏,是否点击,是否加购物车)
余弦相似度
皮尔逊相关系数
用户间预测方法
公式不知道怎么放上来,放上来也看不太懂,就是假如我要预测用户1对电影10的评分,首先要先用皮尔逊算出所用用户之间的相似度。然后取正相似度的用户,将这用户对电影10的评分取出,乘与用户1的相似度,全部累加作为分子,分母就是与用户1相似度累加,分子除分母就是推测评分。
BUG
又是个小警告,好像是因为ids是两个的交集,但是两个交集没指定类型,所以要设置成集合Set,集合才有求交集的方法,所以set(ratings_matrix[3].dropna().index) & set(similar_users.index)加两个Set类型转换一下就行了,不过不影响运行。
日总结
推荐算法如果说弄懂原理的话还是很难的,如果只是跟着敲只会一脸茫然,所以把每一行代码搞清楚都是很有必要的,有些地方老师的代码也未必好使,不这样搞懂,很难活学活用到现实中,看课件的代码发现很多地方,其实有多此一举的时候,稍加修改了一下,优化了一点效率,这预测不知道能不能用在中软杯项目上。
电影关键字案例
学习日期: 5.19
所学内容概述
案例
我觉得这个案例还是很有难度的,自己做的话做不太出来,三表之间要连接又要算相似度,又要筛选id,如果操作sql的话,自己应该能实现,现在对python还不是很熟悉,看着这案例代码,应该可以实现项目中词频了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 import collectionsfrom functools import reduceimport pandas as pdimport numpy as npfrom gensim.corpora import Dictionaryfrom gensim.models import TfidfModel_tags = pd.read_csv("../datasets/ml-latest-small/all-tags.csv" , usecols=range (1 , 3 )).dropna() _tags['tag' ] = _tags['tag' ].str .lower() tags = _tags.groupby('movieId' ).agg(list ) movies = pd.read_csv("../datasets/ml-latest-small/movies.csv" , index_col="movieId" ) movies['genres' ] = movies['genres' ].str .lower() movies['genres' ] = movies['genres' ].apply(lambda x: x.split('|' )) movies_index = set (movies.index) & set (tags.index) new_tags = tags.loc[list (movies_index)] ret = movies.join(new_tags) temp = map ( lambda x: (x[0 ], x[1 ], x[2 ], x[2 ] + x[3 ]) if x[3 ] is not np.nan else (x[0 ], x[1 ], x[2 ], []), ret.itertuples() ) movie_dataset = pd.DataFrame(temp, columns=["movieId" , "title" , "genres" , "tags" ]) movie_dataset.set_index('movieId' , inplace=True ) dataset = movie_dataset['tags' ].values dct = Dictionary(dataset) corpus = [dct.doc2bow(line) for line in dataset] model = TfidfModel(corpus) _movie_profile = [] for i, data in enumerate (movie_dataset.itertuples()): mid = data[0 ] title = data[1 ] genres = data[2 ] vector = model[corpus[i]] movie_tags = sorted (vector, key=lambda x: x[1 ], reverse=True )[:30 ] topN_tags_weights = dict (map (lambda x: (dct[x[0 ]], x[1 ]), movie_tags)) for g in genres: topN_tags_weights[g] = 1.0 topN_tags = [i[0 ] for i in topN_tags_weights.items()] _movie_profile.append((mid, title, topN_tags, topN_tags_weights)) movie_profile = pd.DataFrame(_movie_profile, columns=["movieId" , "title" , "profile" , "weights" ]) movie_profile.set_index("movieId" , inplace=True ) ''' 建立tag-物品的倒排索引 ''' def create_inverted_table (movie_profile ): inverted_table = {} for mid, weights in movie_profile["weights" ].iteritems(): for tag, weight in weights.items(): _ = inverted_table.get(tag, []) _.append((mid, weight)) inverted_table.setdefault(tag, _) return inverted_table inverted_table = create_inverted_table(movie_profile) watch_record = pd.read_csv("../datasets/ml-latest-small/ratings.csv" , usecols=range (2 ), dtype={"userId" :np.int32, "movieId" : np.int32}) watch_record = watch_record.groupby("userId" ).agg(list ) user_profile = {} for uid, mids in watch_record.itertuples(): record_movie_profile = movie_profile.loc[list (mids)] counter = collections.Counter(reduce(lambda x, y: list (x)+list (y), record_movie_profile["profile" ].values)) interest_words = counter.most_common(5 ) maxcount = interest_words[0 ][1 ] interest_words = [(w,round (c/maxcount, 4 )) for w,c in interest_words] user_profile[uid] = interest_words
BUG
在写完自己自定义map方法执行之后出现了报错,翻译是说str不能和list连接但是我索引2 3应该都是list啊怎么不能相加呢,我看案例代码就是这样,然后自己debug了一下发现,怎么会多了一个索引为index的字段,后面的索引都加+1了,修改索引以后,正常了,推测是map以后会自动往首位添加index字段。
日总结
这电影标签关键词案例是有难度的,自己敲完都是会去debug,一行一行看。这个案例讲一下就是根据电影的标签,以及用户看过的电影,能给用户推荐出用户想要的电影。具体实现还是很复杂的,要算tf-idf相关度,倒排索引等操作,还是需要实践的时候融会贯通了。
python操作dm数据库
学习日期: 5.22
所学内容概述
连接dm数据库是需要导入import dmPython的,但是有个问题。在python3是没有什么用的,pycharm会一直说找不到的dmPython的模块,import后面也会爆红。
解决办法
原本是直接使用的pip安装的dmPython但是发现pip下载的什么方法都没有,去达梦数据库的教程书上发现安装并不是用pip,而是用python执行一个setup.exe文件。
在我电脑的E:\software\dmdbms\drivers\python\dmPython目录下执行:(C++可能会报错我列下面bug中)
然后会发现别人都可以,但是你是不行,依旧爆红,而且无法使用,原因是:
1 2 3 4 5 6 7 8 由于在 Windows 环境下,python3.8 提高了扩展模块(dmPython)的依赖 DLL 加载的安全性,现在只能从以下三个目录加载依赖库:system paths(即 system32 目录)、PYD 文件所在目录、通过 add_dll_directory()添加的录。通过设置 PATH 环境变量和当前工作目录将不再有效。因此,在 Windows 上安装完 dmPython,即使环境变量 PATH 设置了 dpi库所在目录,在 import dmPython 时也会出现"DLL load failed while imorting dmPython" 的报错。在 python3.8 及以上版本环境下使用 dmPython,需要进行以下设置: 1. 在 C:\Python38\Lib\site-packages 增加.pth 结尾的文件,例如dmPython.pth,文件内容如下: import dpi2. 在 C:\Python38\Lib\site-packages 路径下增加 dpi.py,内容如下: import osos.add_dll_directory(r'E:\software\dmdbms\bin' )
完成以上步骤,就可以连接数据库了。dmPython爆红也没关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import dmPythondef connDm (): conn = dmPython.connect(user='SYSDBA' , password='SYSDBA' , server='47.120.9.247' , port=5236 , autoCommit=True ) return conn conn = connDm() cur = conn.cursor() cur.execute('select * from CITY_INFO' ) comment_list: list = cur.fetchall() print (comment_list)
BUG
BUG1
在python setup.exe install的时候会报c++的错误就很奇怪,但还是跟着步骤去修bug了,也不知道原因:
解决步骤如下
1、下载Microsoft Visual C++ 14. 0地址:https://download.visualstudio.microsoft.com/download/pr/cb1d5164-e767-4886-89552df3a7c816a8/ccac76bbd83f9d0e78a32f8bb22be6d2aca3fb5f10bf870504d8d84f36ab3440/vs_BuildTools.exe
2、打开下载文件,然后选择以下选项
3.安装即可,大约需要8分钟,最后安装完成点击【重启】即可
4.重启后,再去安装之前需要安装的Python包,成功!
BUG2
这里就是说找不到模块,但是是可以用的,也不知道为什么,直接点更多操作把它忽略了就行了。在更多操作里面,也可以Alt+Enter
日总结
今天将昨天的案例改了一下,然后试着分析dm数据库中评论的词频,但是卡在了读dm数据库,搞了一天,因为正常的模块都是pip就行了,使用mysql的那个依赖不行,安装dmPython的过程也出现了不少的问题,前前后后下载了很多次,试着去用了其他的依赖发现 都不太行,最后报错的时候也执行了一下,发现竟然可以。
景区评论词频统计
学习日期: 5.23
所学内容概述
起因
打算制作一个查看评论高频词的功能,但是效果不是很佳,实现代码以及逻辑点都写下面的代码中去了。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 from collections import Counterimport pandas as pdimport pkusegimport DmConnectdef segCut (x ): res = [] for i in x: a = list (set (seg.cut(i))) res += a return res conn = DmConnect.connDm() cur = conn.cursor() cur.execute('select SPOT_ID,EVALUATION from COMMENT_INFO' ) comment_list: list = cur.fetchall() comment_df = pd.DataFrame(comment_list, columns=["SPOT_ID" , "EVALUATION" ]) comments = comment_df.groupby('SPOT_ID' ).agg(list ) stopwords = pd.read_csv('../datasets/ml-latest-small/chineseStopWords.txt' , encoding='gbk' , names=['stopword' ], index_col=False ) stop_list = stopwords['stopword' ].tolist() seg = pkuseg.pkuseg() comments['cut' ] = comments['EVALUATION' ].apply(lambda x: [i for i in segCut(x) if i not in stop_list]) del comments['EVALUATION' ]def final_comment_list (): spot_cut_list = [] for spot_id, cuts in comments.itertuples(): spot_dict = {} counter = Counter(cuts) cutRes = counter.most_common(15 ) spot_dict[spot_id] = cutRes spot_cut_list.append(spot_dict) return spot_cut_list final_comment_list()
不足点
分词结果出来以后发现是很乱的,做不到淘宝那种形式的,很准确,他会将一些字也统计了,比如黄鹤楼不错,他会分为黄鹤楼和不错,黄鹤楼明显不是我需要的关键词。所以返回好的接口自己查看了一下,是不太行,还需要去找一些文献模型训练一下。
写入接口
暂时先放景区id,如果需要名字后面再加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask import Flaskfrom flask import jsonifyimport dmToDfpkusegapp = Flask(__name__) @app.route('/spot/comment/all' ) def spot_comments_high_word (): data = dmToDfpkuseg.final_comment_list() return jsonify({'msg' :'查询成功' ,'code' :200 ,'data' :data}) if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(port=8081 )
BUG
BUG1
报错信息一看就是iloc中的类型不对好像,所以在执行前我试了一下type,先看了数据类型,发现都是int,也就是需要的,这就奇怪了,然后在控制台测试了一下直接修改,发现没报错,原来是超行了,他是索引从0开始的,而且第一个参数是行,第二个是列,所以我先去把最后一列生成空白值的,再修改,问题解决。
日总结
今天的任务其实还是很难的,自己能不能实现也没底,因为中文需要分词的问题,困扰了我好久,目前主流是三种分词,选择了最优的pkuseg,但是发现其实还是一般,后期试一下怎么自己添加数据集进行训练,因为他比如风景好,门票便宜,他是搞不出来,它会分成风景,好,门票,便宜。这个问题需要去解决一下,然后明天打算把语句的情绪分析给先解决了,这词频跑一遍的确有点太慢了。
评论情绪分析
学习日期: 5.24
所学内容概述
修改高频词接口
原本是spot_id:data这样的数据,这样没有景区名,不太好,加入了景区名,然后添加了一个动态路由的接口。Flask接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from flask import Flaskfrom flask import jsonifyimport dmToDfpkusegapp = Flask(__name__) @app.route('/spot/comment/all' ) def spot_comments_high_word (): data = dmToDfpkuseg.final_comment_list() return jsonify({'msg' :'查询成功' ,'code' :200 ,'data' :data}) @app.route('/spot/comment/<spot_id>/' ) def spot_comment_high_word (spot_id ): data = dmToDfpkuseg.final_comment_list() print (spot_id) for i in data: if i['spot_id' ] == eval (spot_id): print (i) return jsonify({'msg' :'查询成功' ,'code' :200 ,'data' :i}) return jsonify({'msg' :'查询失败,没有该景区id' ,'code' :500 ,'data' :'null' }) if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(port=8081 )
对评论表进行情绪分析
起因
因为提供的数据中,有部分评分是有对该景区打分的,但是绝大多数是没有的,所以打算添加一个情绪评分列,并用该评分返回对应的情绪,算出平均情绪值或者好情绪和差情绪数,能侧面反映出该景区的好坏,在推荐上也能做文章了。
代码
基于snownlp情绪库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from snownlp import SnowNLPimport pandas as pdimport DmConnectdef scoreSnow (sentiments ): if 0.8 <= sentiments <= 1 : feelMood = "非常满意" elif 0.6 <= sentiments <= 0.8 : feelMood = "比较满意" elif 0.4 <= sentiments <= 0.6 : feelMood = "一般" elif 0.15 <= sentiments <= 0.4 : feelMood = "不太满意" else : feelMood = "非常不满意" return feelMood conn = DmConnect.connDm() cur = conn.cursor() cur.execute("select * from COMMENT_INFO" ) comment_list: list = cur.fetchall() comment_df = pd.DataFrame(comment_list, columns=["SPOT_ID" , "EVALUATION" , "EVALUATION_GRADE" , "EVALUATION_TIME" , "EVALUATION_ID" ]) comment_df.set_index('EVALUATION_ID' , inplace=True ) comment_df['feelScore' ] = '' comment_df['feel' ] = '' for index in list (comment_df.index): comment_values = comment_df['EVALUATION' ] comment_value = comment_values.loc[index] score = round (SnowNLP(comment_value).sentiments, 2 ) comment_df.iloc[index - 1 , 4 ] = score comment_df.iloc[index - 1 , 5 ] = scoreSnow(score)
不足
发现有些评论的情绪打分和实施是不太相符合的,如何解决这个问题。去搜索了一下snownlp的底层代码,发现他是能判断电商评论的一种情感分析模型,如果要分析其他的,需要自己训练,训练方法就是创建两个文件,pos.txt(好)和neg.txt(差),然后对这两个文本进行训练,文本内容一个存放好的情绪评论,一个存放差情绪的评论,训练出新的模型,需要sentiment.load,指定一下训练的模型即可。
BUG
BUG1
出现了404的报错,看下面我请求id后面是有/,动态路由好像是需要加/,不然接口无法访问的,加上/以后动态路由正常了。/spot/comment/<sopt_id>/
总结
自然语言的分析还是有点困难的,问题和不足也在上面说了,要想提高准确率就要去训练模型,但是自己是没有数据集的。自己去一个一个copy 的话效率太低,去看了一下携程旅游的武当山风景区的评论,发现携程的评论刚好有好评和差评,而且我发现比赛方提供的数据和携程的数据极其相似,打算用爬虫把他的好评和差评评论都爬下来加到txt文件中,晚上复习了一下爬虫,明天试下能不能实现,因为是动态的ajax还是有点难度的。
训练snownlp模型
学习日期: 5.25
所学内容概述
使用sqlalchemy连接dm
起因
因为利用snownlp将每个评语的评分都打印出来以后,打算存入数据库中,但是好像没有dataframe直接存放的方法,只有通过Data.to_sql(ame=tableName, con=engine, if_exists=‘append’, index=False),这种形式才能将dataframe导入,con配置的参数又需要使用create_engine的才行,之前dmpython连接的配置的是不行的。
解决过程
刚开始装直接pip装sqlalchemy的时候一直报错,意思就是没有dm的配置,后面一直在尝试,搜索,然后问chatgpt解决了问题,说是要先装dmpython2+版本,再装sqlalchemy1.3左右的。而且要跟之前在装dmpython的时候一样,使用python set.up install执行。
步骤如下
2.1 安装 SQLAlchemy
在终端执行命令:
1 Copypip install SQLAlchemy==1.3.23
如果安装失败,可使用镜像安装,命令如下:
1 Copypip install SQLAlchemy==1.3.23 -i http://pypi.doubanio.com/simple/ --trusted-host pypi.doubanio.com
注意
安装 SQLAlchemy 时需指定版本,SQLAlchemy 1.4 版本及以上暂不支持。
2.2 编译安装达梦数据库的 sqlalchemy 方言
到 $DM_HOME\drivers\python\sqlalchemy 目录下执行命令手动编译安装:python setup.py install。如图所示则代表安装成功。
1 2 Copycd D:\dmdbms\drivers\python\sqlalchemy python setup.py install
使用 pip list 命令查看是否安装成功。
代码实现
1 2 3 4 5 6 def intoDm (data: DataFrame,tableName:string ): conn_url = 'dm+dmPython://SYSDBA:SYSDBA@47.120.9.247:5236' engine = create_engine(conn_url) data.to_sql(name=tableName, con=engine, if_exists='append' , index=False )
训练模型
网上找到了关于景区评论的一些好评和差评的数据存放到pos.txt和neg.txt中,snowslp训练。发现训练以后,对景区评论的识别率明显提升了。缺点也有,就是好评和差评有部分分的太绝对了,只是说了一点差,但是评分就0了。这点暂时无法解决。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import osfrom snownlp import SnowNLPfrom snownlp import sentimentsentiment.train('./neg.txt' ,'./pos.txt' ) sentiment.save('sentiment.marshal' ) print ('"除了黄鹤楼以外,其它景点没有一点意义,就是为了让景点扩容,好多忽悠点门票和商业收入"的情感得分是:' ,SnowNLP("除了黄鹤楼以外,其它景点没有一点意义,就是为了让景点扩容,好多忽悠点门票和商业收入。" ).sentiments)print ('"打卡武汉地标!确实不错!"的情感得分是:' ,SnowNLP("打卡武汉地标!确实不错!" ).sentiments)data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),'sentiment.marshal' ) sentiment.load(data_path) print ("————————————————————————修改后——————————————————————————" )print ('"除了黄鹤楼以外,其它景点没有一点意义,就是为了让景点扩容,好多忽悠点门票和商业收入"的情感得分是:' ,SnowNLP("除了黄鹤楼以外,其它景点没有一点意义,就是为了让景点扩容,好多忽悠点门票和商业收入。" ).sentiments)print ('"打卡武汉地标!确实不错!"的情感得分是:' ,SnowNLP("打卡武汉地标!确实不错!" ).sentiments)
看得出来准确率还是有提升了的,原本明明好评只有0.37和差评高达0.45,训练以后直接明显提升。
BUG
这个刚开始是说没有dm配置,折腾了好久,把模块卸载安装来回很多遍,后面使用dm提供的,然后调用才解决了。但是之前自己也使用过dm提供的,是报错的,然后重装了一下,代码删除又复制就好了,挺奇怪的。
日总结
今天实现的任务还是很多的,好多问题都引刃而解了,上午将之前nlp处理不准确的问题大大降低了,解决的话,完全解决的话,单单的模型训练是不够的,自然语言这块本来就算是一个难点,深入的话就需要自己写了,现在自然语言市场也是没法做到100预测的,大概看了下我训练好的,准确度差不多在75。下午解决了将df导入dm8的问题,发现是和之前的spark中join用法一样,而且很方便,会自动创建表以及字段名是按照dataframe中的列名,还是很舒服的。
flask本地运行接口
学习日期: 5.26
所学内容概述
服务器运行flask
如图,我的server.py中只返回了一个hello,但是我api访问下面那个ip地址没有任何反应,服务器安全组的5000端口也打开了,就是没反应不知道为什么。而且为什么会有0结尾的ip的网络地址为访问呢?好像只能内网访问…目前不知道如何解决。
遇到的问题
flask代码不知道怎么在服务器上面运行,试过直接跑,但是访问服务器IP,无法访问,好像是要nginx的反向代理,但是因为有部分推荐还没有实现,所以先在本地能正常使用,再把接口挂在服务器中。
代码如下
解决了浏览器访问中文乱码的问题,将jsonify改成jso.dumps
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from flask import Flask, jsonfrom flask import jsonifyfrom commentDmTest import dmToDfpkusegapp = Flask(__name__) app.config['JSON_AS_ASCII' ] = False @app.route('/' ) def hello_world (): return json.dumps({'data' :"哈哈哈" },ensure_ascii=False ) @app.route('/spot/comment/all' ) def spot_comments_high_word (): data = dmToDfpkuseg.final_comment_list() return json.dumps({'msg' :'查询成功' ,'code' :200 ,'data' :data},ensure_ascii=False ) @app.route('/spot/comment/<spot_id>/' ,methods=['GET' ,'POST' ] ) def spot_comment_high_word (spot_id ): data = dmToDfpkuseg.final_comment_list() print (spot_id) for i in data: if i['spot_id' ] == eval (spot_id): print (i) return json.dumps({'msg' :'查询成功' ,'code' :200 ,'data' :i},ensure_ascii=False ) return json.dumps({'msg' :'查询失败,没有该景区id' ,'code' :500 ,'data' :'null' },ensure_ascii=False ) if __name__ == '__main__' : app.run(host='0.0.0.0' ,port=80 )
BUG
在把代码都集成到一个server.py中以后,发现出现了很多跟导入模块以及路径问题,很奇怪,虽然解决了,但是没搞懂,我server.py会调用另外一个目录下的py文件的方法,假如那个文件是a文件,我a文件中又使用了b文件,b文件和a文件是同一级目录,我自然直接import b即可,但是当我在server中调用a文件的时候,会说找不到b文件,还有使用txt文件的时候,相对路径也会说找不到,只有在最上层的相对路径才能运行,否则就会报错。或者写绝对路径了。要么就将server的flask接口文件和对应的方法都放一个目录中去。
日总结
今日任务不是很重,主要打算测试flask挂载服务器上能否访问,结果搞了一上午都不行,因为自己服务器的nginx是有点问题的,博客在上面也不敢乱配置,所以就没尝试了。下午将所有的接口都写在一个py文件中了,目前就先本地使用吧,连接校园网就能访问。
pySpark的使用
学习日期: 5.29
所学内容概述
学习原因
发现机器学习的推荐还是需要使用sparkMl,但是因为scala的spark已经没那么好使了,底层封装的算法不如pyspark,而且后面比赛放会提供集群给我们,如果我只是使用py的pandas估计会有点吃力。而且如果要智能推荐的话还是要使用rdd结合sparkml使用。
测试
失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import findsparkfindspark.init() SPARK_APP_NAME = "preprocessingBehaviorLog" conf = SparkConf() config = [ ("spark.app.name" , SPARK_APP_NAME), ("spark.executor.memory" , "6g" ), ("spark.master" , "local[*]" ), ("spark.executor.cores" , "4" )] conf.setAll(config) spark = SparkSession.builder.config(conf=conf).getOrCreate()
推荐景区
思路
先使用sql查找出所有景区相关的指标,进行加权字段占比,先进行测试,再使用als模型推荐。结果卡在了如果要对各行数据进行加权处理的话,需要rdd算子,所以就要去学pyspark了,卡在process_row这个方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import pandas as pdfrom pyspark import SparkConffrom commentDmTest import DmConnectfrom pyspark.sql import SparkSessionconn = DmConnect.connDm() cur = conn.cursor() cur.execute(""" with t1 as (select SPOT_ID, round(avg(FEEL_SCORE), 3) as avgScore from COMMENT_INFO_NLP group by SPOT_ID) select SPOT_NAME, GRADE, HOT, t2.SPOT_ID, replace(SUM, substring(SUM, locate('条', SUM)), '') as sum, avgScore from SPOT_INFO t2 join t1 on t1.SPOT_ID=t2.SPOT_ID """ )spot_info_df = pd.DataFrame(cur.fetchall(), columns=['SPOT_NAME' , 'GRADE' , 'HOT' , 'SPOT_ID' , 'SUM' , 'AVG' ]) cur.execute(""" select avg(t1.sum) avgCommentSum from (select replace(SUM, substring(SUM, locate('条', SUM)), '') as sum from SPOT_INFO) t1 """ )avg_comment_sum = cur.fetchall()[0 ][0 ] spot_info_df['SUM' ] = spot_info_df['SUM' ].apply(pd.to_numeric) max_comment_sum = spot_info_df['SUM' ].max () def process_row (r ): grade_count = r.GRADE if r.GRADE else 0.0 hot_count = r.HOT if r.HOT else 0.0 sum_count = r.SUM if r.SUM else avg_comment_sum avg_count = r.AVGSCORE if r.AVGSCORE else 0.0 grade_score = 0.2 * grade_count / 5 * 100 if grade_count <= 5 else 20.0 hit_score = 0.4 * hot_count / 10 * 100 if hot_count <= 10 else 40.0 sum_score = 0.2 * sum_count / max_comment_sum * 100 if sum_count <= max_comment_sum else 20 avg_score = 0.2 * avg_count / 5 * 100 if avg_count <= 5 else 20.0 rating = grade_score + hit_score + sum_score + avg_score return r.SPOT_ID, r.SPOT_NAME, rating
BUG
未找到解决办法,scala中spark使用是可以好直接本地使用的python似乎是需要修改一些配置,等明天看黑马的教程学到了应该就能解决了。推测是电脑里面的spark的基于scala的,所以python的应该需要修改spark的使用语言。
日总结
问题主要集中在pyspark,起初以为跟scala使用spark差不多的,因为发现很多底层封装的源码都是差不多的,结果卡在了启动配置,还得花点时间去搞清楚pyspark才行了,后面启动以后那些rdd算子之类的,就不需要学了,然后将py能在服务器上像java打jar一样运行,任务就完成了。先把本地需要的接口都写完,然后去研究一下nginx和docker,将python部署上去。
虚拟机pyspark环境配置
学习日期: 5.30
所学内容概述
简单概述
首先要先安装好anaconda,里面是有pyspark的,然后配置服务器中的python和pyspark 的环境,并在spark的sbin目录下,输入./pyspark,就是以local模式使用python语言启动了spark。
测试
bin/pyspark 程序, 可以提供一个 交互式的 Python解释器环境, 在这里面可以用Python语言调用
Spark API 进行计算
sc.parallelize([1,2,3,4,5]).map(lambda x: x + 1).collect()
BUG
刚开始是使用小白的虚拟机,一直无法连接ssh,搜索以后发现是映射问题,修改网络配置,把里面的ip改成,使用 vi /etc/hosts的VM桥接模式分配的ip
日总结
今天把整个hadoop集群先配置了,然后需要把spark内自带的scala语言切换成python,因为教程是使用anaconda的虚拟环境,所以防止报错就先跟着使用虚拟环境了,后面如果比赛平台没有anaconda,需要使用本地的python了,目前是可以正常使用py运行spark了。
pycharm集成pyspark
学习日期: 5.31
所学内容概述
pycharm集成
先安装Anaconda,然后配置创建pyspark的虚拟环境
创建虚拟环境
1 2 3 4 5 6 7 8 # 创建虚拟环境 pyspark, 基于Python 3.8 conda create -n pyspark python=3.8 # 切换到虚拟环境内 conda activate pyspark # 在虚拟环境内安装包 pip install pyhive pyspark jieba -i https://pypi.tuna.tsinghua.edu.cn/simple
添加编译器
在添加编译器中选择如下图即可
测试
df互相转换也没问题,成功集成,如果要使用hdfs的文件的话只要textfile指定hdfs的路径就行了
BUG
先看报错是JVM的,推测是不是java哪里的问题,先是CSDN搜索,提出的解决方案是加入findspark模块 使用findspark.init(),意思应该是初始化spark,但是我本地都没装spark初始有什么用,不出意料,没有任何用,然后发现自己的hadoop环境配置错版本了,但是我window本地跑,又没有使用hadoop,不出意料,换完以后还是报错,最后看视频后面有个弹幕说,要os加入JAVA_HOME的配置,但是还是没用。
然后在编译器那边的时候看到了一个关联虚拟环境,打上勾以后重启了pycharm就好了。后面我又把勾和os去掉,发现还是正常运行,也不知道问题在哪里,相当于折腾了半天又还原就好了。。。
日总结
今天的任务还是挺重的,基本一天把pyspark看完了,用法其实和scala中的spark是差不多的,而且好处在于可以使用pandas’和pyspark之间的dataframe类型互相转换,而且python底层封装的推荐算法其实比scala中的优化很多,就是使用安装以及配置环境的问题会比较多,在python中小数据使用pandas,大数据使用pyspark。所以还是要看选择。