03. Installation

2015/04/21 16:17

Elasticsearch java 7 필요로 한다. 특히 이글을 쓰는 시점에서, Oracle J 1.8.0_25 버전을 사용할 것을 추천한다. Java 설치는 platform 따라 다르다. 따라서 여기서 Java 설치에 대해서 깊이 들어가지는 않을 것이다.

여러분이 elasticsearch 설치하기 전에 java version 체크하라는 것을 말하는 것으로 충분하다. (필요하다면 java 설치하거나 업그레이드 하라)


java -version

echo $JAVA_HOME


Java 셋업하고 나서, elasticsearch 다운로드하고 실행한다. www.elasticsearch.org/download 에서 과거에 릴리즈된 모든 버전의 바이너리를 이용할 있다. 릴리즈별로 zip/tar 압축 파일을 선택하거나 DEB/RPM package 선택할 있다. 간단하게 말하자면, 그냥 tar 파일을 사용하자.


Elasticsearch 1.5.1 tar 파일을 다음과 같이 다운로드 하자. (윈도우 사용자는 zip package 다운로드 해야 한다.)


curl -L -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.5.1.tar.gz


그리고 다음과 같이 압축을 푼다. (윈도우 사용자는 zip package unzip 해야 한다.)


tar -xvf elasticsearch-1.5.1.tar.gz


현재 directory 뭉텡이의 file/folder들을 생성할 것이다. 다음과 같이 bin directory 이동하자.


cd elasticsearch-1.5.1/bin


이제 여러분은 지금 node에서 단일 cluster 실행할 준비가 되었다. (윈도우 사용자는 elasticsearch.bat 파일을 실행하라)


./elasticsearch


모든 것이 제대로 되었다면, 아래와 같은 메시지들을 확인할 있을 것이다.


./elasticsearch
[2014-03-13 13:42:17,218][INFO ][node           ] [New Goblin] version[1.5.1], pid[2085], build[5c03844/2014-02-25T15:52:53Z]
[2014-03-13 13:42:17,219][INFO ][node           ] [New Goblin] initializing ...
[2014-03-13 13:42:17,223][INFO ][plugins        ] [New Goblin] loaded [], sites []
[2014-03-13 13:42:19,831][INFO ][node           ] [New Goblin] initialized
[2014-03-13 13:42:19,832][INFO ][node           ] [New Goblin] starting ...
[2014-03-13 13:42:19,958][INFO ][transport      ] [New Goblin] bound_address {inet[/0:0:0:0:0:0:0:0:9300]}, publish_address {inet[/192.168.8.112:9300]}
[2014-03-13 13:42:23,030][INFO ][cluster.service] [New Goblin] new_master [New Goblin][rWMtGj3dQouz2r6ZFL9v4g][mwubuntu1][inet[/192.168.8.112:9300]], reason: zen-disco-join (elected_as_master)
[2014-03-13 13:42:23,100][INFO ][discovery      ] [New Goblin] elasticsearch/rWMtGj3dQouz2r6ZFL9v4g
[2014-03-13 13:42:23,125][INFO ][http           ] [New Goblin] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/192.168.8.112:9200]}
[2014-03-13 13:42:23,629][INFO ][gateway        ] [New Goblin] recovered [1] indices into cluster_state
[2014-03-13 13:42:23,630][INFO ][node           ] [New Goblin] started


너무 상세히 들어가지 말고, "New Goblin"이라고 이름 붙여진 node name 있을 것이다. 단일 cluster에서 master로서 선출되고 실행된 node이다. 지금 순간에 master 무엇을 의미하는지에 대해서 걱정하지 말라. 여기서 중요한 것은 하나의 cluster에서 하나의 node 실행했다는 것이다.


이전에 언급한 것처럼, cluster node name override (재정의) 있다. Command line에서 elasticsearch 실행할 다음과 같이 있다.


./elasticsearch --cluster.name my_cluster_name --node.name my_node_name


또한 HTTP 주소와 포트에 대한 정보를 가진 http mark 라인이 접속 가능한 주소임을 주목하라. 기본적으로 elasticsearch 9200 포트 번호를 사용하여 REST API 접근할 있다. 필요할 , 포트 번호는 설정할 있다.

02. Basic Concepts

2015/04/21 15:46

Elasticsearch 핵심적인 몇가지 concept(개념) 있다. 처음부터 이러한 concept 이해하는 것이 elasticsearch 대해서 배우는 과정 중에서 굉장히 도움이 것이다.


[Near Realtime (NRT)]

