Skip to content

SQLx Integration

Ultimo provides first-class support for SQLx, a fully async SQL toolkit with compile-time query verification.

Why SQLx?

  • Fully Async - Built on top of Tokio for maximum performance
  • 🔍 Compile-time Verification - Catch SQL errors at compile time
  • 🗃️ Multiple Databases - PostgreSQL, MySQL, SQLite support
  • 🚀 Connection Pooling - Automatic connection management
  • 🎯 Raw SQL - Write SQL directly with parameter binding
  • 🔄 Transactions - Full transaction support
  • 📦 Built-in Migrations - Database migrations out of the box

Installation

Add dependencies to Cargo.toml:

[dependencies]
ultimo = { version = "0.1.2", features = ["sqlx"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }

For other databases:

  • MySQL: features = ["runtime-tokio", "mysql"]
  • SQLite: features = ["runtime-tokio", "sqlite"]

Basic Setup

use ultimo::prelude::*;
use ultimo::database::sqlx::SqlxPool;
use sqlx::FromRow;
 
#[derive(Serialize, FromRow)]
struct User {
    id: i32,
    name: String,
    email: String,
}
 
#[tokio::main]
async fn main() -> ultimo::Result<()> {
    let mut app = Ultimo::new();
 
    // Connect to database
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://postgres:postgres@localhost/mydb".to_string());
 
    let pool = SqlxPool::connect(&database_url).await?;
    app.with_sqlx(pool);
 
    // Routes
    app.get("/users", get_users);
    app.post("/users", create_user);
    app.get("/users/:id", get_user);
    app.put("/users/:id", update_user);
    app.delete("/users/:id", delete_user);
 
    app.listen("127.0.0.1:3000").await
}

CRUD Operations

Read (Query)

// List all users
app.get("/users", |ctx: Context| async move {
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    let users = sqlx::query_as::<_, User>(
        "SELECT id, name, email FROM users ORDER BY id"
    )
    .fetch_all(db)
    .await?;
 
    ctx.json(json!({ "users": users, "total": users.len() })).await
});
 
// Get single user
app.get("/users/:id", |ctx: Context| async move {
    let id: i32 = ctx.req.param("id")?.parse()?;
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    let user = sqlx::query_as::<_, User>(
        "SELECT id, name, email FROM users WHERE id = $1"
    )
    .bind(id)
    .fetch_optional(db)
    .await?;
 
    match user {
        Some(user) => ctx.json(user).await,
        None => Err(UltimoError::NotFound("User not found".to_string())),
    }
});

Create (Insert)

#[derive(Deserialize)]
struct CreateUserInput {
    name: String,
    email: String,
}
 
app.post("/users", |ctx: Context| async move {
    let input: CreateUserInput = ctx.req.json().await?;
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    let user = sqlx::query_as::<_, User>(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email"
    )
    .bind(&input.name)
    .bind(&input.email)
    .fetch_one(db)
    .await?;
 
    ctx.status(201).await;
    ctx.json(user).await
});

Update

#[derive(Deserialize)]
struct UpdateUserInput {
    name: Option<String>,
    email: Option<String>,
}
 
app.put("/users/:id", |ctx: Context| async move {
    let id: i32 = ctx.req.param("id")?.parse()?;
    let input: UpdateUserInput = ctx.req.json().await?;
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    let user = sqlx::query_as::<_, User>(
        "UPDATE users SET name = COALESCE($1, name), email = COALESCE($2, email)
         WHERE id = $3 RETURNING id, name, email"
    )
    .bind(input.name)
    .bind(input.email)
    .bind(id)
    .fetch_optional(db)
    .await?;
 
    match user {
        Some(user) => ctx.json(user).await,
        None => Err(UltimoError::NotFound("User not found".to_string())),
    }
});

Delete

app.delete("/users/:id", |ctx: Context| async move {
    let id: i32 = ctx.req.param("id")?.parse()?;
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    let result = sqlx::query("DELETE FROM users WHERE id = $1")
        .bind(id)
        .execute(db)
        .await?;
 
    if result.rows_affected() == 0 {
        return Err(UltimoError::NotFound("User not found".to_string()));
    }
 
    ctx.status(204).await;
    Ok(())
});

Transactions

Execute multiple queries atomically:

app.post("/transfer", |ctx: Context| async move {
    #[derive(Deserialize)]
    struct Transfer {
        from_account: i32,
        to_account: i32,
        amount: f64,
    }
 
    let transfer: Transfer = ctx.req.json().await?;
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    // Start transaction
    let mut tx = db.begin().await?;
 
    // Deduct from sender
    sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
        .bind(transfer.amount)
        .bind(transfer.from_account)
        .execute(&mut *tx)
        .await?;
 
    // Add to receiver
    sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
        .bind(transfer.amount)
        .bind(transfer.to_account)
        .execute(&mut *tx)
        .await?;
 
    // Commit transaction
    tx.commit().await?;
 
    ctx.json(json!({"success": true, "amount": transfer.amount})).await
});

Connection Pooling

Configure connection pool settings:

use sqlx::postgres::PgPoolOptions;
use std::time::Duration;
 
let pool = PgPoolOptions::new()
    .max_connections(10)           // Maximum number of connections
    .min_connections(2)            // Minimum number of connections
    .connect_timeout(Duration::from_secs(5))
    .idle_timeout(Duration::from_secs(300))
    .acquire_timeout(Duration::from_secs(3))
    .connect(&database_url)
    .await?;
 
let sqlx_pool = SqlxPool::from_pool(pool);
app.with_sqlx(sqlx_pool);

Query Macros

SQLx provides macros for compile-time verification:

// Verified at compile time!
let user = sqlx::query_as!(
    User,
    "SELECT id, name, email FROM users WHERE id = $1",
    id
)
.fetch_one(db)
.await?;

Note: Requires DATABASE_URL environment variable set at compile time.

Error Handling

Handle database-specific errors gracefully:

app.post("/users", |ctx: Context| async move {
    let input: CreateUserInput = ctx.req.json().await?;
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    let user = sqlx::query_as::<_, User>(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email"
    )
    .bind(&input.name)
    .bind(&input.email)
    .fetch_one(db)
    .await
    .map_err(|e| {
        // Handle unique constraint violations
        if let sqlx::Error::Database(db_err) = &e {
            if db_err.constraint() == Some("users_email_key") {
                return UltimoError::BadRequest("Email already exists".to_string());
            }
        }
        UltimoError::Internal(format!("Database error: {}", e))
    })?;
 
    ctx.status(201).await;
    ctx.json(user).await
});

Migrations

SQLx includes a built-in migration tool:

# Create migrations directory
mkdir -p migrations
 
# Create a migration
sqlx migrate add create_users_table
 
# Edit the generated SQL file
echo "CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);" > migrations/$(ls migrations | tail -1)
 
# Run migrations
sqlx migrate run

Apply migrations in your app:

use sqlx::migrate::Migrator;
 
#[tokio::main]
async fn main() -> ultimo::Result<()> {
    let mut app = Ultimo::new();
    let pool = SqlxPool::connect(&database_url).await?;
 
    // Run migrations
    let migrator = Migrator::new(std::path::Path::new("./migrations")).await?;
    migrator.run(pool.pool()).await?;
 
    app.with_sqlx(pool);
    // ... rest of setup
}

Health Checks

Add a health check endpoint:

app.get("/health", |ctx: Context| async move {
    let db = ctx.sqlx::<sqlx::Postgres>()?;
 
    // Check database connection
    match sqlx::query("SELECT 1").execute(db).await {
        Ok(_) => ctx.json(json!({
            "status": "healthy",
            "database": "connected"
        })).await,
        Err(e) => {
            ctx.status(503).await;
            ctx.json(json!({
                "status": "unhealthy",
                "database": format!("error: {}", e)
            })).await
        }
    }
});

Testing

Write tests with a test database:

#[cfg(test)]
mod tests {
    use super::*;
 
    async fn setup_test_db() -> SqlxPool {
        let pool = SqlxPool::connect("postgres://localhost/test_db")
            .await
            .unwrap();
 
        // Create test table
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS test_users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )"
        )
        .execute(pool.pool())
        .await
        .unwrap();
 
        pool
    }
 
    #[tokio::test]
    #[ignore] // Run with: cargo test -- --ignored
    async fn test_create_user() {
        let pool = setup_test_db().await;
 
        let user = sqlx::query_as::<_, User>(
            "INSERT INTO test_users (name, email)
             VALUES ($1, $2) RETURNING id, name, email"
        )
        .bind("Alice")
        .bind("alice@example.com")
        .fetch_one(pool.pool())
        .await
        .unwrap();
 
        assert_eq!(user.name, "Alice");
        assert_eq!(user.email, "alice@example.com");
    }
}

Best Practices

  1. Use connection pooling - Don't create new connections per request
  2. Parameterize queries - Prevent SQL injection with $1, $2 placeholders
  3. Use transactions - For operations that must succeed or fail together
  4. Handle errors gracefully - Convert database errors to user-friendly messages
  5. Add indexes - For frequently queried columns
  6. Use migrations - Track database schema changes
  7. Monitor connection pool - Watch for connection exhaustion

Examples

Check out the working example:

cd examples/database-sqlx
DATABASE_URL=postgres://postgres:postgres@localhost/ultimo_test cargo run

See Also