// XAddIfNotFull appends an entry to stream unless the stream already holds
// maxLen or more entries.
//
// The length check (XLEN) and the append (XADD) are performed inside a single
// Lua script. When the stream is full the script replies with the error
// "STREAM_MAXLEN_REACHED", which is surfaced through the returned error.
// Otherwise the reply of XADD is returned.
//
// ID is passed to XADD as the entry ID. values supplies the field/value pairs
// of the entry; because Go map iteration order is unspecified, the order of
// the fields inside the stored entry is unspecified as well.
func XAddIfNotFull(stream string, maxLen int64, ID string, values map[string]interface{}) (interface{}, error) {
	const addIfNotFullLua = `
local stream = KEYS[1]
local max_len = tonumber(ARGV[1])
local id = ARGV[2]
local current_len = redis.call('XLEN', stream)
if current_len >= max_len then
return redis.error_reply("STREAM_MAXLEN_REACHED")
end
return redis.call('XADD', stream, id, unpack(ARGV, 3))
`

	// ARGV layout: [1] maximum length, [2] entry ID, [3..] alternating
	// field and value.
	argv := make([]interface{}, 2, 2+2*len(values))
	argv[0], argv[1] = maxLen, ID
	for field, value := range values {
		argv = append(argv, field, value)
	}

	keys := []string{stream}
	cmd := redis.NewScript(addIfNotFullLua).Run(context.Background(), global.Redis, keys, argv...)
	return cmd.Result()
}
// StreamConsume runs an endless consumer-group read loop on stream and passes
// every received message to handler.
//
// The consumer group is created first (together with the stream, starting at
// ID "0"); a BUSYGROUP error, meaning the group already exists, is ignored.
// Each cycle of the loop then does two things:
//
//  1. Re-reads this consumer's pending entries (ID "0") and dispatches them,
//     sleeping between passes, until a read returns no messages.
//  2. Reads entries never delivered to the group (ID ">") and dispatches them.
//
// This function never calls XACK itself; acknowledging messages is presumably
// the handler's job (verify against callers). Entries left unacknowledged are
// picked up again by the pending pass.
//
// handler receives a pointer to the message element inside the read result,
// so every call gets a distinct address. The function only returns on error:
// a failure creating the group, a failed read, or a pending read that returns
// no stream at all.
func StreamConsume(stream string, group string, consumer string, handler func(message *redis.XMessage)) error {
	ctx := context.Background()

	// Create the consumer group; "already exists" is not an error for us.
	err := global.Redis.XGroupCreateMkStream(ctx, stream, group, "0").Err()
	if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
		return err
	}

	const (
		// Maximum number of messages requested per read.
		count = 1000
		// Pause between passes over the pending list, so a handler that
		// keeps failing does not turn the loop into a busy spin.
		pendingSleep = 5 * time.Second
	)

	// read issues one XREADGROUP for this consumer starting at id.
	// NOTE(review): Block: 0 is expected to block without timeout on the ">"
	// read — confirm against the go-redis version in use.
	read := func(id string) ([]redis.XStream, error) {
		return global.Redis.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, id},
			Count:    count,
			Block:    0,
		}).Result()
	}

	// dispatch hands every message to handler. It indexes into the slice
	// instead of taking the address of a range variable: before Go 1.22 the
	// range variable is shared by all iterations, so a handler retaining the
	// pointer would observe later messages overwriting earlier ones.
	dispatch := func(streams []redis.XStream) {
		for _, s := range streams {
			for i := range s.Messages {
				handler(&s.Messages[i])
			}
		}
	}

	for {
		// First work through messages still pending for this consumer.
		for {
			pending, err := read("0")
			if err != nil {
				return err
			}
			dispatch(pending)
			if len(pending) == 0 {
				return errors.New("redis.XStream 数量错误")
			}
			if len(pending[0].Messages) == 0 {
				// Nothing pending any more; move on to new messages.
				break
			}
			time.Sleep(pendingSleep)
		}

		// Then consume messages never delivered to this consumer group.
		fresh, err := read(">")
		if err != nil {
			return err
		}
		dispatch(fresh)
	}
}