Skip to content

Commit 83dad7c

Browse files
authored
Merge pull request #518 from Northeastern-Electric-Racing/498-endpoint-for-batch-rule-unsubscription
Endpoint for batch rule unsubscription
2 parents bef960d + f611c5b commit 83dad7c

4 files changed

Lines changed: 225 additions & 11 deletions

File tree

scylla-server/src/controllers/rule_controller.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
};
1818

1919
#[derive(Deserialize)]
20-
pub struct SubscribeRulesRequest {
20+
pub struct RuleSubscriptionRequest {
2121
rule_ids: Vec<String>,
2222
client_id: String,
2323
}
@@ -106,10 +106,34 @@ pub async fn edit_rule(
106106
.map_err(|e| ScyllaError::RuleError(e))
107107
}
108108

109+
#[debug_handler]
110+
pub async fn unsubscribe_rules(
111+
Extension(rules_manager): Extension<Arc<RuleManager>>,
112+
Json(request): Json<RuleSubscriptionRequest>,
113+
) -> Result<Json<String>, ScyllaError> {
114+
debug!(
115+
"Unsubscribing client {} from {} rules",
116+
request.client_id,
117+
request.rule_ids.len()
118+
);
119+
120+
let rule_ids: Vec<RuleId> = request.rule_ids.into_iter().map(RuleId).collect();
121+
122+
match rules_manager
123+
.unsubscribe_rules(ClientId(request.client_id), rule_ids)
124+
.await
125+
{
126+
Ok(_) => Ok(Json::from(
127+
"Successfully unsubscribed from rules".to_owned(),
128+
)),
129+
Err(err) => Err(ScyllaError::RuleError(err)),
130+
}
131+
}
132+
109133
#[debug_handler]
110134
pub async fn subscribe_rules(
111135
Extension(rules_manager): Extension<Arc<RuleManager>>,
112-
Json(request): Json<SubscribeRulesRequest>,
136+
Json(request): Json<RuleSubscriptionRequest>,
113137
) -> Result<Json<String>, ScyllaError> {
114138
debug!(
115139
"Subscribing client {} to {} rules",

scylla-server/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use scylla_server::{
2727
data_type_controller, file_insertion_controller,
2828
rule_controller::{
2929
add_rule, delete_rule, edit_rule, get_all_rules, get_all_rules_with_client_info,
30-
subscribe_rules,
30+
subscribe_rules, unsubscribe_rules,
3131
},
3232
run_controller, scylla_config_controller,
3333
video_streamer_controller::{self},
@@ -411,6 +411,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
411411
.route("/rules/delete/{rule_id}", post(delete_rule))
412412
.route("/rules", get(get_all_rules))
413413
.route("/rules/{client_id}", get(get_all_rules_with_client_info))
414+
.route("/rules/unsubscribe", post(unsubscribe_rules))
414415
.route("/rules/edit/{rule_id}", put(edit_rule))
415416
.route("/rules/subscribe", post(subscribe_rules))
416417
//.route("/rules/delete/{rule_id}", post()).route("/rules/poll")

scylla-server/src/rule_structs.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -548,21 +548,41 @@ impl RuleManager {
548548
self.subscriptions.read().await.lefts()
549549
}
550550

551+
/// Helper function to verify all rules exist
552+
async fn verify_rules_exist(&self, rule_ids: &[RuleId]) -> Result<(), RuleManagerError> {
553+
let rules_guard = self.rules.read().await;
554+
for rule_id in rule_ids {
555+
if !rules_guard.contains_key(rule_id) {
556+
return Err(RuleManagerError::NoMatchingRule);
557+
}
558+
}
559+
Ok(())
560+
}
561+
562+
/// Unsubscribe a client from multiple rules
563+
pub async fn unsubscribe_rules(
564+
&self,
565+
client_id: ClientId,
566+
rule_ids: Vec<RuleId>,
567+
) -> Result<(), RuleManagerError> {
568+
let mut subscriptions = self.subscriptions.write().await;
569+
570+
// Remove subscriptions (rules remain even if no subscribers)
571+
for rule_id in rule_ids {
572+
subscriptions.remove_right_from_left(&client_id, &rule_id);
573+
}
574+
575+
Ok(())
576+
}
577+
551578
/// Subscribe a client to multiple existing rules
552579
pub async fn subscribe_rules(
553580
&self,
554581
client_id: ClientId,
555582
rule_ids: Vec<RuleId>,
556583
) -> Result<(), RuleManagerError> {
557-
let rules_guard = self.rules.read().await;
558-
559584
// First, verify all rules exist
560-
for rule_id in &rule_ids {
561-
if !rules_guard.contains_key(rule_id) {
562-
return Err(RuleManagerError::NoMatchingRule);
563-
}
564-
}
565-
drop(rules_guard);
585+
self.verify_rules_exist(&rule_ids).await?;
566586

567587
// Now subscribe to all rules
568588
let mut subscriptions = self.subscriptions.write().await;

scylla-server/tests/rule_structs_test.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,7 @@ async fn test_edit_rule_preserves_subscriptions_concurrent() -> Result<(), RuleM
626626
Ok(())
627627
}
628628

629+
#[tokio::test]
629630
async fn test_subscribe_rules_success() -> Result<(), RuleManagerError> {
630631
let rule_manager = RuleManager::new();
631632
let client1 = ClientId("client1".to_string());
@@ -684,6 +685,174 @@ async fn test_subscribe_rules_success() -> Result<(), RuleManagerError> {
684685
Ok(())
685686
}
686687

688+
#[tokio::test]
689+
async fn test_unsubscribe_rules_success() -> Result<(), RuleManagerError> {
690+
let rule_manager = RuleManager::new();
691+
let client1 = ClientId("client1".to_string());
692+
let client2 = ClientId("client2".to_string());
693+
694+
// Create rules via client1
695+
let rule1 = Rule::new(
696+
RuleId("rule_1".to_string()),
697+
Topic("topic/1".to_string()),
698+
core::time::Duration::from_secs(60),
699+
"a > 10".to_owned(),
700+
);
701+
702+
let rule2 = Rule::new(
703+
RuleId("rule_2".to_string()),
704+
Topic("topic/2".to_string()),
705+
core::time::Duration::from_secs(30),
706+
"b < 5".to_owned(),
707+
);
708+
709+
rule_manager.add_rule(client1.clone(), rule1).await?;
710+
rule_manager.add_rule(client1.clone(), rule2).await?;
711+
rule_manager
712+
.add_rule(
713+
client2.clone(),
714+
Rule::new(
715+
RuleId("rule_1".to_string()),
716+
Topic("topic/1".to_string()),
717+
core::time::Duration::from_secs(60),
718+
"a > 10".to_owned(),
719+
),
720+
)
721+
.await?;
722+
723+
// Verify initial state: 2 rules, 2 clients
724+
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
725+
assert_eq!(rule_manager.get_all_clients().await.len(), 2);
726+
727+
// Client1 unsubscribes from rule_1
728+
rule_manager
729+
.unsubscribe_rules(client1.clone(), vec![RuleId("rule_1".to_string())])
730+
.await?;
731+
732+
// Rule 1 should still exist (client2 subscribed), 2 clients remain
733+
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
734+
assert_eq!(rule_manager.get_all_clients().await.len(), 2);
735+
736+
Ok(())
737+
}
738+
739+
#[tokio::test]
740+
async fn test_unsubscribe_rules_keeps_orphaned() -> Result<(), RuleManagerError> {
741+
let rule_manager = RuleManager::new();
742+
let client = ClientId("test_client".to_string());
743+
744+
// Create rules
745+
let rule1 = Rule::new(
746+
RuleId("rule_1".to_string()),
747+
Topic("topic/1".to_string()),
748+
core::time::Duration::from_secs(60),
749+
"a > 10".to_owned(),
750+
);
751+
752+
let rule2 = Rule::new(
753+
RuleId("rule_2".to_string()),
754+
Topic("topic/2".to_string()),
755+
core::time::Duration::from_secs(30),
756+
"b < 5".to_owned(),
757+
);
758+
759+
rule_manager.add_rule(client.clone(), rule1).await?;
760+
rule_manager.add_rule(client.clone(), rule2).await?;
761+
762+
// Verify initial state
763+
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
764+
assert_eq!(rule_manager.get_all_clients().await.len(), 1);
765+
766+
// Unsubscribe from both rules
767+
rule_manager
768+
.unsubscribe_rules(
769+
client.clone(),
770+
vec![RuleId("rule_1".to_string()), RuleId("rule_2".to_string())],
771+
)
772+
.await?;
773+
774+
// Rules should still exist (not deleted), client removed
775+
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
776+
assert_eq!(rule_manager.get_all_clients().await.len(), 0);
777+
778+
Ok(())
779+
}
780+
781+
#[tokio::test]
782+
async fn test_unsubscribe_rules_nonexistent() -> Result<(), RuleManagerError> {
783+
let rule_manager = RuleManager::new();
784+
let client = ClientId("test_client".to_string());
785+
786+
// Try to unsubscribe from rules that don't exist - should succeed (no-op)
787+
rule_manager
788+
.unsubscribe_rules(
789+
client,
790+
vec![
791+
RuleId("nonexistent_1".to_string()),
792+
RuleId("nonexistent_2".to_string()),
793+
],
794+
)
795+
.await?;
796+
797+
Ok(())
798+
}
799+
800+
#[tokio::test]
801+
async fn test_unsubscribe_rules_empty_list() -> Result<(), RuleManagerError> {
802+
let rule_manager = RuleManager::new();
803+
let client = ClientId("test_client".to_string());
804+
805+
// Unsubscribe from empty list - should succeed
806+
rule_manager.unsubscribe_rules(client, vec![]).await?;
807+
808+
Ok(())
809+
}
810+
811+
#[tokio::test]
812+
async fn test_orphaned_rule_resubscription() -> Result<(), RuleManagerError> {
813+
let rule_manager = RuleManager::new();
814+
let client1 = ClientId("client1".to_string());
815+
let client2 = ClientId("client2".to_string());
816+
817+
// Client1 creates a rule
818+
let rule = Rule::new(
819+
RuleId("rule_1".to_string()),
820+
Topic("topic/1".to_string()),
821+
core::time::Duration::from_secs(60),
822+
"a > 10".to_owned(),
823+
);
824+
825+
rule_manager.add_rule(client1.clone(), rule).await?;
826+
827+
// Verify initial state
828+
assert_eq!(rule_manager.get_all_rules().await.len(), 1);
829+
assert_eq!(rule_manager.get_all_clients().await.len(), 1);
830+
831+
// Client1 unsubscribes - rule becomes orphaned but still exists
832+
rule_manager
833+
.unsubscribe_rules(client1.clone(), vec![RuleId("rule_1".to_string())])
834+
.await?;
835+
836+
assert_eq!(rule_manager.get_all_rules().await.len(), 1); // Rule still exists
837+
assert_eq!(rule_manager.get_all_clients().await.len(), 0); // No clients
838+
839+
// Client2 subscribes to the orphaned rule (re-adding it)
840+
let rule_reuse = Rule::new(
841+
RuleId("rule_1".to_string()),
842+
Topic("topic/1".to_string()),
843+
core::time::Duration::from_secs(60),
844+
"a > 10".to_owned(),
845+
);
846+
847+
rule_manager.add_rule(client2.clone(), rule_reuse).await?;
848+
849+
// Verify rule is now subscribed to by client2
850+
assert_eq!(rule_manager.get_all_rules().await.len(), 1);
851+
assert_eq!(rule_manager.get_all_clients().await.len(), 1);
852+
853+
Ok(())
854+
}
855+
687856
#[tokio::test]
688857
async fn test_subscribe_rules_nonexistent_rule() -> Result<(), RuleManagerError> {
689858
let rule_manager = RuleManager::new();

0 commit comments

Comments
 (0)