Version 2

 

- 앞에서 설명했듯이 이 버전은 Mapper에서 단순무식하게 (word.docID)쌍을 출력하는 것이 아니라 HashSet을 이용한 unique한 (word.docID)쌍을 내보낸다.

 

- StringTokenizer를 이용해 파싱이 끝나면 루프를 돌면서 단어들을 HashSet에 집어넣은 다음에 HashSet을 iteration하면서 나온 단어들과 해당문서의 docID를 Reducer로 넘긴다.

 

Version2 -map

 

 

 

Version2의 문제

 

- 버전 2는 버전1에 비해 Mapper에서 Reducer로 넘어가는 데이터의 크기가 훨씬작음

 

- 하지만 아주 큰 텍스트를 가진 문서들이 많은 경우 HashSet의 크기가 커져 역시 메모리에러의 가능성 존재

 

- 다른 방식은 Version 1 처럼 Mapper/Reducer를 구현하고 중간의 Shuffling/Sorting 방법을 바꿔보는 것이다.

 

 

INVERTED INDEX V1

 

이번에는 앞서 보았던 데이터파일중의 하나인 2M.ID.CONTENTS파일을 이용해 Inverted Index를 만들어봅니다.

예를들어 hadoop이란 단어가 들어간 문서들의 리스트를 모아보는 것인데 이를 모든 단어들에 대해 수집합니다.

기본적으로 텍스트검색엔진이 수행하는 일이 이것인데 보다 자세한 랭킹을 위해 단어가 나타난 위치 등등..의 세세 정보를 기록합니다.

 

 

V1 (Version 1)

 

- V1은 아무런 최적화작업없이 WordCount를 조금 바꾼 형태로 구현됩니다.

- WordCount에서는 텍스트부분을 파싱한 다음에 만들어진 토큰들에 대해 다음과 같이 reducer로의 출력쌍을 만들었습니다.

* context.write(word, new LongWritable(1));

- InvertedIndex에서는 context.write(word,new Text(docID));

- 위와같이 단어키에 대해 docID를 밸류로 내보낸다.

- Reducer 부분에서는 그냥 넘어오는 docID를 계속해서 스트링버퍼에 append한 후 결과물로 내보낸다.

 

Version1 -reduce

- Map 부분은 앞서 본 프로그램과 너무 비슷해서 건너뜁니다.

 

 

 

Version 1의 문제

 

실행해보면 아마도 Heap memory 에러와 같은 것을 볼 수 있을 것입니다.

이유는 특정 단어의 경우 한 문서에도 여러번 나오는등 빈도수가 아주 큰데 지금의 구현은 한 문서에 어떤 단어가 여러번 나올 경우 그 수만큼 반복하기 때문이다.

 

채결책!

JVM의 메모리 증가. 디폴트로 태스크마다 할당되는 JVM은 200M의 메모리를 사용. mapred-site.xml의 mapred.child.java.opts파라미터을 이용해 증가 (아래예는 1GB로 증가)

 

 

Mapper단에서 HashSet을 구현하여 같은 단어들이 여러번 나오더라도 한번만 emit하던지 아니면 빈도수를 문서 ID와 함께 내보냅니다.

JOIN ID AND TITLE V2

 

MyMapper.setup (2)

 

 

실행결과

- JoinlDtitle과 같은 결과를 내겠지만 속도라는 측면에서 훨씬 더 빠르게 수행된다.

 

JOIN ID AND TITLE V2

 

main 함수

 

- org.apache.hadoop.filecache.Distributed Cache를 임포트 한다.

 

- doclDFreq로 HDFS상의 Top Citation파일의 위치를 저장한 다음에 (main 함수의 실행인자로 받아들이게 구현) 다음함수를 호출해서 DistributedCache에 등록

(이 함수는 여러번 호출되어도 무방)

* DistributedCache.addCacheFile(new URI(doclDFreq), conf);

 

MyMapper.setup(1)

 

- Mapper의 setup메소드에서는 다음 함수를 호출하여 distributed cache로 등록된 파일들의 위치정보를 받는다(이젠 모두 로컬파일시스템의 path!)

 

localFiles=DistributedCache.qetLocalCacheFiles(context.getConfiguration());

 

- 이때 리턴되는 값은 Path의 배열인데 이 경우 우린 첫번째 원소만 필요하다. 그걸 String으로 바꿔서 Java의 File I/O stream을 이용해 한줄씩 읽어서 해쉬맵에 저장

 

다음글에서 계속 공부할께요 ^-^

 

 

JOIN ID AND TITLE V2

 

개요

