视频转码接入队列(asynq)
This commit is contained in:
@@ -14,6 +14,8 @@ type Store interface {
|
||||
IsUniqueViolation(err error) bool
|
||||
IsForeignKeyViolation(err error) bool
|
||||
IsNoRows(err error) bool
|
||||
|
||||
CreateVideoTx(ctx context.Context, arg CreateVideoTxParam) (CreateVideoTxResult, error)
|
||||
}
|
||||
|
||||
type SQLStore struct {
|
||||
|
||||
29
internal/db/tx_create_video.go
Normal file
29
internal/db/tx_create_video.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package db
|
||||
|
||||
import "context"
|
||||
|
||||
type CreateVideoTxParam struct {
|
||||
CreateVideoParams
|
||||
AfterCreate func(video Video) error
|
||||
}
|
||||
|
||||
type CreateVideoTxResult struct {
|
||||
Video Video
|
||||
}
|
||||
|
||||
func (store *SQLStore) CreateVideoTx(ctx context.Context, arg CreateVideoTxParam) (CreateVideoTxResult, error) {
|
||||
var result CreateVideoTxResult
|
||||
|
||||
err := store.ExecTx(ctx, func(q *Queries) error {
|
||||
var err error
|
||||
|
||||
result.Video, err = q.CreateVideo(ctx, arg.CreateVideoParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return arg.AfterCreate(result.Video)
|
||||
})
|
||||
|
||||
return result, err
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
@@ -23,7 +22,6 @@ func (server *Server) homeView(w http.ResponseWriter, r *http.Request) {
|
||||
if len(item.Description) > 65 {
|
||||
temp := strings.TrimSpace(item.Description[0:65]) + "..."
|
||||
item.Description = temp
|
||||
log.Println(item.Description)
|
||||
}
|
||||
result = append(result, item)
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/zhang2092/mediahls/internal/pkg/config"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/logger"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/token"
|
||||
"github.com/zhang2092/mediahls/internal/worker"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
@@ -31,11 +32,12 @@ type Server struct {
|
||||
router *mux.Router
|
||||
secureCookie *securecookie.SecureCookie
|
||||
|
||||
store db.Store
|
||||
tokenMaker token.Maker
|
||||
store db.Store
|
||||
tokenMaker token.Maker
|
||||
taskDistributor worker.TaskDistributor
|
||||
}
|
||||
|
||||
func NewServer(templateFS fs.FS, staticFS fs.FS, conf *config.Config, store db.Store) (*Server, error) {
|
||||
func NewServer(templateFS fs.FS, staticFS fs.FS, conf *config.Config, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) {
|
||||
tokenMaker, err := token.NewPasetoMaker(conf.TokenSymmetricKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create token maker: %w", err)
|
||||
@@ -47,12 +49,13 @@ func NewServer(templateFS fs.FS, staticFS fs.FS, conf *config.Config, store db.S
|
||||
// secureCookie.MaxAge(7200)
|
||||
|
||||
server := &Server{
|
||||
templateFS: templateFS,
|
||||
staticFS: staticFS,
|
||||
conf: conf,
|
||||
secureCookie: secureCookie,
|
||||
store: store,
|
||||
tokenMaker: tokenMaker,
|
||||
templateFS: templateFS,
|
||||
staticFS: staticFS,
|
||||
conf: conf,
|
||||
secureCookie: secureCookie,
|
||||
store: store,
|
||||
tokenMaker: tokenMaker,
|
||||
taskDistributor: taskDistributor,
|
||||
}
|
||||
|
||||
server.setupRouter()
|
||||
|
||||
@@ -10,10 +10,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zhang2092/mediahls/internal/db"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/convert"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/fileutil"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/logger"
|
||||
"github.com/zhang2092/mediahls/internal/worker"
|
||||
)
|
||||
|
||||
// obj
|
||||
@@ -137,16 +139,30 @@ func (server *Server) editVideo(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
u := withUser(ctx)
|
||||
if len(vm.ID) == 0 {
|
||||
_, err := server.store.CreateVideo(ctx, db.CreateVideoParams{
|
||||
ID: genId(),
|
||||
Title: vm.Title,
|
||||
Description: vm.Description,
|
||||
Images: vm.Images,
|
||||
OriginLink: vm.OriginLink,
|
||||
PlayLink: "",
|
||||
UserID: u.ID,
|
||||
CreateBy: u.Name,
|
||||
})
|
||||
arg := db.CreateVideoTxParam{
|
||||
CreateVideoParams: db.CreateVideoParams{
|
||||
ID: genId(),
|
||||
Title: vm.Title,
|
||||
Description: vm.Description,
|
||||
Images: vm.Images,
|
||||
OriginLink: vm.OriginLink,
|
||||
PlayLink: "",
|
||||
UserID: u.ID,
|
||||
CreateBy: u.Name,
|
||||
},
|
||||
AfterCreate: func(video db.Video) error {
|
||||
taskPayload := worker.PayloadConvertHLS{
|
||||
Id: video.ID,
|
||||
}
|
||||
opts := []asynq.Option{
|
||||
asynq.MaxRetry(3),
|
||||
asynq.ProcessIn(10 * time.Second),
|
||||
asynq.Queue(worker.QueueCritical),
|
||||
}
|
||||
return server.taskDistributor.DistributeConvertHLS(ctx, &taskPayload, opts...)
|
||||
},
|
||||
}
|
||||
_, err := server.store.CreateVideoTx(ctx, arg)
|
||||
if err != nil {
|
||||
vm.Summary = "添加视频失败"
|
||||
server.renderEditVideo(w, r, vm)
|
||||
@@ -204,6 +220,8 @@ func (server *Server) deleteVideo(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// transfer 视频转码
|
||||
// 已弃用
|
||||
// 在视频添加的时候 同时添加到队列 通过队列去视频转码
|
||||
func (server *Server) transfer(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
xid := vars["xid"]
|
||||
|
||||
@@ -9,6 +9,9 @@ import (
|
||||
type Config struct {
|
||||
DBDriver string `mapstructure:"DB_DRIVER"`
|
||||
DBSource string `mapstructure:"DB_SOURCE"`
|
||||
RDSource string `mapstructure:"RD_SOURCE"`
|
||||
RDPassowrd string `mapstructure:"RD_PWSSWORD"`
|
||||
RDIndex int `mapstructure:"RD_INDEX"`
|
||||
ServerAddress string `mapstructure:"SERVER_ADDRESS"`
|
||||
TokenSymmetricKey string `mapstructure:"TOKEN_SYMMETRIC_KEY"`
|
||||
AccessTokenDuration time.Duration `mapstructure:"ACCESS_TOKEN_DURATION"`
|
||||
|
||||
99
internal/worker/convert_hls.go
Normal file
99
internal/worker/convert_hls.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zhang2092/mediahls/internal/db"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/convert"
|
||||
"github.com/zhang2092/mediahls/internal/pkg/logger"
|
||||
)
|
||||
|
||||
const TaskConvertHLS = "task:convert_hls"
|
||||
|
||||
type PayloadConvertHLS struct {
|
||||
Id string `json:"id"`
|
||||
}
|
||||
|
||||
func (distributor *RedisTaskDistributor) DistributeConvertHLS(
|
||||
ctx context.Context,
|
||||
payload *PayloadConvertHLS,
|
||||
opts ...asynq.Option,
|
||||
) error {
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal task payload: %w", err)
|
||||
}
|
||||
|
||||
task := asynq.NewTask(TaskConvertHLS, jsonPayload, opts...)
|
||||
info, err := distributor.client.EnqueueContext(ctx, task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to enqueue task: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("type: %s\n", task.Type())
|
||||
log.Printf("payload: %s\n", task.Payload())
|
||||
log.Printf("queue: %s\n", info.Queue)
|
||||
log.Printf("max_retry: %d\n", info.MaxRetry)
|
||||
log.Printf("enqueued task\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (processor *RedisTaskProcessor) ProcessTaskConvertHLS(
|
||||
ctx context.Context,
|
||||
task *asynq.Task,
|
||||
) error {
|
||||
var payload PayloadConvertHLS
|
||||
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal payload: %w", err)
|
||||
}
|
||||
|
||||
video, err := processor.store.GetVideo(ctx, payload.Id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get video by id [%s] in db: %w", payload.Id, err)
|
||||
}
|
||||
|
||||
arg := db.UpdateVideoStatusParams{
|
||||
ID: video.ID,
|
||||
Status: 1,
|
||||
UpdateAt: time.Now(),
|
||||
UpdateBy: "任务队列",
|
||||
}
|
||||
video, err = processor.store.UpdateVideoStatus(ctx, arg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to video by id [%s]: in db: %w", payload.Id, err)
|
||||
}
|
||||
|
||||
err = convert.ConvertHLS("media/"+video.ID+"/", strings.TrimPrefix(video.OriginLink, "/"))
|
||||
if err != nil {
|
||||
logger.Logger.Errorf("Convert HLS [%s]-[%s]: %v", video.ID, video.OriginLink, err)
|
||||
arg = db.UpdateVideoStatusParams{
|
||||
ID: video.ID,
|
||||
Status: 2,
|
||||
UpdateAt: time.Now(),
|
||||
UpdateBy: "任务队列",
|
||||
}
|
||||
_, _ = processor.store.UpdateVideoStatus(ctx, arg)
|
||||
return fmt.Errorf("failed to convert hls by [%s]: %w", payload.Id, err)
|
||||
}
|
||||
|
||||
// 转码成功
|
||||
if _, err = processor.store.SetVideoPlay(ctx, db.SetVideoPlayParams{
|
||||
ID: video.ID,
|
||||
Status: 200,
|
||||
PlayLink: "/media/" + video.ID + "/stream/",
|
||||
UpdateAt: time.Now(),
|
||||
UpdateBy: "任务队列",
|
||||
}); err != nil {
|
||||
logger.Logger.Errorf("Set Video Play [%s]-[%s]: %v", video.ID, video.OriginLink, err)
|
||||
return fmt.Errorf("failed to set video [%s] play: %w", video.ID, err)
|
||||
}
|
||||
|
||||
logger.Logger.Infof("[%s]-[%s] 转码完成", video.ID, video.OriginLink)
|
||||
return nil
|
||||
}
|
||||
22
internal/worker/distributor.go
Normal file
22
internal/worker/distributor.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
type TaskDistributor interface {
|
||||
DistributeConvertHLS(ctx context.Context, payload *PayloadConvertHLS, opts ...asynq.Option) error
|
||||
}
|
||||
|
||||
type RedisTaskDistributor struct {
|
||||
client *asynq.Client
|
||||
}
|
||||
|
||||
func NewRedisTaskDistributor(redisOpt asynq.RedisClientOpt) TaskDistributor {
|
||||
client := asynq.NewClient(redisOpt)
|
||||
return &RedisTaskDistributor{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
16
internal/worker/logger.go
Normal file
16
internal/worker/logger.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
)
|
||||
|
||||
type Logger struct{}
|
||||
|
||||
func NewLogger() *Logger {
|
||||
return &Logger{}
|
||||
}
|
||||
|
||||
func (logger *Logger) Printf(ctx context.Context, format string, v ...interface{}) {
|
||||
log.Printf(format, v...)
|
||||
}
|
||||
54
internal/worker/processor.go
Normal file
54
internal/worker/processor.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zhang2092/mediahls/internal/db"
|
||||
)
|
||||
|
||||
const (
|
||||
QueueCritical = "critical"
|
||||
QueueDefault = "default"
|
||||
)
|
||||
|
||||
type TaskProcessor interface {
|
||||
Start() error
|
||||
ProcessTaskConvertHLS(ctx context.Context, task *asynq.Task) error
|
||||
}
|
||||
|
||||
type RedisTaskProcessor struct {
|
||||
server *asynq.Server
|
||||
store db.Store
|
||||
}
|
||||
|
||||
func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, store db.Store) TaskProcessor {
|
||||
logger := NewLogger()
|
||||
redis.SetLogger(logger)
|
||||
|
||||
config := asynq.Config{
|
||||
Concurrency: 2, // 最大并发数量
|
||||
Queues: map[string]int{
|
||||
QueueCritical: 10,
|
||||
QueueDefault: 5,
|
||||
},
|
||||
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
|
||||
log.Printf("type: %s\n", task.Type())
|
||||
log.Printf("payload: %s\n", task.Payload())
|
||||
log.Printf("process task failed\n")
|
||||
}),
|
||||
}
|
||||
server := asynq.NewServer(redisOpt, config)
|
||||
return &RedisTaskProcessor{
|
||||
server: server,
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
func (processor *RedisTaskProcessor) Start() error {
|
||||
mux := asynq.NewServeMux()
|
||||
mux.HandleFunc(TaskConvertHLS, processor.ProcessTaskConvertHLS)
|
||||
return processor.server.Start(mux)
|
||||
}
|
||||
Reference in New Issue
Block a user