|
14 | 14 | from memos.mem_scheduler.general_modules.base import BaseSchedulerModule |
15 | 15 | from memos.mem_scheduler.general_modules.misc import AutoDroppingQueue |
16 | 16 | from memos.mem_scheduler.schemas.general_schemas import DIRECT_EXCHANGE_TYPE, FANOUT_EXCHANGE_TYPE |
17 | | -from memos.mem_scheduler.utils.misc_utils import is_cloud_env |
18 | 17 |
|
19 | 18 |
|
20 | 19 | logger = get_logger(__name__) |
@@ -132,6 +131,15 @@ def initialize_rabbitmq( |
132 | 131 | self.rabbitmq_exchange_type = self.rabbitmq_config.exchange_type |
133 | 132 | logger.info(f"Using configured exchange type: {self.rabbitmq_exchange_type}") |
134 | 133 |
|
| 134 | + env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") |
| 135 | + env_exchange_type = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_TYPE") |
| 136 | + if env_exchange_name: |
| 137 | + self.rabbitmq_exchange_name = env_exchange_name |
| 138 | + logger.info(f"Using env exchange name override: {self.rabbitmq_exchange_name}") |
| 139 | + if env_exchange_type: |
| 140 | + self.rabbitmq_exchange_type = env_exchange_type |
| 141 | + logger.info(f"Using env exchange type override: {self.rabbitmq_exchange_type}") |
| 142 | + |
135 | 143 | # Start connection process |
136 | 144 | parameters = self.get_rabbitmq_connection_param() |
137 | 145 | self.rabbitmq_connection = SelectConnection( |
@@ -313,15 +321,16 @@ def rabbitmq_publish_message(self, message: dict): |
313 | 321 | if label == "knowledgeBaseUpdate": |
314 | 322 | routing_key = "" |
315 | 323 |
|
316 | | - # Cloud environment override: applies to specific message types if MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set |
| 324 | + # Env override: apply to all message types when MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set |
317 | 325 | env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") |
318 | | - if is_cloud_env() and env_exchange_name and label in ["taskStatus", "knowledgeBaseUpdate"]: |
| 326 | + env_routing_key = os.getenv("MEMSCHEDULER_RABBITMQ_ROUTING_KEY") |
| 327 | + if env_exchange_name: |
319 | 328 | exchange_name = env_exchange_name |
320 | | - routing_key = "" # Routing key is always empty in cloud environment for these types |
321 | | - |
322 | | - # Specific diagnostic logging for messages affected by cloud environment settings |
| 329 | + routing_key = ( |
| 330 | + env_routing_key if env_routing_key is not None and env_routing_key != "" else "" |
| 331 | + ) |
323 | 332 | logger.info( |
324 | | - f"[DIAGNOSTIC] Publishing {label} message in Cloud Env. " |
| 333 | + f"[DIAGNOSTIC] Publishing {label} message with env exchange override. " |
325 | 334 | f"Exchange: {exchange_name}, Routing Key: '{routing_key}'." |
326 | 335 | ) |
327 | 336 | logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") |
|
0 commit comments