PostgreSQL TimescaleDB Compatibility

Overview

TimescaleDB is a PostgreSQL extension that transforms PostgreSQL into a scalable time-series database. Orbit-RS will provide native compatibility with TimescaleDB functions and features, allowing existing applications to seamlessly migrate from TimescaleDB to Orbit’s distributed architecture.

Planned Features

Phase 12: TimescaleDB Foundation

Core Time Series Tables

Basic Functions

Phase 13: Advanced TimescaleDB Features

Continuous Aggregates

Advanced Analytics

Phase 14: Enterprise TimescaleDB Features

Data Retention and Compression

Multi-Node and Clustering

Technical Implementation

SQL Extensions

Hypertable Management

-- Create extension
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- Create hypertable from existing table, partitioned on time plus a
-- space dimension (sensor_id) spread over 4 partitions
SELECT create_hypertable('sensor_data', 'timestamp',
    chunk_time_interval => INTERVAL '1 hour',
    partitioning_column => 'sensor_id',
    number_partitions => 4);

-- Create retention policy: chunks older than 30 days are dropped
SELECT add_retention_policy('sensor_data', INTERVAL '30 days');

-- Compression must be enabled on the table BEFORE a compression policy
-- can be added; add_compression_policy errors out otherwise.
ALTER TABLE sensor_data SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id',
    timescaledb.compress_orderby = 'timestamp DESC');

-- Create compression policy: compress chunks older than 7 days
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');

Time-Series Queries

-- Time bucket aggregation: roll raw readings up into fixed 1-hour buckets.
-- GROUP BY on the SELECT alias `hour` is PostgreSQL-specific shorthand.
SELECT 
    time_bucket('1 hour', timestamp) AS hour,
    sensor_id,
    AVG(temperature) as avg_temp,
    MAX(temperature) as max_temp,
    MIN(temperature) as min_temp
FROM sensor_data 
-- Restrict to the trailing 24 hours so the query only touches recent chunks
WHERE timestamp >= NOW() - INTERVAL '1 day'
GROUP BY hour, sensor_id
ORDER BY hour DESC;

-- Gap filling with interpolation: emit a row for EVERY 15-minute bucket in
-- the queried range, even when no readings exist in that bucket.
-- time_bucket_gapfill infers the fill range from the WHERE clause bounds.
SELECT 
    time_bucket_gapfill('15 minutes', timestamp) AS bucket,
    sensor_id,
    AVG(temperature) as avg_temp,
    -- interpolate() linearly fills NULL buckets from neighboring values
    interpolate(AVG(temperature)) as interpolated_temp
FROM sensor_data 
WHERE timestamp >= NOW() - INTERVAL '4 hours'
  AND sensor_id = 'temp_001'
GROUP BY bucket, sensor_id
ORDER BY bucket;

-- Continuous aggregate (materialized view): incrementally-maintained hourly
-- rollup of sensor_data. The timescaledb.continuous option is what makes
-- this a cagg rather than a plain materialized view.
CREATE MATERIALIZED VIEW sensor_data_hourly
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', timestamp) AS hour,
    sensor_id,
    AVG(temperature) as avg_temp,
    COUNT(*) as sample_count
FROM sensor_data
GROUP BY hour, sensor_id;

-- Refresh continuous aggregate for the trailing day.
-- refresh_continuous_aggregate is a procedure taking the view name plus a
-- refresh window; the window parameters are named window_start/window_end
-- (there are no start_time/end_time parameters), passed positionally here.
CALL refresh_continuous_aggregate('sensor_data_hourly',
    NOW() - INTERVAL '1 day',
    NOW());

Actor Architecture


#[async_trait]
/// Actor abstraction for a single hypertable: one actor instance manages the
/// lifecycle, chunks, and time-series queries of one hypertable, keyed by its
/// table name (string key).
pub trait HypertableActor: ActorWithStringKey {
    // Hypertable management
    /// Converts/creates the backing table as a hypertable using `config`.
    async fn create_hypertable(&self, config: HypertableConfig) -> OrbitResult<()>;
    /// Drops the hypertable and all of its chunks.
    async fn drop_hypertable(&self) -> OrbitResult<()>;
    /// Returns metadata (dimensions, chunk counts, etc.) for this hypertable.
    async fn get_hypertable_info(&self) -> OrbitResult<HypertableInfo>;
    
