269 lines
11 KiB
PHP
Executable File
269 lines
11 KiB
PHP
Executable File
<?php
|
|
/**
|
|
* Process Queue - Procesa mensajes programados
|
|
*
|
|
* REFACTORIZADO: Usa factories y nueva estructura de directorios
|
|
* Este script debe ejecutarse cada minuto vía cron job
|
|
*
|
|
* Cambios:
|
|
* - Usa SenderFactory y ConverterFactory
|
|
* - Estructura modular por plataforma
|
|
* - Código más limpio y mantenible
|
|
*/
|
|
|
|
// Record the start of this run (with a timestamp) in the PHP error log.
error_log("Iniciando process_queue.php (REFACTORIZADO) - " . date('Y-m-d H:i:s'));
|
|
|
|
// Core dependencies
|
|
require_once __DIR__ . '/config/config.php';
|
|
require_once __DIR__ . '/includes/logger.php';
|
|
require_once __DIR__ . '/includes/db.php';
|
|
require_once __DIR__ . '/common/helpers/schedule_helpers.php';
|
|
|
|
// Factories para manejo dinámico de plataformas
|
|
require_once __DIR__ . '/common/helpers/sender_factory.php';
|
|
require_once __DIR__ . '/common/helpers/converter_factory.php';
|
|
|
|
// Dependencies de Discord para cuando sea necesario
|
|
use Discord\Discord;
|
|
use Discord\WebSockets\Intents;
|
|
use Discord\Parts\Channel\Message;
|
|
use Discord\Builders\MessageBuilder;
|
|
use Discord\Builders\Components\ActionRow;
|
|
use Discord\Builders\Components\Button;
|
|
|
|
|
|
// Make every date/time function in this script use Mexico City local time.
date_default_timezone_set('America/Mexico_City');

// ID of the schedule currently being processed, or null while idle.
// The shutdown handler below captures it by reference so it always sees the latest value.
$currentlyProcessingScheduleId = null;

/*
 * Shutdown handler for fatal errors.
 *
 * If the script dies with a fatal error while a schedule is being processed,
 * log the error and flag that schedule as 'failed' (only if it is still in
 * 'processing') so it does not stay stuck in that state.
 */
register_shutdown_function(function () use ($pdo, &$currentlyProcessingScheduleId) {
    $error = error_get_last();
    $fatalTypes = [E_ERROR, E_CORE_ERROR, E_COMPILE_ERROR, E_USER_ERROR];

    // Nothing to do unless a schedule was in flight and the last error was fatal.
    if (!$currentlyProcessingScheduleId || !$error || !in_array($error['type'], $fatalTypes, true)) {
        return;
    }

    $errorMessage = sprintf(
        "Fatal Error on schedule_id %d: %s in %s on line %d",
        $currentlyProcessingScheduleId,
        $error['message'],
        $error['file'],
        $error['line']
    );
    error_log($errorMessage);

    try {
        // The main loop opens one transaction per schedule. If the fatal error
        // happened inside it, the transaction is still open here and anything
        // executed now would be discarded when the connection closes. Roll it
        // back first so the UPDATE below runs in auto-commit mode and persists.
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }

        // Only mark as 'failed' if the schedule is still in 'processing'.
        $failStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ? AND status = 'processing'");
        $failStmt->execute([substr($error['message'], 0, 255), $currentlyProcessingScheduleId]);
        error_log("Schedule ID {$currentlyProcessingScheduleId} marked as 'failed' due to fatal error.");
    } catch (Throwable $e) {
        // Throwable (not just Exception) so that an Error raised here, e.g. when
        // $pdo is not a usable object, is logged instead of escaping the handler.
        error_log("Could not mark schedule ID {$currentlyProcessingScheduleId} as failed in shutdown function: " . $e->getMessage());
    }
});
|
|
|
|
// Current time in Mexico City, truncated to the start of the minute.
$now = new DateTime('now', new DateTimeZone('America/Mexico_City'));
$nowFormatted = $now->format('Y-m-d H:i:00');

error_log("Buscando mensajes pendientes a las: " . $nowFormatted . " (Timezone: " . date_default_timezone_get() . ")");

/*
 * Claim a batch of due schedules.
 *
 * Inside a single transaction: lock up to 10 pending schedules whose send_time
 * has passed (rows locked by another transaction are skipped), flip them to
 * 'processing', and load everything needed to send them into $pendingMessages.
 */
