';
@@ -68,6 +93,14 @@ function redis_queue_migrate_options_v2() {
} );
}
+/**
+ * Get the main plugin instance.
+ *
+ * Provides global access to the Redis Queue plugin singleton instance.
+ * This is the recommended way to access the plugin's public API.
+ *
+ * @return \Soderlind\RedisQueue\Core\Redis_Queue|null Plugin instance or null if not loaded.
+ */
function redis_queue() {
if ( class_exists( 'Soderlind\RedisQueue\Core\Redis_Queue' ) ) {
return Soderlind\RedisQueue\Core\Redis_Queue::get_instance();
@@ -75,6 +108,17 @@ function redis_queue() {
return null;
}
+/**
+ * Enqueue a background job.
+ *
+ * Convenience function to add a job to the Redis queue. This is a wrapper
+ * around the main plugin's enqueue_job method.
+ *
+ * @param string $job_type The type of job (email, image_processing, api_sync, or custom).
+ * @param array $payload Job-specific data to be processed.
+ * @param array $options Optional settings: priority, queue, delay.
+ * @return string|false Job ID on success, false on failure.
+ */
function redis_queue_enqueue_job( $job_type, $payload = array(), $options = array() ) {
$instance = redis_queue();
if ( ! $instance ) {
@@ -83,11 +127,23 @@ function redis_queue_enqueue_job( $job_type, $payload = array(), $options = arra
return $instance->enqueue_job( $job_type, $payload, $options );
}
+/**
+ * Process queued jobs synchronously.
+ *
+ * Creates a synchronous worker and processes jobs from the specified queue(s).
+ * This can be called from WP-CLI, cron jobs, or custom worker scripts.
+ *
+ * @param string|array $queue Queue name(s) to process. Default 'default'.
+ * @param int|null $max_jobs Maximum number of jobs to process. Default from settings.
+ * @return array|false Processing results array or false on failure.
+ */
function redis_queue_process_jobs( $queue = 'default', $max_jobs = null ) {
$instance = redis_queue();
if ( ! $instance || ! $instance->get_queue_manager() || ! $instance->get_job_processor() ) {
return false;
}
+
+ // Create a synchronous worker and process jobs.
$worker = new \Soderlind\RedisQueue\Workers\Sync_Worker( $instance->get_queue_manager(), $instance->get_job_processor() );
return $worker->process_jobs( (array) $queue, $max_jobs );
}
diff --git a/src/Contracts/Job_Result.php b/src/Contracts/Job_Result.php
index d335fc0..dbf283d 100644
--- a/src/Contracts/Job_Result.php
+++ b/src/Contracts/Job_Result.php
@@ -1,14 +1,79 @@
queue_manager = $queue_manager;
}
+ /**
+ * Process a single job.
+ * Executes the job, handles success/failure, and tracks performance metrics.
+ *
+ * @param array $job_data Job data from Redis.
+ * @return Job_Result Result of job execution.
+ */
public function process_job( $job_data ): Job_Result {
$this->current_job = $job_data;
$this->start_time = microtime( true );
$this->start_memory = memory_get_usage( true );
$job_id = $job_data[ 'job_id' ] ?? 'unknown';
- // Early sanitize to avoid downstream empty class warnings.
+
+ // Sanitize job data to ensure required fields exist.
$job_data = $this->sanitize_job_data( $job_data );
+
try {
+ // Create job instance from data.
$job = $this->create_job_instance( $job_data );
if ( ! $job ) {
throw new Exception( 'Failed to create job instance' );
}
+
+ // Set execution timeout if specified.
$timeout = $job->get_timeout();
if ( $timeout > 0 ) {
@set_time_limit( $timeout );
}
+
+ // Execute the job.
$result = $job->execute();
$execution_time = microtime( true ) - $this->start_time;
$memory_usage = memory_get_peak_usage( true ) - $this->start_memory;
+
+ // Record performance metrics.
$result->set_execution_time( $execution_time );
$result->set_memory_usage( $memory_usage );
+
+ // Handle result based on success/failure.
if ( $result->is_successful() ) {
$this->handle_successful_job( $job_id, $result );
} else {
$this->handle_failed_job( $job_id, $job, $result, 1, null );
}
+
+ // Fire job processed action.
if ( function_exists( 'do_action' ) ) {
\do_action( 'redis_queue_job_processed', $job_id, $job, $result );
}
+
return $result;
} catch (Exception $e) {
+ // Handle exception during job execution.
$execution_time = microtime( true ) - $this->start_time;
$memory_usage = memory_get_peak_usage( true ) - $this->start_memory;
$result = Basic_Job_Result::failure( $e->getMessage(), $e->getCode(), [ 'exception_type' => get_class( $e ) ] );
$result->set_execution_time( $execution_time );
$result->set_memory_usage( $memory_usage );
+
+ // Attempt to handle failure with retry logic.
$job = $this->create_job_instance( $job_data );
if ( $job ) {
$this->handle_failed_job( $job_id, $job, $result, 1, $e );
} else {
$this->mark_job_failed( $job_id, $result );
}
+
+ // Fire job failed action.
if ( function_exists( 'do_action' ) ) {
\do_action( 'redis_queue_job_failed', $job_id, $e, $job_data );
}
+
return $result;
} finally {
+ // Clean up current job reference.
$this->current_job = null;
}
}
+ /**
+ * Process multiple jobs from specified queues.
+ * Continues processing until max_jobs reached or no more jobs available.
+ *
+ * @param array $queues Queue names to process from.
+ * @param int $max_jobs Maximum number of jobs to process.
+ * @return array Processing results with statistics.
+ */
public function process_jobs( $queues = [ 'default' ], $max_jobs = 10 ): array {
$results = [];
$processed = 0;
$start_time = microtime( true );
$start_memory = memory_get_usage( true );
+
+ // Fire batch start action.
if ( function_exists( 'do_action' ) ) {
\do_action( 'redis_queue_batch_start', $queues, $max_jobs );
}
diff --git a/src/Core/Redis_Queue.php b/src/Core/Redis_Queue.php
index 57081de..7ee9184 100644
--- a/src/Core/Redis_Queue.php
+++ b/src/Core/Redis_Queue.php
@@ -18,6 +18,11 @@ final class Redis_Queue {
/** @var \Soderlind\RedisQueue\Admin\Admin_Interface|null */
public $admin_interface = null;
+ /**
+ * Get plugin singleton instance.
+ *
+ * @return self Plugin instance.
+ */
public static function get_instance(): self {
if ( null === self::$instance ) {
self::$instance = new self();
@@ -25,8 +30,12 @@ public static function get_instance(): self {
return self::$instance;
}
+ /**
+ * Constructor.
+ * Initializes GitHub updater and WordPress hooks.
+ */
public function __construct() {
- // Updater.
+ // Initialize GitHub updater for automatic plugin updates.
GitHub_Plugin_Updater::create_with_assets(
'https://github.com/soderlind/redis-queue',
defined( 'REDIS_QUEUE_PLUGIN_FILE' ) ? REDIS_QUEUE_PLUGIN_FILE : __FILE__,
@@ -37,6 +46,10 @@ public function __construct() {
$this->init_hooks();
}
+ /**
+ * Initialize WordPress hooks.
+ * Registers activation, deactivation, and initialization hooks.
+ */
private function init_hooks(): void {
\register_activation_hook( REDIS_QUEUE_PLUGIN_FILE, [ $this, 'activate' ] );
\register_deactivation_hook( REDIS_QUEUE_PLUGIN_FILE, [ $this, 'deactivate' ] );
@@ -45,20 +58,35 @@ private function init_hooks(): void {
\add_action( 'rest_api_init', [ $this, 'init_rest_api' ] );
}
+ /**
+ * Initialize plugin components.
+ * Loads dependencies and creates component instances.
+ * Fires 'redis_queue_init' action hook.
+ */
public function init(): void {
$this->load_dependencies();
$this->init_components();
\do_action( 'redis_queue_init', $this );
}
+ /**
+ * Load plugin dependencies.
+ * All dependencies are autoloaded via Composer.
+ */
private function load_dependencies(): void {
// Autoloaded via Composer.
}
+ /**
+ * Initialize plugin components.
+ * Creates queue manager, job processor, REST controller, and admin interface.
+ */
private function init_components(): void {
$this->queue_manager = new Redis_Queue_Manager();
$this->job_processor = new Job_Processor( $this->queue_manager );
$this->rest_controller = new \Soderlind\RedisQueue\API\REST_Controller( $this->queue_manager, $this->job_processor );
+
+ // Initialize admin interface only in admin context.
if ( \is_admin() ) {
$this->admin_interface = new \Soderlind\RedisQueue\Admin\Admin_Interface( $this->queue_manager, $this->job_processor );
if ( method_exists( $this->admin_interface, 'init' ) ) {
@@ -67,41 +95,74 @@ private function init_components(): void {
}
}
+ /**
+ * Initialize REST API routes.
+ * Registers REST API endpoints for queue management.
+ */
+ /**
+ * Initialize REST API routes.
+ * Registers REST API endpoints for queue management.
+ */
public function init_rest_api(): void {
if ( $this->rest_controller ) {
$this->rest_controller->register_routes();
}
}
+ /**
+ * Load plugin text domain for translations.
+ */
public function load_textdomain(): void {
\load_plugin_textdomain( 'redis-queue', false, dirname( REDIS_QUEUE_PLUGIN_BASENAME ) . '/languages' );
}
+ /**
+ * Plugin activation hook.
+ * Creates database tables, sets default options, and validates requirements.
+ */
public function activate(): void {
+ // Check PHP version requirement.
if ( \version_compare( PHP_VERSION, REDIS_QUEUE_MIN_PHP, '<' ) ) {
\deactivate_plugins( REDIS_QUEUE_PLUGIN_BASENAME );
\wp_die( \esc_html__( 'Redis Queue requires a newer PHP version.', 'redis-queue' ), \esc_html__( 'Plugin Activation Error', 'redis-queue' ), [ 'back_link' => true ] );
}
+
+ // Check Redis extension or Predis library availability.
if ( ! \extension_loaded( 'redis' ) && ! \class_exists( 'Predis\\Client' ) ) {
\deactivate_plugins( REDIS_QUEUE_PLUGIN_BASENAME );
\wp_die( \esc_html__( 'Redis Queue requires either the Redis extension or Predis library.', 'redis-queue' ), \esc_html__( 'Plugin Activation Error', 'redis-queue' ), [ 'back_link' => true ] );
}
+
$this->create_tables();
$this->set_default_options();
\flush_rewrite_rules();
\do_action( 'redis_queue_activate' );
}
+ /**
+ * Plugin deactivation hook.
+ * Clears scheduled cron jobs and flushes rewrite rules.
+ */
public function deactivate(): void {
\wp_clear_scheduled_hook( 'redis_queue_process_jobs' );
\flush_rewrite_rules();
\do_action( 'redis_queue_deactivate' );
}
+ /**
+ * Create plugin database tables.
+ * Creates the jobs table with proper indexes for performance.
+ */
+ /**
+ * Create plugin database tables.
+ * Creates the jobs table with proper indexes for performance.
+ */
private function create_tables(): void {
global $wpdb;
$charset_collate = $wpdb->get_charset_collate();
$table_name = $wpdb->prefix . 'redis_queue_jobs';
+
+ // Create jobs table with comprehensive schema for tracking job lifecycle.
$sql = "CREATE TABLE $table_name (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
job_id varchar(255) NOT NULL,
@@ -126,10 +187,15 @@ private function create_tables(): void {
KEY priority (priority),
KEY created_at (created_at)
) $charset_collate;";
+
require_once ABSPATH . 'wp-admin/includes/upgrade.php';
\dbDelta( $sql );
}
+ /**
+ * Set default plugin options.
+ * Initializes default configuration for Redis connection and queue settings.
+ */
private function set_default_options(): void {
$default_options = [
'redis_host' => '127.0.0.1',
@@ -145,6 +211,8 @@ private function set_default_options(): void {
'cleanup_completed_jobs' => true,
'cleanup_after_days' => 7,
];
+
+ // Add options only if they don't already exist.
foreach ( $default_options as $option => $value ) {
$option_name = 'redis_queue_' . $option;
if ( false === \get_option( $option_name ) ) {
@@ -153,27 +221,66 @@ private function set_default_options(): void {
}
}
+ /**
+ * Get plugin option value.
+ *
+ * @param string $option Option name (without redis_queue_ prefix).
+ * @param mixed $default Default value if option doesn't exist.
+ * @return mixed Option value or default.
+ */
public function get_option( $option, $default = null ) {
return \get_option( 'redis_queue_' . $option, $default );
}
+
+ /**
+ * Update plugin option value.
+ *
+ * @param string $option Option name (without redis_queue_ prefix).
+ * @param mixed $value New option value.
+ * @return bool True if updated, false otherwise.
+ */
public function update_option( $option, $value ) {
return \update_option( 'redis_queue_' . $option, $value );
}
+
+ /**
+ * Get queue manager instance.
+ *
+ * @return Redis_Queue_Manager|null Queue manager instance.
+ */
public function get_queue_manager() {
return $this->queue_manager;
}
+
+ /**
+ * Get job processor instance.
+ *
+ * @return Job_Processor|null Job processor instance.
+ */
public function get_job_processor() {
return $this->job_processor;
}
+ /**
+ * Enqueue a job to the Redis queue.
+ *
+ * @param string $job_type Job type identifier (email, image_processing, api_sync, or custom).
+ * @param array $payload Job-specific data.
+ * @param array $options Optional settings: priority, queue, delay.
+ * @return string|false Job ID on success, false on failure.
+ */
public function enqueue_job( $job_type, $payload = [], $options = [] ) {
if ( ! $this->queue_manager ) {
return false;
}
+
+ // Create job instance based on type.
$job = $this->create_job_instance( $job_type, $payload );
if ( ! $job ) {
return false;
}
+
+ // Apply optional settings.
if ( isset( $options[ 'priority' ] ) ) {
$job->set_priority( (int) $options[ 'priority' ] );
}
@@ -183,10 +290,19 @@ public function enqueue_job( $job_type, $payload = [], $options = [] ) {
if ( isset( $options[ 'delay' ] ) ) {
$job->set_delay_until( time() + (int) $options[ 'delay' ] );
}
+
return $this->queue_manager->enqueue( $job );
}
+ /**
+ * Create job instance based on job type.
+ *
+ * @param string $job_type Job type identifier.
+ * @param array $payload Job payload data.
+ * @return \Soderlind\RedisQueue\Contracts\Queue_Job|null Job instance or null if type unknown.
+ */
private function create_job_instance( $job_type, $payload ) {
+ // Handle built-in job types.
switch ( $job_type ) {
case 'email':
return new \Soderlind\RedisQueue\Jobs\Email_Job( $payload );
@@ -195,6 +311,7 @@ private function create_job_instance( $job_type, $payload ) {
case 'api_sync':
return new \Soderlind\RedisQueue\Jobs\API_Sync_Job( $payload );
default:
+ // Allow custom job types via filter.
return \apply_filters( 'redis_queue_create_job', null, $job_type, $payload );
}
}
diff --git a/src/Core/Redis_Queue_Manager.php b/src/Core/Redis_Queue_Manager.php
index 6d64d2e..55ab12d 100644
--- a/src/Core/Redis_Queue_Manager.php
+++ b/src/Core/Redis_Queue_Manager.php
@@ -33,38 +33,59 @@ public function __construct() {
}
}
+ /**
+ * Connect to Redis.
+ * Attempts connection using native Redis extension first, falls back to Predis.
+ *
+ * @return bool True if connected successfully.
+ */
private function connect(): bool {
try {
+ // Get Redis connection settings.
$host = \redis_queue()->get_option( 'redis_host', '127.0.0.1' );
$port = \redis_queue()->get_option( 'redis_port', 6379 );
$password = \redis_queue()->get_option( 'redis_password', '' );
$database = \redis_queue()->get_option( 'redis_database', 0 );
+ // Try native Redis extension first.
if ( \extension_loaded( 'redis' ) ) {
$this->redis = new \Redis();
$connected = $this->redis->connect( $host, $port, 2.5 );
+
if ( $connected ) {
+ // Authenticate if password provided.
if ( ! empty( $password ) ) {
$this->redis->auth( $password );
}
+ // Select database.
$this->redis->select( $database );
$this->connected = true;
}
} elseif ( class_exists( PredisClient::class) ) {
- $config = [ 'scheme' => 'tcp', 'host' => $host, 'port' => $port, 'database' => $database ];
+ // Fall back to Predis library.
+ $config = [
+ 'scheme' => 'tcp',
+ 'host' => $host,
+ 'port' => $port,
+ 'database' => $database
+ ];
+
if ( ! empty( $password ) ) {
$config[ 'password' ] = $password;
}
+
$this->redis = new PredisClient( $config );
$this->redis->connect();
$this->connected = true;
}
+ // Fire connection success action.
if ( $this->connected ) {
if ( function_exists( 'do_action' ) ) {
\do_action( 'redis_queue_connected', $this );
}
}
+
return $this->connected;
} catch (Exception $e) {
\error_log( 'Redis Queue Demo: Connection failed - ' . $e->getMessage() );
@@ -73,11 +94,19 @@ private function connect(): bool {
}
}
+ /**
+ * Check if Redis connection is active.
+ * Performs a PING command to verify connection.
+ *
+ * @return bool True if connected and responsive.
+ */
public function is_connected(): bool {
if ( ! $this->connected || ! $this->redis ) {
return false;
}
+
try {
+ // Ping Redis to verify connection is alive.
$response = $this->redis->ping();
return ( $response === true || $response === 'PONG' );
} catch (Exception $e) {
@@ -86,15 +115,26 @@ public function is_connected(): bool {
}
}
+ /**
+ * Enqueue a job to Redis.
+ * Stores job metadata in database and adds job ID to Redis queue.
+ *
+ * @param Queue_Job $job Job instance to enqueue.
+ * @param int|null $delay Optional delay in seconds before job is available.
+ * @return string|false Job ID on success, false on failure.
+ */
public function enqueue( Queue_Job $job, $delay = null ) {
if ( ! $this->is_connected() ) {
return false;
}
+
try {
+ // Generate unique job ID.
$job_id = $this->generate_job_id();
$job_data = $this->prepare_job_data( $job, $job_id );
$queue_key = $this->get_queue_key( $job->get_queue_name() );
+ // Store job metadata in database.
if ( ! $this->store_job_metadata( $job_id, $job, $job_data ) ) {
return false;
}
diff --git a/src/Jobs/Abstract_Base_Job.php b/src/Jobs/Abstract_Base_Job.php
index ab3beda..838a816 100644
--- a/src/Jobs/Abstract_Base_Job.php
+++ b/src/Jobs/Abstract_Base_Job.php
@@ -6,57 +6,156 @@
use Soderlind\RedisQueue\Contracts\Job_Result;
use Soderlind\RedisQueue\Contracts\Basic_Job_Result;
+/**
+ * Abstract Base Job.
+ * Base class for all queue jobs providing common functionality.
+ *
+ * Child classes must implement:
+ * - get_job_type(): Return job type identifier
+ * - execute(): Perform job-specific work
+ */
abstract class Abstract_Base_Job implements Queue_Job {
+ /** @var array Job payload data. */
protected array $payload = [];
+
+ /** @var int Job priority (lower = higher priority). */
protected int $priority = 50;
+
+ /** @var int Maximum retry attempts. */
protected int $retry_attempts = 3;
+
+ /** @var int Execution timeout in seconds. */
protected int $timeout = 300;
+
+ /** @var string Queue name. */
protected string $queue_name = 'default';
+
+ /** @var array Retry backoff delays in seconds [attempt1, attempt2, attempt3]. */
protected array $retry_backoff = [ 60, 300, 900 ];
+ /**
+ * Constructor.
+ *
+ * @param array $payload Job-specific data.
+ */
public function __construct( $payload = [] ) {
$this->payload = $payload;
}
+ /**
+ * Get job type identifier.
+ *
+ * @return string Job type.
+ */
abstract public function get_job_type();
+
+ /**
+ * Execute the job.
+ *
+ * @return Job_Result Job execution result.
+ */
abstract public function execute();
+
+ /**
+ * Get job payload.
+ *
+ * @return array Job payload data.
+ */
public function get_payload() {
return $this->payload;
}
+
+ /**
+ * Get job priority.
+ *
+ * @return int Priority value (lower = higher priority).
+ */
public function get_priority() {
return $this->priority;
}
+
+ /**
+ * Get maximum retry attempts.
+ *
+ * @return int Max retry attempts.
+ */
public function get_retry_attempts() {
return $this->retry_attempts;
}
+
+ /**
+ * Get execution timeout.
+ *
+ * @return int Timeout in seconds.
+ */
public function get_timeout() {
return $this->timeout;
}
+
+ /**
+ * Get queue name.
+ *
+ * @return string Queue name.
+ */
public function get_queue_name() {
return $this->queue_name;
}
+
+ /**
+ * Handle job failure.
+ * Logs failure and fires action hooks.
+ *
+ * @param mixed $exception Exception or failure reason.
+ * @param int $attempt Current attempt number.
+ */
public function handle_failure( $exception, $attempt ) {
\do_action( 'redis_queue_job_failure', $this, $exception, $attempt );
+
+ // Log failure if logging is enabled.
if ( \redis_queue()->get_option( 'enable_logging', true ) ) {
$message = $exception instanceof Exception ? $exception->getMessage() : 'Failure result without exception';
\error_log( sprintf( 'Redis Queue Demo: Job %s failed on attempt %d - %s', $this->get_job_type(), $attempt, $message ) );
}
}
+
+ /**
+ * Determine if job should be retried after failure.
+ *
+ * @param mixed $exception Exception or failure reason.
+ * @param int $attempt Current attempt number.
+ * @return bool True if job should be retried.
+ */
public function should_retry( $exception, $attempt ) {
+ // Don't retry if max attempts reached.
if ( $attempt >= $this->retry_attempts ) {
return false;
}
+
+ // Handle non-exception failures.
if ( ! ( $exception instanceof Exception ) ) {
return \apply_filters( 'redis_queue_should_retry_job', true, $this, null, $attempt );
}
+
+ // Don't retry certain exception types (programming errors).
$non_retry = [ 'InvalidArgumentException', 'TypeError', 'ParseError' ];
if ( in_array( get_class( $exception ), $non_retry, true ) ) {
return false;
}
+
+ // Allow filtering of retry decision.
return \apply_filters( 'redis_queue_should_retry_job', true, $this, $exception, $attempt );
}
+
+ /**
+ * Get retry delay for given attempt.
+ * Uses configured backoff or exponential backoff.
+ *
+ * @param int $attempt Attempt number.
+ * @return int Delay in seconds before retry.
+ */
public function get_retry_delay( $attempt ) {
$idx = $attempt - 1;
+ // Use configured backoff or calculate exponential backoff (max 1 hour).
$delay = $this->retry_backoff[ $idx ] ?? min( pow( 2, $attempt ) * 60, 3600 );
return \apply_filters( 'redis_queue_job_retry_delay', $delay, $this, $attempt );
}
diff --git a/src/Jobs/Email_Job.php b/src/Jobs/Email_Job.php
index 4846b6e..fe89050 100644
--- a/src/Jobs/Email_Job.php
+++ b/src/Jobs/Email_Job.php
@@ -4,110 +4,224 @@
use Exception;
use Soderlind\RedisQueue\Contracts\Job_Result;
+/**
+ * Email Job.
+ * Handles sending emails through WordPress wp_mail function.
+ * Supports single emails, bulk emails, and newsletters.
+ */
class Email_Job extends Abstract_Base_Job {
+ /**
+ * Constructor.
+ * Sets email-specific defaults for queue, priority, and timeout.
+ *
+ * @param array $payload Job payload data.
+ */
public function __construct( $payload = [] ) {
parent::__construct( $payload );
$this->queue_name = 'email';
- $this->priority = 20;
- $this->timeout = 120;
+ $this->priority = 20; // Higher priority than default jobs
+ $this->timeout = 120; // 2 minutes for email sending
}
+
+ /**
+ * Get job type identifier.
+ *
+ * @return string Job type 'email'.
+ */
public function get_job_type() {
return 'email';
}
+
+ /**
+ * Execute the email job.
+ * Routes to appropriate handler based on email type.
+ *
+ * @return Job_Result Job execution result.
+ */
public function execute() {
try {
$type = $this->get_payload_value( 'type', 'single' );
- return match ( $type ) { 'single' => $this->send_single_email(), 'bulk' => $this->send_bulk_emails(), 'newsletter' => $this->send_newsletter(), default => $this->failure( 'Unknown email type: ' . $type ), };
+
+ // Route to appropriate email handler.
+ return match ( $type ) {
+ 'single' => $this->send_single_email(),
+ 'bulk' => $this->send_bulk_emails(),
+ 'newsletter' => $this->send_newsletter(),
+ default => $this->failure( 'Unknown email type: ' . $type ),
+ };
} catch (Exception $e) {
return $this->failure( $e->getMessage(), $e->getCode() );
}
}
+
+ /**
+ * Send a single email.
+ *
+ * @return Job_Result Result with send status.
+ */
private function send_single_email() {
+ // Get email parameters from payload.
$to = $this->get_payload_value( 'to' );
$subject = $this->get_payload_value( 'subject' );
$message = $this->get_payload_value( 'message' );
$headers = $this->get_payload_value( 'headers', [] );
+
+ // Validate required fields.
if ( empty( $to ) || empty( $subject ) || empty( $message ) ) {
return $this->failure( 'Missing required email fields: to, subject, message' );
}
+
+ // Send email via wp_mail.
$attachments = $this->get_payload_value( 'attachments', [] );
$sent = \wp_mail( $to, $subject, $message, $headers, $attachments );
+
if ( $sent ) {
return $this->success( [ 'sent' => true, 'to' => $to ], [ 'email_type' => 'single' ] );
}
+
+ // Get PHPMailer error if available.
global $phpmailer;
$phpmailer_error = ( isset( $phpmailer ) && is_object( $phpmailer ) && ! empty( $phpmailer->ErrorInfo ) ) ? $phpmailer->ErrorInfo : null;
+
return $this->failure( 'Failed to send email to: ' . $to, null, [ 'email_type' => 'single', 'phpmailer_error' => $phpmailer_error ] );
}
+
+ /**
+ * Send multiple emails.
+ * Each email can have different recipients, subjects, and messages.
+ *
+ * @return Job_Result Result with batch send statistics.
+ */
private function send_bulk_emails() {
$emails = $this->get_payload_value( 'emails', [] );
+
if ( empty( $emails ) || ! is_array( $emails ) ) {
return $this->failure( 'No emails provided or invalid format' );
}
+
$sent = 0;
$failed = 0;
$failures = [];
+
+ // Process each email.
foreach ( $emails as $email ) {
$to = $email[ 'to' ] ?? '';
$subject = $email[ 'subject' ] ?? '';
$message = $email[ 'message' ] ?? '';
$headers = $email[ 'headers' ] ?? [];
+
+ // Validate required fields.
if ( empty( $to ) || empty( $subject ) || empty( $message ) ) {
$failed++;
$failures[] = [ 'to' => $to, 'reason' => 'Missing required fields' ];
continue;
}
+
+ // Send email.
$result = \wp_mail( $to, $subject, $message, $headers );
+
if ( $result ) {
$sent++;
} else {
$failed++;
$failures[] = [ 'to' => $to, 'reason' => 'wp_mail returned false' ];
}
- usleep( 100000 );
+
+ // Small delay to avoid overwhelming mail server.
+ usleep( 100000 ); // 0.1 seconds
}
- return $this->success( [ 'total' => count( $emails ), 'sent' => $sent, 'failed' => $failed, 'failures' => $failures ], [ 'email_type' => 'bulk' ] );
+
+ return $this->success(
+ [
+ 'total' => count( $emails ),
+ 'sent' => $sent,
+ 'failed' => $failed,
+ 'failures' => $failures
+ ],
+ [ 'email_type' => 'bulk' ]
+ );
}
+
+ /**
+ * Send newsletter to subscribers.
+ * Sends same content to multiple recipients with personalization.
+ *
+ * @return Job_Result Result with newsletter send statistics.
+ */
private function send_newsletter() {
$subject = $this->get_payload_value( 'subject' );
$message = $this->get_payload_value( 'message' );
$subscriber_ids = $this->get_payload_value( 'subscriber_ids', [] );
$headers = $this->get_payload_value( 'headers', [] );
+
+ // Validate required fields.
if ( empty( $subject ) || empty( $message ) ) {
return $this->failure( 'Missing required newsletter fields: subject, message' );
}
+
+ // Get subscriber list.
$subscribers = $this->get_newsletter_subscribers( $subscriber_ids );
if ( empty( $subscribers ) ) {
return $this->failure( 'No subscribers found' );
}
+
$sent = 0;
$failed = 0;
$failures = [];
+
+ // Send to each subscriber.
foreach ( $subscribers as $subscriber ) {
$to = is_array( $subscriber ) ? $subscriber[ 'email' ] : $subscriber;
+
+ // Validate email address.
if ( ! \is_email( $to ) ) {
$failed++;
$failures[] = [ 'to' => $to, 'reason' => 'Invalid email address' ];
continue;
}
+
+ // Personalize message for subscriber.
$personalized = $this->personalize_message( $message, $subscriber );
$result = \wp_mail( $to, $subject, $personalized, $headers );
+
if ( $result ) {
$sent++;
} else {
$failed++;
$failures[] = [ 'to' => $to, 'reason' => 'wp_mail returned false' ];
}
- usleep( 200000 );
+
+ // Delay to avoid overwhelming mail server.
+ usleep( 200000 ); // 0.2 seconds
}
- return $this->success( [ 'total' => count( $subscribers ), 'sent' => $sent, 'failed' => $failed, 'failures' => $failures, 'subject' => $subject ], [ 'email_type' => 'newsletter' ] );
+
+ return $this->success(
+ [
+ 'total' => count( $subscribers ),
+ 'sent' => $sent,
+ 'failed' => $failed,
+ 'failures' => $failures,
+ 'subject' => $subject
+ ],
+ [ 'email_type' => 'newsletter' ]
+ );
}
+
+ /**
+ * Get newsletter subscribers.
+ *
+ * @param array $subscriber_ids Optional specific subscriber IDs.
+ * @return array Array of subscriber data.
+ */
private function get_newsletter_subscribers( $subscriber_ids = [] ) {
+ // Get specific subscribers if IDs provided.
if ( ! empty( $subscriber_ids ) ) {
global $wpdb;
$placeholders = implode( ',', array_fill( 0, count( $subscriber_ids ), '%d' ) );
return $wpdb->get_results( $wpdb->prepare( "SELECT email, display_name FROM {$wpdb->users} WHERE ID IN ($placeholders)", ...$subscriber_ids ), ARRAY_A );
}
+
+ // Get all users as subscribers.
$users = \get_users( [ 'fields' => [ 'user_email', 'display_name' ] ] );
$subs = [];
foreach ( $users as $u ) {
@@ -115,35 +229,100 @@ private function get_newsletter_subscribers( $subscriber_ids = [] ) {
}
return $subs;
}
+
+ /**
+ * Personalize message with subscriber data.
+ * Replaces {name} and {email} placeholders.
+ *
+ * @param string $message Message template.
+ * @param array|string $subscriber Subscriber data.
+ * @return string Personalized message.
+ */
private function personalize_message( $message, $subscriber ) {
$name = is_array( $subscriber ) ? ( $subscriber[ 'display_name' ] ?? $subscriber[ 'email' ] ) : $subscriber;
- $repl = [ '{name}' => $name, '{email}' => is_array( $subscriber ) ? $subscriber[ 'email' ] : $subscriber ];
+ $repl = [
+ '{name}' => $name,
+ '{email}' => is_array( $subscriber ) ? $subscriber[ 'email' ] : $subscriber
+ ];
return str_replace( array_keys( $repl ), array_values( $repl ), $message );
}
+
+ /**
+ * Handle email job failure.
+ * Extends parent handler with email-specific action.
+ *
+ * @param mixed $exception Exception or failure reason.
+ * @param int $attempt Current attempt number.
+ */
public function handle_failure( $exception, $attempt ) {
parent::handle_failure( $exception, $attempt );
$email_type = $this->get_payload_value( 'type', 'single' );
\do_action( 'redis_queue_email_job_failed', $this, $exception, $attempt, $email_type );
}
+
+ /**
+ * Determine if email job should be retried.
+ * Don't retry invalid emails or bulk emails after 2 attempts.
+ *
+ * @param mixed $exception Exception or failure reason.
+ * @param int $attempt Current attempt number.
+ * @return bool True if job should be retried.
+ */
public function should_retry( $exception, $attempt ) {
+ // Don't retry invalid email addresses.
if ( $exception instanceof Exception && str_contains( $exception->getMessage(), 'Invalid email' ) ) {
return false;
}
+
+ // Don't retry non-exception failures.
if ( ! ( $exception instanceof Exception ) ) {
return false;
}
+
+ // Limit bulk email retries.
$email_type = $this->get_payload_value( 'type', 'single' );
if ( $email_type === 'bulk' && $attempt >= 2 ) {
return false;
}
+
return parent::should_retry( $exception, $attempt );
}
+
+ /**
+ * Create single email job.
+ * Static factory method for creating single email jobs.
+ *
+ * @param string $to Recipient email address.
+ * @param string $subject Email subject.
+ * @param string $message Email message.
+ * @param array $headers Optional email headers.
+ * @return self Email job instance.
+ */
public static function create_single_email( $to, $subject, $message, $headers = [] ) {
return new self( [ 'type' => 'single', 'to' => $to, 'subject' => $subject, 'message' => $message, 'headers' => $headers ] );
}
+
+ /**
+ * Create bulk email job.
+ * Static factory method for creating bulk email jobs.
+ *
+ * @param array $emails Array of email data (to, subject, message, headers).
+ * @return self Email job instance.
+ */
public static function create_bulk_emails( $emails ) {
return new self( [ 'type' => 'bulk', 'emails' => $emails ] );
}
+
+ /**
+ * Create newsletter job.
+ * Static factory method for creating newsletter jobs.
+ *
+ * @param string $subject Newsletter subject.
+ * @param string $message Newsletter message (supports {name} and {email} placeholders).
+ * @param array $subscriber_ids Optional specific subscriber IDs.
+ * @param array $headers Optional email headers.
+ * @return self Email job instance.
+ */
public static function create_newsletter( $subject, $message, $subscriber_ids = [], $headers = [] ) {
return new self( [ 'type' => 'newsletter', 'subject' => $subject, 'message' => $message, 'subscriber_ids' => $subscriber_ids, 'headers' => $headers ] );
}
diff --git a/src/Workers/Sync_Worker.php b/src/Workers/Sync_Worker.php
index fa0c8ad..574f900 100644
--- a/src/Workers/Sync_Worker.php
+++ b/src/Workers/Sync_Worker.php
@@ -7,29 +7,30 @@
use Throwable; // PHP 7+/8+ throwable base.
/**
- * Namespaced synchronous worker.
+ * Synchronous Worker.
*
- * This is a straight namespace wrapper of the original global Sync_Worker class.
- * Logic/behavior intentionally unchanged; UI & external integrations should continue to work.
- * Legacy global alias removed (backward compatibility dropped).
+ * Processes queued jobs synchronously in the current request.
+ * Used for CLI workers, cron jobs, and manual triggering.
+ * Tracks statistics and provides hooks for monitoring.
*
* @since 1.0.0 (legacy)
* @since 1.1.0 Namespaced
+ * @since 2.0.0 Renamed namespace
*/
class Sync_Worker {
- /** @var Redis_Queue_Manager */
+ /** @var Redis_Queue_Manager Queue manager instance. */
private $queue_manager;
- /** @var Job_Processor */
+ /** @var Job_Processor Job processor instance. */
private $job_processor;
- /** @var array */
+ /** @var array Worker configuration. */
private $config;
- /** @var string */
+ /** @var string Current worker state. */
private $state = 'idle';
- /** @var array */
+ /** @var array Worker statistics. */
private $stats = [
'jobs_processed' => 0,
'jobs_failed' => 0,
@@ -38,6 +39,13 @@ class Sync_Worker {
'last_activity' => null,
];
+ /**
+ * Constructor.
+ *
+ * @param Redis_Queue_Manager $queue_manager Queue manager instance.
+ * @param Job_Processor $job_processor Job processor instance.
+ * @param array $config Optional worker configuration.
+ */
public function __construct( Redis_Queue_Manager $queue_manager, Job_Processor $job_processor, $config = [] ) {
$this->queue_manager = $queue_manager;
$this->job_processor = $job_processor;
@@ -45,18 +53,34 @@ public function __construct( Redis_Queue_Manager $queue_manager, Job_Processor $
$this->stats[ 'start_time' ] = microtime( true );
}
+ /**
+ * Process jobs from specified queues.
+ * Continues processing until max_jobs reached or no more jobs available.
+ *
+ * @param array $queues Queue names to process from.
+ * @param int|null $max_jobs Maximum number of jobs to process. If null, uses config value.
+ * @return array Processing results with statistics.
+ */
public function process_jobs( $queues = [ 'default' ], $max_jobs = null ) {
+ // Check Redis connection availability.
if ( ! $this->queue_manager->is_connected() ) {
return [ 'success' => false, 'error' => 'Redis connection not available' ];
}
+
+ // Update worker state.
$this->state = 'working';
$this->stats[ 'last_activity' ] = microtime( true );
+
+ // Use configured max_jobs if not specified.
if ( null === $max_jobs ) {
$max_jobs = $this->config[ 'max_jobs_per_run' ];
}
+
+ // Fire worker start actions.
function_exists( '\do_action' ) && \do_action( 'redis_queue_worker_start', $this, $queues, $max_jobs );
- function_exists( '\do_action' ) && \do_action( 'redis_queue_worker_start', $this, $queues, $max_jobs );
+
try {
+ // Process jobs via job processor.
$results = $this->job_processor->process_jobs( $queues, $max_jobs );
$this->stats[ 'jobs_processed' ] += $results[ 'processed' ];
$this->stats[ 'total_time' ] += $results[ 'total_time' ];