    // Chunk management
    /// Lists chunks, optionally restricted to those overlapping `time_range`.
    async fn show_chunks(&self, time_range: Option<TimeRange>) -> OrbitResult<Vec<ChunkInfo>>;
    /// Drops chunks entirely older than `older_than`; returns dropped chunk names.
    async fn drop_chunks(&self, older_than: SystemTime) -> OrbitResult<Vec<String>>;
    /// Compresses a single chunk identified by `chunk_id`.
    async fn compress_chunk(&self, chunk_id: String) -> OrbitResult<()>;
    
    // Time-series operations
    /// Runs a `time_bucket`-style aggregation described by `params`.
    async fn time_bucket_query(&self, params: TimeBucketParams) -> OrbitResult<Vec<TimeBucketResult>>;
    /// Runs a gap-filling query (`time_bucket_gapfill` equivalent).
    async fn gapfill_query(&self, params: GapfillParams) -> OrbitResult<Vec<GapfillResult>>;
    
    // Continuous aggregates
    /// Creates a continuous aggregate view over this hypertable.
    async fn create_continuous_aggregate(&self, config: ContinuousAggregateConfig) -> OrbitResult<()>;
    /// Refreshes the continuous aggregate for the given time window.
    async fn refresh_continuous_aggregate(&self, time_range: TimeRange) -> OrbitResult<()>;
}

Data Structures


