보물창고/Big Data2017.03.22 23:00






아임클라우드는 빅데이터 기반 기술과 인공지능 기술을 개발하는 빅데이터 및 인공지능 전문 기업입니다.

아임클라우드의 Hadoop 기반 빅데이터 시스템은 국내 최고의 전문가 그룹으로 구성되며 고객 여러분의 빅데이터 고민을 해결해 드립니다. 인공지능의 영역은 무궁무진합니다. 강력한 DeepLearning 기술을 통해 가장 필요한 일을 함께 하는 파트너가 되어드립니다.





Imcloud Apache Cloudera CDH kudu 

관련 자료 정리 (hadoop echo)




Kudu는 Hadoop과 Hbase(phoenix)사이의 틈새를 공략한 솔루션 입니다.
 - 빅데이터의 분산 저장과 처리를 하면서 CRUD를 할 수 있는 환경에 대한 요구가 많았습니다. 기존에는 Hbase(phoenix)와 Impala가 일부 이런 요구들을 매꿔 왔으나 기능적으로 여전히 부족한 면이 있어 Kudu같은 솔루션이 나오게 되었습니다.
 - Kudu는 한마디로 말하면 Nosql DB의 엔진으로 이야기 할 수 있습니다.

데이터 저장 & 이용
 - Kudu는 Raft라는 기술을 이용하여 디스크에 직접 데이터를 읽고 쓰는 서버를  구동시키며 윗 레이어에  Impala를 얹어 사용하는 방법이 현 시점에서 가장 일반적인 사용방법이 입니다. java와 C++ API를 이용하여 Kudu데이터를 직접 이용할 수 있는 방법도 제공합니다.

성능
 - 아래에 링크된 논문의 내용을 보면 대략 Impala + parquet 의 성능과 비슷합니다. 성능이 비슷하지만 Kudu를 무시하지 못할 이점은 CRUD를 할 수 있다는 것이죠.

key 설정
 - Kudu에 안내되어 있는 내용을 보면 Hbase의 Split 설정과 MongoDB의 Shard 키와 비슷한 Key를 설정하게 되어있습니다
 - 키의 역할은 데이터를 구분하는데 사용되며 이 역시 Hbase와 MongoDB처럼 데이터를 구분하여 저장하는 기준이 되기 때문에 이를 설정하는 것에 따른 튜닝이 필요할 것으로 보입니다


현재 현업에서 사용하고 있고 적용하려고 준비하는 곳들이 있습니다. 성능적인 면은 기대를 하고 있으며 분산처리를 하면서 CRUD를 할 수 있는 Hadoop 에코가 출시한것에 대해 기대가 높은것으로 보입니다.






설치
Cloudera dependency install






Kudu 개념적인 설명


기타 설명


Kudu에대한 자주 묻는 질문(Kudu의 구성과 사용 가능 범위에 대한 내용이 있음)

Frequently Asked Questions

https://kudu.apache.org/faq.html


Kubu 스키마 디자인할대 고려해야할 사항

구글 번역

https://translate.google.co.kr/translate?sl=en&tl=ko&js=y&prev=_t&hl=ko&ie=UTF-8&u=https%3A%2F%2Fkudu.apache.org%2Foverview.html&edit-text=&act=url


논문

Hbase+ Phoenix, Kudu+impala, imapla+Parquet의 성능 비교

Kudu: Storage for Fast Analytics on Fast Data

https://kudu.apache.org/kudu.pdf

구글 번역

https://translate.google.co.kr/translate?sl=en&tl=ko&js=y&prev=_t&hl=ko&ie=UTF-8&u=https%3A%2F%2Fkudu.apache.org%2Foverview.html&edit-text=&act=url


 




데이터 저장 구조


데이터 복제와 안정성을위해 사용된 알고리즘

Raft consensus algorithm

Paxos 의 대안으로 고안된 합의 알고리즘

https://raft.github.io/

위키 Raft  구글 번역

https://translate.google.co.kr/translate?sl=en&tl=ko&js=y&prev=_t&hl=ko&ie=UTF-8&u=https%3A%2F%2Fen.wikipedia.org%2Fwiki%2FRaft_%28computer_science%29&edit-text=&act=url



Paxos

합의를 해결하기위한 프로토콜 모음

위키 Paxos 구글 번역

https://translate.google.co.kr/translate?sl=en&tl=ko&js=y&prev=_t&hl=ko&ie=UTF-8&u=https%3A%2F%2Fen.wikipedia.org%2Fwiki%2FRaft_%28computer_science%29&edit-text=&act=url






기타


Using Apache Kudu with Apache Impala (incubating)

테이블 생성 관련한 내용이 포함된 링크 입니다

https://kudu.apache.org/docs/kudu_impala_integration.html



2016 02 24에 업데이트됨. 과거 버전

kudu에 대한 전체적인 설명이 있는 슬라이드 쉐어

https://www.slideshare.net/AsimJalis/apache-kudu 




Impala 데이터 타입 기준으로 Kudu 데이터 타입 제약 사항


DECIMAL, TIMESTAMP, CHAR, VARCHAR, ARRAY, MAP 및 STRUCT 데이터 유형은 Kudu 테이블과 함께 사용할 수 없습니다.

https://www.cloudera.com/documentation/enterprise/latest/topics/impala_array.html#array


BOOLEAN, FLOAT 및 DOUBLE 데이터 유형은 Kudu 테이블의 기본 키 열에 사용할 수 없습니다.

https://www.cloudera.com/documentation/enterprise/latest/topics/impala_boolean.html#boolean







신고





Posted by 파란물
보물창고/Big Data2014.07.25 22:00





Apache Spark Resource Management and YARN App Models

http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

위 문서 내용을 공부하면서 발번역 하였습니다

흐름을 파악하는데 참고해 주세요


 Spark를 스터디하면서 Spark on YARN 모드 2종류를 이해하기가 가장 어려웠습니다. 영어가 부족한데다가 글로 설명된 내용을 보니 계속 미궁속으로 빠지고 있었습니다. 번역하면서 본 이 글은 저의 궁금증을 깔끔하게 해소시켜주는 핵심적인 글이고 저외에 다른 누군가도 이 글이 도움이 되었으면 하는 바람에 블로그에 올려 놓습니다





Apache Spark 리소스 매니지먼트와 YARN App 모델 (Apache Spark Resource Management and YARN App Models)

* YARN에서 MapReduce의 클러스터 리소스 관리와 Spark에대한 차이점을 간략히 봅니다
MapReduce 이후 가장 인기 있는 Apache YARN 응용프로그램은 Apache Spark입니다
클라우데라에서 Spark-on-YARN을 안정시키기 위해 많은 노력을 했습니다(SPARK-1101) 
SPARK-1101 : https://issues.apache.org/jira/browse/SPARK-1101

이 포스트에서, Spark와 MapReduce 아키택쳐의 차이점에대해 알아볼것입니다
당신이 관심있는, YARN 클러스터 ResourceManager에서 실행하는 방법을 설명합니다

* Applications
MapReduce 에서 computation에서 최고 단위는 job입니다
job의 동작은 데이터를 로드해서 map연산 후 shuffles를하고 reduce연산한 결과를 영구저장소에 데이터를 씁니다
Spark도 비슷한 개념의 작업이 있습니다 (job하나에 더 많은 map과 reduce단계를 구상할 수 있지만)
또한 여러 job을 순차 또는 병렬로 실행할 수 있는 "application"이라는 상위 레벨 구조를 가지고 있습니다



* Spark 어플리케이션 아키택쳐 (Spark application architecture)
Spark API에 익숙한 사람들을 위해, application은 SparkContext 클래스 인스턴스에 해당합니다
application은 하나의 배치작업을 위해 사용될 수 있습니다
여러 job과 상호작용하는 세션은 시간차이를 두거나, 오래 실행되는 서버가 지속적으로 적합 합니다
MapReduce와는 달리 application은 어떤 작업도 수행하지 않습니다
이 경우에 대리자로서 Executor라는 프로세스를 갖게 됩니다.
이 방법은 빠른 액세스를 위해 메모리에 데이터를 저장 할뿐만 아니라 초고속의 작업시간을 가집니다

* Executors
MapReduce는 자체 프로세스에서 각 job을 실행합니다. job이 완료되면 프로세스가 사라집니다
Spark는 많은 job을 단일 프로세스에서 동시에 실행할 수 있으며 이 과정에 job이 실행되고 있지 않은 경우에도 Spark application의 lifetime동안 실행됩니다

