Skip to content

Commit 310a828

Browse files
authored
fix: resubscribe to previously subscribed configurations after rpcClient reconnects to the server (#802)
fix: resubscribe to previously subscribed configurations after rpcClient reconnects to the server
1 parent f1545e0 commit 310a828

File tree

3 files changed

+260
-0
lines changed

3 files changed

+260
-0
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 1999-2020 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package config_client
18+
19+
import (
20+
"strconv"
21+
22+
"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
23+
)
24+
25+
type ConfigConnectionEventListener struct {
26+
client *ConfigClient
27+
taskId string
28+
}
29+
30+
func NewConfigConnectionEventListener(client *ConfigClient, taskId string) *ConfigConnectionEventListener {
31+
return &ConfigConnectionEventListener{
32+
client: client,
33+
taskId: taskId,
34+
}
35+
}
36+
37+
func (c *ConfigConnectionEventListener) OnConnected() {
38+
logger.Info("[ConfigConnectionEventListener] connect to config server for taskId: " + c.taskId)
39+
if c.client != nil {
40+
c.client.asyncNotifyListenConfig()
41+
}
42+
}
43+
44+
func (c *ConfigConnectionEventListener) OnDisConnect() {
45+
logger.Info("[ConfigConnectionEventListener] disconnect from config server for taskId: " + c.taskId)
46+
47+
if c.client != nil {
48+
taskIdInt, err := strconv.Atoi(c.taskId)
49+
if err != nil {
50+
logger.Errorf("[ConfigConnectionEventListener] parse taskId error: %v", err)
51+
return
52+
}
53+
54+
items := c.client.cacheMap.Items()
55+
for key, v := range items {
56+
if data, ok := v.(cacheData); ok {
57+
if data.taskId == taskIdInt {
58+
data.isSyncWithServer = false
59+
c.client.cacheMap.Set(key, data)
60+
}
61+
}
62+
}
63+
}
64+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 1999-2020 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package config_client
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
"github.com/nacos-group/nacos-sdk-go/v2/clients/cache"
25+
"github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc/rpc_request"
26+
"github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc/rpc_response"
27+
"github.com/nacos-group/nacos-sdk-go/v2/model"
28+
"github.com/nacos-group/nacos-sdk-go/v2/util"
29+
"github.com/stretchr/testify/assert"
30+
)
31+
32+
func TestNewConfigConnectionEventListener(t *testing.T) {
33+
client := &ConfigClient{}
34+
taskId := "123"
35+
36+
listener := NewConfigConnectionEventListener(client, taskId)
37+
38+
assert.Equal(t, client, listener.client)
39+
assert.Equal(t, taskId, listener.taskId)
40+
}
41+
42+
func TestOnDisConnectWithMock(t *testing.T) {
43+
client := &ConfigClient{
44+
cacheMap: cache.NewConcurrentMap(),
45+
}
46+
47+
data1 := cacheData{
48+
dataId: "dataId1",
49+
group: "group1",
50+
tenant: "",
51+
taskId: 1,
52+
isSyncWithServer: true,
53+
}
54+
55+
data2 := cacheData{
56+
dataId: "dataId2",
57+
group: "group1",
58+
tenant: "",
59+
taskId: 1,
60+
isSyncWithServer: true,
61+
}
62+
63+
data3 := cacheData{
64+
dataId: "dataId3",
65+
group: "group2",
66+
tenant: "",
67+
taskId: 2,
68+
isSyncWithServer: true,
69+
}
70+
71+
key1 := util.GetConfigCacheKey(data1.dataId, data1.group, data1.tenant)
72+
key2 := util.GetConfigCacheKey(data2.dataId, data2.group, data2.tenant)
73+
key3 := util.GetConfigCacheKey(data3.dataId, data3.group, data3.tenant)
74+
75+
client.cacheMap.Set(key1, data1)
76+
client.cacheMap.Set(key2, data2)
77+
client.cacheMap.Set(key3, data3)
78+
79+
listener := NewConfigConnectionEventListener(client, "1")
80+
81+
listener.OnDisConnect()
82+
83+
item1, _ := client.cacheMap.Get(key1)
84+
item2, _ := client.cacheMap.Get(key2)
85+
item3, _ := client.cacheMap.Get(key3)
86+
87+
updatedData1 := item1.(cacheData)
88+
updatedData2 := item2.(cacheData)
89+
updatedData3 := item3.(cacheData)
90+
91+
assert.False(t, updatedData1.isSyncWithServer, "dataId1 should be marked as not sync")
92+
assert.False(t, updatedData2.isSyncWithServer, "dataId2 should be marked as not sync")
93+
assert.True(t, updatedData3.isSyncWithServer, "dataId3 should be marked as sync")
94+
}
95+
96+
func TestOnConnectedWithMock(t *testing.T) {
97+
listenChan := make(chan struct{}, 1)
98+
99+
client := &ConfigClient{
100+
listenExecute: listenChan,
101+
}
102+
103+
listener := NewConfigConnectionEventListener(client, "1")
104+
105+
listener.OnConnected()
106+
107+
time.Sleep(100 * time.Millisecond)
108+
109+
select {
110+
case <-listenChan:
111+
assert.True(t, true, "asyncNotifyListenConfig should be called")
112+
default:
113+
t.Fatalf("asyncNotifyListenConfig should be called but not")
114+
}
115+
}
116+
117+
type MockRpcClientForListener struct {
118+
requestCalled rpc_request.IRequest
119+
}
120+
121+
func (m *MockRpcClientForListener) Request(request rpc_request.IRequest) (rpc_response.IResponse, error) {
122+
m.requestCalled = request
123+
return &rpc_response.ConfigChangeBatchListenResponse{
124+
Response: &rpc_response.Response{
125+
ResultCode: 200,
126+
},
127+
ChangedConfigs: []model.ConfigContext{},
128+
}, nil
129+
}
130+
131+
func TestReconnectionFlow(t *testing.T) {
132+
ctx, cancel := context.WithCancel(context.Background())
133+
defer cancel()
134+
135+
mockRpc := &MockRpcClientForListener{}
136+
137+
listenChan := make(chan struct{}, 1)
138+
139+
client := &ConfigClient{
140+
ctx: ctx,
141+
configProxy: &MockConfigProxy{},
142+
cacheMap: cache.NewConcurrentMap(),
143+
listenExecute: listenChan,
144+
}
145+
146+
done := make(chan bool)
147+
go func() {
148+
for {
149+
select {
150+
case <-listenChan:
151+
mockRpc.Request(&rpc_request.ConfigBatchListenRequest{})
152+
done <- true
153+
case <-ctx.Done():
154+
return
155+
}
156+
}
157+
}()
158+
159+
data1 := cacheData{
160+
dataId: "dataId1",
161+
group: "group1",
162+
tenant: "",
163+
taskId: 1,
164+
isSyncWithServer: true,
165+
}
166+
167+
key1 := util.GetConfigCacheKey(data1.dataId, data1.group, data1.tenant)
168+
client.cacheMap.Set(key1, data1)
169+
170+
listener := NewConfigConnectionEventListener(client, "1")
171+
172+
initialData, _ := client.cacheMap.Get(key1)
173+
assert.True(t, initialData.(cacheData).isSyncWithServer, "initial data should be sync with server")
174+
175+
listener.OnDisConnect()
176+
177+
afterDisconnectData, _ := client.cacheMap.Get(key1)
178+
assert.False(t, afterDisconnectData.(cacheData).isSyncWithServer, "disconnect should set isSyncWithServer to false")
179+
180+
listener.OnConnected()
181+
182+
select {
183+
case <-done:
184+
case <-time.After(1 * time.Second):
185+
t.Fatalf("wait for done timeout")
186+
}
187+
188+
assert.NotNil(t, mockRpc.requestCalled, "should call request")
189+
190+
_, ok := mockRpc.requestCalled.(*rpc_request.ConfigBatchListenRequest)
191+
assert.True(t, ok, "should be a ConfigBatchListenRequest")
192+
}

clients/config_client/config_proxy.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien
173173
// TODO fix the group/dataId empty problem
174174
return rpc_request.NewConfigChangeNotifyRequest("", "", "")
175175
}, &ConfigChangeNotifyRequestHandler{client: client})
176+
177+
configListener := NewConfigConnectionEventListener(client, taskId)
178+
rpcClient.RegisterConnectionListener(configListener)
179+
176180
rpcClient.Tenant = cp.clientConfig.NamespaceId
177181
rpcClient.Start()
178182
}

0 commit comments

Comments
 (0)