În acest tutorial, veți învăța să utilizați Hadoop cu exemple de MapReduce. Datele de intrare utilizate sunt SalesJan2009.csv. Conține informații legate de vânzări, cum ar fi numele produsului, prețul, modul de plată, orașul, țara clientului etc. Scopul este de a afla numărul de produse vândute în fiecare țară.
În acest tutorial, veți învăța-
- Primul program Hadoop MapReduce
- Explicația clasei SalesMapper
- Explicația clasei SalesCountryReducer
- Explicația clasei SalesCountryDriver
Primul program Hadoop MapReduce
Acum, în acest tutorial MapReduce, vom crea primul nostru program Java MapReduce:
Asigurați-vă că ați instalat Hadoop. Înainte de a începe cu procesul propriu-zis, schimbați utilizatorul în „hduser” (id utilizat în timpul configurației Hadoop, puteți trece la user-ul utilizat în timpul configurării de programare Hadoop).
su - hduser_
Pasul 1)
Creați un director nou cu numele MapReduceTutorial ca shwon în exemplul de mai jos MapReduce
sudo mkdir MapReduceTutorial
Acordați permisiuni
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Descărcați fișiere aici
Verificați permisiunile de fișiere ale tuturor acestor fișiere
și dacă lipsesc permisiunile „citire”, atunci acordați același
Pasul 2)
Exportați classpath așa cum se arată în exemplul Hadoop de mai jos
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Pasul 3)
Compilați fișiere Java (aceste fișiere sunt prezente în directorul Final-MapReduceHandsOn ). Fișierele sale de clasă vor fi introduse în directorul pachetului
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Acest avertisment poate fi ignorat în siguranță.
Această compilație va crea un director într-un director curent numit cu numele pachetului specificat în fișierul sursă java (adică SalesCountry în cazul nostru) și va pune toate fișierele de clasă compilate în el.
Pasul 4)
Creați un fișier nou Manifest.txt
sudo gedit Manifest.txt
adăugați următoarele rânduri,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver este numele clasei principale. Vă rugăm să rețineți că trebuie să apăsați tasta Enter la sfârșitul acestei linii.
Pasul 5)
Creați un fișier Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Verificați dacă fișierul jar este creat
Pasul 6)
Porniți Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Pasul 7)
Copiați fișierul SalesJan2009.csv în ~ / inputMapReduce
Acum utilizați comanda de mai jos pentru a copia ~ / inputMapReduce în HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Putem ignora în siguranță acest avertisment.
Verificați dacă un fișier este copiat sau nu.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Pasul 8)
Rulați jobul MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Aceasta va crea un director de ieșire numit mapreduce_output_sales pe HDFS. Conținutul acestui director va fi un fișier care conține vânzările de produse pe țară.
Pasul 9)
Rezultatul poate fi văzut prin interfața de comandă ca,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Rezultatele pot fi văzute și printr-o interfață web
Deschideți r într-un browser web.
Acum selectați „Răsfoiți sistemul de fișiere” și navigați la / mapreduce_output_sales
Deschideți partea-r-00000
Explicația clasei SalesMapper
În această secțiune, vom înțelege implementarea clasei SalesMapper .
1. Începem prin a specifica un nume de pachet pentru clasa noastră. SalesCountry este un nume al pachetului nostru. Vă rugăm să rețineți că rezultatul compilării, SalesMapper.class va intra într-un director numit de acest nume de pachet: SalesCountry .
Urmată de aceasta, importăm pachete de bibliotecă.
Mai jos arată instantaneu o implementare a SalesMapper clas
Exemplu de explicație a codului:
1. Definiția clasei SalesMapper-
public class SalesMapper extinde MapReduceBase implementează Mapper
Fiecare clasă de mapare trebuie extinsă din clasa MapReduceBase și trebuie să implementeze interfața Mapper .
2. Definirea funcției „hartă”
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
Partea principală a clasei Mapper este o metodă „map ()” care acceptă patru argumente.
La fiecare apel către metoda „map ()” , este transmisă o pereche cheie-valoare ( „cheie” și „valoare” în acest cod).
metoda 'map ()' începe prin divizarea textului de intrare care este primit ca argument. Folosește tokenizer pentru a împărți aceste rânduri în cuvinte.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Aici, ',' este folosit ca delimitator.
După aceasta, se formează o pereche folosind o înregistrare la indexul 7 al matricei „SingleCountryData” și o valoare „1” .
output.collect (text nou (SingleCountryData [7]), unul);
Alegem înregistrarea la indexul 7, deoarece avem nevoie de date de țară și este situată la indexul 7 în matricea „SingleCountryData” .
Notă Vă rugăm ca datele noastre de intrare este în formatul de mai jos ( în cazul în care Țara se află la 7 mii de index, cu 0 ca un indice de pornire) -
Data_transacției, Produs, Preț, Tip_plată, Nume, Oraș, Stat, Țară , Cont_Creat, Last_Login, Latitudine, Longitudine
O ieșire a mapperului este din nou o pereche cheie-valoare care este emisă folosind metoda „collect ()” a „OutputCollector” .
Explicația clasei SalesCountryReducer
În această secțiune, vom înțelege implementarea clasei SalesCountryReducer .
1. Începem prin a specifica un nume al pachetului pentru clasa noastră. SalesCountry este un nume al pachetului. Vă rugăm să rețineți că rezultatul compilării, SalesCountryReducer.class va intra într-un director numit cu acest nume de pachet: SalesCountry .
Urmată de aceasta, importăm pachete de bibliotecă.
Mai jos arată instantaneu o implementare a SalesCountryReducer clas
Explicatie cod:
1. Definiția clasei SalesCountryReducer-
public class SalesCountryReducer extinde MapReduceBase implementează Reducer
Aici, primele două tipuri de date, „Text” și „IntWritable” sunt tipul de date al valorii cheie de intrare la reductor.
Ieșirea cartografului este sub forma
Ultimele două tipuri de date, „Text” și „IntWritable” sunt tipuri de date de ieșire generate de reductor sub formă de pereche cheie-valoare.
Fiecare clasă de reductor trebuie extinsă din clasa MapReduceBase și trebuie să implementeze interfața Reducer .
2. Definirea funcției „reduce”
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
O intrare în metoda reduce () este o cheie cu o listă de valori multiple.
De exemplu, în cazul nostru, va fi
Acest lucru este dat reductorului ca
Deci, pentru a accepta argumentele acestei forme, sunt utilizate primele două tipuri de date, și anume, Text și Iterator
Următorul argument este de tip OutputCollector
metoda reduce () începe prin copierea valorii cheii și inițializarea numărului de frecvențe la 0.
Cheie text = t_cheie; int frequencyForCountry = 0;
Apoi, folosind bucla „while” , parcurgem lista de valori asociate cheii și calculăm frecvența finală prin însumarea tuturor valorilor.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Acum, împingem rezultatul către colectorul de ieșire sub forma cheii și a numărului de frecvențe obținut .
Codul de mai jos face acest lucru-
output.collect(key, new IntWritable(frequencyForCountry));
Explicația clasei SalesCountryDriver
În această secțiune, vom înțelege implementarea clasei SalesCountryDriver
1. Începem prin a specifica un nume de pachet pentru clasa noastră. SalesCountry este un nume al pachetului. Vă rugăm să rețineți că rezultatul compilării, SalesCountryDriver.class va intra în directorul denumit de acest nume de pachet: SalesCountry .
Iată o linie care specifică numele pachetului urmat de cod pentru a importa pachetele bibliotecii.
2. Definiți o clasă de drivere care va crea un nou job de client, obiect de configurare și va promova clasele Mapper și Reducer.
Clasa șoferului este responsabilă pentru setarea jobului nostru MapReduce pentru a rula în Hadoop. În această clasă, specificăm numele lucrării, tipul de date de intrare / ieșire și numele claselor de mapare și reductoare .
3. În fragmentul de cod de mai jos, setăm directoare de intrare și ieșire care sunt utilizate pentru a consuma setul de date de intrare și, respectiv, pentru a produce ieșire.
arg [0] și arg [1] sunt argumentele din linia de comandă transmise cu o comandă dată în MapReduce hands-on, adică
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Declanșează-ne treaba
Mai jos codul începe executarea jobului MapReduce-
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}