분류 sql

Redis Streams 소개

컨텐츠 정보

  • 조회 414 (작성일 )

본문

Stream은 보다 추상적인 방식으로 로그 데이터 구조를 모델링 하는 Redis 5.0에 도입 된 새로운 데이터 유형입니다. 그러나 로그의 본질은 여전히 ​​그대로입니다. 로그 파일처럼 종종 추가 전용 모드에서 열린 파일로 구현되는 Redis Streams는 주로 추가 전용 데이터 구조입니다. 적어도 개념적으로는 메모리에 표시되는 추상 데이터 유형이기 때문에 Redis Streams는 로그 파일의 한계를 극복하기 위해 강력한 작업을 구현합니다.

데이터 구조 자체가 매우 단순함에도 불구하고 Redis 스트림을 가장 복잡한 유형의 Redis로 만드는 이유는 필수가 아닌 추가 기능을 구현한다는 사실입니다. 즉, 소비자가 생산자가 스트림에 추가 한 새 데이터를 기다릴 수 있는 일련의 차단 작업입니다. , 그리고 여기에 소비자 그룹이라는 개념이 추가되었습니다.

소비자 그룹은 처음에 인기 있는 메시징 시스템 Kafka (TM)에 의해 도입되었습니다. Redis는 완전히 다른 용어로 유사한 아이디어를 다시 구현하지만 목표는 동일합니다. 즉, 클라이언트 그룹이 동일한 메시지 스트림의 다른 부분을 사용하여 협력 할 수 있도록 하는 것입니다.


Streams basics 


Redis Streams의 정의와 사용 방법을 이해하기 위해 모든 고급 기능을 무시하고 대신 데이터 구조 자체를 조작하고 액세스하는 데 사용되는 명령에 초점을 맞춥니 다. 이것은 기본적으로 목록, 집합, 정렬 된 집합 등과 같은 대부분의 다른 Redis 데이터 유형에 공통적인 부분입니다. 그러나 목록에는 BLPOP 및 이와 유사한 명령으로 내 보낸 선택적인 더 복잡한 차단 API도 있습니다. 따라서 스트림은 이와 관련하여 목록과 크게 다르지 않으며 추가 API가 더 복잡하고 강력하다는 것입니다.

Streams는 추가 전용 데이터 구조이므로 XADD라고 하는 기본 쓰기 명령은 지정된 스트림에 새 항목을 추가합니다. 스트림 항목은 단순한 문자열이 아니라 하나 이상의 필드-값 쌍으로 구성됩니다. 이러한 방식으로 스트림의 각 항목은 각 줄에 여러 개의 분리 된 필드가 있는 CSV 형식으로 작성된 추가 전용 파일처럼 이미 구조화되어 있습니다.


> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0


XADD 명령에 대한 위의 호출은 자동 생성 항목 ID (특히 1518951480106-0)를 사용하여 mystream 키의 스트림에 sensor-id : 1234, temperature : 19.8 항목을 추가합니다. 첫 번째 인수로 mystream 키 이름을 가져오고 두 번째 인수는 스트림 내부의 모든 항목을 식별하는 항목 ID입니다.

그러나 이 경우 서버에서 새 ID를 생성하기를 원하기 때문에 *를 전달했습니다. 모든 새 ID는 단조롭게 증가하므로 더 간단하게 말하면 추가 된 모든 새 항목은 모든 이전 항목에 비해 더 높은 ID를 갖게 됩니다. 서버에 의한 ID 자동 생성은 거의 항상 원하는 것이며 ID를 명시적으로 지정하는 이유는 매우 드뭅니다. 이것에 대해서는 나중에 더 이야기하겠습니다. 각 스트림 항목에 ID가 있다는 사실은 로그 파일과의 또 다른 유사성입니다. 여기서 줄 번호 또는 파일 내의 바이트 오프셋을 사용하여 지정된 항목을 식별 할 수 있습니다. XADD 예제로 돌아가서 키 이름과 ID 뒤에 오는 다음 인수는 스트림 항목을 구성하는 필드 값 쌍입니다.

XLEN 명령을 사용하여 스트림 내부의 항목 수를 가져올 수 있습니다.


> XLEN mystream
(integer) 1


Entry IDs 


XADD 명령에 의해 반환되고 지정된 스트림 내의 각 항목을 단일하게 식별하는 항목 ID는 두 부분으로 구성됩니다.


<millisecondsTime>-<sequenceNumber>


밀리 초 시간 부분은 실제로 스트림 ID를 생성하는 로컬 Redis 노드의 로컬 시간이지만 현재 밀리 초 시간이 이전 입력 시간보다 작은 경우 이전 입력 시간이 대신 사용되므로 시계가 뒤로 점프하는 경우 단조롭게 증가하는 ID 속성은 여전히 ​​유지됩니다. 시퀀스 번호는 동일한 밀리 초 내에 생성 된 항목에 사용됩니다. 시퀀스 번호는 64 비트 너비이므로 실제적으로 동일한 밀리 초 내에 생성 할 수 있는 항목 수에는 제한이 없습니다.

이러한 ID의 형식은 처음에는 이상하게 보일 수 있으며 부드러운 독자는 시간이 ID의 일부인 이유를 궁금해 할 수 있습니다. 그 이유는 Redis 스트림이 ID 별 범위 쿼리를 지원하기 때문입니다. ID는 항목이 생성 된 시간과 관련이 있기 때문에 기본적으로 무료로 시간 범위를 쿼리 할 수 있는 기능을 제공합니다. XRANGE 명령을 다루는 동안 곧 이것을 보게 될 것입니다.

어떤 이유로 사용자가 시간과 관련이 없지만 실제로 다른 외부 시스템 ID와 연결된 증분 ID가 필요한 경우 XADD 명령은 다음과 같이 자동 생성을 트리거 하는 * 와일드 카드 ID 대신 명시 적 ID를 사용할 수 있습니다. 다음 예에서 :


> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2


이 경우 최소 ID는 0-1이며 명령은 이전 ID와 같거나 더 작은 ID를 허용하지 않습니다.


> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item


Getting data from Streams 


이제 마침내 XADD를 통해 스트림에 항목을 추가 할 수 있습니다. 그러나 데이터를 스트림에 추가하는 것은 매우 분명하지만 데이터를 추출하기 위해 스트림을 쿼리 할 수 있는 방법은 그렇게 명확하지 않습니다. 로그 파일의 비유를 계속하면 한 가지 분명한 방법은 일반적으로 Unix 명령 tail -f로 수행하는 작업을 모방하는 것입니다. 즉, 스트림에 추가되는 새 메시지를 얻기 위해 수신을 시작할 수 있습니다. . 주어진 요소가 BLPOP와 같은 팝 스타일 작업을 차단하는 단일 클라이언트에 도달하는 Redis의 차단 목록 작업과 달리 스트림을 사용하면 여러 소비자가 스트림에 추가 된 새 메시지를 볼 수 있습니다. tail -f 프로세스는 로그에 추가 된 내용을 볼 수 있습니다. 전통적인 용어를 사용하여 스트림이 여러 클라이언트에게 메시지를 팬 아웃 할 수 있기를 원합니다.


그러나 이것은 하나의 잠재적 액세스 모드일 뿐입니다. 또한 메시징 시스템이 아니라 시계열 저장소라는 매우 다른 방식으로 스트림을 볼 수 있습니다. 이 경우 새 메시지를 추가하는 것도 유용 할 수 있지만 또 다른 자연 쿼리 모드는 시간 범위별로 메시지를 가져 오거나 커서를 사용하여 메시지를 반복하여 모든 기록을 점진적으로 확인하는 것입니다. 이것은 확실히 또 다른 유용한 액세스 모드입니다.