이 모델의 장점은 위에 적은것과 같이,
속도 : 작업이 매우 빨리 시작되고 프로세스 메모리 내에 데이터가 있습니다
단점은 분할된 자원관리입니다
application에 대한 Executor의 수는 고정되어있고 Executor에 대한 자원이 고정되어있기 때문에 Application이 실행중인 전체 시간 동안에는 같은 자원을 사용합니다
YARN이 지원하는 컨테이너 resizing을 이용해 동적으로 자원을 주고 받는 기능을 Spark에 활용할 계획입니다

* Active Driver
job흐름과 스케줄 작업을 관리하기위해 Spark는 active driver 프로세스에 의존합니다
YARN모드일때 클러스터에서 driver를 실행할 수 있지만
일반적으로 driver 프로세스는 job을 시작하는데 사용되는 클라이언트 프로세스와 동일합니다
반면 MapReduce에서 클라이언트 프로세스가 오래 동작할때 job이 계속 실핼될 수 있습니다
jobTracker는 hadoop1.x에서 작업 일정에대한 책임이 있었고 hadoop2.x에서는 MapReduce 어플리케이션의 master역할을 맡았습니다

* 플러그 자원 관리 (Pluggable Resource Management)
Spark 형플러그 형식으로 클러스터 관리를 지원합니다
클러스터 관리자는 실 행프로그램 프로세스를 시작 하기위한 책임이 있습니다
Spark applecation 작성자는 Spark가 어떤 클러스터 관리자에서 실행되고 있는지 걱정할 필요가 없습니다

Spark를 지원하는 클러스터 종류에는 YARN과 Mesos, 그리고 고유한 "standalone"이 있습니다
이러한 세가지 프레임워크는 두종류의 구성요소로 되어 있습니다
중앙 master 서비스는 application이 프로세스를 실행하기 위해 (YARN ResourceManager, Mesos Master, Spark 독립 master) 자원을 얻는것을 결정합니다
뿐만 아니라 어딘가에서 실행이 결정되면
모든 노드에서 실행되는 slave서비스는 실제 executor 프로세스를 실행합니다(YARN NodeManager, Mesos slave, Spark 독립 slave)
또한 자신의 동작내용과 자원 상황 을모니터링 할 수 있습니다

* YARN에서 실행하는 이유? (Why Run on YARN?)
Spark의 클러스터 manager로 YARN을 사용하면 Spark 독립모드와 Mesos를 이용하면 몇가지 장점이 있습니다
  • YARN을 사용하면 자원을 동적으로 공유하고 중앙 YARN에서 실행되는 모든 프레임 워크 사이의 클러스터자원을 동일한 풀에서 구성할 수 있습니다
    그런 다음 구성변경 없이 Impala 쿼리 및 Spark application의 일부를 MapReduce전체 클러스터에서 실행 할 수 있습니다
  • YARN 스케줄러의 categorizing(카테고리화), isolating(분리), 과 prioritizing wprkload(워크로드 우선순위)의 모든 기능을 활용할 수 있습니다
    YARN 스케줄러의 모든 기능 : http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
  • Spark 독립형 모드에서는 모 든클러스터 모드에서 각 application이 실행되는 반면 YARN에서는 사용할 executor의 수를 선택합니다
  • YARN은 Spark 보안을 지원하는 유일한 클러스터 관리 자입니다
    실제로 프로세스간 보안인증을 사용하는 Kerberos 암호화를 hadoop cluster에 서실행할  수있습니다
* YARN에서 실행하기 (Running on YARN)
Spark on YARN을 실행하는 경우 Spark executor은 YARN 컨테이너로 실행됩니다
MapReduce 스케줄러가 하나의 컨테이너에서 각 task에대한 JVM을 실행 할 때 Spark는 같은 컨테이너 안에서 여러 task를 호스트 합니다
규모가 있는 명령의 task를 빠르게 실행할 수 있는 방법 입니다

Spark는 YARN의 "yarn-cluster" mode와 "yarn-client"mode두가지를 지원합니다
대체로 yarn-cluster mode는  production jobs에 일때 의미가 있습니다
yarn-client mode는 application 출력(번역중-stdout?)을 참조하거나 대화형 디버깅을 할때 일때 의미가 있습니다

차이를 이해 할때 YARN의 Master응용프로그램 개념의 이해가 필요합니다
YARN에서 각 application 인스턴스는 첫번째 컨테이너가 Application Master 프로세스를 담당하고 해당 application을 실행합니다
application은 ResourceManager에서 리소스 할당을 요청하할 책임이 있으며 대신할 컨테이너를 start하도록 NodeManager에게 이야기 합니다
Application Master는 Active client에대한 요구를 사전에 없앱니다
application을 시작하는 과정은 멀어졌(번역중-추상화?)고 클러스터 관리는 YARN의 실행으로 계속 관리됩니다

yarn-cluster mode에서 dirver는 Application Master에서 실행됩니다
동일한 프로세스에서(YARN container) YARN에 리소스를 요청하는것과 YARN컨테이너 내부에서 프로세스를 실행하는 application을 모두 구동하는것에 책임이 있습니다
application을 start하는 클라이언트는 전체 lifetime시간동안 있어야할 필요가 없습니다

이미지 : yarn-cluster mode


yarn-cluster mode는 대화식 Spark사용에는 적합하지 않습니다
Spark shell과 PySpark처럼 사용자 입력을 요청하는 Spark application은 Spark application을 start하는 클라이언트 프로세스 내에서 실행하는 Spark dirver가 필요합니다
yarn-client mode에서 Application Master는 단지 YARN에서 executor 컨테이너를 요청하는 존재입니다
클라이언트가 시작한 후 작업을 스케줄링하는 컨테이너와 통신합니다

이미지 : yarn-client mode


아래 테이블은 각 모드 사이의 차이를 간결하게 나타냈습니다


* 주요개념 요약 (Key Concepts in Summary)
  • Application : 이것은 하나의 job이 될 수도 있으며 작업의 순서와 필요에 따라 새 명령을 실행하는  long-running 서비스 또는 대화형 탐색 세션 입니다
  • Spark Driver : Spark context를 실행하는 프로세스 입니다. 이는 application 세션을 나타냅니다. 
  • Spark Application Master : YARN driver에의해 자원 요청을 협상하고 host/containers Spark application을 실행하는데에 적절한 설정을 찾는데 책임을 지지 않습니다 application당 하나의 application master가 있습니다
  • Spark Executor : 하나의 Spark application은 노드에서 하나의 JVM 인스턴스를 제공합니다. executor은 lifetime동안 많은 job을 동시에 수행합니다. 노드는 많은 Spark실행자를 가질수 있으며, 각 클라이언트 application에 대한 Spark Executor를 실행하 많은 노드가 있습니다
  • Spark Task : Spark Task는 분산된 데이터 집합의 파티션 작업 단위를 나타냅니다

Further reading:

Sandy Ryza is a data scientist at Cloudera, and an Apache Hadoop committer.


신고





Posted by 파란물
보물창고/Big Data2014.07.24 21:00






Spark WordCount MapReduce 샘플 코드 분석 중입니다

코드 원본 중간중간에 데이터를 출력해서 확인하고 있습니다









실행 환경

CentOs 6.5, CDH5(hadoop2.3.0 CDH 5.0.3, Spark1.0.0 CDH5.1.0 )

Spark on YARN - client mode

한대의 노드에서 테스트


실행 스크립트

spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --deploy-mode client --master yarn /data01/SparkExample/sparkwordcount-0.0.1-SNAPSHOT.jar SparkExample/inputfile.txt


코드 원본위치

https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java


Input file data (inputfile.txt)

apple

banana counter

counter one two three

three one

five seven eight

twenty one three five counter six

one siz helga

apple banana fiver 



코드에 주석

package com.cloudera.sparkwordcount;

