Spark supports multiple data sources, which fall broadly into two categories: file systems and databases.
File system
File systems mainly include the local file system, Amazon S3, HDFS, and so on.
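The same textFile() call works against all of these storage systems; only the URI scheme changes. A minimal sketch (the host names, bucket, and paths below are placeholders, and S3 access assumes the hadoop-aws connector is on the classpath):

```scala
// Hypothetical paths: only the URI scheme differs between storage systems.
val local = sc.textFile("file:///opt/module/spark/README.md")        // local file system
val hdfs  = sc.textFile("hdfs://namenode:8020/user/data/input.txt")  // HDFS
val s3    = sc.textFile("s3a://my-bucket/data/input.txt")            // Amazon S3
```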
Files in these file systems can be stored in a variety of formats. Some common formats supported by Spark are:
| Format name | Structured | Description |
| --- | --- | --- |
| Text file | No | Ordinary text file, one record per line |
| JSON | Semi-structured | Common text-based, semi-structured format |
| CSV | Yes | Common text-based format, often used in spreadsheet applications |
| SequenceFiles | Yes | A common Hadoop file format for key-value data |
Read
To read a single file, pass the full path of the file as the argument; each line of the file becomes one element of the RDD.
Python

```python
input = sc.textFile("file:///opt/module/spark/README.md")
```

Scala

```scala
val input = sc.textFile("file:///opt/module/spark/README.md")
```

Java

```java
JavaRDD<String> input = sc.textFile("file:///opt/module/spark/README.md");
```

To read multiple files, you can still use textFile(), changing the argument to a directory or to a comma-separated list of file names. If the files are small, they can also be read with wholeTextFiles(), which returns a pair RDD whose key is the file name and whose value is the file content.

```scala
val input = sc.wholeTextFiles("file:///opt/module/spark/datas")
val result = input.mapValues { y =>
  val nums = y.split(" ").map(_.toDouble)
  nums.sum / nums.size.toDouble
}
```

Write
When writing out a text file, you can use the saveAsTextFile() method, which takes a directory and writes the contents of the RDD to multiple files inside that directory.
```scala
result.saveAsTextFile(outputFile)
```

JSON
Read
The data is read in as a text file and then parsed with a JSON parser.

Python uses the built-in library to read JSON

```python
import json
...
input = sc.textFile("file.json")
data = input.map(lambda x: json.loads(x))
```

Scala uses Jackson to read JSON

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
...
case class Person(name: String, lovesPandas: Boolean)
...
val input = sc.textFile("file.json")
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val result = input.flatMap(record => {
  try {
    Some(mapper.readValue(record, classOf[Person]))
  } catch {
    case e: Exception => None
  }
})
```

Java uses Jackson to read JSON

```java
class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
  public Iterable<Person> call(Iterator<String> lines) throws Exception {
    ArrayList<Person> people = new ArrayList<Person>();
    ObjectMapper mapper = new ObjectMapper();
    while (lines.hasNext()) {
      String line = lines.next();
      try {
        people.add(mapper.readValue(line, Person.class));
      } catch (Exception e) {
        // skip malformed records
      }
    }
    return people;
  }
}

JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson());
```
Write
Use a JSON parser to convert the structured RDD into an RDD of strings, and then write it out with the text file API.

Python

```python
(data.filter(lambda x: x["lovesPandas"])
     .map(lambda x: json.dumps(x))
     .saveAsTextFile(outputFile))
```

Scala

```scala
result.filter(p => p.lovesPandas)
      .map(mapper.writeValueAsString(_))
      .saveAsTextFile(outputFile)
```

Java

```java
class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
  public Iterable<String> call(Iterator<Person> people) throws Exception {
    ArrayList<String> text = new ArrayList<String>();
    ObjectMapper mapper = new ObjectMapper();
    while (people.hasNext()) {
      Person person = people.next();
      text.add(mapper.writeValueAsString(person));
    }
    return text;
  }
}

JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile);
```

CSV and TSV
CSV and TSV files have a fixed set of fields on each line, separated by a delimiter (CSV uses commas; TSV uses tabs).
Read
Read the CSV or TSV file as a plain text file, and then parse it with the corresponding parser, in the same way as JSON.
Python uses the built-in library to read CSV

When no field in the file contains a newline character:

```python
import csv
import StringIO
...
def loadRecord(line):
    """Parse one CSV record per line."""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
```

When fields in the file contain newline characters:

```python
def loadRecords(fileNameContents):
    """Read an entire file at once."""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader

fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
```
Scala uses the opencsv library to read CSV

When no field in the file contains a newline character:

```scala
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map { line =>
  val reader = new CSVReader(new StringReader(line))
  reader.readNext()
}
```

When fields in the file contain newline characters:

```scala
import scala.collection.JavaConversions._

case class Person(name: String, favoriteAnimal: String)

val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap { case (_, txt) =>
  val reader = new CSVReader(new StringReader(txt))
  reader.readAll().map(x => Person(x(0), x(1)))
}
```
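TSV input can be handled the same way: opencsv's CSVReader accepts a separator character, so a tab can be passed instead of the default comma. A minimal sketch, assuming the same one-record-per-line layout as above:

```scala
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader

val tsvInput = sc.textFile(inputFile)
val tsvResult = tsvInput.map { line =>
  // '\t' tells opencsv to split on tabs instead of commas
  val reader = new CSVReader(new StringReader(line), '\t')
  reader.readNext()
}
```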
Java uses the opencsv library to read CSV
Write
When writing a CSV or TSV file, convert the fields into arrays in a fixed order, and then write them out as a plain text file.

Python

```python
def writeRecords(records):
    """Write out CSV records."""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
```

Scala

```scala
import java.io.StringWriter
import au.com.bytecode.opencsv.CSVWriter
import scala.collection.JavaConversions._

pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
  .mapPartitions { people =>
    val stringWriter = new StringWriter()
    val csvWriter = new CSVWriter(stringWriter)
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)
  }
  .saveAsTextFile(outFile)
```

SequenceFile
SequenceFile is a common Hadoop data format consisting of key-value pairs. Because Hadoop uses its own serialization framework, the key and value types of a SequenceFile need to implement Hadoop's Writable interface.
Read
Python

```python
data = sc.sequenceFile(inFile,
    "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
```

Scala

```scala
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable])
  .map { case (x, y) => (x.toString, y.get()) }
```

Java

```java
public static class ConvertToNativeTypes
    implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
    return new Tuple2(record._1.toString(), record._2.get());
  }
}

JavaPairRDD<String, Integer> result = sc.sequenceFile(fileName, Text.class, IntWritable.class)
    .mapToPair(new ConvertToNativeTypes());
```
Write
Python

```python
data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)])
data.saveAsSequenceFile(outputFile)
```

Scala

```scala
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
```

Java (there is no saveAsSequenceFile method in Java; save the RDD with a custom Hadoop output format instead)

```java
public static class ConvertToWritableTypes
    implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
    return new Tuple2(new Text(record._1), new IntWritable(record._2));
  }
}

JavaPairRDD<Text, IntWritable> result = sc.parallelizePairs(input)
    .mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
```

Database
Databases fall mainly into relational databases (MySQL, PostgreSQL, etc.) and non-relational databases (HBase, Elasticsearch, etc.).
JDBC database connection
Spark accesses relational databases (MySQL, PostgreSQL, and so on) through JDBC; you only need to build an org.apache.spark.rdd.JdbcRDD.
```scala
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

def createConnection() = {
  Class.forName("com.mysql.jdbc.Driver").newInstance()
  DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}

def extractValues(r: ResultSet) = {
  (r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc, createConnection,
  "SELECT * FROM panda WHERE id >= ? AND id <= ?",
  lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
```
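The non-relational databases mentioned earlier are reached through their Hadoop connectors rather than JdbcRDD. As a minimal sketch of the HBase case (assuming the HBase client jars are on the classpath; "tablename" is a placeholder for the table to read):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "tablename") // table to scan

// Each element of the RDD is a (row key, row result) pair
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
```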