Skip to content

Commit 13bc192

Browse files
authored
feat(database/gdb): add sharding feature for schema and table (#4014)
1 parent bae78fb commit 13bc192

File tree

9 files changed

+476
-7
lines changed

9 files changed

+476
-7
lines changed

contrib/drivers/mysql/mysql_z_unit_feature_model_sharding_test.go

Lines changed: 229 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,234 @@ import (
1818
"github.com/gogf/gf/v2/test/gtest"
1919
)
2020

21-
func Test_Model_Sharding_Table(t *testing.T) {
21+
const (
22+
TestDbNameSh0 = "test_0"
23+
TestDbNameSh1 = "test_1"
24+
TestTableName = "user"
25+
)
26+
27+
type ShardingUser struct {
28+
Id int
29+
Name string
30+
}
31+
32+
// createShardingDatabase creates test databases and tables for sharding
33+
func createShardingDatabase(t *gtest.T) {
34+
// Create databases
35+
dbs := []string{TestDbNameSh0, TestDbNameSh1}
36+
for _, dbName := range dbs {
37+
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbName)
38+
_, err := db.Exec(ctx, sql)
39+
t.AssertNil(err)
40+
41+
// Switch to the database
42+
sql = fmt.Sprintf("USE `%s`", dbName)
43+
_, err = db.Exec(ctx, sql)
44+
t.AssertNil(err)
45+
46+
// Create tables
47+
tables := []string{"user_0", "user_1", "user_2", "user_3"}
48+
for _, table := range tables {
49+
sql := fmt.Sprintf(`
50+
CREATE TABLE IF NOT EXISTS %s (
51+
id int(11) NOT NULL,
52+
name varchar(255) NOT NULL,
53+
PRIMARY KEY (id)
54+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
55+
`, table)
56+
_, err := db.Exec(ctx, sql)
57+
t.AssertNil(err)
58+
}
59+
}
60+
}
61+
62+
// dropShardingDatabase drops test databases
63+
func dropShardingDatabase(t *gtest.T) {
64+
dbs := []string{TestDbNameSh0, TestDbNameSh1}
65+
for _, dbName := range dbs {
66+
sql := fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", dbName)
67+
_, err := db.Exec(ctx, sql)
68+
t.AssertNil(err)
69+
}
70+
}
71+
72+
func Test_Sharding_Basic(t *testing.T) {
73+
return
74+
gtest.C(t, func(t *gtest.T) {
75+
var (
76+
tablePrefix = "user_"
77+
schemaPrefix = "test_"
78+
)
79+
80+
// Create test databases and tables
81+
createShardingDatabase(t)
82+
defer dropShardingDatabase(t)
83+
84+
// Create sharding configuration
85+
shardingConfig := gdb.ShardingConfig{
86+
Table: gdb.ShardingTableConfig{
87+
Enable: true,
88+
Prefix: tablePrefix,
89+
Rule: &gdb.DefaultShardingRule{
90+
TableCount: 4,
91+
},
92+
},
93+
Schema: gdb.ShardingSchemaConfig{
94+
Enable: true,
95+
Prefix: schemaPrefix,
96+
Rule: &gdb.DefaultShardingRule{
97+
SchemaCount: 2,
98+
},
99+
},
100+
}
101+
102+
// Prepare test data
103+
user := ShardingUser{
104+
Id: 1,
105+
Name: "John",
106+
}
107+
108+
model := db.Model(TestTableName).
109+
Sharding(shardingConfig).
110+
ShardingValue(user.Id).
111+
Safe()
112+
113+
// Test Insert
114+
_, err := model.Data(user).Insert()
115+
t.AssertNil(err)
116+
117+
// Test Select
118+
var result ShardingUser
119+
err = model.Where("id", user.Id).Scan(&result)
120+
t.AssertNil(err)
121+
t.Assert(result.Id, user.Id)
122+
t.Assert(result.Name, user.Name)
123+
124+
// Test Update
125+
_, err = model.Data(g.Map{"name": "John Doe"}).
126+
Where("id", user.Id).
127+
Update()
128+
t.AssertNil(err)
129+
130+
// Verify Update
131+
err = model.Where("id", user.Id).Scan(&result)
132+
t.AssertNil(err)
133+
t.Assert(result.Name, "John Doe")
134+
135+
// Test Delete
136+
_, err = model.Where("id", user.Id).Delete()
137+
t.AssertNil(err)
138+
139+
// Verify Delete
140+
count, err := model.Where("id", user.Id).Count()
141+
t.AssertNil(err)
142+
t.Assert(count, 0)
143+
})
144+
}
145+
146+
// Test_Sharding_Error tests error cases
147+
func Test_Sharding_Error(t *testing.T) {
148+
return
149+
gtest.C(t, func(t *gtest.T) {
150+
// Create test databases and tables
151+
createShardingDatabase(t)
152+
defer dropShardingDatabase(t)
153+
154+
// Test missing sharding value
155+
model := db.Model(TestTableName).
156+
Sharding(gdb.ShardingConfig{
157+
Table: gdb.ShardingTableConfig{
158+
Enable: true,
159+
Prefix: "user_",
160+
Rule: &gdb.DefaultShardingRule{TableCount: 4},
161+
},
162+
}).Safe()
163+
164+
_, err := model.Insert(g.Map{"id": 1, "name": "test"})
165+
t.AssertNE(err, nil)
166+
t.Assert(err.Error(), "sharding value is required when sharding feature enabled")
167+
168+
// Test missing sharding rule
169+
model = db.Model(TestTableName).
170+
Sharding(gdb.ShardingConfig{
171+
Table: gdb.ShardingTableConfig{
172+
Enable: true,
173+
Prefix: "user_",
174+
},
175+
}).
176+
ShardingValue(1)
177+
178+
_, err = model.Insert(g.Map{"id": 1, "name": "test"})
179+
t.AssertNE(err, nil)
180+
t.Assert(err.Error(), "sharding rule is required when sharding feature enabled")
181+
})
182+
}
183+
184+
// Test_Sharding_Complex tests complex sharding scenarios
185+
func Test_Sharding_Complex(t *testing.T) {
186+
return
187+
gtest.C(t, func(t *gtest.T) {
188+
// Create test databases and tables
189+
createShardingDatabase(t)
190+
defer dropShardingDatabase(t)
191+
192+
shardingConfig := gdb.ShardingConfig{
193+
Table: gdb.ShardingTableConfig{
194+
Enable: true,
195+
Prefix: "user_",
196+
Rule: &gdb.DefaultShardingRule{TableCount: 4},
197+
},
198+
Schema: gdb.ShardingSchemaConfig{
199+
Enable: true,
200+
Prefix: "test_",
201+
Rule: &gdb.DefaultShardingRule{SchemaCount: 2},
202+
},
203+
}
204+
205+
users := []ShardingUser{
206+
{Id: 1, Name: "User1"},
207+
{Id: 2, Name: "User2"},
208+
{Id: 3, Name: "User3"},
209+
}
210+
211+
for _, user := range users {
212+
model := db.Model(TestTableName).
213+
Sharding(shardingConfig).
214+
ShardingValue(user.Id).
215+
Safe()
216+
217+
_, err := model.Data(user).Insert()
218+
t.AssertNil(err)
219+
}
220+
221+
// Test batch query
222+
for _, user := range users {
223+
model := db.Model(TestTableName).
224+
Sharding(shardingConfig).
225+
ShardingValue(user.Id).
226+
Safe()
227+
228+
var result ShardingUser
229+
err := model.Where("id", user.Id).Scan(&result)
230+
t.AssertNil(err)
231+
t.Assert(result.Id, user.Id)
232+
t.Assert(result.Name, user.Name)
233+
}
234+
235+
// Clean up
236+
for _, user := range users {
237+
model := db.Model(TestTableName).
238+
Sharding(shardingConfig).
239+
ShardingValue(user.Id).
240+
Safe()
241+
242+
_, err := model.Where("id", user.Id).Delete()
243+
t.AssertNil(err)
244+
}
245+
})
246+
}
247+
248+
func Test_Model_Sharding_Table_Using_Hook(t *testing.T) {
22249
var (
23250
table1 = gtime.TimestampNanoStr() + "_table1"
24251
table2 = gtime.TimestampNanoStr() + "_table2"
@@ -127,7 +354,7 @@ func Test_Model_Sharding_Table(t *testing.T) {
127354
})
128355
}
129356

130-
func Test_Model_Sharding_Schema(t *testing.T) {
357+
func Test_Model_Sharding_Schema_Using_Hook(t *testing.T) {
131358
var (
132359
table = gtime.TimestampNanoStr() + "_table"
133360
)

database/gdb/gdb_model.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type Model struct {
5353
onConflict interface{} // onConflict is used for conflict keys on Upsert clause.
5454
tableAliasMap map[string]string // Table alias to true table name, usually used in join statements.
5555
softTimeOption SoftTimeOption // SoftTimeOption is the option to customize soft time feature for Model.
56+
shardingConfig ShardingConfig // ShardingConfig for database/table sharding feature.
57+
shardingValue any // Sharding value for sharding feature.
5658
}
5759

5860
// ModelHandler is a function that handles given Model and returns a new Model that is custom modified.

database/gdb/gdb_model_delete.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) {
6464
},
6565
Model: m,
6666
Table: m.tables,
67+
Schema: m.schema,
6768
Data: dataHolder,
6869
Condition: conditionStr,
6970
Args: append([]interface{}{dataValue}, conditionArgs...),
@@ -80,6 +81,7 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) {
8081
},
8182
Model: m,
8283
Table: m.tables,
84+
Schema: m.schema,
8385
Condition: conditionStr,
8486
Args: conditionArgs,
8587
}

