Diesel Integration
Ultimo provides first-class support for Diesel, a safe, extensible ORM with compile-time guarantees.
Why Diesel?
- 🛡️ Type-Safe DSL - Build queries with compile-time verification
- ⚡ High Performance - Zero-cost abstractions for SQL
- 🗃️ Multiple Databases - PostgreSQL, MySQL, SQLite support
- 🚀 Connection Pooling - r2d2 integration for connection management
- 🔄 Migrations - Powerful CLI for schema management
- 📦 Mature Ecosystem - Battle-tested in production
- 🎯 Strong Typing - Catch errors at compile time, not runtime
Installation
Add dependencies to Cargo.toml:
[dependencies]
ultimo = { version = "0.1.2", features = ["diesel"] }
diesel = { version = "2", features = ["postgres", "r2d2"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }

For other databases:
- MySQL: features = ["mysql", "r2d2"]
- SQLite: features = ["sqlite", "r2d2"]
Install Diesel CLI:
cargo install diesel_cli --no-default-features --features postgres

Basic Setup
1. Initialize Diesel
# Create diesel.toml and migrations directory
diesel setup
# Create a migration
diesel migration generate create_users
# Edit the migration files
# migrations/TIMESTAMP_create_users/up.sql
# migrations/TIMESTAMP_create_users/down.sql
# Run migrations
diesel migration run

2. Define Schema
Diesel generates src/schema.rs:
// src/schema.rs - Auto-generated
diesel::table! {
users (id) {
id -> Int4,
name -> Varchar,
email -> Varchar,
created_at -> Timestamptz,
}
}

3. Define Models
// src/models.rs
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use crate::schema::users;
#[derive(Queryable, Selectable, Serialize)]
#[diesel(table_name = users)]
pub struct User {
pub id: i32,
pub name: String,
pub email: String,
pub created_at: chrono::NaiveDateTime,
}
#[derive(Insertable, Deserialize)]
#[diesel(table_name = users)]
pub struct NewUser {
pub name: String,
pub email: String,
}

4. Setup Application
use ultimo::prelude::*;
use ultimo::database::diesel::DieselPool;
use diesel::prelude::*;
use diesel::pg::PgConnection;
mod schema;
mod models;
#[tokio::main]
async fn main() -> ultimo::Result<()> {
let mut app = Ultimo::new();
// Connect to database
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/mydb".to_string());
let pool = DieselPool::<PgConnection>::new(&database_url)?;
app.with_diesel(pool);
// Routes
app.get("/users", get_users);
app.post("/users", create_user);
app.get("/users/:id", get_user);
app.put("/users/:id", update_user);
app.delete("/users/:id", delete_user);
app.listen("127.0.0.1:3000").await
}

CRUD Operations
Read (Query)
use crate::schema::users::dsl::*;
// List all users
app.get("/users", |ctx: Context| async move {
let mut conn = ctx.diesel::<PgConnection>()?;
let results = users
.select(User::as_select())
.order(id.asc())
.load(&mut *conn)?;
ctx.json(json!({ "users": results, "total": results.len() })).await
});
// Get single user
app.get("/users/:id", |ctx: Context| async move {
let user_id: i32 = ctx.req.param("id")?.parse()?;
let mut conn = ctx.diesel::<PgConnection>()?;
let user = users
.find(user_id)
.select(User::as_select())
.first(&mut *conn)
.optional()?;
match user {
Some(user) => ctx.json(user).await,
None => Err(UltimoError::NotFound("User not found".to_string())),
}
});
// Filter queries
app.get("/search", |ctx: Context| async move {
let query_param = ctx.req.query("q")?;
let mut conn = ctx.diesel::<PgConnection>()?;
let results = users
.filter(email.like(format!("%{}%", query_param)))
.or_filter(name.like(format!("%{}%", query_param)))
.select(User::as_select())
.load(&mut *conn)?;
ctx.json(json!({ "users": results })).await
});

Create (Insert)
use crate::models::NewUser;
app.post("/users", |ctx: Context| async move {
let input: NewUser = ctx.req.json().await?;
let mut conn = ctx.diesel::<PgConnection>()?;
let user = diesel::insert_into(users)
.values(&input)
.returning(User::as_returning())
.get_result(&mut *conn)?;
ctx.status(201).await;
ctx.json(user).await
});

Update
use diesel::prelude::*;
#[derive(Deserialize, AsChangeset)]
#[diesel(table_name = users)]
struct UpdateUser {
name: Option<String>,
email: Option<String>,
}
app.put("/users/:id", |ctx: Context| async move {
let user_id: i32 = ctx.req.param("id")?.parse()?;
let input: UpdateUser = ctx.req.json().await?;
let mut conn = ctx.diesel::<PgConnection>()?;
let user = diesel::update(users.find(user_id))
.set(&input)
.returning(User::as_returning())
.get_result(&mut *conn)
.optional()?;
match user {
Some(user) => ctx.json(user).await,
None => Err(UltimoError::NotFound("User not found".to_string())),
}
});

Delete
app.delete("/users/:id", |ctx: Context| async move {
let user_id: i32 = ctx.req.param("id")?.parse()?;
let mut conn = ctx.diesel::<PgConnection>()?;
let deleted = diesel::delete(users.find(user_id))
.execute(&mut *conn)?;
if deleted == 0 {
return Err(UltimoError::NotFound("User not found".to_string()));
}
ctx.status(204).await;
Ok(())
});

Transactions
Execute multiple queries atomically:
use diesel::prelude::*;
app.post("/transfer", |ctx: Context| async move {
#[derive(Deserialize)]
struct Transfer {
from_account: i32,
to_account: i32,
amount: f64,
}
let transfer: Transfer = ctx.req.json().await?;
let mut conn = ctx.diesel::<PgConnection>()?;
conn.transaction::<_, diesel::result::Error, _>(|conn| {
// Deduct from sender
diesel::update(accounts.find(transfer.from_account))
.set(balance.eq(balance - transfer.amount))
.execute(conn)?;
// Add to receiver
diesel::update(accounts.find(transfer.to_account))
.set(balance.eq(balance + transfer.amount))
.execute(conn)?;
Ok(())
})?;
ctx.json(json!({"success": true, "amount": transfer.amount})).await
});

Connection Pooling
Configure connection pool with r2d2:
use diesel::r2d2::{self, ConnectionManager};
use std::time::Duration;
let manager = ConnectionManager::<PgConnection>::new(&database_url);
let pool = r2d2::Pool::builder()
.max_size(10) // Maximum connections
.min_idle(Some(2)) // Minimum idle connections
.connection_timeout(Duration::from_secs(5))
.idle_timeout(Some(Duration::from_secs(300)))
.build(manager)?;
let diesel_pool = DieselPool::from_pool(pool);
app.with_diesel(diesel_pool);

Advanced Queries
Joins
use crate::schema::{users, posts};
#[derive(Queryable, Serialize)]
struct UserWithPosts {
user: User,
posts: Vec<Post>,
}
app.get("/users/:id/posts", |ctx: Context| async move {
let user_id: i32 = ctx.req.param("id")?.parse()?;
let mut conn = ctx.diesel::<PgConnection>()?;
let results = users::table
.inner_join(posts::table)
.filter(users::id.eq(user_id))
.select((User::as_select(), Post::as_select()))
.load::<(User, Post)>(&mut *conn)?;
ctx.json(results).await
});

Aggregations
use diesel::dsl::*;
app.get("/stats", |ctx: Context| async move {
let mut conn = ctx.diesel::<PgConnection>()?;
let total: i64 = users.count().get_result(&mut *conn)?;
let max_id: Option<i32> = users.select(max(id)).first(&mut *conn)?;
ctx.json(json!({
"total_users": total,
"max_id": max_id
})).await
});

Pagination
app.get("/users", |ctx: Context| async move {
let page: i64 = ctx.req.query("page")?.parse().unwrap_or(1);
let per_page: i64 = ctx.req.query("per_page")?.parse().unwrap_or(10);
let mut conn = ctx.diesel::<PgConnection>()?;
let offset = (page - 1) * per_page;
let results = users
.select(User::as_select())
.limit(per_page)
.offset(offset)
.load(&mut *conn)?;
let total: i64 = users.count().get_result(&mut *conn)?;
ctx.json(json!({
"users": results,
"page": page,
"per_page": per_page,
"total": total,
"total_pages": (total as f64 / per_page as f64).ceil() as i64
})).await
});

Migrations
Create Migration
diesel migration generate create_posts
# Edit migrations/TIMESTAMP_create_posts/up.sql
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id),
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
# Edit migrations/TIMESTAMP_create_posts/down.sql
DROP TABLE posts;
# Run migration
diesel migration run
# Rollback (if needed)
diesel migration revert

