弹性分布式数据集 RDD及常用算子( 二 )

filter:转换算子def main(args: Array[String]): Unit = {/*** filter:转换算子* 用于过滤数据,需要接受一个函数f* 函数f:参数只有一个,类型为RDD中每一条数据的类型 => 返回值类型必须为Boolean* 最终会基于函数f返回的Boolean值进行过滤,得到一个新的RDD* 如果函数f返回的Boolean为true则保留数据* 如果函数f返回的Boolean为false则过滤数据*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo05filter")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val seqRDD: RDD[Int] = sc.parallelize(1 to 100, 4)println(seqRDD.getNumPartitions) // getNumPartitions并不是算子,它只是RDD的一个属性//seqRDD.foreach(println)// 将奇数过滤出来seqRDD.filter(i => i % 2 == 1).foreach(println)// 将偶数过滤出来seqRDD.filter(i => i % 2 == 0).foreach(println)}sample:转换算子def main(args: Array[String]): Unit = {/*** sample:转换算子* 用于对数据进行取样* 总共有三个参数:* withReplacement:有无放回* fraction:抽样的比例(这个比例并不是精确的,因为抽样是随机的)* seed:随机数种子*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo06sample")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")stuRDD.sample(withReplacement = false, 0.1).foreach(println)// 如果想让每次抽样的数据都一样,则可以将seed进行固定stuRDD.sample(withReplacement = false, 0.01, 10).foreach(println)}mapValues:转换算子def main(args: Array[String]): Unit = {/*** mapValues:转换算子* 同map类似,只不过mapValues需要对KV格式的RDD的Value进行遍历处理*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo07mapValues")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val kvRDD: RDD[(String, Int)] = sc.parallelize(List("k1" -> 1, "k2" -> 2, "k3" -> 3))// 对每个Key对应的Value进行平方kvRDD.mapValues(i => i * i).foreach(println)// 使用map方法实现kvRDD.map(kv => (kv._1, kv._2 * kv._2)).foreach(println)}join:转换算子def main(args: Array[String]): Unit = {/*** join:转换算子* 需要作用在两个KV格式的RDD上,会将相同的Key的数据关联在一起*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo08join")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)// 加载学生数据,并转换成KV格式,以ID作为Key,其他数据作为Valueval stuKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/students.txt").map(line => {val id: String = line.split(",")(0)// split 指定分割符切分字符串得到Array// mkString 指定拼接符将Array转换成字符串val values: String = line.split(",").tail.mkString("|")(id, values)})// 加载分数数据,并转换成KV格式,以ID作为Key,其他数据作为Valueval scoKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/score.txt").map(line => {val id: String = line.split(",")(0)val values: String = line.split(",").tail.mkString("|")(id, values)})// join : 内连接val joinRDD1: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)//joinRDD1.foreach(println)//stuKVRDD.leftOuterJoin(scoKVRDD).foreach(println)//stuKVRDD.rightOuterJoin(scoKVRDD).foreach(println)stuKVRDD.fullOuterJoin(scoKVRDD).foreach(println)}union:转换算子,用于将两个相类型的RDD进行连接def main(args: Array[String]): Unit = {// union:转换算子,用于将两个相类型的RDD进行连接val conf: SparkConf = new SparkConf()conf.setAppName("Demo09union")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")val sample01RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)val sample02RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)println(s"sample01RDD的分区数:${sample01RDD.getNumPartitions}")println(s"sample02RDD的分区数:${sample02RDD.getNumPartitions}")// union 操作最终得到的RDD的分区数等于两个RDD分区数之和println(s"union后的分区数:${sample01RDD.union(sample02RDD).getNumPartitions}")val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))//sample01RDD.union(intRDD) // 两个RDD的类型不一致无法进行union// union 等同于SQL中的union allsample01RDD.union(sample02RDD).foreach(println)// 如果要进行去重 即等同于SQL中的union 则可以在 union后再进行distinctsample01RDD.union(sample02RDD).distinct().foreach(println)}

经验总结扩展阅读