groupBy:按照某个字段进行分组def main(args: Array[String]): Unit = {/*** groupBy:按照某个字段进行分组*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo10groupBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")// 统计班级人数stuRDD.groupBy(s => s.split(",")(4)).map(kv => s"${kv._1},${kv._2.size}").foreach(println)}
groupByKey:转换算子,需要作用在KV格式的RDD上 def main(args: Array[String]): Unit = {/*** groupByKey:转换算子,需要作用在KV格式的RDD上*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo11groupByKey")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")// 使用groupByKey统计班级人数// 将学生数据变成KV格式的RDD,以班级作为Key,1作为Valueval clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))val grpRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()grpRDD.map(kv => s"${kv._1},${kv._2.size}").foreach(println)}
reduceByKey:转换算子,需要作用在KV格式的RDD上,不仅能实现分组,还能实现聚合def main(args: Array[String]): Unit = {/*** reduceByKey:转换算子,需要作用在KV格式的RDD上,不仅能实现分组,还能实现聚合* 需要接受一个函数f* 函数f:两个参数,参数的类型同RDD的Value的类型一致,最终需要返回同RDD的Value的类型一致值* 实际上函数f可以看成一个聚合函数* 常见的聚合函数(操作):max、min、sum、count、avg* reduceByKey可以实现Map端的预聚合,类似MR中的Combiner* 并不是所有的操作都能使用预聚合,例如avg就无法实现*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo11groupByKey")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")// 使用reduceByKey统计班级人数// 将学生数据变成KV格式的RDD,以班级作为Key,1作为Valueval clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))clazzKVRDD.reduceByKey((i1: Int, i2: Int) => i1 + i2).foreach(println)// 简写形式clazzKVRDD.reduceByKey((i1, i2) => i1 + i2).foreach(println)clazzKVRDD.reduceByKey(_ + _).foreach(println)}
aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合def main(args: Array[String]): Unit = {/*** aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合* 可以弥补reduceByKey无法实现avg操作**/val conf: SparkConf = new SparkConf()conf.setAppName("Demo13aggregateByKey")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")val ageKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), s.split(",")(2).toInt))val clazzCntKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))// 统计每个班级年龄之和val ageSumRDD: RDD[(String, Int)] = ageKVRDD.reduceByKey(_ + _)// 统计每个班级人数val clazzCntRDD: RDD[(String, Int)] = clazzCntKVRDD.reduceByKey(_ + _)// 统计每个班级的平均年龄ageSumRDD.join(clazzCntRDD).map {case (clazz: String, (ageSum: Int, cnt: Int)) =>(clazz, ageSum.toDouble / cnt)}.foreach(println)/*** zeroValue:初始化的值,类型自定义,可以是数据容器* seqOp:在组内(每个分区内部即每个Map任务)进行的操作,相当是Map端的预聚合操作* combOp:在组之间(每个Reduce任务之间)进行的操作,相当于就是最终每个Reduce的操作*/// 使用aggregateByKey统计班级年龄之和ageKVRDD.aggregateByKey(0)((age1: Int, age2: Int) => {age1 + age2 // 预聚合}, (map1AgeSum: Int, map2AgeSum: Int) => {map1AgeSum + map2AgeSum // 聚合}).foreach(println)// 使用aggregateByKey统计班级人数clazzCntKVRDD.aggregateByKey(0)((c1: Int, c2: Int) => {c1 + 1 // 预聚合}, (map1Cnt: Int, map2Cnt: Int) => {map1Cnt + map2Cnt // 聚合}).foreach(println)// 使用aggregateByKey统计班级的平均年龄ageKVRDD.aggregateByKey((0, 0))((t2: (Int, Int), age: Int) => {val mapAgeSum: Int = t2._1 + ageval mapCnt: Int = t2._2 + 1(mapAgeSum, mapCnt)}, (map1U: (Int, Int), map2U: (Int, Int)) => {val ageSum: Int = map1U._1 + map2U._1val cnt: Int = map1U._2 + map2U._2(ageSum, cnt)}).map {case (clazz: String, (sumAge: Int, cnt: Int)) =>(clazz, sumAge.toDouble / cnt)}.foreach(println)}
经验总结扩展阅读
- Redis系列8:Bitmap实现亿万级数据计算
- 数据科学学习手札146 geopandas中拓扑非法问题的发现、诊断与修复
- 小样本利器4. 正则化+数据增强 Mixup Family代码实现
- python3使用libpcap库进行抓包及数据处理
- 分布式ID生成方案总结整理
- Python数据分析:实用向
- .NET API 接口数据传输加密最佳实践
- SQL分层查询
- data删除了没事吧 data数据能删除吗
- 华为手机怎么连接电脑方法(华为usb数据线接电脑)