娱乐科技财经新闻安全常识
投稿投诉
常识日常
社交礼仪
安全防范
适应宝库
新闻军事
国内国际
财经股票
基金外汇
科技手机
众测体育
娱乐时尚
女性育儿

Spark(十六)SparkStreaming需求练习 - 文好找(wenhz.com)

  一。环境准备1。pom文件dependenciesdependencygroupIdorg。apache。sparkgroupIdsparkcore2。12artifactIdversion3。0。0versiondependencydependencygroupIdorg。apache。sparkgroupIdsparkstreaming2。12artifactIdversion3。0。0versiondependencydependencygroupIdorg。apache。sparkgroupIdsparkstreamingkafka0102。12artifactIdversion3。0。0versiondependency!https:mvnrepository。comartifactcom。alibabadruiddependencygroupIdcom。alibabagroupIddruidartifactIdversion1。1。10versiondependencydependencygroupIdmysqlgroupIdmysqlconnectorjavaartifactIdversion5。1。27versiondependencydependencygroupIdcom。fasterxml。jackson。coregroupIdjacksoncoreartifactIdversion2。10。1versiondependencydependenciesbuildplugins!该插件用于将Scala代码编译成class文件plugingroupIdnet。alchim31。mavengroupIdscalamavenpluginartifactIdversion3。2。2versionexecutionsexecution!声明绑定到maven的compile阶段goalsgoalcompilegoalgoalsexecutionexecutionspluginplugingroupIdorg。apache。maven。pluginsgroupIdmavenassemblypluginartifactIdversion3。0。0versionconfigurationdescriptorRefsdescriptorRefjarwithdependenciesdescriptorRefdescriptorRefsconfigurationexecutionsexecutionidmakeassemblyidphasepackagephasegoalsgoalsinglegoalgoalsexecutionexecutionspluginpluginsbuild2。beanimportjava。text。SimpleDateFormatimportjava。util。Date数据格式:1597148289569,华北,北京,102,4,20200811,11:12caseclassAdsInfo(ts:Long,area:String,city:String,userId:String,adsId:String,vardayString:Stringnull,yyyyMMddvarhmString:Stringnull){hh:mmvaldatenewDate(ts)dayStringnewSimpleDateFormat(yyyyMMdd)。format(date)hmStringnewSimpleDateFormat(HH:mm)。format(date)}3。工具类JDBCUtilsobjectJDBCUtil{创建连接池对象vardataSource:DataSourceinit()连接池的初始化definit():DataSource{valparamMapnewjava。util。HashMap〔String,String〕()paramMap。put(driverClassName,PropertiesUtil。getValue(jdbc。driver。name))paramMap。put(url,PropertiesUtil。getValue(jdbc。url))paramMap。put(username,PropertiesUtil。getValue(jdbc。user))paramMap。put(password,PropertiesUtil。getValue(jdbc。password))paramMap。put(maxActive,PropertiesUtil。getValue(jdbc。datasource。size))使用Druid连接池对象DruidDataSourceFactory。createDataSource(paramMap)}从连接池中获取连接对象defgetConnection():Connection{dataSource。getConnection}defmain(args:Array〔String〕):Unit{println(getConnection())}}Properties工具类project。properties文件jdbc配置jdbc。datasource。size10jdbc。urljdbc:mysql:hadoop102:3306steamingproject?useUnicodetruecharacterEncodingutf8rewriteBatchedStatementstruejdbc。userrootjdbc。passwordrootjdbc。driver。namecom。mysql。jdbc。DriverKafka配置kafka。broker。listhadoop102:9092,hadoop103:9092,hadoop104:9092kafka。topicmytestkafka。group。idcg1importjava。util。ResourceBundleProperties文件工具类objectPropertiesUtil{绑定配置文件ResourceBundle专门用于读取配置文件,所以读取时,不需要增加扩展名国际化I18NPropertiesvalsummer:ResourceBundleResourceBundle。getBundle(project)defgetValue(key:String):String{summer。getString(key)}defmain(args:Array〔String〕):Unit{println(getValue(jdbc。user))}}3。创建BaseAppdescription:基础类author:HaoWucreate:2020年08月11日abstractclassBaseApp{valconf:SparkConfnewSparkConf()。setMaster(local〔〕)。setAppName(myAPP)valssc:StreamingContextnewStreamingContext(conf,Seconds(3))设置消费kafka的参数,可以参考kafka。consumer。ConsumerConfig类中配置说明valkafkaParams:Map〔String,Object〕Map〔String,Object〕(bootstrap。servershadoop102:9092,hadoop103:9092,hadoop104:9092,zookeeper的host,portgroup。idg3,消费者组enable。auto。committrue,是否自动提交auto。commit。interval。ms500,500ms自动提交offsetkey。deserializerorg。apache。kafka。common。serialization。StringDeserializer,value。deserializerorg。apache。kafka。common。serialization。StringDeserializer,auto。offset。resetearliest第一次运行,从最初始偏移量开始消费数据)消费kafka的mytest主题生成DStreamvalds:InputDStream〔ConsumerRecord〔String,String〕〕KafkaUtils。createDirectStream〔String,String〕(ssc,LocationStrategies。PreferConsistent,订阅主题ConsumerStrategies。Subscribe〔String,String〕(List(mytest),kafkaParams))将输入流InputDStream〔ConsumerRecord〔String,String〕〕stream〔对象〕paramdsreturndefgetAllBeans(ds:InputDStream〔ConsumerRecord〔String,String〕〕):DStream〔AdsInfo〕{valresult:DStream〔AdsInfo〕ds。map(record{valarr:Array〔String〕record。value()。split(,)AdsInfo(arr(0)。toLong,arr(1),arr(2),arr(3),arr(4))})result}处理逻辑paramoptdefrunApp(opt:Unit):Unit{try{处理逻辑opt执行程序ssc。start()ssc。awaitTermination()}catch{casee:Exceptione。getMessage}}}需求一:动态添加黑名单
  实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑。
  注:黑名单保存到MySQL中。
  思路分析
  1)读取Kafka数据之后,并对MySQL中存储的黑名单数据做校验;
  2)校验通过则对给用户点击广告次数累加一并存入MySQL;
  3)在存入MySQL之后对数据做校验,如果单日超过100次则将该用户加入黑名单。
  准备工作1)存放黑名单用户的表CREATETABLEblacklist(useridCHAR(2)PRIMARYKEY);2)存放单日各用户点击每个广告的次数CREATETABLEuseradcount(dtdate,useridCHAR(2),adidCHAR(2),countBIGINT,PRIMARYKEY(dt,userid,adid));description:需求一:动态添加黑名单说明:实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑(用户,广告id,时间,次数)注:黑名单保存到MySQL中author:HaoWucreate:2020年08月12日objectProjectDemo1extendsBaseApp{defmain(args:Array〔String〕):Unit{runApp{valasdInfo:DStream〔AdsInfo〕getAllBeans(ds)校验数据是否在黑名单中defisBlackList(userid:String,connection:Connection):Boolean{varflag:Booleantruevalsqlselectfromblacklistwhereuserid?。stripMarginvalps:PreparedStatementconnection。prepareStatement(sql)ps。setString(1,userid)valresult:ResultSetps。executeQuery()if(result!null){flagfalse}flag}1。聚合当前批次数据((timestamp,userid,adsid),count)valcountDS:DStream〔((String,String,String),Long)〕asdInfo。map{((20200811,102,1),1)caseadsInfo:AdsInfo((adsInfo。dayString,adsInfo。userId,adsInfo。adsId),1L)}。reduceByKey()countDS。foreachRDD(rddrdd。foreachPartition{iter{2。向mysql插入数据,准备插入sql和连接valconnection:ConnectionJDBCUtil。getConnection()valsqlinsertintouseradcountvalues(?,?,?,?)ONDUPLICATEKEYUPDATECOUNTcount?。stripMarginvalps:PreparedStatementconnection。prepareStatement(sql)2。过滤出在名单中的数据iter。filter{case((,userid,),)valfalgisBlackList(userid,connection);falg}往mysql重插入更新数据。foreach{case((date,userid,adsid),count){ps。setString(1,date)ps。setString(2,userid)ps。setString(3,adsid)ps。setLong(4,count)ps。setLong(5,count)ps。executeUpdate()}}关闭ps。close()3。插入成功之后,查询对应得userid点击广告此时是否100?valsql2selectuseridfromuseradcountwherecount20。stripMarginvalps2:PreparedStatementconnection。prepareStatement(sql2)valresultSet:ResultSetps2。executeQuery()封装查询出的黑名单列表valblocklistnewmutable。HashSet〔String〕()while(resultSet。next()){valuserid:StringresultSet。getString(userid)blocklistuserid}关闭resulteSet,PreparedStatementresultSet。close()ps2。close()4。将blocklist数据依次插入黑名单表,没有就插入,有就更新valsql3:StringINSERTINTOblacklistVALUES(?)ONDUPLICATEKEYUPDATEuserid?。stripMarginvalps3:PreparedStatementconnection。prepareStatement(sql3)for(useridblocklist){ps3。setString(1,userid)ps3。setString(2,userid)ps3。executeUpdate()}ps3。close()connection。close()}})}}}需求二:广告点击量实时统计
  描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQL
  步骤:updateStateByKey有状态累加计算向mysql执行插入更新操作
  Mysql表CREATETABLEareacityadcount(dtdate,areaCHAR(4),cityCHAR(4),adidCHAR(2),countBIGINT,PRIMARYKEY(dt,area,city,adid)联合主键);
  代码实现importjava。sql。{Connection,PreparedStatement}importcom。spark。streamingneed。bean。AdsInfoimportcom。spark。streamingneed。utils。JDBCUtilimportorg。apache。spark。streaming。dstream。DStreamdescription:需求二:广告点击量实时统计描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQLauthor:HaoWucreate:2020年08月11日objectProjectDemo2extendsBaseApp{defmain(args:Array〔String〕):Unit{runApp{updateStateByKey算子有状态,需要checkpointssc。checkpoint(function2)1。单个批次内对数据进行按照天维度的聚合统计数据格式:1597148289569,华北,北京,102,4valDsAds:DStream〔AdsInfo〕getAllBeans(ds)valkvDS:DStream〔((String,String,String,String),Int)〕DsAds。map{case(adsInfo){((adsInfo。dayString,adsInfo。area,adsInfo。city,adsInfo。adsId),1)}}2。结合MySQL数据跟当前批次数据更新原有的数据计算当前批次和之前的数据累加结果valresult:DStream〔((String,String,String,String),Int)〕kvDS。updateStateByKey{case(seq,opt){varsum:Intseq。sumvalvalueopt。getOrElse(0)sumvalueSome(sum)}}3。将结果写入Mysqlresult。foreachRDD(rdd{rdd。foreachPartition{iter{每个分区创建一个Connection连接valconnection:ConnectionJDBCUtil。getConnection()准备sql,实现mysql的upsert操作valsqlinsertintoareacityadcountvalues(?,?,?,?,?)onduplicatekeyupdatecount?。stripMarginPreparedStatementvalps:PreparedStatementconnection。prepareStatement(sql)RDD分区中的每个数据都执行写出iter。foreach{case((dayString,area,city,adsId),count){填充占位符ps。setString(1,dayString)ps。setString(2,area)ps。setString(3,city)ps。setString(4,adsId)ps。setInt(5,count)ps。setInt(6,count)执行写入ps。executeUpdate()}}关闭资源ps。close()connection。close()}}})}}}需求三:最近一小时广告点击量需求说明
  求最近1h的广告点击量,要求按照以下结果显示结果展示:1:List〔15:5010,15:5125,15:5230〕2:List〔15:5010,15:5125,15:5230〕3:List〔15:5010,15:5125,15:5230〕
  思路分析
  1)开窗确定时间范围;
  2)在窗口内将数据转换数据结构为((adid,hm),count);
  3)按照广告id进行分组处理,组内按照时分排序。
  代码实现importorg。apache。spark。streaming。{Minutes,Seconds}importorg。apache。spark。streaming。dstream。DStreamdescription:需求三:最近一小时广告点击量,3秒更新一次author:结果展示:1:List〔15:5010,15:5125,15:5230〕2:List〔15:5010,15:5125,15:5230〕3:List〔15:5010,15:5125,15:5230〕create:2020年08月12日objectProjectDemo3extendsBaseApp{defmain(args:Array〔String〕):Unit{运行apprunApp{valAdsDStream:DStream〔((String,String),Int)〕getAllBeans(ds)。map{caseadsInfo((adsInfo。adsId,adsInfo。hmString),1)}valresult:DStream〔(String,List〔(String,Int)〕)〕AdsDStream窗口内聚合。reduceByKeyAndWindow((a:Int,b:Int){ab},Minutes(60),Seconds(3))。map{case((adsId,ahmString),count)(adsId,(ahmString,count))}按照广告id分组。groupByKey()组内按时间升序。mapValues{caseiteriter。toList。sortBy(。1)}result。print(10)}}}
  结果Time:1597234032000ms(1,List((20:01,12),(20:02,112),(20:03,98),(20:04,95),(20:05,104),(20:06,96),(20:07,13)))(2,List((20:01,24),(20:02,97),(20:03,99),(20:04,103),(20:05,95),(20:06,105),(20:07,6)))(3,List((20:01,30),(20:02,87),(20:03,92),(20:04,108),(20:05,117),(20:06,88),(20:07,22)))(4,List((20:01,15),(20:02,101),(20:03,100),(20:04,99),(20:05,84),(20:06,112),(20:07,22)))(5,List((20:01,19),(20:02,103),(20:03,111),(20:04,95),(20:05,100),(20:06,99),(20:07,10)))Time:1597234035000ms(1,List((20:01,12),(20:02,112),(20:03,98),(20:04,95),(20:05,104),(20:06,96),(20:07,20)))(2,List((20:01,24),(20:02,97),(20:03,99),(20:04,103),(20:05,95),(20:06,105),(20:07,13)))(3,List((20:01,30),(20:02,87),(20:03,92),(20:04,108),(20:05,117),(20:06,88),(20:07,26)))(4,List((20:01,15),(20:02,101),(20:03,100),(20:04,99),(20:05,84),(20:06,112),(20:07,26)))(5,List((20:01,19),(20:02,103),(20:03,111),(20:04,95),(20:05,100),(20:06,99),(20:07,15)))Time:1597234038000ms(1,List((20:01,12),(20:02,112),(20:03,98),(20:04,95),(20:05,104),(20:06,96),(20:07,23)))(2,List((20:01,24),(20:02,97),(20:03,99),(20:04,103),(20:05,95),(20:06,105),(20:07,16)))(3,List((20:01,30),(20:02,87),(20:03,92),(20:04,108),(20:05,117),(20:06,88),(20:07,34)))(4,List((20:01,15),(20:02,101),(20:03,100),(20:04,99),(20:05,84),(20:06,112),(20:07,30)))(5,List((20:01,19),(20:02,103),(20:03,111),(20:04,95),(20:05,100),(20:06,99),(20:07,20)))

