Version 3

 

- 여기서 시도할 방식은 흔히 Secondary Sorting이라는 것이다.

- 보통 reducer로 넘어오는 밸류 리스트는 특별한 순서가 없이 랜덤하다. 앞에서 보았던 SortingComparator를 보면 키값 비교를 위해 단순히 키만 비교하기 때문

- 만일 밸류리스트에 순서를 줄 수 있다면 이 문제를 reducer단에서 해결가능!

* 즉 같은 단어를 갖는 DocID의 리스트를 소팅된 상태로 받을 수 있다면 간단하게 같은 DocID에서 넘어온 단어들을 한번만 출력가능

 

* 또한 Inverted index의 문서 리스트가 ID로 소팅이 되기때문에 다른 연산 (AND연산등)이 간단해진다.

 

Version 3 - 새타입사용

 

- Secondary sorting을 하려면 커스텀타입이 키로 사용되어서 밸류가 키로 들어가야함

* 그래야 나중에 GroupingComparator가 그룹핑시 밸류는 무시하고 SortingComparator는 소팅시 밸류를 염두에 둘수 있다.

 

- WordID라는 타입을 정의

* WritableComparable에서 계승

* 다음 2개의 멤버변수를 가짐

 

 

Version 3 - main

 

- partitoner, SortingComparator, GroupComparator를 모두 커스텀클래스로 교체

 

 

 

 

 

 

Version 2

 

- 앞에서 설명했듯이 이 버전은 Mapper에서 단순무식하게 (word.docID)쌍을 출력하는 것이 아니라 HashSet을 이용한 unique한 (word.docID)쌍을 내보낸다.

 

- StringTokenizer를 이용해 파싱이 끝나면 루프를 돌면서 단어들을 HashSet에 집어넣은 다음에 HashSet을 iteration하면서 나온 단어들과 해당문서의 docID를 Reducer로 넘긴다.

 

Version2 -map

 

 

 

Version2의 문제

 

- 버전 2는 버전1에 비해 Mapper에서 Reducer로 넘어가는 데이터의 크기가 훨씬작음

 

- 하지만 아주 큰 텍스트를 가진 문서들이 많은 경우 HashSet의 크기가 커져 역시 메모리에러의 가능성 존재

 

- 다른 방식은 Version 1 처럼 Mapper/Reducer를 구현하고 중간의 Shuffling/Sorting 방법을 바꿔보는 것이다.

 

 

INVERTED INDEX V1

 

이번에는 앞서 보았던 데이터파일중의 하나인 2M.ID.CONTENTS파일을 이용해 Inverted Index를 만들어봅니다.

예를들어 hadoop이란 단어가 들어간 문서들의 리스트를 모아보는 것인데 이를 모든 단어들에 대해 수집합니다.

기본적으로 텍스트검색엔진이 수행하는 일이 이것인데 보다 자세한 랭킹을 위해 단어가 나타난 위치 등등..의 세세 정보를 기록합니다.

 

 

V1 (Version 1)

 

- V1은 아무런 최적화작업없이 WordCount를 조금 바꾼 형태로 구현됩니다.

- WordCount에서는 텍스트부분을 파싱한 다음에 만들어진 토큰들에 대해 다음과 같이 reducer로의 출력쌍을 만들었습니다.

* context.write(word, new LongWritable(1));

- InvertedIndex에서는 context.write(word,new Text(docID));

- 위와같이 단어키에 대해 docID를 밸류로 내보낸다.

- Reducer 부분에서는 그냥 넘어오는 docID를 계속해서 스트링버퍼에 append한 후 결과물로 내보낸다.

 

Version1 -reduce

- Map 부분은 앞서 본 프로그램과 너무 비슷해서 건너뜁니다.

 

 

 

Version 1의 문제

 

실행해보면 아마도 Heap memory 에러와 같은 것을 볼 수 있을 것입니다.

이유는 특정 단어의 경우 한 문서에도 여러번 나오는등 빈도수가 아주 큰데 지금의 구현은 한 문서에 어떤 단어가 여러번 나올 경우 그 수만큼 반복하기 때문이다.

 

채결책!

JVM의 메모리 증가. 디폴트로 태스크마다 할당되는 JVM은 200M의 메모리를 사용. mapred-site.xml의 mapred.child.java.opts파라미터을 이용해 증가 (아래예는 1GB로 증가)

 

 

Mapper단에서 HashSet을 구현하여 같은 단어들이 여러번 나오더라도 한번만 emit하던지 아니면 빈도수를 문서 ID와 함께 내보냅니다.

JOIN ID AND TITLE V2

 

MyMapper.setup (2)

 

 

