Hadoop & Ejemplos de Mapreduce: crear el primer programa en Java

En este tutorial, aprenderá a usar Hadoop con ejemplos de MapReduce. Los datos de entrada utilizados son SalesJan2009.csv. Contiene información relacionada con las ventas como nombre del producto, precio, modo de pago, ciudad, país del cliente, etc. El objetivo es averiguar la cantidad de productos vendidos en cada país.

En este tutorial, aprenderá:

  • Primer programa MapReduce de Hadoop
  • Explicación de la clase SalesMapper
  • Explicación de la clase SalesCountryReducer
  • Explicación de la clase SalesCountryDriver

Primer programa MapReduce de Hadoop

Ahora, en este tutorial de MapReduce, crearemos nuestro primer programa Java MapReduce:

Datos de ventasEne2009

Asegúrese de tener Hadoop instalado. Antes de comenzar con el proceso real, cambie el usuario a 'hduser' (el ID utilizado durante la configuración de Hadoop, puede cambiar al ID de usuario utilizado durante la configuración de la programación de Hadoop).

su - hduser_

Paso 1)

Cree un nuevo directorio con el nombre MapReduceTutorial como se muestra en el siguiente ejemplo de MapReduce

sudo mkdir MapReduceTutorial

Dar permisos

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Descargar archivos aquí

Verifique los permisos de archivo de todos estos archivos

y si faltan permisos de 'lectura', conceda los mismos

Paso 2)

Exportar classpath como se muestra en el siguiente ejemplo de Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Paso 3)

Compile archivos Java (estos archivos están presentes en el directorio Final-MapReduceHandsOn ). Sus archivos de clase se colocarán en el directorio del paquete.

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Esta advertencia se puede ignorar con seguridad.

Esta compilación creará un directorio en un directorio actual con el nombre del paquete especificado en el archivo fuente de Java (es decir, SalesCountry en nuestro caso) y colocará todos los archivos de clases compilados en él.

Paso 4)

Cree un nuevo archivo Manifest.txt

sudo gedit Manifest.txt

agregue las siguientes líneas,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver es el nombre de la clase principal. Tenga en cuenta que debe presionar la tecla Intro al final de esta línea.

Paso 5)

Crear un archivo Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Verifique que se haya creado el archivo jar

Paso 6)

Iniciar Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Paso 7)

Copie el archivo SalesJan2009.csv en ~ / inputMapReduce

Ahora use el siguiente comando para copiar ~ / inputMapReduce a HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Podemos ignorar con seguridad esta advertencia.

Verifique si un archivo está realmente copiado o no.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Paso 8)

Ejecutar el trabajo MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Esto creará un directorio de salida llamado mapreduce_output_sales en HDFS. El contenido de este directorio será un archivo con las ventas de productos por país.

Paso 9)

El resultado se puede ver a través de la interfaz de comando como,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Los resultados también se pueden ver a través de una interfaz web como-

Abra r en un navegador web.

Ahora seleccione 'Examinar el sistema de archivos' y navegue hasta / mapreduce_output_sales

Parte abierta -r-00000

Explicación de la clase SalesMapper

En esta sección, entenderemos la implementación de la clase SalesMapper .

1. Comenzamos especificando un nombre de paquete para nuestra clase. SalesCountry es un nombre de nuestro paquete. Tenga en cuenta que la salida de la compilación, SalesMapper.class irá a un directorio con este nombre de paquete: SalesCountry .

Seguido de esto, importamos paquetes de bibliotecas.

La siguiente instantánea muestra una implementación de la clase SalesMapper-

Explicación del código de muestra:

1. Definición de la clase SalesMapper-