Elasticsearch near realtime 검색 플랫폼이다. 이것이 의미하는 바는 여러분이 document index하는 시점으로부터 검색 결과를 얻기까지 약간의 지연 현상(일반적으로 1) 있음을 의미한다.


[Cluster]

Cluster 여러분의 전체 데이터를 담고 있는 하나 이상의 node(server) 모음을 말한다. 그리고 Cluster 모든 node 있는 federated indexing ( node 개별적으로 존재하는 index들의 연합) 검색 기능을 제공한다. Cluster 기본적으로 "elasticsearch"라는 유일한 이름으로 구분된다. 하나의 node 한번 cluster join하도록 셋팅되면 다른 곳에 참여할 없기 때문에 cluster 이름이 중요하다. Prodoction에서는 명시적으로 cluster name 설정하는 것이 좋다. 하지만, testing/development 목적으로는 default name 사용하는 것이 유용하다.


하나의 node만으로 cluster 구성하는 것도 유효하고 완벽하게 동작한다는 것에 유념하라. 더욱이, 유일한 cluster name 가지는 독립적인 여러 cluster 구성하는 것도 가능하다.


[Node]

Node cluster 일부분이고, 데이터를 저장하고, cluster index 검색 기능에 참여하는 하나의 서버이다. Cluster 같이 node 유일한 이름을 갖는다. 기본적으로는 시작 시점에 node 할당되는 임의의 Marvel character명을 갖는다. 기본적으로 부여되는 node명을 원하지 않으면 여러분이 원하는 어떤 이름으로도 node명을 정할 있다. 이름은 네트워크에 있는 어느 서버가 여러분의 elasticsearch cluster 있는 node인지 구분하기 위한 관리하는데 사용되므로 중요하다.


Node cluster name으로 특정 cluser join하도록 설정할 있다. 기본적으로 개별 node elasticsearch라는 이름의 cluster join하도록 셋팅한다. 이것은 여러분이 네트워크 상에 많은 node 실행한다면 - 서로 discover 가능하다고 가정하면 - 자동으로 elasticsearch라고 이름 붙여진 cluster join하여 cluster 구성함을 의미한다.


여러분은 많은 node 하나의 cluster 구성할 있다. 나아가, 네트워크 상에 현재 구동되어 있는 Elasticsearch ndoe 없다면, 하나의 node 실행하여 elasticsearch 이름붙여진 새로운 단일 node cluster 기본적으로 구성할 수도 있다.


[Index]

Index 유사한 특징들을 가진 document들의 집합이다. 예를 들어, customer data 대한 index, 제품 카탈로그에 대한 다른 index, order(주문) 대한 다른 index 가질 있다. Index name (소문자여야만 한다.)으로 구분되고, 이름은 document 대한 indexing, search, update, delete 작업을 수행하기 위해 index 참조하는데 사용한다.


하나의 cluster에서 여러분이 원하는 대로 여러 개의 index 정의할 있다.


[Type]

Index내에서 하나 이상의 type 정의할 있다. Type 논리적인 index category/partition이다. 일반적으로, type common field 가진 document 대해 정의되어 있다. 예를 들어, 여러분이 blogging platform 운영하고 여러분의 모든 data 하나의 index 저장한다고 가정해 보자. index에는 user data, blog data, comments data 등에 대한 type 정의되어 있을 것이다.


[Document]

Document index되는 정보의 기본 단위이다. 예를 들어, 단일 고객에 대한 document, 단일 상품에 대한 document, 단일 주문에 대한 document등을 가질 있다. 이러한 document 인터넷상에서 데이터 형식으로 폭넓게 사용되는 JSON으로 표시된다. (Javascript Object Notation)


여러분은 Index/type내에 여러분이 저장하고자 하는 많은 document 저장할 있다. Document 물리적으로는 index내에 있다고 할지라도, document 실제로 index내에서 Type으로 index되고 지정되어야만 한다는 것에 유의하라.


[Shards & Replicas]

Index 잠재적으로 하나의 node에서 하드웨어 limit 넘어서는 대용량의 데이터를 저장할 있다예를 들어서 1TB disk space 차지하는 10억개의 document 대한 하나의 index 하나의 node 있는 disk에는 맞지 않거나 single node만으로 검색 요청을 수용하기에는 너무 느릴 것이다.


문제를 해결하기 위해서, elasticsearch 여러분의 index shard라고 부르는 여러 개의 조각으로 세분화할 있도록 한다. 여러분이 index 생성할 , 여러분이 원하는 shard 수를 간단히 지정할 있다. shard cluster 있는 어떤 node에서도 host 있는 fully-functional and independent "index" 자체이다.