실행결과

- JoinlDtitle과 같은 결과를 내겠지만 속도라는 측면에서 훨씬 더 빠르게 수행된다.

 

JOIN ID AND TITLE V2

 

main 함수

 

- org.apache.hadoop.filecache.Distributed Cache를 임포트 한다.

 

- doclDFreq로 HDFS상의 Top Citation파일의 위치를 저장한 다음에 (main 함수의 실행인자로 받아들이게 구현) 다음함수를 호출해서 DistributedCache에 등록

(이 함수는 여러번 호출되어도 무방)

* DistributedCache.addCacheFile(new URI(doclDFreq), conf);

 

MyMapper.setup(1)

 

- Mapper의 setup메소드에서는 다음 함수를 호출하여 distributed cache로 등록된 파일들의 위치정보를 받는다(이젠 모두 로컬파일시스템의 path!)

 

localFiles=DistributedCache.qetLocalCacheFiles(context.getConfiguration());

 

- 이때 리턴되는 값은 Path의 배열인데 이 경우 우린 첫번째 원소만 필요하다. 그걸 String으로 바꿔서 Java의 File I/O stream을 이용해 한줄씩 읽어서 해쉬맵에 저장

 

다음글에서 계속 공부할께요 ^-^

 

 

JOIN ID AND TITLE V2

 

개요

- 앞서 간략히 Distributed Cache라는 것에 대해 이야기했는데 이는 작은 사이즈의 읽기전용 파일(흔히 사전)을 task tracker에 working directory에 복사해주는 메커니즘을 말한다.

 

* Distributed Cache로 사용될 파일은 먼저 HDFS에 복사되고 그 위치가 사용되어야한다.

* Distributed Cache의 디폴트최대크기는 10GB인데 실질적으로 의미있는 최대크기는 100MB 정도.

 

사용전략

-어느파일을 Distributed Cache로 사용할 것인가?

* CountCitation의 실행 후 나온결과에 TopN을 돌려서 나온 결과를 distributed cache로 사용.

N이 작다면 이 파일의 크기는 작다.

* TopN의 결과 파일이 이미 HDFS에 있으므로 이들 그대로 사용한다.

- 프로그램의 인자로 이 distributed cache파일의 HDFS상의 위치를 넘긴다.

- Main함수에서 이 위치를 단순히 Distributed Cache로 등록한다.

- Mapper 실행시 setup 메소드에서 이 위치를 참조하여 파일을 오픈한다.

- 앞서 이야기했듯이 이 파일은 task의 실행 전에 HDFS에서 로컬파일 시스템으로 복사되기 때문에 그냥 로컬 텍스트 파일을 읽는 코드를 작성하여 ID를 키로 하여 해쉬맵에 빈도수를 값으로 저장한다.

 

- Mapper의 map 메소드에서 읽혀지는 Title ID 쌍에 대해 ID가 앞서 setup에서 만들어진 해쉬맵에 존재하는지 확인

* 존재하면 키를 타이틀로 하고 밸류는 DocID + "t"+빈도수로 해서 reducer로 내보낸다.

 

- 이 경우 Reducer는 딱히 할일이 없기 때문에 Identity Reducer를 지정한다.

 

 

SORT STRING

 

개요

- 하둡은 분산환경의 머지소팅 프레임웍

- 스트링소팅의 경우 아주 쉽게 코딩이 가능

* Identity Mapper와 Identity Reducer를 사용!

- 입력파일은 한줄에 한 문자열이 있는 텍스트 파일

 

main함수

 

 

내부동작

 

- IdentityMapper -> 입력을 그대로 출력

- Partitioner -> Reducer가 하나! / Reducer가 여러개라면 다른 전략이 필요

- GroupingComparator,SortingComparator!

 

 

 

MyReducer.reduce

 

public void reduce(Text key, lterable<Text>

valuelter, final Context context) throws

IOException, InterruptedException

{

String title=null;

String frequency=null;

int count=0;

for(Text t: valuelter)

{

String str=t.toString();

String[]tokens=str.split("\\t");

if(tokens[1].equals("1"))

title=tokens[0];

else

frequency=tokens[0];

count++;

}

if (count == 2 && title !=null && frequency !=null)

context.write(key,new Text(title+"\\t"+frequency));

}

 

 

 

실행결과

(top 10 most referred wikipedia pages)

 

14532    ltaly    23075

15573    Japan    21453

3383    Vrazil    13632

38523    Departments of France    10900

9239    Europe    12545

9316    England    34884

14533    India    24610

5405    China    12187

18951490    American football    9536

5407    California    15945

 

+ Recent posts