spark支持多种数据源,从总体来分分为两大部分:文件系统和数据库。
创新互联公司专注于企业营销型网站建设、网站重做改版、安福网站定制设计、自适应品牌网站建设、H5建站、商城系统网站开发、集团公司官网建设、外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为安福等各大城市提供网站开发制作服务。
文件系统
文件系统主要有本地文件系统、Amazon S3、HDFS等。
文件系统中存储的文件有多种存储格式。spark支持的一些常见格式有:
| 格式名称 | 结构化 | 说明 | 
|---|---|---|
| 文件文件 | 否 | 普通文件文件,每行一条记录 | 
| JSON | 半结构化 | 常见的基于文本的半结构化数据 | 
| CSV | 是 | 常见的基于文本的格式,在电子表格应用中使用 | 
| SequenceFiles | 是 | 一种用于键值对数据的常见Hadoop文件格式 | 
文本文件
- 读取 - 读取单个文件,参数为文件全路径,输入的每一行都会成为RDD的一个元素。 - python
 - input = sc.textFile("file://opt/module/spark/README.md")- scala
 - val input = sc.textFile("file://opt/module/spark/README.md")- java
 - JavaRDD- input = sc.textFile("file://opt/module/spark/README.md") 
- 读取多个文件时,可以使用textFile将参数改为目录或以逗号文件的多个文件名即可。如果是小文件,也可以使用wholeTextFiles读取为一个Pair RDD(键是文件名,值是文件内容)。
 - val input = sc.wholeTextFiles("file://opt/module/spark/datas") val result = input.mapValues{ y => { val nums = y.split(" ").map(x => x.toDouble) nums.sum / nums.size.toDouble } }