- 앞서 간략히 Distributed Cache라는 것에 대해 이야기했는데 이는 작은 사이즈의 읽기전용 파일(흔히 사전)을 task tracker에 working directory에 복사해주는 메커니즘을 말한다.

 

* Distributed Cache로 사용될 파일은 먼저 HDFS에 복사되고 그 위치가 사용되어야한다.

* Distributed Cache의 디폴트최대크기는 10GB인데 실질적으로 의미있는 최대크기는 100MB 정도.

 

사용전략

-어느파일을 Distributed Cache로 사용할 것인가?

* CountCitation의 실행 후 나온결과에 TopN을 돌려서 나온 결과를 distributed cache로 사용.

N이 작다면 이 파일의 크기는 작다.

* TopN의 결과 파일이 이미 HDFS에 있으므로 이들 그대로 사용한다.

- 프로그램의 인자로 이 distributed cache파일의 HDFS상의 위치를 넘긴다.

- Main함수에서 이 위치를 단순히 Distributed Cache로 등록한다.

- Mapper 실행시 setup 메소드에서 이 위치를 참조하여 파일을 오픈한다.

- 앞서 이야기했듯이 이 파일은 task의 실행 전에 HDFS에서 로컬파일 시스템으로 복사되기 때문에 그냥 로컬 텍스트 파일을 읽는 코드를 작성하여 ID를 키로 하여 해쉬맵에 빈도수를 값으로 저장한다.

 

- Mapper의 map 메소드에서 읽혀지는 Title ID 쌍에 대해 ID가 앞서 setup에서 만들어진 해쉬맵에 존재하는지 확인

* 존재하면 키를 타이틀로 하고 밸류는 DocID + "t"+빈도수로 해서 reducer로 내보낸다.

 

- 이 경우 Reducer는 딱히 할일이 없기 때문에 Identity Reducer를 지정한다.

 

 

SORT STRING

 

개요

- 하둡은 분산환경의 머지소팅 프레임웍

- 스트링소팅의 경우 아주 쉽게 코딩이 가능

* Identity Mapper와 Identity Reducer를 사용!

- 입력파일은 한줄에 한 문자열이 있는 텍스트 파일

 

main함수

 

 

내부동작

 

- IdentityMapper -> 입력을 그대로 출력

- Partitioner -> Reducer가 하나! / Reducer가 여러개라면 다른 전략이 필요

- GroupingComparator,SortingComparator!

 

 

 

MyReducer.reduce

 

public void reduce(Text key, lterable<Text>

valuelter, final Context context) throws

IOException, InterruptedException

{

String title=null;

String frequency=null;

int count=0;

for(Text t: valuelter)

{

String str=t.toString();

String[]tokens=str.split("\\t");

if(tokens[1].equals("1"))

title=tokens[0];

else

frequency=tokens[0];

count++;

}

if (count == 2 && title !=null && frequency !=null)

context.write(key,new Text(title+"\\t"+frequency));

}

 

 

 

실행결과

(top 10 most referred wikipedia pages)

 

14532    ltaly    23075

15573    Japan    21453

3383    Vrazil    13632

38523    Departments of France    10900

9239    Europe    12545

9316    England    34884

14533    India    24610

5405    China    12187

18951490    American football    9536

5407    California    15945

 

 

 

MyMapper1.map

 

- 2M.Title.ID가 이 Mapper에 의해 처리

- map 함수의 키로는 Title스트링이, 밸류로는 그 타이틀을 갖는 문서의 ID가 들어옴

- ID를 키로 그리고 Title+"\t+1"을 밸류로 Reducer로 넘긴다.

 

public static class MyMapper1 extends Mapper<Text, Text, Text, Text>

{

@Override

protected void map (Text key, Text value, final Context context) throws IOException, InterruptedException

{

context.write(value, new Text(key+"\t+1"));

context.getCounter("Stats", "Number of Title+DocID").increment(1);

}

}

 

 

 

 

MyMapper2.map

 

- CountCitation(과 TopN)의 결과가 이 Mapper에 의해 처리

- map 함수의 키로는 ID가 들어오고 밸류로는 빈도수가 들어온다.

- Reducer로 출력할 때는 ID를 그대로 키로 그리고 빈도수 +"\t"+2을 밸류로 넘긴다.

 

public static class MyMapper2 extends Mapper<Text, Text, Text, Text>

{

@Override

protected void map(Text key, Text value, final Context context)

throws IOException, Interrupted Exception

{

context.write(key, new Text(value+"\t"+2));

context.getCounter("Stats",DocID+Citation").increment(1);

}

}

 

 

+ Recent posts