@@ -4,10 +4,13 @@ import (
44 "bytes"
55 "encoding/json"
66 "fmt"
7+ "io"
78 "net/http"
89 "strings"
910 "time"
1011
12+ "github.com/devstream-io/devstream/internal/pkg/configmanager"
13+
1114 "github.com/devstream-io/devstream/internal/pkg/plugininstaller"
1215 "github.com/devstream-io/devstream/internal/pkg/statemanager"
1316 "github.com/devstream-io/devstream/pkg/util/log"
@@ -17,24 +20,24 @@ var httpClient = &http.Client{
1720 Timeout : 5 * time .Second ,
1821}
1922
20- func RenderAuthConfig (options plugininstaller.RawOptions ) (plugininstaller.RawOptions , error ) {
21- opts , err := NewOptions (options )
22- if err != nil {
23- return nil , err
24- }
25-
26- for _ , p := range opts .Plugins {
27- for _ , c := range p .Connections {
28- c .Token = c .Authx .Token
29- c .Username = c .Authx .Username
30- c .Password = c .Authx .Password
31- c .AppId = c .Authx .AppId
32- c .SecretKey = c .Authx .SecretKey
33- }
34- }
35-
36- return opts .Encode ()
37- }
23+ // func RenderAuthConfig(options plugininstaller.RawOptions) (plugininstaller.RawOptions, error) {
24+ // opts, err := NewOptions(options)
25+ // if err != nil {
26+ // return nil, err
27+ // }
28+ //
29+ // for _, p := range opts.Plugins {
30+ // for _, c := range p.Connections {
31+ // c.Auth .Token = c.Authx.Token
32+ // c.Username = c.Authx.Username
33+ // c.Password = c.Authx.Password
34+ // c.AppId = c.Authx.AppId
35+ // c.SecretKey = c.Authx.SecretKey
36+ // }
37+ // }
38+ //
39+ // return opts.Encode()
40+ // }
3841
3942func ApplyConfig (options plugininstaller.RawOptions ) error {
4043 opts , err := NewOptions (options )
@@ -55,26 +58,108 @@ func ApplyConfig(options plugininstaller.RawOptions) error {
5558func createConnections (host string , pluginName string , connections []Connection ) error {
5659 for i , c := range connections {
5760 log .Infof ("Connection %d: %s" , i , c .Name )
58- configs , err := json .Marshal (c )
59- if err != nil {
61+ if err := createConnection (host , pluginName , c ); err != nil {
6062 return err
6163 }
62- log .Debugf ("Connection configs: %s" , string (configs ))
64+ }
65+ log .Infof ("All %s connections have been created." , pluginName )
66+ return nil
67+ }
6368
64- url := fmt .Sprintf ("%s/plugins/%s/connections" , strings .TrimRight (host , "/" ), pluginName )
65- log .Debugf ("URL: %s" , url )
69+ func createConnection (host string , pluginName string , c Connection ) error {
70+ configs , err := json .Marshal (c )
71+ if err != nil {
72+ return err
73+ }
74+ fmt .Println (string (configs ))
75+ log .Debugf ("Connection configs: %s" , string (configs ))
76+ url := fmt .Sprintf ("%s/plugins/%s/connections" , strings .TrimRight (host , "/" ), pluginName )
77+ log .Debugf ("URL: %s" , url )
78+ if err = apiClient (http .MethodPost , url , configs ); err != nil {
79+ return err
80+ }
81+ return nil
82+ }
6683
67- if err := createConnection (url , configs ); err != nil {
68- return err
69- }
84+ func updateConnection (host string , pluginName string , c Connection ) error {
85+ id , err := getConnection (host , pluginName , c )
86+ if err != nil {
87+ return err
88+ }
89+ if id == 0 {
90+ return nil
91+ }
92+ configs , err := json .Marshal (c )
93+ if err != nil {
94+ return err
95+ }
96+ log .Debugf ("UPDATE Connection configs: %s" , string (configs ))
97+ url := fmt .Sprintf ("%s/plugins/%s/connections/%d" , strings .TrimRight (host , "/" ), pluginName , id )
98+ log .Debugf ("URL: %s" , url )
99+ if err = apiClient (http .MethodPatch , url , configs ); err != nil {
100+ return err
70101 }
102+ return nil
103+ }
71104
72- log .Infof ("All %s connections have been created." , pluginName )
105+ func deleteConnection (host string , pluginName string , c Connection ) error {
106+ id , err := getConnection (host , pluginName , c )
107+ if err != nil {
108+ return err
109+ }
110+ if id == 0 {
111+ return nil
112+ }
113+ configs , err := json .Marshal (c )
114+ if err != nil {
115+ return err
116+ }
117+ log .Debugf ("DELETE Connection configs: %s" , string (configs ))
118+ url := fmt .Sprintf ("%s/plugins/%s/connections/%d" , strings .TrimRight (host , "/" ), pluginName , id )
119+ log .Debugf ("URL: %s" , url )
120+ if err = apiClient (http .MethodDelete , url , configs ); err != nil {
121+ return err
122+ }
73123 return nil
74124}
75125
76- func createConnection (url string , bodyWithJson []byte ) error {
77- req , err := http .NewRequest (http .MethodPost , url , bytes .NewBuffer (bodyWithJson ))
126+ func getConnection (host string , pluginName string , c Connection ) (uint64 , error ) {
127+ url := fmt .Sprintf ("%s/plugins/%s/connections" , strings .TrimRight (host , "/" ), pluginName )
128+ log .Debugf ("URL: %s" , url )
129+ req , err := http .NewRequest (http .MethodGet , url , nil )
130+ if err != nil {
131+ return 0 , err
132+ }
133+ req .Header .Set ("Content-Type" , "application/json" )
134+
135+ resp , err := httpClient .Do (req )
136+ if err != nil {
137+ return 0 , err
138+ }
139+ defer resp .Body .Close ()
140+ if resp .StatusCode == http .StatusOK {
141+ resBody , err := io .ReadAll (resp .Body )
142+ if err != nil {
143+ return 0 , err
144+ }
145+ connections := make ([]Connection , 0 )
146+ err = json .Unmarshal (resBody , & connections )
147+ if err != nil {
148+ return 0 , err
149+ }
150+ for _ , v := range connections {
151+ if v .Name == c .Name {
152+ return v .ID , nil
153+ }
154+ }
155+ return 0 , nil
156+ }
157+
158+ return 0 , fmt .Errorf (resp .Status )
159+ }
160+
161+ func apiClient (method string , url string , bodyWithJson []byte ) error {
162+ req , err := http .NewRequest (method , url , bytes .NewBuffer (bodyWithJson ))
78163 if err != nil {
79164 return err
80165 }
@@ -99,7 +184,66 @@ func DeleteConfig(options plugininstaller.RawOptions) error {
99184}
100185
101186func UpdateConfig (options plugininstaller.RawOptions ) error {
102- // TODO(daniel-hutao): implement later
187+ opts , err := NewOptions (options )
188+ if err != nil {
189+ return err
190+ }
191+ cfg , err := configmanager .NewManager ("config-devlake-config.yaml" ).LoadConfig ()
192+ if err != nil {
193+ return err
194+ }
195+ smgr , _ := statemanager .NewManager (* cfg .State )
196+ stateLakeConfig := smgr .GetState ("devlake-config_default" )
197+ // map connections to pluginName
198+ pluginConnections := make (map [string ][]Connection )
199+ states , err := NewOptions (stateLakeConfig .Options )
200+ if err != nil {
201+ return err
202+ }
203+
204+ for _ , v := range states .Plugins {
205+ pluginConnections [v .Name ] = v .Connections
206+ }
207+
208+ for _ , p := range opts .Plugins {
209+ // Map Connection.Name -> Connection for config
210+ configConnectionMap := make (map [string ]Connection )
211+ // Map Connection.Name -> Connection for state
212+ stateConnectionMap := make (map [string ]Connection )
213+ for _ , configConnection := range p .Connections {
214+ configConnectionMap [configConnection .Name ] = configConnection
215+ }
216+ // Construct a map
217+ for _ , stateConnection := range pluginConnections [p .Name ] {
218+ stateConnectionMap [stateConnection .Name ] = stateConnection
219+ }
220+ for k := range configConnectionMap {
221+ // Create connection which is not in State
222+ if _ , ok := stateConnectionMap [k ]; ! ok {
223+ if err = createConnection (opts .DevLakeAddr , p .Name , configConnectionMap [k ]); err != nil {
224+ return err
225+ }
226+ continue
227+ }
228+ // Update connection which is different from State
229+ if stateConnectionMap [k ] != configConnectionMap [k ] {
230+ if err = updateConnection (opts .DevLakeAddr , p .Name , configConnectionMap [k ]); err != nil {
231+ return err
232+ }
233+ continue
234+ }
235+ }
236+ for k := range stateConnectionMap {
237+ // Delete connection which is not in config
238+ if _ , ok := configConnectionMap [k ]; ! ok {
239+ if err = deleteConnection (opts .DevLakeAddr , p .Name , stateConnectionMap [k ]); err != nil {
240+ return err
241+ }
242+ continue
243+ }
244+ }
245+ }
246+
103247 return nil
104248}
105249
0 commit comments