/*

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/


import java.util.Arrays;

import java.util.List;

import java.util.regex.Pattern;


import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;


import scala.Tuple2;


public final class JavaWordCount {

private static final Pattern SPACE = Pattern.compile(" ");


public static void main(String[] args) throws Exception {


if (args.length < 1) {

System.err.println("Usage: JavaWordCount <file>");

System.exit(1);

}


SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");

JavaSparkContext ctx = new JavaSparkContext(sparkConf);

JavaRDD<String> lines = ctx.textFile(args[0], 1);


JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

@Override

public Iterable<String> call(String s) {

System.out.println("words");

System.out.println("String s : "+ s); // String s : apple, String s : banana counter, String s : counter one two three, String s : three one , ...

Iterable<String> a = Arrays.asList(SPACE.split(s));

System.out.println("Arrays.asList(SPACE.split(s) : " + a); //Arrays.asList(SPACE.split(s) : [apple], Arrays.asList(SPACE.split(s) : [banana, counter], Arrays.asList(SPACE.split(s) : [counter, one, two, three], ...

return a;

}

});


JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

@Override

public Tuple2<String, Integer> call(String s) {

System.out.println("ones");

System.out.println("String s : "+ s); // String s : apple, String s : banana, String s : counter, String s : counter, String s : one, String s : two, String s : three, ...

Tuple2<String, Integer> a = new Tuple2<String, Integer>(s, 1);

System.out.println("new Tuple2<String, Integer>(s, 1) : " + a); // new Tuple2<String, Integer>(s, 1) : (apple,1), (banana,1), (counter,1), (counter,1)

return a;

}

});


// 하둡 reduce와 마찬가지로 정렬되어옮?

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

// :1, :1, :1의 값 부분을 계속 더하는 구조?

@Override

public Integer call(Integer i1, Integer i2) {

System.out.println("counts");

System.out.println("Integer i1 : "+i1+ " Integer i2 : "+i2); // Integer i1 : 1 Integer i2 : 1, Integer i1 : 2 Integer i2 : 1, Integer i1 : 1 Integer i2 : 1, Integer i1 : 3 Integer i2 : 1

System.out.println("i1 + i2 : "+(i1 + i2)); // i1 + i2 : 2, i1 + i2 : 2, i1 + i2 : 3, i1 + i2 : 4

return i1 + i2;

}

});


//여기서 부터는 클라이언트 드라이버에서 동작 (위에는 YARN내부에서 동작, logHDFS에 있음)

List<Tuple2<String, Integer>> output = counts.collect();

System.out.println("output");

System.out.println("output : "+ output); // output : [(counter,3), (banana,2), (two,1), (seven,1), (eight,1), (one,4), (fiver,1), (siz,1), (,1), (six,1), (apple,2), (three,3), (helga,1), (five,2), (twenty,1)]

for (Tuple2<?,?> tuple : output) {

System.out.println(tuple._1() + ": " + tuple._2());

}

ctx.stop();

}

}


log 결과1 - HDFS

/var/log/hadoop-yarn/apps/inbrein/logs/application_1405498888069_0217

words

String s : apple

Arrays.asList(SPACE.split(s) : [apple]

ones

String s : apple

new Tuple2<String, Integer>(s, 1) : (apple,1)

words

String s : banana counter

Arrays.asList(SPACE.split(s) : [banana, counter]

ones

String s : banana

new Tuple2<String, Integer>(s, 1) : (banana,1)

ones

String s : counter

new Tuple2<String, Integer>(s, 1) : (counter,1)

words

String s : counter one two three

Arrays.asList(SPACE.split(s) : [counter, one, two, three]

ones

String s : counter

new Tuple2<String, Integer>(s, 1) : (counter,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

ones

String s : two

new Tuple2<String, Integer>(s, 1) : (two,1)

ones

String s : three

new Tuple2<String, Integer>(s, 1) : (three,1)

words

String s : three one

Arrays.asList(SPACE.split(s) : [three, one]

ones

String s : three

new Tuple2<String, Integer>(s, 1) : (three,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

words

String s : five seven eight

Arrays.asList(SPACE.split(s) : [five, seven, eight]

ones

String s : five

new Tuple2<String, Integer>(s, 1) : (five,1)

ones

String s : seven

new Tuple2<String, Integer>(s, 1) : (seven,1)

ones

String s : eight

new Tuple2<String, Integer>(s, 1) : (eight,1)

words

String s : twenty one three five counter six

Arrays.asList(SPACE.split(s) : [twenty, one, three, five, counter, six]

ones

String s : twenty

new Tuple2<String, Integer>(s, 1) : (twenty,1)

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

counts

Integer i1 : 2 Integer i2 : 1

i1 + i2 : 3

ones

String s : three

new Tuple2<String, Integer>(s, 1) : (three,1)

counts

Integer i1 : 2 Integer i2 : 1

i1 + i2 : 3

ones

String s : five

new Tuple2<String, Integer>(s, 1) : (five,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : counter

new Tuple2<String, Integer>(s, 1) : (counter,1)

counts

Integer i1 : 2 Integer i2 : 1

i1 + i2 : 3

ones

String s : six

new Tuple2<String, Integer>(s, 1) : (six,1)

words

String s : one siz helga

Arrays.asList(SPACE.split(s) : [one, siz, helga]

ones

String s : one

new Tuple2<String, Integer>(s, 1) : (one,1)

counts

Integer i1 : 3 Integer i2 : 1

i1 + i2 : 4

ones

String s : siz

new Tuple2<String, Integer>(s, 1) : (siz,1)

ones

String s : helga

new Tuple2<String, Integer>(s, 1) : (helga,1)

words

String s : apple banana fiver

Arrays.asList(SPACE.split(s) : [apple, banana, fiver]

ones

String s : apple

new Tuple2<String, Integer>(s, 1) : (apple,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : banana

new Tuple2<String, Integer>(s, 1) : (banana,1)

counts

Integer i1 : 1 Integer i2 : 1

i1 + i2 : 2

ones

String s : fiver

new Tuple2<String, Integer>(s, 1) : (fiver,1)

words

String s : 

Arrays.asList(SPACE.split(s) : []

ones

String s : 

new Tuple2<String, Integer>(s, 1) : (,1)


log 결과2 - console
stdout

output

output : [(counter,3), (banana,2), (two,1), (seven,1), (eight,1), (one,4), (fiver,1), (siz,1), (,1), (six,1), (apple,2), (three,3), (helga,1), (five,2), (twenty,1)]

counter: 3

banana: 2

two: 1

seven: 1

eight: 1

one: 4

fiver: 1

siz: 1

: 1

six: 1

apple: 2

three: 3

helga: 1

five: 2

twenty: 1


신고





Posted by 파란물
보물창고/Big Data2014.07.23 21:00







CDH 5.1.0 Documentation

Running Spark Application을 발번역한 내요입니다


전체적인 흐름을 잡는데 이용해주세요


http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Installation-Guide/cdh5ig_running_spark_apps.html










*Spark 응용프로그램 실행 (Running Spark Applications)
Spark 어플리케이션은 맵리듀스와 job과 유사합니다
각 어플리케이션은 사용자가 제공한 코드를 실행해 결과를 연산하는 self-contained(독립형) computation입니다
맵리듀스 작업과 마찬가지로 Spark 어플리케이션은 여러 노드의 자원을 사용할 수 있습니다

각 어플리케이션은 실행을 조정하는 드라이 버프로세스를 가지고 있습니다
이 프로세스는 foreground(client mode) or background(cluster mode)에서 실행할 수 있습니다.
Client모드는 좀 더 간단하지만 Cluster모드를 사용하면 어플리케이션을 쉽게 종료하지 못합니다
Spark 어플리케이션을 실행한 후 로그아웃 할 수 있습니다

Spark는 계산을 실행하는 executors를 실행합니다
많은 executors는 작업 크기에 따라 클러스터에 분산됩니다

Spark는 두가지 모드에서 실행할 수 있습니다
Standalone mode : 
독림보드에서 , Spark는 worker의 노력을 조정하는 master데몬을 사용하여 executors프로그램을 실행합니다
독립모드는 기본으로 secure clusters모드를 사용할 수 없습니다
YARN mode:
YARN모드에서, YARN은 ResourceManager는 Spark master기능을 수행합니다 worker의 기능은 YARN NodeManager의 데본에 의해 executors를 실행합니다
YANR모드는 조금더 복잡한 설정을 하지만 보안을 지원하고  YARN의 cluster-wide(전체 클러스터) 더 나은  통합 지원관리 정책을 제공합니다

여러 Spark어플리케이션은 한번에 실행할 수 있습니다
Spark on YARN에서 실행하는 경우 YARN client mode나 cluster mode를 선택하는 기준은 application-by-application에 기초하여 결정할 수 있습니
Client mode에서 Spark를 실행하면 드라이버 프로세스는 로컬에서 실행됩니다
cluster mode에서 실행하면 ApplicationMaster에서 원격으로 실행 됩니다

다음절에서는 샘플 응용프로그램을 사용하여 SparkPi, Spark와 함께 PI패키지로 값을 계산하는 세가지 모드를 설명합니다

* 설정 (Configuration)
Spark를 구성하는 가장 쉬운 방법은 $SPARK_HOME/conf/spark-defaults.conf를 설정하는 것입니다

이 파일은 "key value"형태로 라인에 포함되어 있습니다
line시작 부분에 해시기호 (#)을 넣어 주석을 작상 할수 있습니다
참고 : 끝이나 줄의 중간에 주석을 추가 할 수 없습니다.

다음은 Spark의 defaults.conf 파일의 예입니다 
spark.master     spark://mysparkmaster.cloudera.com:7077
spark.eventLog.enabled    true
spark.eventLog.dir        hdfs:///user/spark/eventlog
# Set spark executor memory
spark.executor.memory     2g
spark.logConf             true

Spark의 default.conf에 보든 응용프로그램에서 사용할 설정 키(값)를 넣는 것이 좋습니다
설정키에 대한 자세한 내용은 스크립트를 참조사세요
스크립트 (Spark configuration) : http://spark.apache.org/docs/1.0.0/configuration.html

* Spark-Submit 스크립트 (The Spark-Submit Script)
Spark-submit을 이용하여 Spark 어플리케이션을 시작할 수 있습니다
spark-core 패키지를 설치한 경로에 있습니다

참고 : Spark는 --key=value;이런 옵션을 처리할 수 없습니다 --key value이렇게 사용하세요 (즉 기호대신 공백을 사용합니다)

spark-submit을 실행하려면 컴파일된 Spark JAR 어플리케이션이 필요합니다
다음 섹션에서 Spark와 함께 제공되는 샘플 JAR, SparkPi를 이용합니다
이것은 Pi의 근사치를 계산합니다

* 독립모드에서 SparkPi를 실행 (Running SparkPi in Standalone Mode)
제공된 --master과 --deploy-mode의 클라이언트 arguments는 독립모드로 SparkPi를 실행합니다
spark-submit \
--class org.apache.spark.examples.SparkPi \
--deploy-mode client \
--master spark//$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
$SPARK_HOME/examples/*/scala-*/spark-examples-*.jar 10

