弹性分布式数据集 RDD及常用算子( 四 )

cartesian:转换算子,可以对两个RDD做笛卡尔积def main(args: Array[String]): Unit = {/*** cartesian:转换算子,可以对两个RDD做笛卡尔积** 当数据重复时 很容易触发笛卡尔积 造成数据的膨胀*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo14cartesian")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val idNameKVRDD: RDD[(String, String)] = sc.parallelize(List(("001", "zs"), ("002", "ls"), ("003", "ww")))val genderAgeKVRDD: RDD[(String, Int)] = sc.parallelize(List(("男", 25), ("女", 20), ("男", 22)))idNameKVRDD.cartesian(genderAgeKVRDD).foreach(println)}sortBy:转换算子 可以指定一个字段进行排序 默认升序def main(args: Array[String]): Unit = {/*** sortBy:转换算子 可以指定一个字段进行排序 默认升序*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo15sortBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val intRDD: RDD[Int] = sc.parallelize(List(1, 3, 6, 5, 2, 4, 6, 8, 9, 7))intRDD.sortBy(i => i).foreach(println) // 升序intRDD.sortBy(i => -i).foreach(println) // 降序intRDD.sortBy(i => i, ascending = false).foreach(println) // 降序val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")// 按照年龄进行降序stuRDD.sortBy(s => -s.split(",")(2).toInt).foreach(println)}常见的Action算子def main(args: Array[String]): Unit = {/*** 常见的Action算子:foreach、take、collect、count、reduce、save相关* 每个Action算子都会触发一个job**/val conf: SparkConf = new SparkConf()conf.setAppName("Demo16Action")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")/*** foreach:对每条数据进行处理,跟map算子的区别在于,foreach算子没有返回值*/stuRDD.foreach(println)// 将stuRDD中的每条数据保存到MySQL中/*** 建表语句:* CREATE TABLE `stu_rdd` (* `id` int(10) NOT NULL AUTO_INCREMENT,* `name` char(5) DEFAULT NULL,* `age` int(11) DEFAULT NULL,* `gender` char(2) DEFAULT NULL,* `clazz` char(4) DEFAULT NULL,* PRIMARY KEY (`id`)* ) ENGINE=InnoDB DEFAULT CHARSET=utf8;*/// 每一条数据都会创建一次连接,频繁地创建销毁连接效率太低,不合适//stuRDD.foreach(line => {//val splits: Array[String] = line.split(",")//// 1、建立连接//val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false", "root", "123456")//println("建立了一次连接")//// 2、创建prepareStatement//val pSt: PreparedStatement = conn.prepareStatement("insert into stu_rdd(id,name,age,gender,clazz) values(?,?,?,?,?)")////// 3、传入参数//pSt.setInt(1, splits(0).toInt)//pSt.setString(2, splits(1))//pSt.setInt(3, splits(2).toInt)//pSt.setString(4, splits(3))//pSt.setString(5, splits(4))////// 4、执行SQL//pSt.execute()////// 5、关闭连接//conn.close()////})/*** take : Action算子,可以将指定条数的数据转换成Scala中的Array**/// 这里的foreach是Array的方法,不是算子stuRDD.take(5).foreach(println)/*** collect : Action算子,可以将RDD中所有的数据转换成Scala中的Array*/// 这里的foreach是Array的方法,不是算子stuRDD.collect().foreach(println)/*** count : Action算子,统计RDD中数据的条数*/println(stuRDD.count())/*** reduce : Action算子,将所有的数据作为一组进行聚合操作*/// 统计所有学生的年龄之和println(stuRDD.map(_.split(",")(2).toInt).reduce(_ + _))/*** save相关:* saveAsTextFile、saveAsObjectFile*/}【弹性分布式数据集 RDD及常用算子】

经验总结扩展阅读