
Python SDK

Planned Specification. The tetrafi Python package is under development and will be published to PyPI upon release. The API surface below is the target specification - use the REST API and WebSocket directly until release. Feedback on the SDK shape is welcome.

The Python SDK mirrors the TypeScript SDK API. See the TypeScript SDK docs for architectural context - method names use snake_case in Python and camelCase in TypeScript, but behavior is identical.

The tetrafi Python package will provide both synchronous and asynchronous interfaces for the TetraFi API. It is designed for algorithmic trading workflows and data pipelines.

Installation

Once the package is published to PyPI, install it with pip: pip install tetrafi

Requirements: Python 3.10+, httpx, websockets

Initialization

Synchronous

```python
from tetrafi import TetraFi

client = TetraFi(
    api_key="tfk_test_...",  # sandbox keys use the tfk_test_ prefix
    environment="sandbox",   # "sandbox" | "production"
    timeout=30,              # seconds, default 30
    retries=3,               # auto-retry on 5xx errors, default 3
)
```

Asynchronous

```python
import asyncio

from tetrafi import AsyncTetraFi


async def main():
    # The async client is a context manager; the connection closes on exit
    async with AsyncTetraFi(
        api_key="tfk_test_...",
        environment="sandbox",
    ) as client:
        rfq = await client.rfq.create(
            pair="USDC/USDT",
            side="buy",
            amount="1000000.00",
        )


asyncio.run(main())
```
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str | required | Your API key (tfk_test_* for sandbox, tfk_live_* for production). |
| environment | str | production | 'sandbox' or 'production'. |
| timeout | int or float | 30 | Request timeout in seconds. |
| retries | int | 3 | Auto-retry count on 5xx errors. |
| base_url | str | | Override the API base URL. |
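
Because sandbox keys start with tfk_test_ and production keys with tfk_live_, a mismatch between key and environment can be caught before any request is sent. The following is a minimal sketch of such a pre-flight check. The helper name check_key_matches_environment and the sample key are hypothetical and are not part of the planned SDK surface.

```python
# Hypothetical pre-flight check; not part of the tetrafi package.
KEY_PREFIXES = {
    "sandbox": "tfk_test_",
    "production": "tfk_live_",
}


def check_key_matches_environment(api_key: str, environment: str = "production") -> None:
    """Raise ValueError if the key prefix does not match the target environment."""
    try:
        expected = KEY_PREFIXES[environment]
    except KeyError:
        raise ValueError(f"Unknown environment: {environment!r}") from None
    if not api_key.startswith(expected):
        raise ValueError(f"{environment} expects a key starting with {expected!r}")


check_key_matches_environment("tfk_test_example", "sandbox")  # passes silently
```

Calling this before constructing the client turns a likely authentication failure into an immediate, local error.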

Core Methods

Method Index

| Method | Returns | Since | Status |
| --- | --- | --- | --- |
| rfq.create(**params) | RFQ | v2.0 | Stable |
| rfq.get_quotes(rfq_id) | list[Quote] | v2.0 | Stable |
| rfq.accept_best(rfq_id) | Settlement | v2.0 | |
| quotes.accept(quote_id) | Settlement | v2.0 | |
| settlements.get(id) | Settlement | v2.0 | |
| compliance.check(address) | ComplianceStatus | v2.0 | |
| audit.get_trail(id) | list[AuditEntry] | v2.1 | |

rfq.create(**params) (Stable)

Submit a new Request for Quote to competing solvers.

```python
rfq = client.rfq.create(
    pair="USDC/USDT",
    side="buy",
    amount="1000000.00",
    corridor="ethereum-optimism",  # optional
    max_slippage=0.001,            # optional, 0.1%
)
print(f"RFQ: {rfq.id}, status: {rfq.status}")
```

Returns: RFQ

rfq.get_quotes(rfq_id) (Stable)

Get competing quotes for an active RFQ.

```python
quotes = client.rfq.get_quotes("rfq_abc123")
for q in quotes:
    print(f"  {q.solver_id}: {q.price} ({q.confidence:.0%} confidence)")
```

Returns: list[Quote], sorted best-price first

rfq.accept_best(rfq_id)

Accept the best available quote automatically.

```python
settlement = client.rfq.accept_best("rfq_abc123")
print(f"Settlement: {settlement.id}")
```

Returns: Settlement

quotes.accept(quote_id)

Accept a specific quote.

```python
settlement = client.quotes.accept("qt_xyz789")
```

Returns: Settlement

settlements.get(id)

Poll settlement status.

```python
settlement = client.settlements.get("stl_abc123")
print(f"Status: {settlement.status}")
```

Returns: Settlement, whose status is one of pending | escrow_locked | filling | complete | failed | refunded
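
Polling is easier to reuse when the loop, the terminal states, and a timeout live in one helper. The sketch below assumes an async fetch callable such as the planned AsyncTetraFi settlements.get, and treats complete, failed, and refunded (from the Settlement type in Type Definitions) as terminal. The helper name wait_for_settlement and its interval and timeout defaults are illustrative choices, not SDK features.

```python
import asyncio
from typing import Awaitable, Callable

# Terminal states taken from the Settlement status values in this spec.
TERMINAL_STATUSES = {"complete", "failed", "refunded"}


async def wait_for_settlement(
    fetch: Callable[[str], Awaitable],
    settlement_id: str,
    interval: float = 5.0,
    timeout: float = 300.0,
):
    """Poll fetch(settlement_id) until a terminal status or until the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        settlement = await fetch(settlement_id)
        if settlement.status in TERMINAL_STATUSES:
            return settlement
        # Give up if the next poll would land past the deadline
        if loop.time() + interval > deadline:
            raise TimeoutError(
                f"{settlement_id} still {settlement.status} after {timeout}s"
            )
        await asyncio.sleep(interval)
```

With the async client this would be called as await wait_for_settlement(client.settlements.get, "stl_abc123"). A bounded wait avoids a loop that never exits if a settlement stalls in a non-terminal state.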

compliance.check(address)

Check compliance attestation status.

```python
status = client.compliance.check("...")
print(f"Compliant: {status.is_compliant}")
```

Returns: ComplianceStatus

audit.get_trail(settlement_id)

Fetch the WORM (write once, read many) audit trail.

```python
entries = client.audit.get_trail("stl_abc123")
for entry in entries:
    print(f"[{entry.type}] {entry.timestamp}: {entry.hash}")
```

Returns: list[AuditEntry]

Type Definitions

```python
from dataclasses import dataclass
from typing import Optional, Literal

@dataclass
class RFQ:
    id: str
    pair: str
    side: Literal["buy", "sell"]
    amount: str
    corridor: str
    status: Literal["pending", "quoting", "accepted", "expired"]
    created_at: str
    expires_at: str

@dataclass
class Quote:
    id: str
    rfq_id: str
    solver_id: str
    price: str         # Decimal string, e.g. "0.9995"
    ttl: int           # Seconds until expiry
    confidence: float  # 0-1 fill confidence score
    created_at: str
```
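
Since Quote.price is a decimal string, arithmetic on it is safer with decimal.Decimal than with float, which can introduce binary rounding error. The sketch below uses stand-in objects that carry the same attribute names as Quote. The helpers price_spread and filter_by_confidence, the solver IDs, and the 0.9 threshold are illustrative assumptions, not part of the SDK.

```python
from decimal import Decimal
from types import SimpleNamespace


def price_spread(quotes) -> Decimal:
    """Difference between the highest and lowest quoted price, computed exactly."""
    prices = [Decimal(q.price) for q in quotes]
    return max(prices) - min(prices)


def filter_by_confidence(quotes, min_confidence: float = 0.9):
    """Keep quotes whose fill confidence meets the threshold, preserving order."""
    return [q for q in quotes if q.confidence >= min_confidence]


# Stand-in objects with the same attribute names as the Quote dataclass.
quotes = [
    SimpleNamespace(solver_id="solver_a", price="0.9995", confidence=0.97),
    SimpleNamespace(solver_id="solver_b", price="0.9993", confidence=0.62),
]
print(price_spread(quotes))                                # 0.0002
print([q.solver_id for q in filter_by_confidence(quotes)])  # ['solver_a']
```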
```python
from dataclasses import dataclass
from typing import Literal, Optional

@dataclass
class Settlement:
    id: str
    rfq_id: str
    quote_id: str
    status: Literal["pending", "escrow_locked", "filling", "complete", "failed", "refunded"]
    amount: str
    origin_tx_hash: Optional[str] = None
    destination_tx_hash: Optional[str] = None
    created_at: str = ""
    settled_at: Optional[str] = None

@dataclass
class ComplianceStatus:
    address: str
    is_compliant: bool
    attestation_hash: str
    expires_at: str
```
```python
from dataclasses import dataclass

@dataclass
class AuditEntry:
    trade_id: str
    type: str
    timestamp: str
    data: dict
    hash: str
    prev_hash: str
```
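
The hash and prev_hash fields suggest that audit entries form a hash chain. Assuming entries arrive oldest first and each prev_hash holds the hash of the entry before it (neither is stated in this spec), the linkage can be checked as in the sketch below. It does not recompute any hash, because the hashing scheme is not specified here. The helper name find_chain_break and the sample hash values are made up for illustration.

```python
from types import SimpleNamespace


def find_chain_break(entries):
    """Return the index of the first entry whose prev_hash does not match the
    hash of the entry before it, or None if every link is consistent."""
    for i in range(1, len(entries)):
        if entries[i].prev_hash != entries[i - 1].hash:
            return i
    return None


# Stand-in entries; real ones would come from client.audit.get_trail(...).
trail = [
    SimpleNamespace(hash="h1", prev_hash=""),
    SimpleNamespace(hash="h2", prev_hash="h1"),
    SimpleNamespace(hash="h3", prev_hash="h2"),
]
print(find_chain_break(trail))  # None
```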

Full Working Example

```python
import asyncio
import os

from tetrafi import AsyncTetraFi


async def execute_swap():
    async with AsyncTetraFi(
        api_key=os.environ["TETRAFI_API_KEY"],
        environment="sandbox",
    ) as client:
        # 1. Create RFQ
        rfq = await client.rfq.create(
            pair="USDC/USDT",
            side="buy",
            amount="1000000.00",
        )
        print(f"RFQ created: {rfq.id}")

        # 2. Wait for auction window
        await asyncio.sleep(3)

        # 3. Get quotes
        quotes = await client.rfq.get_quotes(rfq.id)
        print(f"Received {len(quotes)} quotes")

        if not quotes:
            raise ValueError("No quotes received")

        # get_quotes returns quotes sorted best-price first
        best = quotes[0]
        print(f"Best: {best.solver_id} at {best.price}")

        # 4. Accept best
        settlement = await client.quotes.accept(best.id)
        print(f"Settlement: {settlement.id}")

        # 5. Monitor to completion
        while settlement.status not in ("complete", "failed", "refunded"):
            await asyncio.sleep(5)
            settlement = await client.settlements.get(settlement.id)
            print(f"Status: {settlement.status}")

        if settlement.status == "complete":
            print("✓ Settlement complete!")
            print(f"  Origin TX: {settlement.origin_tx_hash}")
            print(f"  Dest TX: {settlement.destination_tx_hash}")
        else:
            raise RuntimeError(f"Settlement failed: {settlement.status}")


asyncio.run(execute_swap())
```

Error Handling

```python
import asyncio

from tetrafi.errors import (
    AuthError,
    ComplianceError,
    RateLimitError,
    TetraFiError,
    ValidationError,
)


async def safe_create_rfq(client, **kwargs):
    retries = 0
    while True:
        try:
            return await client.rfq.create(**kwargs)
        except RateLimitError as e:
            # Honor the server's retry hint if present, else back off exponentially
            wait = e.retry_after or 2 ** retries
            print(f"Rate limited - waiting {wait}s")
            await asyncio.sleep(wait)
            retries += 1
        except (AuthError, ValidationError):
            raise  # Retrying cannot fix bad credentials or invalid input
        except ComplianceError as e:
            print(f"Compliance rejected: {e.message}")
            raise
        except TetraFiError as e:
            print(f"API error {e.code}: {e.message}")
            if retries >= 3:
                raise
            await asyncio.sleep(2 ** retries)  # Back off instead of retrying immediately
            retries += 1
```

WebSocket Event Stream

```python
import asyncio
import json
import os

import websockets


async def listen_to_events():
    uri = "wss://api.tetrafi.io/api/v1/ws"

    async with websockets.connect(uri) as ws:
        # Authenticate
        await ws.send(json.dumps({
            "type": "auth",
            "token": os.environ["TETRAFI_API_KEY"],
        }))

        # Subscribe to settlement and quote events
        await ws.send(json.dumps({
            "type": "subscribe",
            "channels": ["settlement.*", "quote.received"],
        }))

        print("Listening for events...")
        async for message in ws:
            event = json.loads(message)
            event_type = event.get("type")

            if event_type == "quote.received":
                q = event["quote"]
                print(f"[Quote] {q['solver_id']}: {q['price']}")
            elif event_type == "settlement.complete":
                s = event["settlement"]
                print(f"[Complete] {s['id']}: {s['origin_tx_hash']}")
            elif event_type == "ping":
                # Answer application-level pings to keep the session alive
                await ws.send(json.dumps({"type": "pong"}))


asyncio.run(listen_to_events())
```
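
As the number of event types grows, an if/elif chain becomes hard to maintain. One option is a small router that maps type patterns to handler functions. The sketch below is a hypothetical client-side helper, not an SDK feature. It matches patterns such as settlement.* with fnmatchcase from the standard library, which is an assumption about how the wildcard should behave and may differ from the server's channel semantics.

```python
import json
from fnmatch import fnmatchcase


class EventRouter:
    """Route decoded events to handlers registered by event-type pattern."""

    def __init__(self):
        self._handlers = []  # list of (pattern, callable) pairs

    def on(self, pattern):
        """Decorator that registers a handler for event types matching pattern."""
        def register(func):
            self._handlers.append((pattern, func))
            return func
        return register

    def dispatch(self, raw_message: str) -> int:
        """Decode one message, call every matching handler, return the match count."""
        event = json.loads(raw_message)
        event_type = event.get("type", "")
        matched = 0
        for pattern, func in self._handlers:
            if fnmatchcase(event_type, pattern):
                func(event)
                matched += 1
        return matched


router = EventRouter()
seen = []


@router.on("settlement.*")
def on_settlement(event):
    seen.append(event["type"])


router.dispatch('{"type": "settlement.complete", "settlement": {"id": "stl_abc123"}}')
router.dispatch('{"type": "quote.received", "quote": {}}')
print(seen)  # ['settlement.complete']
```

Inside the receive loop, each incoming message would be passed to router.dispatch(message) in place of the branching on event_type.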

Versioning

The tetrafi package follows semver. Pin your dependency conservatively until you've confirmed compatibility in staging.

| Bump | Meaning | Action required |
| --- | --- | --- |
| 1.0.x → 1.0.y | Patch - bug fixes, no API changes | Auto-update is safe |
| 1.0.x → 1.1.x | Minor - additive features, new methods | Read changelog, no migration |
| 1.x.x → 2.0.0 | Major - breaking changes | Read migration guide |
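
The bump categories above can be checked mechanically, for example in a CI step that flags upgrades needing review. The sketch below is a hypothetical helper that handles only plain MAJOR.MINOR.PATCH strings. Pre-release and build suffixes are out of scope.

```python
def classify_bump(installed: str, candidate: str) -> str:
    """Classify a MAJOR.MINOR.PATCH change as 'major', 'minor', 'patch', or 'none'."""
    old = tuple(int(part) for part in installed.split("."))
    new = tuple(int(part) for part in candidate.split("."))
    if len(old) != 3 or len(new) != 3:
        raise ValueError("expected versions of the form MAJOR.MINOR.PATCH")
    if new[0] != old[0]:
        return "major"
    if new[1] != old[1]:
        return "minor"
    if new[2] != old[2]:
        return "patch"
    return "none"


print(classify_bump("1.0.3", "1.0.7"))  # patch
print(classify_bump("1.0.3", "1.1.0"))  # minor
print(classify_bump("1.4.2", "2.0.0"))  # major
```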

Deprecation policy. Deprecated methods raise a DeprecationWarning for at least one minor version before removal:

```python
>>> client.rfq.create(slippage=0.001)
DeprecationWarning: tetrafi.rfq.create(slippage=...) is deprecated.
Use max_slippage= instead. This will be removed in 2.0.0.
```

To make these warnings visible during development, add at the top of your test runner or main module:

```python
import warnings
warnings.simplefilter("always", DeprecationWarning)
```
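
In a test suite it can be more useful to fail on deprecated calls than to print them. The sketch below uses only the standard warnings module. legacy_call is a stand-in for a deprecated method and is not a real tetrafi function, and raises_deprecation is a hypothetical helper name.

```python
import warnings


def legacy_call():
    """Stand-in for a deprecated SDK method; not a real tetrafi function."""
    warnings.warn("legacy_call() is deprecated", DeprecationWarning, stacklevel=2)
    return "ok"


def raises_deprecation(func) -> bool:
    """Return True if calling func emits a DeprecationWarning."""
    with warnings.catch_warnings():
        # Inside this block, DeprecationWarning is raised as an exception
        warnings.simplefilter("error", DeprecationWarning)
        try:
            func()
        except DeprecationWarning:
            return True
    return False


print(raises_deprecation(legacy_call))   # True
print(raises_deprecation(lambda: "ok"))  # False
```

The catch_warnings context manager restores the previous filters on exit, so the stricter setting does not leak into other tests.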

The full changelog lives at github.com/tetrafi/sdk-python/releases. Subscribe to the GitHub releases feed to be notified of new versions.

v1.4.2 (current), 2026-04-01
Added: HMAC pseudonymization for Travel Rule data fields
Fixed: Connection pool leak in async client under high concurrency
Changed: Default timeout increased from 15s to 30s to accommodate cross-chain settlement latency

Pinning tip. For production deployments, pin to an exact version in requirements.txt (tetrafi==1.4.2) or pyproject.toml (tetrafi = "1.4.2" for Poetry, tetrafi==1.4.2 for uv) rather than a range like >=1.4. This prevents an automatic minor update from changing behavior between deploys. Upgrade explicitly after reading the changelog.
