Skip to content

Diesel Integration

Ultimo provides first-class support for Diesel, a safe, extensible ORM with compile-time guarantees.

Why Diesel?

  • 🛡️ Type-Safe DSL - Build queries with compile-time verification
  • ⚡ High Performance - Zero-cost abstractions for SQL
  • 🗃️ Multiple Databases - PostgreSQL, MySQL, SQLite support
  • 🚀 Connection Pooling - r2d2 integration for connection management
  • 🔄 Migrations - Powerful CLI for schema management
  • 📦 Mature Ecosystem - Battle-tested in production
  • 🎯 Strong Typing - Catch errors at compile time, not runtime

Installation

Add dependencies to Cargo.toml:

[dependencies]
ultimo = { version = "0.1.2", features = ["diesel"] }
# "chrono" enables Diesel's mappings between timestamp columns and chrono types
diesel = { version = "2", features = ["postgres", "r2d2", "chrono"] }
# Needed for `chrono::NaiveDateTime` fields; "serde" lets such models derive Serialize
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }

For other databases:

  • MySQL: features = ["mysql", "r2d2"]
  • SQLite: features = ["sqlite", "r2d2"]

Install Diesel CLI:

cargo install diesel_cli --no-default-features --features postgres

Basic Setup

1. Initialize Diesel

# Create diesel.toml and migrations directory
# (the CLI reads the connection string from DATABASE_URL, set in the environment or a .env file)
diesel setup
 
# Create a migration
diesel migration generate create_users
 
# Edit the migration files
# migrations/TIMESTAMP_create_users/up.sql   - statements that apply the change
# migrations/TIMESTAMP_create_users/down.sql - statements that undo it
 
# Run migrations (applies every pending up.sql and refreshes src/schema.rs)
diesel migration run

2. Define Schema

Diesel generates src/schema.rs:

// src/schema.rs - Auto-generated
// Declares the `users` table for Diesel's query DSL; `(id)` names the primary key column.
// Each entry maps a column name to its SQL type.
diesel::table! {
    users (id) {
        id -> Int4,
        name -> Varchar,
        email -> Varchar,
        // Non-nullable timestamp with time zone
        created_at -> Timestamptz,
    }
}

3. Define Models

// src/models.rs
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use crate::schema::users;
 
/// A row of the `users` table as returned by queries.
///
/// `Selectable` lets queries use `User::as_select()` / `User::as_returning()`
/// so the selected columns always match the struct's fields, and `Serialize`
/// allows the value to be sent back as JSON.
#[derive(Debug, Queryable, Selectable, Serialize)]
#[diesel(table_name = users)]
// Verifies at compile time that every field type is compatible with its
// column on PostgreSQL, giving much clearer errors on a mismatch.
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct User {
    /// Primary key (`Int4`).
    pub id: i32,
    /// Display name.
    pub name: String,
    /// E-mail address.
    pub email: String,
    /// Creation time, read from the `Timestamptz` column without an offset.
    pub created_at: chrono::NaiveDateTime,
}
 
/// Payload for inserting a row into `users`.
///
/// Only `name` and `email` are supplied; `id` and `created_at` are not part of
/// the insert. `Deserialize` allows the struct to be parsed from a request body.
#[derive(Insertable, Deserialize)]
#[diesel(table_name = users)]
pub struct NewUser {
    pub name: String,
    pub email: String,
}

4. Setup Application

use ultimo::prelude::*;
use ultimo::database::diesel::DieselPool;
use diesel::prelude::*;
use diesel::pg::PgConnection;
 
mod schema;
mod models;
 
/// Application entry point: builds the app, attaches a PostgreSQL connection
/// pool, registers the user CRUD routes and serves on 127.0.0.1:3000.
#[tokio::main]
async fn main() -> ultimo::Result<()> {
    let mut app = Ultimo::new();

    // Prefer DATABASE_URL; otherwise fall back to a local development database
    let database_url = match std::env::var("DATABASE_URL") {
        Ok(url) => url,
        Err(_) => "postgres://postgres:postgres@localhost/mydb".to_string(),
    };

    // Create the pool and make it available to handlers
    let pool = DieselPool::<PgConnection>::new(&database_url)?;
    app.with_diesel(pool);

    // Collection routes
    app.get("/users", get_users);
    app.post("/users", create_user);

    // Single-resource routes
    app.get("/users/:id", get_user);
    app.put("/users/:id", update_user);
    app.delete("/users/:id", delete_user);

    app.listen("127.0.0.1:3000").await
}

CRUD Operations

Read (Query)

use crate::schema::users::dsl::*;
 
// List all users, ordered by primary key, together with their count
app.get("/users", |ctx: Context| async move {
    let mut conn = ctx.diesel::<PgConnection>()?;

    let all_users = users
        .order(id.asc())
        .select(User::as_select())
        .load(&mut *conn)?;
    let total = all_users.len();

    ctx.json(json!({ "users": all_users, "total": total })).await
});
 
// Get a single user by primary key; an unknown id yields a NotFound error
app.get("/users/:id", |ctx: Context| async move {
    let user_id: i32 = ctx.req.param("id")?.parse()?;
    let mut conn = ctx.diesel::<PgConnection>()?;

    // `.optional()` turns Diesel's "no rows" error into `None`
    let lookup = users
        .find(user_id)
        .select(User::as_select())
        .first(&mut *conn)
        .optional()?;

    let found = lookup.ok_or_else(|| UltimoError::NotFound("User not found".to_string()))?;
    ctx.json(found).await
});
 
// Substring search: matches users whose email OR name contains the `q` parameter
app.get("/search", |ctx: Context| async move {
    let query_param = ctx.req.query("q")?;
    let mut conn = ctx.diesel::<PgConnection>()?;

    // The same `%term%` pattern is used for both columns
    let pattern = format!("%{}%", query_param);

    let matches = users
        .filter(email.like(pattern.clone()))
        .or_filter(name.like(pattern))
        .select(User::as_select())
        .load(&mut *conn)?;

    ctx.json(json!({ "users": matches })).await
});

Create (Insert)

use crate::models::NewUser;
 
// Insert a user from the JSON body and respond with the stored row (status 201)
app.post("/users", |ctx: Context| async move {
    let payload: NewUser = ctx.req.json().await?;
    let mut conn = ctx.diesel::<PgConnection>()?;

    // RETURNING hands back the full row, including database-generated columns
    let insert = diesel::insert_into(users)
        .values(&payload)
        .returning(User::as_returning());
    let created = insert.get_result(&mut *conn)?;

    ctx.status(201).await;
    ctx.json(created).await
});

Update

use diesel::prelude::*;
 
/// Partial update for a `users` row, parsed from the request body.
///
/// With `AsChangeset`, a field that is `None` is left out of the UPDATE, so
/// only the fields present in the request are changed.
#[derive(Deserialize, AsChangeset)]
#[diesel(table_name = users)]
struct UpdateUser {
    name: Option<String>,
    email: Option<String>,
}
 
// Apply a partial update to one user and respond with the updated row.
// Fails with BadRequest when the body changes nothing and NotFound when the id is unknown.
app.put("/users/:id", |ctx: Context| async move {
    let user_id: i32 = ctx.req.param("id")?.parse()?;
    let input: UpdateUser = ctx.req.json().await?;

    // A changeset whose fields are all `None` has nothing to SET; Diesel rejects
    // it at runtime, so answer with a clear client error instead.
    if input.name.is_none() && input.email.is_none() {
        return Err(UltimoError::BadRequest("No fields to update".to_string()));
    }

    let mut conn = ctx.diesel::<PgConnection>()?;

    // `.optional()` maps "no row matched" to `None` rather than an error
    let user = diesel::update(users.find(user_id))
        .set(&input)
        .returning(User::as_returning())
        .get_result(&mut *conn)
        .optional()?;

    match user {
        Some(user) => ctx.json(user).await,
        None => Err(UltimoError::NotFound("User not found".to_string())),
    }
});

Delete

// Delete one user by primary key; responds 204 on success, NotFound otherwise
app.delete("/users/:id", |ctx: Context| async move {
    let user_id: i32 = ctx.req.param("id")?.parse()?;
    let mut conn = ctx.diesel::<PgConnection>()?;

    // `execute` reports how many rows the DELETE removed
    let removed = diesel::delete(users.find(user_id)).execute(&mut *conn)?;

    match removed {
        0 => Err(UltimoError::NotFound("User not found".to_string())),
        _ => {
            ctx.status(204).await;
            Ok(())
        }
    }
});

Transactions

Execute multiple queries atomically:

use diesel::prelude::*;
 
// Move `amount` from one account to another atomically.
// Both updates run in a single transaction: if either fails, neither is applied.
app.post("/transfer", |ctx: Context| async move {
    // NOTE: f64 keeps the example short; real monetary amounts should use an
    // integer (minor units) or decimal type to avoid rounding errors.
    #[derive(Deserialize)]
    struct Transfer {
        from_account: i32,
        to_account: i32,
        amount: f64,
    }

    let transfer: Transfer = ctx.req.json().await?;

    // Reject NaN, infinities, zero and negative values: a negative amount would
    // silently reverse the direction of the transfer.
    if !transfer.amount.is_finite() || transfer.amount <= 0.0 {
        return Err(UltimoError::BadRequest(
            "Amount must be a positive number".to_string(),
        ));
    }

    let mut conn = ctx.diesel::<PgConnection>()?;

    conn.transaction::<_, diesel::result::Error, _>(|conn| {
        // Deduct from sender
        let debited = diesel::update(accounts.find(transfer.from_account))
            .set(balance.eq(balance - transfer.amount))
            .execute(conn)?;
        if debited == 0 {
            // Unknown sender: returning Err rolls the transaction back
            return Err(diesel::result::Error::NotFound);
        }

        // Add to receiver
        let credited = diesel::update(accounts.find(transfer.to_account))
            .set(balance.eq(balance + transfer.amount))
            .execute(conn)?;
        if credited == 0 {
            // Unknown receiver: roll back so the deducted money is not lost
            return Err(diesel::result::Error::NotFound);
        }

        Ok(())
    })?;

    ctx.json(json!({"success": true, "amount": transfer.amount})).await
});

Connection Pooling

Configure connection pool with r2d2:

use diesel::r2d2::{self, ConnectionManager};
use std::time::Duration;
 
// The manager knows how to open PostgreSQL connections for the pool
let manager = ConnectionManager::<PgConnection>::new(&database_url);

// Tune the pool before building it
let builder = r2d2::Pool::builder()
    .max_size(10) // upper bound on open connections
    .min_idle(Some(2)) // connections kept ready while idle
    .connection_timeout(Duration::from_secs(5)) // max wait when checking out a connection
    .idle_timeout(Some(Duration::from_secs(300))); // close connections idle for this long
let pool = builder.build(manager)?;

// Wrap the r2d2 pool for Ultimo and register it with the app
app.with_diesel(DieselPool::from_pool(pool));

Advanced Queries

Joins

use crate::schema::{users, posts};
 
/// A user together with all of their posts, as returned by `GET /users/:id/posts`.
///
/// The value is assembled in the handler from joined rows, so it only needs
/// `Serialize`: a `Vec<Post>` field cannot be loaded from a single SQL row,
/// which is what `Queryable` would require.
#[derive(Serialize)]
struct UserWithPosts {
    user: User,
    posts: Vec<Post>,
}

// Return one user and their posts; responds NotFound when the user does not exist
app.get("/users/:id/posts", |ctx: Context| async move {
    let user_id: i32 = ctx.req.param("id")?.parse()?;
    let mut conn = ctx.diesel::<PgConnection>()?;

    // LEFT JOIN so a user without posts still yields one row (with a NULL post);
    // an inner join could not tell "no posts" apart from "no such user".
    let rows = users::table
        .left_join(posts::table)
        .filter(users::id.eq(user_id))
        .select((User::as_select(), Option::<Post>::as_select()))
        .load::<(User, Option<Post>)>(&mut *conn)?;

    // Every row carries the same user; keep the first copy and collect the posts
    let mut user = None;
    let mut user_posts = Vec::with_capacity(rows.len());
    for (row_user, row_post) in rows {
        user.get_or_insert(row_user);
        user_posts.extend(row_post);
    }

    match user {
        Some(user) => ctx.json(UserWithPosts { user, posts: user_posts }).await,
        None => Err(UltimoError::NotFound("User not found".to_string())),
    }
});

Aggregations

use diesel::dsl::*;
 
// Aggregate statistics over the users table
app.get("/stats", |ctx: Context| async move {
    let mut conn = ctx.diesel::<PgConnection>()?;

    // Row count
    let user_count: i64 = users.count().get_result(&mut *conn)?;
    // Largest id; optional because MAX yields NULL when there are no rows
    let highest_id: Option<i32> = users.select(max(id)).first(&mut *conn)?;

    let body = json!({
        "total_users": user_count,
        "max_id": highest_id
    });
    ctx.json(body).await
});

Pagination

// Paginated user listing: `page` is 1-based, `per_page` is capped.
// Responds with the page of users plus paging metadata.
app.get("/users", |ctx: Context| async move {
    const DEFAULT_PER_PAGE: i64 = 10;
    // Upper bound so a client cannot request an arbitrarily large page
    const MAX_PER_PAGE: i64 = 100;

    // Unparsable values fall back to the defaults; out-of-range values are
    // clamped so OFFSET is never negative and LIMIT is never zero or huge.
    let page = ctx.req.query("page")?.parse::<i64>().unwrap_or(1).max(1);
    let per_page = ctx
        .req
        .query("per_page")?
        .parse::<i64>()
        .unwrap_or(DEFAULT_PER_PAGE)
        .clamp(1, MAX_PER_PAGE);
    let mut conn = ctx.diesel::<PgConnection>()?;

    let offset = (page - 1).saturating_mul(per_page);

    // A stable ORDER BY is required: without it the database may return rows
    // in a different order per query, so pages could overlap or skip rows.
    let results = users
        .select(User::as_select())
        .order(id.asc())
        .limit(per_page)
        .offset(offset)
        .load(&mut *conn)?;

    let total: i64 = users.count().get_result(&mut *conn)?;

    // Integer ceiling division; per_page >= 1 is guaranteed by the clamp above
    let total_pages = (total + per_page - 1) / per_page;

    ctx.json(json!({
        "users": results,
        "page": page,
        "per_page": per_page,
        "total": total,
        "total_pages": total_pages
    })).await
});

Migrations

Create Migration

diesel migration generate create_posts
 
# Edit migrations/TIMESTAMP_create_posts/up.sql
CREATE TABLE posts (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id),
    title VARCHAR(255) NOT NULL,
    content TEXT NOT NULL,
    -- NOT NULL so Diesel maps the column to Timestamptz rather than Nullable<Timestamptz>
    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- PostgreSQL does not index foreign keys automatically; this speeds up per-user lookups and joins
CREATE INDEX posts_user_id_idx ON posts (user_id);
 
# Edit migrations/TIMESTAMP_create_posts/down.sql
DROP TABLE posts;
 
# Run migration
diesel migration run
 
# Rollback (if needed)
diesel migration revert

Embed Migrations

Run migrations in your app at startup (requires the diesel_migrations crate, e.g. diesel_migrations = "2" in Cargo.toml):

use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
 
/// Migrations from the `migrations/` directory, embedded into the binary at compile time.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/");
 
/// Entry point that applies pending migrations before the pool is attached to the app.
#[tokio::main]
async fn main() -> ultimo::Result<()> {
    let mut app = Ultimo::new();
    // NOTE(review): `database_url` is not defined in this snippet; presumably it is
    // obtained as in the basic setup example.
    let pool = DieselPool::<PgConnection>::new(&database_url)?;
 
    // Run migrations on a connection borrowed from the pool.
    // A migration failure aborts startup (panics) instead of serving with a stale schema.
    let mut conn = pool.pool().get()?;
    conn.run_pending_migrations(MIGRATIONS)
        .expect("Failed to run migrations");
 
    app.with_diesel(pool);
    // ... rest of setup
}

Error Handling

Handle Diesel-specific errors:

// Create a user, translating a unique-constraint violation into a client error
app.post("/users", |ctx: Context| async move {
    let payload: NewUser = ctx.req.json().await?;
    let mut conn = ctx.diesel::<PgConnection>()?;

    let created = diesel::insert_into(users)
        .values(&payload)
        .returning(User::as_returning())
        .get_result(&mut *conn)
        .map_err(|err| {
            use diesel::result::{DatabaseErrorKind, Error};

            // Unique violation -> BadRequest; anything else -> Internal
            if matches!(
                err,
                Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _)
            ) {
                UltimoError::BadRequest("Email already exists".to_string())
            } else {
                UltimoError::Internal(format!("Database error: {}", err))
            }
        })?;

    ctx.status(201).await;
    ctx.json(created).await
});

Testing

Write tests with a test database:

#[cfg(test)]
mod tests {
    use super::*;
    use diesel::prelude::*;

    /// Connects to the test database and makes sure the `users` table exists.
    ///
    /// The table must be the one the `users` DSL and the `User`/`NewUser`
    /// models refer to, including the `created_at` column that `User` selects.
    fn setup_test_db() -> DieselPool<PgConnection> {
        let pool = DieselPool::new("postgres://localhost/test_db").unwrap();

        let mut conn = pool.pool().get().unwrap();

        diesel::sql_query(
            "CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL,
                created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
            )"
        )
        .execute(&mut conn)
        .unwrap();

        pool
    }

    /// Inserting a `NewUser` returns the stored row with the same name and email.
    #[test]
    #[ignore] // Run with: cargo test -- --ignored
    fn test_create_user() {
        let pool = setup_test_db();
        let mut conn = pool.pool().get().unwrap();

        // The test transaction is always rolled back, so the row never
        // persists and re-running the test cannot hit the UNIQUE(email) constraint.
        conn.test_transaction::<_, diesel::result::Error, _>(|conn| {
            let new_user = NewUser {
                name: "Alice".to_string(),
                email: "alice@example.com".to_string(),
            };

            let user = diesel::insert_into(users)
                .values(&new_user)
                .returning(User::as_returning())
                .get_result(conn)?;

            assert_eq!(user.name, "Alice");
            assert_eq!(user.email, "alice@example.com");

            Ok(())
        });
    }
}

Best Practices

  1. Use schema.rs - Let Diesel manage your schema, don't edit manually
  2. Type-safe queries - Leverage Diesel's DSL for compile-time safety
  3. Use migrations - Track all schema changes with migrations
  4. Connection pooling - Configure appropriate pool sizes
  5. Transactions - Use for operations that must succeed/fail together
  6. Indexes - Add indexes for frequently queried columns
  7. Error handling - Convert Diesel errors to user-friendly messages
  8. Async wrapping - Use tokio::task::spawn_blocking for long-running queries so they do not block the async runtime

Async Context

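Diesel is synchronous: each query blocks the thread it runs on. Short queries can run directly inside a handler, but heavy ones should be moved to a blocking thread so they do not stall the async runtime: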

// List all users, running the query on Tokio's blocking thread pool
app.get("/users", |ctx: Context| async move {
    // Check out a connection; it is moved into the blocking task below
    let mut conn = ctx.diesel::<PgConnection>()?;
 
    // For heavy queries, use spawn_blocking so the query does not run on an async worker thread
    let results = tokio::task::spawn_blocking(move || {
        users
            .select(User::as_select())
            .load(&mut *conn)
    })
    // First `?` handles a failed/panicked task, second `?` handles the query error
    .await??;
 
    ctx.json(results).await
});

Examples

Check out the working example:

cd examples/database-diesel
DATABASE_URL=postgres://postgres:postgres@localhost/ultimo_test cargo run

See Also