5. IMPLEMENTATION

5.1 API Design

  • Producer APIs

 class Producer {

    /* Sends the data, partitioned by key to the topic using either the */
    /* synchronous or the asynchronous producer */
    public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

    /* Sends a list of data, partitioned by key to the topic using either */
    /* the synchronous or the asynchronous producer */
    public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

    /* Closes the producer and cleans up */
    public void close();

    }
  • producer design
    • kafka.producer.SyncProducer
    • kafka.producer.async.AsyncProducer
    • producer 의 모든 기능을 하나의 API 를 통해 클라이언트에 노출 시키기 위함
  • 큐잉/버퍼링 된 다양한 producer 요청과 배치 데이터의 비동기 전송을 다를 수 있게 해줌
    • kafka.producer.Producer - 적절한 카프카 브로커로 직렬화, 전송 작업을 진행하기 전에 다양한 producer 요청을 배치하도록 하는 기능을 제공 (producer.type=async)
    • queue.time, batch.size 등의 설정을 통해 이벤트 발생 후 특정 크기나 시간으로 큐잉 되도록 제어 할 수 있음
    • 하나의 백그라운드 스레드(kafka.producer.async.ProducerSendThread) 는 배치 데이터를 디큐하고, kafka.producer.EventHandler 가 데이터를 직렬화하여 적절한 카프카 브로커 파티션으로 전송함
    • event.handler 파라메터 설정을 통해 사용자 이벤트 핸들러를 플러그인 할 수 있음. 다양한 프로듀서 큐 파이프라인 일때, 사용자 로깅/추적 코드 또는 사용자 모니터링 로직 등에 콜백 값을 끼워 넣을 수 있음 (kafka.producer.async.CallbackHandler 인터페이스를 구현하고 callback.handler 설정 파라메터를 해당 클래스에 설정하여)
  • 사용자 명시 Encoder 를 통해 데이터 직렬화 적용이 가능 (기본 값은 kafka.serializer.DefaultEncoder 설정)

    interface Encoder<T> {
        public Message toMessage(T data);
        }
    
  • 사용자 명시 Partitioner 를 통한 로드 밸런싱을 제공함

    • 파티션 API 는 하나의 파티션 아이디를 반환하기 위해서 키와 사용가능한 브로커 파티션들의 수를 사용함

      • 이 아이디는 프로듀서 요청에 대한 브로커 파티션을 선택하기 위한 브로커 _ids 와 파티션들의 정렬 리스트의 색으로 사용 되어짐

      • 기본 파티셔닝 정책은 hash(key)%numPartitions 이며, 만약 키가 null 일 경우에는 임의의 브로커 파티션이 선택 됨

      • partitioner.class 설정 파라메터를 통해 사용자 정의 파티션을 정책을 적용 할 수 있음

    • interface Partitioner<T> {
          int partition(T key, int numPartitions);
          }
      
  • Consumer APIs

    • The low-level API - 단일 브로커와의 연결을 유지, 오프셋의 대한 상태를 전혀 보장 하지 않음 (사용자가 필요시 구현)
    class SimpleConsumer {
    
        /* Send fetch request to a broker and get back a set of messages. */
        public ByteBufferMessageSet fetch(FetchRequest request);
    
        /* Send a list of fetch requests to a broker and get back a response set. */
        public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    
        /**
        * Get a list of valid offsets (up to maxSize) before the given time.
        * The result is a list of offsets, in descending order.
        * @param time: time in millisecs,
        *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
        *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
        */
        public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
        }
    
    • The high-level API - 컨슈머로로부터 브로커의 세부사항을 나타내지 않음. 상태를 보장

      /* create a connection to the cluster */
          ConsumerConnector connector = Consumer.create(consumerConfig);
      
          interface ConsumerConnector {
      
          /**
          * This method is used to get a list of KafkaStreams, which are iterators over
          * MessageAndMetadata objects from which you can obtain messages and their
          * associated metadata (currently only topic).
          *  Input: a map of <topic, #streams>
          *  Output: a map of <topic, list of message streams>
          */
          public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
      
          /**
          * You can also obtain a list of KafkaStreams, that iterate over messages
          * from topics that match a TopicFilter. (A TopicFilter encapsulates a
          * whitelist or a blacklist which is a standard Java regex.)
          */
          public List<KafkaStream> createMessageStreamsByFilter(
              TopicFilter topicFilter, int numStreams);
      
          /* Commit the offsets of all messages consumed so far. */
          public commitOffsets()
      
          /* Shut down the connector */
          public shutdown()
          }
      

    5.2 Network Layer

  • 상당히 직관적인 NIO 서버. 파일 전송 구현은 MessageSet 인터페이스와 writeTo 메소드를 주어 완료 할 수 있음

  • transferTo 구현은 file-backed 메시지를 더욱 효율적으로 사용하도록 설정 함

