Skip to content

Commit bb7a65b

Browse files
authored
Merge pull request #39 from OdyseeTeam/implement-multiple-storage
Implement multiple storage
2 parents 150aa85 + 0ca5a4e commit bb7a65b

22 files changed

+433
-393
lines changed

.github/workflows/test.yaml

+5-7
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,15 @@ jobs:
1616
- uses: actions/checkout@v4
1717
with:
1818
fetch-depth: 0
19-
- run: |
20-
docker compose up -d minio db redis &&
21-
docker compose up -d cworker conductor &&
22-
docker compose up minio-prepare
2319
- uses: actions/setup-go@v5
2420
with:
25-
go-version: "1.22"
21+
go-version: "1.23"
2622
- uses: FedericoCarboni/setup-ffmpeg@v3
2723
id: setup-ffmpeg
2824
with:
2925
ffmpeg-version: "6.1.0"
26+
- run: |
27+
make test_prepare
3028
- name: Run tests
3129
run: go test -covermode=count -coverprofile=coverage.out ./...
3230
golangci:
@@ -36,10 +34,10 @@ jobs:
3634
- uses: actions/checkout@v4
3735
- uses: actions/setup-go@v5
3836
with:
39-
go-version: "1.22"
37+
go-version: "1.23"
4038
cache: false
4139
- name: golangci-lint
4240
uses: golangci/golangci-lint-action@v6
4341
with:
44-
version: v1.58
42+
version: v1.63
4543
args: -v

Makefile

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,22 @@
1-
CC=x86_64-linux-musl-gcc
2-
CXX=x86_64-linux-musl-g++
3-
GOARCH=amd64
4-
GOOS=linux
5-
LDFLAGS=-ldflags "-linkmode external -extldflags -static"
6-
GO_BUILD=go build
7-
BUILD_DIR=dist
8-
LOCAL_ARCH=$(shell uname)
9-
VERSION := $(shell git describe --tags --match 'v*'|sed -e 's/v//')
1+
LOCAL_ARCH ?= $(shell uname)
2+
VERSION ?= $(shell git describe --tags --match 'v*'|sed -e 's/v//')
103
TRANSCODER_VERSION ?= $(shell git describe --tags --match 'transcoder-v*'|sed 's/transcoder-v\([0-9.]*\).*/\1/')
4+
BUILD_DIR ?= dist
5+
GOOS ?= linux
6+
GOARCH ?= amd64
7+
GO_BUILD ?= go build
118

12-
transcoder: $(BUILD_DIR)/$(GOOS)_$(GOARCH)/transcoder
9+
transcoder:
1310
GOARCH=$(GOARCH) GOOS=$(GOOS) CGO_ENABLED=0 \
1411
$(GO_BUILD) -o $(BUILD_DIR)/$(GOOS)_$(GOARCH)/transcoder \
1512
-ldflags "-s -w -X github.com/OdyseeTeam/transcoder/internal/version.Version=$(TRANSCODER_VERSION)" \
1613
./pkg/conductor/cmd/
1714

18-
conductor_image:
15+
conductor_image: $(BUILD_DIR)/$(GOOS)_$(GOARCH)/transcoder
1916
docker buildx build -f docker/Dockerfile-conductor -t odyseeteam/transcoder-conductor:$(TRANSCODER_VERSION) --platform linux/amd64 .
2017
docker tag odyseeteam/transcoder-conductor:$(TRANSCODER_VERSION) odyseeteam/transcoder-conductor:latest
2118

22-
cworker_image:
19+
cworker_image: $(BUILD_DIR)/$(GOOS)_$(GOARCH)/transcoder
2320
docker buildx build -f docker/Dockerfile-cworker -t odyseeteam/transcoder-cworker:$(TRANSCODER_VERSION) --platform linux/amd64 .
2421
docker tag odyseeteam/transcoder-cworker:$(TRANSCODER_VERSION) odyseeteam/transcoder-cworker:latest
2522

@@ -30,9 +27,12 @@ test_down:
3027
docker-compose down
3128

3229
test_prepare:
33-
docker-compose up -d minio db redis
34-
docker-compose up -d cworker conductor
35-
docker-compose up minio-prepare
30+
make transcoder
31+
make conductor_image
32+
make cworker_image
33+
docker compose -p transcoder up -d minio db redis
34+
docker compose -p transcoder up -d cworker conductor
35+
docker compose -p transcoder up minio-prepare
3636

3737
test: test_prepare
3838
go test -covermode=count -coverprofile=coverage.out ./...

conductor.ex.yml

