DBInputFormat για μεταφορά δεδομένων από SQL σε βάση δεδομένων NoSQL

Στόχος αυτού του ιστολογίου είναι να μάθει πώς να μεταφέρει δεδομένα από βάσεις δεδομένων SQL σε HDFS, πώς να μεταφέρει δεδομένα από βάσεις δεδομένων SQL σε βάσεις δεδομένων NoSQL.

Σε αυτό το ιστολόγιο θα διερευνήσουμε τις δυνατότητες και τις δυνατότητες ενός από τα πιο σημαντικά συστατικά της τεχνολογίας Hadoop, δηλαδή του MapReduce.



Σήμερα, οι εταιρείες υιοθετούν το πλαίσιο Hadoop ως την πρώτη τους επιλογή αποθήκευσης δεδομένων, λόγω των δυνατοτήτων του να χειρίζεται αποτελεσματικά μεγάλα δεδομένα. Αλλά γνωρίζουμε επίσης ότι τα δεδομένα είναι ευπροσάρμοστα και υπάρχουν σε διάφορες δομές και μορφές. Για τον έλεγχο μιας τόσο τεράστιας ποικιλίας δεδομένων και των διαφορετικών μορφών της, θα πρέπει να υπάρχει ένας μηχανισμός για την κάλυψη όλων των ποικιλιών και παράλληλα να παράγει ένα αποτελεσματικό και συνεπές αποτέλεσμα.



Το πιο ισχυρό στοιχείο στο πλαίσιο Hadoop είναι το MapReduce το οποίο μπορεί να παρέχει τον έλεγχο στα δεδομένα και τη δομή του καλύτερα από τους άλλους ομολόγους του. Αν και απαιτεί γενική καμπύλη μάθησης και πολυπλοκότητα προγραμματισμού, αν μπορείτε να χειριστείτε αυτές τις πολυπλοκότητες, σίγουρα μπορείτε να χειριστείτε οποιοδήποτε είδος δεδομένων με το Hadoop.

Το πλαίσιο MapReduce διαιρεί όλες τις εργασίες επεξεργασίας του σε βασικά δύο φάσεις: Map and Reduce.



Η προετοιμασία των μη επεξεργασμένων δεδομένων σας για αυτές τις φάσεις απαιτεί κατανόηση ορισμένων βασικών κατηγοριών και διεπαφών. Η σούπερ τάξη για αυτές τις επανεπεξεργασίες είναι ΕίσοδοςFormat.

ο ΕίσοδοςFormat Το class είναι μία από τις βασικές τάξεις του Hadoop MapReduce API. Αυτή η τάξη είναι υπεύθυνη για τον καθορισμό δύο κύριων πραγμάτων:

  • Διαχωρισμός δεδομένων
  • Αναγνώστης εγγραφών

Διαχωρισμός δεδομένων είναι μια θεμελιώδης έννοια στο πλαίσιο Hadoop MapReduce που καθορίζει τόσο το μέγεθος των μεμονωμένων εργασιών χαρτών όσο και τον πιθανό διακομιστή εκτέλεσης. ο Αναγνώστης εγγραφών είναι υπεύθυνος για τις πραγματικές εγγραφές ανάγνωσης από το αρχείο εισαγωγής και την υποβολή τους (ως ζεύγη κλειδιών / τιμών) στον χαρτογράφο.



Ο αριθμός των χαρτογράφων αποφασίζεται με βάση τον αριθμό των διαχωρισμών. Είναι δουλειά του InputFormat να δημιουργήσει τα διαχωριστικά. Το μεγαλύτερο μέρος του διαχωρισμού χρόνου ισοδυναμεί με το μέγεθος μπλοκ, αλλά δεν είναι πάντα ότι οι διαχωρισμοί θα δημιουργούνται με βάση το μέγεθος μπλοκ HDFS. Εξαρτάται πλήρως από τον τρόπο με τον οποίο η μέθοδος getSplits () του InputFormat έχει παρακαμφθεί.

Υπάρχει μια θεμελιώδης διαφορά μεταξύ MR split και HDFS block. Ένα μπλοκ είναι ένα φυσικό κομμάτι δεδομένων, ενώ ένα διαχωρισμό είναι απλώς ένα λογικό κομμάτι που διαβάζει ένας χαρτογράφος. Η διάσπαση δεν περιέχει τα δεδομένα εισόδου, απλώς περιέχει μια αναφορά ή διεύθυνση των δεδομένων. Το split έχει βασικά δύο πράγματα: Ένα μήκος σε byte και ένα σύνολο θέσεων αποθήκευσης, που είναι απλά χορδές.