#[derive(Debug, Clone, Serialize, Deserialize)]
/// Configuration for creating a hypertable; mirrors the arguments of
/// TimescaleDB's `create_hypertable` plus retention.
pub struct HypertableConfig {
    // Name of the table to convert into a hypertable
    pub table_name: String,
    // Column used as the time dimension
    pub time_column: String,
    // Time span covered by each chunk (maps to chunk_time_interval)
    pub chunk_time_interval: Duration,
    // Optional space-partitioning column (e.g. a device/sensor id)
    pub partitioning_column: Option<String>,
    // Number of space partitions; only meaningful with partitioning_column
    pub number_partitions: Option<u32>,
    // Optional replication factor for distributed hypertables
    pub replication_factor: Option<u16>,
    // If set, data older than this duration is eligible for retention drop
    pub data_retention: Option<Duration>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// Metadata describing a single chunk of a hypertable.
pub struct ChunkInfo {
    // Stable identifier for the chunk
    pub chunk_id: String,
    // Human-readable chunk (table) name
    pub chunk_name: String,
    // Inclusive start of the chunk's time range
    pub start_time: SystemTime,
    // Exclusive end of the chunk's time range
    pub end_time: SystemTime,
    // On-disk size of the chunk in bytes
    pub size_bytes: u64,
    // Whether the chunk has been compressed
    pub compressed: bool,
    // Node hosting this chunk in a multi-node deployment, if any
    pub node_id: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// Parameters describing a `time_bucket` aggregation query.
pub struct TimeBucketParams {
    // Width of each time bucket (e.g. 1 hour)
    pub bucket_size: Duration,
    // Column holding the timestamp to bucket on
    pub time_column: String,
    // Additional GROUP BY columns beyond the bucket itself
    pub group_by: Vec<String>,
    // Aggregates to compute per bucket/group
    pub aggregates: Vec<AggregateFunction>,
    // Time window the query covers
    pub time_range: TimeRange,
    // Row-level predicates applied before aggregation
    pub filters: Vec<Filter>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// Configuration for creating a continuous aggregate (materialized rollup view).
pub struct ContinuousAggregateConfig {
    // Name of the continuous-aggregate view to create
    pub view_name: String,
    // Source hypertable the aggregate is built over
    pub source_table: String,
    // How and when the materialization is refreshed
    pub refresh_policy: RefreshPolicy,
    // Optional hypertable settings for the materialization target table
    pub materialization_hypertable_config: Option<HypertableConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// Refresh strategy for a continuous aggregate.
pub enum RefreshPolicy {
    /// Only refreshed by explicit user request.
    Manual,
    /// Refreshed on a fixed schedule by a background job.
    Automatic { 
        // How often the refresh job runs
        refresh_interval: Duration,
        // Upper bound on the time window a single job may materialize
        max_interval_per_job: Duration,
    },
    /// Materialized data combined with recent raw data at query time.
    RealTime { 
        // Upper bound on the time window a single job may materialize
        max_interval_per_job: Duration,
        // If true, queries read only materialized data (no real-time merge)
        materialized_only: bool,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// Supported aggregate functions; the String payloads name the column
/// the aggregate is applied to.
pub enum AggregateFunction {
    /// Row count (COUNT(*)).
    Count,
    Sum(String),
    Avg(String),
    Min(String),
    Max(String),
    StdDev(String),
    Variance(String),
    /// Percentile of `column`; `percentile` is a fraction in [0, 1].
    Percentile { column: String, percentile: f64 },
    /// Value of `column` in the row with the smallest `order_by` value
    /// (TimescaleDB `first()`).
    First { column: String, order_by: String },
    /// Value of `column` in the row with the largest `order_by` value
    /// (TimescaleDB `last()`).
    Last { column: String, order_by: String },
}

TimescaleDB Functions Reference

Hypertable Functions

Time Functions

Analytics Functions

Compression Functions

Usage Examples

IoT Sensor Data Management

-- Create sensors table
CREATE TABLE sensor_readings (
    timestamp TIMESTAMPTZ NOT NULL,
    sensor_id TEXT NOT NULL,
    location TEXT NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION,
    pressure DOUBLE PRECISION,
    battery_level INTEGER
);

-- Convert to hypertable with 1-hour chunks, partitioned by sensor_id
SELECT create_hypertable('sensor_readings', 'timestamp',
    chunk_time_interval => INTERVAL '1 hour',
    partitioning_column => 'sensor_id',
    number_partitions => 8);

-- Create index for efficient per-sensor, latest-first queries
CREATE INDEX ON sensor_readings (sensor_id, timestamp DESC);

-- Set up data retention (keep data for 90 days)
SELECT add_retention_policy('sensor_readings', INTERVAL '90 days');

-- Compression must be enabled on the table before a compression policy
-- can be added; segment by sensor_id so per-sensor scans stay efficient.
ALTER TABLE sensor_readings SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id',
    timescaledb.compress_orderby = 'timestamp DESC');

-- Set up compression (compress data older than 1 day)
SELECT add_compression_policy('sensor_readings', INTERVAL '1 day');

-- Insert sample data (timestamps are interpreted in the session time zone)
INSERT INTO sensor_readings VALUES 
('2024-01-01 10:00:00', 'temp_001', 'building_a', 23.5, 45.2, 1013.25, 87),
('2024-01-01 10:01:00', 'temp_001', 'building_a', 23.6, 45.1, 1013.30, 87),
('2024-01-01 10:02:00', 'temp_001', 'building_a', 23.4, 45.3, 1013.20, 86);

-- Query with time bucketing over the trailing 4 hours
SELECT 
    time_bucket('15 minutes', timestamp) AS fifteen_min_bucket,
    sensor_id,
    location,
    AVG(temperature) as avg_temp,
    MAX(temperature) - MIN(temperature) as temp_range,
    AVG(humidity) as avg_humidity
FROM sensor_readings
WHERE timestamp >= NOW() - INTERVAL '4 hours'
GROUP BY fifteen_min_bucket, sensor_id, location
ORDER BY fifteen_min_bucket DESC, sensor_id;

Financial Time Series

-- Create stock prices table
CREATE TABLE stock_prices (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol TEXT NOT NULL,
    price DOUBLE PRECISION NOT NULL,
    volume BIGINT,
    market_cap DOUBLE PRECISION
);

-- Create hypertable for stock data
SELECT create_hypertable('stock_prices', 'timestamp',
    chunk_time_interval => INTERVAL '1 day',
    partitioning_column => 'symbol',
    number_partitions => 16);

-- Create continuous aggregate for OHLCV data
-- first()/last() pick the earliest/latest price within each bucket
CREATE MATERIALIZED VIEW stock_ohlcv_1min
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 minute', timestamp) AS bucket,
    symbol,
    first(price, timestamp) AS open,
    max(price) AS high,
    min(price) AS low,
    last(price, timestamp) AS close,
    sum(volume) AS volume
FROM stock_prices
GROUP BY bucket, symbol;

-- Add automatic refresh policy (refresh the last hour every minute,
-- leaving the most recent minute unmaterialized)
SELECT add_continuous_aggregate_policy('stock_ohlcv_1min',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

-- Query OHLCV data with gap filling.
-- In a gapfill query every selected column must be aggregated or grouped;
-- locf() takes an aggregate and carries the last observed value forward
-- into empty buckets. Each source bucket is already 1 minute wide, so the
-- aggregates below see at most one row per output bucket.
SELECT 
    time_bucket_gapfill('1 minute', bucket, NOW() - INTERVAL '2 hours', NOW()) AS time,
    symbol,
    locf(avg(open)) AS open,
    locf(max(high)) AS high,
    locf(min(low)) AS low,
    locf(avg(close)) AS close,
    coalesce(sum(volume), 0) AS volume
FROM stock_ohlcv_1min
WHERE bucket >= NOW() - INTERVAL '2 hours'
  AND symbol = 'AAPL'
GROUP BY time, symbol
ORDER BY time DESC;

Application Performance Monitoring

-- Create metrics table for APM data
CREATE TABLE app_metrics (
    timestamp TIMESTAMPTZ NOT NULL,
    service_name TEXT NOT NULL,
    instance_id TEXT NOT NULL,
    metric_name TEXT NOT NULL,
    metric_value DOUBLE PRECISION NOT NULL,
    tags JSONB
);

-- Create hypertable
SELECT create_hypertable('app_metrics', 'timestamp',
    chunk_time_interval => INTERVAL '6 hours',
    partitioning_column => 'service_name',
    number_partitions => 4);

-- Create index for efficient tag queries
CREATE INDEX ON app_metrics USING GIN (tags);

-- Continuous aggregate for service health dashboard.
-- percentile_agg (TimescaleDB Toolkit) aggregates the VALUES into a
-- percentile sketch; the desired percentile is extracted at query time
-- with approx_percentile. (percentile_agg(0.95) would aggregate the
-- constant 0.95, which is meaningless.)
CREATE MATERIALIZED VIEW service_health_5min
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('5 minutes', timestamp) AS bucket,
    service_name,
    metric_name,
    AVG(metric_value) as avg_value,
    percentile_agg(metric_value) as pct_sketch,
    COUNT(*) as sample_count
FROM app_metrics
WHERE metric_name IN ('response_time', 'cpu_usage', 'memory_usage', 'error_rate')
GROUP BY bucket, service_name, metric_name;

-- Query service performance trends.
-- approx_percentile extracts a percentile from the stored sketch; rollup()
-- combines the sketches in the window frame (last 12 buckets = 1 hour)
-- into one sketch for a true rolling p95.
SELECT 
    bucket,
    service_name,
    metric_name,
    avg_value,
    approx_percentile(0.95, pct_sketch) as p95_value,
    approx_percentile(0.99, pct_sketch) as p99_value,
    approx_percentile(0.95, rollup(pct_sketch) OVER (
        PARTITION BY service_name, metric_name 
        ORDER BY bucket 
        ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
    )) as rolling_p95
FROM service_health_5min
WHERE bucket >= NOW() - INTERVAL '1 hour'
  AND service_name = 'user-service'
ORDER BY bucket DESC, metric_name;

Integration with Orbit Features

Distributed Architecture

Performance Optimization

Advanced Features

Monitoring and Observability

TimescaleDB-specific Metrics

Grafana Integration

Development Timeline

| Phase    | Duration    | Features                                              |
|----------|-------------|-------------------------------------------------------|
| Phase 12 | 10-12 weeks | Core hypertable functionality, basic time functions   |
| Phase 13 | 8-10 weeks  | Continuous aggregates, advanced analytics functions   |
| Phase 14 | 10-12 weeks | Compression, multi-node features, enterprise analytics |

Total Estimated Effort: 28-34 weeks

Compatibility and Migration

TimescaleDB Compatibility

Migration Tools

Integration Ecosystem

This comprehensive TimescaleDB implementation will establish Orbit-RS as a powerful alternative to TimescaleDB while providing enhanced distributed capabilities and maintaining full PostgreSQL compatibility.