Source code for polymarket_mcp.services.data

"""Typed Data API service adapter.

Purpose:
    Fetch Polymarket Data API payloads and normalize them into stable wallet and
    trade models.

Design:
    This module is the normalization boundary between raw Data API responses and
    MCP-facing code.
"""

from __future__ import annotations

from typing import Any

from polymarket_mcp.http import JsonHttpClient
from polymarket_mcp.models.data import (
    ActivityItem,
    Position,
    Trade,
    TradesArgs,
    WalletArgs,
)
from polymarket_mcp.settings import AppSettings


def _to_optional_str(value: object) -> str | None:
    """Coerce a value to an optional string."""
    if value is None:
        return None
    return str(value)


def _to_optional_float(value: object) -> float | None:
    """Coerce a value to an optional float."""
    if value is None:
        return None
    return float(value)


def _to_optional_int(value: object) -> int | None:
    """Coerce a value to an optional integer."""
    if value is None:
        return None
    return int(value)


[docs] class DataApiService: """Service wrapper for Polymarket Data API. Args: settings: Application settings. Returns: DataApiService: Configured service adapter. """ def __init__(self, settings: AppSettings) -> None: self._client = JsonHttpClient( base_url=settings.data_base_url, timeout=settings.request_timeout_seconds, domain="data", user_agent=settings.user_agent, ) def _normalize_position(self, payload: dict[str, Any]) -> Position: """Normalize a raw position payload. Args: payload: Upstream position dictionary. Returns: Position: Normalized position model. """ return Position( market_slug=_to_optional_str( payload.get("market_slug") or payload.get("marketSlug") ), title=_to_optional_str(payload.get("title") or payload.get("question")), outcome=_to_optional_str(payload.get("outcome")), size=_to_optional_float(payload.get("size")), avg_price=_to_optional_float( payload.get("avg_price") or payload.get("avgPrice") ), current_value=_to_optional_float( payload.get("current_value") or payload.get("currentValue") ), realized_pnl=_to_optional_float( payload.get("realized_pnl") or payload.get("realizedPnl") ), unrealized_pnl=_to_optional_float( payload.get("unrealized_pnl") or payload.get("unrealizedPnl") ), ) def _normalize_activity(self, payload: dict[str, Any]) -> ActivityItem: """Normalize a raw activity payload. Args: payload: Upstream activity dictionary. Returns: ActivityItem: Normalized activity model. """ return ActivityItem( activity_type=_to_optional_str( payload.get("activity_type") or payload.get("type") ), market_slug=_to_optional_str( payload.get("market_slug") or payload.get("marketSlug") ), title=_to_optional_str(payload.get("title") or payload.get("question")), outcome=_to_optional_str(payload.get("outcome")), price=_to_optional_float(payload.get("price")), size=_to_optional_float(payload.get("size")), timestamp=_to_optional_int( payload.get("timestamp") or payload.get("time") ), ) def _normalize_trade(self, payload: dict[str, Any]) -> Trade: """Normalize a raw trade payload. Args: payload: Upstream trade dictionary. Returns: Trade: Normalized trade model. """ return Trade( market_slug=_to_optional_str( payload.get("market_slug") or payload.get("marketSlug") ), outcome=_to_optional_str(payload.get("outcome")), price=_to_optional_float(payload.get("price")), size=_to_optional_float(payload.get("size")), side=_to_optional_str(payload.get("side")), user=_to_optional_str(payload.get("user")), timestamp=_to_optional_int( payload.get("timestamp") or payload.get("time") ), )
[docs] async def get_positions(self, args: WalletArgs) -> list[Position]: """Get normalized current positions for a wallet. Args: args: Wallet query arguments. Returns: list[Position]: Current wallet positions. """ data = await self._client.get("/positions", params=args.model_dump()) return [ self._normalize_position(item) for item in data if isinstance(item, dict) ]
[docs] async def get_closed_positions(self, args: WalletArgs) -> list[Position]: """Get normalized closed positions for a wallet. Args: args: Wallet query arguments. Returns: list[Position]: Closed wallet positions. """ data = await self._client.get("/closed-positions", params=args.model_dump()) return [ self._normalize_position(item) for item in data if isinstance(item, dict) ]
[docs] async def get_activity(self, args: WalletArgs) -> list[ActivityItem]: """Get normalized wallet activity. Args: args: Wallet query arguments. Returns: list[ActivityItem]: Wallet activity rows. """ data = await self._client.get("/activity", params=args.model_dump()) return [ self._normalize_activity(item) for item in data if isinstance(item, dict) ]
[docs] async def get_trades(self, args: TradesArgs) -> list[Trade]: """Get normalized trades. Args: args: Trade query arguments. Returns: list[Trade]: Matching trade rows. """ data = await self._client.get( "/trades", params=args.model_dump(exclude_none=True), ) return [ self._normalize_trade(item) for item in data if isinstance(item, dict) ]