Python PGWire Guide
QuestDB is tested with the following Python clients:
Other Python clients that are compatible with the PostgreSQL wire protocol should also work with QuestDB, but we do not test them. If you find a client that does not work, please open an issue
Performance Considerations
QuestDB is designed to be a high-performance database. The PGWire protocol has many flavors, and some of them are not optimized for performance. We found psycopg2 to be the slowest of the three clients. Our recommendation is to use asyncpg or psycopg3 for the best performance when querying data.
Note: For data ingestion, we recommend using QuestDB's first-party clients with the InfluxDB Line Protocol (ILP) instead of PGWire. PGWire should primarily be used for querying data in QuestDB. QuestDB provides an official Python client for data ingestion using ILP.
Connection Parameters
All Python PostgreSQL clients need similar connection parameters to connect to QuestDB:
CONNECTION_PARAMS = {
'host': '127.0.0.1',
'port': 8812, # Default PGWire port for QuestDB
'user': 'admin',
'password': 'quest',
'database': 'qdb'
}
asyncpg
asyncpg is an asynchronous PostgreSQL client library designed for high performance. It uses Python's async/await syntax and is built specifically for asyncio.
Features
- Fast binary protocol implementation
- Native asyncio support
- Efficient prepared statements
- Connection pooling
- Excellent performance for large result sets
Installation
pip install asyncpg
Basic Connection
import asyncio
import asyncpg
async def connect_to_questdb():
conn = await asyncpg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
database='qdb'
)
version = await conn.fetchval("SELECT version()")
print(f"Connected to QuestDB version: {version}")
await conn.close()
asyncio.run(connect_to_questdb())
Querying Data
asyncpg provides several methods for fetching data:
fetch()
: Returns all rows as a list of Record objectsfetchval()
: Returns a single value (first column of first row)fetchrow()
: Returns a single row as a Record objectcursor()
: Returns an async cursor for streaming results
import asyncio
import asyncpg
from datetime import datetime, timedelta
async def query_with_asyncpg():
conn = await asyncpg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
database='qdb'
)
# Fetch multiple rows
rows = await conn.fetch("""
SELECT * FROM trades
WHERE ts >= $1
ORDER BY ts DESC
LIMIT 10
""", datetime.now() - timedelta(days=1))
print(f"Fetched {len(rows)} rows")
for row in rows:
print(f"Timestamp: {row['ts']}, Symbol: {row['symbol']}, Price: {row['price']}")
# Fetch a single row
single_row = await conn.fetchrow("""
SELECT * FROM trades
LIMIT -1
""")
if single_row:
print(f"Latest trade: {single_row['symbol']} at {single_row['price']}")
# Fetch a single value
count = await conn.fetchval("SELECT count(*) FROM trades")
print(f"Total trades: {count}")
await conn.close()
asyncio.run(query_with_asyncpg())
Using Cursors for Large Result Sets
For large result sets, you can use a cursor to fetch results in batches:
import asyncio
import asyncpg
async def stream_with_cursor():
conn = await asyncpg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
database='qdb'
)
async with conn.transaction():
# Execute a query that might return a large number of rows
cursor = await conn.cursor("""
SELECT * FROM trades
ORDER BY ts
""")
batch_size = 100
total_processed = 0
while True:
batch = await cursor.fetch(batch_size)
# If no more rows, break the loop
if not batch:
break
total_processed += len(batch)
print(f"Processed {total_processed} rows so far...")
await conn.close()
print(f"Finished processing {total_processed} total rows")
asyncio.run(stream_with_cursor())
Connection Pooling
For applications that need to execute many queries, you can use connection pooling:
import asyncio
import asyncpg
async def connection_pool_example():
pool = await asyncpg.create_pool(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
database='qdb',
min_size=5,
max_size=20
)
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM trades LIMIT 10")
print(f"Fetched {len(result)} rows")
await pool.close()
asyncio.run(connection_pool_example())
Parameterized Queries
asyncpg uses numbered parameters ($1
, $2
, etc.) for prepared statements:
import asyncio
import asyncpg
from datetime import datetime, timedelta
async def parameterized_query():
conn = await asyncpg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
database='qdb'
)
end_time = datetime.now()
start_time = end_time - timedelta(days=7)
rows = await conn.fetch("""
SELECT
symbol,
avg(price) as avg_price,
min(price) as min_price,
max(price) as max_price
FROM trades
WHERE ts >= $1 AND ts <= $2
GROUP BY symbol
""", start_time, end_time)
print(f"Found {len(rows)} symbols")
for row in rows:
print(f"Symbol: {row['symbol']}, Avg Price: {row['avg_price']:.2f}")
await conn.close()
asyncio.run(parameterized_query())
Binary Protocol
asyncpg uses the binary protocol by default, which improves performance by avoiding text encoding/decoding for data transfer.
Performance Tips
- Use connection pooling for multiple queries
- Take advantage of the binary protocol (used by default)
- Use parameterized queries for repeated query patterns
- For large result sets, use cursors to avoid loading all data into memory at once
psycopg3
psycopg3 is the latest major version of the popular psycopg PostgreSQL adapter. It's a complete rewrite of psycopg2 with support for both synchronous and asynchronous operations.
Features
- Support for both sync and async programming
- Server-side cursors
- Connection pooling
- Type adapters and converters
- Binary protocol support
Installation
pip install psycopg
Basic Connection
import psycopg
conn = psycopg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb',
autocommit=True # Important for QuestDB
)
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()
print(f"Connected to QuestDB version: {version[0]}")
conn.close()
Querying Data
psycopg3 provides several methods for fetching data:
fetchall()
: Returns all rows as a list of tuplesfetchone()
: Returns a single row as a tuplefetchmany(size)
: Returns a specified number of rows
import psycopg
from datetime import datetime, timedelta
with psycopg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb',
autocommit=True
) as conn:
with conn.cursor() as cur:
end_time = datetime.now()
start_time = end_time - timedelta(days=1)
cur.execute("""
SELECT * FROM trades
WHERE ts >= %s AND ts <= %s
ORDER BY ts DESC
LIMIT 10
""", (start_time, end_time))
rows = cur.fetchall()
print(f"Fetched {len(rows)} rows")
for row in rows:
print(f"Timestamp: {row[0]}, Symbol: {row[1]}, Price: {row[2]}")
Row Factories
psycopg3 allows you to specify how rows are returned using row factories:
import psycopg
with psycopg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb',
autocommit=True
) as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("SELECT * FROM trades LIMIT 5")
rows = cur.fetchall()
for row in rows:
print(f"Symbol: {row['symbol']}, Price: {row['price']}")
with conn.cursor(row_factory=psycopg.rows.namedtuple_row) as cur:
cur.execute("SELECT * FROM trades LIMIT 5")
rows = cur.fetchall()
for row in rows:
print(f"Symbol: {row.symbol}, Price: {row.price}")
Server-Side Cursors
For large result sets, you can use server-side cursors:
import psycopg
with psycopg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb',
autocommit=True
) as conn:
with conn.cursor() as cur:
# Execute a query that might return many rows
cur.execute("SELECT * FROM trades")
batch_size = 1000
total_processed = 0
while True:
batch = cur.fetchmany(batch_size)
if not batch:
break
total_processed += len(batch)
if total_processed % 10000 == 0:
print(f"Processed {total_processed} rows so far...")
print(f"Finished processing {total_processed} total rows")
Parameterized Queries
psycopg3 uses placeholder parameters (%s
) for prepared statements:
import psycopg
from datetime import datetime, timedelta
with psycopg.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb',
autocommit=True
) as conn:
with conn.cursor() as cur:
# Define query parameters
end_time = datetime.now()
start_time = end_time - timedelta(days=7)
# Execute a parameterized query
cur.execute("""
SELECT symbol,
avg(price) as avg_price,
min(price) as min_price,
max(price) as max_price
FROM trades
WHERE ts >= %s
AND ts <= %s
GROUP BY symbol
""", (start_time, end_time))
rows = cur.fetchall()
for row in rows:
print(f"Symbol: {row[0]}, Avg Price: {row[1]:.2f}")
Async Support
psycopg3 supports async operations:
import asyncio
import psycopg
async def async_psycopg3():
async with await psycopg.AsyncConnection.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb',
autocommit=True
) as aconn:
async with aconn.cursor() as acur:
await acur.execute("SELECT * FROM trades LIMIT 10")
rows = await acur.fetchall()
print(f"Fetched {len(rows)} rows")
for row in rows:
print(row)
asyncio.run(async_psycopg3())
Connection Pooling
psycopg3 provides connection pooling capabilities. This reduces the overhead of establishing new connections
and allows for efficient reuse of existing connections. The feature requires the psycopg_pool
package.
pip install psycopg_pool
from psycopg_pool import ConnectionPool
pool = ConnectionPool(
min_size=5,
max_size=20,
kwargs={
'host': '127.0.0.1',
'port': 8812,
'user': 'admin',
'password': 'quest',
'dbname': 'qdb',
'autocommit': True
}
)
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM trades LIMIT 10")
rows = cur.fetchall()
print(f"Fetched {len(rows)} rows")
pool.close()
Performance Tips
- Use row factories to avoid manual tuple-to-dict conversion
- For large result sets, use server-side cursors
- Take advantage of connection pooling for multiple queries
- Use the async API for non-blocking operations
psycopg2
psycopg2 is a mature PostgreSQL adapter for Python. While not as performant as asyncpg or psycopg3, it's widely used and has excellent compatibility. This driver is recommended as a last resort for QuestDB if asyncpg or psycopg3 are not working for you.
Features
- Stable and mature API
- Thread safety
- Connection pooling (with external libraries)
- Server-side cursors
- Rich type conversion
Installation
pip install psycopg2-binary
Basic Connection
import psycopg2
conn = psycopg2.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb'
)
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()
print(f"Connected to QuestDB version: {version[0]}")
conn.close()
Querying Data
psycopg2 provides several methods for fetching data:
fetchall()
: Returns all rows as a list of tuplesfetchone()
: Returns a single row as a tuplefetchmany(size)
: Returns a specified number of rows
import psycopg2
from datetime import datetime, timedelta
conn = psycopg2.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb'
)
conn.autocommit = True
try:
with conn.cursor() as cur:
cur.execute("SELECT * FROM trades LIMIT 10")
rows = cur.fetchall()
print(f"Fetched {len(rows)} rows")
for row in rows:
print(f"Timestamp: {row[0]}, Symbol: {row[1]}, Price: {row[2]}")
# Fetch one row at a time
cur.execute("SELECT * FROM trades LIMIT 5")
print("\nFetching one row at a time:")
row = cur.fetchone()
while row:
print(row)
row = cur.fetchone()
# Fetch many rows at a time
cur.execute("SELECT * FROM trades LIMIT 100")
print("\nFetching 10 rows at a time:")
while True:
rows = cur.fetchmany(10)
if not rows:
break
print(f"Batch of {len(rows)} rows")
finally:
conn.close()
Dictionary Cursors
psycopg2 provides dictionary cursors to access rows by column name:
import psycopg2.extras
conn = psycopg2.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb'
)
conn.autocommit = True
try:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute("SELECT * FROM trades LIMIT 5")
rows = cur.fetchall()
for row in rows:
print(f"Symbol: {row['symbol']}, Price: {row['price']}")
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM trades LIMIT 5")
rows = cur.fetchall()
for row in rows:
print(row) # Prints as a dict
finally:
conn.close()
Server-Side Cursors
For large result sets, you can use server-side cursors:
import psycopg2
conn = psycopg2.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb'
)
conn.autocommit = True
try:
with conn.cursor() as cur:
# Execute a query that might return many rows
cur.execute("SELECT * FROM trades")
batch_size = 1000
total_processed = 0
while True:
batch = cur.fetchmany(batch_size)
# If no more rows, break the loop
if not batch:
break
total_processed += len(batch)
if total_processed % 10000 == 0:
print(f"Processed {total_processed} rows so far...")
print(f"Finished processing {total_processed} total rows")
finally:
conn.close()
Parameterized Queries
psycopg2 uses placeholder parameters (%s
) for prepared statements:
import psycopg2
from datetime import datetime, timedelta
conn = psycopg2.connect(
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb'
)
conn.autocommit = True
try:
with conn.cursor() as cur:
end_time = datetime.now()
start_time = end_time - timedelta(days=7000)
cur.execute("""
SELECT symbol,
avg(price) as avg_price,
min(price) as min_price,
max(price) as max_price
FROM trades
WHERE ts >= %s
AND ts <= %s
GROUP BY symbol
""", (start_time, end_time))
rows = cur.fetchall()
for row in rows:
print(f"Symbol: {row[0]}, Avg Price: {row[1]:.2f}")
finally:
conn.close()
Connection Pooling with psycopg2-pool
For connection pooling with psycopg2, you can use external libraries like psycopg2-pool:
from psycopg2.pool import ThreadedConnectionPool
pool = ThreadedConnectionPool(
minconn=5,
maxconn=20,
host='127.0.0.1',
port=8812,
user='admin',
password='quest',
dbname='qdb'
)
conn = pool.getconn()
try:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT * FROM trades LIMIT 10")
rows = cur.fetchall()
print(f"Fetched {len(rows)} rows")
finally:
pool.putconn(conn)
pool.closeall()
Integration with pandas
psycopg2 integrates with pandas over SQLAlchemy, allowing you to read data directly into a DataFrame. This feature
requires the pandas
, sqlalchemy
, and questdb-connect
packages.
pip install pandas sqlalchemy questdb-connect
This example shows how to use the SQLAlchemy engine with psycopg2 for querying QuestDB into a pandas DataFrame. For ingestion from Pandas to QuestDB see our pandas ingestion guide.
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
engine = create_engine("questdb://admin:quest@localhost:8812/qdb")
with engine.connect() as conn:
end_time = datetime.now()
start_time = end_time - timedelta(days=10000)
query = """
SELECT * \
FROM trades
WHERE ts >= %s \
AND ts <= %s
ORDER BY ts \
"""
# Execute the query directly into a pandas DataFrame
df = pd.read_sql(query, conn, params=(start_time, end_time))
print(f"DataFrame shape: {df.shape}")
print(f"DataFrame columns: {df.columns.tolist()}")
print(f"Sample data:\n{df.head()}")
Known Limitations with QuestDB
- psycopg2 is generally slower than asyncpg and psycopg3
Performance Tips
- Use dictionary cursors to avoid manual tuple-to-dict conversion
- For large result sets, use server-side cursors
- Consider using a connection pool for multiple queries
- Set autocommit=True to avoid transaction overhead
Best Practices for All PGWire Clients
Query Optimization
For optimal query performance with QuestDB:
- Filter on the designated timestamp column: Always include time filters on the designated timestamp column to leverage QuestDB's time-series optimizations
- Use appropriate time ranges: Avoid querying unnecessarily large time ranges
- Use SAMPLE BY for large datasets: Downsample data when appropriate
Common Time Series Queries
QuestDB provides specialized time-series functions that work with all PGWire clients:
# Example time-series query patterns
# 1. Sample by query (works with all clients)
"""
SELECT
ts,
avg(price) as avg_value
FROM trades
WHERE timestamp >= '2020-01-01'
SAMPLE BY 1h;
"""
# 2. Latest on query (efficient way to get most recent values)
"""
SELECT * FROM trades
LATEST ON timestamp PARTITION BY symbol;
"""
Conclusion
QuestDB's support for the PostgreSQL Wire Protocol allows you to use a variety of Python clients to query your time-series data:
- asyncpg: Best performance, especially for large result sets, uses binary protocol by default
- psycopg3: Excellent balance of features and performance, supports both sync and async operations
- psycopg2: Mature and stable client with wide compatibility, but slower than asyncpg and psycopg3
For most use cases, we recommend using asyncpg or psycopg3 for better performance. For data ingestion, consider using QuestDB's first-party clients with the InfluxDB Line Protocol (ILP) for maximum throughput.