5. IMPLEMENTATION
5.1 API Design
Producer APIs
class Producer {
/* Sends the data, partitioned by key to the topic using either the */
/* synchronous or the asynchronous producer */
public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
/* Sends a list of data, partitioned by key to the topic using either */
/* the synchronous or the asynchronous producer */
public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
/* Closes the producer and cleans up */
public void close();
}
- producer design
- kafka.producer.SyncProducer
- kafka.producer.async.AsyncProducer
- producer 의 모든 기능을 하나의 API 를 통해 클라이언트에 노출 시키기 위함
- 큐잉/버퍼링 된 다양한 producer 요청과 배치 데이터의 비동기 전송을 다를 수 있게 해줌
- kafka.producer.Producer - 적절한 카프카 브로커로 직렬화, 전송 작업을 진행하기 전에 다양한 producer 요청을 배치하도록 하는 기능을 제공 (producer.type=async)
- queue.time, batch.size 등의 설정을 통해 이벤트 발생 후 특정 크기나 시간으로 큐잉 되도록 제어 할 수 있음
- 하나의 백그라운드 스레드(kafka.producer.async.ProducerSendThread) 는 배치 데이터를 디큐하고, kafka.producer.EventHandler 가 데이터를 직렬화하여 적절한 카프카 브로커 파티션으로 전송함
- event.handler 파라메터 설정을 통해 사용자 이벤트 핸들러를 플러그인 할 수 있음. 다양한 프로듀서 큐 파이프라인 일때, 사용자 로깅/추적 코드 또는 사용자 모니터링 로직 등에 콜백 값을 끼워 넣을 수 있음 (kafka.producer.async.CallbackHandler 인터페이스를 구현하고 callback.handler 설정 파라메터를 해당 클래스에 설정하여)
사용자 명시 Encoder 를 통해 데이터 직렬화 적용이 가능 (기본 값은 kafka.serializer.DefaultEncoder 설정)
interface Encoder<T> { public Message toMessage(T data); }
사용자 명시 Partitioner 를 통한 로드 밸런싱을 제공함
파티션 API 는 하나의 파티션 아이디를 반환하기 위해서 키와 사용가능한 브로커 파티션들의 수를 사용함
이 아이디는 프로듀서 요청에 대한 브로커 파티션을 선택하기 위한 브로커 _ids 와 파티션들의 정렬 리스트의 색으로 사용 되어짐
기본 파티셔닝 정책은 hash(key)%numPartitions 이며, 만약 키가 null 일 경우에는 임의의 브로커 파티션이 선택 됨
partitioner.class 설정 파라메터를 통해 사용자 정의 파티션을 정책을 적용 할 수 있음
interface Partitioner<T> { int partition(T key, int numPartitions); }
Consumer APIs
- The low-level API - 단일 브로커와의 연결을 유지, 오프셋의 대한 상태를 전혀 보장 하지 않음 (사용자가 필요시 구현)
class SimpleConsumer { /* Send fetch request to a broker and get back a set of messages. */ public ByteBufferMessageSet fetch(FetchRequest request); /* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches); /** * Get a list of valid offsets (up to maxSize) before the given time. * The result is a list of offsets, in descending order. * @param time: time in millisecs, * if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available. * if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available. */ public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets); }
The high-level API - 컨슈머로로부터 브로커의 세부사항을 나타내지 않음. 상태를 보장
/* create a connection to the cluster */ ConsumerConnector connector = Consumer.create(consumerConfig); interface ConsumerConnector { /** * This method is used to get a list of KafkaStreams, which are iterators over * MessageAndMetadata objects from which you can obtain messages and their * associated metadata (currently only topic). * Input: a map of <topic, #streams> * Output: a map of <topic, list of message streams> */ public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); /** * You can also obtain a list of KafkaStreams, that iterate over messages * from topics that match a TopicFilter. (A TopicFilter encapsulates a * whitelist or a blacklist which is a standard Java regex.) */ public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams); /* Commit the offsets of all messages consumed so far. */ public commitOffsets() /* Shut down the connector */ public shutdown() }
5.2 Network Layer
상당히 직관적인 NIO 서버. 파일 전송 구현은 MessageSet 인터페이스와 writeTo 메소드를 주어 완료 할 수 있음
transferTo 구현은 file-backed 메시지를 더욱 효율적으로 사용하도록 설정 함
5.3 Messages
메시지 = 바이트 배열 타입의 불분명한 키/값 + 고정길이 헤더
헤더 = CRC32 체크섬 + 포맷 버전 + 식별자 속성 + 타임스템프
MessageSet 인터페이스를 통해 벌크 읽기/쓰기(NIO 채널) 메소드를 간단하게 사용 할 수 있음
5.4 Message Format
/**
* 1. 4 byte CRC32 of the message
* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
* bit 0 ~ 2 : Compression codec.
* 0 : no compression
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : Timestamp type
* 0 : create time
* 1 : log append time
* bit 4 ~ 7 : reserved
* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
* 5. 4 byte key length, containing length K
* 6. K byte key
* 7. 4 byte payload length, containing length V
* 8. V byte payload
*/
5.5. Log
- "mytopic" 이라는 이름을 갖는 토픽이 두 개 파티션을 갖는다고 하면, my_topic_0 과 my_topic_1 이라는 이름의 경로로 해당 토픽의 데이터가 나누어 저장 됨
- 로그 파일 포맷은 로그 엔트리들 하나의 순서임. 각 로그 엔트리는 4바이트 정수이며, N 메시지 바이트 크기를 저장함
- 각 메시지는 고유하게 식별 가능한 64비트의 오프셋을 가지며, 이 오프셋은 특정 토픽 파티션의 메시지 시작 바이트 위치를 갖음
- 메시지의 정확한 바이너리 포맷은 표준화된 인터페이스로 관리되어져, 프로듀서, 브로커, 클라이언트 간의 메시지의 재복사나 변환 작업 없이 전송이 가능함
- 표준화 인터페이스
- 메시지 오프셋을 이디로 사용하는 것은 일반 적인지 않음. 본래 프로듀서가 생성하는 GUID를 사용하는 것이었으나 컨슈머가 여러개의 서버에서 유일한 GUID를 보장 받는 것은 어렵고, 오프셋 아이디를 랜덤하게 생성한 후 매핑 정보를 유지하는 것은 복잡성과 비용이 큼.
- 그래서 파티션 단위의 아토믹한 카운터를 제공 (파티션 아이디와 노드 아이디를 이용하여 메시지의 고유 식별 값 생성, 단순한 참조가 가능)
On-disk format of a message
offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes
Writes
- 로그는 시리얼한 추가를 허용하며, 항상 마지막 파일로 저장 됨. 설정 된 허용 크기를 넘어설 경우 새로운 파일을 쓰기 위해 처음으로 돌아감.
- 로그는 두개의 설정 파라메터를 취함
- M - 운영체제가 강제로 파일을 디스크에 플러쉬 하기 전에 쓰려는 메시지 수
- S - 강제로 플러쉬 된 후 시간(초)
- 로그는 시리얼한 추가를 허용하며, 항상 마지막 파일로 저장 됨. 설정 된 허용 크기를 넘어설 경우 새로운 파일을 쓰기 위해 처음으로 돌아감.
Reads
읽기는 하나의 메시지의 64비트 논리 오프셋과 S 바이트의 최대 chunk 크기로 수행 되며, S 바이트 안에 포함된 메시지들의 반복자를 반환
S 는 어떤 단일 메시지의 크기보다는 크게 의도 되어지나 비정상적으로 큰 메시지의 경우 여러번의 읽기 실패가 발생 할 수 있음
읽기 실패 시 마다 S 크기가 2배로 증가 됨 (성공 할 때 까지)
하나의 최대 메시지와 버퍼 사이즈 크기는 서버가 특정 크기를 초과하는 메시지를 거부할 수 있도록 설정 할 수 있음
또한 클라이언트에게 완전한 메시지를 전달 할 수 있도록 최대 크기 경계를 줄 수 있음
오프셋 읽기의 실제 프로세스는 데이터가 저장 되어진 첫번째 세그먼트 위치가 필요하며, 전역 오프셋에서 구체적인 오프셋 값을 계산하여 파일 읽기를 수행
로그에는 접근 가능한 가장 최근의 메시지 정보를 제공하여, 사용자가 구독을 바로 수행 할 수 있도록 함 (SLA 기한에도 참고 가능)
만약 존재하지 않는 오프셋을 읽으려고하면 OutOfRangeException 오류가 발생
Deletes
한번에 하나의 로그 세그먼트가 삭제 됨. 로그 메니저는 어떤 파일이 삭제 가능한지에 대한 삭제 정책을 허용함
삭제 작업 중 읽기 락이 발생하지 않도록 copy-on-write 스타일 세그먼트 리스트를 구현 방식을 사용 (삭제 동안 지속적으로 읽기가 가능)
5.6 Distribution
Consumer Offset Tracking
high-level 컨슈머는 각 파티션 안에서 소비 되어진 최대 오프셋을 추적함. 이벤트 재시작 내 오프셋부터 재개 할 수 있도록 간헐적인 커밋을 수행 함
카프카는 하나의 지명된 브로커(오프셋 메니저) 내 컨슈머 그룹에 대한 모든 오프셋 값을 저장 할 수 있는 옵션을 제공함
high-level 컨슈머는 자동으로 소비한 오프셋 값을 브로커로 전달하고 읽는 작업을 수행함
만약 심플 컨슈머를 사용할 경우 오프셋을 직접 관리해야함
Java 심플 컨슈머의 경우 오직 주키퍼로부터 오프셋을 읽고/쓰는 것만 가능함
Scala 심플 컨슈머는 오프셋 메니저 확인하고, 직접 오프셋 메니저로 커밋 또는 패치가 가능함
컨슈머는 GroupCoordinatorRequest, GroupCoordinatorResponse 를 통해 오프셋 메니저를 찾고 읽기가 가능함
OffsetCommitRequest - 오프셋 메니저는 해당 요청을 받을 경우 _consumer_offsets 토픽에 오프셋 값을 추가 함
- 모든 레플리카 오프셋 토픽에 오프셋 값이 넘어 와야 해당 오프셋 값을 성공으로 커밋 (그렇지 않을 경우 실패 처리 후 재처리)
OffsetFetchRequest - 오프셋 캐쉬로부터 가장 마지막 커밋 된 오프셋 값을 반 (오프셋 메니저가 막 시작한 경우에는 오프셋 토픽 파션을 캐싱하는 작업을 진행, 패치 실패 시 OffsetsLoadInProgress 익셉션 출력 후 다시 돌아가 패치 재처리).
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
Migrating offsets from ZooKeeper to Kafka
기존 카프카는 오프셋 값을 zookeeper 에 저장하였으나(현재는 아님) 카프카로 마이그레이션이 가능함
컨슈머 설정에서 offsets.storage=kafka, dual.commit.enabled=true 로 설정, 롤링 바운스 후 컨슈머 상태를 확인
dual.commit.enabled = false 로 변경 후 롤링 바운스 후 컨슈머 상태 확인
Zookeeper directories
Notation
Broker Node Registry
- 컨슈머들이 브로커를 구분할 수 있도록 고유의 브로커 아이디 정보를 주키퍼 노드에 저장 (임시경로), 물리 장비가 변경 되어도 소비하는데 문제가 없도록 구성
- /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
Broker Topic Registry
- /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
Consumers and Consumer Groups
- 컨슈머들 간의 데이터 소비의 밸런싱을 조정하기 위해 토픽의 컨슈머정보를 저장함.
Consumer Id Registry
- /consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)
Consumer Offsets
각 파티션 마다 소비된 최대 오프셋 값을 저장
/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
Partition Owner Registry
- /consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
Cluster Id
카프카 클러스터로부터 할당 받은 불변 고유의 식별자임.최대 22 캐릭터로 가질 수 있으며, 정규식으로 정의 되어질수 있음.
패딩이 없이 url-safe base64 변체로 정의 되어질 수 있으며, 개념적으로 처음으로 클러스터가 실행되어질때 자동으로 생성 됨
- /cluster/id znode 에 생성
Broker Node Registration
- 브로커 노드는 기본적으로 독립적임.(보유정보만 퍼블리쉬) 브로커조인이 일어 났을 때, 브로커 노드 레지스트리 이하 경로에 해당 정보와 호스트 포트를 작성함. 또한 브로커에 존재하는 토픽 리스트와 논리 파티션 정보도 함께 등록함.
Consumer Registration Algorithm
Consumer Rebalancing Algorithm
참고