Για να το κατανοήσουμε καλύτερα, ας πάρουμε ένα παράδειγμα: Επεξεργασία δεδομένων που είναι αποθηκευμένα στη MySQL χρησιμοποιώντας MR. Δεδομένου ότι δεν υπάρχει έννοια των μπλοκ σε αυτήν την περίπτωση, η θεωρία: 'οι διαχωρισμοί δημιουργούνται πάντα με βάση το μπλοκ HDFS',αποτυγχάνει. Μια πιθανότητα είναι να δημιουργήσετε διαχωρισμούς με βάση εύρη σειρών στον πίνακα MySQL (και αυτό κάνει το DBInputFormat, μια μορφή εισαγωγής για την ανάγνωση δεδομένων από σχεσιακές βάσεις δεδομένων). Ενδέχεται να έχουμε k αριθμό διαχωρισμών που αποτελούνται από n σειρές.

Μόνο για το InputFormats που βασίζεται στο FileInputFormat (ένα InputFormat για το χειρισμό δεδομένων που είναι αποθηκευμένα σε αρχεία) οι διαιρέσεις δημιουργούνται με βάση το συνολικό μέγεθος, σε byte, των αρχείων εισαγωγής. Ωστόσο, το μπλοκ αρχείων FileSystem των αρχείων εισαγωγής αντιμετωπίζεται ως ανώτερο όριο για διαχωρισμούς εισόδου. Εάν έχετε ένα αρχείο μικρότερο από το μέγεθος του μπλοκ HDFS, θα λάβετε μόνο 1 mapper για αυτό το αρχείο. Εάν θέλετε να έχετε κάποια διαφορετική συμπεριφορά, μπορείτε να χρησιμοποιήσετε το mapred.min.split.size. Αλλά εξαρτάται και πάλι αποκλειστικά από τα getSplits () του InputFormat.

Έχουμε τόσες πολλές προϋπάρχουσες μορφές εισόδου διαθέσιμες στο πακέτο org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

δομή ενός προγράμματος java

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

Μη έγκυροInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Η προεπιλογή είναι TextInputFormat.

Ομοίως, έχουμε τόσες πολλές μορφές εξόδου που διαβάζουν τα δεδομένα από μειωτές και τα αποθηκεύουν σε HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Προεπιλογή είναι TextOutputFormat.

Όταν ολοκληρώσετε την ανάγνωση αυτού του ιστολογίου, θα έχετε μάθει:

  • Πώς να γράψετε ένα πρόγραμμα μείωσης χάρτη
  • Σχετικά με τους διαφορετικούς τύπους InputFormats που διατίθενται στο Mapreduce
  • Ποια είναι η ανάγκη του InputFormats
  • Τρόπος σύνταξης προσαρμοσμένων InputFormats
  • Πώς να μεταφέρετε δεδομένα από βάσεις δεδομένων SQL σε HDFS
  • Πώς να μεταφέρετε δεδομένα από βάσεις δεδομένων SQL (εδώ MySQL) σε βάσεις δεδομένων NoSQL (εδώ Hbase)
  • Πώς να μεταφέρετε δεδομένα από μία βάση δεδομένων SQL σε άλλο πίνακα σε βάσεις δεδομένων SQL (Ίσως αυτό να μην είναι τόσο σημαντικό αν το κάνουμε αυτό στην ίδια βάση δεδομένων SQL. Ωστόσο, δεν υπάρχει τίποτα λάθος στο να γνωρίζουμε το ίδιο. Δεν ξέρεις ποτέ πώς μπορεί να χρησιμοποιηθεί)

Προαπαιτούμενο:

  • Προεγκατεστημένο το Hadoop
  • Προεγκατεστημένο SQL
  • Προεγκατεστημένο το Hbase
  • Βασική κατανόηση Java
  • MapReduce γνώσεις
  • Βασικές γνώσεις πλαισίου Hadoop

Ας καταλάβουμε τη δήλωση προβλήματος που πρόκειται να λύσουμε εδώ:

Έχουμε έναν πίνακα υπαλλήλων στο MySQL DB στη σχετική βάση δεδομένων Edureka. Τώρα, σύμφωνα με την επιχειρηματική απαίτηση, πρέπει να μεταφέρουμε όλα τα διαθέσιμα δεδομένα σε σχεσιακό DB σε σύστημα αρχείων Hadoop, δηλαδή HDFS, NoSQL DB γνωστό ως Hbase.

