JOIN ID AND TITLE V2

 

개요

- 앞서 간략히 Distributed Cache라는 것에 대해 이야기했는데 이는 작은 사이즈의 읽기전용 파일(흔히 사전)을 task tracker에 working directory에 복사해주는 메커니즘을 말한다.

 

* Distributed Cache로 사용될 파일은 먼저 HDFS에 복사되고 그 위치가 사용되어야한다.

* Distributed Cache의 디폴트최대크기는 10GB인데 실질적으로 의미있는 최대크기는 100MB 정도.

 

사용전략

-어느파일을 Distributed Cache로 사용할 것인가?

* CountCitation의 실행 후 나온결과에 TopN을 돌려서 나온 결과를 distributed cache로 사용.

N이 작다면 이 파일의 크기는 작다.

* TopN의 결과 파일이 이미 HDFS에 있으므로 이들 그대로 사용한다.

- 프로그램의 인자로 이 distributed cache파일의 HDFS상의 위치를 넘긴다.

- Main함수에서 이 위치를 단순히 Distributed Cache로 등록한다.

- Mapper 실행시 setup 메소드에서 이 위치를 참조하여 파일을 오픈한다.

- 앞서 이야기했듯이 이 파일은 task의 실행 전에 HDFS에서 로컬파일 시스템으로 복사되기 때문에 그냥 로컬 텍스트 파일을 읽는 코드를 작성하여 ID를 키로 하여 해쉬맵에 빈도수를 값으로 저장한다.

 

- Mapper의 map 메소드에서 읽혀지는 Title ID 쌍에 대해 ID가 앞서 setup에서 만들어진 해쉬맵에 존재하는지 확인

* 존재하면 키를 타이틀로 하고 밸류는 DocID + "t"+빈도수로 해서 reducer로 내보낸다.

 

- 이 경우 Reducer는 딱히 할일이 없기 때문에 Identity Reducer를 지정한다.

 

 

SORT STRING

 

개요

- 하둡은 분산환경의 머지소팅 프레임웍

- 스트링소팅의 경우 아주 쉽게 코딩이 가능

* Identity Mapper와 Identity Reducer를 사용!

- 입력파일은 한줄에 한 문자열이 있는 텍스트 파일

 

main함수

 

 

내부동작

 

- IdentityMapper -> 입력을 그대로 출력

- Partitioner -> Reducer가 하나! / Reducer가 여러개라면 다른 전략이 필요

- GroupingComparator,SortingComparator!

 

 

 

MyReducer.reduce

 

public void reduce(Text key, lterable<Text>

valuelter, final Context context) throws

IOException, InterruptedException

{

String title=null;

String frequency=null;

int count=0;

for(Text t: valuelter)

{

String str=t.toString();

String[]tokens=str.split("\\t");

if(tokens[1].equals("1"))

title=tokens[0];

else

frequency=tokens[0];

count++;

}

if (count == 2 && title !=null && frequency !=null)

context.write(key,new Text(title+"\\t"+frequency));

}

 

 

 

실행결과

(top 10 most referred wikipedia pages)

 

14532    ltaly    23075

15573    Japan    21453

3383    Vrazil    13632

38523    Departments of France    10900

9239    Europe    12545

9316    England    34884

14533    India    24610

5405    China    12187

18951490    American football    9536

5407    California    15945

 

 

 

MyMapper1.map

 

- 2M.Title.ID가 이 Mapper에 의해 처리

- map 함수의 키로는 Title스트링이, 밸류로는 그 타이틀을 갖는 문서의 ID가 들어옴

- ID를 키로 그리고 Title+"\t+1"을 밸류로 Reducer로 넘긴다.

 

public static class MyMapper1 extends Mapper<Text, Text, Text, Text>

{

@Override

protected void map (Text key, Text value, final Context context) throws IOException, InterruptedException

{

context.write(value, new Text(key+"\t+1"));

context.getCounter("Stats", "Number of Title+DocID").increment(1);

}

}

 

 

 

 

MyMapper2.map

 

- CountCitation(과 TopN)의 결과가 이 Mapper에 의해 처리

- map 함수의 키로는 ID가 들어오고 밸류로는 빈도수가 들어온다.

- Reducer로 출력할 때는 ID를 그대로 키로 그리고 빈도수 +"\t"+2을 밸류로 넘긴다.

 

public static class MyMapper2 extends Mapper<Text, Text, Text, Text>

