[R&D] 컨슈머
* 참고 자료
https://jhleed.tistory.com/180
카프카는 실시간 스트림 데이터 처리를 위해 도입. 실시간으로 데이터를 토픽에 프로듀서가 저장하고 컨슈머가 이를 읽어가는 구조임. 토픽은 데이터베이스 테이블이랑 비슷함. 차이점은 데이터베이스 레코드는 덮어쓰기가 가능하지만 토픽은 append-only이다.
kafka-python이라는 라이브러리 활용하여 카프카 컨슈머 클라이언트(Python) 구현
* from kafka import KafkaConsumer
* 다른 라이브러리가 성능이 좋지만 kafka-python은 오픈소스 자료가 많고 사용법이 간결함.
consumer = KafkaConsumer(
'CPS_BOARD_REPLY',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
위 파라미터 정보는 아래와 같다.
'CPS_BOARD_REPLY' : 토픽명
bootstrap_servers : 카프카 클러스터 호스트들과 포트 정보 리스트 (브로커 리스트)
auto_offset_reset : earliest / latest / none을 지정할 수 있고 각각 가장 초기, 가장 마지막 오프셋값 그리고 이전 오프셋을 찾지 못할 경우 에러를 뜻한다.
enable_auto_commit : 주기적으로 offset을 auto_commit
group_id : 컨슈머 그룹 식별자
value_deserializer : 프로듀서에서 value를 serialize 했기 때문에 사용
컨슈머 클라이언트는 설정된 timeout만큼 기다리면서 polling 할 수도 있고, 바로 받아온 데이터를 처리하는 batch 형태로도 처리 가능하다. 한 시간에 한번 처리하는 경우를 택할 수 있음.
MongoDB 커넥션은 pymongo의 MongoClient 사용.
* 날짜, oracleDB에서 대규모 데이터를 저장하기 위한 LOB 자료형 같은 예외 데이터 존재하기 때문에 컨슈머 및 자료형 별로 전처리 후 삽입* Kafka 브로커로부터 데이터를 읽어올 때마다 매번 삽입하기보다 일정 수준 쌓이면 이를 Bulk Inserting 하는 방식으로 MongoDB에 삽입함. 이때 데이터가 쌓일 때까지 기다리다가 Connection이 Timeout 되는 경우가 있어 데이터를 마냥 기다리지 않고 일정 타임아웃 두고 이를 초과할 경우에도 삽입함.