Skip to content

Commit c2cdf8e

Browse files
committed
IGNITE-26717 [ducktests] add separate module for RU
1 parent 54edfd0 commit c2cdf8e

File tree

6 files changed

+148
-98
lines changed

6 files changed

+148
-98
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
Module provides test coverage for cluster rebalancing during rolling upgrade workflows.
18+
"""
19+
from ducktape.mark import defaults
20+
21+
from ignitetest.services.ignite import IgniteService
22+
from ignitetest.services.utils.control_utility import ControlUtility
23+
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
24+
from ignitetest.tests.rebalance.persistent_test import await_and_check_rebalance
25+
from ignitetest.tests.rebalance.util import NUM_NODES, BaseRebalanceTest, start_ignite, await_rebalance_start, \
26+
get_result
27+
from ignitetest.tests.util import preload_data
28+
from ignitetest.utils import cluster, ignite_versions
29+
from ignitetest.utils.version import LATEST, DEV_BRANCH, IgniteVersion
30+
31+
32+
class RollingUpgradeRebalanceTest(BaseRebalanceTest):
33+
"""
34+
Test validates the rebalance workflow and includes the following steps:
35+
* Start the cluster.
36+
* Populate data using IgniteClientApp.
37+
* Trigger a rebalance by joining a node running the target version, and wait for the rebalance to complete.
38+
"""
39+
@cluster(num_nodes=NUM_NODES)
40+
@ignite_versions(str(LATEST))
41+
@defaults(upgrade_version=[str(DEV_BRANCH)], force=[False], backups=[1], cache_count=[1], entry_count=[15_000],
42+
entry_size=[50_000], preloaders=[1], thread_pool_size=[None], batch_size=[None],
43+
batches_prefetch_count=[None], throttle=[None])
44+
def test_in_memory(self, ignite_version, upgrade_version, force, backups, cache_count, entry_count, entry_size,
45+
preloaders, thread_pool_size, batch_size, batches_prefetch_count, throttle):
46+
"""
47+
Tests rebalance in-memory.
48+
"""
49+
return self.test_rebalance(ignite_version, upgrade_version, force, backups, cache_count, entry_count,
50+
entry_size, preloaders, thread_pool_size, batch_size, batches_prefetch_count,
51+
throttle, False)
52+
53+
@cluster(num_nodes=NUM_NODES)
54+
@ignite_versions(str(LATEST))
55+
@defaults(upgrade_version=[str(DEV_BRANCH)], force=[False], backups=[1], cache_count=[1], entry_count=[5_000],
56+
entry_size=[50_000], preloaders=[1], thread_pool_size=[None], batch_size=[None],
57+
batches_prefetch_count=[None], throttle=[None])
58+
def test_pds(self, ignite_version, upgrade_version, force, backups, cache_count, entry_count, entry_size,
59+
preloaders, thread_pool_size, batch_size, batches_prefetch_count, throttle):
60+
"""
61+
Tests rebalance with persistence.
62+
"""
63+
return self.test_rebalance(ignite_version, upgrade_version, force, backups, cache_count, entry_count,
64+
entry_size, preloaders, thread_pool_size, batch_size, batches_prefetch_count,
65+
throttle, True)
66+
67+
def test_rebalance(self, ignite_version, upgrade_version, force, backups, cache_count, entry_count, entry_size,
68+
preloaders, thread_pool_size, batch_size, batches_prefetch_count, throttle, with_persistence):
69+
"""
70+
Tests rebalance on node join with version upgrade.
71+
"""
72+
reb_params = self.get_reb_params(thread_pool_size=thread_pool_size, batch_size=batch_size,
73+
batches_prefetch_count=batches_prefetch_count, throttle=throttle,
74+
persistent=with_persistence)
75+
76+
data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count,
77+
entry_size=entry_size, preloaders=preloaders)
78+
79+
ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params)
80+
81+
control_utility = ControlUtility(ignites)
82+
83+
if with_persistence:
84+
control_utility.activate()
85+
86+
preload_time = preload_data(
87+
self.test_context,
88+
ignites.config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites)),
89+
data_gen_params=data_gen_params)
90+
91+
new_node = IgniteService(self.test_context, ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
92+
num_nodes=1, modules=reb_params.modules)
93+
94+
control_utility.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring, force)
95+
96+
new_node.config._replace(version=upgrade_version)
97+
98+
new_node.start()
99+
100+
if with_persistence:
101+
control_utility.add_to_baseline(new_node.nodes)
102+
103+
await_and_check_rebalance(new_node)
104+
105+
control_utility.deactivate()
106+
else:
107+
await_rebalance_start(new_node)
108+
109+
new_node.await_rebalance()
110+
111+
return get_result(new_node.nodes, preload_time, cache_count, entry_count, entry_size)

modules/ducktests/tests/ignitetest/tests/rolling_upgrade/rolling_upgrade_test.py renamed to modules/ducktests/tests/ignitetest/rolling_upgrade/rolling_upgrade_test.py

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
"""
1717
Module contains rolling upgrade tests
1818
"""
19-
from time import monotonic
20-
2119
from ducktape.mark import defaults, matrix
2220

2321
from ignitetest.services.ignite import IgniteService
@@ -34,64 +32,51 @@ class RollingUpgradeTest(IgniteTest):
3432
Tests validates rolling upgrade
3533
"""
3634
@cluster(num_nodes=NUM_NODES)
37-
@ignite_versions(str(DEV_BRANCH))
38-
@defaults(init_version=[str(LATEST)], upgrade_version=[str(DEV_BRANCH)])
39-
@matrix(upgrade_coordinator=[True, False])
40-
def test_rolling_upgrade(self, ignite_version, init_version, upgrade_version, upgrade_coordinator=False,
41-
force_upgrade=False):
42-
self.logger.info(f"Initiating Rolling Upgrade test from {init_version} to {upgrade_version} "
43-
f"starting from coordinator [{upgrade_coordinator}]")
44-
45-
results = {}
35+
@ignite_versions(str(LATEST))
36+
@defaults(upgrade_version=[str(DEV_BRANCH)], force=[False])
37+
@matrix(upgrade_coordinator_first=[True, False])
38+
def test_rolling_upgrade(self, ignite_version, upgrade_version, upgrade_coordinator_first, force):
39+
self.logger.info(f"Initiating Rolling Upgrade test from {ignite_version} to {upgrade_version} "
40+
f"starting from coordinator [{upgrade_coordinator_first}]")
4641