索尼开始发力旗舰电视X95J都调低4530元了!还要等吗?电视已经是普及率最高的家用电器,虽然我国已经是电视生产大国,也涌现出了很多优秀的电视品牌,如海信、TCL、创维等等。但是在很多人看来,买电视还得看外国品牌,原因是在他们看来,同……谁给他的勇气?杜兰特表白谷爱凌,看她高情商化解如今中美体育界最知名的体育运动员是谁呢?相信杜兰特绝对算得上一个,而谷爱凌绝对是国内目前最火的运动员,两人不论是名气还是收入都是不相上下,本来是两个毫不相干的运动员最近却意外的……迪拜杯北京时间3月29日,迪拜杯邀请赛排位赛继续进行,中国U23再战阿联酋U23!前两场比赛,中国国奥取得1胜1负(0:1阿联酋、4:2泰国),阿联酋国奥1胜1平(1:0中国、0:0……23分大逆转!湖人不敌鹈鹕,英格拉姆26分,詹姆斯里程碑险受北京时间3月28日,NBA常规赛结束了一场争夺附加赛的焦点战,湖人被鹈鹕逆转,以108116输掉比赛。这场比赛打得大开大合,湖人120开局,上半场一度领先23分,第三节鹈鹕单节……紧跟年度流行色!iPhone13Pro紫色版3月11日发布世界权威色彩研究机构潘通正式公布了2022年度流行色VeryPeri长春花蓝。它在色调上既包括了蓝色,同时混合了紫红色的基调,展现出一种活泼、欢快的态度和充满活力的存在感。……苹果iPhone备忘录,你真的会用吗?最全使用手册大公开!赶今天花时间整理出苹果备忘录功能!苹果设备至少可以随意切换,非常强大!这是一款让我们低估了的软件,尤其是时时文本扫描功能,爱了爱了,太好用了!在经历了实体笔记本……甘肃一奇葩景区,本没有什么亮点,却因传说导致市民排队游玩近几年来,随着我国旅游业的飞速发展,很多地区为了能吸引来更多的游客,也是煞费苦心,有些是通过各种平台渠道打广告,有些是通过营造一些负面新闻来吸引眼球,更有甚者是通过自己奇特的传……北京一定要去的景区,青山绿水层峦叠嶂,有京城绿肺之称北京一定要去的景区,青山绿水、层峦叠嶂,有京城绿肺之称当我们谈论起北京的时候不知道大家脑海里首先蹦出来的究竟是什么样的景色,是气势恢宏的北京故宫还是拥有着特别意义的鸟巢风……北京城的山桃花开了!此时至周末去赏刚刚好!这一周,北京城的山桃花开了。从车窗向外看,一片开得正盛的桃花闯入了视线,粉粉的,嫩嫩的。返程时再看另一侧,粉色的桃花已经连绵不断,令人移不开眼。柳枝也完全变成了嫩绿……增强江苏产业绿色低碳竞争力绿色低碳是新一轮技术革命和产业转型的鲜明特征。2022年省政府工作报告提出,大力培育绿色低碳产业,加强绿色低碳技术攻关和应用示范,加快建设国家绿色产业示范基地,提高产业发展的含……姚明和邓肯交手23次,姚明场均178,命中率成唯一遮羞布姚明在2002年以第一位外籍状元的身份进入NBA,2011年30岁的他因为伤病原因退役,期间他效力的火箭队成为中国球迷最关注的NBA球队。当时邓肯率领的马刺队虽然没有和火……国内禁止外国人进入的7个景点,超霸气中国的美美在一砖一瓦皆灵气美在一丘一壑尽风流让无数中外游客心驰神往但国内有这么几个地方只对中国人开放,谢绝外国人进入这些中国人会员制景点……
朵字取名的寓意女孩朵字取名精选这次的女宝宝起名专题,吾爱诗经网准备跟各位准爸爸、准妈妈们重点讲讲该如何用朵字取名。如果正好有此打算,或者想了解的朋友千万别错过下文了。姓名是要跟随我们每个人一辈子的东西,父母……最强配最强Alienware发布RTX4090桌面新机型相信大家都知道,目前Nvidia推出了最新一代显示卡,就是RTX40系列,当中最强大的RTX4090将会在12日正式推出,而现在就有最新的消息,就是著名电竞品牌DellAlie……吴金贵河南队实力非常强劲大连的低温对两队将是严峻考验直播吧10月3日讯明天晚上,中超联赛第21轮将举行上海申花与河南嵩山龙门队的比赛。今天下午,申花主帅吴金贵和队员吴曦出席了新闻发布会。谈到球队的备战情况,吴金贵表示:明天……2022年国庆晚会播出受好评,节目时长短且紧凑,没有流量明星每逢佳节,看晚会都是大家最期待的事情之一,特别是国庆晚会,更是每年都会在播出之后引发一波讨论。今年的国庆晚会也在如期举办,无论是正式播出的晚会,还是以特别节目的形式讲述中……构建高质高效的政务信息工作保障体系高质高效的政务信息工作保障体系是企业适应社会环境、增强竞争能力的客观需求;是快速反映工作情况、及时跟踪工作动态的有力保障;是下情上达、上情下传,及时沟通上下的有效手段;是为各级……让客户说真话成交更容易你有没有发现,业绩好的同事在任何时候、任何场合和任何人都能产生瞬间的亲和感,客户愿意和他说话,而且愿意说真话。他们无论怎样都不会让别人产生距离感,藉着这亲和力谈了一单又一单。你……珍珠母提取物对皮肤的作用透明质酸钠对皮肤的作用应该很多人对于珍珠母提取物并不是那么了解,其实这种提取物主要是有平肝潜阳以及定睛明目的作用,主要用于头晕目眩以及胸闷的情况。那么大家知道这种珍珠母题食物对皮肤有什么作用和副作用……全球汽车零部件哪家强?宁德时代未进前20强,米其林只排在第92022全球汽车供应链核心企业竞争力白皮书在前不久发布,机构对全球汽车零部件企业相关数据进行收集、整理,形成了以企业营收为依据的数据名单。今年零部件收入的入围门槛为197。91……男人冬季吃什么补肾壮阳男人肾虚,容易疲惫。肾乃身体之本,尤其对于男人来说非常重要,除了经常健身运动、使用补品之外,饮食也很重要,冬天补肾,可以让你不怕冷,让男性朋友身体更健康,更自信。那么男季冬天吃……大衣哥儿子二婚后变精神小伙!媳妇也是貌美如花,大衣哥很满意引读:大衣哥儿子二婚后变精神小伙!媳妇也是貌美如花,大衣哥很满意大家都知道,朱之文的儿子叫做朱小伟,朱小伟曾经有一个老婆叫做陈亚楠,她是一个护士。很多人都觉得当时的朱小伟……糖豆人部分联动皮肤将限时返场持续至2022年1月3日《糖豆人:终极淘汰赛》游戏一直很受玩家们喜爱,联动内容也在不断更新,有部分玩家表示错过联动活动没有领取到活动皮肤。近日《糖豆人:终极淘汰赛》官方宣布将举行合作联动皮肤限时返场活……论刑事诉讼律师实务我国刑事诉讼是指国家专门机关在当事人以及其他诉讼参与人的参加之下,依照法律规定的程序,追诉犯罪,解决被追诉人刑事责任的活动。刑事诉讼是法律体系中不可缺少的组成部分,作为律师应当……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网