5.3 Messages

  • 메시지 = 바이트 배열 타입의 불분명한 키/값 + 고정길이 헤더

  • 헤더 = CRC32 체크섬 + 포맷 버전 + 식별자 속성 + 타임스템프

  • MessageSet 인터페이스를 통해 벌크 읽기/쓰기(NIO 채널) 메소드를 간단하게 사용 할 수 있음

5.4 Message Format

/**
* 1. 4 byte CRC32 of the message
* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
*    bit 0 ~ 2 : Compression codec.
*      0 : no compression
*      1 : gzip
*      2 : snappy
*      3 : lz4
*    bit 3 : Timestamp type
*      0 : create time
*      1 : log append time
*    bit 4 ~ 7 : reserved
* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
* 5. 4 byte key length, containing length K
* 6. K byte key
* 7. 4 byte payload length, containing length V
* 8. V byte payload
*/

5.5. Log

  • "mytopic" 이라는 이름을 갖는 토픽이 두 개 파티션을 갖는다고 하면, my_topic_0 과 my_topic_1 이라는 이름의 경로로 해당 토픽의 데이터가 나누어 저장 됨
  • 로그 파일 포맷은 로그 엔트리들 하나의 순서임. 각 로그 엔트리는 4바이트 정수이며, N 메시지 바이트 크기를 저장함
  • 각 메시지는 고유하게 식별 가능한 64비트의 오프셋을 가지며, 이 오프셋은 특정 토픽 파티션의 메시지 시작 바이트 위치를 갖음
  • 메시지의 정확한 바이너리 포맷은 표준화된 인터페이스로 관리되어져, 프로듀서, 브로커, 클라이언트 간의 메시지의 재복사나 변환 작업 없이 전송이 가능함
    • 표준화 인터페이스
  • 메시지 오프셋을 이디로 사용하는 것은 일반 적인지 않음. 본래 프로듀서가 생성하는 GUID를 사용하는 것이었으나 컨슈머가 여러개의 서버에서 유일한 GUID를 보장 받는 것은 어렵고, 오프셋 아이디를 랜덤하게 생성한 후 매핑 정보를 유지하는 것은 복잡성과 비용이 큼.
  • 그래서 파티션 단위의 아토믹한 카운터를 제공 (파티션 아이디와 노드 아이디를 이용하여 메시지의 고유 식별 값 생성, 단순한 참조가 가능)
On-disk format of a message

    offset         : 8 bytes 
    message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
    crc            : 4 bytes
    magic value    : 1 byte
    attributes     : 1 byte
    timestamp      : 8 bytes (Only exists when magic value is greater than zero)
    key length     : 4 bytes
    key            : K bytes
    value length   : 4 bytes
    value          : V bytes
  • Writes

    • 로그는 시리얼한 추가를 허용하며, 항상 마지막 파일로 저장 됨. 설정 된 허용 크기를 넘어설 경우 새로운 파일을 쓰기 위해 처음으로 돌아감.
      • 로그는 두개의 설정 파라메터를 취함
      • M - 운영체제가 강제로 파일을 디스크에 플러쉬 하기 전에 쓰려는 메시지 수
      • S - 강제로 플러쉬 된 후 시간(초)
  • Reads

    • 읽기는 하나의 메시지의 64비트 논리 오프셋과 S 바이트의 최대 chunk 크기로 수행 되며, S 바이트 안에 포함된 메시지들의 반복자를 반환

    • S 는 어떤 단일 메시지의 크기보다는 크게 의도 되어지나 비정상적으로 큰 메시지의 경우 여러번의 읽기 실패가 발생 할 수 있음

      • 읽기 실패 시 마다 S 크기가 2배로 증가 됨 (성공 할 때 까지)

      • 하나의 최대 메시지와 버퍼 사이즈 크기는 서버가 특정 크기를 초과하는 메시지를 거부할 수 있도록 설정 할 수 있음

      • 또한 클라이언트에게 완전한 메시지를 전달 할 수 있도록 최대 크기 경계를 줄 수 있음

      • 오프셋 읽기의 실제 프로세스는 데이터가 저장 되어진 첫번째 세그먼트 위치가 필요하며, 전역 오프셋에서 구체적인 오프셋 값을 계산하여 파일 읽기를 수행

    • 로그에는 접근 가능한 가장 최근의 메시지 정보를 제공하여, 사용자가 구독을 바로 수행 할 수 있도록 함 (SLA 기한에도 참고 가능)

    • 만약 존재하지 않는 오프셋을 읽으려고하면 OutOfRangeException 오류가 발생

  • Deletes

    • 한번에 하나의 로그 세그먼트가 삭제 됨. 로그 메니저는 어떤 파일이 삭제 가능한지에 대한 삭제 정책을 허용함

    • 삭제 작업 중 읽기 락이 발생하지 않도록 copy-on-write 스타일 세그먼트 리스트를 구현 방식을 사용 (삭제 동안 지속적으로 읽기가 가능)