database/gdb/gdb_model_hook.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,17 @@ func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) {
122122
if h.originalSchemaName.IsNil() {
123123
h.originalSchemaName = gvar.New(h.Schema)
124124
}
125+
126+
// Sharding feature.
127+
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
128+
if err != nil {
129+
return nil, err
130+
}
131+
h.Table, err = h.Model.getActualTable(ctx, h.Table)
132+
if err != nil {
133+
return nil, err
134+
}
135+
125136
// Custom hook handler call.
126137
if h.handler != nil && !h.handlerCalled {
127138
h.handlerCalled = true
@@ -161,11 +172,23 @@ func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err erro
161172
h.originalSchemaName = gvar.New(h.Schema)
162173
}
163174

175+
// Sharding feature.
176+
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
177+
if err != nil {
178+
return nil, err
179+
}
180+
h.Table, err = h.Model.getActualTable(ctx, h.Table)
181+
if err != nil {
182+
return nil, err
183+
}
184+
164185
if h.handler != nil && !h.handlerCalled {
165186
h.handlerCalled = true
166187
return h.handler(ctx, h)
167188
}
168189

190+
// No need to handle table change.
191+
169192
// Schema change.
170193
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
171194
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
@@ -185,6 +208,16 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro
185208
h.originalSchemaName = gvar.New(h.Schema)
186209
}
187210

