Files
sistema_funcionando_lastwar/process_queue.php

269 lines
11 KiB
PHP
Executable File

<?php
/**
* Process Queue - processes scheduled messages.
*
* REFACTORED: loads the platform factories and follows the new directory structure.
* This script must be run every minute via a cron job.
*
* Changes:
* - Loads the SenderFactory and ConverterFactory helpers
* - Modular structure per platform
* - Cleaner, more maintainable code
*/
// Log the start of this run together with a timestamp.
// NOTE(review): this date() call runs before date_default_timezone_set() further down, so the
// timestamp uses whatever timezone is in effect at this point — confirm that is intended.
error_log("Iniciando process_queue.php (REFACTORIZADO) - " . date('Y-m-d H:i:s'));
// Core dependencies: configuration, logging, database access and scheduling helpers.
// NOTE(review): $pdo, custom_log() and calculateNextSendTime() are used later in this script but
// are not defined in it; presumably they come from these includes — verify.
require_once __DIR__ . '/config/config.php';
require_once __DIR__ . '/includes/logger.php';
require_once __DIR__ . '/includes/db.php';
require_once __DIR__ . '/common/helpers/schedule_helpers.php';
// Factories for dynamic platform handling.
require_once __DIR__ . '/common/helpers/sender_factory.php';
require_once __DIR__ . '/common/helpers/converter_factory.php';
// Discord library imports, kept available for when they are needed.
// None of these classes is referenced directly in the visible part of this script.
use Discord\Discord;
use Discord\WebSockets\Intents;
use Discord\Parts\Channel\Message;
use Discord\Builders\MessageBuilder;
use Discord\Builders\Components\ActionRow;
use Discord\Builders\Components\Button;
// Set the default timezone used by the date/time functions for the rest of the run.
date_default_timezone_set('America/Mexico_City');
// Global variable tracking the ID of the schedule currently being processed.
// It is captured by reference by the shutdown handler registered below and is null
// whenever no schedule is in flight.
$currentlyProcessingScheduleId = null;
// Register a shutdown handler so that a fatal error raised while a schedule is in flight
// marks that schedule as 'failed' instead of leaving it stuck in 'processing'.
//
// $pdo is captured by value at registration time (presumably created by includes/db.php —
// verify); $currentlyProcessingScheduleId is captured by reference so the handler sees the
// ID that was current when the script died.
register_shutdown_function(function () use ($pdo, &$currentlyProcessingScheduleId) {
    $error = error_get_last();

    // E_PARSE is included because an uncaught ParseError (e.g. from a require_once executed
    // while a schedule is being processed) is reported with that type.
    $fatalTypes = [E_ERROR, E_PARSE, E_CORE_ERROR, E_COMPILE_ERROR, E_USER_ERROR];

    // Nothing to do unless a schedule was in flight and the last error was fatal.
    if (!$currentlyProcessingScheduleId || !$error || !in_array($error['type'], $fatalTypes, true)) {
        return;
    }

    $errorMessage = sprintf(
        "Fatal Error on schedule_id %d: %s in %s on line %d",
        $currentlyProcessingScheduleId,
        $error['message'],
        $error['file'],
        $error['line']
    );
    error_log($errorMessage);

    try {
        // If the script died inside an open transaction, the UPDATE below would become part
        // of it and be rolled back automatically when the connection closes, so the schedule
        // would stay in 'processing' forever. Discard the half-done transaction first so the
        // UPDATE runs in autocommit mode.
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }

        // Only mark the schedule as 'failed' if it is still in 'processing'.
        $failStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ? AND status = 'processing'");
        $failStmt->execute([substr($error['message'], 0, 255), $currentlyProcessingScheduleId]);
        error_log("Schedule ID {$currentlyProcessingScheduleId} marked as 'failed' due to fatal error.");
    } catch (Throwable $e) {
        // Throwable (not just Exception) so that an Error raised here, e.g. because $pdo is
        // not a usable connection, is logged instead of escaping the shutdown handler.
        error_log("Could not mark schedule ID {$currentlyProcessingScheduleId} as failed in shutdown function: " . $e->getMessage());
    }
});
// Claim the schedules that are due and load everything needed to deliver them.
//
// The cutoff is the current minute in America/Mexico_City with the seconds zeroed, so a
// schedule whose send_time has a non-zero seconds part is picked up on the following run.
$now = new DateTime('now', new DateTimeZone('America/Mexico_City'));
$nowFormatted = $now->format('Y-m-d H:i:00');
error_log("Buscando mensajes pendientes a las: " . $nowFormatted . " (Timezone: " . date_default_timezone_get() . ")");

