prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ? AND status = 'processing'"); $failStmt->execute([substr($error['message'], 0, 255), $currentlyProcessingScheduleId]); error_log("Schedule ID {$currentlyProcessingScheduleId} marked as 'failed' due to fatal error."); } catch (Exception $e) { error_log("Could not mark schedule ID {$currentlyProcessingScheduleId} as failed in shutdown function: " . $e->getMessage()); } } }); $now = new DateTime('now', new DateTimeZone('America/Mexico_City')); $nowFormatted = $now->format('Y-m-d H:i:00'); error_log("Buscando mensajes pendientes a las: " . $nowFormatted . " (Timezone: " . date_default_timezone_get() . ")"); $pdo->beginTransaction(); try { $lockQuery = " SELECT s.id FROM schedules s WHERE s.status = 'pending' AND s.send_time <= ? ORDER BY s.send_time ASC LIMIT 10 FOR UPDATE SKIP LOCKED "; $lockStmt = $pdo->prepare($lockQuery); $lockStmt->execute([$nowFormatted]); $lockedScheduleIds = $lockStmt->fetchAll(PDO::FETCH_COLUMN); if (empty($lockedScheduleIds)) { $pdo->commit(); error_log("No hay mensajes pendientes para procesar"); exit(0); } $placeholders_processing = rtrim(str_repeat('?,', count($lockedScheduleIds)), ','); $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'processing' WHERE id IN ($placeholders_processing)"); $updateStmt->execute($lockedScheduleIds); $placeholders = rtrim(str_repeat('?,', count($lockedScheduleIds)), ','); $query = " SELECT s.id as schedule_id, s.message_id, s.recipient_id, m.content, m.user_id, u.username as creator_username, r.platform, r.platform_id, r.type as recipient_type, s.send_time, s.is_recurring, s.recurring_days, s.recurring_time FROM schedules s JOIN messages m ON s.message_id = m.id JOIN users u ON m.user_id = u.id JOIN recipients r ON s.recipient_id = r.id WHERE s.id IN ($placeholders) ORDER BY s.send_time ASC "; $stmt = $pdo->prepare($query); $stmt->execute($lockedScheduleIds); $pendingMessages = $stmt->fetchAll(PDO::FETCH_ASSOC); $pdo->commit(); error_log(sprintf("Se encontraron %d mensajes pendientes para enviar", count($pendingMessages))); } catch (Exception $e) { if ($pdo->inTransaction()) { $pdo->rollBack(); } error_log("Error al procesar mensajes pendientes: " . $e->getMessage()); exit(1); } if (empty($pendingMessages)) { echo "No hay mensajes pendientes para enviar.\n"; exit(0); } function sendWithRetry($sender, $platformId, $content, $recipientType, $platform, $addTranslateButton = false, $originalFullContent = null) { try { // El método sendMessage ahora puede devolver un array de IDs de mensajes o un solo ID $result = $sender->sendMessage($platformId, $content, [], $addTranslateButton, 'es', $originalFullContent); // Si el resultado es false, hubo un error if ($result === false) { return false; } // Si es un array, ya contiene múltiples IDs de mensajes if (is_array($result)) { return $result; } // Si es un solo ID, lo devolvemos como array de un elemento if ($result !== true) { return [$result]; } // Si es true pero no hay ID (caso de Discord con mensajes vacíos) return []; } catch (Exception $e) { error_log("sendWithRetry falló para la plataforma {$platform}: " . $e->getMessage()); return false; } } foreach ($pendingMessages as $msg) { $currentlyProcessingScheduleId = $msg['schedule_id']; $pdo->beginTransaction(); try { $platform = $msg['platform']; $content = $msg['content']; $recipientId = $msg['platform_id']; // Agregar el nombre del creador $content .= "\n\n
Notificación creada por: {$msg['creator_username']}
"; $messageSent = false; $sentMessageIds = []; if ($platform === 'discord') { custom_log("[Queue] Processing Discord message for schedule ID: {$msg['schedule_id']}"); $converter = new HtmlToDiscordMarkdownConverter(); $parts = $converter->convertToArray($content); $addTranslateButton = (strpos($content, 'data-translate="true"') !== false); if (empty($parts)) { throw new Exception("El contenido del mensaje para Discord estaba vacío después de la conversión."); } $discord = new Discord(['token' => DISCORD_BOT_TOKEN]); $discord->on('ready', function (Discord $discord) use ($recipientId, $parts, &$messageSent, &$sentMessageIds, $pdo, $addTranslateButton) { $channel = $discord->getChannel($recipientId); if (!$channel) { $discord->getLogger()->error("[Queue] Discord channel not found: {$recipientId}"); $discord->close(); return; } $sendPart = function (int $index) use ($channel, $parts, &$sentMessageIds, &$sendPart, &$messageSent, $discord) { // Si ya hemos enviado todas las partes, terminamos if ($index >= count($parts)) { $messageSent = true; $discord->getLogger()->info('[Queue] All parts sent successfully.'); // Add translation buttons if requested if ($addTranslateButton && !empty($sentMessageIds)) { try { $stmt = $pdo->query("SELECT language_code, language_name, flag_emoji FROM supported_languages WHERE is_active = 1 ORDER BY language_name ASC"); $activeLangs = $stmt->fetchAll(PDO::FETCH_ASSOC); if (count($activeLangs) > 1) { $components = []; $actionRow = ActionRow::new(); $buttonCount = 0; foreach ($activeLangs as $lang) { $code = $lang['language_code']; $button = Button::new(Button::STYLE_SECONDARY, 'translate_manual:' . $code) ->setLabel($lang['language_name']); if (!empty($lang['flag_emoji'])) { $button->setEmoji($lang['flag_emoji']); } $actionRow->addComponent($button); $buttonCount++; if ($buttonCount % 5 === 0) { $components[] = $actionRow; $actionRow = ActionRow::new(); } } if ($buttonCount % 5 !== 0) { $components[] = $actionRow; } $firstMessageId = $sentMessageIds[0]; $channel->messages->fetch($firstMessageId)->done(function (Message $sentMessage) use ($components, $discord) { $builder = MessageBuilder::new()->setComponents($components); $sentMessage->edit($builder); $discord->getLogger()->info("[Queue] Translation buttons added to message {$sentMessage->id}"); }); } } catch (Throwable $e) { $discord->getLogger()->error('[Queue] Failed to add translation buttons', ['error' => $e->getMessage()]); } } $discord->close(); return; } $part = $parts[$index]; $contentToSend = $part['type'] === 'text' ? $part['content'] : $part['url']; if (empty(trim($contentToSend))) { // Si la parte está vacía, pasar a la siguiente $sendPart($index + 1); return; } $channel->sendMessage(MessageBuilder::new()->setContent($contentToSend)) ->done(function (Message $message) use (&$sentMessageIds, $sendPart, $index) { $sentMessageIds[] = $message->id; // Enviar la siguiente parte $sendPart($index + 1); }, function (\Exception $e) use ($discord) { $discord->getLogger()->error('[Queue] Failed to send a message part.', ['error' => $e->getMessage()]); $discord->close(); // Fallar y cerrar }); }; // Empezar a enviar desde la primera parte (índice 0) $sendPart(0); }); $discord->run(); } else if ($platform === 'telegram') { // Lógica de Telegram usando factories custom_log("[Queue] Processing Telegram message for schedule ID: {$msg['schedule_id']}"); // Usar ConverterFactory para conversión HTML require_once __DIR__ . '/telegram/converters/HtmlToTelegramHtmlConverter.php'; $converter = new HtmlToTelegramHtmlConverter(); $contentHtml = $converter->convert($content); // Usar directamente TelegramSender desde nueva ubicación require_once __DIR__ . '/telegram/TelegramSender.php'; $sender = new TelegramSender(TELEGRAM_BOT_TOKEN, $pdo); $addTranslateButton = (strpos($content, '
') !== false); $result = $sender->sendMessage($recipientId, $contentHtml, [], $addTranslateButton, 'es', $content); if ($result !== false && is_array($result)) { $messageSent = true; foreach ($result as $sent_msg) { if (isset($sent_msg['message_id'])) { $sentMessageIds[] = $sent_msg['message_id']; } } } } if ($messageSent) { $platformMessageIdsJson = !empty($sentMessageIds) ? json_encode($sentMessageIds) : null; $messageCount = count($sentMessageIds); $insertStmt = $pdo->prepare("INSERT INTO sent_messages (schedule_id, recipient_id, platform_message_id, message_count, sent_at) VALUES (?, ?, ?, ?, NOW())"); $insertStmt->execute([$msg['schedule_id'], $msg['recipient_id'], $platformMessageIdsJson, $messageCount]); if ($msg['is_recurring'] == 1) { $recurringDays = explode(',', $msg['recurring_days']); $nextSendTime = calculateNextSendTime($recurringDays, $msg['recurring_time']); $updateStmt = $pdo->prepare("UPDATE schedules SET send_time = ?, status = 'pending', sent_at = NOW() WHERE id = ?"); $updateStmt->execute([$nextSendTime, $msg['schedule_id']]); custom_log("[Queue] Recurrent message {$msg['schedule_id']} rescheduled for: {$nextSendTime}"); } else { $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'sent', sent_at = NOW() WHERE id = ?"); $updateStmt->execute([$msg['schedule_id']]); custom_log("[Queue] Single message {$msg['schedule_id']} marked as sent."); } $pdo->commit(); } else { throw new Exception("Message sending failed for schedule ID: {$msg['schedule_id']}"); } } catch (Exception $e) { if ($pdo->inTransaction()) { $pdo->rollBack(); } error_log("Error al procesar el schedule ID {$currentlyProcessingScheduleId}: " . $e->getMessage()); $failStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ?"); $failStmt->execute([substr($e->getMessage(), 0, 255), $currentlyProcessingScheduleId]); echo "Failed to process message for schedule ID: {$currentlyProcessingScheduleId}. Error: " . $e->getMessage() . "\n"; } $currentlyProcessingScheduleId = null; } echo "Queue processing finished.\n"; ?>