职能企业保护职业创业
投稿投诉
创业安检
掌握呵护
案例安身
升初专业
职业小学
形象知识
工作提高
科技职级
保护职务
法务经验
安保方法
中考作文
技巧安全
初中魅力
常识安心
保全安护
企业技能
生活安务
法律纠纷
技艺百科
职能能力
技术安监
培训提升
辩护水平

Spark(十六)SparkStreaming需求练习

11月11日 失了心投稿
  一。环境准备1。pom文件dependenciesdependencygroupIdorg。apache。sparkgroupIdsparkcore2。12artifactIdversion3。0。0versiondependencydependencygroupIdorg。apache。sparkgroupIdsparkstreaming2。12artifactIdversion3。0。0versiondependencydependencygroupIdorg。apache。sparkgroupIdsparkstreamingkafka0102。12artifactIdversion3。0。0versiondependency!https:mvnrepository。comartifactcom。alibabadruiddependencygroupIdcom。alibabagroupIddruidartifactIdversion1。1。10versiondependencydependencygroupIdmysqlgroupIdmysqlconnectorjavaartifactIdversion5。1。27versiondependencydependencygroupIdcom。fasterxml。jackson。coregroupIdjacksoncoreartifactIdversion2。10。1versiondependencydependenciesbuildplugins!该插件用于将Scala代码编译成class文件plugingroupIdnet。alchim31。mavengroupIdscalamavenpluginartifactIdversion3。2。2versionexecutionsexecution!声明绑定到maven的compile阶段goalsgoalcompilegoalgoalsexecutionexecutionspluginplugingroupIdorg。apache。maven。pluginsgroupIdmavenassemblypluginartifactIdversion3。0。0versionconfigurationdescriptorRefsdescriptorRefjarwithdependenciesdescriptorRefdescriptorRefsconfigurationexecutionsexecutionidmakeassemblyidphasepackagephasegoalsgoalsinglegoalgoalsexecutionexecutionspluginpluginsbuild2。beanimportjava。text。SimpleDateFormatimportjava。util。Date数据格式:1597148289569,华北,北京,102,4,20200811,11:12caseclassAdsInfo(ts:Long,area:String,city:String,userId:String,adsId:String,vardayString:Stringnull,yyyyMMddvarhmString:Stringnull){hh:mmvaldatenewDate(ts)dayStringnewSimpleDateFormat(yyyyMMdd)。format(date)hmStringnewSimpleDateFormat(HH:mm)。format(date)}3。工具类JDBCUtilsobjectJDBCUtil{创建连接池对象vardataSource:DataSourceinit()连接池的初始化definit():DataSource{valparamMapnewjava。util。HashMap〔String,String〕()paramMap。put(driverClassName,PropertiesUtil。getValue(jdbc。driver。name))paramMap。put(url,PropertiesUtil。getValue(jdbc。url))paramMap。put(username,PropertiesUtil。getValue(jdbc。user))paramMap。put(password,PropertiesUtil。getValue(jdbc。password))paramMap。put(maxActive,PropertiesUtil。getValue(jdbc。datasource。size))使用Druid连接池对象DruidDataSourceFactory。createDataSource(paramMap)}从连接池中获取连接对象defgetConnection():Connection{dataSource。getConnection}defmain(args:Array〔String〕):Unit{println(getConnection())}}Properties工具类project。properties文件jdbc配置jdbc。datasource。size10jdbc。urljdbc:mysql:hadoop102:3306steamingproject?useUnicodetruecharacterEncodingutf8rewriteBatchedStatementstruejdbc。userrootjdbc。passwordrootjdbc。driver。namecom。mysql。jdbc。DriverKafka配置kafka。broker。listhadoop102:9092,hadoop103:9092,hadoop104:9092kafka。topicmytestkafka。group。idcg1importjava。util。ResourceBundleProperties文件工具类objectPropertiesUtil{绑定配置文件ResourceBundle专门用于读取配置文件,所以读取时,不需要增加扩展名国际化I18NPropertiesvalsummer:ResourceBundleResourceBundle。getBundle(project)defgetValue(key:String):String{summer。getString(key)}defmain(args:Array〔String〕):Unit{println(getValue(jdbc。user))}}3。创建BaseAppdescription:基础类author:HaoWucreate:2020年08月11日abstractclassBaseApp{valconf:SparkConfnewSparkConf()。setMaster(local〔〕)。setAppName(myAPP)valssc:StreamingContextnewStreamingContext(conf,Seconds(3))设置消费kafka的参数,可以参考kafka。consumer。ConsumerConfig类中配置说明valkafkaParams:Map〔String,Object〕Map〔String,Object〕(bootstrap。servershadoop102:9092,hadoop103:9092,hadoop104:9092,zookeeper的host,portgroup。idg3,消费者组enable。auto。committrue,是否自动提交auto。commit。interval。ms500,500ms自动提交offsetkey。deserializerorg。apache。kafka。common。serialization。StringDeserializer,value。deserializerorg。apache。kafka。common。serialization。StringDeserializer,auto。offset。resetearliest第一次运行,从最初始偏移量开始消费数据)消费kafka的mytest主题生成DStreamvalds:InputDStream〔ConsumerRecord〔String,String〕〕KafkaUtils。createDirectStream〔String,String〕(ssc,LocationStrategies。PreferConsistent,订阅主题ConsumerStrategies。Subscribe〔String,String〕(List(mytest),kafkaParams))将输入流InputDStream〔ConsumerRecord〔String,String〕〕stream〔对象〕paramdsreturndefgetAllBeans(ds:InputDStream〔ConsumerRecord〔String,String〕〕):DStream〔AdsInfo〕{valresult:DStream〔AdsInfo〕ds。map(record{valarr:Array〔String〕record。value()。split(,)AdsInfo(arr(0)。toLong,arr(1),arr(2),arr(3),arr(4))})result}处理逻辑paramoptdefrunApp(opt:Unit):Unit{try{处理逻辑opt执行程序ssc。start()ssc。awaitTermination()}catch{casee:Exceptione。getMessage}}}需求一:动态添加黑名单
  实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑。
  注:黑名单保存到MySQL中。
  思路分析
  1)读取Kafka数据之后,并对MySQL中存储的黑名单数据做校验;
  2)校验通过则对给用户点击广告次数累加一并存入MySQL;
  3)在存入MySQL之后对数据做校验,如果单日超过100次则将该用户加入黑名单。
  准备工作1)存放黑名单用户的表CREATETABLEblacklist(useridCHAR(2)PRIMARYKEY);2)存放单日各用户点击每个广告的次数CREATETABLEuseradcount(dtdate,useridCHAR(2),adidCHAR(2),countBIGINT,PRIMARYKEY(dt,userid,adid));description:需求一:动态添加黑名单说明:实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑(用户,广告id,时间,次数)注:黑名单保存到MySQL中author:HaoWucreate:2020年08月12日objectProjectDemo1extendsBaseApp{defmain(args:Array〔String〕):Unit{runApp{valasdInfo:DStream〔AdsInfo〕getAllBeans(ds)校验数据是否在黑名单中defisBlackList(userid:String,connection:Connection):Boolean{varflag:Booleantruevalsqlselectfromblacklistwhereuserid?。stripMarginvalps:PreparedStatementconnection。prepareStatement(sql)ps。setString(1,userid)valresult:ResultSetps。executeQuery()if(result!null){flagfalse}flag}1。聚合当前批次数据((timestamp,userid,adsid),count)valcountDS:DStream〔((String,String,String),Long)〕asdInfo。map{((20200811,102,1),1)caseadsInfo:AdsInfo((adsInfo。dayString,adsInfo。userId,adsInfo。adsId),1L)}。reduceByKey()countDS。foreachRDD(rddrdd。foreachPartition{iter{2。向mysql插入数据,准备插入sql和连接valconnection:ConnectionJDBCUtil。getConnection()valsqlinsertintouseradcountvalues(?,?,?,?)ONDUPLICATEKEYUPDATECOUNTcount?。stripMarginvalps:PreparedStatementconnection。prepareStatement(sql)2。过滤出在名单中的数据iter。filter{case((,userid,),)valfalgisBlackList(userid,connection);falg}往mysql重插入更新数据。foreach{case((date,userid,adsid),count){ps。setString(1,date)ps。setString(2,userid)ps。setString(3,adsid)ps。setLong(4,count)ps。setLong(5,count)ps。executeUpdate()}}关闭ps。close()3。插入成功之后,查询对应得userid点击广告此时是否100?valsql2selectuseridfromuseradcountwherecount20。stripMarginvalps2:PreparedStatementconnection。prepareStatement(sql2)valresultSet:ResultSetps2。executeQuery()封装查询出的黑名单列表valblocklistnewmutable。HashSet〔String〕()while(resultSet。next()){valuserid:StringresultSet。getString(userid)blocklistuserid}关闭resulteSet,PreparedStatementresultSet。close()ps2。close()4。将blocklist数据依次插入黑名单表,没有就插入,有就更新valsql3:StringINSERTINTOblacklistVALUES(?)ONDUPLICATEKEYUPDATEuserid?。stripMarginvalps3:PreparedStatementconnection。prepareStatement(sql3)for(useridblocklist){ps3。setString(1,userid)ps3。setString(2,userid)ps3。executeUpdate()}ps3。close()connection。close()}})}}}需求二:广告点击量实时统计
  描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQL
  步骤:updateStateByKey有状态累加计算向mysql执行插入更新操作
  Mysql表CREATETABLEareacityadcount(dtdate,areaCHAR(4),cityCHAR(4),adidCHAR(2),countBIGINT,PRIMARYKEY(dt,area,city,adid)联合主键);
  代码实现importjava。sql。{Connection,PreparedStatement}importcom。spark。streamingneed。bean。AdsInfoimportcom。spark。streamingneed。utils。JDBCUtilimportorg。apache。spark。streaming。dstream。DStreamdescription:需求二:广告点击量实时统计描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQLauthor:HaoWucreate:2020年08月11日objectProjectDemo2extendsBaseApp{defmain(args:Array〔String〕):Unit{runApp{updateStateByKey算子有状态,需要checkpointssc。checkpoint(function2)1。单个批次内对数据进行按照天维度的聚合统计数据格式:1597148289569,华北,北京,102,4valDsAds:DStream〔AdsInfo〕getAllBeans(ds)valkvDS:DStream〔((String,String,String,String),Int)〕DsAds。map{case(adsInfo){((adsInfo。dayString,adsInfo。area,adsInfo。city,adsInfo。adsId),1)}}2。结合MySQL数据跟当前批次数据更新原有的数据计算当前批次和之前的数据累加结果valresult:DStream〔((String,String,String,String),Int)〕kvDS。updateStateByKey{case(seq,opt){varsum:Intseq。sumvalvalueopt。getOrElse(0)sumvalueSome(sum)}}3。将结果写入Mysqlresult。foreachRDD(rdd{rdd。foreachPartition{iter{每个分区创建一个Connection连接valconnection:ConnectionJDBCUtil。getConnection()准备sql,实现mysql的upsert操作valsqlinsertintoareacityadcountvalues(?,?,?,?,?)onduplicatekeyupdatecount?。stripMarginPreparedStatementvalps:PreparedStatementconnection。prepareStatement(sql)RDD分区中的每个数据都执行写出iter。foreach{case((dayString,area,city,adsId),count){填充占位符ps。setString(1,dayString)ps。setString(2,area)ps。setString(3,city)ps。setString(4,adsId)ps。setInt(5,count)ps。setInt(6,count)执行写入ps。executeUpdate()}}关闭资源ps。close()connection。close()}}})}}}需求三:最近一小时广告点击量需求说明
  求最近1h的广告点击量,要求按照以下结果显示结果展示:1:List〔15:5010,15:5125,15:5230〕2:List〔15:5010,15:5125,15:5230〕3:List〔15:5010,15:5125,15:5230〕
  思路分析
  1)开窗确定时间范围;
  2)在窗口内将数据转换数据结构为((adid,hm),count);
  3)按照广告id进行分组处理,组内按照时分排序。
  代码实现importorg。apache。spark。streaming。{Minutes,Seconds}importorg。apache。spark。streaming。dstream。DStreamdescription:需求三:最近一小时广告点击量,3秒更新一次author:结果展示:1:List〔15:5010,15:5125,15:5230〕2:List〔15:5010,15:5125,15:5230〕3:List〔15:5010,15:5125,15:5230〕create:2020年08月12日objectProjectDemo3extendsBaseApp{defmain(args:Array〔String〕):Unit{运行apprunApp{valAdsDStream:DStream〔((String,String),Int)〕getAllBeans(ds)。map{caseadsInfo((adsInfo。adsId,adsInfo。hmString),1)}valresult:DStream〔(String,List〔(String,Int)〕)〕AdsDStream窗口内聚合。reduceByKeyAndWindow((a:Int,b:Int){ab},Minutes(60),Seconds(3))。map{case((adsId,ahmString),count)(adsId,(ahmString,count))}按照广告id分组。groupByKey()组内按时间升序。mapValues{caseiteriter。toList。sortBy(。1)}result。print(10)}}}
  结果Time:1597234032000ms(1,List((20:01,12),(20:02,112),(20:03,98),(20:04,95),(20:05,104),(20:06,96),(20:07,13)))(2,List((20:01,24),(20:02,97),(20:03,99),(20:04,103),(20:05,95),(20:06,105),(20:07,6)))(3,List((20:01,30),(20:02,87),(20:03,92),(20:04,108),(20:05,117),(20:06,88),(20:07,22)))(4,List((20:01,15),(20:02,101),(20:03,100),(20:04,99),(20:05,84),(20:06,112),(20:07,22)))(5,List((20:01,19),(20:02,103),(20:03,111),(20:04,95),(20:05,100),(20:06,99),(20:07,10)))Time:1597234035000ms(1,List((20:01,12),(20:02,112),(20:03,98),(20:04,95),(20:05,104),(20:06,96),(20:07,20)))(2,List((20:01,24),(20:02,97),(20:03,99),(20:04,103),(20:05,95),(20:06,105),(20:07,13)))(3,List((20:01,30),(20:02,87),(20:03,92),(20:04,108),(20:05,117),(20:06,88),(20:07,26)))(4,List((20:01,15),(20:02,101),(20:03,100),(20:04,99),(20:05,84),(20:06,112),(20:07,26)))(5,List((20:01,19),(20:02,103),(20:03,111),(20:04,95),(20:05,100),(20:06,99),(20:07,15)))Time:1597234038000ms(1,List((20:01,12),(20:02,112),(20:03,98),(20:04,95),(20:05,104),(20:06,96),(20:07,23)))(2,List((20:01,24),(20:02,97),(20:03,99),(20:04,103),(20:05,95),(20:06,105),(20:07,16)))(3,List((20:01,30),(20:02,87),(20:03,92),(20:04,108),(20:05,117),(20:06,88),(20:07,34)))(4,List((20:01,15),(20:02,101),(20:03,100),(20:04,99),(20:05,84),(20:06,112),(20:07,30)))(5,List((20:01,19),(20:02,103),(20:03,111),(20:04,95),(20:05,100),(20:06,99),(20:07,20)))
