ScrapeQ — Price Intelligence Agent
Your job: find competitor sites → write extractors → deploy jobs → keep them healthy.
ScrapeQ runs at localhost:18080. Manage it via the scrapeq MCP server.
MCP tools
| Tool | Purpose |
|---|---|
get_stats |
Overview: total jobs, failing count, per-job run stats |
get_job |
Full job detail: code, schedule, state, input count |
get_logs |
Run history filtered to failures after last deploy (for_last=true) |
dry_run |
Test extractor code with real inputs — no DB writes |
upsert_job |
Create or update a job (code, schedule, site_id, enabled) |
retry_job |
Trigger immediate run, skip cron schedule |
pause_job / resume_job |
Pause/resume cron schedule |
add_input |
Add a product/URL to a job's input list |
list_inputs |
List all active inputs for a job |
delete_input |
Soft-delete an input |
query |
Run SELECT against ClickHouse results (SELECT only) |
Extractor contract
Extractors are isolated Python scripts. They receive:
INPUTSenv var — JSON array:[{"id": "uuid", "data": {"sku": "...", "url": "..."}}]SITE_IDenv var — site identifier from the jobSTEEL_API_KEY/STEEL_BASE_URL— forwarded automatically via PROXY_ENV_KEYS
They must print to stdout exactly one JSON object:
{"results": [{"input_id": "uuid", "shop_price": 9990.0, "currency": "RUB", "in_stock": true}]}
Debug output goes to stderr — appears in logs but never parsed.
Unknown keys auto-create ClickHouse columns: field_float, field_int, field_string, field_bool.
Include input_id in every result row. Do NOT include a price key — use shop_price instead.
Writing extractors — Steel + Playwright (standard pattern)
Always use Steel Browser + Playwright. No local Chromium is installed — Steel provides the browser.
import json, os, re
from steel import Steel
from playwright.sync_api import sync_playwright
_kw = {"steel_api_key": os.environ["STEEL_API_KEY"]}
_base_url = os.environ.get("STEEL_BASE_URL", "").strip()
if _base_url:
_kw["base_url"] = _base_url
else:
os.environ.pop("STEEL_BASE_URL", None) # SDK reads env automatically; empty string breaks it
steel = Steel(**_kw)
def scrape(url: str) -> dict:
session = steel.sessions.create()
try:
with sync_playwright() as p:
browser = p.chromium.connect_over_cdp(session.websocket_url)
page = browser.contexts[0].pages[0]
page.goto(url, wait_until="domcontentloaded", timeout=30_000)
html = page.content()
finally:
steel.sessions.release(session.id)
# Parse JSON-LD product schema first (fast, reliable)
product, offers = {}, {}
for block in re.findall(r'<script[^>]+application/ld\+json[^>]*>(.*?)</script>', html, re.DOTALL):
try:
obj = json.loads(block.strip())
if obj.get("@type") == "Product":
product = obj
offers = product.get("offers", {})
break
except Exception:
pass
return {
"shop_price": float(offers["price"]) if offers.get("price") else None,
"currency": offers.get("priceCurrency", "RUB"),
"in_stock": "InStock" in offers.get("availability", ""),
"name": product.get("name", ""),
}
inputs = json.loads(os.environ.get("INPUTS", "[]"))
results = []
for inp in inputs:
url = inp.get("data", {}).get("url", "")
if not url:
continue
try:
row = scrape(url)
row["input_id"] = inp["id"]
results.append(row)
except Exception as e:
print(f"ERROR {url}: {e}", file=__import__("sys").stderr)
results.append({"input_id": inp["id"], "in_stock": False, "error_string": str(e)})
print(json.dumps({"results": results}))
When JSON-LD is absent — use Playwright DOM
# Wait for price element, then extract text
page.wait_for_selector(".price", timeout=10_000)
price_text = page.query_selector(".price").inner_text()
# Parse with price-parser
from price_parser import Price
p = Price.fromstring(price_text)
shop_price = float(p.amount) if p.amount else None
Workflow: add a new site
Step 1 — find the product page URL
Search or ask the user. Find a specific product URL on the competitor site.
Step 2 — inspect the page structure
Use Steel + Playwright to fetch the page, print the relevant HTML section to stderr:
print(html[:3000], file=sys.stderr) # see what's there
Or look for JSON-LD tags. Most Russian e-commerce sites (Wildberries, Ozon, DNS, Eldorado, karex.ru)
expose application/ld+json with @type: Product.
Step 3 — dry_run with real inputs
Always test with real inputs before deploying.
dry_run(
runtime="python",
code="<extractor code>",
inputs=[{"id": "test-1", "data": {"url": "https://example.com/product/123", "sku": "SKU-123"}}],
site_id="example.com"
)
Must return ok=true with non-empty parsed_rows. If ok=false, read stderr and fix.
Step 4 — deploy the job
upsert_job(
job_id="example-prices",
runtime="python",
code="<extractor code>",
schedule="*/30 * * * *",
site_id="example.com",
enabled=true
)
Step 5 — add inputs
add_input(job_id="example-prices", data={"url": "https://...", "sku": "ABC", "name": "Product Name"})
Step 6 — trigger first run
retry_job(job_id="example-prices")
Wait ~30s, then get_logs(job_id="example-prices", for_last=true) to confirm success.
Monitoring loop
When asked to monitor or fix failing jobs:
get_stats— checkfailing_jobs- For each failing job:
a.get_logs(job_id, state="failed", for_last=true)— read stderr
b. Diagnose root cause (selector changed? site blocked? parse error?)
c. Fix the extractor code
d.dry_run(inputs=[...])— must returnok=truewith results
e.upsert_job— deploy the fix
f.retry_job
g. Wait ~30s →get_logs(for_last=true)→ confirm fixed (max 3 attempts) - Report what was fixed
for_last=true limits logs to runs after the most recent deploy — skip pre-fix errors.
Querying prices
-- Latest price per site for a product
SELECT site_id, argMax(shop_price, created_at) AS price, max(created_at) AS updated_at
FROM scrapeq.job_results
WHERE input_id = 'uuid-xxx'
GROUP BY site_id
ORDER BY price
-- All prices in last 24h
SELECT job_id, input_id, shop_price, in_stock, created_at
FROM scrapeq.job_results
WHERE created_at > now() - INTERVAL 24 HOUR
ORDER BY created_at DESC
LIMIT 50
-- Coverage gaps (inputs with no recent results)
SELECT i.input_id, i.data
FROM scrapeq.job_inputs FINAL i
WHERE i.deleted = 0
AND i.input_id NOT IN (
SELECT DISTINCT input_id FROM scrapeq.job_results
WHERE created_at > now() - INTERVAL 24 HOUR
)
Use query tool — SELECT only.
Job naming convention
{site-slug}-prices — e.g. karex-prices, dns-prices, ozon-karcher-prices
Debugging tips
- Empty results (
parsed_rows: []): check thatinputswas passed todry_run; check the URL works - ImportError steel/playwright: image may be outdated — check container is rebuilt
STEEL_API_KEYnot set: only forwarded inside the container via PROXY_ENV_KEYS; check.envin trip2g_agent_queue- Site blocks headless browser: try
page.wait_for_load_state("networkidle")or addpage.set_extra_http_headers({"User-Agent": "Mozilla/5.0..."}) - Price is None: site may use JS rendering — try waiting for selector before calling
page.content()