prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ? AND status = 'processing'"); $failStmt->execute([substr($error['message'], 0, 255), $currentlyProcessingScheduleId]); error_log("Schedule ID {$currentlyProcessingScheduleId} marked as 'failed' due to fatal error."); } catch (Exception $e) { error_log("Could not mark schedule ID {$currentlyProcessingScheduleId} as failed in shutdown function: " . $e->getMessage()); } } }); $now = new DateTime('now', new DateTimeZone('America/Mexico_City')); $nowFormatted = $now->format('Y-m-d H:i:00'); error_log("Buscando mensajes pendientes a las: " . $nowFormatted . " (Timezone: " . date_default_timezone_get() . ")"); $pdo->beginTransaction(); try { $lockQuery = " SELECT s.id FROM schedules s WHERE s.status = 'pending' AND s.send_time <= ? ORDER BY s.send_time ASC LIMIT 10 FOR UPDATE SKIP LOCKED "; $lockStmt = $pdo->prepare($lockQuery); $lockStmt->execute([$nowFormatted]); $lockedScheduleIds = $lockStmt->fetchAll(PDO::FETCH_COLUMN); if (empty($lockedScheduleIds)) { $pdo->commit(); error_log("No hay mensajes pendientes para procesar"); exit(0); } $placeholders_processing = rtrim(str_repeat('?,', count($lockedScheduleIds)), ','); $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'processing' WHERE id IN ($placeholders_processing)"); $updateStmt->execute($lockedScheduleIds); $placeholders = rtrim(str_repeat('?,', count($lockedScheduleIds)), ','); $query = " SELECT s.id as schedule_id, s.message_id, s.recipient_id, m.content, m.user_id, u.username as creator_username, r.platform, r.platform_id, r.type as recipient_type, s.send_time, s.is_recurring, s.recurring_days, s.recurring_time FROM schedules s JOIN messages m ON s.message_id = m.id JOIN users u ON m.user_id = u.id JOIN recipients r ON s.recipient_id = r.id WHERE s.id IN ($placeholders) ORDER BY s.send_time ASC "; $stmt = $pdo->prepare($query); $stmt->execute($lockedScheduleIds); $pendingMessages = $stmt->fetchAll(PDO::FETCH_ASSOC); $pdo->commit(); error_log(sprintf("Se encontraron %d mensajes pendientes para enviar", count($pendingMessages))); } catch (Exception $e) { if ($pdo->inTransaction()) { $pdo->rollBack(); } error_log("Error al procesar mensajes pendientes: " . $e->getMessage()); exit(1); } if (empty($pendingMessages)) { echo "No hay mensajes pendientes para enviar.\n"; exit(0); } function sendWithRetry($sender, $platformId, $content, $recipientType, $platform, $addTranslateButton = false, $originalFullContent = null) { try { // El método sendMessage ahora puede devolver un array de IDs de mensajes o un solo ID $result = $sender->sendMessage($platformId, $content, [], $addTranslateButton, 'es', $originalFullContent); // Si el resultado es false, hubo un error if ($result === false) { return false; } // Si es un array, ya contiene múltiples IDs de mensajes if (is_array($result)) { return $result; } // Si es un solo ID, lo devolvemos como array de un elemento if ($result !== true) { return [$result]; } // Si es true pero no hay ID (caso de Discord con mensajes vacíos) return []; } catch (Exception $e) { error_log("sendWithRetry falló para la plataforma {$platform}: " . $e->getMessage()); return false; } } foreach ($pendingMessages as $msg) { $currentlyProcessingScheduleId = $msg['schedule_id']; $pdo->beginTransaction(); try { $platform = $msg['platform']; $content = $msg['content']; $recipientId = $msg['platform_id']; // Agregar el nombre del creador $content .= "\n\n
Notificación creada por: {$msg['creator_username']}
"; $messageSent = false; $sentMessageIds = []; if ($platform === 'discord') { // Usar DiscordSender directamente desde nueva ubicación custom_log("[Queue] Processing Discord message for schedule ID: {$msg['schedule_id']}"); require_once __DIR__ . '/discord/DiscordSender.php'; require_once __DIR__ . '/discord/converters/HtmlToDiscordMarkdownConverter.php'; $sender = new DiscordSender(DISCORD_BOT_TOKEN); $contentHtml = $content; try { // DiscordSender.sendMessage devuelve array de IDs de mensajes enviados $result = $sender->sendMessage($recipientId, $contentHtml); if ($result !== false && is_array($result)) { $messageSent = true; $sentMessageIds = $result; custom_log("[Queue] Discord message sent successfully for schedule ID: {$msg['schedule_id']}, message IDs: " . implode(', ', $sentMessageIds)); } else { throw new Exception("DiscordSender failed to send message"); } } catch (Exception $e) { custom_log("[Queue] Error sending Discord message: " . $e->getMessage()); throw $e; } } else if ($platform === 'telegram') { // Lógica de Telegram usando factories custom_log("[Queue] Processing Telegram message for schedule ID: {$msg['schedule_id']}"); // Usar ConverterFactory para conversión HTML require_once __DIR__ . '/telegram/converters/HtmlToTelegramHtmlConverter.php'; $converter = new HtmlToTelegramHtmlConverter(); $contentHtml = $converter->convert($content); // Usar directamente TelegramSender desde nueva ubicación require_once __DIR__ . '/telegram/TelegramSender.php'; $sender = new TelegramSender(TELEGRAM_BOT_TOKEN, $pdo); $addTranslateButton = (strpos($content, '
') !== false); $result = $sender->sendMessage($recipientId, $contentHtml, [], $addTranslateButton, 'es', $content); if ($result !== false && is_array($result)) { $messageSent = true; foreach ($result as $sent_msg) { if (isset($sent_msg['message_id'])) { $sentMessageIds[] = $sent_msg['message_id']; } } } } if ($messageSent) { $platformMessageIdsJson = !empty($sentMessageIds) ? json_encode($sentMessageIds) : null; $messageCount = count($sentMessageIds); $insertStmt = $pdo->prepare("INSERT INTO sent_messages (schedule_id, recipient_id, platform_message_id, message_count, sent_at) VALUES (?, ?, ?, ?, NOW())"); $insertStmt->execute([$msg['schedule_id'], $msg['recipient_id'], $platformMessageIdsJson, $messageCount]); if ($msg['is_recurring'] == 1) { $recurringDays = explode(',', $msg['recurring_days']); $nextSendTime = calculateNextSendTime($recurringDays, $msg['recurring_time']); $updateStmt = $pdo->prepare("UPDATE schedules SET send_time = ?, status = 'pending', sent_at = NOW() WHERE id = ?"); $updateStmt->execute([$nextSendTime, $msg['schedule_id']]); custom_log("[Queue] Recurrent message {$msg['schedule_id']} rescheduled for: {$nextSendTime}"); } else { $updateStmt = $pdo->prepare("UPDATE schedules SET status = 'sent', sent_at = NOW() WHERE id = ?"); $updateStmt->execute([$msg['schedule_id']]); custom_log("[Queue] Single message {$msg['schedule_id']} marked as sent."); } $pdo->commit(); } else { throw new Exception("Message sending failed for schedule ID: {$msg['schedule_id']}"); } } catch (Exception $e) { if ($pdo->inTransaction()) { $pdo->rollBack(); } error_log("Error al procesar el schedule ID {$currentlyProcessingScheduleId}: " . $e->getMessage()); $failStmt = $pdo->prepare("UPDATE schedules SET status = 'failed', error_message = ? WHERE id = ?"); $failStmt->execute([substr($e->getMessage(), 0, 255), $currentlyProcessingScheduleId]); echo "Failed to process message for schedule ID: {$currentlyProcessingScheduleId}. Error: " . $e->getMessage() . "\n"; } $currentlyProcessingScheduleId = null; } echo "Queue processing finished.\n"; ?>