Skip to main contentSkip to FAQSkip to contact

The tetrafi Python package provides both synchronous and asynchronous interfaces for the TetraFi API. Designed for algorithmic trading workflows and data pipelines.

Bash
1pip install tetrafi
2# or
3poetry add tetrafi
4# or
5uv add tetrafi
5 linesbash

Requirements: Python 3.10+, httpx, websockets

Python
1from tetrafi import TetraFi
2
3client = TetraFi(
4 api_key="sk_live_...",
5 environment="sandbox", # "sandbox" | "production"
6 timeout=30, # seconds, default 30
7 retries=3, # auto-retry on , default 3
8)
8 linespython
Python
1from tetrafi import AsyncTetraFi
2
3async with AsyncTetraFi(
4 api_key="sk_live_...",
5 environment="sandbox",
6) as client:
7 rfq = await client.rfq.create(
8 pair="USDC/USDT",
9 side="buy",
10 amount="1000000.00",
11 )
11 linespython
ParameterTypeDescription
api_keyrequiredstrYour API key (sk_test_* for sandbox, sk_live_* for production)
environmentstr"sandbox" | "production". Defaults to "production".
timeoutint | floatRequest timeout in seconds. Defaults to 30.
retriesintAuto-retry count on 5xx errors. Defaults to 3.
base_urlstrOverride the API base URL.

Submit a new Request for Quote to competing solvers.

Python
1rfq = client.rfq.create(
2 pair="USDC/USDT",
3 side="buy",
4 amount="1000000.00",
5 corridor="ethereum-optimism", # optional
6 max_slippage=0.001, # optional, 0.1%
7)
8print(f"RFQ: {rfq.id}, status: {rfq.status}")
8 linespython
ReturnsRFQ

Get competing quotes for an active RFQ.

Python
1quotes = client.rfq.get_quotes("rfq_abc123")
2for q in quotes:
3 print(f" {q.solver_id}: {q.price} ({q.confidence:.0%} confidence)")
3 linespython
Returnslist[Quote]- sorted best-price first

Accept the best available quote automatically.

Python
1settlement = client.rfq.accept_best("rfq_abc123")
2print(f"Settlement: {settlement.id}")
2 linespython
ReturnsSettlement

Accept a specific quote.

Python
1settlement = client.quotes.accept("qt_xyz789")
1 linespython
ReturnsSettlement

Poll settlement status.

Python
1status = client.settlements.get("stl_abc123")
2print(f"Status: {status.status}")
2 linespython
ReturnsSettlement- pending | escrow_locked | filling | complete | failed

Check compliance attestation status.

Python
1status = client.compliance.check("...")
2print(f"Compliant: {status.is_compliant}")
2 linespython

Fetch the WORM audit trail.