+19-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1-
S3:
2-
Name: local
3-
Endpoint: http://minio:9000
4-
Bucket: transcoded
5-
Key: ody
6-
Secret: odyseetes3
7-
MaxSize: 1TB
1+
Storages:
2+
- Name: local
3+
Type: S3
4+
Endpoint: http://minio:9000
5+
Region: us-east-1
6+
Bucket: transcoded
7+
Key: ody
8+
Secret: odyseetes3
9+
MaxSize: 1TB
10+
CreateBucket: true
11+
- Name: remote
12+
Type: S3
13+
Endpoint: https://s3.wasabisys.com
14+
Region: us-east-1
15+
Bucket: production-videos
16+
Key: AKIAXXXXXXXXXXXXXXXX
17+
Secret: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
18+
MaxSize: 5TB
19+
CreateBucket: false
820

921
AdaptiveQueue:
1022
MinHits: 1

docker-compose.yml

+7-10
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
version: '3.7'
2-
31
services:
42
conductor:
5-
image: odyseeteam/transcoder-conductor:24.2.3
3+
image: odyseeteam/transcoder-conductor:latest
64
platform: linux/amd64
7-
container_name: tc-conductor
5+
container_name: conductor
86
command:
97
- ./transcoder
108
- conductor
@@ -26,9 +24,9 @@ services:
2624
max_attempts: 3
2725
window: 120s
2826
cworker:
29-
image: odyseeteam/transcoder-cworker:24.2.3
27+
image: odyseeteam/transcoder-cworker:latest
3028
platform: linux/amd64
31-
container_name: tc-cworker
29+
container_name: cworker
3230
command:
3331
- ./transcoder
3432
- worker
@@ -48,7 +46,7 @@ services:
4846
# - '9090:8080'
4947
redis:
5048
image: redis:7.0
51-
container_name: tc-redis
49+
container_name: redis
5250
ports:
5351
- '6379:6379'
5452
volumes:
@@ -60,7 +58,7 @@ services:
6058
restart: unless-stopped
6159
db:
6260
image: postgres:14
63-
container_name: tc-db
61+
container_name: db
6462
ports:
6563
- "5432:5432"
6664
environment:
@@ -78,7 +76,7 @@ services:
7876
retries: 5
7977
minio:
8078
image: minio/minio
81-
container_name: tc-minio
79+
container_name: minio
8280
ports:
8381
- "9000:9000"
8482
- "38861:38861"
@@ -104,7 +102,6 @@ services:
104102
depends_on: ["minio"]
105103

106104
volumes:
107-
108105
db-data:
109106
minio-data:
110107
redis-data:

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/OdyseeTeam/transcoder
22

3-
go 1.22
3+
go 1.23
44

55
require (
66
github.com/Pallinder/go-randomdata v1.2.0

internal/config/config.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
8+
"github.com/spf13/viper"
9+
)
10+
11+
type Storages []S3Config
12+
13+
type ConductorConfig struct {
14+
Storages Storages
15+
Redis string
16+
AdaptiveQueue AdaptiveQueue
17+
Library Library
18+
}
19+
20+
type WorkerConfig struct {
21+
Storage S3Config
22+
Redis string
23+
EdgeToken string
24+
}
25+
26+
type S3Config struct {
27+
Type string
28+
Name string
29+
Endpoint string
30+
Region string
31+
Bucket string
32+
Key string
33+
Secret string
34+
MaxSize string
35+
CreateBucket bool
36+
}
37+
38+
type AdaptiveQueue struct {
39+
MinHits int
40+
}
41+
42+
type Library struct {
43+
DSN string
44+
ManagerToken string
45+
}
46+
47+
func ProjectRoot() (string, error) {
48+
ex, err := os.Executable()
49+
if err != nil {
50+
return "", err
51+
}
52+
return filepath.Dir(ex), nil
53+
}
54+
55+
func Read(name string, cfg any) error {
56+
v := viper.New()
57+
v.SetConfigName(name)
58+
59+
pp, err := ProjectRoot()
60+
if err != nil {
61+
return err
62+
}
63+
v.AddConfigPath(pp)
64+
v.AddConfigPath(".")
65+
66+
if err := v.ReadInConfig(); err != nil {
67+
return fmt.Errorf("fatal error reading config file: %w", err)
68+
}
69+
70+
if err := v.Unmarshal(cfg); err != nil {
71+
return fmt.Errorf("unable to decode into struct: %w", err)
72+
}
73+
74+
return nil
75+
}
76+
77+
func ReadConductorConfig() (*ConductorConfig, error) {
78+
cfg := &ConductorConfig{}
79+
return cfg, Read("conductor", cfg)
80+
}
81+
82+
func ReadWorkerConfig() (*WorkerConfig, error) {
83+
cfg := &WorkerConfig{}
84+
return cfg, Read("worker", cfg)
85+
}
86+
87+
func (s Storages) Endpoints() []string {
88+
endpoints := []string{}
89+
for _, v := range s {
90+
endpoints = append(endpoints, v.Endpoint)
91+
}
92+
return endpoints
93+
}