Sharding 2가지 주요 이유 때문에 중요하다.

  • Content Volume 수평적 확장 가능
  • 성능과 Throughput 향상을 위해 (여러 node 분산되어 있는) shard 분산/병렬 처리 가능

Shard 어떻게 분산되어 있는지, document들이 search request로부터 어떻게 수집되는지에 대한 메커니즘은 elasticsearch 의해 완벽하게 관리된다. 그리고 여러분은 user로서 알기 쉽다.


언제라도 실패할 있는 네트워크나 클라우드 환경에서, 무슨 이유에서건 shard node offline 되거나 사라지는 경우에 대한 failover 메커니즘을 가지는 것이 강력하게 추천되는 유용한 기능이다. 이러한 목적으로 elasticsearch (replica shard 혹은 짧게 replicas라고 부르는) index shard 대한 하나 이상의 복사본을 가질 있다.


Replication 2가지 이유 때문에 중요하다.

  • Shard/node fail, high availability 제공한다. 이런 이유로, replica shard 절대로 원본 (original/primary) shard 동일한 node 할당되면 된다.
  • Search volume 대한 scale out 제공하고, 모든 replica에서 병렬적으로 검색을 실행할 있기 때문에 throughput 향상된다.

요약하자면, index 여러 개의 shard 나누어질 있다. Index 0(replicas 없는 경우) 이상 복제될 있다. 한번 복제되면, index primary shard(원본) replica shards를 갖는다. Shards와 replicas의 개수는 index 생성될 , index별로 정의될 있다. Index 생성된 이후에는 어느 때라도 동적으로 replicas 수를 변경할 있지만, 사후에 shard 숫자를 변경할 수는 없다. (동적으로 replicas 수를 변경할 수는 있지만 shard 숫자는 변경할 없다.)


기본적으로, elasticsearch내의 index 5개의 primary shard 1개의 replica 할당한다. 이것은 여러분이 cluster내에 최소한 2개의 node 가지고 있다면, 여러분의 index 5개의 primary shard 5개의 replica shard (1개의 replica set) 가진다는 의미이다. 인덱스당 10개의 shard 갖는다.


{Note}

elasticsearch shard Lucene index이다. 하나의 Lucene Index 저장할 있는 Max Document 수가 있다. Limit Integer.MAX_VALUE -128 해당하는 2,147,483,519개이다. _cat/shards API 통해서 shard size 모니터링할 있다.

(Chapter 3) 16. RDD Operations

2015/04/08 13:21

RDD 다음 2가지 형태의 Operation 지원한다.

  • Transformation
  • Action

Transformation map, filter 같은 새로운 RDD return하는 RDD상의 operation이다. Action driver program으로 결과를 return하거나 storage 결과를 write하고, count first 같은 연산을 시작하는 operation이다. Spark에서는 transformation action 아주 다르게 취급한다. 따라서, 여러분이 수행하고자 하는 작업의 형태에 대해서 이해하는 것이 아주 중요하다. 만약 여러분이 여전히 주어진 기능에 대해서 transformation인지, action인지 혼란스럽다면 return 타입을 통해 있다. Transformation 새로운 RDD return하지만, action 다른 data type return한다.


Transformations


Transformation 새로운 RDD return하는 operation이다. Lazy evaluation section에서 논의되겠지만, transformed RDD 단지 여러분이 action에서 사용할 , 느긋하게(lazily) 계산된다. 많은 transformation(변형) element측면에서 일어난다. Transformation 한번에 하나의 element상에서 동작하지만, 모든 transformation 그런 것은 아니다.


예를 들면, 수많은 메시지를 담고 있는 log.txt라는 로그 파일을 가지고 있다고 생각해보자. 그리고 중에서 error 메시지만을 추출하고 싶다. 여기에 앞에서 보았던 filter transformation 사용할 있다. Spark 3가지 언어로 filter API 살펴보면 다음과 같다.


Example 3-8. Python filter example

inputRDD = sc.textFile("log.txt")

errorsRDD = inputRDD.filter(lambda x:"error" in x)


Example 3-9. Scala filter example

var inputRDD = sc.textFile("log.txt")

var errorsRDD = inputRDD.filter(line => line.contains("error"))


Example 3-10. Java filter example

JavaRDD<String> inputRDD = sc.textFile("log.txt");

JavaRDD<String> errorsRDD = inputRDD.filter(

    new Function<String, Boolean>() {

         public Boolean call (String x) {return x.contains("error");}

    }

);


