Storm과 Esper로 실시간 분석 샘플 사용기
Storm & Esper
현재의 Big Data 분석 관점에서 실시간성이 가장 중요한 포인트 중에 하나다. 과거에는 Big Data 분석에서 알고리즘 분석에 치중한 나머지 데이터 제공은 Batch Processing에 머물러 있었는데, 최근 들어 Twitter Storm, Apache S4 등이 나오면서 실시간성 분석이 필요한 트렌드나 실시간 급상승 인기 검색어, Abuse 트래픽/고객 인식 등의 비즈니스에서 Big Data 분석과 실시간성이 결합된 추세가 되었다.
그래서 간략하게 Twitter Storm과 Esper를 활용한 개인 Twitter의 tps와 maxRetweets을 맛보기로 테스트 해본다.
Stream의 tuple은 Twitter Status의 createdAt, retweetCount이고 bolt는 EsperBolt를 활용해서 쿼리를 통해 tps, maxRetweets을 리턴한다.
이걸 좀더 확장한다면 리트윗이 많이된 트윗을 분석해서 실시간 이슈를 추출해낼 수 있게 된다.
1. Storm Cluster
- Nimbus는 Master 노드이며 Hadoop의 jobTracker와 비슷한 일을 하고 있다. 코드를 클러스터에 분산, 머신에 Task 할당/관리, 오류 모니터링 등의 역할을 함.
- Supervisor는 워커 프로세스이며 일을 할당받아서 하고 끝나면 리턴하는 역할을 함. Topology의 Subset임.
- Zookeeper는 Nimbus와 Supervisor사이에서 클러스터를 통해 협력해서 상태 관리, 장애 관리를 함.
2. Storm 개요
- Streams : 무한 tuple들의 시퀀스를 말함.
- Spouts : Streams의 소스. 예를 들면 트윗의 스트리밍 읽는 부분이 여기에 해당됨.
- Bolts : Input Streams을 가공해서 새로운 출력 Streams으로 내보내는 역할. 스트림의 변환.
- Topologies : Spouts와 Bolts의 관계망 그래프.
. 스파우트, 볼트는 클러스터전체에 걸처 쓰레드로 수행됨.
3. Storm 특징
- 다양한 프로그래밍 언어 지원 : Java, Clojure, Ruby, Python 등.
- Hadoop은 batch processing 개념이 강한데, Storm은 realtime processing.
- Fault-tolerant : Nimbus가 Supervisor를 감시, 재시작 등의 관리를 함. Topology가 감시됨.
- 확장성 : 병렬 연산이며, Scale-Out이 됨.
- ZeroMQ 활용으로 처리 성능이 좋음.
- 로컬 모드, 분산 모드 지원.
4. Esper 개요
- Java 기반의 ESP/CEP 컨테이너.
- 경량이고 embeded가 가능함.
- 오픈 소스인데 GPL.
- 엔진은 Time, Threads, Streams간의 독립 단위로 처리됨.
- EPL지원으로 SQL처럼 데이터를 프로세싱할 수 있음.
- 실시간 분석 가능.
아래는 Storm Local모드 설치 방법을 설명한다.
5. Zookeeper 설치
- Storm에서 zookeeper 3.3..3버전으로 구축 되어 있어서 3.3.3 버전 설치함.
6. Storm Local Mode 설치
7. 샘플 테스트
- storm-esper jar 파일 생성(아래는 수정내용 포함-TwitterEsperSample.java).
- 라이브러리 카피
- 실행 커맨드.
8. Storm 내부 구조
[참조 사이트]
그래서 간략하게 Twitter Storm과 Esper를 활용한 개인 Twitter의 tps와 maxRetweets을 맛보기로 테스트 해본다.
Stream의 tuple은 Twitter Status의 createdAt, retweetCount이고 bolt는 EsperBolt를 활용해서 쿼리를 통해 tps, maxRetweets을 리턴한다.
이걸 좀더 확장한다면 리트윗이 많이된 트윗을 분석해서 실시간 이슈를 추출해낼 수 있게 된다.
1. Storm Cluster
- Nimbus는 Master 노드이며 Hadoop의 jobTracker와 비슷한 일을 하고 있다. 코드를 클러스터에 분산, 머신에 Task 할당/관리, 오류 모니터링 등의 역할을 함.
- Supervisor는 워커 프로세스이며 일을 할당받아서 하고 끝나면 리턴하는 역할을 함. Topology의 Subset임.
- Zookeeper는 Nimbus와 Supervisor사이에서 클러스터를 통해 협력해서 상태 관리, 장애 관리를 함.
2. Storm 개요
- Streams : 무한 tuple들의 시퀀스를 말함.
- Spouts : Streams의 소스. 예를 들면 트윗의 스트리밍 읽는 부분이 여기에 해당됨.
- Bolts : Input Streams을 가공해서 새로운 출력 Streams으로 내보내는 역할. 스트림의 변환.
- Topologies : Spouts와 Bolts의 관계망 그래프.
. 스파우트, 볼트는 클러스터전체에 걸처 쓰레드로 수행됨.
3. Storm 특징
- 다양한 프로그래밍 언어 지원 : Java, Clojure, Ruby, Python 등.
- Hadoop은 batch processing 개념이 강한데, Storm은 realtime processing.
- Fault-tolerant : Nimbus가 Supervisor를 감시, 재시작 등의 관리를 함. Topology가 감시됨.
- 확장성 : 병렬 연산이며, Scale-Out이 됨.
- ZeroMQ 활용으로 처리 성능이 좋음.
- 로컬 모드, 분산 모드 지원.
4. Esper 개요
- Java 기반의 ESP/CEP 컨테이너.
- 경량이고 embeded가 가능함.
- 오픈 소스인데 GPL.
- 엔진은 Time, Threads, Streams간의 독립 단위로 처리됨.
- EPL지원으로 SQL처럼 데이터를 프로세싱할 수 있음.
- 실시간 분석 가능.
아래는 Storm Local모드 설치 방법을 설명한다.
5. Zookeeper 설치
- Storm에서 zookeeper 3.3..3버전으로 구축 되어 있어서 3.3.3 버전 설치함.
> wget http://ftp.daum.net/apache//zookeeper/zookeeper-3.3.3/zookeeper-3.3.3.tar.gz > tar xvfz zookeeper-3.3.3.tar.gz > cd zookeeper-3.3.3/conf > cp zoo_sample.cfg zoo.cfg > vi zoo.cfg dataDir=/database/zookeeper/data server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888 > vi /database/zookeeper/data/myid 각 서버별 server.X의 X값을 추가해 준다. > cd ../bin > ./zkServer.sh start
6. Storm Local Mode 설치
> wget https://github.com/downloads/nathanmarz/storm/storm-0.6.2.zip > unzip storm-0.6.2.zip > cd storm-0.6.2 # 설정 추가 > vi conf/storm.yml nimbus.host: "127.0.0.1" storm.zookeeper.servers: - "zookeeper1" - "zookeeper2" - "zookeeper3" # Storm 구동 계정 권한 chown -R storm:storm /mnt #서버 실행 > bin/storm ui > bin/storm supervisor > bin/storm nimbus
7. 샘플 테스트
- storm-esper jar 파일 생성(아래는 수정내용 포함-TwitterEsperSample.java).
TopologyBuilder builder = new TopologyBuilder(); TwitterSpout spout = new TwitterSpout(username, pwd); EsperBolt bolt = new EsperBolt.Builder() .addInputAlias("twitter", "default", "Tweets") .setAnonymousOutput("default", "tps", "maxRetweets") .addStatement("select count(*) as tps, max(retweetCount) as maxRetweets from Tweets.win:time_batch(2 sec)") .build(); builder.setSpout("twitter", spout); builder.setBolt("EsperBolt", bolt).shuffleGrouping("twitter"); StormTopology topology = builder.createTopology(); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, topology); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();
- 라이브러리 카피
storm-0.6.2/lib에 카피함. storm-esper-0.6.2-SNAPSHOT.jar twitter4j-core-2.2.5.jar twitter4j-stream-2.2.5.jar esper-4.3.0.jar antlr-2.7.7.jar antlr-runtime-3.2.jar
- 실행 커맨드.
> bin/storm jar /database/samples/storm/storm-esper-0.6.2-SNAPSHOT.jar org.tomdz.storm.esper.example.TwitterEsperSample 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 9] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 0] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 62] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 4] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 1] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 0] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 0] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696355000, 0] 2012-03-14 twitter source: twitter:3, stream: default, id: {}, [1331696354000, 0] 2012-03-14 EsperBolt source: EsperBolt:2, stream: default, id: {}, [70, 62] .......
8. Storm 내부 구조
/{storm-local-dir} | |-/nimbus | | | |-/inbox | | | | | |-/stormjar-{uuid}.jar | | | |-/stormdist | | | |-/{topology-id} | | | |-/stormjar.jar | | | |-/stormcode.ser | | | |-/stormconf.ser | |-/supervisor | | | |-/stormdist | | | | | |-/{topology-id} | | | | | |-/resources | | | | | |-/stormjar.jar | | | | | |-/stormcode.ser | | | | | |-/stormconf.ser | | | |-/localstate | | | |-/tmp | |-/{uuid} | | | |-/stormjar.jar |-/workers | |-/{worker-id} | |-/pids | |-/{pid} |-/heartbeats | |-/{worker-id}
[참조 사이트]