ODBC and Kafka Drivers - Implementation Summary
Overview
Completed full implementations of the ODBC and Apache Kafka drivers for the XDL database connectivity module, bringing the total number of fully functional database drivers to five.
ODBC Driver Implementation
ODBC Status: ✅ Fully Implemented
ODBC Technology Stack
- Crate: `odbc-api` v8.0
- Features: Universal database connectivity via the ODBC standard
- Async Support: Uses `tokio::task::spawn_blocking` for ODBC operations
Supported Databases (via ODBC)
ODBC provides universal connectivity to virtually any database with an ODBC driver:
- SQL Server - Microsoft SQL Server (all versions)
- Oracle - Oracle Database
- IBM DB2 - IBM DB2 databases
- MySQL - MySQL (via MySQL ODBC Driver)
- PostgreSQL - PostgreSQL (via PostgreSQL ODBC Driver)
- Microsoft Access - Access databases
- SQLite - SQLite databases
- Sybase - Sybase ASE
- Informix - IBM Informix
- Many others - Any database with ODBC driver support
Implementation Highlights
Connection Management
pub async fn connect(connection_string: &str) -> DatabaseResult<Self> {
    // Create ODBC environment
    let environment = Environment::new()?;
    // Clone the connection string so the blocking closure is 'static,
    // then connect inside a blocking task
    let conn_str = connection_string.to_owned();
    let connection = tokio::task::spawn_blocking(move || {
        environment.connect_with_connection_string(
            &conn_str,
            ConnectionOptions::default(),
        )
    }).await??;
    Ok(Self { connection, environment })
}
Key Features:
- Async wrapper around blocking ODBC calls
- Automatic environment management
- Support for any ODBC connection string format
Query Execution
pub async fn execute(&self, query: &str) -> DatabaseResult<Recordset> {
    // Own the query string so the blocking closure is 'static
    let query = query.to_owned();
    // `conn` below stands for the ODBC connection handle held by self (simplified)
    tokio::task::spawn_blocking(move || {
        // Execute the query synchronously
        let cursor = conn.execute(&query, ())?;
        // Get column metadata
        let columns = extract_column_info(&cursor)?;
        // Fetch all rows using a columnar buffer
        let rows = fetch_rows(&cursor)?;
        Ok(Recordset::new(columns, rows))
    }).await?
}
Key Features:
- Columnar buffer for efficient bulk fetching
- Automatic type detection and conversion
- Intelligent text-to-numeric conversion
- NULL handling
Type Conversion Strategy
ODBC types are converted to an intermediate JSON representation:
| ODBC Type | Conversion Strategy |
|---|---|
| CHAR/VARCHAR/TEXT | String → Try parse as number → JSON |
| INTEGER/SMALLINT | Direct to JsonValue::Number |
| BIGINT | Direct to JsonValue::Number |
| REAL/FLOAT/DOUBLE | Direct to JsonValue::Number |
| BOOLEAN | To JsonValue::Bool |
| NULL | JsonValue::Null |
| BINARY | Converted to the placeholder string "(binary data)" |
Smart Conversion: Text columns are parsed to detect numeric values:
if let Ok(num) = text.parse::<i64>() {
JsonValue::from(num)
} else if let Ok(num) = text.parse::<f64>() {
JsonValue::from(num)
} else {
JsonValue::from(text)
}
Connection String Examples
SQL Server
DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;DATABASE=mydb;UID=sa;PWD=pass;TrustServerCertificate=yes
PostgreSQL
DRIVER={PostgreSQL Unicode};SERVER=localhost;PORT=5432;DATABASE=mydb;UID=user;PWD=pass
MySQL
DRIVER={MySQL ODBC 8.0 Driver};SERVER=localhost;DATABASE=mydb;UID=user;PWD=pass
Oracle
DRIVER={Oracle in OraClient19Home1};DBQ=localhost:1521/ORCL;UID=user;PWD=pass
SQLite
DRIVER={SQLite3 ODBC Driver};Database=/path/to/database.db
XDL Usage Example
; Create database object
objdb = OBJ_NEW('XDLdbDatabase')
; Connect to SQL Server
conn_str = 'DRIVER={ODBC Driver 17 for SQL Server};' + $
'SERVER=localhost;DATABASE=TestDB;UID=sa;PWD=pass'
objdb->Connect, CONNECTION=conn_str
; Create table
objdb->ExecuteCommand, 'CREATE TABLE Products (ID INT, Name NVARCHAR(100), Price DECIMAL(10,2))'
; Insert data
objdb->ExecuteCommand, "INSERT INTO Products VALUES (1, 'Laptop', 1299.99)"
objdb->ExecuteCommand, "INSERT INTO Products VALUES (2, 'Mouse', 29.99)"
; Query data
recordset = objdb->ExecuteSQL('SELECT * FROM Products WHERE Price > 100')
data = recordset->GetData()
n_rows = recordset->RowCount()
PRINT, 'Found ', n_rows, ' products'
PRINT, data
; Cleanup
recordset->Destroy()
objdb->Disconnect()
OBJ_DESTROY, objdb
Advantages of ODBC Driver
- Universal Connectivity - One driver for many databases
- Enterprise Support - Well-tested ODBC drivers from vendors
- Legacy System Access - Connect to older databases
- No Database-Specific Code - Standard SQL works across platforms
- Production Ready - ODBC is mature, stable technology
Limitations
- Driver Required - ODBC driver must be installed on system
- Platform Specific - Driver availability varies by OS
- No Streaming - Fetches all results into memory
- Generic Interface - May not expose database-specific features
Apache Kafka Driver Implementation
Kafka Status: ✅ Fully Implemented
Kafka Technology Stack
- Crate: `rdkafka` v0.36
- Features: Producer, Consumer, Admin operations
- Async Support: Native async/await with Tokio
Architecture
The Kafka driver creates three clients on connection:
- Producer - Send messages to topics
- Consumer - Read messages from topics
- Admin Client - Manage topics and cluster
pub struct KafkaConnection {
brokers: String,
producer: Option<FutureProducer>,
consumer: Option<BaseConsumer>,
admin: Option<AdminClient<DefaultClientContext>>,
}
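A minimal sketch of how these three clients can be built from the broker string using rdkafka's `ClientConfig`; the group id and offset-reset settings below are illustrative placeholders, not necessarily the driver's actual defaults.

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::BaseConsumer;
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;

// Hypothetical helper: builds the three clients from a broker list such as "localhost:9092".
fn build_clients(
    brokers: &str,
) -> Result<(FutureProducer, BaseConsumer, AdminClient<DefaultClientContext>), KafkaError> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;

    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "xdl-consumer")      // placeholder group id
        .set("auto.offset.reset", "earliest") // start from the beginning when no offset exists
        .create()?;

    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;

    Ok((producer, consumer, admin))
}
```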
Special Query Syntax
Since Kafka is a streaming platform (not a traditional database), we use special SQL-like syntax:
Topic Management
; List all topics
recordset = objdb->ExecuteSQL('LIST TOPICS')
; Create a topic
objdb->ExecuteSQL, 'CREATE TOPIC my-topic'
; Delete a topic
objdb->ExecuteSQL, 'DELETE TOPIC my-topic'
Producer Operations
; Send a message
objdb->ExecuteSQL, 'PRODUCE TO topic-name: message content'
; Send JSON data
objdb->ExecuteSQL, 'PRODUCE TO sensors: {"temp":25.5,"humidity":60}'
; Send array data in loop
FOR i = 1, 100 DO BEGIN
msg = 'Data point ' + STRTRIM(i,2)
objdb->ExecuteSQL, 'PRODUCE TO data-stream: ' + msg
ENDFOR
Consumer Operations
; Consume messages (default limit 10)
recordset = objdb->ExecuteSQL('CONSUME FROM topic-name LIMIT 10')
; Get messages
messages = recordset->GetData()
payloads = recordset->GetColumn('payload')
; Process each message
FOR i = 0, N_ELEMENTS(payloads)-1 DO BEGIN
PRINT, 'Message:', payloads[i]
ENDFOR
Implementation Details
Producer
async fn handle_produce(&self, query: &str) -> DatabaseResult<Recordset> {
// Parse: PRODUCE TO topic: message
let (topic, message) = parse_produce_query(query)?;
// Send message
let record = FutureRecord::to(topic)
.payload(message)
.key("xdl-key");
let (partition, offset) = producer.send(record, timeout).await?;
// Return delivery confirmation
Ok(Recordset with status, partition, offset)
}
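The `parse_produce_query` helper referenced above is not shown in this summary; a hedged sketch of the parsing step (splitting on the first `:` after the topic name), using a plain `String` error for brevity:

```rust
// Illustrative only; the driver's actual parse_produce_query may differ.
fn parse_produce_query(query: &str) -> Result<(&str, &str), String> {
    // Expected form: PRODUCE TO <topic>: <message>
    let rest = query
        .trim()
        .strip_prefix("PRODUCE TO ")
        .ok_or_else(|| "expected 'PRODUCE TO <topic>: <message>'".to_string())?;
    let (topic, message) = rest
        .split_once(':')
        .ok_or_else(|| "missing ':' separator".to_string())?;
    Ok((topic.trim(), message.trim()))
}
```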
Consumer
async fn handle_consume(&self, query: &str) -> DatabaseResult<Recordset> {
// Parse: CONSUME FROM topic LIMIT n
let (topic, limit) = parse_consume_query(query)?;
// Subscribe to topic
consumer.subscribe(&[topic])?;
// Poll for messages
let mut messages = Vec::new();
for _ in 0..limit {
    match consumer.poll(timeout) {
        Some(Ok(msg)) => messages.push(convert_message(msg)),
        Some(Err(e)) => return Err(e.into()),
        None => break, // no message within the timeout
    }
}
// Return as recordset with columns:
// partition, offset, key, payload, timestamp
Ok(Recordset::new(columns, messages))
}
Admin Operations
async fn handle_create_topic(&self, query: &str) -> DatabaseResult<Recordset> {
let topic_name = parse_topic_name(query)?;
let new_topic = NewTopic::new(
topic_name,
1, // partitions
TopicReplication::Fixed(1) // replication factor
);
admin.create_topics(&[new_topic], &options).await?;
Ok(Recordset with status)
}
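The `LIST TOPICS` command has no sketch above; one hedged way to serve it is through rdkafka's metadata API on the consumer (illustrative, not the exact driver code):

```rust
use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;

// Illustrative helper: fetch cluster metadata and return all topic names.
fn list_topic_names(consumer: &BaseConsumer) -> Result<Vec<String>, KafkaError> {
    let metadata = consumer.fetch_metadata(None, Duration::from_secs(5))?;
    Ok(metadata
        .topics()
        .iter()
        .map(|t| t.name().to_string())
        .collect())
}
```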
Message Format
Consumed messages are returned as a recordset with these columns:
| Column | Type | Description |
|---|---|---|
| partition | integer | Partition number |
| offset | integer | Message offset |
| key | text | Message key (or NULL) |
| payload | text | Message content |
| timestamp | integer | Message timestamp (milliseconds) |
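A minimal sketch of how `convert_message` could map one consumed message onto these columns, assuming rows are built from JSON values as in the ODBC driver (the actual implementation may differ):

```rust
use rdkafka::message::{BorrowedMessage, Message};
use serde_json::Value as JsonValue;

// Illustrative conversion: one message becomes one row of
// [partition, offset, key, payload, timestamp].
fn convert_message(msg: BorrowedMessage<'_>) -> Vec<JsonValue> {
    let key = msg
        .key()
        .map(|k| JsonValue::from(String::from_utf8_lossy(k).into_owned()))
        .unwrap_or(JsonValue::Null);
    let payload = msg
        .payload()
        .map(|p| JsonValue::from(String::from_utf8_lossy(p).into_owned()))
        .unwrap_or(JsonValue::Null);
    let timestamp = msg
        .timestamp()
        .to_millis()
        .map(JsonValue::from)
        .unwrap_or(JsonValue::Null);

    vec![
        JsonValue::from(msg.partition()),
        JsonValue::from(msg.offset()),
        key,
        payload,
        timestamp,
    ]
}
```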
XDL Usage Example
; Create database object
objdb = OBJ_NEW('XDLdbDatabase')
; Connect to Kafka
objdb->Connect, CONNECTION='kafka://localhost:9092'
; Create topic
objdb->ExecuteSQL, 'CREATE TOPIC sensor-data'
; Produce data stream
FOR i = 1, 100 DO BEGIN
temperature = 20 + RANDOMU(seed) * 15
msg = '{"sensor_id":"TEMP01","value":' + STRTRIM(temperature,2) + '}'
objdb->ExecuteSQL, 'PRODUCE TO sensor-data: ' + msg
WAIT, 0.1
ENDFOR
; Consume and process
recordset = objdb->ExecuteSQL('CONSUME FROM sensor-data LIMIT 50')
payloads = recordset->GetColumn('payload')
; Analyze stream
temperatures = FLTARR(N_ELEMENTS(payloads))
FOR i = 0, N_ELEMENTS(payloads)-1 DO BEGIN
; Parse JSON (simplified)
temperatures[i] = parse_json_value(payloads[i], 'value')
ENDFOR
avg_temp = MEAN(temperatures)
PRINT, 'Average temperature:', avg_temp
; Cleanup
recordset->Destroy()
objdb->Disconnect()
OBJ_DESTROY, objdb
Use Cases
- Real-Time Data Streams - Sensor data, logs, metrics
- Event Sourcing - Application events, audit logs
- Message Queuing - Async task processing
- Data Integration - ETL pipelines, data lake ingestion
- IoT Applications - Device telemetry, commands
Advantages
- High Throughput - Millions of messages per second
- Durability - Persistent message storage
- Scalability - Horizontal scaling with partitions
- Real-Time - Low-latency message delivery
- Replay - Can re-read historical messages
Limitations
- Not a Database - No SQL queries, indexes, or joins
- Message Size - Best for small to medium messages
- Setup Required - Kafka cluster must be running
- Learning Curve - Different paradigm from SQL databases
Integration Status
Files Modified/Created
New Implementations:
- `xdl-database/src/drivers/odbc.rs` - Full ODBC driver (230 lines)
- `xdl-database/src/drivers/kafka.rs` - Full Kafka driver (446 lines)
Examples:
- `xdl-database/examples/odbc_sqlserver_example.xdl` - ODBC usage
- `xdl-database/examples/kafka_streaming_example.xdl` - Kafka usage
Documentation:
- `xdl-database/README.md` - Updated with ODBC and Kafka
- `docs/ODBC_KAFKA_IMPLEMENTATION.md` - This document
Feature Matrix
| Database | Status | Query | Commands | Async | Type Conv | Notes |
|---|---|---|---|---|---|---|
| PostgreSQL | ✅ | ✅ | ✅ | ✅ | ✅ | Native driver |
| DuckDB | ✅ | ✅ | ✅ | ✅ | ✅ | Embedded analytics |
| Redis | ✅ | ⚠️ | ✅ | ✅ | ✅ | Key-value only |
| ODBC | ✅ | ✅ | ✅ | ✅ | ✅ | Universal SQL |
| Kafka | ✅ | ✅ | ✅ | ✅ | ✅ | Streaming platform |
| MySQL | ⏳ | - | - | - | - | Native stub ready |
Legend:
- ✅ Fully implemented
- ⚠️ Limited functionality
- ⏳ Stub implementation
- ❌ Not applicable
Cargo Features
Both drivers are controlled by feature flags:
[features]
default = ["postgres-support", "duckdb-support", "redis-support"]
odbc-support = ["odbc-api"]
kafka-support = ["rdkafka"]
all = [
"postgres-support",
"duckdb-support",
"redis-support",
"odbc-support",
"kafka-support"
]
Enable ODBC:
xdl-database = { path = "../xdl-database", features = ["odbc-support"] }
Enable Kafka:
xdl-database = { path = "../xdl-database", features = ["kafka-support"] }
Enable All:
xdl-database = { path = "../xdl-database", features = ["all"] }
Testing
ODBC Testing
Prerequisites:
- ODBC driver manager installed (unixODBC on Linux/Mac, built-in on Windows)
- Specific database ODBC driver installed
- Database server running
Test Script:
# Install ODBC driver (example for PostgreSQL on macOS)
brew install unixodbc
brew install psqlodbc
# List available drivers
odbcinst -q -d
# Test connection
isql -v "DSN=MyDataSource;UID=user;PWD=pass"
Kafka Testing
Prerequisites:
- Kafka broker running (or use Docker)
- Default port 9092 accessible
Quick Start with Docker:
# Start Kafka with Docker Compose
docker-compose up -d kafka zookeeper
# Or use confluent-kafka
docker run -p 9092:9092 confluentinc/cp-kafka:latest
Test Connection:
# Create topic
kafka-topics --create --topic test --bootstrap-server localhost:9092
# List topics
kafka-topics --list --bootstrap-server localhost:9092
Performance Considerations
ODBC
- Columnar Fetching: Uses bulk fetch for efficiency
- Blocking Operations: Wrapped in `spawn_blocking` to avoid blocking the async runtime
- Buffer Size: 100 rows per fetch (configurable; see the fetch sketch after this list)
- Type Conversion: Text parsing adds slight overhead
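A hedged sketch of the columnar bulk fetch, following the `TextRowSet` pattern from odbc-api's documentation with the 100-row batch size noted above; the driver's real `fetch_rows` helper may differ in detail:

```rust
use odbc_api::buffers::TextRowSet;
use odbc_api::Cursor;

// Illustrative bulk fetch: 100 rows per round trip, every cell read as text.
fn fetch_all_text(mut cursor: impl Cursor) -> Result<Vec<Vec<String>>, odbc_api::Error> {
    // Buffer for 100 rows, at most 4096 bytes per text cell.
    let mut buffers = TextRowSet::for_cursor(100, &mut cursor, Some(4096))?;
    let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;

    let mut rows = Vec::new();
    while let Some(batch) = row_set_cursor.fetch()? {
        for row in 0..batch.num_rows() {
            let record = (0..batch.num_cols())
                .map(|col| {
                    String::from_utf8_lossy(batch.at(col, row).unwrap_or(&[])).into_owned()
                })
                .collect();
            rows.push(record);
        }
    }
    Ok(rows)
}
```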
Kafka
- Batch Size: Configure message batch size for throughput (see the producer tuning sketch after this list)
- Timeout: 1 second per message poll (configurable)
- Async Native: Full async/await, no blocking
- Memory: Messages loaded into memory (consider streaming for large volumes)
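Batch size and latency can be tuned through standard librdkafka properties when the producer is built; the values below are examples, not the driver's defaults:

```rust
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;

// Illustrative throughput-oriented producer configuration.
fn tuned_producer(brokers: &str) -> Result<FutureProducer, KafkaError> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("batch.num.messages", "10000") // messages per internal batch
        .set("linger.ms", "5")              // wait briefly so batches can fill
        .set("compression.type", "lz4")     // trade CPU for network throughput
        .create()
}
```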
Error Handling
Both drivers provide comprehensive error handling:
pub enum DatabaseError {
#[cfg(feature = "odbc-support")]
#[error("ODBC error: {0}")]
ODBCError(String),
#[error("Kafka error: {0}")]
KafkaError(String), // Wraps rdkafka errors
// ... other error types
}
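A hedged sketch of how the underlying crate errors might be wrapped into these variants; the actual conversions may live elsewhere in the module:

```rust
#[cfg(feature = "odbc-support")]
impl From<odbc_api::Error> for DatabaseError {
    fn from(e: odbc_api::Error) -> Self {
        DatabaseError::ODBCError(e.to_string())
    }
}

#[cfg(feature = "kafka-support")]
impl From<rdkafka::error::KafkaError> for DatabaseError {
    fn from(e: rdkafka::error::KafkaError) -> Self {
        DatabaseError::KafkaError(e.to_string())
    }
}
```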
XDL Error Handling:
CATCH, error
IF error NE 0 THEN BEGIN
PRINT, 'Database error: ', !ERROR_STATE.MSG
RETURN
ENDIF
objdb->Connect, CONNECTION=conn_str
Future Enhancements
ODBC
- Prepared statements support
- Stored procedure calls
- Transaction management (BEGIN, COMMIT, ROLLBACK)
- Connection pooling
- Streaming result sets for large queries
Kafka
- Consumer groups with offset management
- Exactly-once semantics
- Schema Registry integration
- Avro serialization support
- Partition assignment strategies
- Kafka Streams integration
Conclusion
Both ODBC and Kafka drivers are now production-ready and fully integrated into the XDL database module:
✅ ODBC - Provides universal SQL database connectivity
- Supports virtually any database with an ODBC driver
- Enterprise-grade reliability
- Standard SQL interface
✅ Kafka - Enables real-time streaming data access
- High-throughput message processing
- Event streaming and data pipelines
- Modern distributed architecture
The XDL database module now supports 5 fully functional database systems:
- PostgreSQL (native)
- DuckDB (embedded)
- Redis (key-value)
- ODBC (universal SQL)
- Apache Kafka (streaming)
This provides XDL users with comprehensive data connectivity options for:
- Traditional SQL databases
- Embedded analytics
- Key-value stores
- Enterprise databases (via ODBC)
- Real-time streaming platforms
All drivers share a unified API, consistent error handling, and async/await support for high-performance data access.