Category 竞猜大厅

实现原理

通过topic区分不同的延迟时长,每个topic对于一个延迟,比如 topic100 仅存储延迟 100ms 的消息,topic1000 仅存储延迟 1s 的消息,依次类推。

生产消息时,消息需按延迟时长投递到对应的topic。消费消息时,检查消息的时间,如果未到达延迟时长,则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列,可作为一种特殊的延迟队列,比如延迟 3600000ms 的处理。

消费者实现

package main

import (

"context"

"time"

"github.com/IBM/sarama"

"github.com/sirupsen/logrus"

)

// 定义每个topic对应的延迟时间(ms)

var topicDelayConfig = map[string]time.Duration{

"delay-100ms": 100 * time.Millisecond,

"delay-200ms": 200 * time.Millisecond,

"delay-500ms": 500 * time.Millisecond,

"delay-1000ms": 1000 * time.Millisecond,

}

type delayConsumerHandler struct {

// 可以添加必要的依赖,如业务处理器等

}

func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {

logrus.Info("延迟队列消费者初始化完成")

return nil

}

func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {

logrus.Info("延迟队列消费者清理完成")

return nil

}

// ConsumeClaim 处理分区消息,实现延迟逻辑

func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

topic := claim.Topic()

delay, exists := topicDelayConfig[topic]

if !exists {

logrus.Errorf("topic %s 未配置延迟时间,跳过消费", topic)

// 标记所有消息为已消费,避免重复处理

for range claim.Messages() {

sess.MarkMessage(msg, "")

}

return nil

}

// 按顺序处理消息(假设消息时间有序)

for msg := range claim.Messages() {

// 检查会话是否已关闭(如重平衡发生)

select {

case <-sess.Context().Done():

logrus.Info("会话已关闭,停止消费")

return nil

default:

}

// 计算需要延迟的时间

// 消息应该被处理的时间 = 消息产生时间 + 主题延迟时间

produceTime := msg.Timestamp

processTime := produceTime.Add(delay)

now := time.Now()

// 如果当前时间未到处理时间,计算需要休眠的时间

if now.Before(processTime) {

sleepDuration := processTime.Sub(now)

logrus.Debugf(

"消息需要延迟处理,topic=%s, offset=%d, 需等待 %v (产生时间: %v, 预计处理时间: %v)",

topic, msg.Offset, sleepDuration, produceTime, processTime,

)

// 休眠期间监听会话关闭信号,避免阻塞重平衡

select {

case <-sess.Context().Done():

logrus.Info("休眠期间会话关闭,停止消费")

return nil

case <-time.After(sleepDuration):

// 休眠完成,继续处理

}

}

// 延迟时间已到,处理消息

h.processMessage(msg)

// 标记消息为已消费

sess.MarkMessage(msg, "")

}

return nil

}

// 实际业务处理逻辑

func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {

logrus.Infof(

"处理延迟消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 产生时间=%v",

msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,

)

// 这里添加实际的业务处理代码

}

// 初始化消费者示例

func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {

config := sarama.NewConfig()

config.Version = sarama.V2_8_1_0 // 指定Kafka版本

config.Consumer.Return.Errors = true

config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

// 确保消息的Timestamp是创建时间(需要Kafka broker配置支持)

config.Consumer.Fetch.Min = 1

config.Consumer.Fetch.Default = 1024 * 1024

return sarama.NewConsumerGroup(brokers, groupID, config)

}

func main() {

brokers := []string{"localhost:9092"}

groupID := "delay-queue-group"

topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}

consumer, err := newDelayConsumer(brokers, groupID)

if err != nil {

logrus.Fatalf("创建消费者失败: %v", err)

}

defer consumer.Close()

handler := &delayConsumerHandler{}

ctx := context.Background()

// 持续消费

for {

if err := consumer.Consume(ctx, topics, handler); err != nil {

logrus.Errorf("消费出错: %v", err)

// 简单重试逻辑

time.Sleep(5 * time.Second)

}

}

}

生产者实现

package main

import (

"errors"

"time"

"github.com/IBM/sarama"

"github.com/sirupsen/logrus"

)

// 定义允许的延迟时长(毫秒)及其对应的Topic

var allowedDelays = map[time.Duration]string{

100 * time.Millisecond: "delay-100ms",

200 * time.Millisecond: "delay-200ms",

500 * time.Millisecond: "delay-500ms",

1000 * time.Millisecond: "delay-1000ms",

// 可根据需要添加更多允许的延迟时长

}

// DelayProducer 延迟消息生产者

type DelayProducer struct {

producer sarama.SyncProducer

}

// NewDelayProducer 创建延迟消息生产者实例

func NewDelayProducer(brokers []string) (*DelayProducer, error) {

config := sarama.NewConfig()

config.Version = sarama.V2_8_1_0 // 匹配Kafka版本

config.Producer.RequiredAcks = sarama.WaitForAll

config.Producer.Retry.Max = 3

config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer(brokers, config)

if err != nil {

return nil, err

}

return &DelayProducer{

producer: producer,

}, nil

}

// SendDelayMessage 发送延迟消息

// 参数:

// - key: 消息键

// - value: 消息内容

// - delay: 延迟时长

// 返回:

// - 消息的分区和偏移量

// - 错误信息(若延迟不合法或发送失败)

func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {

// 1. 校验延迟时长是否合法

topic, ok := allowedDelays[delay]

if !ok {

return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")

}

// 2. 创建消息,设置当前时间为消息时间戳(供消费者计算延迟)

msg := &sarama.ProducerMessage{

Topic: topic,

Key: sarama.ByteEncoder(key),

Value: sarama.ByteEncoder(value),

Timestamp: time.Now(), // 记录消息发送时间,用于消费者计算处理时间

}

// 3. 发送消息

partition, offset, err = p.producer.SendMessage(msg)

if err != nil {

logrus.Errorf("发送延迟消息失败: %v, 延迟时长: %v", err, delay)

return 0, 0, err

}

logrus.Infof("发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v",

topic, partition, offset, delay)

return partition, offset, nil

}

// Close 关闭生产者

func (p *DelayProducer) Close() error {

return p.producer.Close()

}

// 使用示例

func main() {

// 初始化生产者

producer, err := NewDelayProducer([]string{"localhost:9092"})

if err != nil {

logrus.Fatalf("初始化生产者失败: %v", err)

}

defer producer.Close()

// 发送合法延迟消息

_, _, err = producer.SendDelayMessage(

[]byte("test-key"),

[]byte("这是一条延迟消息"),

100*time.Millisecond, // 合法延迟

)

if err != nil {

logrus.Error("发送消息失败:", err)

}

// 尝试发送非法延迟消息(会被拒绝)

_, _, err = producer.SendDelayMessage(

[]byte("test-key"),

[]byte("这是一条非法延迟消息"),

300*time.Millisecond, // 不允许的延迟

)

if err != nil {

logrus.Error("发送消息失败:", err) // 会输出非法延迟的错误

}

}

Copyright © 2088 竞技暴风-网游赛事活动中心 All Rights Reserved.
友情链接