211+
// Sharding feature.
212+
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
213+
if err != nil {
214+
return nil, err
215+
}
216+
h.Table, err = h.Model.getActualTable(ctx, h.Table)
217+
if err != nil {
218+
return nil, err
219+
}
220+
188221
if h.handler != nil && !h.handlerCalled {
189222
h.handlerCalled = true
190223
if gstr.HasPrefix(h.Condition, whereKeyInCondition) {
@@ -196,6 +229,9 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro
196229
if h.removedWhere {
197230
h.Condition = whereKeyInCondition + h.Condition
198231
}
232+
233+
// No need to handle table change.
234+
199235
// Schema change.
200236
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
201237
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
@@ -215,6 +251,16 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro
215251
h.originalSchemaName = gvar.New(h.Schema)
216252
}
217253

254+
// Sharding feature.
255+
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
256+
if err != nil {
257+
return nil, err
258+
}
259+
h.Table, err = h.Model.getActualTable(ctx, h.Table)
260+
if err != nil {
261+
return nil, err
262+
}
263+
218264
if h.handler != nil && !h.handlerCalled {
219265
h.handlerCalled = true
220266
if gstr.HasPrefix(h.Condition, whereKeyInCondition) {
@@ -226,6 +272,9 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro
226272
if h.removedWhere {
227273
h.Condition = whereKeyInCondition + h.Condition
228274
}
275+
276+
// No need to handle table change.
277+
229278
// Schema change.
230279
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
231280
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)

database/gdb/gdb_model_insert.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ func (m *Model) doInsertWithOption(ctx context.Context, insertOption InsertOptio
335335
},
336336
Model: m,
337337
Table: m.tables,
338+
Schema: m.schema,
338339
Data: list,
339340
Option: doInsertOption,
340341
}

database/gdb/gdb_model_select.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ func (m *Model) doGetAllBySql(
684684
},
685685
Model: m,
686686
Table: m.tables,
687+
Schema: m.schema,
687688
Sql: sql,
688689
Args: m.mergeArguments(args),
689690
SelectType: selectType,

0 commit comments

Comments
 (0)