$pendingMessages = [];
try {
    // Started inside the try block so a failure to open the transaction is logged and
    // reported with exit code 1 like any other error in this phase.
    $pdo->beginTransaction();

    // Lock up to 10 due rows; SKIP LOCKED lets a concurrent run skip rows this run holds.
    $lockQuery = "
SELECT s.id
FROM schedules s
WHERE s.status = 'pending'
AND s.send_time <= ?
ORDER BY s.send_time ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
";
    $lockStmt = $pdo->prepare($lockQuery);
    $lockStmt->execute([$nowFormatted]);
    $lockedScheduleIds = $lockStmt->fetchAll(PDO::FETCH_COLUMN);

    if (empty($lockedScheduleIds)) {
        $pdo->commit();
        error_log("No hay mensajes pendientes para procesar");
        exit(0);
    }

    // One "?" per locked ID; the same placeholder list serves the UPDATE and the SELECT.
    $placeholders = implode(',', array_fill(0, count($lockedScheduleIds), '?'));

    // Flip the claimed rows to 'processing' so later runs do not select them again.
    $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'processing' WHERE id IN ($placeholders)");
    $updateStmt->execute($lockedScheduleIds);

    $query = "
SELECT
s.id as schedule_id,
s.message_id,
s.recipient_id,
m.content,
m.user_id,
u.username as creator_username,
r.platform,
r.platform_id,
r.type as recipient_type,
s.send_time,
s.is_recurring,
s.recurring_days,
s.recurring_time
FROM schedules s
JOIN messages m ON s.message_id = m.id
JOIN users u ON m.user_id = u.id
JOIN recipients r ON s.recipient_id = r.id
WHERE s.id IN ($placeholders)
ORDER BY s.send_time ASC
";
    $stmt = $pdo->prepare($query);
    $stmt->execute($lockedScheduleIds);
    $pendingMessages = $stmt->fetchAll(PDO::FETCH_ASSOC);

    // The inner joins drop any claimed schedule whose message, user or recipient row is
    // missing. Those rows were already flipped to 'processing' above and would never be
    // picked up again, so mark them as failed instead of leaving them stuck.
    $orphanedIds = array_values(array_diff($lockedScheduleIds, array_column($pendingMessages, 'schedule_id')));
    if (!empty($orphanedIds)) {
        $orphanPlaceholders = implode(',', array_fill(0, count($orphanedIds), '?'));
        $orphanStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id IN ($orphanPlaceholders)");
        $orphanStmt->execute(array_merge(['Missing related message, user or recipient'], $orphanedIds));
        error_log("Schedules marked as 'failed' because related rows are missing: " . implode(', ', $orphanedIds));
    }

    $pdo->commit();
    error_log(sprintf("Se encontraron %d mensajes pendientes para enviar", count($pendingMessages)));
} catch (Throwable $e) {
    // Throwable rather than Exception so that an Error is also rolled back and logged.
    if ($pdo->inTransaction()) {
        $pdo->rollBack();
    }
    error_log("Error al procesar mensajes pendientes: " . $e->getMessage());
    exit(1);
}

