Hadoop & Exemple Mapreduce: Creați primul program în Java

Cuprins:

Anonim

În acest tutorial, veți învăța să utilizați Hadoop cu exemple de MapReduce. Datele de intrare utilizate sunt SalesJan2009.csv. Conține informații legate de vânzări, cum ar fi numele produsului, prețul, modul de plată, orașul, țara clientului etc. Scopul este de a afla numărul de produse vândute în fiecare țară.

În acest tutorial, veți învăța-

  • Primul program Hadoop MapReduce
  • Explicația clasei SalesMapper
  • Explicația clasei SalesCountryReducer
  • Explicația clasei SalesCountryDriver

Primul program Hadoop MapReduce

Acum, în acest tutorial MapReduce, vom crea primul nostru program Java MapReduce:

Date de vânzări2009

Asigurați-vă că ați instalat Hadoop. Înainte de a începe cu procesul propriu-zis, schimbați utilizatorul în „hduser” (id utilizat în timpul configurației Hadoop, puteți trece la user-ul utilizat în timpul configurării de programare Hadoop).

su - hduser_

Pasul 1)

Creați un director nou cu numele MapReduceTutorial ca shwon în exemplul de mai jos MapReduce

sudo mkdir MapReduceTutorial

Acordați permisiuni

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Descărcați fișiere aici

Verificați permisiunile de fișiere ale tuturor acestor fișiere

și dacă lipsesc permisiunile „citire”, atunci acordați același

Pasul 2)

Exportați classpath așa cum se arată în exemplul Hadoop de mai jos

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Pasul 3)

Compilați fișiere Java (aceste fișiere sunt prezente în directorul Final-MapReduceHandsOn ). Fișierele sale de clasă vor fi introduse în directorul pachetului

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Acest avertisment poate fi ignorat în siguranță.

Această compilație va crea un director într-un director curent numit cu numele pachetului specificat în fișierul sursă java (adică SalesCountry în cazul nostru) și va pune toate fișierele de clasă compilate în el.

Pasul 4)

Creați un fișier nou Manifest.txt

sudo gedit Manifest.txt

adăugați următoarele rânduri,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver este numele clasei principale. Vă rugăm să rețineți că trebuie să apăsați tasta Enter la sfârșitul acestei linii.

Pasul 5)

Creați un fișier Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Verificați dacă fișierul jar este creat

Pasul 6)

Porniți Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Pasul 7)

Copiați fișierul SalesJan2009.csv în ~ / inputMapReduce

Acum utilizați comanda de mai jos pentru a copia ~ / inputMapReduce în HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Putem ignora în siguranță acest avertisment.

Verificați dacă un fișier este copiat sau nu.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Pasul 8)

Rulați jobul MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Aceasta va crea un director de ieșire numit mapreduce_output_sales pe HDFS. Conținutul acestui director va fi un fișier care conține vânzările de produse pe țară.

Pasul 9)

Rezultatul poate fi văzut prin interfața de comandă ca,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Rezultatele pot fi văzute și printr-o interfață web

Deschideți r într-un browser web.

Acum selectați „Răsfoiți sistemul de fișiere” și navigați la / mapreduce_output_sales

Deschideți partea-r-00000

Explicația clasei SalesMapper

În această secțiune, vom înțelege implementarea clasei SalesMapper .

1. Începem prin a specifica un nume de pachet pentru clasa noastră. SalesCountry este un nume al pachetului nostru. Vă rugăm să rețineți că rezultatul compilării, SalesMapper.class va intra într-un director numit de acest nume de pachet: SalesCountry .

Urmată de aceasta, importăm pachete de bibliotecă.

Mai jos arată instantaneu o implementare a SalesMapper clas

Exemplu de explicație a codului:

1. Definiția clasei SalesMapper-