library/library.go

+20-19
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,33 @@ const (
2222
)
2323

2424
var ErrStreamNotFound = errors.New("stream not found")
25-
var storageURLs = map[string]string{
26-
"wasabi": "https://s3.wasabisys.com/t-na2.odycdn.com",
27-
"legacy": "https://na-storage-1.transcoder.odysee.com/t-na",
28-
}
2925

3026
type Storage interface {
3127
Name() string
32-
GetURL(tid string) string
28+
GetURL(item string) string
3329
Put(stream *Stream, _ bool) error
3430
Delete(tid string) error
3531
DeleteFragments(tid string, fragments []string) error
3632
// GetFragment(sdHash, name string) (storage.StreamFragment, error)
3733
}
3834

3935
type Library struct {
40-
db *db.Queries
41-
storage Storage
42-
log logging.KVLogger
36+
db *db.Queries
37+
storages map[string]Storage
38+
log logging.KVLogger
4339
}
4440

4541
type Config struct {
46-
Storage Storage
47-
DB db.DBTX
48-
Log logging.KVLogger
42+
Storages map[string]Storage
43+
DB db.DBTX
44+
Log logging.KVLogger
4945
}
5046

5147
func New(config Config) *Library {
5248
return &Library{
53-
db: db.New(config.DB),
54-
log: config.Log,
55-
storage: config.Storage,
49+
db: db.New(config.DB),
50+
log: config.Log,
51+
storages: config.Storages,
5652
}
5753
}
5854

@@ -137,7 +133,12 @@ func (lib *Library) RetireVideos(storageName string, maxSize uint64) (uint64, ui
137133
func (lib *Library) Retire(video db.Video) error {
138134
var deleted bool
139135

140-
ll := lib.log.With("tid", video.TID, "sd_hash", video.SDHash)
136+
ll := lib.log.With("tid", video.TID, "sd_hash", video.SDHash, "storage", video.Storage)
137+
138+
storage, ok := lib.storages[video.Storage]
139+
if !ok {
140+
return fmt.Errorf("storage %s not found", video.Storage)
141+
}
141142

142143
if video.Manifest.Valid {
143144
manifest := &Manifest{}
@@ -147,7 +148,7 @@ func (lib *Library) Retire(video db.Video) error {
147148
} else if len(manifest.Files) == 0 {
148149
ll.Warn("empty video manifest", "err", err)
149150
} else {
150-
err = lib.storage.DeleteFragments(video.TID, manifest.Files)
151+
err = storage.DeleteFragments(video.TID, manifest.Files)
151152
if err != nil {
152153
ll.Warn("failed to delete fragments for remote video", "err", err)
153154
} else {
@@ -156,7 +157,7 @@ func (lib *Library) Retire(video db.Video) error {
156157
}
157158
}
158159
if !deleted {
159-
err := lib.storage.Delete(video.TID)
160+
err := storage.Delete(video.TID)
160161
if err != nil {
161162
ll.Warn("failed to delete remote video", "err", err)
162163
return err
@@ -173,7 +174,7 @@ func (lib *Library) Retire(video db.Video) error {
173174
return nil
174175
}
175176

176-
// RetireVideos deletes older videos from S3, keeping total size of remote videos at maxSize.
177+
// ValidateStreams removes broken streams from the database.
177178
func (lib *Library) ValidateStreams(storageName string, offset, limit int32, remove bool) ([]string, []string, error) {
178179
broken := []string{}
179180
valid := []string{}
@@ -219,7 +220,7 @@ func (lib *Library) ValidateStreams(storageName string, offset, limit int32, rem
219220
defer p.Release()
220221

221222
for _, v := range items {
222-
url := fmt.Sprintf("%s/%s", storageURLs[v.Storage], v.Path)
223+
url := lib.storages[v.Storage].GetURL(v.Path)
223224
tids[url] = v.TID
224225
wg.Add(1)
225226
_ = p.Invoke(url)

library/library_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ func (s *librarySuite) TestAddChannel() {
4141
func (s *librarySuite) TestAddGetVideo() {
4242
var err error
4343

44-
lib := New(Config{DB: s.DB, Storage: NewDummyStorage("storage1", "https://storage.host"), Log: zapadapter.NewKV(nil)})
45-
newStream := GenerateDummyStream()
44+
dummyStorage := NewDummyStorage("dummy1", "https://storage.host")
45+
lib := New(Config{DB: s.DB, Storages: map[string]Storage{dummyStorage.Name(): dummyStorage}, Log: zapadapter.NewKV(nil)})
46+
newStream := GenerateDummyStream(dummyStorage)
4647

4748
url, err := lib.GetVideoURL(newStream.SDHash())
4849
s.ErrorIs(err, ErrStreamNotFound)

0 commit comments

Comments
 (0)