5.6 Distribution

  • Consumer Offset Tracking

    • high-level 컨슈머는 각 파티션 안에서 소비 되어진 최대 오프셋을 추적함. 이벤트 재시작 내 오프셋부터 재개 할 수 있도록 간헐적인 커밋을 수행 함

    • 카프카는 하나의 지명된 브로커(오프셋 메니저) 내 컨슈머 그룹에 대한 모든 오프셋 값을 저장 할 수 있는 옵션을 제공함

    • high-level 컨슈머는 자동으로 소비한 오프셋 값을 브로커로 전달하고 읽는 작업을 수행함

    • 만약 심플 컨슈머를 사용할 경우 오프셋을 직접 관리해야함

      • Java 심플 컨슈머의 경우 오직 주키퍼로부터 오프셋을 읽고/쓰는 것만 가능함

      • Scala 심플 컨슈머는 오프셋 메니저 확인하고, 직접 오프셋 메니저로 커밋 또는 패치가 가능함

      • 컨슈머는 GroupCoordinatorRequest, GroupCoordinatorResponse 를 통해 오프셋 메니저를 찾고 읽기가 가능함

      • OffsetCommitRequest - 오프셋 메니저는 해당 요청을 받을 경우 _consumer_offsets 토픽에 오프셋 값을 추가 함

        • 모든 레플리카 오프셋 토픽에 오프셋 값이 넘어 와야 해당 오프셋 값을 성공으로 커밋 (그렇지 않을 경우 실패 처리 후 재처리)
      • OffsetFetchRequest - 오프셋 캐쉬로부터 가장 마지막 커밋 된 오프셋 값을 반 (오프셋 메니저가 막 시작한 경우에는 오프셋 토픽 파션을 캐싱하는 작업을 진행, 패치 실패 시 OffsetsLoadInProgress 익셉션 출력 후 다시 돌아가 패치 재처리).

      • https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

  • Migrating offsets from ZooKeeper to Kafka

    • 기존 카프카는 오프셋 값을 zookeeper 에 저장하였으나(현재는 아님) 카프카로 마이그레이션이 가능함

      • 컨슈머 설정에서 offsets.storage=kafka, dual.commit.enabled=true 로 설정, 롤링 바운스 후 컨슈머 상태를 확인

      • dual.commit.enabled = false 로 변경 후 롤링 바운스 후 컨슈머 상태 확인

  • Zookeeper directories

  • Notation

  • Broker Node Registry

    • 컨슈머들이 브로커를 구분할 수 있도록 고유의 브로커 아이디 정보를 주키퍼 노드에 저장 (임시경로), 물리 장비가 변경 되어도 소비하는데 문제가 없도록 구성
    • /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
  • Broker Topic Registry

    • /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
  • Consumers and Consumer Groups

    • 컨슈머들 간의 데이터 소비의 밸런싱을 조정하기 위해 토픽의 컨슈머정보를 저장함.
  • Consumer Id Registry

    • /consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)
  • Consumer Offsets

    • 각 파티션 마다 소비된 최대 오프셋 값을 저장

    • /consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)

  • Partition Owner Registry

    • /consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
  • Cluster Id

    • 카프카 클러스터로부터 할당 받은 불변 고유의 식별자임.최대 22 캐릭터로 가질 수 있으며, 정규식으로 정의 되어질수 있음.

    • 패딩이 없이 url-safe base64 변체로 정의 되어질 수 있으며, 개념적으로 처음으로 클러스터가 실행되어질때 자동으로 생성 됨

      • /cluster/id znode 에 생성
  • Broker Node Registration

    • 브로커 노드는 기본적으로 독립적임.(보유정보만 퍼블리쉬) 브로커조인이 일어 났을 때, 브로커 노드 레지스트리 이하 경로에 해당 정보와 호스트 포트를 작성함. 또한 브로커에 존재하는 토픽 리스트와 논리 파티션 정보도 함께 등록함.
  • Consumer Registration Algorithm

  • Consumer Rebalancing Algorithm

  • 참고

results matching ""

    No results matching ""