try {
    // Started inside the try so that a failure here is logged and exits with
    // status 1 like every other database error in this section.
    $pdo->beginTransaction();

    $lockQuery = "
        SELECT s.id
        FROM schedules s
        WHERE s.status = 'pending'
        AND s.send_time <= ?
        ORDER BY s.send_time ASC
        LIMIT 10
        FOR UPDATE SKIP LOCKED
    ";

    $lockStmt = $pdo->prepare($lockQuery);
    $lockStmt->execute([$nowFormatted]);
    $lockedScheduleIds = $lockStmt->fetchAll(PDO::FETCH_COLUMN);

    if (empty($lockedScheduleIds)) {
        $pdo->commit();
        error_log("No hay mensajes pendientes para procesar");
        exit(0);
    }

    // One "?" per claimed ID; reused by both statements below.
    $placeholders = implode(',', array_fill(0, count($lockedScheduleIds), '?'));

    $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'processing' WHERE id IN ($placeholders)");
    $updateStmt->execute($lockedScheduleIds);

    $query = "
        SELECT
            s.id as schedule_id,
            s.message_id,
            s.recipient_id,
            m.content,
            m.user_id,
            u.username as creator_username,
            r.platform,
            r.platform_id,
            r.type as recipient_type,
            s.send_time,
            s.is_recurring,
            s.recurring_days,
            s.recurring_time
        FROM schedules s
        JOIN messages m ON s.message_id = m.id
        JOIN users u ON m.user_id = u.id
        JOIN recipients r ON s.recipient_id = r.id
        WHERE s.id IN ($placeholders)
        ORDER BY s.send_time ASC
    ";

    $stmt = $pdo->prepare($query);
    $stmt->execute($lockedScheduleIds);
    $pendingMessages = $stmt->fetchAll(PDO::FETCH_ASSOC);

    // The joins above are inner joins, so a claimed schedule whose message,
    // user or recipient row is missing is not returned. It was already set to
    // 'processing' and nothing later would touch it, so mark it 'failed' here.
    $orphanedIds = array_values(array_diff($lockedScheduleIds, array_column($pendingMessages, 'schedule_id')));
    if (!empty($orphanedIds)) {
        $orphanPlaceholders = implode(',', array_fill(0, count($orphanedIds), '?'));
        $orphanStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id IN ($orphanPlaceholders)");
        $orphanStmt->execute(array_merge(['Missing related message, user or recipient'], $orphanedIds));
        error_log("Schedules marked as 'failed' because related rows are missing: " . implode(', ', $orphanedIds));
    }

    $pdo->commit();

    error_log(sprintf("Se encontraron %d mensajes pendientes para enviar", count($pendingMessages)));
} catch (Exception $e) {
    if ($pdo->inTransaction()) {
        $pdo->rollBack();
    }
    error_log("Error al procesar mensajes pendientes: " . $e->getMessage());
    exit(1);
}

if (empty($pendingMessages)) {
    echo "No hay mensajes pendientes para enviar.\n";
    exit(0);
}
|
|
|
|
/**
 * Send a message through a platform sender and normalise its return value.
 *
 * Despite the name, this makes a single attempt; there is no retry loop.
 *
 * @param object      $sender              Object exposing sendMessage($platformId, $content, $options, $addTranslateButton, $lang, $originalFullContent).
 * @param mixed       $platformId          Destination identifier, passed to the sender as-is.
 * @param string      $content             Message body to send.
 * @param mixed       $recipientType       Not used by this function.
 * @param string      $platform            Platform name; only used in the error log line.
 * @param bool        $addTranslateButton  Forwarded to the sender.
 * @param string|null $originalFullContent Forwarded to the sender.
 *
 * @return array|false false when the sender returns false or throws an Exception;
 *                     the sender's array when it returns one; an empty array when it
 *                     returns true; otherwise a one-element array wrapping the value.
 */
function sendWithRetry($sender, $platformId, $content, $recipientType, $platform, $addTranslateButton = false, $originalFullContent = null)
{
    try {
        $outcome = $sender->sendMessage($platformId, $content, [], $addTranslateButton, 'es', $originalFullContent);
    } catch (Exception $e) {
        error_log("sendWithRetry falló para la plataforma {$platform}: " . $e->getMessage());
        return false;
    }

    // Explicit failure reported by the sender.
    if ($outcome === false) {
        return false;
    }

    // Success without any message ID to report.
    if ($outcome === true) {
        return [];
    }

    // A list of IDs is passed through; a single ID is wrapped in a list.
    return is_array($outcome) ? $outcome : [$outcome];
}
|
|
|
|
/*
 * Main loop: send each claimed schedule and record the outcome.
 *
 * Every schedule is handled in its own transaction. On success a sent_messages
 * row is inserted and the schedule is either rescheduled (recurring) or marked
 * 'sent'. On any failure the transaction is rolled back and the schedule is
 * marked 'failed'; the loop then continues with the next schedule.
 */