- 写入
输出文本文件时,可使用saveAsTextFile()方法接收一个目录,将RDD中的内容输出到目录中的多个文件中。
```
result.saveAsTextFile(outputFile)
```JSON
- 读取 - 将数据作为文本文件读取,然后使用JSON解析器对数据进行解析。
- python使用内置库读取JSON
 - import json ... input = sc.textFile("file.json") data = input.map(lambda x: json.loads(x))- scala使用Jackson读取JSON
 - import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule ... case class Person(name: String, lovesPandas: Boolean) ... val input = sc.textFile("file.json") val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) val result = input.flatMap(record => { try { Some(mapper.readValue(record, classOf[Person])) } catch { case e: Exception => None } })- java使用Jackson读取JSON
 - class ParseJson implements FlatMapFunction- , Person> { public Iterable - call(Iterator - lines) throws Exception { ArrayList - people = new ArrayList - (); ObjectMapper mapper = new ObjectMapper(); while(lines.hasNext()) { String line = lines.next(); try { people.add(mapper.readValue(line, Person.class)); } catch(Exception e) { //跳过失败的数据 } } return people; } } JavaRDD - input = sc.textFile("file.json"); JavaRDD - result = input.mapPartitions(new ParseJson()); 
- 写入 - 使用JSON解析器将结构化的RDD转为字符串RDD,然后使用文本文件API输出。
- python
 - (data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)- scala
 - result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)- java
 - class WriteJson implements FlatMapFunction- , String> { public Iterable - call(Iterator - people) throws Exception { ArrayList - text = new ArrayList - (); ObjectMapper mapper = new ObjectMapper(); while(people.hasNext()) { Person person = people.next(); text.add(mapper.writeValueAsString(person)); } return text; } } JavaRDD - result = input.mapPartitions(new ParseJson()).filter(new LikesPandas()); JavaRDD - formatted = result.mapPartitions(new WriteJson()); formatted.saveAsTextFile(outfile); 
CSV与TSV
CSV与TSV文件每行都有固定的字段,字段之间使用分隔符(CSV使用逗号;tsv使用制表符)分隔。
- 读取 - 将csv或tsv文件当作普通文本文件读取,然后使用响应的解析器进行解析,同json处理方式。 
- python使用内置库读取csv - 文件中所有字段没有包含换行符
 - import csv import StringIO ... def loadRecord(line): input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name","favouriteAnimal"]) return reader.next() """读取每行记录""" input = sc.textFile(inputFile).map(loadRecord)- 文件中的字段包含换行符
 - def loadRecords(fileNameContents): input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name","favoriteAnimal"]) return reader """读取整个文件""" fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
- scala使用opencsv库读取csv - 文件中所有字段没有包含换行符
 - import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... val input = sc.textFile(inputFile) val result = input.map{ line => { val reader = new CSVReader(new StringReader(line)) reader.readNext() } }- 文件中的字段包含换行符
 - case class Person(name: String, favoriteAnimal: String) val input = sc.wholeTextFiles(inputFile) val result = input.flatMap( case(_, txt) => { val reader = new CSVReader(new StringReader(txt)) reader.readAll().map(x => Person(x(0), x(1))) }
- java使用opencsv库读取csv - 文件中所有字段没有包含换行符
 - import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... public static class ParseLine implements Function- { public String[] call(String line) throws Exception { CSVReader reader = new CSVReader(new StringReader(line)); return reader.readNext(); } } JavaPairRDD - csvData = sc.textFile(inputFile).map(new ParseLine()); - 文件中的字段包含换行符
 - public static class ParseLine implements FlatMapFunction- , String[]> { public Iterable - call(Tuple2 - file) throws Exception { CSVReader reader = new CSVReader(new StringReader(file._2); return reader.readAll(); } } JavaRDD - keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine()); 
 
- 写入 - csv或tsv文件输出时,将个字段转为指定顺序的数组,然后采用普通文本文件的方式进行输出。
- python
 - def writeRecords(records): output = StringIO.StringIO() writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) for record in records: writer.writerow(record) return [output.getValue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)- scala
 - pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{ people => { val stringWriter = new StringWriter() val csvWriter = new CSVWriter(stringWriter) csvWriter.writeAll(people.toList) Iterator(stringWriter.toString) } }.saveAsTextFile(outFile)
SequenceFile
SequenceFile是键值对形式的常用Hadoop数据格式。由于Hadoop使用一套自定义的序列化框架,因此SequenceFile的键值对类型需实现Hadoop的Writable接口。
- 读取 - python
 - data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")- scala
 - val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}- java
 - public static class ConvertToNativeTypes implements PairFunction- , String, Integer> { public Tuple2 - call(Tuple2 - record) { return new Tuple2(record._1.toString(), record._2.get()); } } JavaPairRDD - result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes()); 
- 写入 - python
 - data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)]) data.saveAsSequeceFile(outputFile)- scala
 - val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2))) data.saveAsSequenceFile(outputFile)- java(java中没有saveAsSequenceFile方法,用自定义hadoop格式的方式实现)
 - public static class ConvertToWritableTypes implements PairFunction- , Text, IntWritable> { public Tuple2 - call(Tuple2 - record) { return new Tuple2(new Text(record._1), new IntWritable(record._2)); } } JavaPairRDD - result = sc.parallelizePairs(input).mapToPair(new ConvertToNativeTypes()); result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class); 
数据库
数据库主要分为关系型数据库(MySQL、PostgreSQL等)和非关系型数据库(HBase、ElasticSearch等)。
JDBC数据库连接
spark使用JDBC访问关系型数据库(MySQL、PostgreSQL等),只需要构建一个org.apache.spark.rdd.JdbcRDD即可。
def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}
def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc, createConnection, 
                "SELECT * FROM panda WHERE id >= ? AND id <= ?"),
                lowerBound = 1, upperBound = 3, 
                numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)HBase
spark通过Hadoop输入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)访问HBase。这种格式返回键值对数据,键类型为org.apache.hadoop.hbase.io.ImmutableBytesWritable,值类型为org.apache.hadoop.hbase.client.Result。
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], ClassOf[Result])ElasticSearch
spark使用ElasticSearch-Hadoop连接器从ElasticSearch中读写数据。ElasticSearch连接器依赖于SparkContext设置的配置项。ElasticSearch连接器也没有用到Spark封装的类型,而使用saveAsHadoopDataSet。
- 读取
def mapWritableToInput(in: MapWritable): Map[String, String] = {
    in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args[1])
jobConf.set(ConfigurationOptions.ES_NODES, args[2])
val currentTweets = sc.hadoopRDD(jobConf, classOf[EsInputFormat[Object, MapWritable]], classOf[Object], ClassOf[MapWritable])
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }- 写入
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。
网页标题:8.sparkcore之读写数据
文章来源:http://www.jxjierui.cn/article/jcphcs.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 