마지막으로, 소비자의 관점에서 스트림을 본다면, 우리는 또 다른 방법, 즉 이러한 메시지를 처리하는 여러 소비자에게 분할 될 수 있는 메시지 스트림으로서 스트림에 액세스하기를 원할 수 있습니다. 소비자 그룹은 단일 스트림에 도착하는 메시지의 하위 집합 만 볼 수 있습니다. 이러한 방식으로 단일 소비자가 모든 메시지를 처리 ​​할 필요 없이 여러 소비자에 걸쳐 메시지 처리를 확장 할 수 있습니다. 각 소비자는 처리 할 다른 메시지만 받게 됩니다. 이것은 기본적으로 Kafka (TM)가 소비자 그룹에서 하는 일입니다. 소비자 그룹을 통해 메시지를 읽는 것은 Redis Stream에서 읽는 또 다른 흥미로운 모드입니다.

Redis Streams는 다양한 명령을 통해 위에서 설명한 세 가지 쿼리 모드를 모두 지원합니다. 다음 섹션에서는 가장 간단하고 사용하기 쉬운 범위 쿼리부터 시작하여 모두 표시합니다.


Querying by range: XRANGE and XREVRANGE 


범위별로 스트림을 쿼리하려면 시작과 끝의 두 ID 만 지정하면 됩니다. 반환되는 범위에는 시작 또는 끝이 ID 인 요소가 포함되므로 범위가 포함됩니다. 두 개의 특수 ID-및 +는 각각 가능한 가장 작은 ID와 가장 큰 ID를 의미합니다.


> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"


반환 된 각 항목은 ID와 필드-값 쌍의 목록이라는 두 항목의 배열입니다. -문자의 왼쪽 부분은 항목이 생성 된 시점에서 스트림 항목을 생성 한 로컬 노드의 밀리 초 단위의 Unix 시간이기 때문에 항목 ID는 시간과 관계가 있다고 이미 언급했습니다. 스트림은 완전히 지정된 XADD 명령으로 복제되므로 복제본은 마스터와 동일한 ID를 갖게 됩니다. 즉, XRANGE를 사용하여 시간 범위를 쿼리 할 수 ​​있습니다. 그러나 그렇게 하기 위해 ID의 시퀀스 부분을 생략 할 수 있습니다. 생략하면 범위의 시작 부분에서 0으로 간주되고 끝 부분에서는 최대 값으로 간주됩니다. 유효한 순서 번호. 이런 식으로 단 2 밀리 초의 Unix 시간을 사용하여 쿼리 하면 해당 시간 범위에서 생성 된 모든 항목을 포괄적 인 방식으로 가져옵니다. 예를 들어 2 밀리 초 기간을 쿼리 하려면 다음을 사용할 수 있습니다.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"


이 범위에는 단일 항목 만 있지만 실제 데이터 세트에서는 시간 범위를 쿼리하거나 2 밀리 초 안에 많은 항목이 있을 수 있으며 반환 된 결과는 엄청날 수 있습니다. 이러한 이유로 XRANGE는 끝에 선택적 COUNT 옵션을 지원합니다. 개수를 지정하면 처음 N 개 항목 만 가져올 수 있습니다. 더 많은 것을 원하면 마지막으로 반환 된 ID를 얻고 시퀀스 부분을 1 씩 증가 시킨 다음 다시 쿼리 할 수 ​​있습니다. 다음 예제에서 이것을 봅시다. XADD를 사용하여 10 개의 항목을 추가하기 시작합니다 (표시하지 않겠습니다. mystream 스트림이 10 개의 항목으로 채워 졌다고 가정하겠습니다). 반복을 시작하기 위해 명령 당 2 개의 항목을 가져 오려면 전체 범위에서 시작하지만 개수는 2입니다.


> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"


