This commit is contained in:
2025-06-18 17:44:49 +08:00
parent b171122a32
commit 0878a4e6de
66 changed files with 2841 additions and 1423 deletions

21
internal/tasks/asynq.go Normal file
View File

@@ -0,0 +1,21 @@
package tasks
import "github.com/redis/go-redis/v9"
// RedisClientConnector 是一个适配器,它包装了现有的 redis.Client
// 并实现了 asynq.RedisConnOpt 接口。
type RedisClientConnector struct {
Client *redis.Client
}
func NewRedisClientConnector(c *redis.Client) *RedisClientConnector {
return &RedisClientConnector{
Client: c,
}
}
// MakeRedisClient 实现了 asynq.RedisConnOpt 接口。
// 它直接返回已存在的客户端实例。
func (c *RedisClientConnector) MakeRedisClient() interface{} {
return c.Client
}

59
internal/tasks/audit.go Normal file
View File

@@ -0,0 +1,59 @@
package tasks
import (
"context"
"encoding/json"
"fmt"
"management/internal/erpserver/model/system"
"github.com/hibiken/asynq"
)
const TaskConsumeAuditLog = "task:audit"
type PayloadConsumeAuditLog struct {
*system.AuditLog
}
func (d *RedisTaskDistributor) DistributeTaskConsumeAuditLog(
ctx context.Context,
payload *PayloadConsumeAuditLog,
opts ...asynq.Option,
) error {
jsonPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal task payload: %w", err)
}
task := asynq.NewTask(TaskConsumeAuditLog, jsonPayload, opts...)
_, err = d.client.EnqueueContext(ctx, task)
if err != nil {
return fmt.Errorf("failed to enqueue task: %w", err)
}
//d.log.Info("enqueued task",
// zap.String("type", task.Type()),
// zap.Binary("payload", task.Payload()),
// zap.String("queue", info.Queue),
// zap.Int("max_retry", info.MaxRetry),
//)
return nil
}
func (p *RedisTaskProcessor) ProcessTaskConsumeAuditLog(ctx context.Context, task *asynq.Task) error {
var payload PayloadConsumeAuditLog
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", asynq.SkipRetry)
}
if err := p.auditService.Create(ctx, payload.AuditLog); err != nil {
return fmt.Errorf("failed to process task: %w", err)
}
//p.log.Info("processed task",
// zap.String("type", task.Type()),
// zap.Binary("payload", task.Payload()),
//)
return nil
}

View File

@@ -0,0 +1,25 @@
package tasks
import (
"context"
"github.com/drhin/logger"
"github.com/hibiken/asynq"
)
type TaskDistributor interface {
DistributeTaskConsumeAuditLog(ctx context.Context, payload *PayloadConsumeAuditLog, opts ...asynq.Option) error
}
type RedisTaskDistributor struct {
log *logger.Logger
client *asynq.Client
}
func NewRedisTaskDistributor(log *logger.Logger, opt *RedisClientConnector) TaskDistributor {
client := asynq.NewClient(opt)
return &RedisTaskDistributor{
log: log,
client: client,
}
}

View File

@@ -0,0 +1,65 @@
package tasks
import (
"context"
v1 "management/internal/erpserver/service/v1"
"github.com/drhin/logger"
"github.com/hibiken/asynq"
"go.uber.org/zap"
)
const (
QueueCritical = "critical"
QueueDefault = "default"
)
type TaskProcessor interface {
Start() error
Shutdown()
ProcessTaskConsumeAuditLog(ctx context.Context, task *asynq.Task) error
}
type RedisTaskProcessor struct {
log *logger.Logger
server *asynq.Server
auditService v1.AuditLogService
}
func NewRedisTaskProcessor(log *logger.Logger, opt *RedisClientConnector, auditService v1.AuditLogService) TaskProcessor {
server := asynq.NewServer(
opt,
asynq.Config{
Queues: map[string]int{
QueueCritical: 10,
QueueDefault: 5,
},
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
log.Error("process task failed", err,
zap.String("type", task.Type()),
zap.Binary("payload", task.Payload()),
)
}),
},
)
return &RedisTaskProcessor{
log: log,
server: server,
auditService: auditService,
}
}
func (p *RedisTaskProcessor) Start() error {
mux := asynq.NewServeMux()
mux.HandleFunc(TaskConsumeAuditLog, p.ProcessTaskConsumeAuditLog)
return p.server.Start(mux)
}
func (p *RedisTaskProcessor) Shutdown() {
p.server.Shutdown()
}