Έχουμε πολλές επιλογές για να κάνουμε αυτήν την εργασία:

  • Κουτάλα
  • Λαγκάδι
  • ΜΕΙΩΣΗ ΧΑΡΤΗ

Τώρα, δεν θέλετε να εγκαταστήσετε και να διαμορφώσετε οποιοδήποτε άλλο εργαλείο για αυτήν τη λειτουργία. Απομένουν μόνο μία επιλογή που είναι το πλαίσιο επεξεργασίας του Hadoop MapReduce. Το πλαίσιο MapReduce θα σας δώσει πλήρη έλεγχο των δεδομένων κατά τη μεταφορά. Μπορείτε να χειριστείτε τις στήλες και να τοποθετήσετε απευθείας σε οποιαδήποτε από τις δύο θέσεις προορισμού.

Σημείωση:

  • Πρέπει να κατεβάσουμε και να βάλουμε τη σύνδεση MySQL στο classpath του Hadoop για να πάρουμε πίνακες από τον πίνακα MySQL. Για να το κάνετε αυτό, κατεβάστε το σύνδεσμο com.mysql.jdbc_5.1.5.jar και διατηρήστε το στον κατάλογο Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Λήψεις / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Επίσης, τοποθετήστε όλα τα βάζα Hbase στο Hadpop classpath για να κάνετε το πρόγραμμα MR σας πρόσβαση στο Hbase. Για να το κάνετε αυτό, εκτελέστε την ακόλουθη εντολή :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / κοινή χρήση / hadoop / mapreduce / lib /

Οι εκδόσεις λογισμικού που έχω χρησιμοποιήσει κατά την εκτέλεση αυτής της εργασίας είναι:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Eclipse Moon

Προκειμένου να αποφευχθεί το πρόγραμμα σε οποιοδήποτε ζήτημα συμβατότητας, συνταγογραφώ στους αναγνώστες μου να εκτελούν την εντολή με παρόμοιο περιβάλλον.

Προσαρμοσμένο DBInputWritable:

πακέτο com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable δημόσια κλάση DBInputWritable εργαλεία Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) ρίχνει το IOException {} δημόσιο άκυρο readFields (ResultSet rs) ρίχνει το SQLException // Το αντικείμενο αποτελέσματος αντιπροσωπεύει τα δεδομένα που επιστρέφονται από μια δήλωση SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) ρίχνει το IOException { } public void write (PreparedStatement ps) ρίχνει το SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Προσαρμοσμένο DBOutputWritable:

πακέτο com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable δημόσια κλάση DBOutputWritable εργαλεία Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) ρίχνει το IOException {} public void readFields (ResultSet rs) ρίχνει το SQLException {} δημόσια άκυρη εγγραφή (DataOutput out) ρίχνει το IOException {} δημόσια άκυρη εγγραφή (PreparedStatement ps) ρίχνει το SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Πίνακας εισαγωγής:

δημιουργία βάσης δεδομένων edureka
δημιουργία πίνακα emp (empid int not null, name varchar (30), dept varchar (20), primer key (empid))
εισαγωγή σε τιμές emp (1, 'abhay', 'developmentement'), (2, 'brundesh', 'test')
επιλέξτε * από emp

Περίπτωση 1: Μεταφορά από MySQL σε HDFS