投诉 评论 转载

临近50岁或60岁,这几个征兆的出现,往往暗示了余生的大苦图源自网络侵权请联系删除中年时期,是人生的一道分水岭,也是晚年是否过得幸福的关键。这时候的人生选择极为重要,一步走错了,就会导致步步错,且再也没有回头的机会和挽回的可能。……杜海涛沈梦辰又出事了?网友这回真的很难结婚了1最近,我被沈梦辰的vlog视频圈粉了。从《年夜饭》到《芙蓉镇》再到昨天的《湘西铁板烧》,不知不觉,她的vlog《今天做沈么》已经出到了第三集。简单的日常vl……武磊终于有伴了!中国王牌新星被五大联赛球队签走,已正式官宣日前,德甲豪门拜仁官宣中国18岁门将刘绍子洋加盟球队。拜仁为刘绍子洋提供的合同到2025年到期,并且为他的老东家武汉三镇支付了将近百万欧元的转会费。从上述细节来看,拜仁引进刘绍……38年前,没人敢拍的题材,却被徐克拍了出来,至今仍在被模仿今年反复的疫情,使得我们走进电影院成为一件极其奢侈的事情。也间接催生网大,成为资本角力的主战场。可惜,即便凝聚了天时、地利、人和,网大宛如地主家的傻儿子一样,烂泥扶……应届生年薪近30万,光伏行业存在大量人才缺口记者武冰聪编辑随着新能源发展,光伏企业成为吸纳人才的福地。光伏行业因为薪资待遇高、发展前景好、人才缺口大,跃升秋招热门行列。入职光伏公司,朝阳产业前景好,应届生年薪……iQOONeo7,比降价后的小米还香?不得不说,蓝厂对联发科确实是真爱。其他家用天玑旗舰,大多是玩精准刀法战略。甚至还有像小米12Pro天玑版这样,发售3个月,就降1000的。感觉就不够上心好伐。……Spark(十六)SparkStreaming需求练习一。环境准备1。pom文件dependenciesdependencygroupIdorg。apache。sparkgroupIdsparkcore2。12artifactId……中国斯诺克两人晋级16强!丁俊晖赵心童爆冷出局,颜丙涛34惜中国斯诺克两人晋级16强!丁俊晖赵心童爆冷出局,颜丙涛34惜败。2022年斯诺克公开赛第三个比赛日,116决赛,中国军团遭遇滑铁卢,丁俊晖,赵心童,徐思,颜丙涛相继出局,其中丁……旅行的意义2022年9月30日星期五晴马上就是万众瞩目的十月一日了,是全球华人期待的节日。九月最后一天的结束,将开启金秋十月的美好,愿我们的祖国越来越繁荣昌盛,愿生我养我的这座美丽……斑秃小知识前天在路上偶遇一枕部小片状斑秃的年轻男性,笔者不禁回忆起在皮肤科学习时见过的多例斑秃患者。他们大部分是年轻人,男女都有,最严重的莫过于一位二十七八岁的女性,从颞部到枕部多……金钟奖红毯小S礼服夸张,曾宝仪腰身粗壮,林志玲老公好油腻台湾电视金钟奖是台湾的三大年度奖项之一,始于1965年,迄今已经举办了57年,颇受台湾电视人的重视。在前几年的金钟奖上,还会出现舒淇、张震、林依晨、林心如、贾静雯、赵又廷……游上海南京路和外滩今天,上海的天气很好,阳光明媚,秋高气爽。下午,去上海南京路和外滩游玩。实话实说,后疫情时代的上海,已经今非昔比了,完全没有了过去的辉煌与繁荣景象。到现在,上海解封已经三个多月……
养小龙虾怎么养殖油菜直播拌种技术要点严歌苓陆犯焉识读后感二篇论互联网时代的企业招聘策略来点儿新闻02。06范斯猫成为切尔西大中华区官方合作伙伴适合岁男生的发型清爽帅气展现青春活力甘园妹,全国冠军!咱贺州的骄傲互联网产品需求管理思考洞察市场今日头条月入赚钱策略送你一套操作指南反社会人格幽灵亟待社会心理干预机制菠萝种苗选择及采收闵行滨江公园建成了,无围墙,全天候开放

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找常识日常社交礼仪安全防范适应宝库新闻军事国内国际财经股票基金外汇科技手机众测体育娱乐时尚女性育儿