-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
79 lines (65 loc) · 2.63 KB
/
main.go
File metadata and controls
79 lines (65 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main
import (
	"context"
	"fmt"
	"io/fs"
	"log"
	"net/url"
	"os"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/DIMO-Network/clickhouse-infra/pkg/migrate"
	indexmigrations "github.com/DIMO-Network/cloudevent/clickhouse/migrations"
	sigmigrations "github.com/DIMO-Network/model-garage/pkg/migrations"
	"github.com/redpanda-data/benthos/v4/public/service"

	// Import aws for s3 output.
	_ "github.com/redpanda-data/connect/v4/public/components/aws"
	// Import sql for clickhouse output.
	_ "github.com/redpanda-data/connect/v4/public/components/sql"
	// Import io for http endpoints.
	_ "github.com/redpanda-data/connect/v4/public/components/io"
	// Import pure for basic processing.
	_ "github.com/redpanda-data/benthos/v4/public/components/pure"
	// Import prometheus for metrics.
	_ "github.com/redpanda-data/connect/v4/public/components/prometheus"

	// Add our custom plugin packages here.
	_ "github.com/DIMO-Network/dps/internal/processors/eventstoslice"
	_ "github.com/DIMO-Network/dps/internal/processors/parquet"
	_ "github.com/DIMO-Network/dps/internal/processors/signalstoslice"
	_ "github.com/DIMO-Network/dps/internal/processors/splitvalues"
)
// main runs the ClickHouse migrations and then starts the Benthos CLI.
//
// Connection settings are read from the CLICKHOUSE_* environment variables,
// each falling back to a built-in default when unset or empty. The "signal"
// migration runs against the DIMO database and the "file_index" migration
// against the index database, in that order, before service.RunCLI is called.
// runMigration terminates the process on failure, so the service only starts
// once both migrations have succeeded.
func main() {
	host := envOrDefault("CLICKHOUSE_HOST", "localhost")
	port := envOrDefault("CLICKHOUSE_PORT", "9440")
	user := envOrDefault("CLICKHOUSE_USER", "default")
	pass := envOrDefault("CLICKHOUSE_PASSWORD", "")
	dimoDB := envOrDefault("CLICKHOUSE_DIMO_DATABASE", "dimo")
	indexDB := envOrDefault("CLICKHOUSE_INDEX_DATABASE", "dimo_index")

	dimoDSN := clickhouseDSN(host, port, dimoDB, user, pass)
	indexDSN := clickhouseDSN(host, port, indexDB, user, pass)

	runMigration("signal", dimoDSN, sigmigrations.BaseFS)
	runMigration("file_index", indexDSN, indexmigrations.BaseFS)

	service.RunCLI(context.Background())
}

// clickhouseDSN builds a TLS-enabled ClickHouse DSN with a 5s dial timeout.
//
// The database name is path-escaped and the credentials are query-escaped so
// that reserved URL characters (for example '&', '#', '%' or '+') in a
// password survive DSN parsing instead of truncating the value or injecting
// extra parameters. For values without reserved characters the result is
// identical to plain string interpolation.
func clickhouseDSN(host, port, database, user, pass string) string {
	return fmt.Sprintf(
		"clickhouse://%s:%s/%s?username=%s&password=%s&secure=true&dial_timeout=5s",
		host,
		port,
		url.PathEscape(database),
		url.QueryEscape(user),
		url.QueryEscape(pass),
	)
}
// envOrDefault returns the value of the environment variable named by key.
// If the variable is unset or set to the empty string, fallback is returned
// instead.
func envOrDefault(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
// runMigration runs the migrations found in baseFs against the ClickHouse
// database described by dsn, logging progress under the given name.
//
// The DSN is parsed with clickhouse.ParseDSN, a database handle is opened,
// and migrate.RunGoose is invoked with the arguments "up" and "-v". Any
// failure (parsing the DSN, running the migration, or closing the handle)
// is fatal: the error is logged and the process exits via log.Fatalf.
func runMigration(name, dsn string, baseFs fs.FS) {
	log.Printf("Running migration: %s", name)
	begin := time.Now()

	opts, parseErr := clickhouse.ParseDSN(dsn)
	if parseErr != nil {
		log.Fatalf("Failed to parse DSN for %s: %v", name, parseErr)
	}

	conn := clickhouse.OpenDB(opts)
	gooseArgs := []string{"up", "-v"}

	migrateErr := migrate.RunGoose(context.Background(), gooseArgs, baseFs, conn)
	if migrateErr != nil {
		// log.Fatalf exits without running deferred calls, so the handle is
		// closed explicitly; a close error here is secondary to the
		// migration failure and is intentionally discarded.
		_ = conn.Close()
		log.Fatalf("Migration %s failed: %v", name, migrateErr)
	}

	closeErr := conn.Close()
	if closeErr != nil {
		log.Fatalf("Failed to close db after %s migration: %v", name, closeErr)
	}

	elapsed := time.Since(begin)
	log.Printf("Migration %s completed in %s", name, elapsed)
}