Filter operation 기존의 inputRDD 대한 돌연변이가 아님에 주의하라. 전혀 새로운 RDD 대한 포인터를 return한다. inputRDD 프로그램내에서-예를 들어, 다른 단어들을 검색하는데-나중에 재사용할 있다. Warning이라는 단어를 검색하는데 inputRDD 사용해보자. , 새로운 transformation union 사용하여 error warning 단어를 포함하고 있는 모든 line 출력할 있다. 이에 대한 Python code 있지만, union() function 3가지 언어에서 모두 동일하다.


Example 3-11. Python union example

errorsRDD = inputRDD.filter(lambda x:"error" in x)

warningsRDD = inputRDD.filter(lambda x:"warning" in x)

badLinesRDD = errorsRDD.union(warningsRDD)


union filter와는 약간 다르다. 하나의 RDD 아니라 2개의 RDD 가지고 동작하기 때문이다. Transformation 복수 개의 input RDD 가지고 동작할 있다.


끝으로, transformation 사용하여 서로 다른 새로운 RDD 이끌어낼 , Spark 다른 RDD 사이의 dependency 집합을 추적하고 유지한다. 그것을 lineage graph라고 한다. Spark 요구에 맞춰 RDD 계산할 , 정보를 이용하고 Persistent RDD 일부 정보가 유실되었을 , 이를 이용해 복구한다.



<Figure 3-1. RDD lineage graph created during log analysis>

(Chapter 3) 15. Creating RDDs

2015/04/08 13:20

Spark RDD 생성하는 다음 2가지 방법을 제공한다.

  • Loading external dataset
  • Parallelizing collection in driver program

RDD 생성하기 위한 가장 간단한 방법은 memory 있는 collection 가져다가 parallelize method 통해 SparkContext 전달하는 방법이다. 방법은 Spark 공부할 , 아주 유용하다. Shell에서 간단하게 RDD 빠르게 생성하여 여러 operation 수행할 있기 때문이다. 그러나, prototyping이나 testing 목적이 아니라면, 하나의 machine에서 메모리에 전체 dataset 가지고 있는 형태이므로 폭넓게 사용되는 것이 아니라는 점에 주의하라.


(Example 3-2. Python Parallelize Example)

lines = sc.parallelize(["pandas", "I like pandas"])


(Example 3-3. Scala Parallelize Example)

var lines = sc.parallelize(List("pandas", "I like pandas"))


(Example 3-4. Java Parallelize Example)

JavaRDD<String> lines = sc.parallelize(Array.asList("pandas", "I like pandas"));


RDD 생성하는 일반적인 방법은 external storage로부터 data loading하는 것이다. External dataset loading하는 것은 Chapter 5에서 상세히 다룰 것이다. 그러나 우리는 이미 text file로부터 SparkContext.textFile(…) 이용하여 String 포함하는 RDD 생성하기 위해 data loading하는 것을 이미 살펴보았다.


(Example 3-5. Python textFile Example)

lines = sc.textFile("/path/to/README.md")


(Example 3-6. Scala textFile Example)

var lines = sc.textFile("/path/to/README.md")


(Example 3-7. Java textFile Example)

JavaRDD<String> lines = sc.textFile("/path/to/README.md");

(Chapter 3) 14. RDD Basics

2015/03/16 21:00

Chapter 3. Programming with RDDs

 

이 장에서는 Data를 가지고 작업을 수행할 수 있는 Spark's Core Abstraction RDD(Resilient Distributed Dataset)에 대해서 소개하고자 한다. RDD는 간단히 말해서 element들의 distributed collection이다. Spark내에서 모든 작업들은 결과를 계산하기 위해서 새로운 RDD를 생성하거나 기존 RDD를 변형하거나 RDD상에서 연산을 호출하는 것으로써 이루어진다. Spark의 엔진에서는 자동으로 RDD내의 Data  Cluster로 분산시키고,  수행하고자 하는 작업을 병렬로 처리한다.
 