다음 두 항목으로 반복을 계속하려면 반환 된 마지막 ID (1519073279157-0)를 선택하고 접두사를 추가해야 합니다 (이 경우 결과 배타적 범위 간격, 즉 (1519073279157-0). 이제 다음 XRANGE 호출의 새 시작 인수로 사용할 수 있습니다.


> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"


기타 등등. XRANGE 복잡도는 검색하려면 O (log (N))이고 M 요소를 반환하려면 O (M)이므로 적은 수를 사용하면 명령에 로그 시간 복잡도가 있으므로 반복의 각 단계가 빠릅니다. 따라서 XRANGE는 사실상 스트림 반복기이며 XSCAN 명령이 필요하지 않습니다.

XREVRANGE 명령은 XRANGE와 동일하지만 요소를 역순으로 반환하므로 XREVRANGE의 실제 사용은 Stream의 마지막 항목을 확인하는 것입니다.


> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"


XREVRANGE 명령은 시작 및 중지 인수를 역순으로 사용합니다.


Listening for new items with XREAD 


스트림의 범위별로 항목에 액세스하지 않으려는 경우 일반적으로 스트림에 도착하는 새 항목을 구독하는 것이 좋습니다. 이 개념은 채널을 구독하는 Redis Pub / Sub 또는 키가 새 요소를 가져올 때까지 기다리는 Redis 차단 목록과 관련이 있을 수 있지만 스트림을 사용하는 방식에는 근본적인 차이가 있습니다.

  1. 스트림에는 데이터를 기다리는 여러 클라이언트 (소비자)가있을 수 있습니다. 기본적으로 모든 새 항목은 지정된 스트림에서 데이터를 기다리는 모든 소비자에게 전달됩니다. 이 동작은 각 소비자가 다른 요소를 받는 차단 목록과 다릅니다. 그러나 여러 소비자에게 팬 아웃 하는 기능은 Pub / Sub와 유사합니다.
  2. Pub / Sub에서 메시지는 실행되고 잊어 버리고 어쨌든 저장되지 않으며 차단 목록을 사용하는 동안 클라이언트가 메시지를 수신하면 메시지가 목록에서 팝 (효과적으로 제거)되고 스트림은 근본적으로 다른 방식으로 작동합니다. 모든 메시지는 스트림에 무기한 추가됩니다 (사용자가 명시 적으로 항목 삭제를 요청하지 않는 한). 다른 소비자는 마지막으로 수신 된 메시지의 ID를 기억하여 관점에서 새 메시지가 무엇인지 알 수 있습니다.
  3. Streams 소비자 그룹은 동일한 스트림에 대해 서로 다른 그룹, 처리 된 항목에 대한 명시 적 승인, 보류 중인 항목을 검사하는 기능, 처리되지 않은 메시지 청구 및 각각에 대한 일관된 기록 가시성을 통해 Pub / Sub 또는 차단 목록이 달성 할 수 없는 제어 수준을 제공합니다. 메시지의 개인 과거 기록 만 볼 수 있는 단일 클라이언트.

스트림에 도착하는 새 메시지를 수신하는 기능을 제공하는 명령을 XREAD라고 합니다. XRANGE보다 조금 더 복잡하므로 간단한 양식을 표시하기 시작하고 나중에 전체 명령 레이아웃이 제공됩니다.


> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"


위는 XREAD의 비 차단 형식입니다. COUNT 옵션은 필수가 아닙니다. 실제로 명령의 유일한 필수 옵션은 호출 소비자가 각 스트림에 대해 이미 본 해당 최대 ID와 함께 키 목록을 지정하는 STREAMS 옵션입니다. 지정된 ID보다 큰 ID를 가진 메시지 만 클라이언트에 제공하십시오.

위의 명령에서 우리는 STREAMS mystream 0을 작성 했으므로 0-0보다 큰 ID를 가진 Stream mystream의 모든 메시지를 원합니다. 위의 예에서 볼 수 있듯이 명령은 키 이름을 반환합니다. 실제로 두 개 이상의 키를 사용하여 이 명령을 호출하여 다른 스트림에서 동시에 읽을 수 있기 때문입니다. 예를 들어 다음과 같이 작성할 수 있습니다. STREAMS mystream otherstream 0 0. STREAMS 옵션 다음에 키 이름을 제공하고 나중에 ID를 제공해야 하는 방법에 유의하십시오. 이러한 이유로 STREAMS 옵션은 항상 마지막 옵션이어야 합니다.

XREAD가 한 번에 여러 스트림에 액세스 할 수 있고 우리가 소유 한 마지막 ID를 지정하여 새로운 메시지를 얻을 수 있다는 사실과는 별개로 이 간단한 형태의 명령은 XRANGE와 다른 작업을 수행하지 않습니다. 그러나 흥미로운 부분은 BLOCK 인수를 지정하여 XREAD를 차단 명령으로 쉽게 전환 할 수 있다는 것입니다.


> XREAD BLOCK 0 STREAMS mystream $


위의 예에서 COUNT를 제거하는 것 외에 시간 제한이 0 밀리 초인 새 BLOCK 옵션을 지정했습니다 (즉, 시간이 초과되지 않음을 의미 함). 또한 mystream 스트림에 대한 일반 ID를 전달하는 대신 특수 ID $를 전달했습니다. 이 특수 ID는 XREAD가 mystream 스트림에 이미 저장된 최대 ID를 마지막 ID로 사용해야 하므로 청취를 시작한 시간부터 새 메시지 만 수신합니다. 이것은 어떤 면에서 tail -f Unix 명령과 유사합니다.

BLOCK 옵션을 사용하는 경우 특수 ID $를 사용할 필요가 없습니다. 유효한 모든 ID를 사용할 수 있습니다. 명령이 차단 없이 요청을 즉시 처리 할 수 ​​있으면 그렇게 하고 그렇지 않으면 차단합니다. 일반적으로 새 항목에서 시작하는 스트림을 사용하려는 경우 ID $로 시작하고 이후에 수신 된 마지막 메시지의 ID를 계속 사용하여 다음 호출을 수행합니다.

XREAD의 차단 형식은 여러 키 이름을 지정하는 것만으로도 여러 스트림을 수신 할 수 있습니다. 지정된 해당 ID보다 큰 요소가 있는 스트림이 하나 이상 있어 요청이 동 기적으로 제공 될 수 있는 경우 결과와 함께 반환됩니다. 그렇지 않으면 명령이 차단되고 지정된 ID에 따라 새 데이터를 가져 오는 첫 번째 스트림의 항목을 반환합니다.

차단 목록 작업과 유사하게, 의미 체계가 FIFO 스타일이기 때문에 데이터를 기다리는 클라이언트의 관점에서 차단 스트림 읽기가 공정합니다. 지정된 스트림에 대해 차단 된 첫 번째 클라이언트는 새 항목을 사용할 수 있을 때 가장 먼저 차단 해제됩니다.

XREAD에는 COUNT 및 BLOCK 외에 다른 옵션이 없으므로 소비자를 하나 또는 여러 스트림에 연결하는 특정 목적을 가진 매우 기본적인 명령입니다. 소비자 그룹 API를 사용하여 스트림을 소비하는 더 강력한 기능을 사용할 수 있지만 소비자 그룹을 통한 읽기는 이 가이드의 다음 섹션에서 다룰 XREADGROUP이라는 다른 명령으로 구현됩니다.


Consumer groups 


당면한 작업이 다른 클라이언트에서 동일한 스트림을 사용하는 것이라면 XREAD는 이미 N 개의 클라이언트로 팬 아웃 하는 방법을 제공하며 잠재적으로 더 많은 읽기 확장 성을 제공하기 위해 복제본을 사용할 수도 있습니다. 그러나 특정 문제에서 우리가 원하는 것은 많은 클라이언트에게 동일한 메시지 스트림을 제공하는 것이 아니라 동일한 스트림에서 많은 클라이언트에게 다른 메시지 하위 집합을 제공하는 것입니다. 이것이 유용한 명백한 경우는 처리 속도가 느린 메시지의 경우입니다. 스트림의 서로 다른 부분을 수신 할 N 개의 서로 다른 워커를 가질 수 있는 능력을 통해 서로 다른 메시지를 준비된 서로 다른 워커에게 라우팅하여 메시지 처리를 확장 할 수 있습니다. 더 많은 일을 하십시오.

실제로 3 명의 소비자 C1, C2, C3와 메시지 1, 2, 3, 4, 5, 6, 7을 포함하는 스트림이 있다고 가정하면 다음 다이어그램에 따라 메시지를 제공하는 것이 좋습니다. :


1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1


이를 달성하기 위해 Redis는 소비자 그룹이라는 개념을 사용합니다. Redis 소비자 그룹은 구현 관점에서 Kafka (TM) 소비자 그룹과 관련이 없음을 이해하는 것이 매우 중요합니다. 그러나 그것들은 기능면에서 비슷하기 때문에 이 아이디어가 독창적으로 대중화 되었기 때문에 Kafka (TM) 용어를 유지하기로 결정했습니다.

소비자 그룹은 스트림에서 데이터를 가져오고 실제로 여러 소비자에게 서비스를 제공하여 특정 보장을 제공하는 의사 소비자와 같습니다.


  1. 각 메시지는 다른 소비자에게 제공되므로 동일한 메시지가 여러 소비자에게 배달 될 수 없습니다.
  2. 소비자는 소비자를 구현하는 클라이언트가 선택해야 하는 대소 문자를 구분하는 문자열 인 이름으로 소비자 그룹 내에서 식별됩니다. 즉, 연결이 끊긴 후에도 클라이언트가 동일한 소비자라고 다시 주장하기 때문에 스트림 소비자 그룹은 모든 상태를 유지합니다. 그러나 이것은 또한 고유 식별자를 제공하는 것은 클라이언트에 달려 있음을 의미합니다.
  3. 각 소비자 그룹에는 사용되지 않은 첫 번째 ID의 개념이 있으므로 소비자가 새 메시지를 요청할 때 이전에 전달되지 않은 메시지 만 제공 할 수 있습니다.
  4. 그러나 메시지를 사용하려면 특정 명령을 사용한 명시 적 승인이 필요합니다. Redis는 다음과 같이 승인을 중단합니다.이 메시지는 소비자 그룹에서 제거 될 수 있도록 올바르게 처리되었습니다.
  5. 소비자 그룹은 현재 보류중인 모든 메시지, 즉 소비자 그룹의 일부 소비자에게 전달되었지만 아직 처리 된 것으로 확인되지 않은 메시지를 추적합니다. 이 기능 덕분에 스트림의 메시지 기록에 액세스 할 때 각 소비자는 전달 된 메시지 만 볼 수 있습니다.


어떤 의미에서 소비자 그룹은 스트림에 대한 일부 상태로 상상할 수 있습니다.


+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+


이 관점에서 보면 소비자 그룹이 할 수 있는 작업, 대기 중인 메시지의 기록을 소비자에게 제공하는 방법, 새 메시지를 요청하는 소비자에게 제공되는 방법을 이해하는 것은 매우 간단합니다. last_delivered_id보다 큰 메시지 ID입니다. 동시에 소비자 그룹을 Redis 스트림에 대한 보조 데이터 구조로 보면 단일 스트림에 여러 소비자 그룹이 있고 다른 소비자 집합이 있을 수 있음이 분명합니다. 실제로 동일한 스트림에서 클라이언트가 XREAD를 통해 소비자 그룹 없이 읽고 클라이언트가 다른 소비자 그룹에서 XREADGROUP을 통해 읽도록 하는 것도 가능합니다.

이제 기본적인 소비자 그룹 명령을 보기 위해 확대 할 때입니다. 다음과 같습니다.

  • XGROUP은 소비자 그룹을 생성, 삭제 및 관리하는 데 사용됩니다.
  • XREADGROUP은 소비자 그룹을 통해 스트림에서 읽는 데 사용됩니다.
  • XACK는 소비자가 보류 중인 메시지를 올바르게 처리 된 것으로 표시 할 수 있는 명령입니다.

Creating a consumer group 


스트림 유형의 키 mystream이 이미 존재한다고 가정하고 소비자 그룹을 만들려면 다음을 수행하면 됩니다.


> XGROUP CREATE mystream mygroup $
OK


소비자 그룹을 만들 때 위의 명령에서 볼 수 있듯이 ID를 지정해야 합니다. 예제에서는 $ 만 있습니다. 이는 다른 상태 중에서 소비자 그룹이 첫 번째 소비자 연결시 다음에 제공 할 메시지, 즉 그룹이 방금 생성되었을 때 마지막 메시지 ID가 무엇인지에 대한 아이디어를 가지고 있어야 하기 때문입니다. 우리가 했던 것처럼 $를 제공하면 지금부터 스트림에 도착하는 새 메시지 만 그룹의 소비자에게 제공됩니다. 대신 0을 지정하면 소비자 그룹은 시작할 스트림 기록의 모든 메시지를 소비합니다. 물론 다른 유효한 ID를 지정할 수 있습니다. 알고 있는 것은 소비자 그룹이 지정한 ID보다 큰 메시지를 전달하기 시작한다는 것입니다. $는 스트림에서 현재 가장 큰 ID를 의미하므로 $를 지정하면 새 메시지 만 사용하는 효과가 있습니다.

XGROUP CREATE는 또한 선택 사항 인 MKSTREAM 하위 명령을 마지막 인수로 사용하여 스트림이 없는 경우 자동으로 스트림을 만드는 것을 지원합니다.

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK


이제 소비자 그룹이 생성되었으므로 XREADGROUP 명령을 사용하여 소비자 그룹을 통해 즉시 메시지 읽기를 시도 할 수 있습니다. 시스템이 Alice 또는 Bob에게 다른 메시지를 반환하는 방법을 알아보기 위해 소비자로부터 Alice와 Bob을 호출 할 것입니다.

XREADGROUP은 XREAD와 매우 유사하며 동일한 BLOCK 옵션을 제공합니다. 그렇지 않으면 동기 명령입니다. 그러나 항상 지정해야 하는 필수 옵션이 있습니다. GROUP이고 두 개의 인수 (소비자 그룹 이름 및 읽기를 시도하는 이용자 이름)가 있습니다. 옵션 COUNT도 지원되며 XREAD의 옵션과 동일합니다.

스트림에서 읽기 전에 내부에 몇 가지 메시지를 넣어 보겠습니다.


> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0


참고 : 여기서 메시지는 필드 이름이고 과일은 관련 값입니다. 스트림 항목은 작은 사전이라는 점을 기억하세요.


소비자 그룹을 사용하여 무언가를 읽어 볼 시간입니다.


> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"


XREADGROUP 응답은 XREAD 응답과 같습니다. 그러나 위에 제공된 GROUP <group-name> <consumer-name>에 유의하십시오. 소비자 그룹 mygroup을 사용하여 스트림에서 읽기를 원하고 소비자 Alice라고 말합니다. 소비자가 소비자 그룹으로 작업을 수행 할 때마다 해당 이름을 지정하여 그룹 내에서이 소비자를 고유하게 식별해야 합니다.

위의 명령 줄에 또 다른 매우 중요한 세부 정보가 있습니다. 필수 STREAMS 옵션 다음에 mystream 키에 대해 요청 된 ID는 특수 ID>입니다. 이 특수 ID는 소비자 그룹의 컨텍스트에서만 유효하며 이는 지금까지 다른 소비자에게 메시지가 전달되지 않았음을 의미합니다.

이것은 거의 항상 원하는 것입니다. 그러나 0 또는 다른 유효한 ID와 같은 실제 ID를 지정할 수도 있습니다. 그러나이 경우 XREADGROUP에 이력을 제공하기 위해 XREADGROUP에 요청합니다. 보류 중인 메시지는 그룹의 새 메시지를 볼 수 없습니다. 따라서 기본적으로 XREADGROUP은 지정한 ID에 따라 다음과 같은 동작을 합니다.

  • ID가 특수 ID>이면 명령은 지금까지 다른 소비자에게 전달되지 않은 새 메시지 만 반환하고 부작용으로 소비자 그룹의 마지막 ID를 업데이트합니다.
  • ID가 다른 유효한 숫자 ID이면 명령을 통해 보류 중인 메시지 기록에 액세스 할 수 있습니다. 즉,이 지정된 소비자 (제공된 이름으로 식별 됨)에게 전달되고 지금까지 XACK로 확인되지 않은 메시지 집합입니다.


COUNT 옵션없이 ID 0을 지정하여 이 동작을 즉시 테스트 할 수 있습니다. 보류 중인 메시지, 즉 사과에 대한 메시지 만 표시됩니다.


> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"


그러나 메시지가 처리 된 것으로 확인되면 더 이상 보류 중인 메시지 기록에 포함되지 않으므로 시스템은 더 이상 아무것도 보고하지 않습니다.


> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)


XACK의 작동 방식을 아직 모르더라도 걱정하지 마십시오. 처리 된 메시지가 더 이상 액세스 할 수 있는 기록의 일부가 아니라는 생각입니다.

이제 Bob이 무언가를 읽을 차례입니다.


> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"


Bob은 최대 두 개의 메시지를 요청했으며 동일한 mygroup 그룹을 통해 읽고 있습니다. 따라서 Redis는 새 메시지만 보고 합니다. 보시다시피 "사과"메시지는 이미 Alice에게 전달되었으므로 전달되지 않았으므로 Bob은 오렌지와 딸기 등을 얻습니다.

이렇게 하면 Alice, Bob 및 그룹의 다른 소비자가 동일한 스트림에서 다른 메시지를 읽고 아직 처리하지 않은 메시지 기록을 읽거나 메시지를 처리 ​​된 것으로 표시 할 수 있습니다. 이를 통해 스트림에서 메시지를 소비하기 위한 다른 토폴로지와 의미를 만들 수 있습니다.

유의해야 할 몇 가지 사항이 있습니다.

  • 소비자는 처음 언급 될 때 자동으로 생성되며 명시 적으로 생성 할 필요가 없습니다.
  • XREADGROUP을 사용하더라도 여러 키에서 동시에 읽을 수 있지만 이것이 작동하려면 모든 스트림에서 동일한 이름을 가진 소비자 그룹을 만들어야 합니다. 이것은 일반적인 요구 사항은 아니지만 기능이 기술적으로 사용 가능하다는 점을 언급 할 가치가 있습니다.
  • XREADGROUP은 스트림에서 읽더라도 소비자 그룹이 읽기의 부작용으로 수정되어 마스터 인스턴스에서만 호출 할 수 있기 때문에 쓰기 명령입니다.

Ruby 언어로 작성된 소비자 그룹을 사용하는 소비자 구현의 예는 다음과 같습니다. Ruby 코드는 Ruby를 모르더라도 거의 모든 숙련 된 프로그래머가 읽을 수 있도록 만들어졌습니다.


require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end


여기에서 알 수 있듯이, 기록, 즉 보류 중인 메시지 목록을 사용하여 시작하는 것이 아이디어입니다. 이는 소비자가 이전에 충돌했을 수 있기 때문에 유용하므로 다시 시작할 경우 확인을 받지 않고 전달 된 메시지를 다시 읽고 싶습니다. 메시지를 여러 번 또는 한 번 처리 할 수 ​​있습니다 (적어도 소비자 오류의 경우이지만 관련된 Redis 지속성 및 복제의 제한도 있습니다.이 주제에 대한 특정 섹션 참조).


기록이 소비되고 빈 메시지 목록이 표시되면> 특수 ID를 사용하여 새 메시지를 소비하도록 전환 할 수 있습니다.


Recovering from permanent failures 


위의 예를 통해 동일한 소비자 그룹에 참여하는 소비자를 작성할 수 있습니다. 각 소비자는 처리 할 메시지의 하위 집합을 가져오고 실패에서 복구 할 때 자신에게 전달 된 보류 중인 메시지를 다시 읽을 수 있습니다. 그러나 현실 세계에서 소비자는 영구적으로 실패하고 회복하지 못할 수 있습니다. 어떤 이유로 든 중지 한 후 복구 되지 않는 소비자의 보류 중인 메시지는 어떻게 됩니까?

Redis 소비자 그룹은 해당 메시지가 소유권을 변경하고 다른 소비자에게 재 할당되도록 특정 소비자의 보류 중인 메시지를 요청하기 위해 이러한 상황에서 사용되는 기능을 제공합니다. 이 기능은 매우 명시 적입니다. 소비자는 보류 중인 메시지 목록을 검사해야 하며 특수 명령을 사용하여 특정 메시지를 요청해야 합니다. 그렇지 않으면 서버가 메시지를 영원히 보류 상태로 두고 이전 소비자에게 할당됩니다. 이러한 방식으로 여러 응용 프로그램에서 이러한 기능을 사용할지 여부와 사용 방법을 정확하게 선택할 수 있습니다.

이 프로세스의 첫 번째 단계는 소비자 그룹에서 보류 중인 항목을 관찰 할 수 있는 명령이며 XPENDING이라고 합니다. 이것은 항상 호출하기에 안전하고 메시지의 소유권을 변경하지 않는 읽기 전용 명령입니다. 가장 간단한 형식으로 명령은 스트림의 이름과 소비자 그룹의 이름 인 두 개의 인수를 사용하여 호출됩니다.

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"


이러한 방식으로 호출되면 명령은 소비자 그룹에서 보류 중인 메시지의 총 수를 출력합니다.이 경우에는 두 개의 메시지, 보류 중인 메시지 중 더 낮은 메시지 ID와 더 높은 메시지 ID, 마지막으로 소비자 목록과 보류 중인 메시지 수를 출력합니다. 있다. Alice가 요청한 단일 메시지가 XACK를 사용하여 확인 되었기 때문에 두 개의 보류 메시지가 있는 Bob 만 있습니다.

전체 명령 서명이 다음과 같기 때문에 XPENDING에 더 많은 인수를 제공하여 더 많은 정보를 요청할 수 있습니다.


XPENDING <key> <groupname> [<start-id> <end-id> <count> [<consumer-name>]]


시작 및 종료 ID (XRANGE에서와 같이-및 + 일 수 있음)와 명령에 의해 반환 되는 정보의 양을 제어하는 ​​개수를 제공하여 보류 중인 메시지에 대해 더 많이 알 수 있습니다. 선택적 최종 인수 인 소비자 이름은 지정된 소비자에 대해 보류 중인 메시지로 출력을 제한하려는 경우 사용되지만 다음 예제에서는 이 기능을 사용하지 않습니다.


> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1


이제 각 메시지에 대한 세부 정보가 있습니다. ID, 소비자 이름, 유휴 시간 (밀리 초), 메시지가 일부 소비자에게 마지막으로 전달 된 후 밀리 초가 지난 후 마지막으로 주어진 횟수 메시지가 전달되었습니다. Bob이 보낸 두 개의 메시지가 있으며 약 20 시간 동안 74170458 밀리 초 동안 유휴 상태입니다.


아무도 XRANGE를 사용하여 첫 번째 메시지 내용이 무엇인지 확인하는 것을 막지 못합니다.


> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"


인수에서 동일한 ID를 두 번 반복하면 됩니다. 이제 몇 가지 아이디어를 얻었으므로 Alice는 메시지를 처리하지 않은지 20 시간이 지나면 Bob이 제 시간에 복구 되지 않을 것이라고 결정할 수 있으며 이러한 메시지를 요청하고 Bob 대신 처리를 재개 할 때입니다. 이를 위해 XCLAIM 명령을 사용합니다.

이 명령은 소비자 그룹 변경 사항의 복제에 사용되기 때문에 매우 복잡하고 전체 형식의 옵션으로 가득 차 있지만 일반적으로 필요한 인수 만 사용합니다. 이 경우 다음과 같이 간단합니다.


XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>


기본적으로 이 특정 키 및 그룹에 대해 지정된 메시지 ID가 소유권을 변경하고 지정된 소비자 이름 <consumer>에 할당되기를 원합니다. 그러나 최소 유휴 시간도 제공하므로 언급 된 메시지의 유휴 시간이 지정된 유휴 시간보다 큰 경우에만 작업이 작동합니다. 두 클라이언트가 동시에 메시지를 요청하려고 재 시도하기 때문에 유용합니다.


Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0


그러나 메시지를 요청하면 부작용으로 유휴 시간이 재설정 됩니다! 그리고 배달 카운터 수를 증가 시키므로 두 번째 클라이언트가 요청을 실패하게 됩니다. 이런 식으로 메시지의 사소한 재 처리를 방지합니다 (일반적인 경우 정확히 한 번 처리 할 수 없는 경우에도).


다음은 명령 실행의 결과입니다.


> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"


메시지는 Alice에 의해 성공적으로 요청되었으며 이제 메시지를 처리하고 승인 할 수 있으며 원래 소비자가 복구되지 않더라도 작업을 진행할 수 있습니다.

위의 예에서 주어진 메시지를 성공적으로 요구하는 부작용으로 XCLAIM 명령도 해당 메시지를 반환한다는 것이 분명합니다. 그러나 이것은 필수 사항이 아닙니다. JUSTID 옵션은 성공적으로 요청 된 메시지의 ID 만 반환하는 데 사용할 수 있습니다. 이는 클라이언트와 서버 사이에 사용되는 대역폭 (및 명령의 성능)을 줄이고 자 소비자가 보류 기록을 다시 검색하는 방식으로 구현되기 때문에 메시지에 관심이 없는 경우에 유용합니다. 수시로 메시지.

청구는 별도의 프로세스로 구현 될 수도 있습니다. 보류 중인 메시지 목록 만 확인하고 활성 상태 인 것으로 보이는 소비자에게 유휴 메시지를 할당하는 프로세스입니다. Redis 스트림의 관찰 기능 중 하나를 사용하여 활성 소비자를 얻을 수 있습니다. 이것이 다음 섹션의 주제입니다.


Automatic claiming 


Redis 6.2에 추가 된 XAUTOCLAIM 명령은 위에서 설명한 청구 프로세스를 구현합니다. XPENDING 및 XCLAIM은 다양한 유형의 복구 메커니즘에 대한 기본 구성 요소를 제공합니다. 이 명령은 Redis가 이를 관리하도록 하여 일반 프로세스를 최적화하고 대부분의 복구 요구에 대한 간단한 솔루션을 제공합니다.

XAUTOCLAIM은 유휴 보류 메시지를 식별하고 해당 메시지의 소유권을 소비자에게 이전합니다. 명령의 서명은 다음과 같습니다.


XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]


따라서 위의 예에서 자동 청구를 사용하여 다음과 같은 단일 메시지를 요청할 수 있습니다.


> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"


XCLAIM과 마찬가지로 명령은 요청 된 메시지의 배열로 응답하지만 보류 중인 항목을 반복 할 수 있는 스트림 ID도 반환합니다. 스트림 ID는 커서이며 다음 호출에서 이를 사용하여 유휴 대기 메시지를 계속 청구 할 수 있습니다.


> XAUTOCLAIM mystream mygroup Lora 3600000 1526569498055-0 COUNT 1
1) 0-0
2) 1) 1526569506935-0
   2) 1) "message"
      2) "strawberry"


XAUTOCLAIM이 "0-0"스트림 ID를 커서로 반환하면 소비자 그룹 보류 항목 목록의 끝에 도달했음을 의미합니다. 그렇다고 새로운 유휴 보류 메시지가 없다는 의미는 아니므로 스트림 시작 부분에서 XAUTOCLAIM을 호출하여 프로세스가 계속됩니다.


Claiming and the delivery counter 


XPENDING 출력에서 ​​관찰하는 카운터는 각 메시지의 배달 수입니다. 카운터는 두 가지 방식으로 증가합니다. XCLAIM을 통해 메시지가 성공적으로 요청 된 경우 또는 보류 중인 메시지의 기록에 액세스하기 위해 XREADGROUP 호출이 사용되는 경우입니다.

오류가 발생하면 메시지가 여러 번 전달되는 것이 정상이지만 결국에는 일반적으로 처리되고 확인됩니다. 그러나 처리 코드에서 버그를 유발하는 방식으로 손상되었거나 제작 되었기 때문에 특정 메시지를 처리하는 데 문제가 있을 수 있습니다. 이 경우 소비자는 이 특정 메시지를 계속해서 처리하지 못합니다. 배달 시도 카운터가 있으므로 이 카운터를 사용하여 어떤 이유로 처리 할 수 없는 메시지를 감지 할 수 있습니다. 따라서 배달 카운터가 선택한 주어진 많은 수에 도달하면 이러한 메시지를 다른 스트림에 넣고 시스템 관리자에게 알림을 보내는 것이 현명 할 것입니다. 이것은 기본적으로 Redis Streams가 배달 못한 편지 개념을 구현하는 방식입니다.


Streams observability 


관찰력이 부족한 메시징 시스템은 작업하기가 매우 어렵습니다. 메시지를 소비하는 사람, 보류 중인 메시지, 주어진 스트림에서 활성화 된 소비자 그룹 집합을 알지 못하면 모든 것이 불투명 해집니다. 이러한 이유로 Redis Streams와 소비자 그룹은 무슨 일이 일어나고 있는지 관찰하는 방법이 다릅니다. 이미 유휴 시간 및 배달 수와 함께 특정 순간에 처리 중인 메시지 목록을 검사 할 수 있는 XPENDING에 대해 설명했습니다.

그러나 우리는 그 이상을 원할 수 있으며 XINFO 명령은 스트림 또는 소비자 그룹에 대한 정보를 얻기 위해 하위 명령과 함께 사용할 수 있는 관찰 성 인터페이스입니다. 이 명령은 스트림 및 해당 소비자 그룹의 상태에 대한 다양한 정보를 표시하기 위해 하위 명령을 사용합니다. 예를 들어 XINFO STREAM은 스트림 자체에 대한 정보를 보고 합니다.


> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1526569495631-0
    2) 1) "message"
       2) "apple"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"


출력은 스트림이 내부적으로 인코딩되는 방법에 대한 정보를 표시하고 스트림의 첫 번째 및 마지막 메시지도 표시합니다. 사용 가능한 또 다른 정보는이 스트림과 관련된 소비자 그룹의 수입니다. 우리는 소비자 그룹에 대한 더 많은 정보를 요청할 수 있습니다.


> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
   7) last-delivered-id
   8) "1588152489012-0"
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0
   7) last-delivered-id
   8) "1588152498034-0"


이 출력과 이전 출력에서 ​​볼 수 있듯이 XINFO 명령은 일련의 필드 값 항목을 출력합니다. 이것은 관찰 성 명령이기 때문에 인간 사용자는 보고 된 정보를 즉시 이해할 수 있으며 명령이 이전 클라이언트와의 호환성을 깨지 않고 더 많은 필드를 추가하여 향후 더 많은 정보를 보고 할 수 있습니다. XPENDING과 같이 대역폭 효율성이 더 높아야 하는 다른 명령은 필드 이름 없이 정보만 보고 합니다.

GROUPS 하위 명령이 사용되는 위 예제의 출력은 필드 이름을 관찰하면서 명확해야 합니다. 그룹에 등록 된 소비자를 확인하여 특정 소비자 그룹의 상태를 보다 자세히 확인할 수 있습니다.

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983


명령 구문을 기억하지 못하는 경우 명령 자체에 도움을 요청하십시오.


> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname>  -- Show consumer groups of group <groupname>.
3) GROUPS <key>                 -- Show the stream consumer groups.
4) STREAM <key>                 -- Show information about the stream.
5) HELP                         -- Print this help.


Differences with Kafka (TM) partitions 


Redis 스트림의 소비자 그룹은 어떤면에서 Kafka (TM) 파티셔닝 기반 소비자 그룹과 유사 할 수 있지만 Redis 스트림은 실질적으로 매우 다릅니다. 파티션은 논리적 일 뿐이며 메시지는 단일 Redis 키에 저장되므로 다른 클라이언트가 제공되는 방식은 파티션 클라이언트가 읽는 파티션이 아니라 새 메시지를 처리 ​​할 준비가 된 사람을 기반으로 합니다. 예를 들어 소비자 C3가 영구적으로 실패하는 경우 Redis는 마치 두 개의 논리 파티션 만 있는 것처럼 도착하는 모든 새 메시지를 C1 및 C2에 계속 제공합니다.

마찬가지로 특정 소비자가 다른 소비자보다 메시지 처리 속도가 훨씬 빠르면 이 소비자는 동일한 시간 단위에서 비례 적으로 더 많은 메시지를 받게 됩니다. Redis는 확인되지 않은 모든 메시지를 명시 적으로 추적하고 누가 어떤 메시지를 수신했는지 기억하고 첫 번째 메시지의 ID를 소비자에게 전달하지 않았기 때문에 가능합니다.

그러나 이것은 또한 Redis에서 동일한 스트림의 메시지를 여러 Redis 인스턴스로 분할하려는 경우 여러 키와 Redis Cluster 또는 기타 애플리케이션 별 샤딩 시스템과 같은 일부 샤딩 시스템을 사용해야 함을 의미합니다. 단일 Redis 스트림은 여러 인스턴스로 자동 분할 되지 않습니다.

개략적으로 다음이 사실이라고 말할 수 있습니다.

  • 1 개의 스트림-> 1 명의 소비자를 사용하는 경우 메시지를 순서대로 처리합니다.
  • N 개의 소비자와 함께 N 개의 스트림을 사용하여 주어진 소비자 만 N 개의 스트림의 하위 집합에 도달하도록 하려면 위의 1 개 스트림-> 1 명의 소비자 모델을 확장 할 수 있습니다.
  • 1 개의 스트림-> N 개의 소비자를 사용하는 경우 N 개의 소비자에 대한로드 밸런싱을 수행하지만 이 경우 지정된 소비자가 다른 소비자가 처리하는 것보다 메시지 3을 더 빨리 처리 할 수 ​​있으므로 동일한 논리 항목에 대한 메시지가 순서가 맞지 않게 소비 될 수 있습니다. 메시지 4.

따라서 기본적으로 Kafka 파티션은 N 개의 서로 다른 Redis 키를 사용하는 것과 더 비슷하지만 Redis 소비자 그룹은 주어진 스트림에서 N 개의 서로 다른 소비자로 전달되는 메시지의 서버 측 부하 분산 시스템입니다.


Capped Streams 


많은 애플리케이션은 데이터를 스트림으로 영원히 수집하지 않습니다. 때로는 스트림 내에 주어진 항목 수를 최대로 포함하는 것이 유용하고, 주어진 크기에 도달하면 Redis에서 메모리에 있지 않고 빠르지 않지만 저장하기에 적합한 저장소로 데이터를 이동하는 것이 유용합니다. 잠재적으로 앞으로 수십 년 동안의 역사. Redis 스트림은 이를 지원합니다. 하나는 XADD 명령의 MAXLEN 옵션입니다. 이 옵션은 사용이 매우 간단합니다.


> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"


MAXLEN을 사용하면 지정된 길이에 도달하면 이전 항목이 자동으로 제거되므로 스트림이 일정한 크기로 유지됩니다. 현재 스트림에 지정된 기간보다 오래되지 않은 항목 만 유지하도록 지시하는 옵션은 없습니다. 이러한 명령은 일관되게 실행하기 위해 항목을 제거하기 위해 잠재적으로 오랫동안 차단 될 수 있기 때문입니다. 예를 들어 삽입 스파이크, 긴 일시 중지 및 다른 삽입이 모두 동일한 최대 시간으로 발생하는 경우 어떻게 되는지 상상해보십시오. 일시 중지 중에 너무 오래 된 데이터를 제거하기 위해 스트림이 차단됩니다. 따라서 계획을 수립하고 원하는 최대 스트림 길이를 이해하는 것은 사용자에게 달려 있습니다. 또한 스트림의 길이는 사용 된 메모리에 비례하지만 시간별 트리밍은 제어 및 예상하기가 덜 간단합니다. 이는 시간이 지남에 따라 자주 변경되는 삽입 속도에 따라 달라집니다 (그리고 변경되지 않을 때 크기는 사소합니다).

그러나 MAXLEN을 사용한 트리밍은 비용이 많이 들 수 있습니다. 메모리 효율성을 높이기 위해 스트림은 매크로 노드로 기수 트리로 표현됩니다. 수십 개의 요소로 구성된 단일 매크로 노드를 변경하는 것은 최적이 아닙니다. 따라서 다음과 같은 특수 형식으로 명령을 사용할 수 있습니다.


XADD mystream MAXLEN ~ 1000 * ... entry fields here ...


MAXLEN 옵션과 실제 개수 사이의 ~ 인수는 정확히 1000 개 항목 일 필요가 없다는 것을 의미합니다. 1000 또는 1010 또는 1030 일 수 있습니다. 최소한 1000 개의 항목을 저장해야 합니다. 이 인수를 사용하면 전체 노드를 제거 할 수 있을 때만 트리밍이 수행됩니다. 이것은 훨씬 더 효율적이며 일반적으로 원하는 것입니다.


XTRIM 명령도 있습니다.이 명령은 자체적으로 실행될 수 있다는 점을 제외하면 위의 MAXLEN 옵션이 수행하는 것과 매우 유사한 작업을 수행합니다.

> XTRIM mystream MAXLEN 10

또는 XADD 옵션의 경우 :


> XTRIM mystream MAXLEN ~ 10


그러나 XTRIM은 다양한 트리밍 전략을 수용하도록 설계되었습니다. 또 다른 트리밍 전략은 MINID로, 지정된 ID보다 낮은 ID를 가진 항목을 제거합니다.

XTRIM은 명시 적 명령이므로 사용자는 다양한 트리밍 전략의 가능한 단점에 대해 알고 있어야 합니다.

향후 XTRIM에 추가 될 수 있는 또 다른 유용한 제거 전략은 필요한 경우 데이터를 Redis에서 다른 스토리지 시스템으로 이동하기 위해 XRANGE 및 XTRIM을 쉽게 사용할 수 있도록 다양한 ID로 제거하는 것입니다.


Special IDs in the streams API 


Redis API에서 사용할 수 있는 몇 가지 특수 ID가 있음을 알 수 있습니다. 여기에 짧은 요약이 있으므로 향후 더 잘 이해할 수 있습니다.

처음 두 특수 ID는-및 +이며 XRANGE 명령을 사용하여 범위 쿼리에 사용됩니다. 이 두 ID는 각각 가능한 가장 작은 ID (기본적으로 0-1)와 가능한 가장 큰 ID (즉 18446744073709551615-18446744073709551615)를 의미합니다. 보시다시피 숫자 대신-및 +를 쓰는 것이 훨씬 더 깔끔합니다.

그런 다음 스트림 내에서 가장 큰 ID를 가진 항목의 ID라고 말하고 싶은 API가 있습니다. 이것이 $가 의미하는 바입니다. 예를 들어 XREADGROUP이 있는 새 항목 만 원하는 경우 이 ID를 사용하여 이미 모든 기존 항목이 있지만 미래에 삽입 될 새 항목이 없음을 나타냅니다. 마찬가지로 소비자 그룹의 ID를 만들거나 설정할 때 그룹의 소비자에게 새 항목을 전달하기 위해 마지막으로 전달 된 항목을 $로 설정할 수 있습니다.


보시다시피 $는 +를 의미하지 않습니다. +는 가능한 모든 스트림에서 가능한 가장 큰 ID이고 $는 주어진 항목을 포함하는 주어진 스트림에서 가장 큰 ID이기 때문에 두 가지 다른 것입니다. 또한 API는 일반적으로 + 또는 $ 만 이해하지만 여러 의미로 지정된 기호를 로드하지 않는 것이 유용했습니다.

또 다른 특수 ID는>입니다. 이는 소비자 그룹에만 관련되고 XREADGROUP 명령이 사용되는 경우에만 특별한 의미입니다. 이 특수 ID는 지금까지 다른 소비자에게 전달되지 않은 항목 만 원한다는 것을 의미합니다. 따라서 기본적으로> ID는 소비자 그룹의 마지막으로 전달 된 ID입니다.

마지막으로 XADD 명령에서만 사용할 수 있는 특수 ID *는 새 항목에 대한 ID를 자동으로 선택하는 것을 의미합니다.

그래서 우리는-, +, $,> 그리고 *를 가지고 있고, 모두 다른 의미를 가지고 있으며 대부분의 경우 다른 문맥에서 사용될 수 있습니다.


Persistence, replication and message safety 


다른 Redis 데이터 구조와 마찬가지로 Stream은 복제본에 비동기 적으로 복제되고 AOF 및 RDB 파일에 유지됩니다. 그러나 그다지 분명하지 않은 것은 소비자 그룹 전체 상태도 AOF, RDB 및 복제본으로 전파되므로 메시지가 마스터에서 보류 중인 경우 복제본도 동일한 정보를 갖게 된다는 것입니다. 마찬가지로 재시작 후 AOF는 소비자 그룹의 상태를 복원합니다.

그러나 Redis 스트림 및 소비자 그룹은 Redis 기본 복제를 사용하여 유지되고 복제되므로 다음을 참조하십시오.

  • 애플리케이션에서 메시지 지속성이 중요한 경우 강력한 fsync 정책과 함께 AOF를 사용해야 합니다.
  • 기본적으로 비동기 복제는 XADD 명령 또는 소비자 그룹 상태 변경이 복제되는 것을 보장하지 않습니다. 장애 조치 후 마스터에서 데이터를 수신하는 복제본의 기능에 따라 무언가 누락 될 수 있습니다.
  • WAIT 명령은 복제본 세트에 변경 사항을 강제로 전파하기 위해 사용될 수 있습니다. 그러나 이로 인해 데이터가 손실 될 가능성은 거의 없지만 Sentinel 또는 Redis Cluster에서 작동하는 Redis 장애 조치 프로세스는 가장 많이 업데이트 된 복제본으로 장애 조치를 수행하기 위해 최선의 검사 만 수행하며 특정 장애 조건에서 데이터가 부족한 복제본입니다.

따라서 Redis 스트림 및 소비자 그룹을 사용하여 애플리케이션을 설계 할 때 애플리케이션이 장애 중에 가져야 하는 의미 속성을 이해하고 그에 따라 구성하여 사용 사례에 대해 충분히 안전한지 평가해야 합니다.


Removing single items from a stream 


스트림에는 ID로만 스트림 중간에서 항목을 제거하는 특수 명령도 있습니다. 일반적으로 추가 전용 데이터 구조의 경우 이것은 이상한 기능처럼 보일 수 있지만 실제로는 개인 정보 보호 규정과 관련된 응용 프로그램에 유용합니다. 이 명령은 XDEL이라고 하며 삭제할 ID가 뒤에 오는 스트림 이름을 받습니다.

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"


그러나 현재 구현에서는 매크로 노드가 완전히 비어있을 때까지 메모리가 실제로 회수 되지 않으므로 이 기능을 남용해서는 안됩니다.


Zero length streams 


스트림과 다른 Redis 데이터 구조의 차이점은 다른 데이터 구조에 더 이상 요소가 없을 때 요소를 제거하는 명령 호출의 부작용으로 키 자체가 제거된다는 것입니다. 예를 들어, ZREM에 대한 호출이 정렬 된 세트의 마지막 요소를 제거하면 정렬 된 세트가 완전히 제거됩니다. 반면에 스트림은 0의 개수 (XADD 및 XTRIM 명령)와 함께 MAXLEN 옵션을 사용하거나 XDEL이 호출 되었기 때문에 0 요소에 머무를 수 있습니다.

이러한 비대칭이 존재하는 이유는 Streams에 연결된 소비자 그룹이 있을 수 있으며, 더 이상 스트림에 항목이 없다고 해서 소비자 그룹이 정의한 상태를 잃고 싶지 않기 때문입니다. 현재는 연결된 소비자 그룹이 없는 경우에도 스트림이 삭제되지 않지만 향후 변경 될 수 있습니다.


Total latency of consuming a message 


BLOCK 옵션이없는 XRANGE 및 XREAD 또는 XREADGROUP과 같은 비 블로킹 스트림 명령은 다른 Redis 명령과 마찬가지로 동기식으로 제공되므로 이러한 명령의 지연 시간을 논의하는 것은 의미가 없습니다. Redis 문서에서 명령의 시간 복잡성을 확인하는 것이 더 흥미롭습니다. 스트림 명령은 범위를 추출 할 때 최소한 정렬 된 집합 명령만큼 빠르며 XADD는 매우 빠르며 파이프 라이닝을 사용하면 평균 기계에서 초당 50 만 개에서 100 만 개의 항목을 쉽게 삽입 할 수 있다고 말하면 충분합니다.

그러나 소비자 그룹에서 소비자를 차단하는 맥락에서 메시지가 XADD를 통해 생성 된 순간부터 소비자가 메시지를 얻는 순간까지 메시지 처리 지연을 이해하려면 대기 시간이 흥미로운 매개 변수가 됩니다. XREADGROUP이 메시지와 함께 반환 되었습니다.


How serving blocked consumers work 


수행 된 테스트의 결과를 제공하기 전에 Redis가 스트림 메시지를 라우팅 하기 위해 사용하는 모델 (일반적으로 실제로 데이터를 기다리는 차단 작업이 어떻게 관리되는지)을 이해하는 것이 흥미롭습니다.


  • 차단 된 클라이언트는 차단 소비자가 하나 이상 있는 키를 해당 키를 기다리는 소비자 목록에 매핑 하는 해시 테이블에서 참조됩니다. 이렇게 하면 데이터를 받은 키가 주어지면 해당 데이터를 기다리는 모든 클라이언트를 해결할 수 있습니다.
  • 쓰기가 발생하면 이 경우 XADD 명령이 호출되면 signalKeyAsReady () 함수를 호출합니다. 이 기능은 처리해야 하는 키 목록에 키를 넣습니다. 이러한 키에는 차단 된 소비자에 대한 새 데이터가 있을 수 있기 때문입니다. 이러한 준비 키는 나중에 처리되므로 동일한 이벤트 루프주기 동안 키가 다른 쓰기를 수신 할 수 있습니다.
  • 마지막으로 이벤트 루프로 돌아 가기 전에 준비된 키가 최종적으로 처리됩니다. 각 키에 대해 데이터를 기다리는 클라이언트 목록이 스캔되고 해당되는 경우 해당 클라이언트는 도착한 새 데이터를 받습니다. 스트림의 경우 데이터는 소비자가 요청한 해당 범위의 메시지입니다.

보시다시피, 기본적으로 이벤트 루프로 돌아 가기 전에 XADD를 호출하는 클라이언트와 메시지를 소비하도록 차단 된 클라이언트는 모두 출력 버퍼에 응답을 가지므로 XADD 호출자는 동시에 Redis로부터 응답을 받아야 합니다. 소비자가 새 메시지를 받을 시간입니다.

이 모델은 푸시 기반입니다. 소비자 버퍼에 데이터를 추가하는 것은 XADD를 호출하는 작업에 의해 직접 수행되므로 대기 시간이 상당히 예측 가능한 경향이 있습니다.


Latency tests results 


이 대기 시간 특성을 확인하기 위해 컴퓨터 밀리 초 시간을 추가 필드로 갖는 메시지를 푸시하는 Ruby 프로그램의 여러 인스턴스와 소비자 그룹의 메시지를 읽고 처리하는 Ruby 프로그램을 사용하여 테스트를 수행했습니다. 메시지 처리 단계는 총 대기 시간을 이해하기 위해 현재 컴퓨터 시간과 메시지 타임 스탬프를 비교하는 것으로 구성되었습니다.

이러한 프로그램은 최적화되지 않았으며 최적이 아닌 조건에서 예상 할 수 있는 지연 시간 수치를 제공하기 위해 Redis를 실행하는 작은 2 개의 코어 인스턴스에서 실행되었습니다. 메시지는 초당 10k의 속도로 생성되었으며 10 명의 동시 소비자가 동일한 Redis 스트림 및 소비자 그룹의 메시지를 소비하고 승인했습니다.

얻은 결과 :

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%


따라서 요청의 99.9 %는 대기 시간이 2 밀리 초 미만이며 이상 값은 여전히 ​​평균에 매우 가깝습니다. 확인되지 않은 몇 백만 개의 메시지를 스트림에 추가해도 벤치 마크의 요지는 바뀌지 않으며 대부분의 쿼리는 여전히 매우 짧은 대기 시간으로 처리됩니다.


몇 가지 언급 :


  • 여기서 우리는 반복 당 최대 10,000 개의 메시지를 처리했습니다. 이는 XREADGROUP의 COUNT 매개 변수가 10000으로 설정되었음을 의미합니다. 이는 많은 지연 시간을 추가하지만 느린 소비자가 메시지 흐름을 유지할 수 있도록 하기 위해 필요합니다. 따라서 훨씬 더 작은 실제 대기 시간을 기대할 수 있습니다.
  • 이 벤치 마크에 사용 된 시스템은 오늘날의 표준에 비해 매우 느립니다.