public class SalesMapper extinde MapReduceBase implementează Mapper {

Fiecare clasă de mapare trebuie extinsă din clasa MapReduceBase și trebuie să implementeze interfața Mapper .

2. Definirea funcției „hartă”

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

Partea principală a clasei Mapper este o metodă „map ()” care acceptă patru argumente.

La fiecare apel către metoda „map ()” , este transmisă o pereche cheie-valoare ( „cheie” și „valoare” în acest cod).

metoda 'map ()' începe prin divizarea textului de intrare care este primit ca argument. Folosește tokenizer pentru a împărți aceste rânduri în cuvinte.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Aici, ',' este folosit ca delimitator.

După aceasta, se formează o pereche folosind o înregistrare la indexul 7 al matricei „SingleCountryData” și o valoare „1” .

output.collect (text nou (SingleCountryData [7]), unul);

Alegem înregistrarea la indexul 7, deoarece avem nevoie de date de țară și este situată la indexul 7 în matricea „SingleCountryData” .

Notă Vă rugăm ca datele noastre de intrare este în formatul de mai jos ( în cazul în care Țara se află la 7 mii de index, cu 0 ca un indice de pornire) -

Data_transacției, Produs, Preț, Tip_plată, Nume, Oraș, Stat, Țară , Cont_Creat, Last_Login, Latitudine, Longitudine

O ieșire a mapperului este din nou o pereche cheie-valoare care este emisă folosind metoda „collect ()” a „OutputCollector” .

Explicația clasei SalesCountryReducer

În această secțiune, vom înțelege implementarea clasei SalesCountryReducer .

1. Începem prin a specifica un nume al pachetului pentru clasa noastră. SalesCountry este un nume al pachetului. Vă rugăm să rețineți că rezultatul compilării, SalesCountryReducer.class va intra într-un director numit cu acest nume de pachet: SalesCountry .

Urmată de aceasta, importăm pachete de bibliotecă.

Mai jos arată instantaneu o implementare a SalesCountryReducer clas

Explicatie cod:

1. Definiția clasei SalesCountryReducer-

public class SalesCountryReducer extinde MapReduceBase implementează Reducer {

Aici, primele două tipuri de date, „Text” și „IntWritable” sunt tipul de date al valorii cheie de intrare la reductor.

Ieșirea cartografului este sub forma , . Această ieșire a cartografului devine intrare în reductor. Deci, pentru a se alinia cu tipul său de date, Text și IntWritable sunt utilizate ca tip de date aici.

Ultimele două tipuri de date, „Text” și „IntWritable” sunt tipuri de date de ieșire generate de reductor sub formă de pereche cheie-valoare.

Fiecare clasă de reductor trebuie extinsă din clasa MapReduceBase și trebuie să implementeze interfața Reducer .

2. Definirea funcției „reduce”

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

O intrare în metoda reduce () este o cheie cu o listă de valori multiple.

De exemplu, în cazul nostru, va fi

, , , , , .

Acest lucru este dat reductorului ca

Deci, pentru a accepta argumentele acestei forme, sunt utilizate primele două tipuri de date, și anume, Text și Iterator . Textul este un tip de date de cheie, iar Iterator este un tip de date pentru lista de valori pentru acea cheie.

Următorul argument este de tip OutputCollector care colectează ieșirea fazei reductorului.

metoda reduce () începe prin copierea valorii cheii și inițializarea numărului de frecvențe la 0.

Cheie text = t_cheie; int frequencyForCountry = 0;

Apoi, folosind bucla „while” , parcurgem lista de valori asociate cheii și calculăm frecvența finală prin însumarea tuturor valorilor.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Acum, împingem rezultatul către colectorul de ieșire sub forma cheii și a numărului de frecvențe obținut .

Codul de mai jos face acest lucru-

output.collect(key, new IntWritable(frequencyForCountry));

Explicația clasei SalesCountryDriver

În această secțiune, vom înțelege implementarea clasei SalesCountryDriver

1. Începem prin a specifica un nume de pachet pentru clasa noastră. SalesCountry este un nume al pachetului. Vă rugăm să rețineți că rezultatul compilării, SalesCountryDriver.class va intra în directorul denumit de acest nume de pachet: SalesCountry .

Iată o linie care specifică numele pachetului urmat de cod pentru a importa pachetele bibliotecii.

2. Definiți o clasă de drivere care va crea un nou job de client, obiect de configurare și va promova clasele Mapper și Reducer.

Clasa șoferului este responsabilă pentru setarea jobului nostru MapReduce pentru a rula în Hadoop. În această clasă, specificăm numele lucrării, tipul de date de intrare / ieșire și numele claselor de mapare și reductoare .

3. În fragmentul de cod de mai jos, setăm directoare de intrare și ieșire care sunt utilizate pentru a consuma setul de date de intrare și, respectiv, pentru a produce ieșire.

arg [0] și arg [1] sunt argumentele din linia de comandă transmise cu o comandă dată în MapReduce hands-on, adică

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Declanșează-ne treaba

Mai jos codul începe executarea jobului MapReduce-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}