From 4e15e3a29e6c7146d59217a56690194a541ebfa0 Mon Sep 17 00:00:00 2001 From: kenneth Date: Fri, 22 Dec 2023 16:51:09 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=86=E9=A2=91=E8=BD=AC=E7=A0=81=E6=8E=A5?= =?UTF-8?q?=E5=85=A5=E9=98=9F=E5=88=97(asynq)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 5 +- go.mod | 10 +++ go.sum | 66 +++++++++++++++++++ internal/db/store.go | 2 + internal/db/tx_create_video.go | 29 ++++++++ internal/handlers/home.go | 2 - internal/handlers/server.go | 21 +++--- internal/handlers/video.go | 38 ++++++++--- internal/pkg/config/config.go | 3 + internal/worker/convert_hls.go | 99 ++++++++++++++++++++++++++++ internal/worker/distributor.go | 22 +++++++ internal/worker/logger.go | 16 +++++ internal/worker/processor.go | 54 +++++++++++++++ main.go | 21 +++++- web/templates/video/edit.html.tmpl | 2 +- web/templates/video/videos.html.tmpl | 34 ++-------- 16 files changed, 373 insertions(+), 51 deletions(-) create mode 100644 internal/db/tx_create_video.go create mode 100644 internal/worker/convert_hls.go create mode 100644 internal/worker/distributor.go create mode 100644 internal/worker/logger.go create mode 100644 internal/worker/processor.go diff --git a/Makefile b/Makefile index 8d8baf3..a64f1a7 100644 --- a/Makefile +++ b/Makefile @@ -3,6 +3,9 @@ DB_URL=postgresql://root:secret@localhost:5432/mediahls?sslmode=disable network: docker network create media-hls-network +redis: + docker run --name rd -d -p 6378:6379 redis:7.2.3 --requirepass "secret" + postgres: docker run --name postgres --network media-hls-network -p 5432:5432 -e POSTGRES_USER=root -e POSTGRES_PASSWORD=secret -d postgres:16-alpine @@ -33,4 +36,4 @@ test: server: go run main.go -.PHONY: network postgres createdb dropdb psql migrateup migratedown sqlc test server \ No newline at end of file +.PHONY: network redis postgres createdb dropdb psql migrateup migratedown sqlc test server \ No newline at end of file diff --git a/go.mod b/go.mod index 531c2d3..29a763e 100644 --- a/go.mod +++ b/go.mod @@ -23,13 +23,21 @@ require ( github.com/BurntSushi/toml v1.3.2 // indirect github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-redis/redis/v8 v8.11.5 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/uuid v1.5.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect + github.com/hibiken/asynq v0.24.1 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/redis/go-redis/v9 v9.3.1 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -41,6 +49,8 @@ require ( golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 0cfe9fe..692a62e 100644 --- a/go.sum +++ b/go.sum @@ -7,22 +7,39 @@ github.com/aead/chacha20poly1305 v0.0.0-20201124145622-1a5aba2a8b29 h1:1DcvRPZOd github.com/aead/chacha20poly1305 v0.0.0-20201124145622-1a5aba2a8b29/go.mod h1:UzH9IX1MMqOcwhoNOIjmTQeAxrFgzs50j4golQtXXxU= github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 h1:52m0LGchQBBVqJRyYYufQuIbVqRawmubW3OFGqK1ekw= github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635/go.mod h1:lmLxL+FV291OopO93Bwf9fQLQeLyt33VJRUg5VJ30us= +github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= +github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/csrf v1.7.2 h1:oTUjx0vyf2T+wkrx09Trsev1TE+/EbDAeHtSTbtC2eI= github.com/gorilla/csrf v1.7.2/go.mod h1:F1Fj3KG23WYHE6gozCmBAezKookxbIvUJT+121wTuLk= github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= @@ -33,8 +50,13 @@ github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kX github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hibiken/asynq v0.24.1 h1:+5iIEAyA9K/lcSPvx3qoPtsKJeKI5u9aOIvUmSsazEw= +github.com/hibiken/asynq v0.24.1/go.mod h1:u5qVeSbrnfT+vtG5Mq8ZPzQu/BmCKMHvTGb91uy9Tts= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= @@ -57,6 +79,11 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= +github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= +github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= @@ -69,6 +96,7 @@ github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9yS github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -79,12 +107,15 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -92,16 +123,51 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= diff --git a/internal/db/store.go b/internal/db/store.go index 93a9683..9f57b30 100644 --- a/internal/db/store.go +++ b/internal/db/store.go @@ -14,6 +14,8 @@ type Store interface { IsUniqueViolation(err error) bool IsForeignKeyViolation(err error) bool IsNoRows(err error) bool + + CreateVideoTx(ctx context.Context, arg CreateVideoTxParam) (CreateVideoTxResult, error) } type SQLStore struct { diff --git a/internal/db/tx_create_video.go b/internal/db/tx_create_video.go new file mode 100644 index 0000000..ce6025e --- /dev/null +++ b/internal/db/tx_create_video.go @@ -0,0 +1,29 @@ +package db + +import "context" + +type CreateVideoTxParam struct { + CreateVideoParams + AfterCreate func(video Video) error +} + +type CreateVideoTxResult struct { + Video Video +} + +func (store *SQLStore) CreateVideoTx(ctx context.Context, arg CreateVideoTxParam) (CreateVideoTxResult, error) { + var result CreateVideoTxResult + + err := store.ExecTx(ctx, func(q *Queries) error { + var err error + + result.Video, err = q.CreateVideo(ctx, arg.CreateVideoParams) + if err != nil { + return err + } + + return arg.AfterCreate(result.Video) + }) + + return result, err +} diff --git a/internal/handlers/home.go b/internal/handlers/home.go index 088fcba..c70b980 100644 --- a/internal/handlers/home.go +++ b/internal/handlers/home.go @@ -1,7 +1,6 @@ package handlers import ( - "log" "net/http" "strings" @@ -23,7 +22,6 @@ func (server *Server) homeView(w http.ResponseWriter, r *http.Request) { if len(item.Description) > 65 { temp := strings.TrimSpace(item.Description[0:65]) + "..." item.Description = temp - log.Println(item.Description) } result = append(result, item) } diff --git a/internal/handlers/server.go b/internal/handlers/server.go index 17d3a8f..a46683e 100644 --- a/internal/handlers/server.go +++ b/internal/handlers/server.go @@ -21,6 +21,7 @@ import ( "github.com/zhang2092/mediahls/internal/pkg/config" "github.com/zhang2092/mediahls/internal/pkg/logger" "github.com/zhang2092/mediahls/internal/pkg/token" + "github.com/zhang2092/mediahls/internal/worker" ) type Server struct { @@ -31,11 +32,12 @@ type Server struct { router *mux.Router secureCookie *securecookie.SecureCookie - store db.Store - tokenMaker token.Maker + store db.Store + tokenMaker token.Maker + taskDistributor worker.TaskDistributor } -func NewServer(templateFS fs.FS, staticFS fs.FS, conf *config.Config, store db.Store) (*Server, error) { +func NewServer(templateFS fs.FS, staticFS fs.FS, conf *config.Config, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) { tokenMaker, err := token.NewPasetoMaker(conf.TokenSymmetricKey) if err != nil { return nil, fmt.Errorf("cannot create token maker: %w", err) @@ -47,12 +49,13 @@ func NewServer(templateFS fs.FS, staticFS fs.FS, conf *config.Config, store db.S // secureCookie.MaxAge(7200) server := &Server{ - templateFS: templateFS, - staticFS: staticFS, - conf: conf, - secureCookie: secureCookie, - store: store, - tokenMaker: tokenMaker, + templateFS: templateFS, + staticFS: staticFS, + conf: conf, + secureCookie: secureCookie, + store: store, + tokenMaker: tokenMaker, + taskDistributor: taskDistributor, } server.setupRouter() diff --git a/internal/handlers/video.go b/internal/handlers/video.go index d020c31..f60a0d3 100644 --- a/internal/handlers/video.go +++ b/internal/handlers/video.go @@ -10,10 +10,12 @@ import ( "time" "github.com/gorilla/mux" + "github.com/hibiken/asynq" "github.com/zhang2092/mediahls/internal/db" "github.com/zhang2092/mediahls/internal/pkg/convert" "github.com/zhang2092/mediahls/internal/pkg/fileutil" "github.com/zhang2092/mediahls/internal/pkg/logger" + "github.com/zhang2092/mediahls/internal/worker" ) // obj @@ -137,16 +139,30 @@ func (server *Server) editVideo(w http.ResponseWriter, r *http.Request) { ctx := r.Context() u := withUser(ctx) if len(vm.ID) == 0 { - _, err := server.store.CreateVideo(ctx, db.CreateVideoParams{ - ID: genId(), - Title: vm.Title, - Description: vm.Description, - Images: vm.Images, - OriginLink: vm.OriginLink, - PlayLink: "", - UserID: u.ID, - CreateBy: u.Name, - }) + arg := db.CreateVideoTxParam{ + CreateVideoParams: db.CreateVideoParams{ + ID: genId(), + Title: vm.Title, + Description: vm.Description, + Images: vm.Images, + OriginLink: vm.OriginLink, + PlayLink: "", + UserID: u.ID, + CreateBy: u.Name, + }, + AfterCreate: func(video db.Video) error { + taskPayload := worker.PayloadConvertHLS{ + Id: video.ID, + } + opts := []asynq.Option{ + asynq.MaxRetry(3), + asynq.ProcessIn(10 * time.Second), + asynq.Queue(worker.QueueCritical), + } + return server.taskDistributor.DistributeConvertHLS(ctx, &taskPayload, opts...) + }, + } + _, err := server.store.CreateVideoTx(ctx, arg) if err != nil { vm.Summary = "添加视频失败" server.renderEditVideo(w, r, vm) @@ -204,6 +220,8 @@ func (server *Server) deleteVideo(w http.ResponseWriter, r *http.Request) { } // transfer 视频转码 +// 已弃用 +// 在视频添加的时候 同时添加到队列 通过队列去视频转码 func (server *Server) transfer(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) xid := vars["xid"] diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index cc1b368..1935efa 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -9,6 +9,9 @@ import ( type Config struct { DBDriver string `mapstructure:"DB_DRIVER"` DBSource string `mapstructure:"DB_SOURCE"` + RDSource string `mapstructure:"RD_SOURCE"` + RDPassowrd string `mapstructure:"RD_PWSSWORD"` + RDIndex int `mapstructure:"RD_INDEX"` ServerAddress string `mapstructure:"SERVER_ADDRESS"` TokenSymmetricKey string `mapstructure:"TOKEN_SYMMETRIC_KEY"` AccessTokenDuration time.Duration `mapstructure:"ACCESS_TOKEN_DURATION"` diff --git a/internal/worker/convert_hls.go b/internal/worker/convert_hls.go new file mode 100644 index 0000000..cbbc003 --- /dev/null +++ b/internal/worker/convert_hls.go @@ -0,0 +1,99 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + "time" + + "github.com/hibiken/asynq" + "github.com/zhang2092/mediahls/internal/db" + "github.com/zhang2092/mediahls/internal/pkg/convert" + "github.com/zhang2092/mediahls/internal/pkg/logger" +) + +const TaskConvertHLS = "task:convert_hls" + +type PayloadConvertHLS struct { + Id string `json:"id"` +} + +func (distributor *RedisTaskDistributor) DistributeConvertHLS( + ctx context.Context, + payload *PayloadConvertHLS, + opts ...asynq.Option, +) error { + jsonPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal task payload: %w", err) + } + + task := asynq.NewTask(TaskConvertHLS, jsonPayload, opts...) + info, err := distributor.client.EnqueueContext(ctx, task) + if err != nil { + return fmt.Errorf("failed to enqueue task: %w", err) + } + + log.Printf("type: %s\n", task.Type()) + log.Printf("payload: %s\n", task.Payload()) + log.Printf("queue: %s\n", info.Queue) + log.Printf("max_retry: %d\n", info.MaxRetry) + log.Printf("enqueued task\n") + return nil +} + +func (processor *RedisTaskProcessor) ProcessTaskConvertHLS( + ctx context.Context, + task *asynq.Task, +) error { + var payload PayloadConvertHLS + if err := json.Unmarshal(task.Payload(), &payload); err != nil { + return fmt.Errorf("failed to unmarshal payload: %w", err) + } + + video, err := processor.store.GetVideo(ctx, payload.Id) + if err != nil { + return fmt.Errorf("failed to get video by id [%s] in db: %w", payload.Id, err) + } + + arg := db.UpdateVideoStatusParams{ + ID: video.ID, + Status: 1, + UpdateAt: time.Now(), + UpdateBy: "任务队列", + } + video, err = processor.store.UpdateVideoStatus(ctx, arg) + if err != nil { + return fmt.Errorf("failed to video by id [%s]: in db: %w", payload.Id, err) + } + + err = convert.ConvertHLS("media/"+video.ID+"/", strings.TrimPrefix(video.OriginLink, "/")) + if err != nil { + logger.Logger.Errorf("Convert HLS [%s]-[%s]: %v", video.ID, video.OriginLink, err) + arg = db.UpdateVideoStatusParams{ + ID: video.ID, + Status: 2, + UpdateAt: time.Now(), + UpdateBy: "任务队列", + } + _, _ = processor.store.UpdateVideoStatus(ctx, arg) + return fmt.Errorf("failed to convert hls by [%s]: %w", payload.Id, err) + } + + // 转码成功 + if _, err = processor.store.SetVideoPlay(ctx, db.SetVideoPlayParams{ + ID: video.ID, + Status: 200, + PlayLink: "/media/" + video.ID + "/stream/", + UpdateAt: time.Now(), + UpdateBy: "任务队列", + }); err != nil { + logger.Logger.Errorf("Set Video Play [%s]-[%s]: %v", video.ID, video.OriginLink, err) + return fmt.Errorf("failed to set video [%s] play: %w", video.ID, err) + } + + logger.Logger.Infof("[%s]-[%s] 转码完成", video.ID, video.OriginLink) + return nil +} diff --git a/internal/worker/distributor.go b/internal/worker/distributor.go new file mode 100644 index 0000000..244394e --- /dev/null +++ b/internal/worker/distributor.go @@ -0,0 +1,22 @@ +package worker + +import ( + "context" + + "github.com/hibiken/asynq" +) + +type TaskDistributor interface { + DistributeConvertHLS(ctx context.Context, payload *PayloadConvertHLS, opts ...asynq.Option) error +} + +type RedisTaskDistributor struct { + client *asynq.Client +} + +func NewRedisTaskDistributor(redisOpt asynq.RedisClientOpt) TaskDistributor { + client := asynq.NewClient(redisOpt) + return &RedisTaskDistributor{ + client: client, + } +} diff --git a/internal/worker/logger.go b/internal/worker/logger.go new file mode 100644 index 0000000..426a807 --- /dev/null +++ b/internal/worker/logger.go @@ -0,0 +1,16 @@ +package worker + +import ( + "context" + "log" +) + +type Logger struct{} + +func NewLogger() *Logger { + return &Logger{} +} + +func (logger *Logger) Printf(ctx context.Context, format string, v ...interface{}) { + log.Printf(format, v...) +} diff --git a/internal/worker/processor.go b/internal/worker/processor.go new file mode 100644 index 0000000..90b067e --- /dev/null +++ b/internal/worker/processor.go @@ -0,0 +1,54 @@ +package worker + +import ( + "context" + "log" + + "github.com/go-redis/redis/v8" + "github.com/hibiken/asynq" + "github.com/zhang2092/mediahls/internal/db" +) + +const ( + QueueCritical = "critical" + QueueDefault = "default" +) + +type TaskProcessor interface { + Start() error + ProcessTaskConvertHLS(ctx context.Context, task *asynq.Task) error +} + +type RedisTaskProcessor struct { + server *asynq.Server + store db.Store +} + +func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, store db.Store) TaskProcessor { + logger := NewLogger() + redis.SetLogger(logger) + + config := asynq.Config{ + Concurrency: 2, // 最大并发数量 + Queues: map[string]int{ + QueueCritical: 10, + QueueDefault: 5, + }, + ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { + log.Printf("type: %s\n", task.Type()) + log.Printf("payload: %s\n", task.Payload()) + log.Printf("process task failed\n") + }), + } + server := asynq.NewServer(redisOpt, config) + return &RedisTaskProcessor{ + server: server, + store: store, + } +} + +func (processor *RedisTaskProcessor) Start() error { + mux := asynq.NewServeMux() + mux.HandleFunc(TaskConvertHLS, processor.ProcessTaskConvertHLS) + return processor.server.Start(mux) +} diff --git a/main.go b/main.go index ec19d83..e013eec 100644 --- a/main.go +++ b/main.go @@ -6,10 +6,12 @@ import ( "io/fs" "log" + "github.com/hibiken/asynq" "github.com/zhang2092/mediahls/internal/db" "github.com/zhang2092/mediahls/internal/handlers" "github.com/zhang2092/mediahls/internal/pkg/config" "github.com/zhang2092/mediahls/internal/pkg/logger" + "github.com/zhang2092/mediahls/internal/worker" ) //go:embed web/templates @@ -44,7 +46,15 @@ func main() { } store := db.NewStore(conn) - server, err := handlers.NewServer(templates, statics, config, store) + redisOpt := asynq.RedisClientOpt{ + Addr: config.RDSource, + Password: config.RDPassowrd, + DB: config.RDIndex, + } + + taskDistributor := worker.NewRedisTaskDistributor(redisOpt) + go runTaskProcessor(redisOpt, store) + server, err := handlers.NewServer(templates, statics, config, store, taskDistributor) if err != nil { log.Fatal("cannot create server: ", err) } @@ -60,3 +70,12 @@ func main() { // log.Println("ok") } + +func runTaskProcessor(redisOpt asynq.RedisClientOpt, store db.Store) { + taskProcessor := worker.NewRedisTaskProcessor(redisOpt, store) + log.Printf("task processor start\n") + err := taskProcessor.Start() + if err != nil { + log.Fatal("failed to start task processor: %w", err) + } +} diff --git a/web/templates/video/edit.html.tmpl b/web/templates/video/edit.html.tmpl index 52128db..20a7a88 100644 --- a/web/templates/video/edit.html.tmpl +++ b/web/templates/video/edit.html.tmpl @@ -98,7 +98,7 @@ {{end}} - + {{end}} diff --git a/web/templates/video/videos.html.tmpl b/web/templates/video/videos.html.tmpl index 00f8003..40ad4da 100644 --- a/web/templates/video/videos.html.tmpl +++ b/web/templates/video/videos.html.tmpl @@ -12,25 +12,24 @@
{{.Title}}

{{.Description}}

- + {{if not (eq .Status 1)}} 编辑 {{end}} - {{if eq .Status 0}} - - {{else if eq .Status 200}} + {{if eq .Status 200}} 播放 {{end}}
{{if eq .Status 1}} +

等待队列处理...

+ {{else if eq .Status 1}}

转码中...

{{else if eq .Status 2}}

转码失败

{{end}} -

@@ -43,25 +42,6 @@ {{define "js"}}