{

@Override

protected void map(Text key, Text value, final Context context)

throws IOException, Interrupted Exception

{

context.write(key, new Text(value+"\t"+2));

context.getCounter("Stats",DocID+Citation").increment(1);

}

}

 

 

빅데이터공부하기 56_ JOIN ID & TITLE

 

개요

 

앞서 Count Citation 처럼 결과가 ID가 나오는 경우 무슨 내용인지 궁금.

 

2M.TITLE.ID

- ID별로 해당문서의 타이틀을 보여주는 데이터파일.

- 이 파일과 CountCitation과 TopN의 결과 파일을 ID로 조인하면 타이틀을 알아낼 수 있음

 

조인전략

- 여기서는 2개의 입력파일셋을 각기 mapper로 로드하여 조인을 reducer에서 수행

- 사실 TopN의 결과로 나온 10개의 ID에 대해서만 조인을 하자면 Distributed Cache라는 것을 사용하면 훨씬 효율적이다. 이에 대해서는 Advanced MapReduce프로그래밍 세션에서 다뤄보겠습니다.

 

MultipleInputs의 사용

 

MutipleInputs.addInputPath(pass, new Path(titleDocID), KeyValueTextInputFormat.class, MyMapper1.class);

- titleDicID에 있는 파일들을 읽을때는 MyMapper1.map을 mapper로 사용하면서 입력포맷으로는 KeyValueTextInputFormat을 사용하세요.

 

MutipleInputs.addInputPath(pass, new Path(doclDFreq), KeyValueTextInputFormat.class, MyMapper2.class);

- docIdFreq에 있는 파일들을 읽을때는 MyMapper2.map을 mapper로 사용하면서 입력포맷으로는 KeyValueTextInputFormat을 사용하세요.

 

이 경우 Job클래스의 setMapperClass는 호출이 불필요.

COUNT CITATION

 

개요

 

- 앞서본 2M.SPCID.DSTID를 바탕으로 가장 많이 레퍼런스된 위키피디아 페이지의 분서 번호가 뭔지 알아낸다.

 

- 기본적으로 word count와 흡사

* Map으로 들어오는 (srclD, dstID)pair를 (dstID, 1)pair로 바꿔서 넘기고 Reduce에서 단순히 dstID로 들어온 value단의 값을 합한다.

* 작업의 특성상 combiner의 적용이 가능.

 

Map 코드

 

public static class Map extends Mapper<Text, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

public void map(Text key, Text value, Context context)throws IOException, InterruptedException {

context.write(value, one);

}

}

 

Reduce 코드

 

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{

public void reduce(Text key, lterable<IntWritable>values, Context context)

throws IOEception, interruptedException{

int sum=0;

fot (IntWritable val : values)

sum +=val.get();

context.write(key,new intWritable(sum));

}

}

 

실행결과

 

앞서 결과에 TopN Job을 실행하여 추출된 top 10 DoclD

 

18951490        9536

38523            10900

5405             12187

9239             12545

3383             13632

5407             15945

15573            21453

14532            23075

14533            24610

9316              34884

 

 

 

 

 

빅데이터공부하기는 it개발자스터디공간을 이용해주세요 ^^

학원을 알아보신다면 --> www.oraclejava.co.kr

 

 

 

COUNT TRIGRAM

 

개요

- Word Count 프로그램의 확장. 연속된 3개의 단어 빈도수를 계산.

- 계산이 끝난 후 TopN을 내부에서 바로 호출

* Job Chaining의 한가지 예

* Count Trigram의 출력이 TopN의 입력으로 사용됨

 

Job Chaining 코드

 

 

실행결과

 

$ hadoop fs- cat/counttrigram/topN/part-r-00000

 

a number of 83818

a number of 89594

the u s 90349

the university of  98566

the end of  100059

member of the  107580

part of the 147861

as well as  169659

the united states  177699

one of the   225768

 

빅데이터공부하기 이번글은 소스코드에 대해서 공부할거에요.

 

 

 

소스코드 

 

1. 파라미터 N의 전달

 

먼저 이 값이 TopN프로그램의 세번째 인자로 지정

main 함수에서 이를 읽어 "top N"이란 파라미터로 정의

- job.getConfiguation(), setlnt("topN", Integer.parselnt(args[2]));

- Mapper와 Reducer의 setup메소드에서 이 값을 읽어간다.

 

int topN=10;

@Override

protected void setup(Context context)throws IOException, InterruptedException{
    topN=context.getConfiguation().getlnt("topN",10);

}

 

2. map

 

public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{

String[]pair= value.toString().split("\\t")

Long IValue=(long)integer.parseint(pair[1].toString());

insert(queue, pair[0],IValue,topN)

}

 

 

3. insert

 

public static void insert(PriorityQueue queue, String item,LongIValue, int TopN) {

ItemFreq head=(ItemFreq)queue peek();

 

if (queue size()<top N ll head.getFreq()<IValue){

ItemFreq itemFreq = new ItemFreq();

ItemFreq.setltem(item);

ItemFreq.setFreq(IValue);

queue.add(itemFreq);

if(head !=null && head.getFreq()<IValue) {// remove smallest item. queue.remove();

}

}

 

 

 

+ Recent posts