Python
1entries = client.audit.get_trail("stl_abc123")
2for entry in entries:
3 print(f"[{entry.type}] {entry.timestamp}: {entry.hash}")
3 linespython
Python
1from dataclasses import dataclass
2from typing import Optional, Literal
3
4@dataclass
5class RFQ:
6 id: str
7 pair: str
8 side: Literal["buy", "sell"]
9 amount: str
10 corridor: str
11 status: Literal["pending", "quoting", "accepted", "expired"]
12 created_at: str
13 expires_at: str
14
15@dataclass
16class Quote:
17 id: str
18 rfq_id: str
19 solver_id: str
20 price: str # Decimal string, e.g. "0.9995"
21 ttl: int # Seconds until expiry
22 confidence: float # 0-1 fill confidence score
23 created_at: str
24
25@dataclass
26class Settlement:
27 id: str
28 rfq_id: str
29 quote_id: str
30 status: Literal["pending", "escrow_locked", "filling", "complete", "failed", "refunded"]
31 amount: str
32 origin_tx_hash: Optional[str] = None
33 destination_tx_hash: Optional[str] = None
34 created_at: str = ""
35 settled_at: Optional[str] = None
36
37@dataclass
38class ComplianceStatus:
39 address: str
40 is_compliant: bool
41 attestation_hash: str
42 expires_at: str
43
44@dataclass
45class AuditEntry:
46 trade_id: str
47 type: str
48 timestamp: str
49 data: dict
50 hash: str
51 prev_hash: str
51 linespython
Python
1import asyncio
2import time
3from tetrafi import AsyncTetraFi
4import os
5
6async def execute_swap():
7 async with AsyncTetraFi(
8 api_key=os.environ["TETRAFI_API_KEY"],
9 environment="sandbox",
10 ) as client:
11 # 1. Create RFQ
12 rfq = await client.rfq.create(
13 pair="USDC/USDT",
14 side="buy",
15 amount="1000000.00",
16 )
17 print(f"RFQ created: {rfq.id}")
18
19 # 2. Wait for auction window
20 await asyncio.sleep(3)
21
22 # 3. Get quotes
23 quotes = await client.rfq.get_quotes(rfq.id)
24 print(f"Received {len(quotes)} quotes")
25
26 if not quotes:
27 raise ValueError("No quotes received")
28
29 best = max(quotes, key=lambda q: float(q.price))
30 print(f"Best: {best.solver_id} at {best.price}")
31
32 # 4. Accept best
33 settlement = await client.quotes.accept(best.id)
34 print(f"Settlement: {settlement.id}")
35
36 # 5. Monitor to completion
37 while settlement.status not in ("complete", "failed", "refunded"):
38 await asyncio.sleep(5)
39 settlement = await client.settlements.get(settlement.id)
40 print(f"Status: {settlement.status}")
41
42 if settlement.status == "complete":
43 print("✓ Settlement complete!")
44 print(f" Origin TX: {settlement.origin_tx_hash}")
45 print(f" Dest TX: {settlement.destination_tx_hash}")
46 else:
47 raise Exception(f"Settlement failed: {settlement.status}")
48
49asyncio.run(execute_swap())
49 linespython
Python
1from tetrafi.errors import (
2 TetraFiError,
3 RateLimitError,
4 AuthError,
5 ComplianceError,
6 ValidationError,
7)
8import asyncio
9
10async def safe_create_rfq(client, **kwargs):
11 retries = 0
12 while True:
13 try:
14 return await client.rfq.create(**kwargs)
15 except RateLimitError as e:
16 wait = e.retry_after or 2 ** retries
17 print(f"Rate limited - waiting {wait}s")
18 await asyncio.sleep(wait)
19 retries += 1
20 except AuthError:
21 raise # Don't retry auth errors
22 except ComplianceError as e:
23 print(f"Compliance rejected: {e.message}")
24 raise
25 except TetraFiError as e:
26 print(f"API error {e.code}: {e.message}")
27 if retries >= 3:
28 raise
29 retries += 1
29 linespython
Python
1import asyncio
2import json
3import websockets
4
5async def listen_to_events():
6 uri = "wss://api.tetrafi.io/v1/ws"
7
8 async with websockets.connect(uri) as ws:
9 # Authenticate
10 await ws.send(json.dumps({
11 "type": "auth",
12 "token": os.environ["TETRAFI_API_KEY"],
13 }))
14
15 # Subscribe to settlement events
16 await ws.send(json.dumps({
17 "type": "subscribe",
18 "channels": ["settlement.*", "quote.received"],
19 }))
20
21 print("Listening for events...")
22 async for message in ws:
23 event = json.loads(message)
24 event_type = event.get("type")
25
26 if event_type == "quote.received":
27 q = event["quote"]
28 print(f"[Quote] {q['solver_id']}: {q['price']}")
29 elif event_type == "settlement.complete":
30 s = event["settlement"]
31 print(f"[Complete] {s['id']}: {s['origin_tx_hash']}")
32 elif event_type == "ping":
33 await ws.send(json.dumps({"type": "pong"}))
34
35asyncio.run(listen_to_events())
35 linespython

The tetrafi package follows semver. Pin your dependency conservatively until you've confirmed compatibility in staging.

BumpMeaningAction required
1.0.x → 1.0.yPatch - bug fixes, no API changesAuto-update is safe
1.0.x → 1.1.xMinor - additive features, new methodsRead changelog, no migration
1.x.x → 2.0.0Major - breaking changesRead migration guide

Deprecation policy. Deprecated methods raise a DeprecationWarning for at least one minor version before removal:

Python
1>>> client.rfq.create(slippage=0.001)
2DeprecationWarning: tetrafi.rfq.create(slippage=...) is deprecated.
3Use max_slippage= instead. This will be removed in 2.0.0.
3 linespython

To make these warnings visible during development, add at the top of your test runner or main module:

Python
1import warnings
2warnings.simplefilter("always", DeprecationWarning)
2 linespython

The full changelog lives at github.com/tetrafi/sdk-python/releases. Subscribe to the GitHub releases feed to be notified of new versions.

Pinning tip. For production deployments, pin to an exact version in requirements.txt (tetrafi==1.4.2) or pyproject.toml (tetrafi = "1.4.2" for Poetry, tetrafi==1.4.2 for uv) rather than a range like >=1.4. This prevents an automatic minor update from changing behavior between deploys. Upgrade explicitly after reading the changelog.