Redis Time Series Compatibility

Overview

Redis Time Series is a Redis module that adds time-series data structure support to Redis. Orbit-RS will provide native compatibility with RedisTimeSeries commands, allowing existing applications to seamlessly migrate or integrate with Orbit’s distributed actor system.

Planned Features

Phase 12: Time Series Foundation

Core Time Series Actor

Basic Commands

Phase 13: Advanced Time Series Operations

Aggregation Commands

Statistical Functions

Phase 14: Enterprise Time Series Features

Advanced Analytics

Performance Optimizations

Technical Implementation

Actor Architecture


#[async_trait]
pub trait TimeSeriesActor: ActorWithStringKey {
    // Core operations
    async fn add_sample(&self, timestamp: u64, value: f64) -> OrbitResult<u64>;
    async fn get_sample(&self, timestamp: Option<u64>) -> OrbitResult<Option<(u64, f64)>>;
    async fn range_query(&self, from: u64, to: u64, aggregation: Option<Aggregation>) -> OrbitResult<Vec<(u64, f64)>>;
    
    // Metadata operations
    async fn create_series(&self, config: TimeSeriesConfig) -> OrbitResult<()>;
    async fn add_label(&self, key: String, value: String) -> OrbitResult<()>;
    async fn get_info(&self) -> OrbitResult<TimeSeriesInfo>;
    
    // Aggregation rules
    async fn create_rule(&self, dest_key: String, aggregation: Aggregation, bucket_duration: u64) -> OrbitResult<()>;
    async fn delete_rule(&self, dest_key: String) -> OrbitResult<bool>;
}
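To make the trait's semantics concrete, here is a minimal in-memory model of the core operations in Python (illustrative only; the actual actor is the Rust trait above, and names like `InMemoryTimeSeries` are hypothetical):

```python
import bisect

class InMemoryTimeSeries:
    """Illustrative model of add_sample/get_sample/range_query semantics."""

    def __init__(self):
        self.samples = []  # (timestamp_ms, value) pairs kept sorted by timestamp

    def add_sample(self, timestamp, value):
        # Insert in timestamp order so range queries stay O(log n + k).
        bisect.insort(self.samples, (timestamp, value))
        return timestamp

    def get_sample(self, timestamp=None):
        # A None timestamp mirrors get_sample(None) returning the latest sample.
        if not self.samples:
            return None
        if timestamp is None:
            return self.samples[-1]
        i = bisect.bisect_left(self.samples, (timestamp,))
        if i < len(self.samples) and self.samples[i][0] == timestamp:
            return self.samples[i]
        return None

    def range_query(self, from_ts, to_ts):
        # Inclusive on both ends, matching TS.RANGE semantics.
        lo = bisect.bisect_left(self.samples, (from_ts,))
        hi = bisect.bisect_right(self.samples, (to_ts, float("inf")))
        return self.samples[lo:hi]
```

In the distributed setting, each key maps to one actor instance, so these operations run single-threaded per series without locking.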

Data Structures


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesConfig {
    pub retention_ms: Option<u64>,
    pub chunk_size: Option<usize>,
    pub duplicate_policy: DuplicatePolicy,
    pub labels: HashMap<String, String>,
    pub uncompressed: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesSample {
    pub timestamp: u64,
    pub value: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationRule {
    pub dest_key: String,
    pub aggregation: Aggregation,
    pub bucket_duration: u64,
    pub alignment_timestamp: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Aggregation {
    Avg,
    Sum,
    Min,
    Max,
    Range,
    Count,
    Std,
    Var,
    First,
    Last,
}
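Each variant is a reduction over the values that fall in a bucket. A sketch of the intended semantics in Python (`Std`/`Var` are shown as population statistics here, an assumption; RedisTimeSeries itself distinguishes population and sample variants):

```python
import statistics

# Reduction for each Aggregation variant; values are ordered by timestamp.
AGGREGATORS = {
    "AVG":   lambda vs: sum(vs) / len(vs),
    "SUM":   sum,
    "MIN":   min,
    "MAX":   max,
    "RANGE": lambda vs: max(vs) - min(vs),
    "COUNT": len,
    "STD":   statistics.pstdev,     # population standard deviation (assumed)
    "VAR":   statistics.pvariance,  # population variance (assumed)
    "FIRST": lambda vs: vs[0],      # earliest value in the bucket
    "LAST":  lambda vs: vs[-1],     # latest value in the bucket
}

def aggregate(name, values):
    """Apply one Aggregation variant to a non-empty bucket of values."""
    return AGGREGATORS[name](values)
```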

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DuplicatePolicy {
    Block,    // Reject duplicate timestamps
    Last,     // Keep last value
    First,    // Keep first value
    Min,      // Keep minimum value
    Max,      // Keep maximum value
    Sum,      // Sum values
}
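The policy decides what the series stores when a sample arrives at an already-occupied timestamp. A hedged sketch of the resolution logic (the function name is illustrative, not part of the API):

```python
def resolve_duplicate(policy, existing, incoming):
    """Return the value to store when `incoming` collides with `existing`
    at the same timestamp; BLOCK raises, as TS.ADD errors under that policy."""
    if policy == "BLOCK":
        raise ValueError("duplicate timestamp rejected")
    if policy == "LAST":
        return incoming
    if policy == "FIRST":
        return existing
    if policy == "MIN":
        return min(existing, incoming)
    if policy == "MAX":
        return max(existing, incoming)
    if policy == "SUM":
        return existing + incoming
    raise ValueError(f"unknown duplicate policy: {policy}")
```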

Command Reference

TS.CREATE

TS.CREATE key [RETENTION retentionTime] [UNCOMPRESSED] [CHUNK_SIZE size] [DUPLICATE_POLICY policy] [LABELS label value...]

TS.ADD

TS.ADD key timestamp value [RETENTION retentionTime] [UNCOMPRESSED] [CHUNK_SIZE size] [ON_DUPLICATE policy] [LABELS label value...]

TS.RANGE

TS.RANGE key fromTimestamp toTimestamp [LATEST] [FILTER_BY_TS ts...] [FILTER_BY_VALUE min max] [COUNT count] [AGGREGATION aggregator bucketDuration [alignTimestamp]]

TS.MRANGE

TS.MRANGE fromTimestamp toTimestamp [LATEST] [FILTER_BY_TS ts...] [FILTER_BY_VALUE min max] [WITHLABELS | SELECTED_LABELS label...] [COUNT count] [AGGREGATION aggregator bucketDuration [alignTimestamp]] FILTER filter...

TS.CREATERULE

TS.CREATERULE sourceKey destKey AGGREGATION aggregator bucketDuration [alignTimestamp]
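A compaction rule assigns each source sample to a fixed-width bucket (bucket start = timestamp minus `(timestamp - alignTimestamp) mod bucketDuration`) and writes one aggregated sample per bucket to `destKey`, stamped with the bucket start. A sketch of that bucketing, assuming the aggregator is any reduction over the bucket's values:

```python
def downsample(samples, bucket_duration, aggregator, align=0):
    """Group (timestamp, value) samples into fixed buckets and reduce each.

    Sketch of TS.CREATERULE semantics: emitted timestamps are bucket starts,
    and `align` shifts the bucket boundaries to align + k * bucket_duration.
    """
    buckets = {}
    for ts, value in samples:
        start = ts - ((ts - align) % bucket_duration)
        buckets.setdefault(start, []).append(value)
    return [(start, aggregator(vals)) for start, vals in sorted(buckets.items())]
```

Chaining rules, as the IoT example below does (raw to hourly to daily), is just this function applied to its own output with a larger bucket.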

Usage Examples

Basic Time Series Operations

import redis
r = redis.Redis(host='localhost', port=6380)  # Orbit-RS RESP server

# Create time series with 1 hour retention
r.execute_command('TS.CREATE', 'temperature:sensor1', 
                 'RETENTION', 3600000, 
                 'LABELS', 'sensor_id', '1', 'location', 'room_a')

# Add temperature readings
import time
now = int(time.time() * 1000)
r.execute_command('TS.ADD', 'temperature:sensor1', now, 23.5)
r.execute_command('TS.ADD', 'temperature:sensor1', now + 1000, 24.1)
r.execute_command('TS.ADD', 'temperature:sensor1', now + 2000, 23.8)

# Query range with 1-minute averages
results = r.execute_command('TS.RANGE', 'temperature:sensor1', 
                           now - 300000, now,
                           'AGGREGATION', 'AVG', 60000)
print(f"1-minute averages: {results}")

# Create downsampling rule for hourly averages
r.execute_command('TS.CREATERULE', 'temperature:sensor1', 'temperature:sensor1:hourly',
                 'AGGREGATION', 'AVG', 3600000)

Multi-Series Analytics


# Create multiple time series for different sensors
sensors = ['sensor1', 'sensor2', 'sensor3']
for sensor_id in sensors:
    r.execute_command('TS.CREATE', f'temperature:{sensor_id}',
                     'LABELS', 'sensor_id', sensor_id, 'type', 'temperature')

# Add readings to all sensors
for i, sensor_id in enumerate(sensors):
    base_temp = 20 + i  # Different base temperatures
    for j in range(100):
        timestamp = now + j * 1000
        temp = base_temp + (j % 10)  # Simulated temperature variation
        r.execute_command('TS.ADD', f'temperature:{sensor_id}', timestamp, temp)

# Query all temperature sensors for the last 10 minutes
results = r.execute_command('TS.MRANGE', now - 600000, now,
                           'AGGREGATION', 'AVG', 60000,
                           'FILTER', 'type=temperature')

for series_data in results:
    sensor_name, labels, samples = series_data
    print(f"{sensor_name}: {len(samples)} samples")

IoT Data Pipeline


# Real-time IoT data ingestion
def ingest_sensor_data(device_id, sensor_type, value):
    timestamp = int(time.time() * 1000)
    key = f"{sensor_type}:{device_id}"
    
    # Create series if it doesn't exist
    try:
        r.execute_command('TS.CREATE', key,
                         'RETENTION', 86400000,  # 24 hours
                         'LABELS', 
                         'device_id', device_id,
                         'sensor_type', sensor_type,
                         'location', get_device_location(device_id))
    except redis.ResponseError:
        pass  # Series already exists
    
    # Add sample
    r.execute_command('TS.ADD', key, timestamp, value)
    
    # Create aggregation rules for analytics
    hourly_key = f"{key}:hourly"
    daily_key = f"{key}:daily"
    
    try:
        r.execute_command('TS.CREATERULE', key, hourly_key,
                         'AGGREGATION', 'AVG', 3600000)
        r.execute_command('TS.CREATERULE', hourly_key, daily_key,
                         'AGGREGATION', 'AVG', 86400000)
    except redis.ResponseError:
        pass  # Rules already exist

# Usage
ingest_sensor_data('device_001', 'temperature', 25.6)
ingest_sensor_data('device_001', 'humidity', 65.2)
ingest_sensor_data('device_002', 'pressure', 1013.25)

Integration with Orbit Features

Distributed Time Series

Transaction Support

Performance Optimization

Monitoring and Observability

Metrics

Grafana Integration

Development Timeline

| Phase    | Duration   | Features                                       |
|----------|------------|------------------------------------------------|
| Phase 12 | 8-10 weeks | Core time series foundation, basic commands    |
| Phase 13 | 6-8 weeks  | Advanced operations, aggregation rules         |
| Phase 14 | 8-10 weeks | Enterprise features, performance optimization  |

Total Estimated Effort: 22-28 weeks

Compatibility

Redis Time Series Protocol Compatibility

Integration Points

This comprehensive time series implementation will position Orbit-RS as a powerful alternative to dedicated time series databases while maintaining Redis protocol compatibility.