diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml deleted file mode 100644 index cb2fb408..00000000 --- a/.idea/csv-editor.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/CODE/Logicytics.py b/CODE/Logicytics.py index e97b2b8c..850f09dc 100644 --- a/CODE/Logicytics.py +++ b/CODE/Logicytics.py @@ -1,37 +1,258 @@ from __future__ import annotations +import gc import os import shutil import subprocess +import sys import zipfile from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime -from typing import Any +import psutil from prettytable import PrettyTable -from logicytics import Log, Execute, Check, Get, FileManagement, Flag, DEBUG, DELETE_LOGS +from logicytics import Log, Execute, Check, Get, FileManagement, Flag, DEBUG, DELETE_LOGS, CONFIG # Initialization -FileManagement.mkdir() log = Log({"log_level": DEBUG, "delete_log": DELETE_LOGS}) -ACTION = None -SUB_ACTION = None +ACTION, SUB_ACTION = None, None +MAX_WORKERS = CONFIG.getint("Settings", "max_workers", fallback=min(32, (os.cpu_count() or 1) + 4)) +log.debug(f"MAX_WORKERS: {MAX_WORKERS}") -class Health: +class ExecuteScript: + def __init__(self): + self.execution_list = self.__generate_execution_list() + @staticmethod - def backup(directory: str, name: str): + def __safe_remove(file_name: str, file_list: list[str] | set[str]) -> list[str]: + file_set = set(file_list) + if file_name in file_set: + file_set.remove(file_name) + else: + log.critical(f"The file {file_name} should exist in this directory - But was not found!") + return list(file_set) + + @staticmethod + def __safe_append(file_name: str, file_list: list[str] | set[str]) -> list[str]: + file_set = set(file_list) + if os.path.exists(file_name): + file_set.add(file_name) + else: + log.critical(f"Missing required file: {file_name}") + return list(file_set) + + def __generate_execution_list(self) -> list[str]: """ - Creates a backup of a specified directory by zipping its contents and moving it to a designated backup location. + Generate an execution list of scripts based on the specified action. - Args: - directory (str): The path to the directory to be backed up. - name (str): The name of the backup file. + This function dynamically creates a list of scripts to be executed by filtering and selecting + scripts based on the global ACTION variable. It supports different execution modes: + - 'minimal': A predefined set of lightweight scripts + - 'nopy': PowerShell and script-based scripts without Python + - 'modded': Includes scripts from the MODS directory + - 'depth': Comprehensive script execution with data mining and logging scripts + - 'vulnscan_ai': Vulnerability scanning script only Returns: - None + list[str]: A list of script file paths to be executed, filtered and modified based on the current action. + + Raises: + ValueError: Implicitly if a script file cannot be removed from the initial list. + + Notes: + - Removes sensitive or unnecessary scripts from the initial file list + - Logs the final execution list for debugging purposes + - Warns users about potential long execution times for certain actions + """ + execution_list = Get.list_of_files(".", only_extensions=(".py", ".exe", ".ps1", ".bat"), + exclude_files=["Logicytics.py"]) + files_to_remove = { + "sensitive_data_miner.py", + "dir_list.py", + "tree.ps1", + "vulnscan.py", + "event_log.py", + } + execution_list = [file for file in execution_list if file not in files_to_remove] + + if ACTION == "minimal": + execution_list = [ + "cmd_commands.py", + "registry.py", + "tasklist.py", + "wmic.py", + "netadapter.ps1", + "property_scraper.ps1", + "window_feature_miner.ps1", + "event_log.py", + ] + + elif ACTION == "nopy": + execution_list = [ + "browser_miner.ps1", + "netadapter.ps1", + "property_scraper.ps1", + "window_feature_miner.ps1", + "tree.ps1" + ] + + elif ACTION == "modded": + # Add all files in MODS to execution list + execution_list = Get.list_of_files("../MODS", only_extensions=(".py", ".exe", ".ps1", ".bat"), + append_file_list=execution_list, exclude_files=["Logicytics.py"]) + + elif ACTION == "depth": + log.warning( + "This flag will use clunky and huge scripts, and so may take a long time, but reap great rewards.") + files_to_append = { + "sensitive_data_miner.py", + "dir_list.py", + "tree.ps1", + "event_log.py", + } + for file in files_to_append: + execution_list = self.__safe_append(file, execution_list) + log.warning("This flag will use threading!") + + elif ACTION == "vulnscan_ai": + # Only vulnscan detector + if os.path.exists("vulnscan.py"): + execution_list = ["vulnscan.py"] + else: + log.critical("Vulnscan is missing...") + exit(1) + + if len(execution_list) == 0: + log.critical("Nothing is in the execution list.. This is due to faulty code or corrupted Logicytics files!") + exit(1) + + log.debug(f"The following will be executed: {execution_list}") + return execution_list + + @staticmethod + def __script_handler(script: str) -> tuple[str, Exception | None]: + """ + Executes a single script and logs the result, capturing any exceptions that occur during execution. + + Parameters: + script (str): The path to the script to be executed + """ + log.debug(f"Executing {script}") + try: + log.execution(Execute.script(script)) + log.info(f"{script} executed successfully") + return script, None + except Exception as err: + log.error(f"Error executing {script}: {err}") + return script, err + + def handler(self): + """Executes the scripts in the execution list based on the action.""" + log.info("Starting Logicytics...") + + if ACTION == "threaded" or ACTION == "depth": + self.__threaded() + elif ACTION == "performance_check": + self.__performance() + else: + self.__default() + + def __threaded(self): + """Executes scripts in parallel using threading.""" + log.debug("Using threading") + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = {executor.submit(self.__script_handler, script): script + for script in self.execution_list} + + for future in as_completed(futures): + script = futures[future] + try: + result, error = future.result() + if error: + log.error(f"Failed to execute {script}: {error}") + else: + log.debug(f"Completed {script}") + except Exception as e: + log.error(f"Thread crashed while executing {script}: {e}") + + def __default(self): + """Executes scripts sequentially.""" + try: + for script in self.execution_list: + result, error = self.__script_handler(script) + if error: + log.error(f"Failed to execute {script}") + else: + log.debug(f"Completed {script}") + except UnicodeDecodeError as e: + log.error(f"Error in script execution (Unicode): {e}") + except Exception as e: + log.error(f"Error in script execution: {e}") + + def __performance(self): + """Checks performance of each script.""" + if DEBUG.lower() != "debug": + log.warning("Advised to turn on DEBUG logging!!") + + execution_times = [] + memory_usage = [] + process = psutil.Process(os.getpid()) + + for file in range(len(self.execution_list)): + gc.collect() + start_time = datetime.now() + start_memory = process.memory_full_info().uss / 1024 / 1024 # MB + log.execution(Execute.script(self.execution_list[file])) + end_time = datetime.now() + end_memory = process.memory_full_info().uss / 1024 / 1024 # MB + elapsed_time = end_time - start_time + memory_delta = end_memory - start_memory + memory_usage.append((self.execution_list[file], str(memory_delta))) + execution_times.append((self.execution_list[file], elapsed_time)) + log.info(f"{self.execution_list[file]} executed in {elapsed_time}") + log.info(f"{self.execution_list[file]} used {memory_delta:.2f}MB of memory") + log.debug(f"Started with {start_memory}MB of memory and ended with {end_memory}MB of memory") + + table = PrettyTable() + table.field_names = ["Script", "Execution Time", "Memory Usage (MB)"] + for script, elapsed_time in execution_times: + memory = next(m[1] for m in memory_usage if m[0] == script) + table.add_row([script, elapsed_time, f"{memory:.2f}"]) + + try: + with open( + f"../ACCESS/LOGS/PERFORMANCE/Performance_Summary_" + f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt", "w" + ) as f: + f.write(table.get_string()) + f.write( + "\nSome values may be negative, Reason may be due to external resources playing with memory usage, " + "close background tasks to get more accurate readings") + f.write("Note: This is not a low-level memory logger, data here isn't 100% accurate!") + log.info("Performance check complete! Performance log found in ACCESS/LOGS/PERFORMANCE") + except Exception as e: + log.error(f"Error writing performance log: {e}") + + +class SpecialAction: + @staticmethod + def backup(directory: str, name: str): """ + Creates a backup of a specified directory by zipping its contents and moving it to a designated backup location. + + Args: + directory (str): The path to the directory to be backed up. + name (str): The name of the backup file. + + Returns: + None + """ + if not os.path.exists(directory): + log.critical(f"Directory {directory} does not exist!") + return + # Check if backup exists, delete it if so if os.path.exists(f"../ACCESS/BACKUP/{name}.zip"): os.remove(f"../ACCESS/BACKUP/{name}.zip") @@ -49,14 +270,14 @@ def backup(directory: str, name: str): @staticmethod def update() -> tuple[str, str]: """ - Updates the repository by pulling the latest changes from the remote repository. + Updates the repository by pulling the latest changes from the remote repository. - This function navigates to the parent directory, pulls the latest changes using Git, - and then returns to the current working directory. + This function navigates to the parent directory, pulls the latest changes using Git, + and then returns to the current working directory. - Returns: - str: The output from the git pull command. - """ + Returns: + str: The output from the git pull command. + """ # Check if git command is available if subprocess.run(["git", "--version"], capture_output=True).returncode != 0: return "Git is not installed or not available in the PATH.", "error" @@ -72,6 +293,32 @@ def update() -> tuple[str, str]: os.chdir(current_dir) return output, "info" + @staticmethod + def execute_new_window(file_path: str): + """ + Execute a Python script in a new command prompt window. + + This function launches the specified Python script in a separate command prompt window, waits for its completion, and then exits the current process. + + Parameters: + file_path (str): The relative path to the Python script to be executed, + which will be resolved relative to the current script's directory. + + Side Effects: + - Opens a new command prompt window + - Runs the specified Python script + - Terminates the current process after script execution + + Raises: + FileNotFoundError: If the specified script path does not exist + subprocess.SubprocessError: If there are issues launching the subprocess + """ + sr_current_dir = os.path.dirname(os.path.abspath(__file__)) + sr_script_path = os.path.join(sr_current_dir, file_path) + sr_process = subprocess.Popen(["cmd.exe", "/c", "start", sys.executable, sr_script_path]) + sr_process.wait() + exit(0) + def get_flags(): """ @@ -93,32 +340,6 @@ def get_flags(): log.debug(f"Sub-Action: {SUB_ACTION}") -def special_execute(file_path: str): - """ - Execute a Python script in a new command prompt window. - - This function launches the specified Python script in a separate command prompt window, waits for its completion, and then exits the current process. - - Parameters: - file_path (str): The relative path to the Python script to be executed, - which will be resolved relative to the current script's directory. - - Side Effects: - - Opens a new command prompt window - - Runs the specified Python script - - Terminates the current process after script execution - - Raises: - FileNotFoundError: If the specified script path does not exist - subprocess.SubprocessError: If there are issues launching the subprocess - """ - sr_current_dir = os.path.dirname(os.path.abspath(__file__)) - sr_script_path = os.path.join(sr_current_dir, file_path) - sr_process = subprocess.Popen(["cmd.exe", "/c", "start", "python", sr_script_path]) - sr_process.wait() - exit(0) - - def handle_special_actions(): """ Handles special actions based on the current action flag. @@ -142,7 +363,7 @@ def handle_special_actions(): # Special actions -> Quit if ACTION == "debug": log.info("Opening debug menu...") - special_execute("_debug.py") + SpecialAction.execute_new_window("_debug.py") messages = Check.sys_internal_zip() if messages: @@ -151,11 +372,11 @@ def handle_special_actions(): if ACTION == "dev": log.info("Opening developer menu...") - special_execute("_dev.py") + SpecialAction.execute_new_window("_dev.py") if ACTION == "update": log.info("Updating...") - message, log_type = Health.update() + message, log_type = SpecialAction.update() log.string(message, log_type) if log_type == "info": log.info("Update complete!") @@ -175,9 +396,9 @@ def handle_special_actions(): if ACTION == "backup": log.info("Backing up...") - Health.backup(".", "Default_Backup") + SpecialAction.backup(".", "Default_Backup") log.debug("Backup complete -> CODE dir") - Health.backup("../MODS", "Mods_Backup") + SpecialAction.backup("../MODS", "Mods_Backup") log.debug("Backup complete -> MODS dir") log.info("Backup complete!") input("Press Enter to exit...") @@ -213,154 +434,6 @@ def check_privileges(): log.warning("UAC is enabled, this may cause issues - Please disable UAC if possible") -def generate_execution_list() -> list | list[str] | list[str | Any]: - """ - Generate an execution list of scripts based on the specified action. - - This function dynamically creates a list of scripts to be executed by filtering and selecting - scripts based on the global ACTION variable. It supports different execution modes: - - 'minimal': A predefined set of lightweight scripts - - 'nopy': PowerShell and script-based scripts without Python - - 'modded': Includes scripts from the MODS directory - - 'depth': Comprehensive script execution with data mining and logging scripts - - 'vulnscan_ai': Vulnerability scanning script only - - Returns: - list[str]: A list of script file paths to be executed, filtered and modified based on the current action. - - Raises: - ValueError: Implicitly if a script file cannot be removed from the initial list. - - Notes: - - Removes sensitive or unnecessary scripts from the initial file list - - Logs the final execution list for debugging purposes - - Warns users about potential long execution times for certain actions - """ - execution_list = Get.list_of_files(".", extensions=(".py", ".exe", ".ps1", ".bat")) - execution_list.remove("sensitive_data_miner.py") - execution_list.remove("dir_list.py") - execution_list.remove("tree.ps1") - execution_list.remove("vulnscan.py") - execution_list.remove("event_log.py") - - if ACTION == "minimal": - execution_list = [ - "cmd_commands.py", - "registry.py", - "tasklist.py", - "wmic.py", - "netadapter.ps1", - "property_scraper.ps1", - "window_feature_miner.ps1", - "event_log.py", - ] - - if ACTION == "nopy": - execution_list = [ - "browser_miner.ps1", - "netadapter.ps1", - "property_scraper.ps1", - "window_feature_miner.ps1", - "tree.ps1" - ] - - if ACTION == "modded": - # Add all files in MODS to execution list - execution_list = Get.list_of_files("../MODS", - extensions=(".py", ".exe", ".ps1", ".bat"), - append_file_list=execution_list) - - if ACTION == "depth": - log.warning("This flag will use clunky and huge scripts, and so may take a long time, but reap great rewards.") - execution_list.append("sensitive_data_miner.py") - execution_list.append("dir_list.py") - execution_list.append("tree.ps1") - execution_list.append("event_log.py") - log.warning("This flag will use threading!") - - if ACTION == "vulnscan_ai": - # Only vulnscan detector - execution_list = ["vulnscan.py"] - - log.debug(f"The following will be executed: {execution_list}") - return execution_list - - -def execute_scripts(): - """Executes the scripts in the execution list based on the action.""" - # Check weather to use threading or not, as well as execute code - log.info("Starting Logicytics...") - - if ACTION == "threaded" or ACTION == "depth": - - def execute_single_script(script: str) -> tuple[str, Exception | None]: - """ - Executes a single script and logs the result. - - This function executes a single script and logs the result, - capturing any exceptions that occur during execution - - Parameters: - script (str): The path to the script to be executed - """ - log.debug(f"Executing {script}") - try: - log.parse_execution(Execute.script(script)) - log.info(f"{script} executed") - return script, None - except Exception as err: - log.error(f"Error executing {script}: {err}") - return script, err - - log.debug("Using threading") - execution_list = generate_execution_list() - with ThreadPoolExecutor() as executor: - futures = {executor.submit(execute_single_script, script): script - for script in execution_list} - - for future in as_completed(futures): - script = futures[future] - result, error = future.result() - if error: - log.error(f"Failed to execute {script}") - else: - log.debug(f"Completed {script}") - - elif ACTION == "performance_check": - execution_times = [] - execution_list = generate_execution_list() - for file in range(len(execution_list)): - start_time = datetime.now() - log.parse_execution(Execute.script(execution_list[file])) - end_time = datetime.now() - elapsed_time = end_time - start_time - execution_times.append((file, elapsed_time)) - log.info(f"{execution_list[file]} executed in {elapsed_time}") - - table = PrettyTable() - table.field_names = ["Script", "Execution Time"] - for file, elapsed_time in execution_times: - table.add_row([file, elapsed_time]) - - with open( - f"../ACCESS/LOGS/PERFORMANCE/Performance_Summary_" - f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt", "w" - ) as f: - f.write(table.get_string()) - - log.info("Performance check complete! Performance log found in ACCESS/LOGS/PERFORMANCE") - else: - try: - execution_list = generate_execution_list() - for script in execution_list: # Loop through List - log.parse_execution(Execute.script(script)) - log.info(f"{script} executed") - except UnicodeDecodeError as e: - log.error(f"Error in code: {e}") - except Exception as e: - log.error(f"Error in code: {e}") - - def zip_generated_files(): """Zips generated files based on the action.""" @@ -426,7 +499,7 @@ def Logicytics(): # Check for privileges and errors check_privileges() # Execute scripts - execute_scripts() + ExecuteScript().handler() # Zip generated files zip_generated_files() # Finish with sub actions @@ -436,7 +509,13 @@ def Logicytics(): if __name__ == "__main__": - Logicytics() + try: + Logicytics() + except KeyboardInterrupt: + log.warning("⚠️ Force shutdown detected! Some temporary files might be left behind.") + log.warning("💡 Pro tip: Next time, let the program finish naturally.") + # TODO v3.4.2 -> Cleanup function + exit(0) else: log.error("This script cannot be imported!") exit(1) diff --git a/CODE/VulnScan/tools/_test_gpu_acceleration.py b/CODE/VulnScan/tools/_test_gpu_acceleration.py index 86397e70..3b6b8b1c 100644 --- a/CODE/VulnScan/tools/_test_gpu_acceleration.py +++ b/CODE/VulnScan/tools/_test_gpu_acceleration.py @@ -6,7 +6,7 @@ exit(1) -def check_gpu(): +def check_gpu() -> str: """Check if CUDA is available and print the device information. This function attempts to detect CUDA capability and prints whether @@ -14,15 +14,12 @@ def check_gpu(): """ try: if torch.cuda.is_available(): - print(f"CUDA is available. Using GPU: {torch.cuda.get_device_name(0)}") + return f"CUDA is available. Using GPU: {torch.cuda.get_device_name(0)}" else: - print("CUDA is not available. Using CPU.") + return "CUDA is not available. Using CPU." except RuntimeError as err: - print(f"Error initializing CUDA: {err}") + return f"Error initializing CUDA: {err}" if __name__ == '__main__': - check_gpu() -else: - raise ImportError("This training script is meant to be run directly " - "and cannot be imported. Please execute it as a standalone script.") + print(check_gpu()) diff --git a/CODE/VulnScan/v2-deprecated/_generate_data.py b/CODE/VulnScan/v2-deprecated/_generate_data.py deleted file mode 100644 index 778c1c26..00000000 --- a/CODE/VulnScan/v2-deprecated/_generate_data.py +++ /dev/null @@ -1,110 +0,0 @@ -import os -import random -from logicytics import deprecated -from faker import Faker - -MAX_FILE_SIZE: int = 10 * 1024 # Example: Max file size is 10 KB -SAVE_DIRECTORY: str = "PATH" -# Initialize the Faker instance -fake = Faker() - - -@deprecated(reason="This function is only used for generating sensitive data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") -def create_sensitive_file(file_path: str, max_size: int): - """ - Generate a sensitive file with real sensitive information. - - Args: - file_path (str): The path where the file will be saved. - max_size (int): The maximum size of the file in bytes. - """ - content = "" - # Generate sensitive data using Faker - content += f"Name: {fake.name()}\n" - content += f"Address: {fake.address()}\n" - content += f"Phone: {fake.phone_number()}\n" - content += f"Email: {fake.email()}\n" - content += f"Credit Card: {fake.credit_card_number()}\n" - content += f"SSN: {fake.ssn()}\n" - content += f"Company: {fake.company()}\n" - - # Keep adding more sensitive data until the file size is less than the max limit - while len(content.encode('utf-8')) < max_size: - content += f"Sensitive Info: {fake.text(max_nb_chars=200)}\n" - - with open(file_path, "w", encoding="utf-8") as f: - f.write(content) - - -@deprecated(reason="This function is only used for generating normal data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") -def create_normal_file(file_path: str, max_size: int): - """ - Generate a normal file with non-sensitive data. - - Args: - file_path (str): The path where the file will be saved. - max_size (int): The maximum size of the file in bytes. - """ - content = "" - # Add random text - while len(content.encode('utf-8')) < max_size: - content += fake.text(max_nb_chars=200) + "\n" - - with open(file_path, "w", encoding="utf-8") as f: - f.write(content) - - -@deprecated(reason="This function is only used for generating mixed data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") -def create_mix_file(file_path: str, max_size: int): - """ - Generate a mix file with both normal and sensitive data. - - Args: - file_path (str): The path where the file will be saved. - max_size (int): The maximum size of the file in bytes. - """ - content = "" - # Add a mix of normal and sensitive data - while len(content.encode('utf-8')) < max_size: - if random.choice([True, False]): - content += fake.text(max_nb_chars=200) + "\n" # Normal data - else: - content += f"Name: {fake.name()}\n" - content += f"Credit Card: {fake.credit_card_number()}\n" - content += f"SSN: {fake.ssn()}\n" # Sensitive data - - with open(file_path, "w", encoding="utf-8") as f: - f.write(content) - - -@deprecated(reason="This function is only used for generating random files for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") -def create_random_files(directories: str, num_file: int = 100): - """ - Create random files (Normal, Mix, Sensitive). - - Args: - directories (str): The directory where the files will be saved. - num_file (int): The number of files to generate. - """ - os.makedirs(directories, exist_ok=True) - - for i in range(num_file): - file_type = random.choice(['Normal', 'Mix', 'Sensitive']) - file_name = f"file_{i + 1}_{file_type}.txt" - file_path = os.path.join(directories, file_name) - - if file_type == "Sensitive": - create_sensitive_file(file_path, MAX_FILE_SIZE) - elif file_type == "Mix": - create_mix_file(file_path, MAX_FILE_SIZE) - else: - create_normal_file(file_path, MAX_FILE_SIZE) - - print(f"Created {file_type} file: {file_name}") - - -if __name__ == "__main__": - create_random_files(SAVE_DIRECTORY, num_file=1000000) -else: - raise ImportError("This training script is meant to be run directly " - "and cannot be imported. Please execute it as a standalone script.") diff --git a/CODE/VulnScan/v3/_train.py b/CODE/VulnScan/v3/_train.py index f9bfb2a4..ffd645bc 100644 --- a/CODE/VulnScan/v3/_train.py +++ b/CODE/VulnScan/v3/_train.py @@ -19,11 +19,11 @@ from sklearn.tree import DecisionTreeClassifier from torch.utils.data import Dataset, DataLoader -# NN seems to be the best choice for this task - # Set up logging from logicytics import Log, DEBUG +# NN seems to be the best choice for this task + logger = Log( {"log_level": DEBUG, "filename": "../../../ACCESS/LOGS/VulnScan_Train.log", @@ -31,6 +31,7 @@ "%(log_color)s%(levelname)-8s%(reset)s %(yellow)s%(asctime)s %(blue)s%(message)s", } ) +vectorizer = None # Dataset Class for PyTorch models @@ -361,7 +362,8 @@ def validate_data(): logger.error("CUDA must be a boolean") exit(1) - allowed_models = ["NeuralNetwork", "LogReg", "RandomForest", "ExtraTrees", "GBM", "XGBoost", "DecisionTree", "NaiveBayes"] + allowed_models = ["NeuralNetwork", "LogReg", "RandomForest", "ExtraTrees", "GBM", "XGBoost", "DecisionTree", + "NaiveBayes"] if MODEL_NAME not in allowed_models: logger.error(f"MODEL_NAME must be one of: {', '.join(allowed_models)}") exit(1) diff --git a/CODE/_debug.py b/CODE/_debug.py index 7ca58dcc..044a7c12 100644 --- a/CODE/_debug.py +++ b/CODE/_debug.py @@ -4,32 +4,22 @@ import os import platform import sys +import time import psutil import requests from logicytics import Log, DEBUG, VERSION, Check -if __name__ == "__main__": - log_debug = Log( - {"log_level": DEBUG, - "filename": "../ACCESS/LOGS/DEBUG/DEBUG.log", - "truncate_message": False} - ) +log_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ACCESS\\LOGS\\DEBUG\\DEBUG.log") +log = Log({"log_level": DEBUG, "filename": log_path, "truncate_message": False, "delete_log": True}) -class HealthCheck: +class VersionManager: @staticmethod - def __version_tuple(version: str) -> tuple[int, int, int | str, str]: + def parse_version(version: str) -> tuple[int, int, int | str, str]: """ - Parses a version string into a tuple. - - Args: - version (str): The version string to parse. - - Returns: - tuple[int, int, int, str]: A tuple containing the major, minor, and patch versions, - and a string indicating whether it is a snapshot or release version. + Parses a version string into a tuple (major, minor, patch, type). """ try: if version.startswith("snapshot-"): @@ -39,261 +29,208 @@ def __version_tuple(version: str) -> tuple[int, int, int | str, str]: return major, minor, patch, "snapshot" else: return tuple(map(int, version.split('.'))) + ("release",) - except Exception as err: - log_debug.error(f"Failed to parse version: {err}") + except Exception as e: + log.error(f"Failed to parse version: {e}") return 0, 0, 0, "error" + +class FileManager: @staticmethod - def files(directory: str, required_files: list[str]): + def check_required_files(directory: str, required_files: list[str]): """ Checks if all required files are present in the directory and its subdirectories. - - Args: - directory (str): Path to the directory to check. - required_files (list[str]): List of required file names with relative paths. """ try: - log_debug.debug(f"Checking directory: {directory}") + log.debug(f"Checking directory: {directory}") if not os.path.exists(directory): - log_debug.error(f"Directory {directory} does not exist.") + log.error(f"Directory {directory} does not exist.") + return - # Gather all files with relative paths actual_files = [] for root, _, files in os.walk(directory): for file in files: relative_path = os.path.relpath(os.path.join(root, file), start=directory) - actual_files.append( - relative_path.replace("\\", "/").replace('"', '')) # Normalize paths for comparison - - log_debug.debug(f"Actual files found: {actual_files}") + actual_files.append(relative_path.replace("\\", "/").replace('"', '')) # Normalize paths - # Track missing and extra files - missing_files = [] - extra_files = [] + log.debug(f"Actual files found: {actual_files}") - # Normalize required files - normalized_required_files = [required_file.strip().replace("\\", "/").replace('"', '') for required_file in - required_files] + # Strip quotes and normalize paths for comparison + normalized_required_files = [ + required_file.strip().replace("\\", "/").replace('"', '') # Remove quotes and normalize paths + for required_file in required_files + ] - # Check for missing files - for required_file in normalized_required_files: - if required_file not in actual_files: - missing_files.append(required_file) - - log_debug.debug(f"Missing files: {missing_files}") - - # Check for extra files - for actual_file in actual_files: - if actual_file not in normalized_required_files: - extra_files.append(actual_file) - - log_debug.debug(f"Extra files: {extra_files}") + # Compare files + missing_files, extra_files = FileManager.compare_files(actual_files, normalized_required_files) if missing_files: - log_debug.error(f"Missing files: {', '.join(missing_files)}") + log.error(f"Missing files: {', '.join(missing_files)}") if extra_files: - log_debug.warning(f"Extra files found: {', '.join(extra_files)}") - log_debug.info("All required files are present.") - + log.warning(f"Extra files found: {', '.join(extra_files)}") + if not missing_files and not extra_files: + log.info("All required files are present.") except Exception as e: - log_debug.error(f"Unexpected error during file check: {e}") + log.error(f"Unexpected error during file check: {e}") - @classmethod - def versions(cls, local_version: str, remote_version: str): + @staticmethod + def compare_files(actual_files: list[str], required_files: list[str]) -> tuple[list[str], list[str]]: """ - Compares local and remote versions. - - Args: - local_version (str): Local version. - remote_version (str): Remote version. + Compares actual and required files, returning missing and extra files. """ + missing_files = [file for file in required_files if file not in actual_files] + extra_files = [file for file in actual_files if file not in required_files] + return missing_files, extra_files - local_version_tuple = cls.__version_tuple(local_version) - remote_version_tuple = cls.__version_tuple(remote_version) - - if "error" in local_version_tuple or "error" in remote_version_tuple: - log_debug.error("Version parsing error.") - return - - try: - if "snapshot" in local_version_tuple or "snapshot" in remote_version_tuple: - log_debug.warning("Snapshot versions are unstable.") - - if local_version_tuple == remote_version_tuple: - log_debug.info(f"Version is up to date. Your Version: {local_version}") - elif local_version_tuple > remote_version_tuple: - log_debug.warning("Version is ahead of the repository. " - f"Your Version: {local_version}, " - f"Repository Version: {remote_version}") - else: - log_debug.error("Version is behind the repository. " - f"Your Version: {local_version}, Repository Version: {remote_version}") - except Exception as e: - log_debug.error(f"Version comparison error: {e}") - -class DebugCheck: +class SysInternalManager: @staticmethod - def sys_internal_binaries(path: str): + def check_binaries(path: str): """ Checks the SysInternal Binaries in the given directory. - - Args: - path (str): Directory path. """ try: if not os.path.exists(path): raise FileNotFoundError("Directory does not exist") contents = os.listdir(path) - log_debug.debug(str(contents)) + log.debug(str(contents)) has_zip = any(file.endswith(".zip") for file in contents) has_exe = any(file.endswith(".exe") for file in contents) if any(file.endswith(".ignore") for file in contents): - log_debug.warning("A `.sys.ignore` file was found - Ignoring") + log.warning("A `.sys.ignore` file was found - Ignoring") elif has_zip and not has_exe: - log_debug.error("Only zip files - Missing EXEs due to no `ignore` file") + log.error("Only zip files - Missing EXEs due to no `ignore` file") elif has_zip and has_exe: - log_debug.info("Both zip and exe files - All good") + log.info("Both zip and exe files - All good") else: - log_debug.error("SysInternal Binaries Not Found: Missing Files - Corruption detected") + log.error("SysInternal Binaries Not Found: Missing Files - Corruption detected") except Exception as e: - log_debug.error(f"Unexpected error: {e}") + log.error(f"Unexpected error: {e}") + +class SystemInfoManager: @staticmethod def cpu_info() -> tuple[str, str, str]: """ Retrieves CPU details. - - Returns: - tuple[str, str, str]: Architecture, vendor ID, and model. """ return ( f"CPU Architecture: {platform.machine()}", f"CPU Vendor ID: {platform.system()}", - f"CPU Model: {platform.release()} {platform.version()}", + f"CPU Model: {platform.release()} {platform.version()}" ) + @staticmethod + def python_version(): + """ + Checks the current Python version against recommended version ranges and logs the result. + """ + version = sys.version.split()[0] + MIN_VERSION = (3, 11) + MAX_VERSION = (3, 13) + try: + major, minor = map(int, version.split(".")[:2]) + if MIN_VERSION <= (major, minor) < MAX_VERSION: + log.info(f"Python Version: {version} - Perfect") + elif (major, minor) < MIN_VERSION: + log.warning(f"Python Version: {version} - Recommended: 3.11.x") + else: + log.error(f"Python Version: {version} - Incompatible") + except Exception as e: + log.error(f"Failed to parse Python Version: {e}") + -def python_version(): - """ - Checks the current Python version against recommended version ranges and logs the result. - - This function determines the compatibility of the current Python runtime by comparing its version - against predefined minimum and maximum version thresholds. It provides informative logging about - the Python version status. - - Logs: - - Info: When Python version is within the recommended range (3.11.x to 3.12.x) - - Warning: When Python version is below the minimum recommended version (< 3.11) - - Error: When Python version is above the maximum supported version (>= 3.13) or parsing fails - - Raises: - No explicit exceptions are raised; errors are logged internally - - Example: - Typical log outputs might include: - - "Python Version: 3.11.5 - Perfect" - - "Python Version: 3.10.2 - Recommended: 3.11.x" - - "Python Version: 3.13.0 - Incompatible" - """ - version = sys.version.split()[0] - MIN_VERSION = (3, 11) - MAX_VERSION = (3, 13) - try: - major, minor = map(int, version.split(".")[:2]) - if MIN_VERSION <= (major, minor) < MAX_VERSION: - log_debug.info(f"Python Version: {version} - Perfect") - elif (major, minor) < MIN_VERSION: - log_debug.warning(f"Python Version: {version} - Recommended: 3.11.x") - else: - log_debug.error(f"Python Version: {version} - Incompatible") - except Exception as e: - log_debug.error(f"Failed to parse Python Version: {e}") - - -def get_online_config() -> dict | None: - """ +class ConfigManager: + @staticmethod + def get_online_config() -> dict | None: + """ Retrieves configuration data from a remote repository. + """ + try: + url = "https://raw.githubusercontent.com/DefinetlyNotAI/Logicytics/main/CODE/config.ini" + config = configparser.ConfigParser() + config.read_string(requests.get(url, timeout=15).text) + return config + except requests.exceptions.RequestException as e: + log.error(f"Connection error: {e}") + return None + - Returns: - dict: Parsed configuration data if successful. - None: If there was an error fetching the configuration. +class HealthCheck: + @staticmethod + def check_versions(local_version: str, remote_version: str): """ - try: - url = "https://raw.githubusercontent.com/DefinetlyNotAI/Logicytics/main/CODE/config.ini" - config = configparser.ConfigParser() - config.read_string(requests.get(url, timeout=15).text) - return config - except requests.exceptions.RequestException as e: - log_debug.error(f"Connection error: {e}") - return None + Compares local and remote versions. + """ + local_version_tuple = VersionManager.parse_version(local_version) + remote_version_tuple = VersionManager.parse_version(remote_version) + + if "error" in local_version_tuple or "error" in remote_version_tuple: + log.error("Version parsing error.") + return + + try: + if "snapshot" in local_version_tuple or "snapshot" in remote_version_tuple: + log.warning("Snapshot versions are unstable.") + + if local_version_tuple == remote_version_tuple: + log.info(f"Version is up to date. Your Version: {local_version}") + elif local_version_tuple > remote_version_tuple: + log.warning("Version is ahead of the repository. " + f"Your Version: {local_version}, " + f"Repository Version: {remote_version}") + else: + log.error("Version is behind the repository. " + f"Your Version: {local_version}, Repository Version: {remote_version}") + except Exception as e: + log.error(f"Version comparison error: {e}") -@log_debug.function +@log.function def debug(): """ Executes a comprehensive system debug routine, performing various checks and logging system information. - - This function performs the following tasks: - - Clears the existing debug log file - - Retrieves and validates online configuration - - Checks system version compatibility - - Verifies required file integrity - - Checks SysInternal binaries - - Logs system privileges and environment details - - Checks Python version compatibility - - Retrieves and logs CPU information - - Logs are written to the debug log file, capturing system state, configuration, and potential issues. - - Notes: - - Requires admin privileges for full system checks - - Logs information about execution environment - - Checks system and Python version compatibility - - Provides insights into system configuration and potential security settings """ - # Clear Debug Log - log_path = "../ACCESS/LOGS/DEBUG/DEBUG.log" - if os.path.exists(log_path): - os.remove(log_path) - # Online Configuration Check - config = get_online_config() + config = ConfigManager.get_online_config() if config: - HealthCheck.versions(VERSION, config["System Settings"]["version"]) + HealthCheck.check_versions(VERSION, config["System Settings"]["version"]) # File Integrity Check required_files = config["System Settings"].get("files", "").split(",") - HealthCheck.files(".", required_files) + FileManager.check_required_files(".", required_files) # SysInternal Binaries Check - DebugCheck.sys_internal_binaries("SysInternal_Suite") + SysInternalManager.check_binaries("SysInternal_Suite") # System Checks - log_debug.info("Admin privileges found" if Check.admin() else "Admin privileges not found") - log_debug.info("UAC enabled" if Check.uac() else "UAC disabled") - log_debug.info(f"Execution path: {psutil.__file__}") - log_debug.info(f"Global execution path: {sys.executable}") - log_debug.info(f"Local execution path: {sys.prefix}") - log_debug.info( + log.info("Admin privileges found" if Check.admin() else "Admin privileges not found") + log.info("UAC enabled" if Check.uac() else "UAC disabled") + log.info(f"Execution path: {psutil.__file__}") + log.info(f"Global execution path: {sys.executable}") + log.info(f"Local execution path: {sys.prefix}") + log.info( "Running in a virtual environment" if sys.prefix != sys.base_prefix else "Not running in a virtual environment") - log_debug.info( - "Execution policy is unrestricted" if Check.execution_policy() else "Execution policy is not unrestricted") + log.info( + "Execution policy is unrestricted" if Check.execution_policy() else "Execution policy is restricted") # Python Version Check - python_version() + SystemInfoManager.python_version() # CPU Info - for info in DebugCheck.cpu_info(): - log_debug.info(info) + for info in SystemInfoManager.cpu_info(): + log.info(info) # Final Debug Status - log_debug.info(f"Debug: {DEBUG}") + log.info(f"Log Level: {DEBUG}") if __name__ == "__main__": - debug() + try: + debug() + except Exception as err: + log.error(f"Failed to execute debug routine: {err}") + time.sleep(0.5) input("Press Enter to exit...") diff --git a/CODE/_dev.py b/CODE/_dev.py index 149686ce..7a37e3e3 100644 --- a/CODE/_dev.py +++ b/CODE/_dev.py @@ -1,13 +1,28 @@ from __future__ import annotations +import os +import re import subprocess import configobj -from logicytics import Log, DEBUG, Get, FileManagement, CURRENT_FILES, VERSION +from logicytics import log, Get, FileManagement, CURRENT_FILES, VERSION -if __name__ == "__main__": - log_dev = Log({"log_level": DEBUG}) + +def color_print(text, color="reset", is_input=False) -> None | str: + colors = { + "reset": "\033[0m", + "red": "\033[31m", + "green": "\033[32m", + "yellow": "\033[33m", + "cyan": "\033[36m", + } + + color_code = colors.get(color.lower(), colors["reset"]) + if is_input: + return input(f"{color_code}{text}{colors['reset']}") + else: + print(f"{color_code}{text}{colors['reset']}") def _update_ini_file(filename: str, new_data: list | str, key: str) -> None: @@ -21,21 +36,23 @@ def _update_ini_file(filename: str, new_data: list | str, key: str) -> None: None """ try: - config = configobj.ConfigObj(filename, encoding='utf-8', write_empty_values=True) + config = configobj.ConfigObj(filename, encoding="utf-8", write_empty_values=True) + if key == "files": config["System Settings"][key] = ", ".join(new_data) elif key == "version": config["System Settings"][key] = new_data else: - log_dev.error(f"Invalid key: {key}") + color_print(f"[!] Invalid key: {key}", "yellow") return + config.write() except FileNotFoundError: - log_dev.error(f"File not found: {filename}") + color_print("[x] INI file not found", "red") except configobj.ConfigObjError as e: - log_dev.error(f"Error parsing INI file: {filename}, {e}") + color_print(f"[x] Parsing INI file failed: {e}", "red") except Exception as e: - log_dev.error(f"An error occurred: {e}") + color_print(f"[x] {e}", "red") def _prompt_user(question: str, file_to_open: str = None, special: bool = False) -> bool: @@ -59,18 +76,18 @@ def _prompt_user(question: str, file_to_open: str = None, special: bool = False) - Provides optional file opening and reminder messaging """ try: - answer = input(question + " (Y)es or (N)o:- ") + answer = color_print(f"[?] {question} (y)es or (n)o:- ", "cyan", is_input=True) if not (answer.lower() == "yes" or answer.lower() == "y"): if file_to_open: subprocess.run(["start", file_to_open], shell=True) if not special: - print( - "Please ensure you fix the issues/problem and try again with the checklist." + color_print( + "[x] Please ensure you fix the issues/problem and try again with the checklist.", "red" ) return False return True except Exception as e: - log_dev.error(e) + color_print(f"[x] {e}", "red") def _perform_checks() -> bool: @@ -81,16 +98,15 @@ def _perform_checks() -> bool: bool: True if all checks are confirmed by the user, False otherwise. """ checks = [ - ("Have you read the required contributing guidelines?", "../CONTRIBUTING.md"), - ("Have you made files you don't want to be run start with '_'?", "."), - ("Have you added the file to CODE dir?", "."), - ("Have you added docstrings and comments?", "../CONTRIBUTING.md"), - ("Is each file containing around 1 main feature?", "../CONTRIBUTING.md"), + ("[-] Have you read the required contributing guidelines?", "..\\CONTRIBUTING.md"), + ("[-] Have you made files you don't want to be run start with '_'?", "."), + ("[-] Have you added the file to CODE dir?", "."), + ("[-] Have you added docstrings and comments?", "..\\CONTRIBUTING.md"), + ("[-] Is each file containing around 1 main feature?", "..\\CONTRIBUTING.md"), ] for question, file_to_open in checks: if not _prompt_user(question, file_to_open): - log_dev.warning("Fix the issues and try again with the checklist.") return False return True @@ -100,36 +116,53 @@ def _handle_file_operations() -> None: Handles file operations and logging for added, removed, and normal files. """ EXCLUDE_FILES = ["logicytics\\User_History.json.gz", "logicytics\\User_History.json"] - files = Get.list_of_files(".", True, exclude_files=EXCLUDE_FILES) + files = Get.list_of_files(".", exclude_files=EXCLUDE_FILES) added_files, removed_files, normal_files = [], [], [] clean_files_list = [file.replace('"', '') for file in CURRENT_FILES] - for f in files: - clean_f = f.replace('"', '') - if clean_f in clean_files_list and clean_f not in EXCLUDE_FILES: - normal_files.append(clean_f) - elif clean_f not in EXCLUDE_FILES: - added_files.append(clean_f) + files_set = set(os.path.abspath(f) for f in files) + clean_files_set = set(os.path.abspath(f) for f in clean_files_list) - for f in clean_files_list: - clean_f = f.replace('"', '') - if clean_f not in files and clean_f not in EXCLUDE_FILES: - removed_files.append(clean_f) + for file in files_set: + if file in clean_files_set and file not in EXCLUDE_FILES: + normal_files.append(file) + elif file not in clean_files_set and file not in EXCLUDE_FILES: + added_files.append(file) + + for file in clean_files_set: + if file not in files_set and file not in EXCLUDE_FILES: + removed_files.append(file) print("\n".join([f"\033[92m+ {file}\033[0m" for file in added_files])) # Green + print("\n".join([f"\033[91m- {file}\033[0m" for file in removed_files])) # Red - print("\n".join([f"* {file}" for file in normal_files])) - if not _prompt_user("Does the list above include your added files?"): - log_dev.critical("Something went wrong! Please contact support.") + if not _prompt_user("[-] Does the list above include your added files?"): + color_print("[x] Something went wrong! Please contact support.", "red") return + max_attempts = 10 + attempts = 0 _update_ini_file("config.ini", files, "files") - _update_ini_file("config.ini", input(f"Enter the new version of the project (Old version is {VERSION}): "), "version") - print("\nGreat Job! Please tick the box in the GitHub PR request for completing steps in --dev") + while True: + version = color_print(f"[?] Enter the new version of the project (Old version is {VERSION}): ", "cyan", + is_input=True) + if attempts >= max_attempts: + color_print("[x] Maximum attempts reached. Please run the script again.", "red") + exit() + if re.match(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$", version): + _update_ini_file("config.ini", version, "version") + break + else: + color_print("[!] Please enter a valid version number (e.g., 1.2.3)", "yellow") + attempts += 1 + color_print(f"[!] {max_attempts - attempts} attempts remaining", "yellow") + + color_print("\n[-] Great Job! Please tick the box in the GitHub PR request for completing steps in --dev", "green") -@log_dev.function + +@log.function def dev_checks() -> None: """ Performs comprehensive developer checks to ensure code quality and project guidelines compliance. @@ -153,11 +186,6 @@ def dev_checks() -> None: - Prints file change lists with color coding - Updates configuration file with current files and version - Logs warnings or errors during the process - - Example: - Typical usage is during project development to ensure consistent practices: - >>> dev_checks() - # Interactively guides developer through project checks """ FileManagement.mkdir() if not _perform_checks(): @@ -168,4 +196,4 @@ def dev_checks() -> None: if __name__ == "__main__": dev_checks() # Wait for the user to press Enter to exit the program - input("\nPress Enter to exit the program... ") + input("\n[*] Press Enter to exit the program... ") diff --git a/CODE/bluetooth_details.py b/CODE/bluetooth_details.py index f15b3749..f692ed06 100644 --- a/CODE/bluetooth_details.py +++ b/CODE/bluetooth_details.py @@ -3,10 +3,7 @@ import json import subprocess -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log @log.function diff --git a/CODE/bluetooth_logger.py b/CODE/bluetooth_logger.py index f94fb21f..73609c78 100644 --- a/CODE/bluetooth_logger.py +++ b/CODE/bluetooth_logger.py @@ -2,10 +2,7 @@ import re import subprocess -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log # Utility function to log data to a file @@ -117,11 +114,6 @@ def get_paired_bluetooth_devices(): Raises: Exception: If there are issues running the PowerShell command or parsing the output. - - Example: - >>> devices = get_paired_bluetooth_devices() - >>> print(devices) - ['Name: Wireless Headphones, MAC: 001122334455', 'Name: Bluetooth Speaker, MAC: 667788990011'] """ command = ( 'Get-PnpDevice -Class Bluetooth | Where-Object { $_.Status -eq "OK" } | Select-Object Name, DeviceID' diff --git a/CODE/browser_miner.ps1 b/CODE/browser_miner.ps1 index 9acd6efa..b689cbcc 100644 --- a/CODE/browser_miner.ps1 +++ b/CODE/browser_miner.ps1 @@ -5,7 +5,6 @@ $sourcePaths = @( "C:\Users\{}\AppData\Roaming\Mozilla\Firefox\Profiles", "C:\Users\{}\AppData\Roaming\Opera Software\Opera Stable\Network", "C:\Users\{}\AppData\Roaming\Opera Software\Opera GX Stable\Network", - 'C:\\WINDOWS\\system32\\config\\SAM', 'C:\\Windows\\System32\\config', 'C:\\Windows\\System32\\GroupPolicy', 'C:\\Windows\\System32\\GroupPolicyUsers', diff --git a/CODE/cmd_commands.py b/CODE/cmd_commands.py index 0d172ba1..3570002a 100644 --- a/CODE/cmd_commands.py +++ b/CODE/cmd_commands.py @@ -1,7 +1,4 @@ -from logicytics import Log, DEBUG, Execute - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log, Execute @log.function @@ -21,7 +18,8 @@ def command(file: str, commands: str, message: str, encoding: str = "UTF-8") -> log.info(f"Executing {message}") try: output = Execute.command(commands) - open(file, "w", encoding=encoding).write(output) + with open(file, "w", encoding=encoding) as f: + f.write(output) log.info(f"{message} Successful - {file}") except Exception as e: log.error(f"Error while getting {message}: {e}") diff --git a/CODE/config.ini b/CODE/config.ini index 32e01ddd..ad95cbaa 100644 --- a/CODE/config.ini +++ b/CODE/config.ini @@ -1,11 +1,22 @@ +######################################################## +# The following settings are for Logicytics as a whole # +######################################################## + [Settings] # Would you like to enable debug mode? # This will print out more information to the console, with prefix DEBUG # This will not be logged however log_using_debug = false + # Would you like for new logs to be created every execution? # Or would you like to append to the same log file? delete_old_logs = false + +# When using threading mode, you have the option to decide how many threads to use (workers) +# Uncomment and change the value to use a maximum amount of threads, +# otherwise keep it commented if you don't need a maximum limit +; max_workers = 10 + # Logicytics will save preferences and history in a file, # This is used by Flag.py, to suggest better flags # Would you like this to happen? @@ -14,12 +25,13 @@ save_preferences = true [System Settings] # Do not play with these settings unless you know what you are doing -version = 3.3.0 -files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, _debug.py, _dev.py, logicytics\Checks.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, logicytics\__init__.py, SysInternal_Suite\.sys.ignore, SysInternal_Suite\SysInternal_Suite.zip, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl, VulnScan\tools\_study_network.py, VulnScan\tools\_test_gpu_acceleration.py, VulnScan\tools\_vectorizer.py, VulnScan\v2-deprecated\_generate_data.py, VulnScan\v3\_generate_data.py, VulnScan\v3\_train.py" +# Dev Mode allows a safe way to modify these settings!! +version = 3.4.0 +files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, network_psutil.py, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, logicytics\Checks.py, logicytics\Config.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, SysInternal_Suite\.sys.ignore, SysInternal_Suite\SysInternal_Suite.zip, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl" -################################################### -# The following settings are for specific modules # -################################################### +######################################################## +# The following settings are for specific modules # +######################################################## [Flag Settings] # The minimum accuracy to suggest a flag, @@ -59,6 +71,7 @@ timeout = 10 # The following settings are for the Generate module for fake training data extensions = .txt, .log, .md, .csv, .json, .xml, .html, .yaml, .ini, .pdf, .docx, .xlsx, .pptx save_path = PATH + # Options include: # 'Sense' - Generates 50k files, each 25KB in size. # 'SenseNano' - Generates 5 files, each 5KB in size. @@ -66,14 +79,17 @@ save_path = PATH # 'SenseMini' - Generates 10k files, each 10KB in size. # 'SenseCustom' - Uses custom size settings from the configuration file. code_name = SenseMini + # This allows more randomness in the file sizes, use 0 to disable # this is applied randomly every time a file is generated # Variation is applied in the following way: # size +- (size */ variation) where its random weather to add or subtract and divide or multiply size_variation = 0.1 + # Set to SenseCustom to use below size settings min_file_size = 5KB max_file_size = 50KB + # Chances for the following data types in files: # 0.0 - 1.0, the rest will be for pure data full_sensitive_chance = 0.07 @@ -104,6 +120,7 @@ vectorizer_type = tfidf # "RandomForest", "ExtraTrees", "GBM", # "XGBoost", "DecisionTree", "NaiveBayes" model_name = NeuralNetwork + # General Training Parameters epochs = 10 batch_size = 32 diff --git a/CODE/dir_list.py b/CODE/dir_list.py index 754a9851..defe31b4 100644 --- a/CODE/dir_list.py +++ b/CODE/dir_list.py @@ -1,10 +1,7 @@ import os from concurrent.futures import ThreadPoolExecutor -from logicytics import Log, DEBUG, Execute - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log, Execute def run_command_threaded(directory: str, file: str, message: str, encoding: str = "UTF-8") -> None: diff --git a/CODE/dump_memory.py b/CODE/dump_memory.py index 238d7d4d..cd8cd2d3 100644 --- a/CODE/dump_memory.py +++ b/CODE/dump_memory.py @@ -5,18 +5,15 @@ import psutil -from logicytics import Log, DEBUG +from logicytics import log -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) - -# TODO v3.3.1 +# TODO v3.4.1 # psutil.virtual_memory(): used, free, percent, total # psutil.swap_memory(): used, free, percent, total # If the file size exceeds this limit, the file will be truncated with a message # Put 0 to disable the limit -# TODO v3.3.1: Make this take from config.ini +# TODO v3.4.1: Make this take from config.ini LIMIT_FILE_SIZE = 20 # Always in MiB diff --git a/CODE/event_log.py b/CODE/event_log.py index 07d62dfb..a27d561c 100644 --- a/CODE/event_log.py +++ b/CODE/event_log.py @@ -4,12 +4,7 @@ import wmi # Import the wmi library -from logicytics import Log, DEBUG - -# Note: This script CANNOT be run without admin privileges - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log @log.function diff --git a/CODE/log_miner.py b/CODE/log_miner.py index efe5531d..eff74f47 100644 --- a/CODE/log_miner.py +++ b/CODE/log_miner.py @@ -1,9 +1,6 @@ import subprocess -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log @log.function diff --git a/CODE/logicytics/Config.py b/CODE/logicytics/Config.py new file mode 100644 index 00000000..9811a765 --- /dev/null +++ b/CODE/logicytics/Config.py @@ -0,0 +1,50 @@ +import configparser +import os + + +def __config_data() -> tuple[str, str, list[str], bool, str]: + """ + Retrieves configuration data from the 'config.ini' file. + + If the configuration file is not found in any of these locations, + the program exits with an error message. + + Returns: + tuple[str, str, list[str], bool]: A tuple containing: + - Log level (str): Either "DEBUG" or "INFO" + - Version (str): System version from configuration + - Files (list[str]): List of files specified in configuration + - Delete old logs (bool): Flag indicating whether to delete old log files + - CONFIG itself + + Raises: + SystemExit: If the 'config.ini' file cannot be found in any of the attempted locations + """ + + def _config_path() -> str: + configs_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.ini") + + if os.path.exists(configs_path): + return configs_path + else: + print("The config.ini file is not found in the expected location.") + exit(1) + + config = configparser.ConfigParser() + path = _config_path() + config.read(path) + + log_using_debug = config.getboolean("Settings", "log_using_debug") + delete_old_logs = config.getboolean("Settings", "delete_old_logs") + version = config.get("System Settings", "version") + files = config.get("System Settings", "files").split(", ") + + log_using_debug = "DEBUG" if log_using_debug else "INFO" + + return log_using_debug, version, files, delete_old_logs, config + + +# Check if the script is being run directly, if not, set up the library +if __name__ == '__main__': + exit("This is a library, Please import rather than directly run.") +DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, CONFIG = __config_data() diff --git a/CODE/logicytics/FileManagement.py b/CODE/logicytics/FileManagement.py index bc72b8b1..39fe3215 100644 --- a/CODE/logicytics/FileManagement.py +++ b/CODE/logicytics/FileManagement.py @@ -93,8 +93,7 @@ def __get_files_to_zip(path: str) -> list: """ excluded_extensions = (".py", ".exe", ".bat", ".ps1", ".pkl", ".pth") excluded_prefixes = ("config.ini", "SysInternal_Suite", - "__pycache__", "logicytics", "VulnScan", - "Vectorizer features") + "__pycache__", "logicytics", "VulnScan") return [ f for f in os.listdir(path) @@ -128,7 +127,7 @@ def ignore_files(files_func): zip_file.write(os.path.join(path, file)) @staticmethod - def __remove_files(path: str, files: list): + def __remove_files(path: str, files: list) -> str | None: """ Removes the specified files from the given path. @@ -204,8 +203,8 @@ def and_hash(cls, path: str, name: str, flag: str) -> tuple | str: Returns: tuple or str: A tuple containing success messages or an error message. """ - today = datetime.now() - filename = f"Logicytics_{name}_{flag}_{today.strftime('%Y-%m-%d_%H-%M-%S')}" + time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + filename = f"Logicytics_{name}_{flag}_{time}" files_to_zip = cls.__get_files_to_zip(path) cls.__create_zip_file(path, files_to_zip, filename) check = cls.__remove_files(path, files_to_zip) diff --git a/CODE/logicytics/Flag.py b/CODE/logicytics/Flag.py index 95f4cd75..2736b03c 100644 --- a/CODE/logicytics/Flag.py +++ b/CODE/logicytics/Flag.py @@ -1,7 +1,6 @@ from __future__ import annotations import argparse -import configparser import difflib import gzip import json @@ -9,33 +8,26 @@ from collections import Counter from datetime import datetime +from .Config import CONFIG + # Check if the script is being run directly, if not, set up the library if __name__ == '__main__': exit("This is a library, Please import rather than directly run.") else: - # Set up constants and configurations - config = configparser.ConfigParser() - try: - config.read('config.ini') - except FileNotFoundError: - try: - config.read('../config.ini') - except FileNotFoundError: - exit("No configuration file found.") # Save user preferences? - SAVE_PREFERENCES = config.getboolean("Settings", "save_preferences") + SAVE_PREFERENCES = CONFIG.getboolean("Settings", "save_preferences") # Debug mode for Sentence Transformer - DEBUG_MODE = config.getboolean("Flag Settings", "model_debug") # Debug mode for Sentence Transformer + DEBUG_MODE = CONFIG.getboolean("Flag Settings", "model_debug") # Debug mode for Sentence Transformer # File for storing user history data - HISTORY_FILE = 'logicytics/User_History.json.gz' # User history file + HISTORY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'User_History.json.gz') # User history file # Minimum accuracy threshold for flag suggestions MIN_ACCURACY_THRESHOLD = float( - config.get("Flag Settings", "accuracy_min")) # Minimum accuracy threshold for flag suggestions + CONFIG.get("Flag Settings", "accuracy_min")) # Minimum accuracy threshold for flag suggestions if not 0 <= MIN_ACCURACY_THRESHOLD <= 100: raise ValueError("accuracy_min must be between 0 and 100") -class Match: +class _Match: @staticmethod def __get_sim(user_input: str, all_descriptions: list[str]) -> list[float]: """ @@ -69,11 +61,11 @@ def __get_sim(user_input: str, all_descriptions: list[str]) -> list[float]: logging.getLogger("sentence_transformers").setLevel(logging.ERROR) try: - MODEL = SentenceTransformer(config.get("Flag Settings", "model_to_use")) + MODEL = SentenceTransformer(CONFIG.get("Flag Settings", "model_to_use")) except Exception as e: print(f"Error: {e}") print("Please check the model name in the config file.") - print(f"Model name {config.get('Flag Settings', 'model_to_use')} may not be valid.") + print(f"Model name {CONFIG.get('Flag Settings', 'model_to_use')} may not be valid.") exit(1) user_embedding = MODEL.encode(user_input, convert_to_tensor=True, show_progress_bar=DEBUG_MODE) @@ -140,7 +132,7 @@ def _generate_summary_and_graph(cls): - Saves the graph visualization to a PNG file Parameters: - cls (Match): The class instance containing historical data methods + cls (_Match): The class instance containing historical data methods Raises: SystemExit: If no history data file is found @@ -156,7 +148,7 @@ def _generate_summary_and_graph(cls): - Requires matplotlib for graph generation - Attempts to save graph in multiple predefined directory paths """ - # TODO Yet in beta + # TODO Yet in beta - v3.6.0 # Load the decompressed history data using the load_history function import matplotlib.pyplot as plt @@ -286,6 +278,7 @@ def update_history(cls, user_input: str, matched_flag: str, accuracy: float): if not SAVE_PREFERENCES: return history_data = cls.load_history() + matched_flag = matched_flag.lstrip('-') # Ensure that interactions is a dictionary (not a list) if not isinstance(history_data['interactions'], dict): @@ -454,6 +447,7 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar "and not the best, use only if the device doesnt have python installed.", ) + # TODO v3.6.0 -> Out of beta parser.add_argument( "--vulnscan-ai", action="store_true", @@ -469,6 +463,7 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar help="Run Logicytics in minimal mode. Just bare essential scraping using only quick scripts", ) + # TODO v3.6.0 -> Out of beta parser.add_argument( "--performance-check", action="store_true", @@ -642,7 +637,7 @@ def __suggest_flag(cls, user_input: str, valid_flags: list[str]): Behavior: - If a close flag match exists, suggests the closest match - If no close match, prompts user for a description - - Uses the Match.flag method to find the most accurate flag based on description + - Uses the _Match.flag method to find the most accurate flag based on description - Prints matching results, with optional detailed output in debug mode Side Effects: @@ -652,7 +647,8 @@ def __suggest_flag(cls, user_input: str, valid_flags: list[str]): # Get the closest valid flag match based on the user's input closest_matches = difflib.get_close_matches(user_input, valid_flags, n=1, cutoff=0.6) if closest_matches: - print(f"Invalid flag '{user_input}', Did you mean '--{closest_matches[0]}'?") + print(f"Invalid flag '{user_input}', Did you mean '--{closest_matches[0].replace('_', '-')}'?") + exit(1) # Prompt the user for a description if no close match is found user_input_desc = input("We can't find a match, Please provide a description: ").lower() @@ -660,14 +656,15 @@ def __suggest_flag(cls, user_input: str, valid_flags: list[str]): # Map the user-provided description to the closest valid flag flags_list = [f"--{flag}" for flag in valid_flags] descriptions_list = [f"Run Logicytics with {flag}" for flag in valid_flags] - flag_received, accuracy_received = Match.flag(user_input_desc, flags_list, descriptions_list) + flag_received, accuracy_received = _Match.flag(user_input_desc, flags_list, descriptions_list) if DEBUG_MODE: - print(f"User input: {user_input_desc}\nMatched flag: {flag_received}\nAccuracy: {accuracy_received:.2f}%\n") + print( + f"User input: {user_input_desc}\nMatched flag: {flag_received.replace('_', '-')}\nAccuracy: {accuracy_received:.2f}%\n") else: - print(f"Matched flag: {flag_received} (Accuracy: {accuracy_received:.2f}%)\n") + print(f"Matched flag: {flag_received.replace('_', '-')} (Accuracy: {accuracy_received:.2f}%)\n") @staticmethod - def show_help_menu(return_output: bool = False): + def show_help_menu(return_output: bool = False) -> str | None: """ Display the help menu for the Logicytics application. @@ -757,12 +754,12 @@ def update_data_history(matched_flag: str): Example: update_data_history('--verbose') # Increments usage count for '--verbose' flag """ - history_data = Match.load_history() + history_data = _Match.load_history() # Ensure the flag exists in the flags_usage counter and increment it - if matched_flag not in history_data['flags_usage']: - history_data['flags_usage'][matched_flag] = 0 - history_data['flags_usage'][matched_flag] += 1 - Match.save_history(history_data) + if matched_flag.replace("--", "") not in history_data['flags_usage']: + history_data['flags_usage'][matched_flag.replace("--", "")] = 0 + history_data['flags_usage'][matched_flag.replace("--", "")] += 1 + _Match.save_history(history_data) if len(used_flags) == 2: for flag in used_flags: diff --git a/CODE/logicytics/Get.py b/CODE/logicytics/Get.py index 92a72896..f41e93a3 100644 --- a/CODE/logicytics/Get.py +++ b/CODE/logicytics/Get.py @@ -1,107 +1,48 @@ from __future__ import annotations -import configparser -import os.path -from pathlib import Path +import os class Get: @staticmethod - def list_of_files(directory: str, extensions: tuple | bool = True, append_file_list: list[str] = None, - exclude_files: list[str] = None) -> list: + def list_of_files( + directory: str, + only_extensions: list[str] = None, + append_file_list: list[str] = None, + exclude_files: list[str] = None, + exclude_extensions: list[str] = None, + ) -> list[str]: """ Retrieves a list of files in the specified directory based on given extensions and exclusion criteria. - - Supports two modes of file retrieval: - 1. When `extensions` is `True`, retrieves all files recursively from the directory. - 2. When `extensions` is a tuple, retrieves files matching specific extensions while applying exclusion rules. - + Parameters: directory (str): Path of the directory to search for files. - extensions (tuple | bool, optional): File extensions to filter or True to retrieve all files. Defaults to True. - append_file_list (list, optional): Existing list to append found filenames to. Creates a new list if not provided. Defaults to None. - exclude_files (list, optional): List of filenames to exclude from results. Defaults to None. - + only_extensions (list[str], optional): List of file extensions to filter. If None, retrieves all files. Defaults to None. + append_file_list (list[str], optional): Existing list to append found filenames to. Defaults to None. + exclude_files (list[str], optional): List of filenames to exclude from results. Defaults to None. + exclude_extensions (list[str], optional): List of extensions to exclude from results. Defaults to None. + Returns: - list: A list of filenames matching the specified criteria. - + list[str]: A list of filenames matching the specified criteria. + Exclusion rules: - Ignores files starting with an underscore (_) - - Excludes "Logicytics.py" - Skips files specified in `exclude_files` """ - append_file_list = [] if not append_file_list else append_file_list + append_file_list = append_file_list or [] + exclude_files = set(exclude_files or []) + exclude_extensions = set(exclude_extensions or []) - if isinstance(extensions, bool) and extensions: - for root, _, filenames in os.walk(directory): - for filename in filenames: - file_path = os.path.relpath(os.path.join(root, filename), directory) - append_file_list.append(file_path) - return append_file_list + for root, _, filenames in os.walk(directory): + for filename in filenames: + if filename.startswith("_") or filename in exclude_files: + continue # Skip excluded files + if any(filename.endswith(ext) for ext in exclude_extensions): + continue # Skip excluded files - for filename in os.listdir(Path(directory)): - if ( - filename.endswith(extensions) - and not filename.startswith("_") - and filename != "Logicytics.py" - and (exclude_files is None or filename not in exclude_files) - ): - append_file_list.append(filename) - return append_file_list + file_path = os.path.relpath(os.path.join(root, filename), directory) - @staticmethod - def config_data() -> tuple[str, str, list[str], bool]: - """ - Retrieves configuration data from the 'config.ini' file. - - This method attempts to read the 'config.ini' file from multiple potential locations: - 1. Current directory - 2. Parent directory - 3. Grandparent directory - - If the configuration file is not found in any of these locations, the program exits with an error message. - - Returns: - tuple[str, str, list[str], bool]: A tuple containing: - - Log level (str): Either "DEBUG" or "INFO" - - Version (str): System version from configuration - - Files (list[str]): List of files specified in configuration - - Delete old logs (bool): Flag indicating whether to delete old log files - - Raises: - SystemExit: If the 'config.ini' file cannot be found in any of the attempted locations - """ - - def get_config_data(config_file_name: str) -> tuple[str, str, list[str], bool]: - """ - Reads configuration data from the specified 'config.ini' file. - - Args: - config_file_name (str): The name of the configuration file to read. - - Returns: - tuple[str, str, list[str], bool]: A tuple containing the log level, version, and a list of files. - """ - config = configparser.ConfigParser() - config.read(config_file_name) - - log_using_debug = config.getboolean("Settings", "log_using_debug") - delete_old_logs = config.getboolean("Settings", "delete_old_logs") - version = config.get("System Settings", "version") - files = config.get("System Settings", "files").split(", ") - - log_using_debug = "DEBUG" if log_using_debug else "INFO" - - return log_using_debug, version, files, delete_old_logs + if only_extensions is None or any(filename.endswith(ext) for ext in only_extensions): + append_file_list.append(file_path) - try: - return get_config_data("config.ini") - except Exception: - try: - return get_config_data("../config.ini") - except Exception: - try: - return get_config_data("../../config.ini") - except Exception: - print("The config.ini file is not found.") - exit(1) + return append_file_list diff --git a/CODE/logicytics/Logger.py b/CODE/logicytics/Logger.py index 36f23bda..c009b208 100644 --- a/CODE/logicytics/Logger.py +++ b/CODE/logicytics/Logger.py @@ -14,6 +14,21 @@ class Log: """ A logging class that supports colored output using the colorlog library. """ + _instance = None + + def __new__(cls, *args, **kwargs): + """ + Ensures that only one instance of the Log class is created (Singleton pattern). + + :param cls: The class being instantiated. + :param args: Positional arguments. + :param kwargs: Keyword arguments. + :return: The single instance of the Log class. + """ + if cls._instance is None: + cls._instance = super(Log, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance def __init__(self, config: dict = None): """ @@ -21,15 +36,23 @@ def __init__(self, config: dict = None): :param config: A dictionary containing configuration options. """ + if self._initialized and config is None: + return + self._initialized = True + if config: + self.reset() + # log_path_relative variable takes Logger.py full path, goes up twice then joins with ACCESS\\LOGS\\Logicytics.log + log_path_relative = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + "ACCESS\\LOGS\\Logicytics.log") config = config or { - "filename": "../ACCESS/LOGS/Logicytics.log", + "filename": log_path_relative, "use_colorlog": True, "log_level": "INFO", "debug_color": "cyan", "info_color": "green", "warning_color": "yellow", "error_color": "red", - "critical_color": "red", + "critical_color": "bold_red", "exception_color": "red", "colorlog_fmt_parameters": "%(log_color)s%(levelname)-8s%(reset)s %(blue)s%(message)s", "truncate_message": True, @@ -41,7 +64,8 @@ def __init__(self, config: dict = None): logging.addLevelName(self.INTERNAL_LOG_LEVEL, "INTERNAL") self.color = config.get("use_colorlog", True) self.truncate = config.get("truncate_message", True) - self.filename = config.get("filename", "../ACCESS/LOGS/Logicytics.log") + + self.filename = config.get("filename", log_path_relative) if self.color: logger = colorlog.getLogger() logger.setLevel(getattr(logging, config["log_level"].upper(), logging.INFO)) @@ -52,7 +76,7 @@ def __init__(self, config: dict = None): "INFO": config.get("info_color", "green"), "WARNING": config.get("warning_color", "yellow"), "ERROR": config.get("error_color", "red"), - "CRITICAL": config.get("critical_color", "red"), + "CRITICAL": config.get("critical_color", "bold_red"), "EXCEPTION": config.get("exception_color", "red"), } @@ -94,6 +118,15 @@ def __init__(self, config: dict = None): ) self.newline() + @staticmethod + def reset(): + """ + Resets the logger by removing all existing handlers. + """ + logger = logging.getLogger() + for handler in logger.handlers[:]: + logger.removeHandler(handler) + @staticmethod def __timestamp() -> str: """ @@ -301,7 +334,7 @@ def exception(self, message, exception_type: Type = Exception): ) raise exception_type(message) - def parse_execution(self, message_log: list[list[str, str]]): + def execution(self, message_log: list[list[str, str]]): """ Parse and log multiple messages with their corresponding log types. diff --git a/CODE/logicytics/__init__.py b/CODE/logicytics/__init__.py index 71c52177..c7247725 100644 --- a/CODE/logicytics/__init__.py +++ b/CODE/logicytics/__init__.py @@ -2,22 +2,44 @@ import traceback from logicytics.Checks import Check +from logicytics.Config import DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, CONFIG from logicytics.Execute import Execute from logicytics.FileManagement import FileManagement from logicytics.Flag import Flag from logicytics.Get import Get from logicytics.Logger import Log +# Check if the script is being run directly, if not, set up the library +if __name__ == '__main__': + exit("This is a library, Please import rather than directly run.") Execute = Execute() Get = Get() Check = Check() -FileManagement = FileManagement() Flag = Flag() +FileManagement = FileManagement() +__show_trace = DEBUG == "DEBUG" +FileManagement.mkdir() +log = Log({"log_level": DEBUG}) + + +class ObjectLoadError(Exception): + """Raised when an Object fails to load.""" -DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS = Get.config_data() + def __init__(self, message="Failed to load object", object_name=None): + """ + Initialize the exception with a custom message and object details. + + Args: + message (str): The error message + object_name (str, optional): Name of the object that failed to load + """ + self.object_name = object_name + if object_name: + message = f"{message} (Object: {object_name})" + super().__init__(message) -def deprecated(removal_version: str, reason: str, show_trace: bool = True if DEBUG == "DEBUG" else False) -> callable: +def deprecated(removal_version: str, reason: str, show_trace: bool = __show_trace) -> callable: """ Decorator function that marks a function as deprecated and provides a warning when the function is called. @@ -38,13 +60,13 @@ def deprecated(removal_version: str, reason: str, show_trace: bool = True if DEB def decorator(func: callable) -> callable: """ Decorator function that marks a function as deprecated and provides a warning when the function is called. - + Args: func (callable): The function to be decorated with a deprecation warning. - + Returns: callable: A wrapper function that preserves the original function's metadata and prints a deprecation warning. - + Notes: - Uses functools.wraps to preserve the original function's metadata - Prints a colorized deprecation warning to stderr @@ -55,14 +77,14 @@ def decorator(func: callable) -> callable: def wrapper(*args, **kwargs) -> callable: """ Wraps a deprecated function to print a warning message before execution. - + Args: *args: Positional arguments passed to the original function. **kwargs: Keyword arguments passed to the original function. - + Returns: The return value of the original function after printing a deprecation warning. - + Warns: Prints a colored deprecation warning to stderr with details about: - Function name being deprecated diff --git a/CODE/media_backup.py b/CODE/media_backup.py index 9e5c380c..e2480e0f 100644 --- a/CODE/media_backup.py +++ b/CODE/media_backup.py @@ -3,10 +3,7 @@ import shutil from datetime import datetime -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log class Media: diff --git a/CODE/network_psutil.py b/CODE/network_psutil.py new file mode 100644 index 00000000..1db71894 --- /dev/null +++ b/CODE/network_psutil.py @@ -0,0 +1,184 @@ +import asyncio +import os +import socket + +import psutil + +from logicytics import log, Execute + + +class NetworkInfo: + """ + A class to gather and save various network-related information. + """ + + @log.function + def get(self): + """ + Gathers and saves various network-related information by calling multiple internal methods. + """ + try: + self.__fetch_network_io_stats() + self.__fetch_network_connections() + self.__fetch_network_interface_addresses() + self.__fetch_network_interface_stats() + self.__execute_external_network_command() + self.__fetch_network_connections_with_process_info() + self.__measure_network_bandwidth_usage() + self.__fetch_hostname_and_ip() + except Exception as e: + log.error(f"Error getting network info: {e}, Type: {type(e).__name__}") + + @staticmethod + def __save_data(filename: str, data: str, father_dir_name: str = "network_data"): + """ + Saves the given data to a file. + + :param filename: The name of the file to save the data in. + :param data: The data to be saved. + :param father_dir_name: The directory to save the file in. Defaults to "network_data". + """ + os.makedirs(father_dir_name, exist_ok=True) + try: + with open(os.path.join(father_dir_name, filename), "w") as f: + f.write(data) + except IOError as e: + log.error(f"Failed to save {filename}: {e}") + + def __fetch_network_io_stats(self): + """ + Fetches and saves network I/O statistics for each network interface. + """ + log.debug("Fetching network interface stats...") + net_io = psutil.net_io_counters(pernic=True) + net_io_data = "" + for iface, stats in net_io.items(): + net_io_data += f"Interface: {iface}\n" + net_io_data += f"Bytes Sent: {stats.bytes_sent}, Bytes Received: {stats.bytes_recv}\n" + net_io_data += f"Packets Sent: {stats.packets_sent}, Packets Received: {stats.packets_recv}\n" + net_io_data += f"Errors In: {stats.errin}, Errors Out: {stats.errout}\n" + net_io_data += f"Dropped In: {stats.dropin}, Dropped Out: {stats.dropout}\n\n" + self.__save_data("network_io.txt", net_io_data) + log.info("Network IO stats saved.") + + def __fetch_network_connections(self): + """ + Fetches and saves information about network connections. + """ + log.debug("Fetching network connections...") + connections = psutil.net_connections(kind='all') + connections_data = "" + for conn in connections: + connections_data += f"Type: {conn.type}, Local: {conn.laddr}, Remote: {conn.raddr}, Status: {conn.status}\n" + self.__save_data("network_connections.txt", connections_data) + log.info("Network connections saved.") + + def __fetch_network_interface_addresses(self): + """ + Fetches and saves network interface addresses. + """ + log.debug("Fetching network interface addresses...") + interfaces = psutil.net_if_addrs() + interfaces_data = "" + for iface, addrs in interfaces.items(): + for addr in addrs: + interfaces_data += f"Interface: {iface}, Address: {addr.address}, Netmask: {addr.netmask}, Broadcast: {addr.broadcast}\n" + self.__save_data("network_interfaces.txt", interfaces_data) + log.info("Network interface addresses saved.") + + def __fetch_network_interface_stats(self): + """ + Fetches and saves network interface statistics. + """ + log.debug("Fetching network interface stats...") + stats = psutil.net_if_stats() + stats_data = "" + for iface, stat in stats.items(): + stats_data += f"Interface: {iface}, Speed: {stat.speed}Mbps, Duplex: {stat.duplex}, Up: {stat.isup}\n" + self.__save_data("network_stats.txt", stats_data) + log.info("Network interface stats saved.") + + def __execute_external_network_command(self): + """ + Executes an external network command and saves the output. + """ + log.debug("Executing external network command...") + result = Execute.command("ipconfig") + self.__save_data("network_command_output.txt", result) + log.info("Network command output saved.") + + def __fetch_network_connections_with_process_info(self): + """ + Fetches and saves network connections along with associated process information. + """ + log.debug("Fetching network connections with process info...") + connections_data = "" + for conn in psutil.net_connections(kind='all'): + pid = conn.pid if conn.pid else "N/A" + proc_name = "Unknown" + if pid != "N/A": + try: + proc_name = psutil.Process(pid).name() + except psutil.NoSuchProcess: + proc_name = "Process Exited" + connections_data += f"Type: {conn.type}, Local: {conn.laddr}, Remote: {conn.raddr}, Status: {conn.status}, Process: {proc_name} (PID: {pid})\n" + self.__save_data("network_connections_with_processes.txt", connections_data) + log.info("Network connections with process info saved.") + + async def __measure_network_bandwidth_usage(self, sample_count: int = 5, interval: float = 1.0): + """ + Measures and saves the average network bandwidth usage. + + Args: + sample_count: Number of samples to take (default: 5) + interval: Time between samples in seconds (default: 1.0) + """ + # TODO v3.4.1: Allow config.ini to set values + log.debug("Measuring network bandwidth usage...") + samples = [] + for _ in range(sample_count): + net1 = psutil.net_io_counters() + await asyncio.sleep(interval) + net2 = psutil.net_io_counters() + samples.append({ + 'up': (net2.bytes_sent - net1.bytes_sent) / 1024, + 'down': (net2.bytes_recv - net1.bytes_recv) / 1024 + }) + if samples: + avg_up = sum(s['up'] for s in samples) / len(samples) + avg_down = sum(s['down'] for s in samples) / len(samples) + max_up = max(s['up'] for s in samples) + max_down = max(s['down'] for s in samples) + else: + avg_up = avg_down = max_up = max_down = 0 + bandwidth_data = f"Average Upload Speed: {avg_up:.2f} KB/s\n" + bandwidth_data += f"Average Download Speed: {avg_down:.2f} KB/s\n" + bandwidth_data += f"Peak Upload Speed: {max_up:.2f} KB/s\n" + bandwidth_data += f"Peak Download Speed: {max_down:.2f} KB/s\n" + self.__save_data("network_bandwidth_usage.txt", bandwidth_data) + log.info("Network bandwidth usage saved.") + + def __fetch_hostname_and_ip(self): + """ + Fetches and saves the hostname and IP addresses of the machine. + """ + try: + hostname = socket.gethostname() + ip_addresses = [] + for res in socket.getaddrinfo(hostname, None): + ip = res[4][0] + if ip not in ip_addresses: + ip_addresses.append(ip) + ip_config_data = f"Hostname: {hostname}\n" + ip_config_data += "IP Addresses:\n" + for ip in ip_addresses: + ip_config_data += f" - {ip}\n" + except socket.gaierror as e: + log.error(f"Failed to resolve hostname: {e}") + ip_config_data = f"Hostname: {hostname}\nFailed to resolve IP addresses\n" + self.__save_data("hostname_ip.txt", ip_config_data) + log.info("Hostname and IP address saved.") + + +if __name__ == "__main__": + NetworkInfo().get() diff --git a/CODE/packet_sniffer.py b/CODE/packet_sniffer.py index 1473b4d7..5c969a04 100644 --- a/CODE/packet_sniffer.py +++ b/CODE/packet_sniffer.py @@ -1,6 +1,8 @@ from __future__ import annotations -from configparser import ConfigParser +import os +import warnings +from time import time import matplotlib.pyplot as plt import networkx as nx @@ -8,391 +10,405 @@ from scapy.all import sniff, conf from scapy.layers.inet import IP, TCP, UDP, ICMP -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log, CONFIG # Read configuration from config.ini -config = ConfigParser() -config.read('config.ini') -config = config['PacketSniffer Settings'] - -# Global configuration -conf.verb = 0 # Turn off verbosity for clean output -packet_data = [] # List to store packet information -G = nx.Graph() # Initialize a graph - - -# Function to process and log packet details -def log_packet(packet: IP): - """ - Processes a captured IP packet, extracting and logging network connection details. - - Extracts key network information from the packet including source and destination IP addresses, - protocol, source and destination ports. Logs packet details, updates global packet data collection, - prints a summary, and adds connection information to the network graph. - - Parameters: - packet (IP): A Scapy IP layer packet to be processed and analyzed. - - Raises: - Exception: Logs and suppresses any errors encountered during packet processing. - - Side Effects: - - Appends packet information to global `packet_data` list - - Prints packet summary to console - - Updates network connection graph - - Logs debug information about captured packet - - Notes: - - Silently handles packet processing errors to prevent sniffing interruption - - Requires global variables `packet_data` and supporting functions like - `get_protocol_name()`, `get_port_info()`, `print_packet_summary()`, and `add_to_graph()` - """ - try: - if packet.haslayer(IP): - log.debug(f"Packet captured: {packet.summary()}") - packet_info = { - 'src_ip': packet[IP].src, - 'dst_ip': packet[IP].dst, - 'protocol': get_protocol_name(packet), - 'src_port': get_port_info(packet, 'sport'), - 'dst_port': get_port_info(packet, 'dport'), - } - packet_data.append(packet_info) - print_packet_summary(packet_info) - add_to_graph(packet_info) - except Exception as err: - log.error(f"Error processing packet: {err}") - - -# Function to determine the protocol name -def get_protocol_name(packet: IP) -> str: - """ - Determines the protocol name of a captured network packet. - - This function examines the layers of a given IP packet to identify its protocol type. It supports identification of TCP, UDP, ICMP, and classifies any other packet types as 'Other'. - - Parameters: - packet (IP): The captured network packet to analyze for protocol identification. - - Returns: - str: The protocol name, which can be one of: 'TCP', 'UDP', 'ICMP', or 'Other'. - - Notes: - - Uses Scapy's layer checking methods to determine protocol - - Logs debug information about the packet and detected protocol - - Provides a fallback 'Other' classification for unrecognized protocols - """ - log.debug(f"Checking protocol for packet: {packet.summary()}") - if packet.haslayer(TCP): - log.debug("Protocol: TCP") - return 'TCP' - elif packet.haslayer(UDP): - log.debug("Protocol: UDP") - return 'UDP' - elif packet.haslayer(ICMP): - log.debug("Protocol: ICMP") - return 'ICMP' - else: - log.debug("Protocol: Other") - return 'Other' - - -# Function to extract port information from a packet -def get_port_info(packet: IP, port_type: str) -> int | None: - """ - Extracts the source or destination port from a captured packet. - - Parameters: - packet (IP): The captured packet to analyze. - port_type (str): The type of port to extract ('sport' for source port, 'dport' for destination port). - - Returns: - int | None: The port number if available, otherwise None. - - Raises: - ValueError: If an invalid port_type is provided. - - Notes: - - Supports extracting ports from TCP and UDP layers - - Returns None if the packet does not have TCP or UDP layers - """ - log.debug(f"Port type: {port_type}") - if packet.haslayer(TCP): - return packet[TCP].sport if port_type == 'sport' else packet[TCP].dport - elif packet.haslayer(UDP): - return packet[UDP].sport if port_type == 'sport' else packet[UDP].dport - return None - - -# Function to print packet summary -def print_packet_summary(packet_info: dict): - """ - Prints a summary of the captured network packet to the debug log. - - Parameters: - packet_info (dict): A dictionary containing detailed information about a captured network packet with the following expected keys: - - 'protocol' (str): The network protocol of the packet (e.g., TCP, UDP, ICMP) - - 'src_ip' (str): Source IP address of the packet - - 'dst_ip' (str): Destination IP address of the packet - - 'src_port' (int/str): Source port number of the packet - - 'dst_port' (int/str): Destination port number of the packet - - Returns: - None: Logs packet summary information without returning a value - """ - log.debug(f"Packet captured: {packet_info['protocol']} packet from {packet_info['src_ip']} " - f"to {packet_info['dst_ip']} | Src Port: {packet_info['src_port']} | Dst Port: {packet_info['dst_port']}") - - -# Function to add packet information to the graph -def add_to_graph(packet_info: dict): - """ - Adds an edge to the network graph representing a connection between source and destination IPs. - - Parameters: - packet_info (dict): A dictionary containing packet network details with the following keys: - - 'src_ip' (str): Source IP address of the packet - - 'dst_ip' (str): Destination IP address of the packet - - 'protocol' (str): Network protocol used for the connection (e.g., TCP, UDP) - - Side Effects: - Modifies the global NetworkX graph (G) by adding an edge between source and destination IPs - with the protocol information as an edge attribute. - - Notes: - - Assumes a global NetworkX graph object 'G' is already initialized - - Does not perform validation of input packet_info dictionary - """ - src_ip = packet_info['src_ip'] - dst_ip = packet_info['dst_ip'] - protocol = packet_info['protocol'] - G.add_edge(src_ip, dst_ip, protocol=protocol) - - -# Function to start sniffing packets -def start_sniffing(interface: str, packet_count: int = 10, timeout: int = 10): - """ - Starts packet sniffing on a given network interface. - - Captures network packets on the specified interface with configurable packet count and timeout. Processes each captured packet using a custom callback function, logs packet details, and stops when the specified packet count is reached. - - Parameters: - interface (str): Network interface name to capture packets from. - packet_count (int, optional): Maximum number of packets to capture. Defaults to 10. - timeout (int, optional): Maximum time to spend capturing packets in seconds. Defaults to 10. - - Side Effects: - - Logs packet details during capture - - Saves captured packet data to a CSV file - - Generates a network graph visualization - - Raises: - Exception: If packet capture encounters unexpected errors - - Example: - start_sniffing('eth0', packet_count=50, timeout=30) - """ - log.info(f"Starting packet capture on interface '{interface}'...") - - # Initialize a packet capture counter - packet_counter = 0 - - # Define a custom packet callback to count packets - def packet_callback(packet: IP) -> bool: +CONFIG.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.ini")) +config = CONFIG['PacketSniffer Settings'] +# Ignore all warnings (Wireshark issue) +warnings.filterwarnings("ignore") + + +class Sniff: + # Global configuration + def __init__(self): + conf.verb = 0 # Turn off verbosity for clean output + self.packet_data = [] # List to store packet information + self.G = nx.Graph() # Initialize a graph + + # Function to process and log packet details + def __log_packet(self, packet: IP): + """ + Processes a captured IP packet, extracting and logging network connection details. + + Extracts key network information from the packet including source and destination IP addresses, + protocol, source and destination ports. Logs packet details, updates global packet data collection, + prints a summary, and adds connection information to the network graph. + + Parameters: + packet (IP): A Scapy IP layer packet to be processed and analyzed. + + Raises: + Exception: Logs and suppresses any errors encountered during packet processing. + + Side Effects: + - Appends packet information to global `packet_data` list + - Prints packet summary to console + - Updates network connection graph + - Logs debug information about captured packet + + Notes: + - Silently handles packet processing errors to prevent sniffing interruption + - Requires global variables `packet_data` and supporting functions like + `get_protocol_name()`, `get_port_info()`, `print_packet_summary()`, and `add_to_graph()` + """ + try: + if packet.haslayer(IP): + log.debug(f"Packet captured: {packet.summary()}") + packet_info = { + 'src_ip': packet[IP].src, + 'dst_ip': packet[IP].dst, + 'protocol': self.__get_protocol_name(packet), + 'src_port': self.__get_port_info(packet, 'sport'), + 'dst_port': self.__get_port_info(packet, 'dport'), + } + self.packet_data.append(packet_info) + self.__print_packet_summary(packet_info) + self.__add_to_graph(packet_info) + except Exception as err: + log.error(f"Error processing packet {packet.summary() if hasattr(packet, 'summary') else 'Unknown'}: {err}") + + # Function to determine the protocol name + @staticmethod + def __get_protocol_name(packet: IP) -> str: + """ + Determines the protocol name of a captured network packet. + + This function examines the layers of a given IP packet to identify its protocol type. It supports identification of TCP, UDP, ICMP, and classifies any other packet types as 'Other'. + + Parameters: + packet (IP): The captured network packet to analyze for protocol identification. + + Returns: + str: The protocol name, which can be one of: 'TCP', 'UDP', 'ICMP', or 'Other'. + + Notes: + - Uses Scapy's layer checking methods to determine protocol + - Logs debug information about the packet and detected protocol + - Provides a fallback 'Other' classification for unrecognized protocols + """ + log.debug(f"Checking protocol for packet: {packet.summary()}") + if packet.haslayer(TCP): + log.debug("Protocol: TCP") + return 'TCP' + elif packet.haslayer(UDP): + log.debug("Protocol: UDP") + return 'UDP' + elif packet.haslayer(ICMP): + log.debug("Protocol: ICMP") + return 'ICMP' + else: + log.debug("Protocol: Other") + return 'Other' + + # Function to extract port information from a packet + @staticmethod + def __get_port_info(packet: IP, port_type: str = "sport") -> int | None: + """ + Extracts the source or destination port from a captured packet. + + Parameters: + packet (IP): The captured packet to analyze. + port_type (str, optional): The type of port to extract ('sport' for source port, 'dport' for destination port). Defaults to 'sport'. + + Returns: + int | None: The port number if available, otherwise None. + + Raises: + ValueError: If an invalid port_type is provided. + + Notes: + - Supports extracting ports from TCP and UDP layers + - Returns None if the packet does not have TCP or UDP layers + """ + if port_type not in ('sport', 'dport'): + log.critical( + f"Invalid port_type '{port_type}'. Must be 'sport' or 'dport'. Using 'sport' as the port type (default).") + port_type = 'sport' + log.debug(f"Port type: {port_type}") + if packet.haslayer(TCP): + return packet[TCP].sport if port_type == 'sport' else packet[TCP].dport + elif packet.haslayer(UDP): + return packet[UDP].sport if port_type == 'sport' else packet[UDP].dport + return None + + # Function to print packet summary + @staticmethod + def __print_packet_summary(packet_info: dict): + """ + Prints a summary of the captured network packet to the debug log. + + Parameters: + packet_info (dict): A dictionary containing detailed information about a captured network packet with the following expected keys: + - 'protocol' (str): The network protocol of the packet (e.g., TCP, UDP, ICMP) + - 'src_ip' (str): Source IP address of the packet + - 'dst_ip' (str): Destination IP address of the packet + - 'src_port' (int/str): Source port number of the packet + - 'dst_port' (int/str): Destination port number of the packet + + Returns: + None: Logs packet summary information without returning a value + """ + log.debug(f"Packet captured: {packet_info['protocol']} packet from {packet_info['src_ip']} " + f"to {packet_info['dst_ip']} | Src Port: {packet_info['src_port']} | Dst Port: {packet_info['dst_port']}") + + # Function to add packet information to the graph + def __add_to_graph(self, packet_info: dict): + """ + Adds an edge to the network graph representing a connection between source and destination IPs. + + Parameters: + packet_info (dict): A dictionary containing packet network details with the following keys: + - 'src_ip' (str): Source IP address of the packet + - 'dst_ip' (str): Destination IP address of the packet + - 'protocol' (str): Network protocol used for the connection (e.g., TCP, UDP) + + Side Effects: + Modifies the global NetworkX graph (G) by adding an edge between source and destination IPs + with the protocol information as an edge attribute. + + Notes: + - Assumes a global NetworkX graph object 'G' is already initialized + - Does not perform validation of input packet_info dictionary """ - Callback function to process each captured network packet during sniffing. - - Processes individual packets, logs their details, and manages packet capture termination. Tracks the number of packets captured and stops sniffing when the predefined packet count is reached. - + src_ip = packet_info['src_ip'] + dst_ip = packet_info['dst_ip'] + protocol = packet_info['protocol'] + self.G.add_edge(src_ip, dst_ip, protocol=protocol) + + # Function to start sniffing packets + def __start_sniffing(self, interface: str, packet_count: int = 10, timeout: int = 10): + """ + Starts packet sniffing on a given network interface. + + Captures network packets on the specified interface with configurable packet count and timeout. Processes each captured packet using a custom callback function, logs packet details, and stops when the specified packet count is reached. + Parameters: - packet (IP): The captured network packet to be processed. - + interface (str): Network interface name to capture packets from. + packet_count (int, optional): Maximum number of packets to capture. Defaults to 10. + timeout (int, optional): Maximum time to spend capturing packets in seconds. Defaults to 10. + + Side Effects: + - Logs packet details during capture + - Saves captured packet data to a CSV file + - Generates a network graph visualization + + Raises: + Exception: If packet capture encounters unexpected errors + + Example: + start_sniffing('eth0', packet_count=50, timeout=30) + """ + log.info(f"Starting packet capture on interface '{interface}'...") + + # Initialize a packet capture counter + packet_counter = 0 + + # Define a custom packet callback to count packets + def packet_callback(packet: IP) -> bool: + """ + Callback function to process each captured network packet during sniffing. + + Processes individual packets, logs their details, and manages packet capture termination. Tracks the number of packets captured and stops sniffing when the predefined packet count is reached. + + Parameters: + packet (IP): The captured network packet to be processed. + + Returns: + bool: True if the specified packet count has been reached, signaling the sniffer to stop; False otherwise. + + Side Effects: + - Increments the global packet counter + - Logs packet details using log_packet function + - Logs debug information about received packets + - Stops packet capture when packet count limit is met + + Raises: + No explicit exceptions raised, but may propagate exceptions from log_packet function. + """ + log.debug(f"Received packet: {packet.summary()}") + nonlocal packet_counter # Reference the outer packet_counter + if packet_counter >= packet_count: + # Stop sniffing once the packet count is reached + log.info(f"Captured {packet_count} packets, stopping sniffing.") + return True # Return True to stop sniffing + self.__log_packet(packet) # Call the existing log_packet function + packet_counter += 1 # Increment the packet counter + + # Start sniffing with the custom callback + sniff(iface=interface, prn=packet_callback, count=packet_count, timeout=timeout) + + # After sniffing completes, save the captured packet data to CSV and visualize the graph + log.info("Packet capture completed.") + self.__save_packet_data_to_csv('captured_packets.csv') + self.__visualize_graph() + + # Function to save captured packet data to CSV + def __save_packet_data_to_csv(self, file_path: str): + """ + Saves captured packet data to a CSV file. + + Writes the collected network packet information to a specified CSV file. If packet data exists, it creates a pandas DataFrame and exports it to the given file path. If no packet data has been captured, it logs a warning message. + + Parameters: + file_path (str): The file path where the packet data will be saved as a CSV file. + Returns: - bool: True if the specified packet count has been reached, signaling the sniffer to stop; False otherwise. - + None + Side Effects: - - Increments the global packet counter - - Logs packet details using log_packet function - - Logs debug information about received packets - - Stops packet capture when packet count limit is met - + - Writes packet data to a CSV file + - Logs an informational message on successful save + - Logs a warning if no packet data is available + Raises: - No explicit exceptions raised, but may propagate exceptions from log_packet function. + IOError: Potential file writing permission or path-related errors (implicitly handled by pandas) """ - log.debug(f"Received packet: {packet.summary()}") - nonlocal packet_counter # Reference the outer packet_counter - if packet_counter >= packet_count: - # Stop sniffing once the packet count is reached - log.info(f"Captured {packet_count} packets, stopping sniffing.") - return True # Return True to stop sniffing - log_packet(packet) # Call the existing log_packet function - packet_counter += 1 # Increment the packet counter - - # Start sniffing with the custom callback - sniff(iface=interface, prn=packet_callback, count=packet_count, timeout=timeout) - - # After sniffing completes, save the captured packet data to CSV and visualize the graph - log.info("Packet capture completed.") - save_packet_data_to_csv('captured_packets.csv') - visualize_graph() - - -# Function to save captured packet data to CSV -def save_packet_data_to_csv(file_path: str): - """ - Saves captured packet data to a CSV file. - - Writes the collected network packet information to a specified CSV file. If packet data exists, it creates a pandas DataFrame and exports it to the given file path. If no packet data has been captured, it logs a warning message. - - Parameters: - file_path (str): The file path where the packet data will be saved as a CSV file. - - Returns: - None - - Side Effects: - - Writes packet data to a CSV file - - Logs an informational message on successful save - - Logs a warning if no packet data is available - - Raises: - IOError: Potential file writing permission or path-related errors (implicitly handled by pandas) - """ - global packet_data - if packet_data: - df = pd.DataFrame(packet_data) - df.to_csv(file_path, index=False) - log.info(f"Packet data saved to '{file_path}'.") - else: - log.warning("No packet data to save.") - - -# Function to visualize the graph of packet connections -def visualize_graph(node_colors: dict[str, str] | None = None, - node_sizes: dict[str, int] | None = None, - *, # Force keyword arguments for the following parameters - figsize: tuple[int, int] = (12, 8), - font_size: int = 10, - font_weight: str = "bold", - title: str = "Network Connections Graph", - output_file: str = "network_connections_graph.png", - layout_func: callable = nx.spring_layout): - """ - Visualizes the graph of packet connections with customizable node colors and sizes. - - Generates a network graph representation of packet connections using NetworkX and Matplotlib, with optional customization of node colors and sizes. - - Parameters: - node_colors (dict, optional): A dictionary mapping nodes to their display colors. - If not provided, defaults to skyblue for all nodes. - node_sizes (dict, optional): A dictionary mapping nodes to their display sizes. - If not provided, defaults to 3000 for all nodes. - figsize (tuple, optional): The size of the figure in inches (width, height). Defaults to (12, 8). - font_size (int, optional): The font size for node labels. Defaults to 10. - font_weight (str, optional): The font weight for node labels. Defaults to 'bold'. - title (str, optional): The title of the graph. Defaults to 'Network Connections Graph'. - output_file (str, optional): The name of the output PNG file to save the graph visualization. Defaults to 'network_connections_graph.png'. - layout_func (callable, optional): The layout function to use for the graph. Defaults to nx.spring_layout. - - Side Effects: - - Creates a matplotlib figure - - Saves a PNG image file named 'network_connections_graph.png' - - Closes the matplotlib figure after saving - - Returns: - None - - Example: - # Default visualization - visualize_graph() - - # Custom node colors and sizes - custom_colors = {'192.168.1.1': 'red', '10.0.0.1': 'green'} - custom_sizes = {'192.168.1.1': 5000, '10.0.0.1': 2000} - visualize_graph(node_colors=custom_colors, node_sizes=custom_sizes) - """ - pos = layout_func(G) - plt.figure(figsize=figsize) - - if node_colors is None: - node_colors = {node: "skyblue" for node in G.nodes()} - - if node_sizes is None: - node_sizes = {node: 3000 for node in G.nodes()} - - node_color_list = [node_colors.get(node, "skyblue") for node in G.nodes()] - node_size_list = [node_sizes.get(node, 3000) for node in G.nodes()] - - nx.draw(G, pos, with_labels=True, node_size=node_size_list, node_color=node_color_list, font_size=font_size, - font_weight=font_weight) - edge_labels = nx.get_edge_attributes(G, 'protocol') - nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels) - plt.title(title) - plt.savefig(output_file) - plt.close() - - -@log.function -def packet_sniffer(): - """ - Initiates packet sniffing based on configuration settings. - - Reads network configuration parameters from a global config dictionary, including network interface, packet count, and timeout. Validates input parameters to ensure they are positive values. Attempts to start packet sniffing on the specified interface, with built-in error handling and interface name correction for common variations. - - Raises: - SystemExit: If packet count or timeout values are invalid - Exception: If there are issues with the network interface or packet sniffing process - - Side Effects: - - Logs configuration and sniffing errors - - Attempts to autocorrect interface names - - Calls start_sniffing() to capture network packets - - Exits the program if critical configuration errors are encountered - """ - - def correct_interface_name(interface_name: str) -> str: - corrections = { - "WiFi": "Wi-Fi", - "Wi-Fi": "WiFi" - } - return corrections.get(interface_name, interface_name) - - interface = config['interface'] - packet_count = int(config['packet_count']) - timeout = int(config['timeout']) - - if packet_count <= 0 or timeout <= 0: - try: - log.error( - "Oops! Can't work with these values:\n" - f"- Packet count: {packet_count} {'❌ (must be > 0)' if packet_count <= 0 else '✅'}\n" - f"- Timeout: {timeout} {'❌ (must be > 0)' if timeout <= 0 else '✅'}" - ) - except Exception: - log.error("Error reading configuration: Improper values for packet count or timeout") - exit(1) - - for attempt in range(2): # Try original and corrected name + if self.packet_data: + df = pd.DataFrame(self.packet_data) + df.to_csv(file_path, index=False) + log.info(f"Packet data saved to '{file_path}'.") + else: + log.warning("No packet data to save.") + + # Function to visualize the graph of packet connections + def __visualize_graph(self, node_colors: dict[str, str] | None = None, + node_sizes: dict[str, int] | None = None, + *, # Force keyword arguments for the following parameters + figsize: tuple[int, int] = (12, 8), + font_size: int = 10, + font_weight: str = "bold", + title: str = "Network Connections Graph", + output_file: str = "network_connections_graph.png", + layout_func: callable = nx.spring_layout): + """ + Visualizes the graph of packet connections with customizable node colors and sizes. + + Generates a network graph representation of packet connections using NetworkX and Matplotlib, with optional customization of node colors and sizes. + + Parameters: + node_colors (dict, optional): A dictionary mapping nodes to their display colors. + If not provided, defaults to skyblue for all nodes. + node_sizes (dict, optional): A dictionary mapping nodes to their display sizes. + If not provided, defaults to 3000 for all nodes. + figsize (tuple, optional): The size of the figure in inches (width, height). Defaults to (12, 8). + font_size (int, optional): The font size for node labels. Defaults to 10. + font_weight (str, optional): The font weight for node labels. Defaults to 'bold'. + title (str, optional): The title of the graph. Defaults to 'Network Connections Graph'. + output_file (str, optional): The name of the output PNG file to save the graph visualization. Defaults to 'network_connections_graph.png'. + layout_func (callable, optional): The layout function to use for the graph. Defaults to nx.spring_layout. + + Side Effects: + - Creates a matplotlib figure + - Saves a PNG image file named 'network_connections_graph.png' + - Closes the matplotlib figure after saving + + Returns: + None + + Example: + # Default visualization + visualize_graph() + + # Custom node colors and sizes + custom_colors = {'192.168.1.1': 'red', '10.0.0.1': 'green'} + custom_sizes = {'192.168.1.1': 5000, '10.0.0.1': 2000} + visualize_graph(node_colors=custom_colors, node_sizes=custom_sizes) + """ + pos = layout_func(self.G) + plt.figure(figsize=figsize) + + if node_colors is None: + node_colors = {node: "skyblue" for node in self.G.nodes()} + + if node_sizes is None: + node_sizes = {node: 3000 for node in self.G.nodes()} + + node_color_list = [node_colors.get(node, "skyblue") for node in self.G.nodes()] + node_size_list = [node_sizes.get(node, 3000) for node in self.G.nodes()] + + nx.draw(self.G, pos, with_labels=True, node_size=node_size_list, node_color=node_color_list, + font_size=font_size, + font_weight=font_weight) + edge_labels = nx.get_edge_attributes(self.G, 'protocol') + nx.draw_networkx_edge_labels(self.G, pos, edge_labels=edge_labels) + plt.title(title) + plt.savefig(output_file) + plt.close() + + @log.function + def packets(self): + """ + Initiates packet sniffing based on configuration settings. + + Reads network configuration parameters from a global config dictionary, including network interface, packet count, and timeout. Validates input parameters to ensure they are positive values. Attempts to start packet sniffing on the specified interface, with built-in error handling and interface name correction for common variations. + + Raises: + SystemExit: If packet count or timeout values are invalid + Exception: If there are issues with the network interface or packet sniffing process + + Side Effects: + - Logs configuration and sniffing errors + - Attempts to autocorrect interface names + - Calls start_sniffing() to capture network packets + - Exits the program if critical configuration errors are encountered + """ + + def correct_interface_name(interface_name: str) -> str: + corrections = { + "WiFi": "Wi-Fi", + "Wi-Fi": "WiFi" + } + return corrections.get(interface_name, interface_name) + + interface = config['interface'] + packet_count = int(config['packet_count']) + timeout = int(config['timeout']) + + if packet_count <= 0 or timeout <= 0: + try: + log.error( + "Oops! Can't work with these values (Not your fault):\n" + f" - Packet count: {packet_count} {'❌ (must be > 0)' if packet_count <= 0 else '✅'}\n" + f" - Timeout: {timeout} {'❌ (must be > 0)' if timeout <= 0 else '✅'}" + ) + except Exception: + log.error("Error reading configuration: Improper values for packet count or timeout") + exit(1) + + start_time = time() + # TODO v3.4.1 -> Config.ini controlled value + max_retry_time = 30 # seconds + for attempt in range(2): # Try original and corrected name + try: + if time() - start_time > max_retry_time: + log.error("Retry timeout exceeded") + break + self.__start_sniffing(interface, packet_count, timeout) + break + except Exception as err: + if attempt == 0 and interface in ("WiFi", "Wi-Fi"): + log.warning("Retrying with corrected interface name...") + interface = correct_interface_name(interface) + else: + log.error(f"Failed to sniff packets: {err}") + + def cleanup(self): + # Clean up resources try: - start_sniffing(interface, packet_count, timeout) - break + plt.close('all') # Close all figures except Exception as err: - if attempt == 0 and interface in ("WiFi", "Wi-Fi"): - log.warning("Retrying with corrected interface name...") - interface = correct_interface_name(interface) - else: - log.error(f"Failed to sniff packets: {err}") + log.error(f"Error during cleanup: {err}") + finally: + self.G.clear() # Clear the graph to free memory # Entry point of the script if __name__ == "__main__": + sniffer = Sniff() try: - packet_sniffer() + sniffer.packets() except Exception as e: log.error(e) finally: - if G: - plt.close() + sniffer.cleanup() diff --git a/CODE/registry.py b/CODE/registry.py index 41c4282b..e1fa3522 100644 --- a/CODE/registry.py +++ b/CODE/registry.py @@ -1,10 +1,7 @@ import os import subprocess -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log @log.function @@ -23,7 +20,7 @@ def backup_registry(): result = subprocess.run(cmd, check=True, capture_output=True, text=True) log.info(f"Registry backed up successfully to {export_path}. Output: {result.stdout}") except subprocess.CalledProcessError as e: - log.error(f"Failed to back up the registry: {e}. More details: {result.stderr}") + log.error(f"Failed to back up the registry: {e}.") except Exception as e: log.error(f"Failed to back up the registry: {e}") diff --git a/CODE/sensitive_data_miner.py b/CODE/sensitive_data_miner.py index eda42944..e884a6f4 100644 --- a/CODE/sensitive_data_miner.py +++ b/CODE/sensitive_data_miner.py @@ -3,10 +3,7 @@ from concurrent.futures import ThreadPoolExecutor from pathlib import Path -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log # List of allowed extensions allowed_extensions = [ @@ -49,10 +46,10 @@ def __search_files_by_keyword(root: Path, keyword: str) -> list: path_list = [] try: path_list = os.listdir(root) - except WindowsError as e: - if DEBUG: - # Log the error if in debug mode, as it is a common occurrence. - log.warning(f"Permission Denied: {e}") + except (WindowsError, PermissionError) as e: + log.warning(f"Permission Denied: {e}") + except Exception as e: + log.error(f"Failed to access directory: {e}") for filename in path_list: file_path = root / filename diff --git a/CODE/ssh_miner.py b/CODE/ssh_miner.py index 1588784b..567cc539 100644 --- a/CODE/ssh_miner.py +++ b/CODE/ssh_miner.py @@ -1,10 +1,7 @@ import os import shutil -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log @log.function diff --git a/CODE/sys_internal.py b/CODE/sys_internal.py index 2f3cfe01..81ac6685 100644 --- a/CODE/sys_internal.py +++ b/CODE/sys_internal.py @@ -1,11 +1,7 @@ import os import subprocess -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) - +from logicytics import log sys_internal_executables = [ "psfile.exe", @@ -16,6 +12,12 @@ "psloglist.exe", ] +# Check if the executables exist +sys_internal_executables = [ + exe for exe in sys_internal_executables + if os.path.exists(os.path.join("SysInternal_Suite", exe)) +] + @log.function def sys_internal(): diff --git a/CODE/tasklist.py b/CODE/tasklist.py index c8cc289a..5f135453 100644 --- a/CODE/tasklist.py +++ b/CODE/tasklist.py @@ -1,9 +1,6 @@ import subprocess -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log @log.function diff --git a/CODE/vulnscan.py b/CODE/vulnscan.py index 6c75f8df..1d6ff6e5 100644 --- a/CODE/vulnscan.py +++ b/CODE/vulnscan.py @@ -1,237 +1,249 @@ from __future__ import annotations -import mimetypes import os import threading import warnings -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path import joblib import numpy as np import torch from safetensors import safe_open -from sklearn.feature_extraction.text import TfidfVectorizer +from tqdm import tqdm -# Set up logging -from logicytics import Log, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) - -# TODO v3.1.0: Load models and then use caching to avoid reloading models +from logicytics import log # Ignore all warnings warnings.filterwarnings("ignore") -def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module: - """ - Load a machine learning model from the specified file path. - - Supports loading models from three different file formats: Pickle (.pkl), SafeTensors (.safetensors), and PyTorch (.pth) files. - - Parameters: - model_path_to_load (str): Full file path to the model file to be loaded. - - Returns: - safe_open | torch.nn.Module: Loaded model object, which can be a joblib, safetensors, or torch model. - - Raises: - ValueError: If the model file does not have a supported extension (.pkl, .safetensors, or .pth). +# TODO: v3.4.2 +# apply Batch file reading, +# Use Asynchronous File Scanning, +# Optimize Model Loading and Caching, +# Improve Feature Extraction + +# TODO: v3.4.1 +# also add a global variable called MAX_FILE_SIZE, if its none ignore it, else only scan files under that file size (default at 50MB) +# add this to config.ini -> max_workers = min(32, os.cpu_count() * 2) +# add UNREADABLE_EXTENSIONS as well to config.ini + +UNREADABLE_EXTENSIONS = [ + ".exe", ".dll", ".so", # Executables & libraries + ".zip", ".tar", ".gz", ".7z", ".rar", # Archives + ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp", # Images + ".mp3", ".wav", ".flac", ".aac", ".ogg", # Audio + ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", # Video + ".pdf", # PDFs aren't plain text + ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", # Microsoft Office files + ".odt", ".ods", ".odp", # OpenDocument files + ".bin", ".dat", ".iso", # Binary, raw data, disk images + ".class", ".pyc", ".o", ".obj", # Compiled code + ".sqlite", ".db", # Databases + ".ttf", ".otf", ".woff", ".woff2", # Fonts + ".lnk", ".url" # Links +] + + +class _SensitiveDataScanner: """ - if model_path_to_load.endswith('.pkl'): - return joblib.load(model_path_to_load) - elif model_path_to_load.endswith('.safetensors'): - return safe_open(model_path_to_load, framework='torch') - elif model_path_to_load.endswith('.pth'): - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=FutureWarning) - return torch.load(model_path_to_load, weights_only=False) - else: - raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth") - - -@log.function -def scan_path(model_path: str, scan_paths: str, vectorizer_path: str): + Class for scanning files for sensitive content using a trained model. """ - Scan a specified path for sensitive content using a pre-trained machine learning model and vectorizer. - - This function handles loading the model and vectorizer if they are not already initialized, and then performs a vulnerability scan on the given path. It ensures thread-safe model and vectorizer loading using global locks. - - Args: - model_path (str): Filesystem path to the machine learning model file to be used for scanning. - scan_paths (str): Filesystem path to the file or directory that will be scanned for sensitive content. - vectorizer_path (str): Filesystem path to the vectorizer file used for text feature extraction. - - Raises: - Exception: Captures and logs any errors that occur during the scanning process, preventing the entire scanning operation from halting. - - Side Effects: - - Loads global model and vectorizer if not already initialized - - Logs information about model and vectorizer loading - - Calls vulnscan() to perform actual file scanning - - Logs any errors encountered during scanning - """ - global model_to_use, vectorizer_to_use - try: - with model_lock: - if model_to_use is None: - log.info(f"Loading model from {model_path}") - model_to_use = load_model(model_path) - with vectorizer_lock: - if vectorizer_to_use is None: - log.info(f"Loading vectorizer from {vectorizer_path}") - vectorizer_to_use = joblib.load(vectorizer_path) - vulnscan(model_to_use, scan_paths, vectorizer_to_use) - except FileNotFoundError as err: - log.error(f"File not found while scanning {scan_paths}: {err}") - except PermissionError as err: - log.error(f"Permission denied while scanning {scan_paths}: {err}") - except (torch.serialization.pickle.UnpicklingError, RuntimeError) as err: - log.error(f"Model loading failed for {scan_paths}: {err}") - except Exception as err: - log.error(f"Unexpected error scanning {scan_paths}: {err}") - - -def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_content: str) -> tuple[bool, float, str]: - """ - Determine if the file content is sensitive using the provided model and vectorizer. - - Args: - model: Machine learning model. - vectorizer: Vectorizer to transform file content. - file_content (str): Content of the file to be analyzed. - Returns: - tuple: (True if the content is sensitive, False otherwise, prediction probability, reason). - """ - if isinstance(model, torch.nn.Module): - device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - model.to(device) - features = vectorizer.transform([file_content]) - model.eval() - with torch.no_grad(): - features_tensor = torch.tensor(features.toarray(), dtype=torch.float32).to(device) - prediction = model(features_tensor) - probability = torch.softmax(prediction, dim=1).max().item() + def __init__(self, model_path: str, vectorizer_path: str): + self.model_path = model_path + self.vectorizer_path = vectorizer_path + + self.model_cache = {} + self.vectorizer_cache = {} + + self.model_lock = threading.Lock() + self.vectorizer_lock = threading.Lock() + + self.model = None + self.vectorizer = None + self._load_model() + self._load_vectorizer() + + def _load_model(self) -> None: + """Loads and caches the ML model.""" + if self.model_path in self.model_cache: + log.info(f"Using cached model from {self.model_path}") + self.model = self.model_cache[self.model_path] + return + + if self.model_path.endswith('.pkl'): + self.model = joblib.load(self.model_path) + elif self.model_path.endswith('.safetensors'): + self.model = safe_open(self.model_path, framework='torch') + elif self.model_path.endswith('.pth'): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=FutureWarning) + self.model = torch.load(self.model_path, weights_only=False) + else: + raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth") + + self.model_cache[self.model_path] = self.model + log.info(f"Loaded and cached model from {self.model_path}") + + def _load_vectorizer(self) -> None: + """Loads and caches the vectorizer.""" + if self.vectorizer_path in self.vectorizer_cache: + log.info(f"Using cached vectorizer from {self.vectorizer_path}") + self.vectorizer = self.vectorizer_cache[self.vectorizer_path] + return + + try: + self.vectorizer = joblib.load(self.vectorizer_path) + except Exception as e: + log.critical(f"Failed to load vectorizer from {self.vectorizer_path}: {e}") + exit(1) + self.vectorizer_cache[self.vectorizer_path] = self.vectorizer + log.info(f"Loaded and cached vectorizer from {self.vectorizer_path}") + + def _is_sensitive(self, file_content: str) -> tuple[bool, float, str]: + """Determines if a file's content is sensitive using the model.""" + if isinstance(self.model, torch.nn.Module): + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.model.to(device) + # Use sparse matrices to save memory + features = self.vectorizer.transform([file_content]).tocsr() + self.model.eval() + with torch.no_grad(): + # Convert sparse matrix to tensor more efficiently + features_tensor = torch.sparse_coo_tensor( + torch.LongTensor([features.nonzero()[0], features.nonzero()[1]]), + torch.FloatTensor(features.data), + size=features.shape + ).to(device) + prediction = self.model(features_tensor) + probability = torch.softmax(prediction, dim=1).max().item() + # Get top features from sparse matrix directly + feature_scores = features.data + top_indices = np.argsort(feature_scores)[-5:] + reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_indices]) + return prediction.argmax(dim=1).item() == 1, probability, reason + else: + features = self.vectorizer.transform([file_content]) + prediction = self.model.predict_proba(features) + probability = prediction.max() top_features = np.argsort(features.toarray()[0])[-5:] - reason = ", ".join([vectorizer.get_feature_names_out()[i] for i in top_features]) - return prediction.argmax(dim=1).item() == 1, probability, reason - else: - features = vectorizer.transform([file_content]) - prediction = model.predict_proba(features) - probability = prediction.max() - top_features = np.argsort(features.toarray()[0])[-5:] - reason = ", ".join([vectorizer.get_feature_names_out()[i] for i in top_features]) - return model.predict(features)[0] == 1, probability, reason - - -def scan_file(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_path: str) -> tuple[bool, float, str]: - """ - Scan a single file to determine if it contains sensitive content. - - Args: - model: Machine learning model. - vectorizer: Vectorizer to transform file content. - file_path (str): Path to the file to be scanned. - - Returns: - tuple: (True if the file is sensitive, False otherwise, prediction probability). - """ - mime_type, _ = mimetypes.guess_type(file_path) - if mime_type and mime_type.startswith('text'): - with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: - content = file.read() - return is_sensitive(model, vectorizer, content) - else: - with open(file_path, 'r', errors='ignore') as file: - content = file.read() - return is_sensitive(model, vectorizer, content) - - -@log.function -def vulnscan(model, SCAN_PATH, vectorizer): - """ - Scan a file to determine if it contains sensitive content and log the results. - - Args: - model (object): Machine learning model used for content sensitivity classification. - SCAN_PATH (str): Absolute or relative file path to be scanned for sensitive content. - vectorizer (object): Text vectorization model to transform file content into feature representation. - - Returns: - None: Logs sensitive file details and appends file path to 'Sensitive_File_Paths.txt' if sensitive content is detected. - - Side Effects: - - Logs scanning information using the configured logger - - Creates or appends to 'Sensitive_File_Paths.txt' when sensitive content is found - - Writes sensitive file paths to the log file - - Raises: - IOError: If there are issues writing to the 'Sensitive_File_Paths.txt' file - """ - log.debug(f"Scanning {SCAN_PATH}") - result, probability, reason = scan_file(model, vectorizer, SCAN_PATH) - if result: - log.debug(f"File {SCAN_PATH} is sensitive with probability {probability:.2f}. Reason: {reason}") - if not os.path.exists("Sensitive_File_Paths.txt"): - with open("Sensitive_File_Paths.txt", "w") as sensitive_file: - sensitive_file.write(f"{SCAN_PATH}\n\n") - with open("Sensitive_File_Paths.txt", "a") as sensitive_file: - sensitive_file.write(f"{SCAN_PATH}\n") + reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_features]) + return self.model.predict(features)[0] == 1, probability, reason + + def scan_file(self, file_path: str) -> tuple[bool, float, str]: + """Scans a file for sensitive content.""" + try: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: + content = file.read() + return self._is_sensitive(content) + except Exception as e: + log.error(f"Failed to scan {file_path}: {e}") + return False, 0.0, "Error reading file" + + def cleanup(self): + """Clears caches and resets model & vectorizer.""" + self.model_cache.clear() + self.vectorizer_cache.clear() + self.model = None + self.vectorizer = None + log.info("Cleanup complete!") + + +class VulnScan: + def __init__(self, model_path: str, vectorizer_path: str): + self.scanner = _SensitiveDataScanner(model_path, vectorizer_path) + + @log.function + def scan_directory(self, scan_paths: list[str]) -> None: + """Scans multiple directories for sensitive files.""" + max_workers = min(32, os.cpu_count() * 2) + log.debug(f"max_workers={max_workers}") + + log.info("Getting directories files...") + try: + # Fast file collection using ThreadPoolExecutor and efficient flattening + with ThreadPoolExecutor(max_workers=max_workers): + all_files = [] + for path in scan_paths: + try: + all_files.extend([str(f) for f in Path(path).rglob('*') if f.is_file()]) + except Exception as e: + log.warning(f"Error collecting files from {path}: {e}") + continue # Skip this path and continue with others + + log.info(f"Files collected successfully: {len(all_files)}") + + except Exception as e: + log.error(f"Failed to collect files: {e}") + return + + log.info(f"Scanning {len(all_files)} files...") + + try: + # Use ThreadPoolExecutor for scanning files concurrently + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {} + total_len_modifiable = len(all_files) + + # Submit scan tasks + with tqdm(total=total_len_modifiable, + desc="\033[32mSCAN\033[0m \033[94mSubmitting Scan Tasks\033[0m", + unit="file", bar_format="{l_bar} {bar} {n_fmt}/{total_fmt}") as submit_pbar: + + for file in all_files: + if any(file.lower().endswith(ext) for ext in UNREADABLE_EXTENSIONS): + log.debug(f"Skipping file '{file}'") + total_len_modifiable -= 1 + submit_pbar.update(1) + continue + + futures[executor.submit(self.scanner.scan_file, file)] = file + submit_pbar.update(1) + + # Scan progress tracking + log.info(f"Valid file count: {total_len_modifiable}") + with tqdm(total=total_len_modifiable, desc="\033[32mSCAN\033[0m \033[94mScanning Files\033[0m", + unit="file", bar_format="{l_bar} {bar} {n_fmt}/{total_fmt}") as scan_pbar: + + sensitive_files = [] + for future in as_completed(futures): + try: + result, probability, reason = future.result() + if result: + file_path = futures[future] + log.debug( + f"Sensitive file detected: {file_path} (Confidence: {probability:.2f}). Reason: {reason}") + sensitive_files.append(file_path) + except Exception as e: + log.error(f"Scan failed: {e}") + + scan_pbar.update(1) + + # Write all sensitive files at once + with open("Sensitive_File_Paths.txt", "a") as sensitive_file: + if sensitive_files: + sensitive_file.write("\n".join(sensitive_files) + "\n") + else: + sensitive_file.write("Sadly no sensitive file's were detected.") + + except Exception as e: + log.error(f"Scanning error: {e}") + + self.scanner.cleanup() if __name__ == "__main__": - # Locks for model and vectorizer - log.info("Locking threads - Model and Vectorizer") - model_lock = threading.Lock() - vectorizer_lock = threading.Lock() - - model_to_use = None - vectorizer_to_use = None - - # Start scanning - log.info("Getting paths to scan - This may take some time!!") - - threads = [] - paths = [] - base_paths = [ - "C:\\Users\\", - "C:\\Windows\\Logs", - "C:\\Program Files", - "C:\\Program Files (x86)" - ] - - # Use max_workers based on CPU count but cap it at a reasonable number - max_workers = min(32, os.cpu_count() * 2) - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(os.path.join, root, file_main) for base_path in base_paths for root, _, files_main in - os.walk(base_path) for file_main in files_main] - for future in futures: - paths.append(future.result()) - - # Start scanning - log.warning("Starting scan - This may take hours and consume memory!!") - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - total_paths = len(paths) - completed = 0 - futures = [ - executor.submit( - scan_path, - "VulnScan/Model SenseMini .3n3.pth", - path, - "VulnScan/Vectorizer .3n3.pkl" - ) - for path in paths + try: + base_paths = [ + "C:\\Users\\", + "C:\\Windows\\Logs", + "C:\\Program Files", + "C:\\Program Files (x86)" ] - for future in futures: - try: - future.result() - completed += 1 - if completed % 100 == 0: - progress = (completed / total_paths) * 100 - log.info(f"Scan progress: {progress:.1f}% ({completed}/{total_paths})") - except Exception as e: - log.error(f"Scan failed: {e}") + vulnscan = VulnScan("VulnScan/Model SenseMini .3n3.pth", "VulnScan/Vectorizer .3n3.pkl") + vulnscan.scan_directory(base_paths) + except KeyboardInterrupt: + log.warning("User interrupted. Please don't do this as it won't follow the code's cleanup process") + exit(0) diff --git a/CODE/wifi_stealer.py b/CODE/wifi_stealer.py index ddd563ab..85afdbc9 100644 --- a/CODE/wifi_stealer.py +++ b/CODE/wifi_stealer.py @@ -1,9 +1,6 @@ from __future__ import annotations -from logicytics import Log, DEBUG, Execute - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log, Execute def get_password(ssid: str) -> str | None: diff --git a/CODE/wmic.py b/CODE/wmic.py index 28b59bf6..64625031 100644 --- a/CODE/wmic.py +++ b/CODE/wmic.py @@ -1,7 +1,4 @@ -from logicytics import Log, Execute, DEBUG - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log, Execute @log.function @@ -20,7 +17,8 @@ def wmic(): None """ data = Execute.command("wmic BIOS get Manufacturer,Name,Version /format:htable") - open("WMIC.html", "w").write(data) + with open("WMIC.html", "w") as file: + file.write(data) wmic_commands = [ "wmic os get Caption,CSDVersion,ServicePackMajorVersion", "wmic computersystem get Model,Manufacturer,NumberOfProcessors", @@ -28,11 +26,11 @@ def wmic(): "wmic diskdrive get model,size", ] with open("wmic_output.txt", "w") as file: - for i in range(len(wmic_commands)): - log.info(f"Executing Command Number {i + 1}: {wmic_commands[i]}") - output = Execute.command(wmic_commands[i]) + for index, command in enumerate(wmic_commands): + log.info(f"Executing Command Number {index + 1}: {command}") + output = Execute.command(command) file.write("-" * 190) - file.write(f"Command {i + 1}: {wmic_commands[i]}\n") + file.write(f"Command {index + 1}: {command}\n") file.write(output) file.write("-" * 190) diff --git a/MODS/_MOD_SKELETON.py b/MODS/_MOD_SKELETON.py index 3865f234..28f1e44b 100644 --- a/MODS/_MOD_SKELETON.py +++ b/MODS/_MOD_SKELETON.py @@ -3,12 +3,8 @@ # Other Imports if needed or necessary go here -# This imports everything needed including the unique logger called by log - It is not optional # To know more check the WiKi -from logicytics import * - -if __name__ == "__main__": - log = Log({"log_level": DEBUG}) +from logicytics import log # And more if needed # Your actual code, must be able to run without any interference by outside actions # USE log.debug, log.info, log.error, log.warning and log.critical and log.string as well @@ -41,14 +37,14 @@ def MOD_EXAMPLE() -> None: log.info("This is a info message") log.debug("This is a debug message") log.critical("This is a critical message") - # This is special, allows you to use strings to specify the log level, it is not recommended to use this + # This is special, allows you to use strings to specify the log level, it is not recommended to use these # Options are error, warning, info, debug, critical - It is case-insensitive and can be used with any of the log levels # Defaults with the log level of debug log.string("This is a random message", "ERROR") pass # Your code here with proper logging like the above log options -# Not a must, but it is recommended to call your function at the end of the file using the following code +# It is recommended to call your function at the end of the file using the following code # This is to ensure that the function is called only when directly executed and not when imported if __name__ == "__main__": MOD_EXAMPLE() diff --git a/PLANS.md b/PLANS.md index 5cfa12d8..fe59e630 100644 --- a/PLANS.md +++ b/PLANS.md @@ -7,10 +7,14 @@ | Task | Version | Might or Will be done? | |----------------------------------------------------------------------------------------------|---------|------------------------| -| Remove deprecated feature: `_generate_data.py` | v3.4.0 | ✅ | -| New feature: Psutil Network functions, most likely `net_info.py` | v3.4.0 | ✅ | +| Implement TODOs for v3.4.1 | v3.4.1 | ✅ | +| Add docstrings to all functions as well as var types | v3.4.1 | ✅ | +| Implement Cleanup functionality for Logicytics if KeyboardInterrupt occurs | v3.4.2 | ✅ | +| Implement TODOs for v3.4.2 | v3.4.2 | ✅ | +| Implement logs for the logicytics lib, rather than prints | v3.4.2 | ✅ | | Implement the 2 missing flags | v3.5.0 | ✅ | | Move VulnScan tools and v3 module to separate repository, keep only the model and vectorizer | v3.5.0 | ✅ | | Get any BETA features out of BETA | v3.6.0 | ✅ | | Remake VulnScan .pkl and .pth to be more accurate | v3.6.0 | ✅ | | Encrypted Volume Detection and Analysis, Advanced USB Device History Tracker | v3.7.0 | ✅ | +| Smush `sensitive data miner` with `vulnscan` | v3.7.0 | ✅ | diff --git a/README.md b/README.md index 8a36ef53..b4a18205 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,7 @@ Here are some of the data points that Logicytics extracts: | dump_memory.py | Dumps some memory as well as log some RAM details | Not completely good yet... | | bluetooth_details.py | Gets the PNP Device ID, Status, Manufacturer, Device ID, Name, Description of all paired bluetooth devices | | | bluetooth_logger.py | Collect, log, and analyze Bluetooth-related data, by accessing the Windows registry and Event Viewer. | | +| network_psutil.py | The `network_psutil.py` file collects and logs various network-related information. | | This is not an exhaustive list, but it should give you a good idea of what data Logicytics is capable of extracting. diff --git a/SECURITY.md b/SECURITY.md index 2201f6ce..994e4097 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -6,16 +6,17 @@ This section outlines the versions of our project that are currently supported w | Version | Supported | Release Date | |---------|-----------|-----------------| +| 3.4.x | ✅ | January 3, 2025 | | 3.3.x | ✅ | January 3, 2025 | -| 3.2.x | ✅ | Dec 19, 2024 | -| 3.1.x | ⚠️ | Dec 11, 2024 | -| 3.0.x | ⚠️ | Dec 6, 2024 | +| 3.2.x | ✖️ | Dec 19, 2024 | +| 3.1.x | ✖️ | Dec 11, 2024 | +| 3.0.x | ✖️ | Dec 6, 2024 | | 2.5.x | ✖️ | Nov 25, 2024 | | 2.4.x | ✖️ | Nov 12, 2024 | | 2.3.x | ✖️ | Sep 21, 2024 | -| 2.2.x | ✖️ | Sep 9, 2024 | -| 2.1.x | ✖️ | Aug 29, 2024 | -| 2.0.x | ✖️ | Aug 25, 2024 | +| 2.2.x | ❌ | Sep 9, 2024 | +| 2.1.x | ❌ | Aug 29, 2024 | +| 2.0.x | ❌ | Aug 25, 2024 | | 1.6.x | ❌ | Jun 18, 2024 | | 1.5.x | ❌ | Jun 10, 2024 | | 1.4.x | ❌ | May 30, 2024 | @@ -29,7 +30,7 @@ This section outlines the versions of our project that are currently supported w | Key | Desc | |-----|-----------------------------------------------------| -| ✅ | Supported for security updates | +| ✅ | Supported for all security updates | | ⚠️ | Supported, but will leave support next major update | | ✖️ | Only for major security issues (CVSS 8.0+) | | ❌ | No longer supported for any security updates | diff --git a/requirements.txt b/requirements.txt index 67f59c7a..187c7d2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,5 @@ safetensors~=0.5.2 WMI~=1.5.1 prettytable~=3.14.0 pandas~=2.2.2 -scapy~=2.5.0 \ No newline at end of file +scapy~=2.5.0 +psutil~=6.1.1 \ No newline at end of file