Embed Migrations
Run migrations in your app:
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/");
#[tokio::main]
async fn main() -> ultimo::Result<()> {
let mut app = Ultimo::new();
let pool = DieselPool::<PgConnection>::new(&database_url)?;
// Run migrations
let mut conn = pool.pool().get()?;
conn.run_pending_migrations(MIGRATIONS)
.expect("Failed to run migrations");
app.with_diesel(pool);
// ... rest of setup
}

Error Handling
Handle Diesel-specific errors:
app.post("/users", |ctx: Context| async move {
let input: NewUser = ctx.req.json().await?;
let mut conn = ctx.diesel::<PgConnection>()?;
let user = diesel::insert_into(users)
.values(&input)
.returning(User::as_returning())
.get_result(&mut *conn)
.map_err(|e| match e {
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_
) => UltimoError::BadRequest("Email already exists".to_string()),
_ => UltimoError::Internal(format!("Database error: {}", e)),
})?;
ctx.status(201).await;
ctx.json(user).await
});

Testing
Write tests with a test database:
#[cfg(test)]
mod tests {
use super::*;
use diesel::prelude::*;
fn setup_test_db() -> DieselPool<PgConnection> {
let pool = DieselPool::new("postgres://localhost/test_db").unwrap();
let mut conn = pool.pool().get().unwrap();
diesel::sql_query(
"CREATE TABLE IF NOT EXISTS test_users (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL
)"
)
.execute(&mut conn)
.unwrap();
pool
}
#[test]
#[ignore] // Run with: cargo test -- --ignored
fn test_create_user() {
let pool = setup_test_db();
let mut conn = pool.pool().get().unwrap();
let new_user = NewUser {
name: "Alice".to_string(),
email: "alice@example.com".to_string(),
};
let user = diesel::insert_into(users)
.values(&new_user)
.returning(User::as_returning())
.get_result(&mut conn)
.unwrap();
assert_eq!(user.name, "Alice");
assert_eq!(user.email, "alice@example.com");
}
}

Best Practices
- Use schema.rs - Let Diesel manage your schema, don't edit manually
- Type-safe queries - Leverage Diesel's DSL for compile-time safety
- Use migrations - Track all schema changes with migrations
- Connection pooling - Configure appropriate pool sizes
- Transactions - Use for operations that must succeed/fail together
- Indexes - Add indexes for frequently queried columns
- Error handling - Convert Diesel errors to user-friendly messages
- Async wrapping - Use tokio::task::spawn_blocking for heavy or long-running queries
Async Context
Diesel is synchronous but works well with async code:
app.get("/users", |ctx: Context| async move {
// Diesel connection pool is Send + Sync
let mut conn = ctx.diesel::<PgConnection>()?;
// For heavy queries, use spawn_blocking
let results = tokio::task::spawn_blocking(move || {
users
.select(User::as_select())
.load(&mut *conn)
})
.await??;
ctx.json(results).await
});

Examples
Check out the working example:
cd examples/database-diesel
DATABASE_URL=postgres://postgres:postgres@localhost/ultimo_test cargo run