<small id='eYuNg4QP7L'></small> <noframes id='ALYrt'>

  • <tfoot id='B9LZm'></tfoot>

      <legend id='UKTON7v'><style id='2cra'><dir id='H3COAa'><q id='3STJMOcq'></q></dir></style></legend>
      <i id='2MiHgsdaL'><tr id='xlbgMHaVK'><dt id='jh857n3aYI'><q id='n5ILYbZ1'><span id='IofcLwZy'><b id='kNbO7Y'><form id='JyAWOe4H'><ins id='cGeYD6b'></ins><ul id='ato1eB'></ul><sub id='RpBe'></sub></form><legend id='Dz9M85ef'></legend><bdo id='qam3QtfA'><pre id='AxMeWEayPz'><center id='fszag69'></center></pre></bdo></b><th id='IKiXc4JjgG'></th></span></q></dt></tr></i><div id='vWVUo9'><tfoot id='18UdtqW'></tfoot><dl id='EZfcY5'><fieldset id='QGFlV'></fieldset></dl></div>

          <bdo id='0RbBMSjy'></bdo><ul id='KBx18O'></ul>

          1. <li id='3aKOLWsuJF'></li>
            登陆

            章鱼网竟彩足球推荐-「Spark大数据系列」spark RDD 浅显易懂之 RDD的创立(二)

            admin 2019-11-01 306人围观 ,发现0个评论

            二.RDD的创立

            学习RDD,从创立RDD开端。

            有两大类创立RDD的办法:第一类是从内部的Driver的数据集来创立,第二类是从外部的数据源来创立。

            1. 从内部的Driver的数据集来创立RDD的办法:

            val data = Array(1, 2, 3, 4, 5)

            val distData = sc.parallelize(data)

            实践创立的办法是 parallelize ;下面了解这个办法的悉数信息如下:

            /** Distribute a local Scala collection to form an RDD.
            *
            * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
            * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
            * modified collection. Pass a copy of the argument to avoid this.
            * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
            * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
            * @param seq Scala collection to distribute
            * @param numSlices number of partitions to divide the collection into
            * @return RDD repre章鱼网竟彩足球推荐-「Spark大数据系列」spark RDD 浅显易懂之 RDD的创立(二)senting distributed collection
            */
            def parallelize[T: ClassTag](
            seq: Seq[T],
            numSlices: Int = defaultParallelism): RDD[T] = withScope {
            assertNotStopped()
            new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
            }

            办法的阐明:

            1. 从一个本地的scala调集到RDD这个办法是lazily的,假设这个调集seq是一个可变的调集,这个RDD在调用action(其间action的具体阐明,请看下一节解说)之前的数据修正,都会收效;假如你不期望这种不行不知道的修正的发作,你能够仿制一个副原本防止。防止运用这个办法来创立一个空的 RDD ,假如你要创立这类,你能够考虑运用emptyRDD来做,或许运用 parallelize(Seq[T]()) 来为一个RDD做’T’ 的一个空分区。参数seq是一个分布式的scala调集参数numSlices 是将调集分隔的为numSlices的个数,而且默认值为defaultParallelism,其间这个值是taskScheduler.defaultParallelism,再检查这个接口的完成类。
             override def defaultParallelism(): Int = {
            conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
            }

            原来是由spark.default.parallelism装备参数传递过来的,假如没有装备,在不同的完成中再给了一个默认值,不同的完成会略有不同:

            至此章鱼网竟彩足球推荐-「Spark大数据系列」spark RDD 浅显易懂之 RDD的创立(二),这个办法就能全面把握了。

            比方咱们能够这样来创立:

            一种:

            val data = Array(1, 2, 3, 4, 5)

            val distData = sc.parallelize(data)

            二种:

            val data = List(1, 2, 3, 4, 5)

            val distData = sc.parallelize(data)

            三种:

            val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

            val distData = sc.parallelize(data, 5)

            你还能清楚的知道,在不同的运转环境中,不传分区数,它的默认值会是多少。别的用来创立RDD的调集,最好是不行变的;可变的调集,存在数据不确定的危险。

            1. 在外部数据源中创立RDD 的办法

            val distFile = sc.textFile("data.txt")

            相同的,先检查 textFile 办法的原始完成:

            /**
            * Read a text file from HDFS, a local file system (available on all nodes), or any
            * Hadoop-supported file system URI, and return it as an RDD of Strings.
            * @param path path to the text file on a supported file system
            * @param minPartitions suggested minimum number of partitions for the resulting RDD
            * @return RDD of lines of the text file
            */
            def textFile(
            path: String,
            minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
            assertNotStopped()
            hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
            minPartitions).map(pair => pair._2.toString).setName(path)
            }

            办法阐明:

            1. 读取一个text 文件从HDFS或本地文件体系(一切节点都能拜访到的),或其它支撑hadoop的文件体系的uri,最终回来一个string的RDD参数path 途径参数minPartitions 回来成果的最少分区数,有默认值为defaultMinPartitions,这个值与上一个parallelize 的办法中的意思相同,可是假如没有指定,最小值不能大于2,如下:
            /**
            * Default min number of partitions for Hadoop RDDs when not given by user
            * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
            * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
            */
            def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

            要害信息:假如不是像HDFS这类自带便是分布式的存储,每个节点都可拜访。而是运用本地的地址时,你要保证每个节点都有相同的一份数据。比方,你运用 On Yarn来运转时,发动的executor或许是节点上的任何一台,那它拜访本地的地址来获取数据,假如不是每个节点都有,就会有或许犯错(拜访不到)。

            所以一般情况下,做测验,运用local形式运转时,运用本地地址时,没问题,便利测验。

            下面我创立几个惯例的试试:

            1种:变装小说val distFile = sc.textFile("data.txt")

            2种:val distFile = sc.textFile("data.txt", 10)

            3种:val distFile = sc.textFile("hdfs://testdata/apps/hive/data.txt", 100)

            为了便利,实践spark还供给了一些其它办法来创立RDD。如下:

            wholeTextFiles

            binaryFiles

            binaryRecords

            hadoopRDD

            hadoopFile

            newAPIHadoopFile

            sequenceFile

            objectFile

            比方,咱们想拜访HBase的文件,能够这样做:

             val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local") 
            val sc = new SparkContext(sparkConf)
            val tablename = "student"
            val conf = HBaseConfiguration.create()
            conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
            conf.set("hbase.zookeeper.property.clientPort", "2181")
            conf.set(TableInputFormat.INPUT_TABLE, tablename)
            val hbaseRDD = sc.newAPIHadoopRDD(conf,
            classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableByt章鱼网竟彩足球推荐-「Spark大数据系列」spark RDD 浅显易懂之 RDD的创立(二)esWritable],
            classOf[org.apache.hadoop.hbase.client.Result])

            最终:咱们要记住一点,当咱们建议创立RDD时,生成的RDD仅仅记载下了创立信息,而没有真实加载数据,只要在做action时,才会真实加载数据。具体更具体的阐明请到【第3章 3.1.1 RDD具体解析】中检查。

            【作者是自己:写于2018年8月20日】

            请关注微信公众号
            微信二维码
            不容错过
            Powered By Z-BlogPHP