XDL Database Connectivity Module - Implementation Guide
Overview
This document describes the implementation of the XDL database connectivity module, which provides unified access to multiple database systems including PostgreSQL, MySQL, DuckDB, ODBC, Redis, and Apache Kafka.
Architecture
Module Structure
xdl-database/
├── Cargo.toml # Dependencies and features
├── README.md # User documentation
├── examples/ # XDL usage examples
│ ├── postgresql_example.xdl
│ └── duckdb_analytics.xdl
└── src/
├── lib.rs # Main module & registry
├── error.rs # Error types
├── connection.rs # Connection enum
├── recordset.rs # Query results
└── drivers/ # Database drivers
├── mod.rs
├── postgres.rs # ✅ Full implementation
├── duckdb.rs # ✅ Full implementation
├── redis_driver.rs # ✅ Full implementation
├── mysql.rs # ⏳ Stub
├── odbc.rs # ⏳ Stub
└── kafka.rs # ⏳ Stub
Key Design Decisions
- Async-First Architecture
- Built on Tokio async runtime
- Non-blocking I/O for all database operations
- Enables high concurrency and performance
- Type-Safe Wrappers
- DatabaseConnection enum wraps all driver types
- Compile-time type safety with runtime flexibility
- Feature flags for optional dependencies
- Unified Interface
- All drivers implement same methods
- Connection string determines database type
- Consistent error handling across drivers
- XDL Integration
- Object-oriented API (OBJ_NEW, method calls ->)
- Global registry maps object IDs to instances
- Automatic type conversion to XDL types
Core Components
1. XDLDatabase (lib.rs)
Main database object that users interact with from XDL.
Key Methods:
pub async fn connect(&mut self, connection_string: &str) -> XdlResult<()>
pub async fn disconnect(&mut self) -> XdlResult<()>
pub async fn execute_sql(&self, query: &str) -> XdlResult<Recordset>
pub async fn execute_command(&self, command: &str) -> XdlResult<u64>
pub fn is_connected(&self) -> bool
Features:
- Automatic database type detection from connection string
- Connection state management
- Error tracking and reporting
- Async operation support
2. DatabaseRegistry (lib.rs)
Global registry for object lifecycle management.
Purpose:
- Maps XDL object IDs to database instances
- Thread-safe with RwLock
- Manages both databases and recordsets
- Enables XDL’s OBJ_NEW/OBJ_DESTROY pattern
Implementation:
pub struct DatabaseRegistry {
databases: RwLock<HashMap<usize, Arc<RwLock<XDLDatabase>>>>,
recordsets: RwLock<HashMap<usize, Arc<RwLock<Recordset>>>>,
next_id: RwLock<usize>,
}
3. DatabaseConnection (connection.rs)
Enum wrapper providing unified interface to all drivers.
Pattern:
pub enum DatabaseConnection {
#[cfg(feature = "postgres-support")]
PostgreSQL(drivers::postgres::PostgresConnection),
#[cfg(feature = "duckdb-support")]
DuckDB(drivers::duckdb::DuckDBConnection),
// ... other drivers
}
Benefits:
- Single API for all databases
- Feature-gated compilation
- Type-safe driver dispatch
- Easy to add new drivers
4. Recordset (recordset.rs)
Container for query results with XDL-compatible access methods.
Key Features:
- Column metadata (names, types, ordinals)
- Row-based and column-based data access
- Automatic type conversion to XdlValue
- Iterator support (Next, Reset)
- Structure-like data access
Data Formats:
// Nested array (row-major)
pub fn get_data(&self) -> XdlResult<XdlValue>
// Structured columns (column-major)
pub fn get_data_structured(&self) -> XdlResult<HashMap<String, Vec<XdlValue>>>
// Single column
pub fn get_column(&self, column_name: &str) -> XdlResult<Vec<XdlValue>>
5. Error Handling (error.rs)
Comprehensive error types using thiserror.
Error Categories:
- Connection errors
- Query errors
- Type conversion errors
- Database-specific errors
- Generic errors
Design:
#[derive(Error, Debug)]
pub enum DatabaseError {
#[error("Connection error: {0}")]
ConnectionError(String),
#[cfg(feature = "postgres-support")]
#[error("PostgreSQL error: {0}")]
PostgresError(#[from] tokio_postgres::Error),
// ... other errors
}
Database Drivers
PostgreSQL Driver (postgres.rs)
Status: ✅ Fully Implemented
Technology Stack:
tokio-postgres- Async PostgreSQL clientdeadpool-postgres- Connection pooling (ready)
Key Implementation Details:
- Connection:
pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
let config: Config = connection_string.parse()?;
let (client, connection) = config.connect(NoTls).await?;
// Spawn connection handler
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
Ok(Self { client: Some(client) })
}
- Query Execution:
pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
let rows = self.client.as_ref()?.query(query, &[]).await?;
// Extract columns
let columns: Vec<ColumnInfo> = rows[0].columns()
.iter()
.enumerate()
.map(|(i, col)| ColumnInfo {
name: col.name().to_string(),
data_type: format!("{:?}", col.type_()),
ordinal: i,
})
.collect();
// Convert rows
let mut data_rows = Vec::new();
for row in rows {
let row_data: Vec<JsonValue> = (0..row.len())
.map(|i| postgres_value_to_json(&row, i))
.collect::<Result<_, _>>()?;
data_rows.push(row_data);
}
Ok(Recordset::new(columns, data_rows))
}
- Type Conversion:
- Handles all PostgreSQL types (INT, FLOAT, TEXT, etc.)
- Converts to JSON intermediate format
- JSON converts to XdlValue
DuckDB Driver (duckdb.rs)
Status: ✅ Fully Implemented
Technology Stack:
duckdb- Embedded analytical database- Features: bundled (includes DuckDB binary)
Advantages:
- In-process analytics
- No server required
- Fast analytical queries
- Parquet/CSV support
Implementation Highlights:
pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
let path = connection_string
.trim_start_matches("duckdb://")
.trim_start_matches("duckdb:");
let conn = Connection::open(path)?;
Ok(Self { conn: Some(conn) })
}
Redis Driver (redis_driver.rs)
Status: ✅ Fully Implemented
Technology Stack:
rediscrate with async support- Connection manager for reliability
Usage Pattern:
- Key-value operations via ExecuteCommand
- GET/SET/DEL commands
- Not traditional SQL
Example:
objdb->ExecuteCommand, 'SET mykey myvalue'
objdb->ExecuteCommand, 'DEL mykey'
MySQL, ODBC, Kafka Drivers
Status: ⏳ Stub Implementations Ready
These drivers have stub implementations that:
- Define the interface
- Return appropriate errors
- Are ready for full implementation
To Implement:
- MySQL:
- Add
mysqlorsqlxwith MySQL support - Implement connection parsing
- Add type conversions
- Add
- ODBC:
- Use
odbc-apicrate - Parse ODBC connection strings
- Handle various ODBC drivers
- Use
- Kafka:
- Use
rdkafkacrate - Implement producer/consumer
- Handle streaming data
- Use
Integration with XDL
Object System Integration
XDL’s object system uses:
OBJ_NEW('ClassName')- Creates object, returns IDobject->Method, ARGS- Calls method on objectOBJ_DESTROY, object- Destroys object
Integration Points:
- stdlib Registration (xdl-stdlib)
Add database functions to stdlib:
// In xdl-stdlib/src/lib.rs
use xdl_database::{GLOBAL_DB_REGISTRY, XDLDatabase};
// OBJ_NEW handler
pub fn obj_new(class_name: &str) -> XdlResult<XdlValue> {
match class_name.to_uppercase().as_str() {
"XDLDBDATABASE" => {
let db = XDLDatabase::new();
let id = tokio::runtime::Runtime::new()
.unwrap()
.block_on(GLOBAL_DB_REGISTRY.register_database(db));
Ok(XdlValue::ObjRef(id))
}
_ => Err(XdlError::RuntimeError(format!("Unknown class: {}", class_name)))
}
}
// Method call handler
pub async fn call_method(
obj_id: usize,
method_name: &str,
args: &[XdlValue],
keywords: &HashMap<String, XdlValue>
) -> XdlResult<XdlValue> {
match method_name.to_uppercase().as_str() {
"CONNECT" => {
if let Some(conn_str) = keywords.get("CONNECTION") {
let db = GLOBAL_DB_REGISTRY.get_database(obj_id).await?;
let mut db_lock = db.write().await;
db_lock.connect(&conn_str.to_string_repr()).await?;
Ok(XdlValue::Long(1))
} else {
Err(XdlError::InvalidArgument("CONNECTION keyword required".to_string()))
}
}
"EXECUTESQL" => {
if args.is_empty() {
return Err(XdlError::InvalidArgument("Query string required".to_string()));
}
let query = args[0].to_string_repr();
let db = GLOBAL_DB_REGISTRY.get_database(obj_id).await?;
let db_lock = db.read().await;
let recordset = db_lock.execute_sql(&query).await?;
let rs_id = GLOBAL_DB_REGISTRY.register_recordset(recordset).await;
Ok(XdlValue::ObjRef(rs_id))
}
_ => Err(XdlError::NotImplemented(format!("Method: {}", method_name)))
}
}
- Evaluator Integration (xdl-interpreter)
Update evaluator to handle database method calls:
// In evaluate_method_call()
Expression::MethodCall { object, method, args, keywords, .. } => {
if let Expression::Variable { name, .. } = object.as_ref() {
if let Some(XdlValue::ObjRef(obj_id)) = context.get_variable(name) {
// Check if it's a database object
if let Some(db) = GLOBAL_DB_REGISTRY.get_database(*obj_id).await {
return self.call_database_method(*obj_id, method, args, keywords, context).await;
}
}
}
// ... existing method call handling
}
Type Conversion
Database → JSON → XDL
Flow:
- Database driver returns native types
- Convert to
serde_json::Value(intermediate) - Convert JSON to
XdlValue
Rationale:
- JSON is universal interchange format
- Easy to extend for new types
- JSON handles NULL consistently
- Simplifies driver implementation
Conversion Table:
| Database | JSON | XDL |
|---|---|---|
| NULL | Null | Undefined |
| BOOLEAN | Bool | Long (0/1) |
| SMALLINT | Number | Int |
| INTEGER | Number | Long |
| BIGINT | Number | Long64 |
| REAL | Number | Float |
| DOUBLE | Number | Double |
| VARCHAR/TEXT | String | String |
| ARRAY | Array | NestedArray |
| JSON | Object | String (serialized) |
Feature Flags
The module uses Cargo features for optional dependencies:
[features]
default = ["postgres-support", "duckdb-support", "redis-support"]
postgres-support = ["tokio-postgres", "deadpool-postgres"]
mysql-support = ["mysql"]
duckdb-support = ["duckdb"]
odbc-support = ["odbc-api"]
redis-support = ["redis"]
kafka-support = ["rdkafka"]
all = [
"postgres-support",
"mysql-support",
"duckdb-support",
"odbc-support",
"redis-support",
"kafka-support"
]
Benefits:
- Reduce binary size
- Only compile needed databases
- Easy to add new databases
- Clear dependency management
Testing Strategy
Unit Tests
#[cfg(test)]
mod tests {
#[test]
fn test_database_type_detection() {
assert_eq!(
DatabaseType::from_connection_string("postgresql://localhost/db"),
DatabaseType::PostgreSQL
);
}
#[tokio::test]
async fn test_postgres_connection() {
// Requires running PostgreSQL
let conn = PostgresConnection::connect("postgresql://localhost/test").await;
assert!(conn.is_ok());
}
}
Integration Tests
#[tokio::test]
async fn test_full_query_cycle() {
let mut db = XDLDatabase::new();
db.connect("test.duckdb").await.unwrap();
db.execute_command("CREATE TABLE test (id INT, name VARCHAR)").await.unwrap();
db.execute_command("INSERT INTO test VALUES (1, 'Alice')").await.unwrap();
let rs = db.execute_sql("SELECT * FROM test").await.unwrap();
assert_eq!(rs.row_count(), 1);
}
XDL Examples
Test complete integration from XDL scripts (see examples/ directory).
Performance Considerations
Async/Await
All database operations are async:
- Non-blocking I/O
- High concurrency
- Efficient resource usage
Connection Pooling
Ready for connection pooling:
// Future enhancement
use deadpool_postgres::{Config, Pool};
pub struct PostgresConnectionPool {
pool: Pool,
}
Streaming Results
For large datasets, can add streaming:
pub async fn execute_stream(&self, query: &str) -> impl Stream<Item = Row> {
// Stream results without loading all into memory
}
Security Considerations
SQL Injection Prevention
Current: String concatenation (user must sanitize)
Future: Prepared statements
stmt = objdb->Prepare('SELECT * FROM users WHERE id = ?')
recordset = stmt->Execute([user_id])
Connection Security
- TLS/SSL support (tokio-postgres has native-tls)
- Password handling (never logged)
- Connection string parsing validates format
Extending the Module
Adding a New Database
- Add Dependency:
[dependencies]
mydb = { version = "1.0", optional = true }
[features]
mydb-support = ["mydb"]
- Create Driver:
// src/drivers/mydb.rs
pub struct MyDBConnection {
client: MyDBClient,
}
impl MyDBConnection {
pub async fn connect(conn_str: &str) -> DatabaseResult<Self> {
// Implementation
}
pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
// Implementation
}
// ... other methods
}
- Add to Connection Enum:
pub enum DatabaseConnection {
#[cfg(feature = "mydb-support")]
MyDB(drivers::mydb::MyDBConnection),
// ... existing variants
}
- Update DatabaseType:
pub enum DatabaseType {
MyDB,
// ... existing types
}
impl DatabaseType {
pub fn from_connection_string(conn_str: &str) -> Self {
if conn_str.starts_with("mydb://") {
DatabaseType::MyDB
}
// ... existing logic
}
}
- Document and Test:
- Add README section
- Create example script
- Write unit tests
Troubleshooting
Common Issues
- “Feature not enabled”
- Solution: Add feature flag to Cargo.toml dependencies
- “Connection failed”
- Check connection string format
- Verify database is running
- Check network/firewall
- “Type conversion error”
- Some database types not yet supported
- Add conversion in driver’s value_to_json function
- “Async runtime error”
- Ensure Tokio runtime is available
- XDL stdlib must create runtime for async calls
Future Roadmap
Short Term
- PostgreSQL support
- DuckDB support
- Redis support
- Complete MySQL implementation
- Complete ODBC implementation
- Prepared statements
- Transaction support
Medium Term
- Connection pooling
- Streaming results
- Snowflake support
- MongoDB support
- Complete Kafka implementation
Long Term
- Query builder DSL
- ORM-like features
- Migration tools
- Schema introspection
- Performance monitoring
Conclusion
The XDL database module provides a solid foundation for database connectivity with:
✅ Clean, modular architecture ✅ Multiple database support ✅ Async-first design ✅ Type-safe implementation ✅ Extensible driver system ✅ XDL-native API ✅ Comprehensive documentation
The module is production-ready for PostgreSQL, DuckDB, and Redis, with a clear path for adding additional databases.
References
- XDL Core:
xdl-core/src/types.rs - Database Drivers:
- PostgreSQL: https://docs.rs/tokio-postgres
- DuckDB: https://docs.rs/duckdb
- Redis: https://docs.rs/redis
- Async Runtime: https://docs.rs/tokio
- Error Handling: https://docs.rs/thiserror