The tetrafi Python package provides both synchronous and asynchronous interfaces for the TetraFi API. Designed for algorithmic trading workflows and data pipelines.
1pip install tetrafi2# or3poetry add tetrafi4# or5uv add tetrafiRequirements: Python 3.10+, httpx, websockets
1from tetrafi import TetraFi23client = TetraFi(4 api_key="sk_live_...",5 environment="sandbox", # "sandbox" | "production"6 timeout=30, # seconds, default 307 retries=3, # auto-retry on , default 38)1from tetrafi import AsyncTetraFi23async with AsyncTetraFi(4 api_key="sk_live_...",5 environment="sandbox",6) as client:7 rfq = await client.rfq.create(8 pair="USDC/USDT",9 side="buy",10 amount="1000000.00",11 )| Parameter | Type | Description |
|---|---|---|
| api_keyrequired | str | Your API key (sk_test_* for sandbox, sk_live_* for production) |
| environment | str | "sandbox" | "production". Defaults to "production". |
| timeout | int | float | Request timeout in seconds. Defaults to 30. |
| retries | int | Auto-retry count on 5xx errors. Defaults to 3. |
| base_url | str | Override the API base URL. |
Submit a new Request for Quote to competing solvers.
1rfq = client.rfq.create(2 pair="USDC/USDT",3 side="buy",4 amount="1000000.00",5 corridor="ethereum-optimism", # optional6 max_slippage=0.001, # optional, 0.1%7)8print(f"RFQ: {rfq.id}, status: {rfq.status}")Get competing quotes for an active RFQ.
1quotes = client.rfq.get_quotes("rfq_abc123")2for q in quotes:3 print(f" {q.solver_id}: {q.price} ({q.confidence:.0%} confidence)")Accept the best available quote automatically.
1settlement = client.rfq.accept_best("rfq_abc123")2print(f"Settlement: {settlement.id}")Accept a specific quote.
1settlement = client.quotes.accept("qt_xyz789")Poll settlement status.
1status = client.settlements.get("stl_abc123")2print(f"Status: {status.status}")Check compliance attestation status.
1status = client.compliance.check("...")2print(f"Compliant: {status.is_compliant}")Fetch the WORM audit trail.
1entries = client.audit.get_trail("stl_abc123")2for entry in entries:3 print(f"[{entry.type}] {entry.timestamp}: {entry.hash}")1from dataclasses import dataclass2from typing import Optional, Literal34@dataclass5class RFQ:6 id: str7 pair: str8 side: Literal["buy", "sell"]9 amount: str10 corridor: str11 status: Literal["pending", "quoting", "accepted", "expired"]12 created_at: str13 expires_at: str1415@dataclass16class Quote:17 id: str18 rfq_id: str19 solver_id: str20 price: str # Decimal string, e.g. "0.9995"21 ttl: int # Seconds until expiry22 confidence: float # 0-1 fill confidence score23 created_at: str2425@dataclass26class Settlement:27 id: str28 rfq_id: str29 quote_id: str30 status: Literal["pending", "escrow_locked", "filling", "complete", "failed", "refunded"]31 amount: str32 origin_tx_hash: Optional[str] = None33 destination_tx_hash: Optional[str] = None34 created_at: str = ""35 settled_at: Optional[str] = None3637@dataclass38class ComplianceStatus:39 address: str40 is_compliant: bool41 attestation_hash: str42 expires_at: str4344@dataclass45class AuditEntry:46 trade_id: str47 type: str48 timestamp: str49 data: dict50 hash: str51 prev_hash: str1import asyncio2import time3from tetrafi import AsyncTetraFi4import os56async def execute_swap():7 async with AsyncTetraFi(8 api_key=os.environ["TETRAFI_API_KEY"],9 environment="sandbox",10 ) as client:11 # 1. Create RFQ12 rfq = await client.rfq.create(13 pair="USDC/USDT",14 side="buy",15 amount="1000000.00",16 )17 print(f"RFQ created: {rfq.id}")1819 # 2. Wait for auction window20 await asyncio.sleep(3)2122 # 3. Get quotes23 quotes = await client.rfq.get_quotes(rfq.id)24 print(f"Received {len(quotes)} quotes")2526 if not quotes:27 raise ValueError("No quotes received")2829 best = max(quotes, key=lambda q: float(q.price))30 print(f"Best: {best.solver_id} at {best.price}")3132 # 4. Accept best33 settlement = await client.quotes.accept(best.id)34 print(f"Settlement: {settlement.id}")3536 # 5. Monitor to completion37 while settlement.status not in ("complete", "failed", "refunded"):38 await asyncio.sleep(5)39 settlement = await client.settlements.get(settlement.id)40 print(f"Status: {settlement.status}")4142 if settlement.status == "complete":43 print("✓ Settlement complete!")44 print(f" Origin TX: {settlement.origin_tx_hash}")45 print(f" Dest TX: {settlement.destination_tx_hash}")46 else:47 raise Exception(f"Settlement failed: {settlement.status}")4849asyncio.run(execute_swap())1from tetrafi.errors import (2 TetraFiError,3 RateLimitError,4 AuthError,5 ComplianceError,6 ValidationError,7)8import asyncio910async def safe_create_rfq(client, **kwargs):11 retries = 012 while True:13 try:14 return await client.rfq.create(**kwargs)15 except RateLimitError as e:16 wait = e.retry_after or 2 ** retries17 print(f"Rate limited - waiting {wait}s")18 await asyncio.sleep(wait)19 retries += 120 except AuthError:21 raise # Don't retry auth errors22 except ComplianceError as e:23 print(f"Compliance rejected: {e.message}")24 raise25 except TetraFiError as e:26 print(f"API error {e.code}: {e.message}")27 if retries >= 3:28 raise29 retries += 11import asyncio2import json3import websockets45async def listen_to_events():6 uri = "wss://api.tetrafi.io/v1/ws"78 async with websockets.connect(uri) as ws:9 # Authenticate10 await ws.send(json.dumps({11 "type": "auth",12 "token": os.environ["TETRAFI_API_KEY"],13 }))1415 # Subscribe to settlement events16 await ws.send(json.dumps({17 "type": "subscribe",18 "channels": ["settlement.*", "quote.received"],19 }))2021 print("Listening for events...")22 async for message in ws:23 event = json.loads(message)24 event_type = event.get("type")2526 if event_type == "quote.received":27 q = event["quote"]28 print(f"[Quote] {q['solver_id']}: {q['price']}")29 elif event_type == "settlement.complete":30 s = event["settlement"]31 print(f"[Complete] {s['id']}: {s['origin_tx_hash']}")32 elif event_type == "ping":33 await ws.send(json.dumps({"type": "pong"}))3435asyncio.run(listen_to_events())The tetrafi package follows semver. Pin your dependency conservatively until you've confirmed compatibility in staging.
| Bump | Meaning | Action required |
|---|---|---|
1.0.x → 1.0.y | Patch - bug fixes, no API changes | Auto-update is safe |
1.0.x → 1.1.x | Minor - additive features, new methods | Read changelog, no migration |
1.x.x → 2.0.0 | Major - breaking changes | Read migration guide |
Deprecation policy. Deprecated methods raise a DeprecationWarning for at least one minor version before removal:
1>>> client.rfq.create(slippage=0.001)2DeprecationWarning: tetrafi.rfq.create(slippage=...) is deprecated.3Use max_slippage= instead. This will be removed in 2.0.0.To make these warnings visible during development, add at the top of your test runner or main module:
1import warnings2warnings.simplefilter("always", DeprecationWarning)The full changelog lives at github.com/tetrafi/sdk-python/releases. Subscribe to the GitHub releases feed to be notified of new versions.
Pinning tip. For production deployments, pin to an exact version in requirements.txt (tetrafi==1.4.2) or pyproject.toml (tetrafi = "1.4.2" for Poetry, tetrafi==1.4.2 for uv) rather than a range like >=1.4. This prevents an automatic minor update from changing behavior between deploys. Upgrade explicitly after reading the changelog.