
What is a Rolling Buffer or Sliding Window? Complete Guide to Video Buffering

Understand rolling buffers (circular buffers) and sliding windows in video streaming. Learn how they enable time-shift playback, DVR functionality, and efficient video storage management.

VideoBuffer Team
February 22, 2026

Rolling buffers, also known as circular buffers or sliding windows, are fundamental data structures that enable modern video streaming features like time-shift playback, instant replay, and cloud DVR functionality. This comprehensive guide explains what rolling buffers are, how they work, and how to implement them effectively.

Understanding Rolling Buffers

What is a Rolling Buffer?

A rolling buffer (also called a circular buffer or ring buffer) is a fixed-size data structure that continuously stores the most recent data while automatically discarding the oldest data when the buffer is full.

Key Characteristics:

  • Fixed maximum size (e.g., 24 hours of video)
  • Continuous recording with automatic old data removal
  • FIFO (First-In-First-Out) behavior
  • Efficient memory/storage usage
  • Enables time-shift playback and replay
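
In Python, this core behavior can be sketched in a few lines with the standard library's collections.deque, which implements exactly these fixed-size FIFO semantics:

```python
from collections import deque

# A deque with maxlen is a ready-made rolling buffer: appending to a
# full deque silently discards the oldest element (FIFO).
buffer = deque(maxlen=3)

for segment in ["hour1", "hour2", "hour3", "hour4"]:
    buffer.append(segment)

# "hour1" was discarded automatically when "hour4" arrived
print(list(buffer))  # ['hour2', 'hour3', 'hour4']
```

The implementations later in this guide manage the index arithmetic explicitly because video segments also need timestamp-based lookup, but the eviction behavior is the same.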

Visual Representation

Rolling Buffer (24-hour capacity):

┌────────────────────────────────────────────┐
│  [Hour 1] [Hour 2] ... [Hour 23] [Hour 24] │  ← Buffer Full
└────────────────────────────────────────────┘
     ↑                                    ↑
   Oldest                              Newest
   (will be deleted next)

When new data arrives:
┌────────────────────────────────────────────┐
│  [Hour 2] [Hour 3] ... [Hour 24] [Hour 25] │  ← Hour 1 deleted
└────────────────────────────────────────────┘
     ↑                                    ↑
   Oldest                              Newest

Rolling Buffer vs. Traditional Storage

Feature       | Rolling Buffer                 | Traditional Storage
--------------+--------------------------------+----------------------------------
Size          | Fixed (e.g., 24 hours)         | Unlimited (until disk full)
Old Data      | Automatically deleted          | Manually deleted or kept forever
Storage Cost  | Predictable, constant          | Grows indefinitely
Use Case      | Live streams, DVR, monitoring  | Archives, permanent records
Complexity    | Automated cleanup              | Manual management required

How Rolling Buffers Work

The Circular Buffer Concept

A rolling buffer uses a circular data structure where the write position wraps around to the beginning when it reaches the end.

Conceptual Model:

Circular Buffer Structure:

        [Segment 5]
       /            \
  [Segment 4]    [Segment 6]
      |              |
  [Segment 3]    [Segment 7]
       \            /
        [Segment 8]
             |
        Write Head →

Buffer Operations

1. Write Operation (Adding New Data)

class RollingBuffer:
    def __init__(self, max_segments=1440):  # 24 hours at 1 min segments
        self.max_segments = max_segments
        self.segments = []
        self.write_position = 0
    
    def add_segment(self, segment):
        """
        Add new segment to buffer.
        If buffer is full, oldest segment is overwritten.
        """
        if len(self.segments) < self.max_segments:
            # Buffer not full yet, append
            self.segments.append(segment)
        else:
            # Buffer full, overwrite oldest
            self.segments[self.write_position] = segment
        
        # Move write position (circular)
        self.write_position = (self.write_position + 1) % self.max_segments
        
        return True
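
A quick check of the wrap-around arithmetic: with the modulo step above, the write position cycles through the slot indices and returns to slot 0 once the buffer is full (the values below assume a toy 4-slot buffer):

```python
max_segments = 4
write_position = 0
visited = []

# Write six segments into a 4-slot buffer, recording each slot used
for _ in range(6):
    visited.append(write_position)
    write_position = (write_position + 1) % max_segments

print(visited)  # slots 0..3, then wraps back to 0 and 1
```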

2. Read Operation (Playback)

def get_segments(self, start_time, end_time):
    """
    Retrieve segments between start and end times.
    Enables time-shift playback.
    """
    available_segments = []
    
    for segment in self.segments:
        if start_time <= segment.timestamp <= end_time:
            available_segments.append(segment)
    
    return sorted(available_segments, key=lambda s: s.timestamp)

def get_latest_segments(self, count=10):
    """
    Get most recent N segments for live playback.
    Once the buffer wraps, list order is slot order rather than
    time order, so rebuild chronological order from write_position.
    """
    if len(self.segments) < self.max_segments:
        ordered = self.segments
    else:
        # After wrap-around, the oldest segment sits at write_position
        ordered = (self.segments[self.write_position:]
                   + self.segments[:self.write_position])
    
    return ordered[-count:]

3. Cleanup Operation (Removing Old Data)

import time

def cleanup_old_segments(self, retention_hours=24):
    """
    Remove segments older than retention period.
    Called periodically or on every write.
    """
    current_time = time.time()
    cutoff_time = current_time - (retention_hours * 3600)
    
    # Remove segments older than cutoff; rebuild in chronological
    # order so the circular write index stays valid after compaction
    self.segments = sorted(
        (seg for seg in self.segments if seg.timestamp >= cutoff_time),
        key=lambda seg: seg.timestamp
    )
    self.write_position = len(self.segments) % self.max_segments
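
Putting the three operations together, here is a compact, self-contained walkthrough of the in-memory approach (a sketch using a simple Segment namedtuple rather than the full class above):

```python
import time
from collections import namedtuple

Segment = namedtuple("Segment", ["timestamp", "data"])

# Toy 3-slot buffer: write five segments, then read back what survives
max_segments = 3
segments = []
write_position = 0

now = time.time()
for i in range(5):
    seg = Segment(timestamp=now + i, data=f"frame-{i}")
    if len(segments) < max_segments:
        segments.append(seg)             # buffer still filling up
    else:
        segments[write_position] = seg   # overwrite the oldest slot
    write_position = (write_position + 1) % max_segments

# Only the three most recent segments remain after five writes
available = sorted(segments, key=lambda s: s.timestamp)
print([s.data for s in available])  # ['frame-2', 'frame-3', 'frame-4']
```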

Implementation Strategies

Strategy 1: File-Based Rolling Buffer

Store segments as individual files with timestamp-based naming.

Directory Structure:

/buffer/
├── 20260222_120000_001.ts  (oldest)
├── 20260222_120006_002.ts
├── 20260222_120012_003.ts
├── ...
└── 20260223_115954_1440.ts (newest)

Implementation:

import os
import time
from datetime import datetime, timedelta

class FileBasedRollingBuffer:
    def __init__(self, buffer_dir, retention_hours=24):
        self.buffer_dir = buffer_dir
        self.retention_hours = retention_hours
        os.makedirs(buffer_dir, exist_ok=True)
    
    def add_segment(self, segment_data):
        """
        Save segment to disk with timestamp filename.
        """
        timestamp = datetime.now()
        filename = timestamp.strftime("%Y%m%d_%H%M%S.ts")
        filepath = os.path.join(self.buffer_dir, filename)
        
        # Write segment to file
        with open(filepath, 'wb') as f:
            f.write(segment_data)
        
        # Cleanup old segments
        self.cleanup()
        
        return filepath
    
    def cleanup(self):
        """
        Remove segments older than retention period.
        """
        cutoff_time = datetime.now() - timedelta(hours=self.retention_hours)
        
        for filename in os.listdir(self.buffer_dir):
            filepath = os.path.join(self.buffer_dir, filename)
            
            # Parse timestamp from filename
            try:
                file_time = datetime.strptime(
                    filename[:15], "%Y%m%d_%H%M%S"
                )
                
                # Delete if older than retention
                if file_time < cutoff_time:
                    os.remove(filepath)
                    print(f"Deleted old segment: {filename}")
            
            except (ValueError, OSError) as e:
                print(f"Error processing {filename}: {e}")
    
    def get_segments_in_range(self, start_time, end_time):
        """
        Get all segments within time range.
        """
        segments = []
        
        for filename in sorted(os.listdir(self.buffer_dir)):
            try:
                file_time = datetime.strptime(
                    filename[:15], "%Y%m%d_%H%M%S"
                )
                
                if start_time <= file_time <= end_time:
                    filepath = os.path.join(self.buffer_dir, filename)
                    segments.append(filepath)
            
            except ValueError:
                continue
        
        return segments

# Usage
buffer = FileBasedRollingBuffer(
    buffer_dir="/var/video/buffer",
    retention_hours=24
)

# Add new segment
buffer.add_segment(video_segment_data)

# Get segments from last hour
start = datetime.now() - timedelta(hours=1)
end = datetime.now()
segments = buffer.get_segments_in_range(start, end)

Strategy 2: Database-Based Rolling Buffer

Store segment metadata in database with references to file storage.

Database Schema:

CREATE TABLE video_segments (
    id SERIAL PRIMARY KEY,
    camera_id VARCHAR(50) NOT NULL,
    segment_path VARCHAR(255) NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    duration_seconds INT NOT NULL,
    size_bytes BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- PostgreSQL has no inline INDEX clause (that is MySQL syntax);
-- create the indexes with separate statements
CREATE INDEX idx_camera_timestamp ON video_segments (camera_id, timestamp);
CREATE INDEX idx_timestamp ON video_segments (timestamp);

CREATE TABLE buffer_config (
    camera_id VARCHAR(50) PRIMARY KEY,
    retention_hours INT DEFAULT 24,
    max_size_gb INT DEFAULT 100
);

Implementation:

import os

import psycopg2
from datetime import datetime, timedelta

class DatabaseRollingBuffer:
    def __init__(self, db_conn, camera_id, retention_hours=24):
        self.db = db_conn
        self.camera_id = camera_id
        self.retention_hours = retention_hours
    
    def add_segment(self, segment_path, duration, size_bytes):
        """
        Register new segment in database.
        """
        cursor = self.db.cursor()
        
        # Insert segment record
        cursor.execute("""
            INSERT INTO video_segments 
            (camera_id, segment_path, timestamp, duration_seconds, size_bytes)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
        """, (
            self.camera_id,
            segment_path,
            datetime.now(),
            duration,
            size_bytes
        ))
        
        segment_id = cursor.fetchone()[0]
        self.db.commit()
        
        # Cleanup old segments
        self.cleanup()
        
        return segment_id
    
    def cleanup(self):
        """
        Remove segments older than retention period.
        """
        cursor = self.db.cursor()
        cutoff_time = datetime.now() - timedelta(hours=self.retention_hours)
        
        # Get segments to delete
        cursor.execute("""
            SELECT id, segment_path 
            FROM video_segments 
            WHERE camera_id = %s AND timestamp < %s
        """, (self.camera_id, cutoff_time))
        
        old_segments = cursor.fetchall()
        
        # Delete files and database records
        for segment_id, segment_path in old_segments:
            try:
                # Delete physical file
                if os.path.exists(segment_path):
                    os.remove(segment_path)
                
                # Delete database record
                cursor.execute(
                    "DELETE FROM video_segments WHERE id = %s",
                    (segment_id,)
                )
            except OSError as e:
                print(f"Error deleting segment {segment_id}: {e}")
        
        self.db.commit()
        cursor.close()
    
    def get_segments(self, start_time, end_time):
        """
        Query segments within time range.
        """
        cursor = self.db.cursor()
        
        cursor.execute("""
            SELECT segment_path, timestamp, duration_seconds
            FROM video_segments
            WHERE camera_id = %s 
              AND timestamp >= %s 
              AND timestamp <= %s
            ORDER BY timestamp ASC
        """, (self.camera_id, start_time, end_time))
        
        segments = cursor.fetchall()
        cursor.close()
        
        return segments
    
    def get_buffer_stats(self):
        """
        Get current buffer statistics.
        """
        cursor = self.db.cursor()
        
        cursor.execute("""
            SELECT 
                COUNT(*) as segment_count,
                SUM(size_bytes) as total_size_bytes,
                MIN(timestamp) as oldest_segment,
                MAX(timestamp) as newest_segment,
                SUM(duration_seconds) as total_duration_seconds
            FROM video_segments
            WHERE camera_id = %s
        """, (self.camera_id,))
        
        stats = cursor.fetchone()
        cursor.close()
        
        return {
            'segment_count': stats[0],
            'total_size_gb': stats[1] / (1024**3) if stats[1] else 0,
            'oldest_segment': stats[2],
            'newest_segment': stats[3],
            'total_duration_hours': stats[4] / 3600 if stats[4] else 0
        }

Strategy 3: Cloud Storage Rolling Buffer

Use cloud object storage (S3, Google Cloud Storage) with lifecycle policies.

AWS S3 Implementation:

import boto3
from datetime import datetime, timedelta

class S3RollingBuffer:
    def __init__(self, bucket_name, camera_id, retention_hours=24):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
        self.camera_id = camera_id
        self.retention_hours = retention_hours
        self.prefix = f"cameras/{camera_id}/segments/"
    
    def add_segment(self, segment_data):
        """
        Upload segment to S3.
        """
        timestamp = datetime.now()
        key = f"{self.prefix}{timestamp.strftime('%Y/%m/%d/%H%M%S')}.ts"
        
        # Upload to S3
        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=segment_data,
            Metadata={
                'camera_id': self.camera_id,
                'timestamp': timestamp.isoformat(),
            },
            StorageClass='STANDARD'  # Standard-IA has a 30-day minimum charge, a poor fit for short retention
        )
        
        return key
    
    def setup_lifecycle_policy(self):
        """
        Configure S3 lifecycle rule for automatic deletion.
        """
        lifecycle_config = {
            'Rules': [
                {
                    'Id': f'delete-old-segments-{self.camera_id}',
                    'Status': 'Enabled',
                    # Top-level 'Prefix' is deprecated; scope rules with Filter
                    'Filter': {'Prefix': self.prefix},
                    'Expiration': {
                        # Lifecycle granularity is whole days; round up
                        'Days': self.retention_hours // 24 + 1
                    }
                }
            ]
        }
        
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=self.bucket,
            LifecycleConfiguration=lifecycle_config
        )
    
    def get_segments(self, start_time, end_time):
        """
        List segments within time range.
        """
        segments = []
        
        # List objects with prefix
        paginator = self.s3.get_paginator('list_objects_v2')
        
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get('Contents', []):
                # Get object metadata
                metadata = self.s3.head_object(
                    Bucket=self.bucket,
                    Key=obj['Key']
                )
                
                segment_time = datetime.fromisoformat(
                    metadata['Metadata']['timestamp']
                )
                
                if start_time <= segment_time <= end_time:
                    segments.append({
                        'key': obj['Key'],
                        'timestamp': segment_time,
                        'size': obj['Size']
                    })
        
        return sorted(segments, key=lambda s: s['timestamp'])
    
    def generate_presigned_url(self, key, expiration=3600):
        """
        Generate temporary URL for segment playback.
        """
        url = self.s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': key},
            ExpiresIn=expiration
        )
        return url
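
One head_object call per listed object gets expensive at scale. Since the key layout chosen in add_segment already encodes the capture time (cameras/<camera_id>/segments/YYYY/MM/DD/HHMMSS.ts), the timestamp can instead be recovered by parsing the key itself; a hypothetical helper:

```python
from datetime import datetime

def timestamp_from_key(key):
    # Assumes the key layout produced by add_segment above:
    # cameras/<camera_id>/segments/YYYY/MM/DD/HHMMSS.ts
    year, month, day, name = key.rsplit("/", 4)[-4:]
    hms = name.split(".")[0]
    return datetime.strptime(f"{year}{month}{day}{hms}", "%Y%m%d%H%M%S")

print(timestamp_from_key("cameras/cam1/segments/2026/02/22/120000.ts"))
```

With this, get_segments can filter on timestamps during the list pass and skip the per-object metadata requests entirely.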

Use Cases for Rolling Buffers

1. Security Camera DVR

Scenario: Store last 48 hours of footage from security cameras.

Configuration:
  Retention: 48 hours
  Segment Size: 6 seconds
  Resolution: 720p @ 2 Mbps
  Storage per Camera: ~42 GB
  Cameras: 16
  Total Storage: ~672 GB

Features:
  - Live monitoring
  - Rewind to any point in last 48 hours
  - Export clips for incidents
  - Automatic old footage deletion

2. Live Sports Replay

Scenario: Enable instant replay for live sports broadcasts.

Configuration:
  Retention: 2 hours (game duration + buffer)
  Segment Size: 2 seconds (low latency)
  Resolution: 1080p @ 6 Mbps
  Storage: ~5.4 GB per stream

Features:
  - Instant replay capability
  - Multi-angle replay
  - Highlight clip creation
  - Frame-by-frame analysis

3. Time-Shifted TV

Scenario: Allow viewers to pause, rewind live TV.

Configuration:
  Retention: 2 hours per viewer
  Segment Size: 6 seconds
  Resolution: Various (ABR)
  Personal buffer per user

Features:
  - Pause live TV
  - Rewind up to 2 hours
  - Fast forward to live
  - Resume on different device

Best Practices

1. Size Your Buffer Appropriately

Calculate Required Storage:

def calculate_buffer_storage(bitrate_mbps, retention_hours):
    """
    Calculate storage needed for rolling buffer.
    
    bitrate_mbps: Video bitrate in Mbps
    retention_hours: Hours of video to retain
    """
    # Convert bitrate to bytes per second
    bytes_per_second = (bitrate_mbps * 1_000_000) / 8
    
    # Calculate for retention period
    seconds = retention_hours * 3600
    total_bytes = bytes_per_second * seconds
    
    # Convert to GB
    total_gb = total_bytes / (1024**3)
    
    return round(total_gb, 2)

# Example: 720p at 2 Mbps for 24 hours
print(calculate_buffer_storage(2, 24))  # Output: 20.12 GB
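
The inverse calculation is just as useful when working backwards from a fixed disk budget (function name and example values are illustrative):

```python
def max_retention_hours(disk_gb, bitrate_mbps):
    """
    Hours of video that fit in disk_gb at the given bitrate.
    """
    bytes_per_hour = (bitrate_mbps * 1_000_000 / 8) * 3600
    return disk_gb * (1024**3) / bytes_per_hour

# Example: 100 GB of disk at 2 Mbps
print(round(max_retention_hours(100, 2)))  # Output: 119 (hours, ~5 days)
```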

2. Implement Efficient Cleanup

Batch Deletion:

import os
from datetime import datetime, timedelta

def cleanup_batch(buffer_dir, retention_hours, batch_size=100):
    """
    Delete old segments in batches for better performance.
    """
    cutoff_time = datetime.now() - timedelta(hours=retention_hours)
    deleted_count = 0
    
    # Timestamp-based filenames sort chronologically, oldest first
    files = sorted(os.listdir(buffer_dir))
    
    # Process in batches
    for i in range(0, len(files), batch_size):
        batch = files[i:i+batch_size]
        
        for filename in batch:
            file_time = parse_timestamp_from_filename(filename)
            
            if file_time >= cutoff_time:
                # Sorted order means everything after this is newer; stop early
                return deleted_count
            
            filepath = os.path.join(buffer_dir, filename)
            os.remove(filepath)
            deleted_count += 1
    
    return deleted_count
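
The parse_timestamp_from_filename helper above is left undefined; with the YYYYMMDD_HHMMSS naming scheme from Strategy 1 it might look like:

```python
from datetime import datetime

def parse_timestamp_from_filename(filename):
    # Filenames follow the YYYYMMDD_HHMMSS pattern from Strategy 1,
    # e.g. "20260222_120000.ts" -> datetime(2026, 2, 22, 12, 0, 0)
    return datetime.strptime(filename[:15], "%Y%m%d_%H%M%S")

print(parse_timestamp_from_filename("20260222_120000.ts"))
```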

3. Monitor Buffer Health

Key Metrics:

class BufferHealthMonitor:
    def get_health_metrics(self, buffer):
        stats = buffer.get_buffer_stats()
        
        return {
            'storage_usage_percent': (
                stats['total_size_gb'] / buffer.max_size_gb * 100
            ),
            'retention_actual_hours': stats['total_duration_hours'],
            'retention_target_hours': buffer.retention_hours,
            'segment_count': stats['segment_count'],
            'oldest_segment_age_hours': (
                (datetime.now() - stats['oldest_segment']).total_seconds() / 3600
            ),
            'write_rate_segments_per_hour': self.calculate_write_rate(stats)
        }
    
    def check_health(self, metrics):
        issues = []
        
        if metrics['storage_usage_percent'] > 90:
            issues.append('Storage nearly full')
        
        if metrics['retention_actual_hours'] < metrics['retention_target_hours'] * 0.9:
            issues.append('Not retaining enough history')
        
        return {
            'healthy': len(issues) == 0,
            'issues': issues,
            'metrics': metrics
        }

Conclusion

Rolling buffers are essential for modern video streaming applications, enabling:

  • Time-shift playback: Watch live content from any point in the buffer
  • Cloud DVR: Personal recording without local storage
  • Instant replay: Quick access to recent content
  • Efficient storage: Automatic cleanup of old data
  • Predictable costs: Fixed storage requirements

Getting Started with VideoBuffer

VideoBuffer provides fully managed rolling buffer functionality:

  • Configurable retention (1 hour to 30 days)
  • Automatic segment management
  • Time-shift playback API
  • Cloud storage optimization
  • No infrastructure management

Start your free trial and implement rolling buffers in minutes, not months.