foreach ($pendingMessages as $msg) {
    // Exposed to the shutdown handler so a fatal error can be attributed to this schedule.
    $currentlyProcessingScheduleId = $msg['schedule_id'];

    try {
        $pdo->beginTransaction();

        $platform = $msg['platform'];
        $content = $msg['content'];
        $recipientId = $msg['platform_id'];

        // Append the creator's name to the message body.
        // NOTE(review): creator_username is inserted into HTML unescaped - confirm
        // whether usernames can contain markup before relying on this.
        $content .= "\n\n<div class='creator-info'>Notificación creada por: {$msg['creator_username']}</div>";

        $messageSent = false;
        $sentMessageIds = [];

        if ($platform === 'discord') {
            custom_log("[Queue] Processing Discord message for schedule ID: {$msg['schedule_id']}");

            require_once __DIR__ . '/discord/DiscordSender.php';
            require_once __DIR__ . '/discord/converters/HtmlToDiscordMarkdownConverter.php';

            $sender = new DiscordSender(DISCORD_BOT_TOKEN);
            $contentHtml = $content;

            try {
                // Anything other than an array result is treated as a failed send.
                $result = $sender->sendMessage($recipientId, $contentHtml);

                if ($result !== false && is_array($result)) {
                    $messageSent = true;
                    $sentMessageIds = $result;
                    custom_log("[Queue] Discord message sent successfully for schedule ID: {$msg['schedule_id']}, message IDs: " . implode(', ', $sentMessageIds));
                } else {
                    throw new Exception("DiscordSender failed to send message");
                }
            } catch (Throwable $e) {
                // Log with the Discord context, then let the outer handler mark the schedule as failed.
                custom_log("[Queue] Error sending Discord message: " . $e->getMessage());
                throw $e;
            }
        } else if ($platform === 'telegram') {
            custom_log("[Queue] Processing Telegram message for schedule ID: {$msg['schedule_id']}");

            // Convert the stored HTML with the Telegram-specific converter.
            require_once __DIR__ . '/telegram/converters/HtmlToTelegramHtmlConverter.php';
            $converter = new HtmlToTelegramHtmlConverter();
            $contentHtml = $converter->convert($content);

            require_once __DIR__ . '/telegram/TelegramSender.php';
            $sender = new TelegramSender(TELEGRAM_BOT_TOKEN, $pdo);

            // The translate button is requested only when the original content carries this exact marker.
            $addTranslateButton = (strpos($content, '<div data-translate="true">') !== false);

            $result = $sender->sendMessage($recipientId, $contentHtml, [], $addTranslateButton, 'es', $content);
            if ($result !== false && is_array($result)) {
                $messageSent = true;
                // Collect the message_id of every entry that has one.
                foreach ($result as $sent_msg) {
                    if (isset($sent_msg['message_id'])) {
                        $sentMessageIds[] = $sent_msg['message_id'];
                    }
                }
            }
        }

        // Unknown platforms and unsuccessful sends both end up here.
        if (!$messageSent) {
            throw new Exception("Message sending failed for schedule ID: {$msg['schedule_id']}");
        }

        $platformMessageIdsJson = !empty($sentMessageIds) ? json_encode($sentMessageIds) : null;
        $messageCount = count($sentMessageIds);

        $insertStmt = $pdo->prepare("INSERT INTO sent_messages (schedule_id, recipient_id, platform_message_id, message_count, sent_at) VALUES (?, ?, ?, ?, NOW())");
        $insertStmt->execute([$msg['schedule_id'], $msg['recipient_id'], $platformMessageIdsJson, $messageCount]);

        if ($msg['is_recurring'] == 1) {
            // Cast to string so a NULL recurring_days is not passed to explode().
            $recurringDays = explode(',', (string) $msg['recurring_days']);
            $nextSendTime = calculateNextSendTime($recurringDays, $msg['recurring_time']);
            $updateStmt = $pdo->prepare("UPDATE schedules SET send_time = ?, status = 'pending', sent_at = NOW() WHERE id = ?");
            $updateStmt->execute([$nextSendTime, $msg['schedule_id']]);
            custom_log("[Queue] Recurrent message {$msg['schedule_id']} rescheduled for: {$nextSendTime}");
        } else {
            $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'sent', sent_at = NOW() WHERE id = ?");
            $updateStmt->execute([$msg['schedule_id']]);
            custom_log("[Queue] Single message {$msg['schedule_id']} marked as sent.");
        }

        $pdo->commit();
    } catch (Throwable $e) {
        // Throwable rather than Exception: an Error (e.g. a TypeError inside a
        // sender or converter) must not kill the script and leave the rest of
        // the claimed batch stuck in 'processing'.
        error_log("Error al procesar el schedule ID {$currentlyProcessingScheduleId}: " . $e->getMessage());

        try {
            if ($pdo->inTransaction()) {
                $pdo->rollBack();
            }
            $failStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ?");
            $failStmt->execute([substr($e->getMessage(), 0, 255), $currentlyProcessingScheduleId]);
        } catch (Throwable $failError) {
            // Recording the failure is best-effort; keep going with the remaining schedules.
            error_log("Could not mark schedule ID {$currentlyProcessingScheduleId} as failed: " . $failError->getMessage());
        }

        echo "Failed to process message for schedule ID: {$currentlyProcessingScheduleId}. Error: " . $e->getMessage() . "\n";
    }

    // No schedule is in flight between iterations.
    $currentlyProcessingScheduleId = null;
}

echo "Queue processing finished.\n";
|
|
?>
|