SQLx Integration
Ultimo provides first-class support for SQLx, a fully async SQL toolkit with compile-time query verification.
Why SQLx?
- ⚡ Fully Async - Built on top of Tokio for maximum performance
- 🔍 Compile-time Verification - Catch SQL errors at compile time
- 🗃️ Multiple Databases - PostgreSQL, MySQL, SQLite support
- 🚀 Connection Pooling - Automatic connection management
- 🎯 Raw SQL - Write SQL directly with parameter binding
- 🔄 Transactions - Full transaction support
- 📦 Built-in Migrations - Database migrations out of the box
Installation
Add dependencies to Cargo.toml:
[dependencies]
ultimo = { version = "0.1.2", features = ["sqlx"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }For other databases:
- MySQL:
features = ["runtime-tokio", "mysql"] - SQLite:
features = ["runtime-tokio", "sqlite"]
Basic Setup
use ultimo::prelude::*;
use ultimo::database::sqlx::SqlxPool;
use sqlx::FromRow;
#[derive(Serialize, FromRow)]
struct User {
id: i32,
name: String,
email: String,
}
#[tokio::main]
async fn main() -> ultimo::Result<()> {
let mut app = Ultimo::new();
// Connect to database
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/mydb".to_string());
let pool = SqlxPool::connect(&database_url).await?;
app.with_sqlx(pool);
// Routes
app.get("/users", get_users);
app.post("/users", create_user);
app.get("/users/:id", get_user);
app.put("/users/:id", update_user);
app.delete("/users/:id", delete_user);
app.listen("127.0.0.1:3000").await
}CRUD Operations
Read (Query)
// List all users
app.get("/users", |ctx: Context| async move {
let db = ctx.sqlx::<sqlx::Postgres>()?;
let users = sqlx::query_as::<_, User>(
"SELECT id, name, email FROM users ORDER BY id"
)
.fetch_all(db)
.await?;
ctx.json(json!({ "users": users, "total": users.len() })).await
});
// Get single user
app.get("/users/:id", |ctx: Context| async move {
let id: i32 = ctx.req.param("id")?.parse()?;
let db = ctx.sqlx::<sqlx::Postgres>()?;
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email FROM users WHERE id = $1"
)
.bind(id)
.fetch_optional(db)
.await?;
match user {
Some(user) => ctx.json(user).await,
None => Err(UltimoError::NotFound("User not found".to_string())),
}
});Create (Insert)
#[derive(Deserialize)]
struct CreateUserInput {
name: String,
email: String,
}
app.post("/users", |ctx: Context| async move {
let input: CreateUserInput = ctx.req.json().await?;
let db = ctx.sqlx::<sqlx::Postgres>()?;
let user = sqlx::query_as::<_, User>(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email"
)
.bind(&input.name)
.bind(&input.email)
.fetch_one(db)
.await?;
ctx.status(201).await;
ctx.json(user).await
});Update
#[derive(Deserialize)]
struct UpdateUserInput {
name: Option<String>,
email: Option<String>,
}
app.put("/users/:id", |ctx: Context| async move {
let id: i32 = ctx.req.param("id")?.parse()?;
let input: UpdateUserInput = ctx.req.json().await?;
let db = ctx.sqlx::<sqlx::Postgres>()?;
let user = sqlx::query_as::<_, User>(
"UPDATE users SET name = COALESCE($1, name), email = COALESCE($2, email)
WHERE id = $3 RETURNING id, name, email"
)
.bind(input.name)
.bind(input.email)
.bind(id)
.fetch_optional(db)
.await?;
match user {
Some(user) => ctx.json(user).await,
None => Err(UltimoError::NotFound("User not found".to_string())),
}
});Delete
app.delete("/users/:id", |ctx: Context| async move {
let id: i32 = ctx.req.param("id")?.parse()?;
let db = ctx.sqlx::<sqlx::Postgres>()?;
let result = sqlx::query("DELETE FROM users WHERE id = $1")
.bind(id)
.execute(db)
.await?;
if result.rows_affected() == 0 {
return Err(UltimoError::NotFound("User not found".to_string()));
}
ctx.status(204).await;
Ok(())
});Transactions
Execute multiple queries atomically:
app.post("/transfer", |ctx: Context| async move {
#[derive(Deserialize)]
struct Transfer {
from_account: i32,
to_account: i32,
amount: f64,
}
let transfer: Transfer = ctx.req.json().await?;
let db = ctx.sqlx::<sqlx::Postgres>()?;
// Start transaction
let mut tx = db.begin().await?;
// Deduct from sender
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(transfer.amount)
.bind(transfer.from_account)
.execute(&mut *tx)
.await?;
// Add to receiver
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(transfer.amount)
.bind(transfer.to_account)
.execute(&mut *tx)
.await?;
// Commit transaction
tx.commit().await?;
ctx.json(json!({"success": true, "amount": transfer.amount})).await
});Connection Pooling
Configure connection pool settings:
use sqlx::postgres::PgPoolOptions;
use std::time::Duration;
let pool = PgPoolOptions::new()
.max_connections(10) // Maximum number of connections
.min_connections(2) // Minimum number of connections
.connect_timeout(Duration::from_secs(5))
.idle_timeout(Duration::from_secs(300))
.acquire_timeout(Duration::from_secs(3))
.connect(&database_url)
.await?;
let sqlx_pool = SqlxPool::from_pool(pool);
app.with_sqlx(sqlx_pool);Query Macros
SQLx provides macros for compile-time verification:
// Verified at compile time!
let user = sqlx::query_as!(
User,
"SELECT id, name, email FROM users WHERE id = $1",
id
)
.fetch_one(db)
.await?;Note: Requires DATABASE_URL environment variable set at compile time.
Error Handling
Handle database-specific errors gracefully:
app.post("/users", |ctx: Context| async move {
let input: CreateUserInput = ctx.req.json().await?;
let db = ctx.sqlx::<sqlx::Postgres>()?;
let user = sqlx::query_as::<_, User>(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email"
)
.bind(&input.name)
.bind(&input.email)
.fetch_one(db)
.await
.map_err(|e| {
// Handle unique constraint violations
if let sqlx::Error::Database(db_err) = &e {
if db_err.constraint() == Some("users_email_key") {
return UltimoError::BadRequest("Email already exists".to_string());
}
}
UltimoError::Internal(format!("Database error: {}", e))
})?;
ctx.status(201).await;
ctx.json(user).await
});Migrations
SQLx includes a built-in migration tool:
# Create migrations directory
mkdir -p migrations
# Create a migration
sqlx migrate add create_users_table
# Edit the generated SQL file
echo "CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);" > migrations/$(ls migrations | tail -1)
# Run migrations
sqlx migrate runApply migrations in your app:
use sqlx::migrate::Migrator;
#[tokio::main]
async fn main() -> ultimo::Result<()> {
let mut app = Ultimo::new();
let pool = SqlxPool::connect(&database_url).await?;
// Run migrations
let migrator = Migrator::new(std::path::Path::new("./migrations")).await?;
migrator.run(pool.pool()).await?;
app.with_sqlx(pool);
// ... rest of setup
}Health Checks
Add a health check endpoint:
app.get("/health", |ctx: Context| async move {
let db = ctx.sqlx::<sqlx::Postgres>()?;
// Check database connection
match sqlx::query("SELECT 1").execute(db).await {
Ok(_) => ctx.json(json!({
"status": "healthy",
"database": "connected"
})).await,
Err(e) => {
ctx.status(503).await;
ctx.json(json!({
"status": "unhealthy",
"database": format!("error: {}", e)
})).await
}
}
});Testing
Write tests with a test database:
#[cfg(test)]
mod tests {
use super::*;
async fn setup_test_db() -> SqlxPool {
let pool = SqlxPool::connect("postgres://localhost/test_db")
.await
.unwrap();
// Create test table
sqlx::query(
"CREATE TABLE IF NOT EXISTS test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
)"
)
.execute(pool.pool())
.await
.unwrap();
pool
}
#[tokio::test]
#[ignore] // Run with: cargo test -- --ignored
async fn test_create_user() {
let pool = setup_test_db().await;
let user = sqlx::query_as::<_, User>(
"INSERT INTO test_users (name, email)
VALUES ($1, $2) RETURNING id, name, email"
)
.bind("Alice")
.bind("alice@example.com")
.fetch_one(pool.pool())
.await
.unwrap();
assert_eq!(user.name, "Alice");
assert_eq!(user.email, "alice@example.com");
}
}Best Practices
- Use connection pooling - Don't create new connections per request
- Parameterize queries - Prevent SQL injection with
$1,$2placeholders - Use transactions - For operations that must succeed or fail together
- Handle errors gracefully - Convert database errors to user-friendly messages
- Add indexes - For frequently queried columns
- Use migrations - Track database schema changes
- Monitor connection pool - Watch for connection exhaustion
Examples
Check out the working example:
cd examples/database-sqlx
DATABASE_URL=postgres://postgres:postgres@localhost/ultimo_test cargo run