SalesMapper de clase pública extiende MapReduceBase implementa Mapper {

Cada clase de mapeador debe extenderse desde la clase MapReduceBase y debe implementar la interfaz de mapeador .

2. Definición de la función 'mapa'

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

La parte principal de la clase Mapper es un método 'map ()' que acepta cuatro argumentos.

En cada llamada al método 'map ()' , se pasa un par clave-valor ( 'clave' y 'valor' en este código).

El método 'map ()' comienza dividiendo el texto de entrada que se recibe como argumento. Utiliza el tokenizador para dividir estas líneas en palabras.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Aquí, ',' se usa como delimitador.

Después de esto, se forma un par utilizando un registro en el séptimo índice de la matriz 'SingleCountryData' y un valor '1' .

output.collect (nuevo texto (SingleCountryData [7]), uno);

Elegimos el registro en el séptimo índice porque necesitamos datos del país y se encuentra en el séptimo índice en la matriz 'SingleCountryData' .

Tenga en cuenta que nuestros datos de entrada se encuentra en el siguiente formato (donde País es a las 7 º índice, con 0 como un índice de partida) -

Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitud, Longitud

Una salida de mapeador es nuevamente un par clave-valor que se genera usando el método 'collect ()' de 'OutputCollector' .

Explicación de la clase SalesCountryReducer

En esta sección, entenderemos la implementación de la clase SalesCountryReducer .

1. Comenzamos especificando un nombre del paquete para nuestra clase. SalesCountry es un nombre de nuestro paquete. Tenga en cuenta que la salida de la compilación, SalesCountryReducer.class irá a un directorio con este nombre de paquete: SalesCountry .

Seguido de esto, importamos paquetes de bibliotecas.

La siguiente instantánea muestra una implementación de la clase SalesCountryReducer-

Explicación del código:

1. Definición de la clase SalesCountryReducer-

Public class SalesCountryReducer extiende MapReduceBase implementa Reducer {

Aquí, los dos primeros tipos de datos, 'Texto' e 'IntWritable' son el tipo de datos del valor clave de entrada al reductor.

La salida del asignador tiene el formato , . Esta salida del mapeador se convierte en entrada del reductor. Entonces, para alinearse con su tipo de datos, Text e IntWritable se usan como tipo de datos aquí.

Los dos últimos tipos de datos, 'Texto' e 'IntWritable' son tipos de datos de salida generados por reductor en forma de par clave-valor.

Cada clase de reductor debe extenderse desde la clase MapReduceBase y debe implementar la interfaz Reducer .

2. Definición de la función 'reducir'

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Una entrada al método reduce () es una clave con una lista de múltiples valores.

Por ejemplo, en nuestro caso, será-

, , , , , .

Esto se le da al reductor como

Entonces, para aceptar argumentos de esta forma, se utilizan los dos primeros tipos de datos, a saber, Text e Iterator . El texto es un tipo de datos de clave e Iterator es un tipo de datos para la lista de valores de esa clave.

El siguiente argumento es de tipo OutputCollector que recopila la salida de la fase reductora.

El método reduce () comienza copiando el valor de la clave e inicializando el recuento de frecuencia a 0.

Text key = t_key; int FrequencyForCountry = 0;

Luego, usando el ciclo ' while ' , iteramos a través de la lista de valores asociados con la clave y calculamos la frecuencia final sumando todos los valores.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Ahora, enviamos el resultado al colector de salida en forma de clave y recuento de frecuencia obtenido .

El siguiente código hace esto-

output.collect(key, new IntWritable(frequencyForCountry));

Explicación de la clase SalesCountryDriver

En esta sección, entenderemos la implementación de la clase SalesCountryDriver

1. Comenzamos especificando un nombre de paquete para nuestra clase. SalesCountry es un nombre de nuestro paquete. Tenga en cuenta que la salida de la compilación, SalesCountryDriver.class irá al directorio con este nombre de paquete: SalesCountry .

Aquí hay una línea que especifica el nombre del paquete seguido de un código para importar paquetes de biblioteca.

2. Defina una clase de controlador que creará un nuevo trabajo de cliente, un objeto de configuración y anunciará las clases Mapper y Reducer.

La clase de controlador es responsable de configurar nuestro trabajo MapReduce para que se ejecute en Hadoop. En esta clase, especificamos el nombre del trabajo, el tipo de datos de entrada / salida y los nombres de las clases de asignador y reductor .

3. En el siguiente fragmento de código, configuramos los directorios de entrada y salida que se utilizan para consumir el conjunto de datos de entrada y producir la salida, respectivamente.

arg [0] y arg [1] son los argumentos de la línea de comandos que se pasan con un comando dado en MapReduce, es decir,

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Activa nuestro trabajo

Debajo del código, comience la ejecución del trabajo MapReduce-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}

Articulos interesantes...