JAR이름의 다음에 오는 arguments가 어플리케이션에 공급됩니다
이 경우 argument는 Pi에대한 근사치가 되도록 잘 제어합니다

* YARN client mode에서 SparkPi를 실행 (Running SparkPi in YARN Client Mode)
다음과 같이 SparkPi를 실행하는 명령은 다음과 같습니다
spark-submit  \
--class org.apache.spark.examples.SparkPi \
--deploy-mode client \    
--master yarn \    
$SPARK_HOME/examples/*/scala-*/spark-examples-*.jar 10

* YARN cluster mode에서 SparkPi 실행 (Running SparkPi in YARN Cluster Mode)
다음과 같이 SparkPi를 실행하는 명령은 다음과 같습니다
spark-submit  \    
--class org.apache.spark.examples.SparkPi \    
--deploy-mode cluster \    
--master yarn \    
$SPARK_HOME/examples/*/scala-*/spark-examples-*.jar 10

이 명령은 job이 완료될때까지 상태를 표시합니다 표시를 중지하려면 control-C를 누릅니다
client모드에서와 같이 cluster모드도 spark-submit 프로세스를 종료해도 어플리케이션은 종료되지 않습니다
실행중인 어플리케이션의 상태를 모니터링 하기 위해 
"yarn application -list" 를 실행합니다

* YARN 최적화 모드 (Optimizing YARN Mode)
일반적으로 다음 로그처럼 spark-submit할때마다 Spark assembly JAR 파일을 HDFS에 복사합니다
14/06/11 14:21:49 INFO yarn.Client: Uploading
file:/home/cmccabe/spark/b2.4/examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop2.4.0.jar to 
hdfs://a2402.halxg.cloudera.com:6000/user/cmccabe/.sparkStaging/application_1402278226964_0012/spark-examples-1.0.0-SNAPSHOT-hadoop2.4.0.jar
14/06/11 14:21:50 INFO yarn.Client: Uploading
file:/home/cmccabe/spark/b2.4/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar to
hdfs://a2402.halxg.cloudera.com:6000/user/cmccabe/.sparkStaging/application_1402278226964_0012/spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar

수동으로 HDFS에 Spark assembly JAR파일을 업로드하여 매번 복사되는 일을 방지할 수 있습니다
그런다음 HDFS경로에 SPARK_JAR환경 변수를 설정합니다
hdfs dfs -mkdir -p /user/spark/share/lib 
hdfs dfs -put $SPARK_HOME/assembly/lib/spark-assembly_*.jar  \     
/user/spark/share/lib/spark-assembly.jar 
SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar

참고 : 클라우데라 매니저를 사용하는 경우 Spark assembly jar파일은 자동적으로 HDFS에 업로드 됩니다  (/user/spark/share/lib/spark-assembly.jar)

* Building Spark Applications (Building Spark Applications)
가 장좋은 방법은 Spark 어플리케이션을 컴파일에 포함합는 것입니
Spark와 Hadoop을 위한 경우를 제외하 고모든 dependencies를 포함하는 단일 assembly JAR을 Building합니다

aeembly JAR할때 Spark와 Hadoop은 제외한다 이미 cluster 및 런타임 class path에 있기 때문이다
메이븐에서는 명시된 Spark와 Hadoop의 종속을 표시할 수 있습니다

항상 호환성 문제를 방지하기 위해 실행하는 Spark버전과 같은 버전으로 building합니다
예를 들어 Spark 0.9에서 컴파일된 어플리케이션은 1.0에서 실행 안될 수 있습니다

Spark 0.9또는 그 이전에 컴파일된 일부 어플리케이션은 Spark 1.0에서 소스코드를 변경해야 합니다 Spark 1.0에서 컴파일한 어플리케이션은 이래의 모든보전에서 다 시컴파일 해야 합니다


신고





Posted by 파란물
보물창고/Big Data2014.07.22 21:00





Spark Cluster Manager Types

3종류를 발번역 하였습니다


전체적인 흐름을 파악하는데 참고해주세요









클러스터 관리자 유형 
시스템은 현재 3종류의 클러스터 관리자를 지원합니다.


Standalone – a simple cluster manager included with Spark that makes it easy to set up a 
독립형 - 간단한 클러스터 매니저가 Spark에 포함되어있어 관리자는 클러스터를 쉽게 구성할 수 있습니다

Apache Mesos – a general cluster manager that can also run Hadoop MapReduce and service applications.
Apache Mesos - Hadoop의 MapReduce와 서비스 응용프로그램을 실행할 수 있는 일반적인 클러스터 관리자

Hadoop YARN – the resource manager in Hadoop 2.
Hadoop YARN – Hadoop 2. 의 리소스 매니저를 이용합니다



http://spark.apache.org/docs/latest/spark-standalone.html
*스파크 독립모드 (Spark Standalone Mode)

Spark는 mesos와 YARN에 실행되는것 외에 간단한 독립형 배포 모드가 준비되어 있습니다
수동으로 Master와 Slave를 Start하여 독립 실행형 클러스터를 시작하거나 제공되는 start 스크립트를 사용할 수 있습니다.
단일 머신에서 테스트를 위해 사용할 수 있습니다

*클러스터에 Spark 독립모드 설치 (Installing Spark Standalone to a Cluster)
Spark를 독립모드로 설치하려면, 간단하게 클러스터의 각 노드에 컴파일된 버전의 Spark를 놓습니다.
릴리즈마다 미리 빌드된 Spark버전을 이용하거나 직접 빌드할 수 있습니다
직접 빌드 방법 : http://spark.apache.org/docs/latest/index.html#building

*클러스터를 수동으로 Start (Starting a Cluster Manually)
독립 Master서버를 실행할 수 있습니다
./sbin/start-master.sh

일단 시작되면, Master는 자신의 URL spark://HOST:PORT를 출력합니다
이 주소는 workers를 연결하는데 사용할 수 있게 SparkContext의 인자로 master를 전달 합니다
또한 Master의 웹 UI를 볼 수 있습니다 기본적으로 http://localhost:8080입니다

하나 이상의 workers를 Start하고 다음을 통해 Master에 연결할 수 있습니다
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

