diff --git a/packages/api-server/src/routes/kijiji.ts b/packages/api-server/src/routes/kijiji.ts index 0fb4c06..227d992 100644 --- a/packages/api-server/src/routes/kijiji.ts +++ b/packages/api-server/src/routes/kijiji.ts @@ -46,7 +46,7 @@ export async function kijijiRoute(req: Request): Promise { try { const items = await fetchKijijiItems( SEARCH_QUERY, - 1, + 4, // 4 requests per second for faster scraping "https://www.kijiji.ca", searchOptions, {}, diff --git a/packages/core/src/scrapers/kijiji.ts b/packages/core/src/scrapers/kijiji.ts index a001720..37113ed 100644 --- a/packages/core/src/scrapers/kijiji.ts +++ b/packages/core/src/scrapers/kijiji.ts @@ -756,51 +756,75 @@ export default async function fetchKijijiItems( `\nFound ${newListingLinks.length} new listing links on page ${page}. Total unique: ${seenUrls.size}`, ); - // Fetch details for this page's listings - const progressBar = new cliProgress.SingleBar( - {}, - cliProgress.Presets.shades_classic, - ); + // Fetch details for this page's listings with controlled concurrency + const isTTY = process.stdout?.isTTY ?? false; + const progressBar = isTTY + ? new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic) + : null; const totalProgress = newListingLinks.length; let currentProgress = 0; - progressBar.start(totalProgress, currentProgress); + progressBar?.start(totalProgress, currentProgress); - for (const link of newListingLinks) { - try { - const html = await fetchHtml(link, DELAY_MS, { - onRateInfo: (remaining, reset) => { - if (remaining && reset) { - console.log( - `\nItem - Rate limit remaining: ${remaining}, reset in: ${reset}s`, - ); - } - }, - }); - const parsed = await parseDetailedListing( - html, - BASE_URL, - finalListingOptions, + // Process in batches for controlled concurrency + const CONCURRENT_REQUESTS = REQUESTS_PER_SECOND * 2; // 2x rate for faster processing + const results: (DetailedListing | null)[] = []; + + for (let i = 0; i < newListingLinks.length; i += CONCURRENT_REQUESTS) { + const batch = newListingLinks.slice(i, i + CONCURRENT_REQUESTS); + const batchPromises = batch.map(async (link) => { + try { + const html = await fetchHtml(link, 0, { + // No per-request delay, batch handles rate limit + onRateInfo: (remaining, reset) => { + if (remaining && reset) { + console.log( + `\nItem - Rate limit remaining: ${remaining}, reset in: ${reset}s`, + ); + } + }, + }); + const parsed = await parseDetailedListing( + html, + BASE_URL, + finalListingOptions, + ); + return parsed; + } catch (err) { + if (err instanceof HttpError) { + console.error( + `\nFailed to fetch ${link}\n - ${err.statusCode} ${err.message}`, + ); + } else { + console.error( + `\nFailed to fetch ${link}\n - ${String((err as Error)?.message || err)}`, + ); + } + return null; + } finally { + currentProgress++; + progressBar?.update(currentProgress); + if (!progressBar) { + console.log(`Progress: ${currentProgress}/${totalProgress}`); + } + } + }); + + const batchResults = await Promise.all(batchPromises); + results.push(...batchResults); + + // Wait between batches to respect rate limit + if (i + CONCURRENT_REQUESTS < newListingLinks.length) { + await new Promise((resolve) => + setTimeout(resolve, DELAY_MS * batch.length), ); - if (parsed) { - allListings.push(parsed); - } - } catch (err) { - if (err instanceof HttpError) { - console.error( - `\nFailed to fetch ${link}\n - ${err.statusCode} ${err.message}`, - ); - } else { - console.error( - `\nFailed to fetch ${link}\n - ${String((err as Error)?.message || err)}`, - ); - } - } finally { - currentProgress++; - progressBar.update(currentProgress); } } - progressBar.stop(); + allListings.push( + ...results.filter((r): r is DetailedListing => r !== null), + ); + + progressBar?.stop(); // If we got fewer results than expected (40 per page), we've reached the end if (searchResults.length < 40) { diff --git a/packages/mcp-server/src/protocol/handler.ts b/packages/mcp-server/src/protocol/handler.ts index f338ecd..7de29ff 100644 --- a/packages/mcp-server/src/protocol/handler.ts +++ b/packages/mcp-server/src/protocol/handler.ts @@ -1,10 +1,8 @@ -import { - fetchEbayItems, - fetchFacebookItems, - fetchKijijiItems, -} from "@marketplace-scrapers/core"; import { tools } from "./tools"; +const API_BASE_URL = process.env.API_BASE_URL || "http://localhost:4005/api"; +const API_TIMEOUT = Number(process.env.API_TIMEOUT) || 180000; // 3 minutes default + /** * Handle MCP JSON-RPC 2.0 protocol requests */ @@ -105,32 +103,44 @@ export async function handleMcpRequest(req: Request): Promise { error: { code: -32602, message: "query parameter is required" }, }); } - const searchOptions = { - location: args.location, - category: args.category, - keywords: args.keywords, - sortBy: args.sortBy, - sortOrder: args.sortOrder, - maxPages: args.maxPages || 5, - priceMin: args.priceMin, - priceMax: args.priceMax, - }; - const items = await Promise.race([ - fetchKijijiItems( - query, - 4, - "https://www.kijiji.ca", - searchOptions, - {}, - ), - new Promise((_, reject) => + const params = new URLSearchParams({ q: query }); + if (args.location) params.append("location", args.location); + if (args.category) params.append("category", args.category); + if (args.keywords) params.append("keywords", args.keywords); + if (args.sortBy) params.append("sortBy", args.sortBy); + if (args.sortOrder) params.append("sortOrder", args.sortOrder); + if (args.maxPages) + params.append("maxPages", args.maxPages.toString()); + if (args.priceMin) + params.append("priceMin", args.priceMin.toString()); + if (args.priceMax) + params.append("priceMax", args.priceMax.toString()); + + console.log( + `[MCP] Calling Kijiji API: ${API_BASE_URL}/kijiji?${params.toString()}`, + ); + const response = await Promise.race([ + fetch(`${API_BASE_URL}/kijiji?${params.toString()}`), + new Promise((_, reject) => setTimeout( - () => reject(new Error("Request timed out after 60 seconds")), - 60000, + () => + reject(new Error(`Request timed out after ${API_TIMEOUT}ms`)), + API_TIMEOUT, ), ), ]); - result = items || []; + + if (!response.ok) { + const errorText = await response.text(); + console.error( + `[MCP] Kijiji API error ${response.status}: ${errorText}`, + ); + throw new Error(`API returned ${response.status}: ${errorText}`); + } + result = await response.json(); + console.log( + `[MCP] Kijiji returned ${Array.isArray(result) ? result.length : 0} items`, + ); } else if (name === "search_facebook") { const query = args.query; if (!query) { @@ -140,23 +150,37 @@ export async function handleMcpRequest(req: Request): Promise { error: { code: -32602, message: "query parameter is required" }, }); } - const items = await Promise.race([ - fetchFacebookItems( - query, - 1, - args.location || "toronto", - args.maxItems || 25, - args.cookiesSource, - undefined, - ), - new Promise((_, reject) => + const params = new URLSearchParams({ q: query }); + if (args.location) params.append("location", args.location); + if (args.maxItems) + params.append("maxItems", args.maxItems.toString()); + if (args.cookiesSource) params.append("cookies", args.cookiesSource); + + console.log( + `[MCP] Calling Facebook API: ${API_BASE_URL}/facebook?${params.toString()}`, + ); + const response = await Promise.race([ + fetch(`${API_BASE_URL}/facebook?${params.toString()}`), + new Promise((_, reject) => setTimeout( - () => reject(new Error("Request timed out after 60 seconds")), - 60000, + () => + reject(new Error(`Request timed out after ${API_TIMEOUT}ms`)), + API_TIMEOUT, ), ), ]); - result = items || []; + + if (!response.ok) { + const errorText = await response.text(); + console.error( + `[MCP] Facebook API error ${response.status}: ${errorText}`, + ); + throw new Error(`API returned ${response.status}: ${errorText}`); + } + result = await response.json(); + console.log( + `[MCP] Facebook returned ${Array.isArray(result) ? result.length : 0} items`, + ); } else if (name === "search_ebay") { const query = args.query; if (!query) { @@ -166,26 +190,49 @@ export async function handleMcpRequest(req: Request): Promise { error: { code: -32602, message: "query parameter is required" }, }); } - const items = await Promise.race([ - fetchEbayItems(query, 1, { - minPrice: args.minPrice, - maxPrice: args.maxPrice, - strictMode: args.strictMode || false, - exclusions: args.exclusions || [], - keywords: args.keywords || [query], - buyItNowOnly: args.buyItNowOnly !== false, - canadaOnly: args.canadaOnly !== false, - }), - new Promise((_, reject) => + const params = new URLSearchParams({ q: query }); + if (args.minPrice) + params.append("minPrice", args.minPrice.toString()); + if (args.maxPrice) + params.append("maxPrice", args.maxPrice.toString()); + if (args.strictMode !== undefined) + params.append("strictMode", args.strictMode.toString()); + if (args.exclusions?.length) + params.append("exclusions", args.exclusions.join(",")); + if (args.keywords?.length) + params.append("keywords", args.keywords.join(",")); + if (args.buyItNowOnly !== undefined) + params.append("buyItNowOnly", args.buyItNowOnly.toString()); + if (args.canadaOnly !== undefined) + params.append("canadaOnly", args.canadaOnly.toString()); + if (args.maxItems) + params.append("maxItems", args.maxItems.toString()); + + console.log( + `[MCP] Calling eBay API: ${API_BASE_URL}/ebay?${params.toString()}`, + ); + const response = await Promise.race([ + fetch(`${API_BASE_URL}/ebay?${params.toString()}`), + new Promise((_, reject) => setTimeout( - () => reject(new Error("Request timed out after 60 seconds")), - 60000, + () => + reject(new Error(`Request timed out after ${API_TIMEOUT}ms`)), + API_TIMEOUT, ), ), ]); - const results = args.maxItems ? items.slice(0, args.maxItems) : items; - result = results || []; + if (!response.ok) { + const errorText = await response.text(); + console.error( + `[MCP] eBay API error ${response.status}: ${errorText}`, + ); + throw new Error(`API returned ${response.status}: ${errorText}`); + } + result = await response.json(); + console.log( + `[MCP] eBay returned ${Array.isArray(result) ? result.length : 0} items`, + ); } else { return Response.json({ jsonrpc: "2.0",