if (empty($pendingMessages)) {
    echo "No hay mensajes pendientes para enviar.\n";
    exit(0);
}
/**
 * Sends one message through the given sender and normalises the outcome to a list of IDs.
 *
 * Despite the name, the message is attempted exactly once; no retry is performed here.
 *
 * @param object      $sender              Object exposing sendMessage($platformId, $content, $options, $addTranslateButton, $language, $originalFullContent).
 * @param mixed       $platformId          Destination identifier, forwarded to the sender unchanged.
 * @param string      $content             Message body, forwarded to the sender unchanged.
 * @param mixed       $recipientType       Not used by this function.
 * @param string      $platform            Platform name; only used in the log line written on failure.
 * @param bool        $addTranslateButton  Forwarded to the sender.
 * @param string|null $originalFullContent Forwarded to the sender.
 *
 * @return array|false False when the sender returns false or throws an Exception; the sender's
 *                     array as-is; a one-element array for any other non-true value; an empty
 *                     array when the sender returns true (sent, but no ID available).
 */
function sendWithRetry($sender, $platformId, $content, $recipientType, $platform, $addTranslateButton = false, $originalFullContent = null)
{
    try {
        // Empty options array and 'es' language are always passed to the sender.
        $outcome = $sender->sendMessage($platformId, $content, [], $addTranslateButton, 'es', $originalFullContent);
    } catch (Exception $e) {
        error_log("sendWithRetry falló para la plataforma {$platform}: " . $e->getMessage());
        return false;
    }

    // Explicit failure reported by the sender.
    if ($outcome === false) {
        return false;
    }

    // Success without any message ID to report.
    if ($outcome === true) {
        return [];
    }

    // A list of IDs is passed through; a single ID is wrapped in a list.
    return is_array($outcome) ? $outcome : [$outcome];
}
// Deliver every claimed schedule. Each message is handled in its own transaction: on success
// the delivery is recorded and the schedule is either rescheduled (recurring) or marked as
// 'sent'; on any failure the transaction is rolled back and the schedule is marked 'failed'.
// A failure on one message never prevents the remaining messages from being processed.
//
// NOTE(review): the message is sent to the platform before the bookkeeping is committed, so a
// database error after a successful send marks an already-delivered message as 'failed'.
foreach ($pendingMessages as $msg) {
    // Expose the in-flight ID to the shutdown handler in case of a fatal error.
    $currentlyProcessingScheduleId = $msg['schedule_id'];

    try {
        $pdo->beginTransaction();

        $platform = $msg['platform'];
        $content = $msg['content'];
        $recipientId = $msg['platform_id'];

        // Append the creator's name to the message body.
        // NOTE(review): the username is inserted into HTML without escaping — confirm that
        // usernames cannot contain HTML-special characters or escape it here.
        $content .= "\n\n<div class='creator-info'>Notificación creada por: {$msg['creator_username']}</div>";

        $messageSent = false;
        $sentMessageIds = [];

        if ($platform === 'discord') {
            custom_log("[Queue] Processing Discord message for schedule ID: {$msg['schedule_id']}");
            require_once __DIR__ . '/discord/DiscordSender.php';
            require_once __DIR__ . '/discord/converters/HtmlToDiscordMarkdownConverter.php';
            $sender = new DiscordSender(DISCORD_BOT_TOKEN);

            try {
                // The raw HTML content is handed to DiscordSender; an array result is treated
                // as the list of sent message IDs, anything else as a failure.
                $result = $sender->sendMessage($recipientId, $content);
                if ($result !== false && is_array($result)) {
                    $messageSent = true;
                    $sentMessageIds = $result;
                    custom_log("[Queue] Discord message sent successfully for schedule ID: {$msg['schedule_id']}, message IDs: " . implode(', ', $sentMessageIds));
                } else {
                    throw new Exception("DiscordSender failed to send message");
                }
            } catch (Throwable $e) {
                // Log with the Discord-specific prefix, then let the outer handler mark the failure.
                custom_log("[Queue] Error sending Discord message: " . $e->getMessage());
                throw $e;
            }
        } elseif ($platform === 'telegram') {
            custom_log("[Queue] Processing Telegram message for schedule ID: {$msg['schedule_id']}");

            // Convert the stored HTML into the form passed to TelegramSender.
            require_once __DIR__ . '/telegram/converters/HtmlToTelegramHtmlConverter.php';
            $converter = new HtmlToTelegramHtmlConverter();
            $contentHtml = $converter->convert($content);

            require_once __DIR__ . '/telegram/TelegramSender.php';
            $sender = new TelegramSender(TELEGRAM_BOT_TOKEN, $pdo);

            // A translate button is requested when the original HTML carries the translate marker.
            $addTranslateButton = (strpos($content, '<div data-translate="true">') !== false);
            $result = $sender->sendMessage($recipientId, $contentHtml, [], $addTranslateButton, 'es', $content);

            if ($result !== false && is_array($result)) {
                $messageSent = true;
                // Collect the message_id of every entry that carries one.
                foreach ($result as $sent_msg) {
                    if (isset($sent_msg['message_id'])) {
                        $sentMessageIds[] = $sent_msg['message_id'];
                    }
                }
            }
        } else {
            // Previously an unknown platform fell through to the generic failure below;
            // report the actual cause so it ends up in schedules.error_message.
            throw new Exception("Unsupported platform '{$platform}' for schedule ID: {$msg['schedule_id']}");
        }

        if (!$messageSent) {
            throw new Exception("Message sending failed for schedule ID: {$msg['schedule_id']}");
        }

        // Record the delivery; the platform IDs are stored as JSON, or NULL when none came back.
        $platformMessageIdsJson = !empty($sentMessageIds) ? json_encode($sentMessageIds) : null;
        $messageCount = count($sentMessageIds);
        $insertStmt = $pdo->prepare("INSERT INTO sent_messages (schedule_id, recipient_id, platform_message_id, message_count, sent_at) VALUES (?, ?, ?, ?, NOW())");
        $insertStmt->execute([$msg['schedule_id'], $msg['recipient_id'], $platformMessageIdsJson, $messageCount]);

        if ((int) $msg['is_recurring'] === 1) {
            // Recurring schedules go back to 'pending' with their next send time.
            // The cast keeps explode() from receiving NULL when recurring_days is unset.
            $recurringDays = explode(',', (string) $msg['recurring_days']);
            $nextSendTime = calculateNextSendTime($recurringDays, $msg['recurring_time']);
            $updateStmt = $pdo->prepare("UPDATE schedules SET send_time = ?, status = 'pending', sent_at = NOW() WHERE id = ?");
            $updateStmt->execute([$nextSendTime, $msg['schedule_id']]);
            custom_log("[Queue] Recurrent message {$msg['schedule_id']} rescheduled for: {$nextSendTime}");
        } else {
            $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'sent', sent_at = NOW() WHERE id = ?");
            $updateStmt->execute([$msg['schedule_id']]);
            custom_log("[Queue] Single message {$msg['schedule_id']} marked as sent.");
        }

        $pdo->commit();
    } catch (Throwable $e) {
        // Throwable rather than Exception: an Error raised by a sender (e.g. a TypeError) must
        // fail only this message instead of aborting the run and leaving the rest of the
        // batch stuck in 'processing'.
        error_log("Error al procesar el schedule ID {$currentlyProcessingScheduleId}: " . $e->getMessage());

        try {
            if ($pdo->inTransaction()) {
                $pdo->rollBack();
            }
            $failStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ?");
            $failStmt->execute([substr($e->getMessage(), 0, 255), $currentlyProcessingScheduleId]);
        } catch (Throwable $markError) {
            // Failing to record the failure must not stop the remaining messages.
            error_log("Could not mark schedule ID {$currentlyProcessingScheduleId} as failed: " . $markError->getMessage());
        }

        echo "Failed to process message for schedule ID: {$currentlyProcessingScheduleId}. Error: " . $e->getMessage() . "\n";
    }

    // No schedule is in flight between iterations.
    $currentlyProcessingScheduleId = null;
}

echo "Queue processing finished.\n";
?>