Spark WordCount MapReduce 샘플 코드 분석 중입니다
코드 원본 중간중간에 데이터를 출력해서 확인하고 있습니다
- spark 스터디할 수 있는 링크 정리
http://paranwater.tistory.com/413
실행 환경
CentOS 6.5, CDH5 (Hadoop 2.3.0 CDH 5.0.3, Spark 1.0.0 CDH 5.1.0)
Spark on YARN - client mode
한대의 노드에서 테스트
실행 스크립트
spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --deploy-mode client --master yarn /data01/SparkExample/sparkwordcount-0.0.1-SNAPSHOT.jar SparkExample/inputfile.txt
코드 원본위치
Input file data (inputfile.txt)
apple banana counter counter one two three three one five seven eight twenty one three five counter six one siz helga apple banana fiver |
코드에 주석
package com.cloudera.sparkwordcount; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern;
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public final class JavaWordCount { private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 1) { System.err.println("Usage: JavaWordCount <file>"); System.exit(1); }
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaRDD<String> lines = ctx.textFile(args[0], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { System.out.println("words"); System.out.println("String s : "+ s); // String s : apple, String s : banana counter, String s : counter one two three, String s : three one , ...
Iterable<String> a = Arrays.asList(SPACE.split(s));
System.out.println("Arrays.asList(SPACE.split(s) : " + a); //Arrays.asList(SPACE.split(s) : [apple], Arrays.asList(SPACE.split(s) : [banana, counter], Arrays.asList(SPACE.split(s) : [counter, one, two, three], ...
return a; } });
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { System.out.println("ones"); System.out.println("String s : "+ s); // String s : apple, String s : banana, String s : counter, String s : counter, String s : one, String s : two, String s : three, ...
Tuple2<String, Integer> a = new Tuple2<String, Integer>(s, 1); System.out.println("new Tuple2<String, Integer>(s, 1) : " + a); // new Tuple2<String, Integer>(s, 1) : (apple,1), (banana,1), (counter,1), (counter,1)
return a; } });
// 하둡 reduce와 마찬가지로 정렬되어옮? JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { // 키:1, 키:1, 키:1의 값 부분을 계속 더하는 구조? @Override public Integer call(Integer i1, Integer i2) {
System.out.println("counts"); System.out.println("Integer i1 : "+i1+ " Integer i2 : "+i2); // Integer i1 : 1 Integer i2 : 1, Integer i1 : 2 Integer i2 : 1, Integer i1 : 1 Integer i2 : 1, Integer i1 : 3 Integer i2 : 1
System.out.println("i1 + i2 : "+(i1 + i2)); // i1 + i2 : 2, i1 + i2 : 2, i1 + i2 : 3, i1 + i2 : 4
return i1 + i2; } });
//여기서 부터는 클라이언트 드라이버에서 동작 (위에는 YARN내부에서 동작, log도 HDFS에 있음) List<Tuple2<String, Integer>> output = counts.collect(); System.out.println("output"); System.out.println("output : "+ output); // output : [(counter,3), (banana,2), (two,1), (seven,1), (eight,1), (one,4), (fiver,1), (siz,1), (,1), (six,1), (apple,2), (three,3), (helga,1), (five,2), (twenty,1)]
for (Tuple2<?,?> tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } ctx.stop(); } } |
log 결과1 - HDFS
/var/log/hadoop-yarn/apps/inbrein/logs/application_1405498888069_0217
words String s : apple Arrays.asList(SPACE.split(s) : [apple] ones String s : apple new Tuple2<String, Integer>(s, 1) : (apple,1) words String s : banana counter Arrays.asList(SPACE.split(s) : [banana, counter] ones String s : banana new Tuple2<String, Integer>(s, 1) : (banana,1) ones String s : counter new Tuple2<String, Integer>(s, 1) : (counter,1) words String s : counter one two three Arrays.asList(SPACE.split(s) : [counter, one, two, three] ones String s : counter new Tuple2<String, Integer>(s, 1) : (counter,1) counts Integer i1 : 1 Integer i2 : 1 i1 + i2 : 2 ones String s : one new Tuple2<String, Integer>(s, 1) : (one,1) ones String s : two new Tuple2<String, Integer>(s, 1) : (two,1) ones String s : three new Tuple2<String, Integer>(s, 1) : (three,1) words String s : three one Arrays.asList(SPACE.split(s) : [three, one] ones String s : three new Tuple2<String, Integer>(s, 1) : (three,1) counts Integer i1 : 1 Integer i2 : 1 i1 + i2 : 2 ones String s : one new Tuple2<String, Integer>(s, 1) : (one,1) counts Integer i1 : 1 Integer i2 : 1 i1 + i2 : 2 words String s : five seven eight Arrays.asList(SPACE.split(s) : [five, seven, eight] ones String s : five new Tuple2<String, Integer>(s, 1) : (five,1) ones String s : seven new Tuple2<String, Integer>(s, 1) : (seven,1) ones String s : eight new Tuple2<String, Integer>(s, 1) : (eight,1) words String s : twenty one three five counter six Arrays.asList(SPACE.split(s) : [twenty, one, three, five, counter, six] ones String s : twenty new Tuple2<String, Integer>(s, 1) : (twenty,1) ones String s : one new Tuple2<String, Integer>(s, 1) : (one,1) counts Integer i1 : 2 Integer i2 : 1 i1 + i2 : 3 ones String s : three new Tuple2<String, Integer>(s, 1) : (three,1) counts Integer i1 : 2 Integer i2 : 1 i1 + i2 : 3 ones String s : five new Tuple2<String, Integer>(s, 1) : (five,1) counts Integer i1 : 1 Integer i2 : 1 i1 + i2 : 2 ones String s : counter new Tuple2<String, Integer>(s, 1) : (counter,1) counts Integer i1 : 2 Integer i2 
: 1 i1 + i2 : 3 ones String s : six new Tuple2<String, Integer>(s, 1) : (six,1) words String s : one siz helga Arrays.asList(SPACE.split(s) : [one, siz, helga] ones String s : one new Tuple2<String, Integer>(s, 1) : (one,1) counts Integer i1 : 3 Integer i2 : 1 i1 + i2 : 4 ones String s : siz new Tuple2<String, Integer>(s, 1) : (siz,1) ones String s : helga new Tuple2<String, Integer>(s, 1) : (helga,1) words String s : apple banana fiver Arrays.asList(SPACE.split(s) : [apple, banana, fiver] ones String s : apple new Tuple2<String, Integer>(s, 1) : (apple,1) counts Integer i1 : 1 Integer i2 : 1 i1 + i2 : 2 ones String s : banana new Tuple2<String, Integer>(s, 1) : (banana,1) counts Integer i1 : 1 Integer i2 : 1 i1 + i2 : 2 ones String s : fiver new Tuple2<String, Integer>(s, 1) : (fiver,1) words String s : Arrays.asList(SPACE.split(s) : [] ones String s : new Tuple2<String, Integer>(s, 1) : (,1) |
log 결과2 - console
stdout
output output : [(counter,3), (banana,2), (two,1), (seven,1), (eight,1), (one,4), (fiver,1), (siz,1), (,1), (six,1), (apple,2), (three,3), (helga,1), (five,2), (twenty,1)] counter: 3 banana: 2 two: 1 seven: 1 eight: 1 one: 4 fiver: 1 siz: 1 : 1 six: 1 apple: 2 three: 3 helga: 1 five: 2 twenty: 1 |
'보물창고 > Big Data' 카테고리의 다른 글
윈도우 하둡 스터디 하면서 본 MS 한글 문서 Getting Started Azure HDInsight 한글 문서들 Window hadoop (0) | 2014.10.28 |
---|---|
윈도우 하둡 종류 정리 [window hadoop] (0) | 2014.10.23 |
윈도우 하둡 window hadoop HDInsight Azure 압축 호환에 관련된 글 (Gzip, Bzip2, LZO, LZ4, snappy) (0) | 2014.10.21 |
[발번역] Apache Spark 리소스 매니지먼트와 YARN App 모델 (Apache Spark Resource Management and YARN App Models) (0) | 2014.07.25 |
CDH 5.1.0 Documentation Running Spark Application을 발번역한 내용입니다 (1) | 2014.07.23 |
Spark Cluster Manager Types (스파크 클러스터 매니저 타입 3종류 번역) (0) | 2014.07.22 |
Spark 스터디 하둡에코 (hadoop) (0) | 2014.07.10 |
Hortonworks 샌드박스에 있는 Ambari 관련 내용 번역 입니다 (sandbox) (0) | 2013.05.06 |