47-
ignites = self.start_ignite_cluster(init_version, results)
42+
ignites = self.start_ignite_cluster(ignite_version)
4843

4944
control_sh = ControlUtility(ignites)
5045

51-
if not force_upgrade:
52-
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring)
46+
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring, force)
5347

54-
self.upgrade_ignite_cluster(ignites, upgrade_version, upgrade_coordinator, results)
48+
self.upgrade_ignite_cluster(ignites, upgrade_version, upgrade_coordinator_first)
5549

56-
if not force_upgrade:
57-
control_sh.disable_rolling_upgrade()
50+
control_sh.disable_rolling_upgrade()
5851

5952
ignites.stop()
6053

61-
return results
62-
63-
def start_ignite_cluster(self, ignite_version: str, results):
54+
def start_ignite_cluster(self, ignite_version: str):
6455
self.logger.info("Cluster start-up.")
6556

6657
ignite_cfg = IgniteConfiguration(
6758
version=IgniteVersion(ignite_version),
6859
metric_exporters={"org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi"})
6960

70-
start = monotonic()
71-
7261
ignites = IgniteService(self.test_context, ignite_cfg, num_nodes=self.test_context.expected_num_nodes)
7362

7463
ignites.start()
7564

76-
results['Ignite cluster start time (s)'] = round(monotonic() - start, 1)
77-
7865
ignites.await_event(f"Topology snapshot \\[ver={self.test_context.expected_num_nodes}",
7966
ignites.startup_timeout_sec, from_the_beginning=True)
8067

81-
self.logger.info(f"Initial cluster is up [nodes={self.test_context.expected_num_nodes}].")
68+
self.logger.info(f"Initial cluster is up [nodes={len(ignites.nodes)}].")
8269

8370
return ignites
8471

85-
def upgrade_ignite_cluster(self, ignites: IgniteService, upgrade_version: str, upgrade_coordinator: bool, results):
72+
def upgrade_ignite_cluster(self, ignites: IgniteService, upgrade_version: str, upgrade_coordinator_first: bool):
8673
self.logger.info(f"Starting rolling upgrade.")
8774

8875
ignites.config = IgniteConfiguration(version=IgniteVersion(upgrade_version))
8976

9077
ignite_upgraded = 0
9178

92-
start = monotonic()
93-
94-
for ignite in ignites.nodes if upgrade_coordinator else reversed(ignites.nodes):
79+
for ignite in ignites.nodes if upgrade_coordinator_first else reversed(ignites.nodes):
9580
self.logger.debug(f"Updating {ignites.who_am_i(ignite)}")
9681

