refactor: increase kijiji scraping request rate to 4 rps

Signed-off-by: Dmytro Stanchiev <git@dmytros.dev>
This commit is contained in:
2026-01-23 14:41:56 -05:00
parent f3839aba54
commit 65eb8d1724
3 changed files with 165 additions and 94 deletions

View File

@@ -46,7 +46,7 @@ export async function kijijiRoute(req: Request): Promise<Response> {
try { try {
const items = await fetchKijijiItems( const items = await fetchKijijiItems(
SEARCH_QUERY, SEARCH_QUERY,
1, 4, // 4 requests per second for faster scraping
"https://www.kijiji.ca", "https://www.kijiji.ca",
searchOptions, searchOptions,
{}, {},

View File

@@ -756,51 +756,75 @@ export default async function fetchKijijiItems(
`\nFound ${newListingLinks.length} new listing links on page ${page}. Total unique: ${seenUrls.size}`, `\nFound ${newListingLinks.length} new listing links on page ${page}. Total unique: ${seenUrls.size}`,
); );
// Fetch details for this page's listings // Fetch details for this page's listings with controlled concurrency
const progressBar = new cliProgress.SingleBar( const isTTY = process.stdout?.isTTY ?? false;
{}, const progressBar = isTTY
cliProgress.Presets.shades_classic, ? new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic)
); : null;
const totalProgress = newListingLinks.length; const totalProgress = newListingLinks.length;
let currentProgress = 0; let currentProgress = 0;
progressBar.start(totalProgress, currentProgress); progressBar?.start(totalProgress, currentProgress);
for (const link of newListingLinks) { // Process in batches for controlled concurrency
try { const CONCURRENT_REQUESTS = REQUESTS_PER_SECOND * 2; // 2x rate for faster processing
const html = await fetchHtml(link, DELAY_MS, { const results: (DetailedListing | null)[] = [];
onRateInfo: (remaining, reset) => {
if (remaining && reset) { for (let i = 0; i < newListingLinks.length; i += CONCURRENT_REQUESTS) {
console.log( const batch = newListingLinks.slice(i, i + CONCURRENT_REQUESTS);
`\nItem - Rate limit remaining: ${remaining}, reset in: ${reset}s`, const batchPromises = batch.map(async (link) => {
); try {
} const html = await fetchHtml(link, 0, {
}, // No per-request delay, batch handles rate limit
}); onRateInfo: (remaining, reset) => {
const parsed = await parseDetailedListing( if (remaining && reset) {
html, console.log(
BASE_URL, `\nItem - Rate limit remaining: ${remaining}, reset in: ${reset}s`,
finalListingOptions, );
}
},
});
const parsed = await parseDetailedListing(
html,
BASE_URL,
finalListingOptions,
);
return parsed;
} catch (err) {
if (err instanceof HttpError) {
console.error(
`\nFailed to fetch ${link}\n - ${err.statusCode} ${err.message}`,
);
} else {
console.error(
`\nFailed to fetch ${link}\n - ${String((err as Error)?.message || err)}`,
);
}
return null;
} finally {
currentProgress++;
progressBar?.update(currentProgress);
if (!progressBar) {
console.log(`Progress: ${currentProgress}/${totalProgress}`);
}
}
});
const batchResults = await Promise.all(batchPromises);
results.push(...batchResults);
// Wait between batches to respect rate limit
if (i + CONCURRENT_REQUESTS < newListingLinks.length) {
await new Promise((resolve) =>
setTimeout(resolve, DELAY_MS * batch.length),
); );
if (parsed) {
allListings.push(parsed);
}
} catch (err) {
if (err instanceof HttpError) {
console.error(
`\nFailed to fetch ${link}\n - ${err.statusCode} ${err.message}`,
);
} else {
console.error(
`\nFailed to fetch ${link}\n - ${String((err as Error)?.message || err)}`,
);
}
} finally {
currentProgress++;
progressBar.update(currentProgress);
} }
} }
progressBar.stop(); allListings.push(
...results.filter((r): r is DetailedListing => r !== null),
);
progressBar?.stop();
// If we got fewer results than expected (40 per page), we've reached the end // If we got fewer results than expected (40 per page), we've reached the end
if (searchResults.length < 40) { if (searchResults.length < 40) {

View File

@@ -1,10 +1,8 @@
import {
fetchEbayItems,
fetchFacebookItems,
fetchKijijiItems,
} from "@marketplace-scrapers/core";
import { tools } from "./tools"; import { tools } from "./tools";
const API_BASE_URL = process.env.API_BASE_URL || "http://localhost:4005/api";
const API_TIMEOUT = Number(process.env.API_TIMEOUT) || 180000; // 3 minutes default
/** /**
* Handle MCP JSON-RPC 2.0 protocol requests * Handle MCP JSON-RPC 2.0 protocol requests
*/ */
@@ -105,32 +103,44 @@ export async function handleMcpRequest(req: Request): Promise<Response> {
error: { code: -32602, message: "query parameter is required" }, error: { code: -32602, message: "query parameter is required" },
}); });
} }
const searchOptions = { const params = new URLSearchParams({ q: query });
location: args.location, if (args.location) params.append("location", args.location);
category: args.category, if (args.category) params.append("category", args.category);
keywords: args.keywords, if (args.keywords) params.append("keywords", args.keywords);
sortBy: args.sortBy, if (args.sortBy) params.append("sortBy", args.sortBy);
sortOrder: args.sortOrder, if (args.sortOrder) params.append("sortOrder", args.sortOrder);
maxPages: args.maxPages || 5, if (args.maxPages)
priceMin: args.priceMin, params.append("maxPages", args.maxPages.toString());
priceMax: args.priceMax, if (args.priceMin)
}; params.append("priceMin", args.priceMin.toString());
const items = await Promise.race([ if (args.priceMax)
fetchKijijiItems( params.append("priceMax", args.priceMax.toString());
query,
4, console.log(
"https://www.kijiji.ca", `[MCP] Calling Kijiji API: ${API_BASE_URL}/kijiji?${params.toString()}`,
searchOptions, );
{}, const response = await Promise.race([
), fetch(`${API_BASE_URL}/kijiji?${params.toString()}`),
new Promise((_, reject) => new Promise<Response>((_, reject) =>
setTimeout( setTimeout(
() => reject(new Error("Request timed out after 60 seconds")), () =>
60000, reject(new Error(`Request timed out after ${API_TIMEOUT}ms`)),
API_TIMEOUT,
), ),
), ),
]); ]);
result = items || [];
if (!response.ok) {
const errorText = await response.text();
console.error(
`[MCP] Kijiji API error ${response.status}: ${errorText}`,
);
throw new Error(`API returned ${response.status}: ${errorText}`);
}
result = await response.json();
console.log(
`[MCP] Kijiji returned ${Array.isArray(result) ? result.length : 0} items`,
);
} else if (name === "search_facebook") { } else if (name === "search_facebook") {
const query = args.query; const query = args.query;
if (!query) { if (!query) {
@@ -140,23 +150,37 @@ export async function handleMcpRequest(req: Request): Promise<Response> {
error: { code: -32602, message: "query parameter is required" }, error: { code: -32602, message: "query parameter is required" },
}); });
} }
const items = await Promise.race([ const params = new URLSearchParams({ q: query });
fetchFacebookItems( if (args.location) params.append("location", args.location);
query, if (args.maxItems)
1, params.append("maxItems", args.maxItems.toString());
args.location || "toronto", if (args.cookiesSource) params.append("cookies", args.cookiesSource);
args.maxItems || 25,
args.cookiesSource, console.log(
undefined, `[MCP] Calling Facebook API: ${API_BASE_URL}/facebook?${params.toString()}`,
), );
new Promise((_, reject) => const response = await Promise.race([
fetch(`${API_BASE_URL}/facebook?${params.toString()}`),
new Promise<Response>((_, reject) =>
setTimeout( setTimeout(
() => reject(new Error("Request timed out after 60 seconds")), () =>
60000, reject(new Error(`Request timed out after ${API_TIMEOUT}ms`)),
API_TIMEOUT,
), ),
), ),
]); ]);
result = items || [];
if (!response.ok) {
const errorText = await response.text();
console.error(
`[MCP] Facebook API error ${response.status}: ${errorText}`,
);
throw new Error(`API returned ${response.status}: ${errorText}`);
}
result = await response.json();
console.log(
`[MCP] Facebook returned ${Array.isArray(result) ? result.length : 0} items`,
);
} else if (name === "search_ebay") { } else if (name === "search_ebay") {
const query = args.query; const query = args.query;
if (!query) { if (!query) {
@@ -166,26 +190,49 @@ export async function handleMcpRequest(req: Request): Promise<Response> {
error: { code: -32602, message: "query parameter is required" }, error: { code: -32602, message: "query parameter is required" },
}); });
} }
const items = await Promise.race([ const params = new URLSearchParams({ q: query });
fetchEbayItems(query, 1, { if (args.minPrice)
minPrice: args.minPrice, params.append("minPrice", args.minPrice.toString());
maxPrice: args.maxPrice, if (args.maxPrice)
strictMode: args.strictMode || false, params.append("maxPrice", args.maxPrice.toString());
exclusions: args.exclusions || [], if (args.strictMode !== undefined)
keywords: args.keywords || [query], params.append("strictMode", args.strictMode.toString());
buyItNowOnly: args.buyItNowOnly !== false, if (args.exclusions?.length)
canadaOnly: args.canadaOnly !== false, params.append("exclusions", args.exclusions.join(","));
}), if (args.keywords?.length)
new Promise((_, reject) => params.append("keywords", args.keywords.join(","));
if (args.buyItNowOnly !== undefined)
params.append("buyItNowOnly", args.buyItNowOnly.toString());
if (args.canadaOnly !== undefined)
params.append("canadaOnly", args.canadaOnly.toString());
if (args.maxItems)
params.append("maxItems", args.maxItems.toString());
console.log(
`[MCP] Calling eBay API: ${API_BASE_URL}/ebay?${params.toString()}`,
);
const response = await Promise.race([
fetch(`${API_BASE_URL}/ebay?${params.toString()}`),
new Promise<Response>((_, reject) =>
setTimeout( setTimeout(
() => reject(new Error("Request timed out after 60 seconds")), () =>
60000, reject(new Error(`Request timed out after ${API_TIMEOUT}ms`)),
API_TIMEOUT,
), ),
), ),
]); ]);
const results = args.maxItems ? items.slice(0, args.maxItems) : items; if (!response.ok) {
result = results || []; const errorText = await response.text();
console.error(
`[MCP] eBay API error ${response.status}: ${errorText}`,
);
throw new Error(`API returned ${response.status}: ${errorText}`);
}
result = await response.json();
console.log(
`[MCP] eBay returned ${Array.isArray(result) ? result.length : 0} items`,
);
} else { } else {
return Response.json({ return Response.json({
jsonrpc: "2.0", jsonrpc: "2.0",