Data Scientist Engineer 모두 이번 장을 읽어야 한다. Spark에서 RDD Core Concept이기 때문이다. Interactive Shell에 있는 example들을 실행해보기를 강력히 추천한다. 이번 장의 모든 code들은 이 책의 Git Hub Repository에 있다. (https://github.com/databricks/learning-spark)

 

RDD Basics

 

Spark에서 RDD는 간단하게 말하자면 Object들의 Distributed Collection이라고 할 수 있다. 각각의 RDD Cluster의 여러 다른 Node에서 계산되는데 사용되는 multiple partition으로 나누어져 있다. RDD는 다양한 형태의 User-Defined Class Python, Java, Scala Object를 포함할 수 있다.

 

User RDD를 외부 Dataset loading하는 경우와 Driver Program을 통해 Object Collection을 분산 처리하는 경우의 2가지 방법을 통해서 생성할 수 있다. 우리는 이미 SparkContext.textFile()을 사용해서 String 형태의 RDDtext file에서 loading하는 것을 살펴 보았다.

 

(Example 3-1. Creating an RDD of strings with textFile() in Pythpn)

>>> lines = sc.textFile("README.md")

 

한번 생성되면, RDD는 다음 2가지 형태의 연산을 제공한다. : transformation and action

 

Transformation(변형)은 이전 RDD로부터 새로운 RDD를 구성한다. 예를 들면, 우리가 전에 보았던 한가지transformation은 기술한 것과 매칭되는 data filtering이다. 예제로 만든 text file에서는 Python을 포함하는 string으로 이루어진 새로운 RDD를 생성하는데 사용할 수 있다.

 

>>> pythonLines = lines.filter(lambda line: "Python" in line)

 

한편, action RDD를 기반으로 결과를 계산하고, driver program에 그 결과를 return하거나 HDFS와 같은 외부storage system에 그 결과를 저장하는 것이다. 앞에서 살펴본 action의 한가지 예인 first() RDD에서 첫번째element return한다.

 

>>> pythonLines.first()

u'## Interactive Python Shell'

 

Transformation action의 차이점은 Spark RDD를 계산하는 방식 때문이다. 비록 여러분이 어느 때라도 새로운RDD를 정의할 수 있을지라도, Spark는 단지 최초에는 action에서 RDD를 사용하는 lazy fashion으로 계산을 수행한다. 이러한 접근법은 처음에는 일반적이지 않게 보일지도 모르지만, big data로 작업할 때는 이해가 된다. 예를 들어, 위에서 언급한 첫번째 예를 고려해 보자. 우리는 text file을 정의하고 Python 문자열을 가진 string을 추출해 낼 것이다. 만약, 우리가 lines = sc.textFile()을 호출했을 때, Spark text file내의 모든 line load하고 저장한다면 많은 storage space를 낭비하게 되고, 수많은 line filter output을 받게 될 것이다. 대신에, Sparktransformation의 전체 chain에 대해서 한차례 보고, 그 결과에 필요한 data에 대해서만 연산할 수 있다. 사실 first() action 수행할 때, Spark는 단지 first match line에 해당하는 line을 찾을 때까지 scan만 하면 된다. 전체 파일에 대해서 read하지 않아도 된다.

 

마지막으로 Spark RDD는 기본적으로 여러분이 action을 실행할 때마다 매번 재계산된다. 만약, 여러 action에서RDD를 재사용하고자 한다면, RDD.persist()를 사용하여 RDD Persist(지속하여 유지)하도록 Spark에게 요청할 수 있다. 최초 연산 후에, Spark RDD content memory (Cluster내의 여러 machine에 나누어진 memory)에 저장할 것이다. 그리고 이후 여러 action에서 재사용할 수 있다. Memory 대신 Disk RDD persist(지속적으로 유지)하는 것도 가능하다. 기본적으로 persist하지 않도록 하는 것이 비정상적으로 보일지도 모르지만, big dataset에 대해서 생각해 보면 이해가 된다. 만약 여러분이 RDD를 재사용하지 않을 것이라면, Spark는 단지 한차례 data를 이용하여 결과를 계산할 것이기 때문에 storage space를 낭비할 이유가 없다.

 

실제로는 memory data 중에서 subset load하고 반복적인 query를 수행하기 위하여 persist를 종종 사용한다.예를 들어, Python 문자열을 포함하고 있는 README line들에 대해 여러 결과를 계산하고 싶어한다면 다음과 같은 코드를 작성할 수 있다.

 

>>> pythonLines.persist()

>>> pythoneLines.count()

2

>>> pythonLines.first()

u'## Interactive Python Shell'

 

요약하자면, 모든 Spark program shell session은 다음과 같이 동작할 것이다.

a.External Data로부터 input RDD를 생성한다.

b.     Filter()와 같은 transformation을 사용하여 새로운 RDD를 정의하기 위해 변형한다.

c. 재사용을 위한 중간 결과물인 RDD를 유지하기 위하여 Spark persist()를 요청한다.

d.     Spark에 의해 최적화되고, 실행되는 병렬 연산을 시작하기 위해 count() first()와 같은 action을 실행한다.

 

이 장의 나머지 부분에서는 이 과정에 대해서 자세히 들여다볼 것이다. Spark의 가장 기본이 되는 공통 RDD operation에 대해서도 다룰 것이다.