9782
ignites.stop_node(ignite)
@@ -105,6 +90,4 @@ def upgrade_ignite_cluster(self, ignites: IgniteService, upgrade_version: str, u
10590
ignites.await_event(f"Topology snapshot \\[ver={exp_topology}", ignites.startup_timeout_sec,
10691
from_the_beginning=True)
10792

108-
results['Ignite cluster upgrade time (s)'] = round(monotonic() - start, 1)
109-
11093
self.logger.info(f"Upgrade is complete.")

modules/ducktests/tests/ignitetest/services/utils/control_utility.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,18 @@ def snapshot_check(self, snapshot_name: str):
231231

232232
return res
233233

234-
def enable_rolling_upgrade(self, target_version: str):
234+
def enable_rolling_upgrade(self, target_version: str, force: bool = False):
235235
"""
236236
Enable Rolling Upgrade with the target Ignite version.
237237
:param target_version: Target Ignite version.
238+
:param force: If {@code true}, skips target version compatibility checks and forcibly enables rolling upgrade.
239+
This flag does not override an already active upgrade configuration.
238240
"""
239-
result = self.__run(f"--rolling-upgrade enable {target_version} --enable-experimental --yes")
241+
242+
if force:
243+
result = self.__run(f"--rolling-upgrade enable {target_version} --force --enable-experimental --yes")
244+
else:
245+
result = self.__run(f"--rolling-upgrade enable {target_version} --enable-experimental --yes")
240246

241247
assert "Rolling upgrade enabled" in result, f"Unexpected response: {result}"
242248

modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919
from ducktape.mark import defaults
2020

2121
from ignitetest.services.ignite import IgniteService
22-
from ignitetest.services.utils.control_utility import ControlUtility
2322
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
2423
from ignitetest.tests.rebalance.util import start_ignite, get_result, TriggerEvent, NUM_NODES, \
2524
await_rebalance_start, BaseRebalanceTest
2625
from ignitetest.tests.util import preload_data
2726
from ignitetest.utils import cluster, ignite_versions
28-
from ignitetest.utils.version import DEV_BRANCH, LATEST, IgniteVersion
27+
from ignitetest.utils.version import DEV_BRANCH, LATEST
2928

3029

3130
class RebalanceInMemoryTest(BaseRebalanceTest):
@@ -36,44 +35,33 @@ class RebalanceInMemoryTest(BaseRebalanceTest):
3635
@ignite_versions(str(DEV_BRANCH), str(LATEST))
3736
@defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
3837
thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None])
39-
def test_node_join(self, ignite_version, backups, cache_count, entry_count, entry_size, preloaders,
38+
def test_node_join(self, ignite_version,
39+
backups, cache_count, entry_count, entry_size, preloaders,
4040
thread_pool_size, batch_size, batches_prefetch_count, throttle):
4141
"""
4242
Tests rebalance on node join.
4343
"""
44-
return self.__run(ignite_version, TriggerEvent.NODE_JOIN, backups, cache_count, entry_count, entry_size,
45-
preloaders, thread_pool_size, batch_size, batches_prefetch_count, throttle)
44+
return self.__run(ignite_version, TriggerEvent.NODE_JOIN,
45+
backups, cache_count, entry_count, entry_size, preloaders,
46+
thread_pool_size, batch_size, batches_prefetch_count, throttle)
4647

4748
@cluster(num_nodes=NUM_NODES)
4849
@ignite_versions(str(DEV_BRANCH), str(LATEST))
4950
@defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
5051
thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None])
51-
def test_node_left(self, ignite_version, backups, cache_count, entry_count, entry_size, preloaders,
52+
def test_node_left(self, ignite_version,
53+
backups, cache_count, entry_count, entry_size, preloaders,
5254
thread_pool_size, batch_size, batches_prefetch_count, throttle):
5355
"""
5456
Tests rebalance on node left.
5557
"""
56-
return self.__run(ignite_version, TriggerEvent.NODE_LEFT, backups, cache_count, entry_count, entry_size,
57-
preloaders, thread_pool_size, batch_size, batches_prefetch_count, throttle)
58+
return self.__run(ignite_version, TriggerEvent.NODE_LEFT,
59+
backups, cache_count, entry_count, entry_size, preloaders,
60+
thread_pool_size, batch_size, batches_prefetch_count, throttle)
5861

59-
@cluster(num_nodes=NUM_NODES)
60-
@ignite_versions(str(DEV_BRANCH))
61-
@defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
62-
thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None],
63-
init_version=str(LATEST), upgrade_version=str(DEV_BRANCH))
64-
def test_node_join_with_upgrade(self, ignite_version, backups, cache_count, entry_count, entry_size, preloaders,
65-
thread_pool_size, batch_size, batches_prefetch_count, throttle, init_version,
66-
upgrade_version, force_upgrade=False):
67-
"""
68-
Tests rebalance on node left.
69-
"""
70-
return self.__run(init_version, TriggerEvent.NODE_JOIN, backups, cache_count, entry_count, entry_size,
71-
preloaders, thread_pool_size, batch_size, batches_prefetch_count, throttle, upgrade_version,
72-
force_upgrade)
73-
74-
def __run(self, ignite_version, trigger_event, backups, cache_count, entry_count, entry_size, preloaders,
75-
thread_pool_size, batch_size, batches_prefetch_count, throttle, upgrade_version=None,
76-
force_upgrade=False):
62+
def __run(self, ignite_version, trigger_event,
63+
backups, cache_count, entry_count, entry_size, preloaders,
64+
thread_pool_size, batch_size, batches_prefetch_count, throttle):
7765
"""
7866
Test performs rebalance test which consists of following steps:
7967
* Start cluster.
@@ -113,15 +101,6 @@ def __run(self, ignite_version, trigger_event, backups, cache_count, entry_count
113101
ignite = IgniteService(self.test_context,
114102
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), num_nodes=1,
115103
modules=reb_params.modules)
116-
117-
if upgrade_version is not None:
118-
if not force_upgrade:
119-
control_sh = ControlUtility(ignites)
120-
121-
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring)
122-
123-
ignite.config._replace(version=upgrade_version)
124-
125104
ignite.start()
126105
rebalance_nodes = ignite.nodes
127106

modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
get_result, check_type_of_rebalancing, await_rebalance_start, BaseRebalanceTest
2828
from ignitetest.tests.util import preload_data
2929
from ignitetest.utils import cluster, ignite_versions
30-
from ignitetest.utils.version import DEV_BRANCH, LATEST, IgniteVersion
30+
from ignitetest.utils.version import DEV_BRANCH, LATEST
3131

3232

3333
class RebalancePersistentTest(BaseRebalanceTest):
@@ -43,28 +43,6 @@ def test_node_join(self, ignite_version, backups, cache_count, entry_count, entr
4343
"""
4444
Tests rebalance on node join.
4545
"""
46-
return self.test_join(ignite_version, backups, cache_count, entry_count, entry_size, preloaders,
47-
thread_pool_size, batch_size, batches_prefetch_count, throttle)
48-
49-
@cluster(num_nodes=NUM_NODES)
50-
@ignite_versions(str(DEV_BRANCH))
51-
@defaults(backups=[1], cache_count=[1], entry_count=[5_000], entry_size=[50_000], preloaders=[1],
52-
thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None],
53-
init_version=str(LATEST), upgrade_version=str(DEV_BRANCH))
54-
def test_node_join_with_upgrade(self, ignite_version, backups, cache_count, entry_count, entry_size, preloaders,
55-
thread_pool_size, batch_size, batches_prefetch_count, throttle, init_version,
56-
upgrade_version, force_upgrade=False):
57-
"""
58-
Tests rebalance on node join with version upgrade.
59-
"""
60-
return self.test_join(init_version, backups, cache_count, entry_count, entry_size, preloaders, thread_pool_size,
61-
batch_size, batches_prefetch_count, throttle, upgrade_version, force_upgrade)
62-
63-
def test_join(self, ignite_version, backups, cache_count, entry_count, entry_size, preloaders, thread_pool_size,
64-
batch_size, batches_prefetch_count, throttle, upgrade_version=None, force_upgrade=False):
65-
"""
66-
Tests rebalance on node join.
67-
"""
6846

6947
reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size,
7048
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
@@ -86,13 +64,6 @@ def test_join(self, ignite_version, backups, cache_count, entry_count, entry_siz
8664

8765
new_node = IgniteService(self.test_context, ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
8866
num_nodes=1, modules=reb_params.modules)
89-
90-
if upgrade_version is not None:
91-
if not force_upgrade:
92-
control_utility.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring)
93-
94-
new_node.config._replace(version=upgrade_version)
95-
9667
new_node.start()
9768

9869
control_utility.add_to_baseline(new_node.nodes)

0 commit comments

Comments
 (0)