πακέτο com.inputFormat.copy import java.net.URI εισαγωγή org.apache.hadoop.conf. Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBC Configuration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat εισαγωγή org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable δημόσια κλάση MainDbtohdfs {public static void main (String [] args) ρίχνει την Εξαίρεση {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // class driver' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // όνομα χρήστη' root ') // κωδικός εργασίας Job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOut, FilePutat new Path (args [0])) DBInputFormat.setInput (εργασία, DBInputWritable.class, 'emp', // όνομα πίνακα εισαγωγής null, null, new String [] {'empid', 'name', 'dept'} / / στήλες πίνακα) Διαδρομή p = νέο Path (args [0]) FileSystem fs = FileSystem.get (new URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true); 0: 1)}}

Αυτό το κομμάτι κώδικα μας επιτρέπει να προετοιμάσουμε ή να διαμορφώσουμε τη μορφή εισόδου για πρόσβαση στην πηγή SQL DB. Η παράμετρος περιλαμβάνει την κλάση προγράμματος οδήγησης, το URL έχει τη διεύθυνση της βάσης δεδομένων SQL, το όνομα χρήστη και τον κωδικό πρόσβασης.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class driver 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // όνομα χρήστη 'root') //Κωδικός πρόσβασης

Αυτό το κομμάτι κώδικα μας επιτρέπει να περάσουμε τις λεπτομέρειες των πινάκων στη βάση δεδομένων και να το ορίσουμε στο αντικείμενο εργασίας. Οι παράμετροι περιλαμβάνουν φυσικά την παρουσία εργασίας, την προσαρμοσμένη εγγράψιμη κλάση που πρέπει να εφαρμόσει τη διεπαφή DBWritable, το όνομα του πίνακα προέλευσης, την κατάσταση αν υπάρχει άλλη null, οποιεσδήποτε άλλες παραμέτρους ταξινόμησης άλλες null, τη λίστα των στηλών πίνακα αντίστοιχα

DBInputFormat.setInput (εργασία, DBInputWritable.class, 'emp', // όνομα πίνακα εισαγωγής null, null, new String [] {'empid', 'name', 'dept'} // στήλες πίνακα)

Mapper

πακέτο com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io. Εισαγωγή κειμένου org.apache.hadoop.io . Ο χάρτης δημόσιας τάξης IntWritable επεκτείνει το Mapper
προστατευμένος άκυρος χάρτης (LongWritable key, DBInputWritable value, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (νέο κείμενο (όνομα + '+ id +' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptException e) {e.printStackTrace ()}}}

Μειωτής: Χρησιμοποιείται μειωτής ταυτότητας

Εντολή για εκτέλεση:

hadoop βάζο dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Έξοδος: Ο πίνακας MySQL μεταφέρθηκε σε HDFS

hadoop dfs -ls / dbtohdfs / *

Περίπτωση 2: Μεταφορά από έναν πίνακα στο MySQL σε άλλο στο MySQL

δημιουργία πίνακα εξόδου στη MySQL

δημιουργία πίνακα υπαλλήλου1 (όνομα varchar (20), id int, dept varchar (20))

πακέτο com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBC Configuration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat εισαγωγή org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable εισαγωγή org.apache.hadoop.io.NullWritable δημόσια τάξη Mainonetable_to_other_table {public static void main (String [] args) ρίχνει την Εξαίρεση {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost) : 3306 / edureka ', // db url' root ', // όνομα χρήστη' root ') // κωδικός εργασίας Job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) εργασία .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // όνομα πίνακα εισαγωγής null, null, new String [] { ',' name ',' dept '} // στήλες πίνακα) DBOutputFormat.setOutput (εργασία,' υπάλληλος1 ', // όνομα πίνακα εξόδου νέο String [] {' name ',' id ',' dept '} // πίνακας στήλες) System.exit (job.waitForCompletion (true); 0: 1)}}

Αυτό το κομμάτι κώδικα μας επιτρέπει να διαμορφώσουμε το όνομα του πίνακα εξόδου στο SQL DB. Οι παράμετροι είναι το παράδειγμα εργασίας, το όνομα πίνακα εξόδου και τα ονόματα στηλών εξόδου αντίστοιχα.

DBOutputFormat.setOutput (εργασία, 'υπάλληλος1', // όνομα πίνακα εξόδου νέα συμβολοσειρά [] {'name', 'id', 'dept'} // στήλες πίνακα)

Mapper: Ίδιο με την περίπτωση 1

Περιστέλλων:

πακέτο com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable δημόσια κλάση Reduce επεκτείνει Reducer {προστατευμένο κενό κενό (Κλειδί κειμένου, τιμές Iterable, Context ctx) {int sum = 0 String line [] = key.toString (). Split (') δοκιμάστε το {ctx.write (νέο DBOutputWritable (γραμμή [0] .toString (), Integer.parseInt (γραμμή [1] .toString ()), γραμμή [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptException e) {e.printStackTrace ()}}}

Εντολή για εκτέλεση:

hadoop βάζο dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Έξοδος: Μεταφέρθηκαν δεδομένα από τον πίνακα EMP στη MySQL σε έναν άλλο πίνακα υπαλλήλου1 στη MySQL

Περίπτωση 3: Μεταφορά από τον πίνακα στο MySQL στον πίνακα NoSQL (Hbase)

Δημιουργία πίνακα Hbase για τη στέγαση εξόδου από τον πίνακα SQL:

δημιουργία 'υπαλλήλου', 'official_info'

Κατηγορία οδηγού:

πακέτο Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBC Configuration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat εισαγωγή org.apache.hadoop.hbase.mapreduce.TableOutputFormat εισαγωγή org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Hable Πίνακας εισαγωγής org.apache.hadoop.hbase.client.HTableInterface εισαγωγή org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) ρίχνει την εξαίρεση {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class driver 'jdbc: mysql: // localhost: 3306 / edureka') , // db url 'root', // όνομα χρήστη 'root') // κωδικός εργασίας Job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob («υπάλληλος», Reduce.class, job) job.setInputFormat (NotFormat). class) DBInputFormat.setInput (εργασία, DBInputWritable.class, 'emp', // όνομα πίνακα εισαγωγής null, null, new String [] {'empid', 'name', 'dept'} // στήλες πίνακα) System.exit (job.waitForCompletion (true); 0: 1)}}

Αυτό το κομμάτι κώδικα σάς επιτρέπει να διαμορφώσετε την κλάση κλειδιού εξόδου η οποία σε περίπτωση hbase είναι ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Εδώ περνάμε το όνομα του πίνακα hbase και τον μειωτή για να ενεργήσουμε στο τραπέζι.

ποια είναι η χρήση του προγραμματισμού υποδοχών
TableMapReduceUtil.initTableReducerJob («υπάλληλος», Reduce.class, εργασία)

Mapper:

πακέτο Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable δημόσια τάξη Ο χάρτης επεκτείνει τον Mapper {private IntWritable one = new IntWritable (1) προστατευμένος χάρτης κενού (LongWritable id, DBInputWritable value, konteks περιβάλλοντος) {try {String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), νέο κείμενο (γραμμή +' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptException e) {e.printStackTrace ()}}}

Σε αυτό το κομμάτι κώδικα παίρνουμε τιμές από τους προμηθευτές της κλάσης DBinputwritable και μετά τις μεταδίδουμε
ImmutableBytesWritable έτσι ώστε να φτάσουν στον μειωτή σε μορφή bytewriatble που κατανοεί η Hbase.

String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), νέο κείμενο (γραμμή +' + dept ))

Περιστέλλων:

πακέτο Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce επεκτείνει το TableReducer {δημόσιο κενό μείωσης (κλειδί ImmutableBytesWritable, Iterable τιμές, περιβάλλον περιβάλλοντος) ρίχνει το IOException, InterruptException {String [] penyebab = null // Τιμές βρόχου για (Κείμενο val: τιμές) {menyebabkan = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('όνομα'), Bytes.toBytes (αιτία [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (αιτία [1 ])) konteks.write (κλειδί, βάλτε)}}

Αυτό το κομμάτι κώδικα μας επιτρέπει να αποφασίσουμε την ακριβή σειρά και τη στήλη στην οποία θα αποθηκεύουμε τιμές από τον μειωτή. Εδώ αποθηκεύουμε κάθε empid σε ξεχωριστή σειρά καθώς κάναμε το empid ως κλειδί σειράς που θα ήταν μοναδικό. Σε κάθε σειρά αποθηκεύουμε τις επίσημες πληροφορίες των υπαλλήλων στην οικογένεια στηλών 'official_info' στις στήλες 'όνομα' και 'τμήμα' αντίστοιχα.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (αιτία [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (αιτία [1])) konteks.write (κλειδί, τοποθέτηση)

Μεταφερόμενα δεδομένα στο Hbase:

υπάλληλος σάρωσης

Όπως βλέπουμε, καταφέραμε να ολοκληρώσουμε το έργο της μετεγκατάστασης των επιχειρηματικών μας δεδομένων από ένα σχετικό SQL DB σε ένα NoSQL DB με επιτυχία.

Στο επόμενο ιστολόγιο θα μάθουμε πώς να γράφουμε και να εκτελούμε κωδικούς για άλλες μορφές εισόδου και εξόδου.

Συνεχίστε να δημοσιεύετε τα σχόλια, τις ερωτήσεις σας ή τυχόν σχόλια. Θα ήθελα πολύ να ακούσω από εσάς.

Έχετε μια ερώτηση για εμάς; Παρακαλώ αναφέρετέ το στην ενότητα σχολίων και θα επικοινωνήσουμε μαζί σας.

Σχετικές αναρτήσεις: