Redis Stream

// XAddIfNotFull 添加数据到流中, 如果流长度超过 maxLen, 则返回 STREAM_MAXLEN_REACHED
func XAddIfNotFull(stream string, maxLen int64, ID string, values map[string]interface{}) (interface{}, error) {
    script := redis.NewScript(`
local stream = KEYS[1]
local max_len = tonumber(ARGV[1])
local id = ARGV[2]

local current_len = redis.call('XLEN', stream)
if current_len >= max_len then
    return redis.error_reply("STREAM_MAXLEN_REACHED")
end

return redis.call('XADD', stream, id, unpack(ARGV, 3))
`)
    args := make([]interface{}, 0, 2+(len(values)*2))
    args = append(args, maxLen, ID)
    for k, v := range values {
        args = append(args, k, v)
    }
    return script.Run(context.Background(), global.Redis, []string{stream}, args...).Result()
}

// StreamConsume Stream 消费
func StreamConsume(stream string, group string, consumer string, handler func(message *redis.XMessage)) error {
    // 创建消费者组
    err := global.Redis.XGroupCreateMkStream(context.Background(), stream, group, "0").Err()
    if err != nil {
        if !strings.Contains(err.Error(), "BUSYGROUP") { // 非消费者组已存在错误消息
            return err
        }
    }

    // 消息条数
    count := 1000

    // 休眠时间(pending)
    pendingSleep := 5 * time.Second

    for {
        // 先处理 pending 中的消息
        for {
            streams, err := global.Redis.XReadGroup(context.Background(), &redis.XReadGroupArgs{
                Group:    group,
                Consumer: consumer,
                Streams:  []string{stream, "0"},
                Count:    int64(count),
                Block:    0,
            }).Result()
            if err != nil {
                return err
            }

            for _, v := range streams {
                for _, message := range v.Messages {
                    handler(&message)
                }
            }

            if len(streams) == 0 {
                return errors.New("redis.XStream 数量错误")
            }

            if len(streams[0].Messages) == 0 {
                break
            }

            time.Sleep(pendingSleep) // 这里建议休眠一会, 防止消息处理失败导致CPU占用过高
        }

        // 处理从未投递给此消费者组的新消息
        streams, err := global.Redis.XReadGroup(context.Background(), &redis.XReadGroupArgs{
            Group:    group,
            Consumer: consumer,
            Streams:  []string{stream, ">"},
            Count:    int64(count),
            Block:    0,
        }).Result()
        if err != nil {
            return err
        }

        for _, v := range streams {
            for _, message := range v.Messages {
                handler(&message)
            }
        }
    }
}