
Spark WordCount - annotating the data at each step (work in progress)



I am analyzing the Spark WordCount MapReduce sample code.

I have added print statements at various points in the original code to check what the intermediate data looks like.

Execution environment

CentOS 6.5, CDH5 (Hadoop 2.3.0 CDH 5.0.3, Spark 1.0.0 CDH 5.1.0)

Spark on YARN - client mode

Tested on a single node


Execution script

spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --deploy-mode client --master yarn /data01/SparkExample/sparkwordcount-0.0.1-SNAPSHOT.jar SparkExample/inputfile.txt
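
For a quick sanity check without going through YARN, the same jar can also be run in local mode. The line below is my own variant of the command above (only --master local[2] is changed, and it assumes the input path still resolves the same way, e.g. against HDFS):

spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local[2] /data01/SparkExample/sparkwordcount-0.0.1-SNAPSHOT.jar SparkExample/inputfile.txt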


Original code location

https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java


Input file data (inputfile.txt)

apple

banana counter

counter one two three

three one

five seven eight

twenty one three five counter six

one siz helga

apple banana fiver 



Code with comments added

package com.cloudera.sparkwordcount;

/*

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/


import java.util.Arrays;

import java.util.List;

import java.util.regex.Pattern;


import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;


import scala.Tuple2;


public final class JavaWordCount {

private static final Pattern SPACE = Pattern.compile(" ");


public static void main(String[] args) throws Exception {


if (args.length < 1) {

System.err.println("Usage: JavaWordCount <file>");

System.exit(1);

}


SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");

JavaSparkContext ctx = new JavaSparkContext(sparkConf);

JavaRDD<String> lines = ctx.textFile(args[0], 1);


JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

@Override

public Iterable<String> call(String s) {

System.out.println("words");

System.out.println("String s : "+ s); // String s : apple, String s : banana counter, String s : counter one two three, String s : three one , ...

Iterable<String> a = Arrays.asList(SPACE.split(s));

System.out.println("Arrays.asList(SPACE.split(s) : " + a); //Arrays.asList(SPACE.split(s) : [apple], Arrays.asList(SPACE.split(s) : [banana, counter], Arrays.asList(SPACE.split(s) : [counter, one, two, three], ...
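
// (Added note) The input file ends with a blank line. SPACE.split("") returns a single empty
// string, so for that line the list above prints as [] but still contains one "" element - that
// empty "word" is what shows up later as (,1) in the final output.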

return a;

}

});


JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

@Override

public Tuple2<String, Integer> call(String s) {

System.out.println("ones");

System.out.println("String s : "+ s); // String s : apple, String s : banana, String s : counter, String s : counter, String s : one, String s : two, String s : three, ...

Tuple2<String, Integer> a = new Tuple2<String, Integer>(s, 1);

System.out.println("new Tuple2<String, Integer>(s, 1) : " + a); // new Tuple2<String, Integer>(s, 1) : (apple,1), (banana,1), (counter,1), (counter,1)

return a;

}

});


// Does the data arrive sorted here, like in Hadoop's reduce?
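
// (Added note) As far as I understand, reduceByKey shuffles records by key much like Hadoop's
// shuffle into reduce, but it does not sort the keys (it hash-partitions by default) and it
// pre-aggregates values on the map side, so the order in which the 1s are merged below is not
// guaranteed.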

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

// Is this a structure that keeps adding up the value parts of the (key,1), (key,1), ... pairs?

@Override

public Integer call(Integer i1, Integer i2) {

System.out.println("counts");

System.out.println("Integer i1 : "+i1+ " Integer i2 : "+i2); // Integer i1 : 1 Integer i2 : 1, Integer i1 : 2 Integer i2 : 1, Integer i1 : 1 Integer i2 : 1, Integer i1 : 3 Integer i2 : 1

System.out.println("i1 + i2 : "+(i1 + i2)); // i1 + i2 : 2, i1 + i2 : 2, i1 + i2 : 3, i1 + i2 : 4

return i1 + i2;

}

});


// From here on, the code runs in the client driver (everything above runs inside YARN; those logs are in HDFS)
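
// (Added note) collect() pulls the final (word, count) pairs from the executors back to the
// driver JVM, which is why the println output below shows up on the client console (stdout)
// instead of in the YARN container logs.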

List<Tuple2<String, Integer>> output = counts.collect();

System.out.println("output");

System.out.println("output : "+ output); // output : [(counter,3), (banana,2), (two,1), (seven,1), (eight,1), (one,4), (fiver,1), (siz,1), (,1), (six,1), (apple,2), (three,3), (helga,1), (five,2), (twenty,1)]

for (Tuple2<?,?> tuple : output) {

System.out.println(tuple._1() + ": " + tuple._2());

}

ctx.stop();

}

}
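
Since reduceByKey does not return the counts in any particular order, the snippet below is a small sketch of my own (not part of the original example; it assumes the same Spark 1.0-era Java API as above) showing one way to print the result sorted by count: swap each pair to (count, word) and use sortByKey.

JavaPairRDD<Integer, String> swapped = counts.mapToPair(
    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
      @Override
      public Tuple2<Integer, String> call(Tuple2<String, Integer> t) {
        return new Tuple2<Integer, String>(t._2(), t._1()); // (count, word)
      }
    });

// ascending = false, so the most frequent words come first
for (Tuple2<Integer, String> t : swapped.sortByKey(false).collect()) {
    System.out.println(t._2() + ": " + t._1());
}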


Log result 1 - HDFS

/var/log/hadoop-yarn/apps/inbrein/logs/application_1405498888069_0217
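
(Added note: assuming YARN log aggregation is enabled, the same container logs can also be viewed from the command line with: yarn logs -applicationId application_1405498888069_0217)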

words

String s : apple

Arrays.asList(SPACE.split(s) : [apple]

ones

String s : apple

new Tuple2<String, Integer>(s, 1) : (apple,1)

words

String s : banana counter

Arrays.asList(SPACE.split(s) : [banana, counter]

ones

String s : banana

new Tuple2<String, Integer>(s, 1) : (banana,1)

ones

String s : counter

new Tuple2<String, Integer>(s, 1) : (counter,1)

words

String s : counter one two three

Arrays.asList(SPACE.split(s) : [counter, one, two, three]

ones

String s : counter

new Tuple2<String, Integer>(s, 1) : (counter,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

ones

String s : two

new Tuple2<String, Integer>(s, 1) : (two,1)

ones

String s : three

new Tuple2<String, Integer>(s, 1) : (three,1)

words

String s : three one

Arrays.asList(SPACE.split(s) : [three, one]

ones

String s : three

new Tuple2<String, Integer>(s, 1) : (three,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

words

String s : five seven eight

Arrays.asList(SPACE.split(s) : [five, seven, eight]

ones

String s : five

new Tuple2<String, Integer>(s, 1) : (five,1)

ones

String s : seven

new Tuple2<String, Integer>(s, 1) : (seven,1)

ones

String s : eight

new Tuple2<String, Integer>(s, 1) : (eight,1)

words

String s : twenty one three five counter six

Arrays.asList(SPACE.split(s) : [twenty, one, three, five, counter, six]

ones

String s : twenty

new Tuple2<String, Integer>(s, 1) : (twenty,1)

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

counts

Integer i1 : 2 Integer i2 : 1

i1 + i2 : 3

ones

String s : three

new Tuple2<String, Integer>(s, 1) : (three,1)

counts

Integer i1 : 2 Integer i2 : 1

i1 + i2 : 3

ones

String s : five

new Tuple2<String, Integer>(s, 1) : (five,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : counter

new Tuple2<String, Integer>(s, 1) : (counter,1)

counts

Integer i1 : 2 Integer i2 : 1

i1 + i2 : 3

ones

String s : six

new Tuple2<String, Integer>(s, 1) : (six,1)

words

String s : one siz helga

Arrays.asList(SPACE.split(s) : [one, siz, helga]

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

counts

Integer i1 : 3 Integer i2 : 1

i1 + i2 : 4

ones

String s : siz

new Tuple2<String, Integer>(s, 1) : (siz,1)

ones

String s : helga

new Tuple2<String, Integer>(s, 1) : (helga,1)

words

String s : apple banana fiver

Arrays.asList(SPACE.split(s) : [apple, banana, fiver]

ones

String s : apple

new Tuple2<String, Integer>(s, 1) : (apple,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : banana

new Tuple2<String, Integer>(s, 1) : (banana,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : fiver

new Tuple2<String, Integer>(s, 1) : (fiver,1)

words

String s : 

Arrays.asList(SPACE.split(s) : []

ones

String s : 

new Tuple2<String, Integer>(s, 1) : (,1)


Log result 2 - console
stdout

output

output : [(counter,3), (banana,2), (two,1), (seven,1), (eight,1), (one,4), (fiver,1), (siz,1), (,1), (six,1), (apple,2), (three,3), (helga,1), (five,2), (twenty,1)]

counter: 3

banana: 2

two: 1

seven: 1

eight: 1

one: 4

fiver: 1

siz: 1

: 1

six: 1

apple: 2

three: 3

helga: 1

five: 2

twenty: 1