workers를 Start한 후 (기본적으로 http://localhost:8080) 마스터 Web UI를 보세요
새로운 노드의 CPU와 메모리(OS에 남아있는 용량-1GB 만큼)와 해당번호가 나타날 것입니다

마지막으로 다음옵션은 Master와 worker에게 전당할 수 있습니다.

ArgumentMeaning
-i IP--ip IPIP address or DNS name to listen on
-p PORT--port PORTPort for service to listen on (default: 7077 for master, random for worker)
--webui-port PORTPort for web UI (default: 8080 for master, 8081 for worker)
-c CORES--cores CORESTotal CPU cores to allow Spark applications to use on the machine (default: all available); only on worker
-m MEM--memory MEMTotal amount of memory to allow Spark applications to use on the machine, in a format like 1000M or 2G (default: your machine's total RAM minus 1 GB); only on worker
-d DIR--work-dir DIRDirectory to use for scratch space and job output logs (default: SPARK_HOME/work); only on worker

*클러스터 실행 스크립트 (Cluster Launch Scripts)
실행 스크립트를 이용하여 독립 Spark를 실행하려면, Spark 디렉토리에 conf/slaves 파일을 작성해야 합니다
Spark worker로 사용하려는 모든 시스템의 호스트이름을 한줄에 하나씩 적어야 합니다
Master는 (개인키를 사용하여)  password-less ssh를 통해 각 Slave브 시스템에 액세스 할 수 있어야 합니다
테스트를 위해 localhost를 사용할 수 있습니다.

이 파일을 설정한 후에 다음과 같은 쉘 스크립트를 사용하여 클러스터를 실행하거나 중지 시킬 수 있습니다. 
하둡의 배포 스크립트 를기반으로 SPARK_HOME/bin에 위치해 있습니다

sbin/start-master.sh - Starts a master instance on the machine the script is executed on.
sbin/start-slaves.sh - Starts a slave instance on each machine specified in the conf/slaves file.
sbin/start-all.sh - Starts both a master and a number of slaves as described above.
sbin/stop-master.sh - Stops the master that was started via the bin/start-master.sh script.
sbin/stop-slaves.sh - Stops all slave instances on the machines specified in the conf/slaves file.
sbin/stop-all.sh - Stops both the master and the slaves as described above.

이 스크립트는 당신의 로컬 컴퓨터가 아닌 Spark Master를 실행하는 컴퓨터에서 실행해야합니다

선택적으로 conf/spark-env.sh 환경변수를 설정하여 추가로 클러스터를 구성할 수 있습니다 
conf/spark-env.sh는 conf/spark-env.sh.template를 이용하여 만듭니다
설정을 적용하려면 모든 worker컴퓨터에 복사해야 합니다
다음 설정을 이용할 수 있습니다

Environment VariableMeaning
SPARK_MASTER_IPBind the master to a specific IP address, for example a public one.
SPARK_MASTER_PORTStart the master on a different port (default: 7077).
SPARK_MASTER_WEBUI_PORTPort for the master web UI (default: 8080).
SPARK_MASTER_OPTSConfiguration properties that apply only to the master in the form "-Dx=y" (default: none). See below for a list of possible options.
SPARK_LOCAL_DIRSDirectory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks.
SPARK_WORKER_CORESTotal number of cores to allow Spark applications to use on the machine (default: all available cores).
SPARK_WORKER_MEMORYTotal amount of memory to allow Spark applications to use on the machine, e.g. 1000m,2g (default: total memory minus 1 GB); note that each application's individual memory is configured using its spark.executor.memory property.
SPARK_WORKER_PORTStart the Spark worker on a specific port (default: random).
SPARK_WORKER_WEBUI_PORTPort for the worker web UI (default: 8081).
SPARK_WORKER_INSTANCESNumber of worker instances to run on each machine (default: 1). You can make this more than 1 if you have have very large machines and would like multiple Spark worker processes. If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, or else each worker will try to use all the cores.
SPARK_WORKER_DIRDirectory to run applications in, which will include both logs and scratch space (default: SPARK_HOME/work).
SPARK_WORKER_OPTSConfiguration properties that apply only to the worker in the form "-Dx=y" (default: none). See below for a list of possible options.
SPARK_DAEMON_MEMORYMemory to allocate to the Spark master and worker daemons themselves (default: 512m).
SPARK_DAEMON_JAVA_OPTSJVM options for the Spark master and worker daemons themselves in the form "-Dx=y" (default: none).
SPARK_PUBLIC_DNSThe public DNS name of the Spark master and workers (default: none).

참고 : 현재 시작 스크립트는 윈도우를 지원하지 않습니다
Window에서 Spark 클러스터를 실행하려면 수동으로 Master와 worker를 Start해야 합니다

SPARK_MASTER_OPTS는 다음과 같은 시스템 속성을 지원합니다.
Property NameDefaultMeaning
spark.deploy.spreadOuttrueWhether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. 
spark.deploy.defaultCores(infinite)Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default. 
spark.worker.timeout60Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats.

SPARK_WORKER_OPTS는 다음과 같은 시스템 속성을 지원합니다.
Property NameDefaultMeaning
spark.worker.cleanup.enabledfalseEnable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Applications directories are cleaned up regardless of whether the application is still running.
spark.worker.cleanup.interval1800 (30 minutes)Controls the interval, in seconds, at which the worker cleans up old application work dirs on the local machine.
spark.worker.cleanup.appDataTtl7 * 24 * 3600 (7 days)The number of seconds to retain application work directories on each worker. This is a Time To Live and should depend on the amount of available disk space you have. Application logs and jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, especially if you run jobs very frequently.


*클러스터에 응용프로그램 연결 (Connecting an Application to the Cluster)
Spark 클러스터에 응용프로그램을 실행하려면 단순하게 SparkContext 생성자로 Master의 spark://IP:PORT URL을 넣어줍니다
SparkContext : http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark

클러스터에 대한 대화형 Spark shell을 실행하려면 다음과 같이 합니다
./bin/spark-shell --master spark://IP:PORT

참고로 Spark Cluster 시스템중 하나에서 Spark-shell을 실행하려는 경우 bin/spark-shell 스크립트는 자동으로 conf/spark-env.sh에서 SPARK_MASTER_IP 및 SPARK_MASTER_PORT변수와 MASTER를 설정합니다

또한 옵션을 전달할 수 있습니다 --cores <numCores>를 이용하여 클러스터에서 사용할 코어수를 제어할 수 있습니다

*Spark응용프로그램의 실행가능한 컴파일 (Launching Compiled Spark Applications)
spark-submit 스크립트는 컴파일된 Spark 어플리케이션을 클러스터 에제출하는 가장 간단한 방법을 제공합니다
독립실행형 클러스터의 경우, 현재 Spark는 응용프로그램을 제출할때 클라이언트 프로세스의 내부 드라이버를 사용할 수 있게 배포를 지원합니다 (클라이언트 배포 드모).

만약 당신의 응용프로그램이 Spark submit을 통해 실행되는 경우, 응용프로그램의 jar는 자동으로 모든 worker노드에 배포 됩니다.
응용프로그램에 따라 추가되는 jar는 구분기호 쉼표를 사용하여 --jar를 통해 지정해야 합니다
(예를 들어 --jar jar1.jar2)
응용프로그램의 구성 또는 실행 환경을 제어하려면 Spark 구성을 참조하십시오
Spark 구성 : http://spark.apache.org/docs/latest/configuration.html

*리소스 스케줄링 (Resource Scheduling)
독림클러스터 모드는 현재 응용프로그램에서 간단한 FIFO스케줄러를 지원합니다
그러나, 다수의 동시 사용자에게 허용하려면 각 응용프로그램이 사용할 자원의 최대 수를 제어할 수 있습니다
기본적으로, 이는 클러스터의 모든 코어를 얻는것은 한번에 하나의 응용프로그램을 실행하는 경 우에만 의미가 있습니다
SparkConf에서 spark.cores.max를 설정하는것으로 코어수를 설정할 수 있습니다
val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

또한 spark.cores.max를 설정하지 않은 응용프로그램에 대한 기본값을 변경하려면 클러스터 마스터과정에서 spark.deploy.defaultCores를 설정하면 됩니다
설정 내용은 conf/spark-env.sh에 추가합니다

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
이렇게하면 사용자가 개별적으로 코어의 최대수를 안정해도 됩니다
공유 클러스터에 유용합니다

*모니터링 및 로깅 (Monitoring and Logging)
Spark의 독립모드는 모든클러스터를 모니터링하는 웹 기반의 사용자 인터페이스를 제공합니다.
Master와 각 worker클러스터 작업통계를 보여주는 웹 UI를 가지고 있습니다
기본적으로 8080포트에서 Master UI에서 엑세스 할 수 있습니다
포트는 구성 파일이나 명령줄 옵션을 통해 하나를 변경할 수 있습니다

또한 각 작업에 대한 자세한 로그 출력은 각각의 슬레이브 노드의 작업 디렉토리에 기록됩니다.
(SPARK_HOME/work 기본 작업 위치)
각 작업의 표준출력 및 표준 오류에대한 두개의 파일을 모두 콘솔에서 볼 수 있습니다

* 하둡과 함께 실행 (Running Alongside Hadoop)

같은 시스템에서 별도의 서비스로 실행하여 기존의 하둡 클러스터와 함께 Spark를 실행할 수 있습니다
Spark에서 하둡 데이터에 액세스하려면 HDFS://URL (일반적으로 HDFS://<namenode:9000/path 입니다. 하둡 네임노드의 웹 UI의 오른쪽 URL에서 찾을 수 있습니다)
또한 Spark를 대해 별도의 클러스터를 설정할 수 있습다 그러면 네트워크를 통행 HDFS에 액세스합니다 이럴경우 로컬 액세스하는것보다 느립니다
같은 네트워크에서 실행되는 경우에는 문제될게 없습니다(예를 들어 하둡랙 중에 몇개만 Spark를 배치한 경우)

*네트워크 보안을 위한 포트 구성 (Configuring Ports for Network Security)
Spark는 네트워크를 많이 사용합니다. 일부 환경은 강한 방화벽 설정을 위해 엄격한 요구사항이 있습니다
아래의 스파크가 통신하는 기본 포트 구성에대해 안내합니다
FromToDefault PortPurposeConfiguration SettingNotes
BrowserStandalone Cluster Master8080Web UImaster.ui.portJetty-based
BrowserDriver4040Web UIspark.ui.portJetty-based
BrowserHistory Server18080Web UIspark.history.ui.portJetty-based
BrowserWorker8081Web UIworker.ui.portJetty-based
ApplicationStandalone Cluster Master7077Submit job to clusterspark.driver.portAkka-based. Set to "0" to choose a port randomly
WorkerStandalone Cluster Master7077Join clusterspark.driver.portAkka-based. Set to "0" to choose a port randomly
ApplicationWorker(random)Join clusterSPARK_WORKER_PORT(standalone cluster)Akka-based
Driver and other WorkersWorker(random)
  • File server for file and jars
  • Http Broadcast
  • Class file server (Spark Shell only)

*고 가용성 (High Availability)
기복적으로 독립스케줄 클러스터는 Worker 의오류에 탄력적입니다(Spark자체가 다른 Work로 이동하여 작업손실을 복가능한대로 복구합니다.
그러나 스케줄러는 스케줄링 결정할때 Master를 사용합니다 이는 단일 장애지점을 만듦니다.
만 약마스터가 충돌나는 경우, 새로운 응용프로그램을 만들  수없습니다
이것을 회피하기 위해 우리는 아래 설명된 두개의 고 가용성 계획을 가지고 있습니다

* Zookeeper를 이용 Master가 대기 (Standby Masters with ZooKeeper)
개요
리더 선출에 일부 state storage를 제공하기위해 Zookeeper를 활용합니다
동일한 Zookeeper 인스턴스에 연결된 클러스터에 여러개의 Master를 Start할 수 있습니다
하나는 리더로 선출되고 나머지는 대기 모드로 유지 됩니다
현재의 리터가 죽으면(첫번재 리더가 다운되었을떄) 다른 마스터가 선출되고 이전 마스터의 상태로 복구외어 다음 스케줄 을실행합니다
전체 복구 프로세스는 1,2 분정도 소요됩니다
마스터 장애 조치 중 이미 실행중이었던 응용프로그램은 영향을 받지 않습니다 이런 지연은 새로운 응용프로그램 스케줄에 영향을 줍니다

Zookeeper를 시작하기 전에 자세히 알아보세요 : http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html

설정
이 복구 모드를 사용하기 위해 spark-env 환경 설정에 SPARK_DAEMON_JAVA_OPTS를 설정할 수 있습니다
System propertyMeaning
spark.deploy.recoveryModeSet to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.urlThe ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dirThe directory in ZooKeeper to store recovery state (default: /spark).

가능한 실수 : 당신의 클러스터에 여러개의 Master를 가지고 있지만 Zookeeper를 사용하는 Master를 구성하지 않는 경우
Master는 서로 발견하는데 실패하고 무두 Master라고 생각합니다
이는 정상적인 클러스터 상태로 이어지지 않습니다
(모든 마스터는 독립적으로 일정을 계획하는 등)

상세
Zookeeper클러스터를 설정한 후 고 가용성을 설정하는것은 간단합니다. 단순히 Zookeeper구성으로 서로 다른 노드에 여러개의 Master를 실행합니다 (Zookeeper URL및 디렉토리)
마스터는 언제든지 추가 삭제를 할 수 있습니다

새 응용프로그램을 스케줄 하거나 클러스터에 workers를 추가하기 위해 현재 Master의 IP주소를 알아야 합니다
단순하게 Master의 리스트 목록을 전달하여 실행 할 수 있습니다 
예를 들어 spark://host1:port1,host2:port2를 SparkContext가 가리키도록 start할 수 있습니다
이렇게 하는 이유는 SparkContext에 모든 Master를 등록하기 위해 서입니다(host1이 다운될때)
새로운 leader host2를 찾아 설정은 정확하게 계속 유지될 것입니다

Master등록과 정상 작동하는 것에는 중요한 차이점이 있습니다
Starting up할때 응용프로그램 또는 worker는 현재 lead Master를 찾아 등록할 수 있어야 합니다
성공적으로 등록되면 시스템이 구성됩니다(즉, Zookeeper에 저정된).
장애 조치가 발생했을 경우 새로운 leader가 leadership의 변화를 알리기 위해 이전에 등록된으용프로그램 및 worker에 알리게 됩니다
그래서 그들에게 새로운 Master의 존재를 알릴 필요가 없습니다

이때문에 설정을 통해 새로운 마스터를 언제든지 생성할 수 있습니다
단지 걱정해야 할 것은 새로운 어플리 케이션과 worker는 새로운 leader가 등록된 후에 register를 찾을 수 있습니다
등록이 되면 당신은 걱정할 필요가 없습니다

*로컬 파일 시스템과 단일 노드 복구 (Single-Node Recovery with Local File System)
개요
Zookeeper는 생산수준의 고 가용성으로 가는 가장 좋은 방법 입니다
하지만 down되었을때 Master가 다시 시작될 수 있도록 하는것은 파일 시스템 모드에서 처리 할 수 있습니다
응용프로그램과 workers를 register할때 Master 프로세스의 복구와 restart에 이용될수 있도록그들에게 제공되는 디렉토리에 충분히 기록할 수 있는 공간이 있어야 합니다


설정
이 복구 모드를 활성화 하기 위해 spark-env에 SPARK_DAEMON_JAVA_OPTS를 설정 할 수 있습니다
System propertyMeaning
spark.deploy.recoveryModeSet to FILESYSTEM to enable single-node recovery mode (default: NONE).
spark.deploy.recoveryDirectoryThe directory in which Spark will store recovery state, accessible from the Master's perspective.

상세
이 솔루션에 monit과 같은 monitor/manager를 이용하여 사용하거나 재 시작을 통해 수동 복구를 가능하게 합니다

파일시스템 복구가 정상적인 복구로 보일수 있지만  이모드는 특정 개발 또는 실험 목적을 위한 차선책 입니다
특히 stop-master.sh를 통해 Master를 종료할때 복구 기능을 제공하지 않습니다
그래서 새로운 Master를 시작할 때마다 그것은 복구 모드로 들어갑니다
previously-registered된 Workers/clients가 timeout될시간의 대기가 필요한 경우 최대 1분의 startup시간을 증가시킬수 있습니다

그것은 공식적으로 지원되지 않지만 보국 디렉토리를 NFS디렉토리에 마운트할 수 있습니다
원래의 Master노드는 완전히 죽은 경우 다른 노드에서 Master를 시작할  수있습니다
이전에 등록된 workers/applications을 복구하는 정상적인 절차 입니다(Zookeeper의 복구에 해당)
그러나 새로운 어플리케이션을 등록하기 위해 새로운 마스터를 찾을 수 있습니다




http://spark.apache.org/docs/latest/running-on-mesos.html
*Mesos에서 Spark 실행 (Running Spark on Mesos)

Spark는 Apacke Mesos하드웨어 관리 클러스터에서 실행 할 수 있습니다
Apache Mesos : http://mesos.apache.org

Spark에 Mesos를 포함시킨 장점은 다음과 같습니다
Spark와 다른 프레임 워크 사이의 동적 구분
Spark의 여러 인스턴스 사이의 확장 파티션

* 동작 구조 (How it Works)
독립클러스터 배포
아래 그림에서 클러스터 관리자는 Spark Master 인스턴스 입니다
Mesos를 사용할때 Mesos Master는 클러스터 관리자로 Spark의 Master를 대체 합니다



driver가 job 작성과 tasks 스케줄링을 위해 실행할때 Meso machines가 어떤 작업을 처리합니다
많은 short-lived(단기) 작업을 예약할때 이 계정으로 다른 프레임 워크를 사용하므로 여러 프레임워크 자 원의 정적 분할에 의지하지 않고 동일한 클러스터에 공존할 수 있습니다

Mesos를 설치하고 Mesos를 통해 Spark작업을 배포는 다음 절차를 따라 합니다

*Mesos설치 (Installing Mesos)
Spark 1.0.1은 Mesos 0.18.1과 함께 사용할 수 있도록 설계되어 Mesos의 특별한 패치가 필요하지 않습니다
이미 Mesos클러스터가 실행되고 있는겨우, 다음의 Mesos설치 단계를 건너 뛸 수 있습니다.

그렇지 않으면 Spark Mesos를 설치하면 다은 프레임 워크에서 사용하기 위해 Mesos를 설치하는것과 다르지 앖습니다(같다). Source나 미리 빌드된 패키지를 사용하여 Mesos를 설치할 수 있습니다

* Source를 이용(From Source)
Apache Mesos를 source로 설치하려면 다음 단계를 수행
Mesos mirror에서 자료를 다운로드 (Mesos mirror : http://www.apache.org/dyn/closer.cgi/mesos/0.18.1/)
Mesos를 컴파일하고 설치하기위한 시작 페이지 : http://mesos.apache.org/gettingstarted/

참고 : 시스템 기본 경로에 설치하지 않고 Meso를 실행합니다 (예를 들어, 설치할때 관리자 권한이 없는 경우)
--prefix를 이용해서 설치할 위치를 알려주는 옵션을 사용합니다
예를 들어 --prefix=/home/me/mesos
기본적인 prefix는 /usr/local 입니다

* 타사 패키지 (Third-Party Packages)
Apache Mesos 프로젝트는 source가 아닌 오직 binary 패키지로 배포합니다
그러나 다른 패키지에서 Mesos 설정에 도움이 될 수 있는 binary자료를 배포할 수 있습니다

그것은 Mesosphere입니다 여기서 제공하는 릴리즈를 사용하여 Mesos를 설치하려면 
다운로드 페이지에서 Mesos패키지를 다운로드 : http://mesosphere.io/downloads/
설치 및 설정에대한 지시를 따릅니다

Mesosphere 설치문서는 Master장애를 처리하기위해 Zookeeper 설정을 제시합니다
하지만 Zookeeper없이 단일 Master를 사용하여 Mesos를 실행할 수 있습니다

* 확인(Verification)
Mesos 클러스터가 Spark할 준비가 되었는지 확인하려면 Mesos Master  web UI 포트:5050에서 모든 machines에 slaves tab이 있는지 확인합니다

* Mesos에 Spark를 연결 (Connecting Spark to Mesos)
Spark에서 Mesos를 사용하려면 Mesos가 액세스 할 수 있는 장소에 사용할 수 있는 Spark 파이너리 패키지가 필요하 고 Spark 드라이버 프로그램은 Mesos에 연결하도록 구성합니다. 

* 업로드 Spark 패키지 (Uploading Spark Package)
Mesos slave에서 Mesos task를 처음 실행하는경우  slave에서 Mesos backend로 실행가기 위한 Spark binary 패키지가 꼭 있어야 합니다
Spark 패키지는 모든 하둡 액세스 URL에서 호스팅할 수 있습니다
HTTP를 포함 http:// 아마존 심플 스토리지 s3n:// 하둡 hdfs://

미리 컴파일된 패키지를 사용하려면 : 
Spark 다운로드 페이지에서 Spark binary 패키지를 다운로드
http / hdfs / s3 에 업로드

HDFS가 호스트라면 하둡명령어 fs put 명령어를 이용합니다
hadoop fs -put spark-1.0.1.tar.gz /path/to/spark-1.0.1.tar.gz

Spark 사용 자지정 컴파일된 버전을 사용하는 경우나 Spark source tarball/checkout을 사용하면  포함된 make-distribution.sh 스크립트를 사용하여 패키지를 만들어야 합니다.

여기있는 지침을 사용하여 다운로드와 빌드를 통해 Spark를 구축 : http://spark.apache.org/docs/latest/index.html
make-distribution.sh --tgz. 를 사용하여 binary 패키지 만들기
http / s3 / hdfs에 압축파일을 업로드

* Mesos Master URL을 사용 (Using a Mesos Master URL)
single-master에대한 Mesos Master URL 형태이다 mesos://host:5050
또는 Zookeeper를 사용한 multi-master의 Mesos 클러스터 mesos://zk://host:2181

Mesos와 올바르게 상호 작용하기 위해 드라이버는 spark-env.sh 설정파일을 필요로 합니다

  1. In spark.env.sh set some environment variables:
    • export MESOS_NATIVE_LIBRARY=<path to libmesos.so>. This path is typically <prefix>/lib/libmesos.so where the prefix is/usr/local by default. See Mesos installation instructions above. On Mac OS X, the library is called libmesos.dylibinstead of libmesos.so.
    • export SPARK_EXECUTOR_URI=<URL of spark-1.0.1.tar.gz uploaded above>.
  2. Also set spark.executor.uri to <URL of spark-1.0.1.tar.gz>.

이제 클러스터에 대에 Spark 어플리케이션을 시작할때 SparkContext을 만들때 mesos:// URL을 전달하면 됩니다
예를 들어 

val
conf = new SparkConf() .setMaster("mesos://HOST:5050") .setAppName("My app") .set("spark.executor.uri", "<path to spark-1.0.1.tar.gz uploaded above>") val sc = new SparkContext(conf)

또한 spark-submit과 conf/spark-defaults.conf 파일에 있는 spark.executor.uri 설정을 통해 구성할 수 있습니다
현재 Mesos 클라이언트 모드에서만 spark-submit Spark driver를 지원합니다

쉘을 실행하는 경우 spark.executor.uri매개 변수는 SPARK_EXECUTOR_URL에서 상속 받습니다 그래서 중복해서 시스템 속석으로 전달할 필요가 없습니다.

* Meso 실행 모드 (Mesos Run Modes)
Spark는 Mesos에서 두가지 모드로 실행됩니다
"fine-grainged"(기본값) 과 "coarse-grained" 
세분화(기본값)와 대단위

"fine-grained"모드에서(기본값)
각 Spark task는 별도의 Mesos task에서 실행됩니다
이 Spark의 여러인스턴스(다른 프레임워크)에 매우 미세한 단위에서 machines를 공유할 수 있습니다
각 응용프로그램은 더하거나 덜면서 machines을 가집니다
하지만 각 task실행에 추가 overhead를 가집니다
이 모드는 대화형 쿼리나 웹 요청같은 낮은 대기시간을 요구하 는사항에 적합하지 않을수 있습니다

"coarse-grained"(대단위) 모드는 각 Mesos시스템에서 하나의 long-running(오래 동작하는)한 Spark task가 시작됩니다
동적으로 그 안에 "mini-tasks"(작은 작업)을 스케줄 할 수 있습니다
장점으로는 훨씬 낮은 startup overhead시간 입니다 그러나 어플리에키션의 전체 지속 기간동안 Mesos의 예약 비용이 듭니다

coarse-grained모드에서 실행하려면 SparkConf에 spark-mesos.coarse 속성을 설정합니다
conf.set("spark.mesos.coarse", "true")
sparkconf : http://spark.apache.org/docs/latest/configuration.html#spark-properties

또한 coarse-grained모드에서 Spark 의최대 리소스를 제어할 수 있습니다 기본적으로 클러스터의 코어를 얻기위 해설정합니다
이는 한번에 하나의 응용프로그램을 실행하는 경우에만 의미가 있습니다
원하는 코어수를 설정할 수 있습니다 conf.set("spark.cores.max","10")

* 하둠과 함께 실행 (Running Alongside Hadoop)
Spark와 Mesos를 기존의 hadoop 시스템에서 별도의 서비스로 machines에서 실행할 수 있습니다
Spark에서 hadoop데이터에 액세스하려면 hdfs://URL의 전체가 필요합니다. (일반적으로 hdfs://<namenode>:9000/path, hadoop namenode 웹 UI의 오른쪽에서 URL을 찾을 수 있습니다

또한 둘 사이에 더 나은 자원 분리 및 공유를 위한 Mesos에서 hadoop MapReduce를 실행할 수 있습니다
각 노드의 리눅스 스케줄러를 통해 각각에 자원을 공유하는것과 다르게 이 경우 Mesos는 hadoop또는 Spark중 하나에 코어를 할당하는 통합 스케줄러 역할을 합니다
Mesos Hadoop 을참조하시기 바랍니다 : https://github.com/mesos/hadoop

두 경우에서 HDFS는 Mesos를 통해 스케줄받지 않고 Hadoop MapReduce와 별도로 실행합니다

* 문제 해결 및 디버길 (Troubleshooting and Debugging)

A few places to look during debugging:

  • Mesos master on port :5050
    • Slaves should appear in the slaves tab
    • Spark applications should appear in the frameworks tab
    • Tasks should appear in the details of a framework
    • Check the stdout and stderr of the sandbox of failed tasks
  • Mesos logs
    • Master and slave logs are both in /var/log/mesos by default

And common pitfalls:

  • Spark assembly not reachable/accessible
    • Slaves must be able to download the Spark binary package from the http://hdfs:// or s3n:// URL you gave
  • Firewall blocking communications
    • Check for messages about failed connections
    • Temporarily disable firewalls for debugging and then poke appropriate holes


YARN에서 Spark를 실행 (Running Spark on YARN)

YARN(하둡의 NextGen)에서 Spark 실행에 대한 지원은 0.6.0버전 이후에 향상 되었습니다.

* 준비 (Preparations)
Spark-on-YARN을 실행하기 위해선 YARN을 지원하도록 빌드된 Spark binary 배포판이 필요합니다
바이너리 배포판은 Spark프로젝트 웹 사이트에서 다운로드 할 수 있습니다 
Spark를 build하려면 Maven 빌드 가이드를 참조하세요 : http://spark.apache.org/docs/latest/building-with-maven.html

* 설정 ( Configuration)
configs의 대부분은 다른 배포 모드와 Spark on YARN과 동일합니다
이들에 대한 자세한 내용은 configuration 페이지를 참조하세요 : http://spark.apache.org/docs/latest/configuration.html
Spark on YARN에 필요한 configs가 있습니다

* 환경 변수 (Environment Variables)
YARN 용으로 출시된 Spark 프로세스의 환경변수에 SPARK_YARN_USER_ENV를 추가할 수 있습니다
이 환경변수의 목록(쉼표로 구분)을 예로들면 다음이 될 수 있습니다
SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"

Spark Properties

Property NameDefaultMeaning
spark.yarn.applicationMaster.waitTries10Set the number of times the ApplicationMaster waits for the the Spark master and then also the number of tries it waits for the SparkContext to be initialized
spark.yarn.submit.file.replication3HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
spark.yarn.preserve.staging.filesfalseSet to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
spark.yarn.scheduler.heartbeat.interval-ms5000The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
spark.yarn.max.executor.failures2*numExecutorsThe maximum number of executor failures before failing the application.
spark.yarn.historyServer.address(none)The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
spark.yarn.executor.memoryOverhead384The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
spark.yarn.driver.memoryOverhead384The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.

기본적으로, Spark on YARN은 로컬에 설치된 Spark jar을 사용합니다
하지만 Spark jar가 HDFS에 있으면 world-readable (모든위치) 에서 읽을 수 있습니다
응용프로그램 마다 실행을 분산시킬 필요가 없도록 YARN노드에서 캐시할 수 있습니다
HDFS에 있는 jar을 가리키려면 export SPARK_JAR=hdfs=hdfs:///some/path

* Spark on YARN 실행하기 (Launching Spark on YARN)
configuration 파일이 포함된 디렉토리가 (클라이언트에) HADOOP_CONF_DIR 또는 YARN_CONF_DIR에 반드시 있어야 합니다
이 configs는 YARN ResourceManager가 dfs에 write하고 connect하는데 사용됩니다

YARN에서 Spark 응용프로그램을 실행하는데 사용할수 있는 배포 모드는 두가지가 있습니다
yarn-cluster mode는 YARN의 클러스터 매니저에 관리되는 어플리케이션 master에서 Spark 드라이버가 동작합니다 그리고 클라이 언트 응용프로그램을 시작한 후 사라질 수 있습니다
yarn-client mode는 드라이버 클라이언트는 프로세스에서 실행되며 어플리케이션 master는 YARN에 리소스를 요청하는데 사용됩니다

Spark 독립모드와 Mesos모드와 달리 master의 주소는 "master"의 parameter에 지정됩니다
YARN mode에서 ResourceManager의 주소는 hadoop configuration에서 찾습니다
따라서 master parameter는 단순히 "yarn-client"나 "yarn-cluster"입니

yarn-cluster mode로 Spark 응용프로그램을 실행하려면
./bin/spark-submit --class path.to.your.Class --master yarn-cluster [options] <app jar> [app options]

예를 들어
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-cluster \
    --num-executors 3 \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    lib/spark-examples*.jar \
    10

위에서 default Application Master를 시작해서 YARN 클라이언트 프로그램을 시작합니다
SparkPi는 Application Master의 자식 스레드로 실행됩니다
클라이언트는 주기적으로 상태 업데이트를 위해 어플리케이션 master를 폴링하여 콘솔에 표시합니다
어플리케이션 실행이 완료되면 클라이언트가 종료됩니다
드라이버와 실행 프로그램 로그를 확인하는 방법은 아래 로그보기절을 참조하세요

yarn-client mode에서 Spark 어플리케이션을 실행하려면 같은 작업을 실행합니다
하지만 Spark shell을 사용하면 "yarn-client"는 "yarn-cluster"를 대체합니다
$ ./bin/spark-shell --master yarn-client

* 다른 jar 추가하기 (Adding Other JARs)
yarn-cluster mode에서 드라이버는 클라이언트와 다른 시스템에서 실행됩니다
그래서 SparkContext.addJar은 해당 local client 밖에서 동작하지 않습니다
SparkContext.addJar를 클라이언트에서 사용할 수 있게 만들려면 --jar 옵션 commend를 사용합니다
$ ./bin/spark-submit --class my.main.Class \
    --master yarn-cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar
    my-main-jar.jar
    app_arg1 app_arg2

* 어플리케이션 디버깅 (Debugging your Application)
YARN용어로 executors와 application master는 내부에서 "containers"를 실행합니다
YARN 응용프로그램이 완료된  후컨테이너의 로그를 처리하기위한 두가지 모드가 있습니다
로그 aggregation이 켜져있는 경우 (yarn.log-aggregation-enable 설정 포함), 컨테이너 로그를 HDFS에 복사하고 로컬 컴퓨터에서 삭제 됩니다
이러한 로그는 "yarn logs"명령을 사용하여 어디서나 클러스터에서 볼 수 있습니다
yarn logs -applicationId <app ID>

해당 어플리케이션의 모든 컨터에너의 로그파일 내용을 볼 수 있습니다

로그 aggregation 이 설정되어 있지 않은경우
로그는 각 시스템의 YARN_APP_LOGS_DIR에 유지됩니다
하둡 버전과 설치에 따라 다르지만 일반적으로 /tmp/logs/ 또는 정의된 $HADOOP_HOME/logs/userlogs에 있습니다
컨테이너의 로그를 보려면 컨테이너가 있는 호스트로가서 디렉토리를 봐야 합니다
하위 디렉토리에는 어플리케이션 ID와 컨테이너 ID로 로그파일이 구성되어 있습니다

컨테이너당 실행 환경을 보는것은 
큰값 yarn.nodemanager.delete.debug-delay-sec으로 증가합니다 (예를 들어 36000)
그리고 컨테이너가 시작되어있는 노드에서 yarn.nodemanager.local-dirs를 통해 어플리케이션 캐시에 접근할 수 있습니다

이 디렉토리에는 start스크립트가 포함되어 있고 jar와 컨테이너 실행에 사용되는 모든 환경 변수가 있습니다
이 과정은 class path문제를 디버깅 하는데 유용합니다
(이 작업을 가능하게 하는 클러스터 설정 및 노드 매니저를 다시 시작할 수 있는 관리자 권한이 필요합니다. 따라서 호스트 클러스터에서 적용할 수 없습니다)

* 중요 사항 (Important Notes)
hadoop 2.2 이전 YARN컨테이너 리소스 요청에 cores를 지원하지 않습니다
따라서 이전 버전에서 실행하는 겨우 commend line을 통해 cores의 숫자를 YARN에 전달할  수없습니다
core요청이 스케줄링 결정에 적용 여부는 그것이 어떻게 구성되어 있는지 스케줄러 사용에 의존하고 있다

Spark executors에 의해 사용되는 로컬 디렉토리는 YARN에 대해 구성된 로컬 디렉토리가 될 것입니다
(Hadoop YARN의 config yarn.nodemanager.local-dirs)
사용자가 spark.local.dir을 지정한 경우 그것은 무시됩니다

하둡 #과 유사한 --files와 --archives 옵션에 같이 지정하는 파일 이름을 지원합니다
예를 들어 --files localtest.txt#appSees.txt 이것은 로컬에는 localtest.txt 이름으로 있지만 HDFS에는 appSees.txt이름으로 연결되어 파일을 업로드 합니다
YARN에서 실행되는 경우 응용프로그램은 appSees.txt 이름으로 참조해야합니다

로컬 파일 --jar 옵션을 사용하여 yarn-cluster mode로 실행하는 경우 SparkContext.addJar기능이 작동할 수 있습니다
만약 HDFS, HTTP, HTTPS 또는 FTP파일과 함께 사용하는 경우에는 사용할 필요가 없습니다




신고





Posted by 파란물