Overview
The feature extractor computes statistical and entropy features from FlowRecords for machine learning analysis. It extracts timing features (inter-arrival time (IAT) statistics, burstiness, autocorrelation), flow features (duration, volume, throughput), size features (payload size statistics), and an entropy measure.
Source: telemetry/feature_extractor.py
Feature Categories
Extracted features are organized into four categories:
Timing Features
| Feature | Description | Use Case |
|---|---|---|
| mean_iat | Mean inter-arrival time (seconds) | Detect regular beaconing |
| std_iat | Standard deviation of IAT | Measure timing jitter |
| min_iat | Minimum IAT | Detect burst patterns |
| max_iat | Maximum IAT | Identify gaps |
| burstiness | Coefficient of variation (std/mean) | Distinguish bursty vs regular traffic |
| iat_autocorr | Lag-1 autocorrelation of IAT series | Detect periodic patterns |
Flow Features
| Feature | Description | Use Case |
|---|---|---|
| flow_duration_s | Total flow duration (seconds) | Session length analysis |
| total_bytes | Total bytes transferred | Volume analysis |
| total_packets | Total packet count | Activity level |
| bytes_per_second | Throughput (bytes/sec) | Bandwidth usage |
| packets_per_second | Packet rate (packets/sec) | Activity intensity |
Size Features
| Feature | Description | Use Case |
|---|---|---|
| payload_len_mean | Mean payload size (bytes) | Detect padding |
| payload_len_std | Std dev of payload size | Size variance |
| payload_len_min | Minimum payload size | Detect empty packets |
| payload_len_max | Maximum payload size | MTU analysis |
Entropy Features
| Feature | Description | Use Case |
|---|---|---|
| shannon_entropy | Shannon entropy of payload sizes | Detect encryption/randomization |
Core Functions
extract_features
Extract all features from a single FlowRecord:
from telemetry.feature_extractor import extract_features
from telemetry.flow_parser import FlowRecord
flow = FlowRecord(
    src_ip='192.168.56.102',
    dst_ip='192.168.56.101',
    src_port=54321,
    dst_port=443,
    protocol='TCP',
    start_time=1710163852.0,
    end_time=1710163854.5,
    duration_s=2.5,
    packet_count=10,
    byte_count=8000,
    inter_arrival_times=[0.2, 0.3, 0.25, 0.28, 0.22, 0.31, 0.27, 0.24, 0.29],
    payload_sizes=[1400, 1400, 1400, 1400, 1400, 600, 0, 0, 0, 0],
    beacon_iats=[10.5]
)
features = extract_features(flow)
print(f"Mean IAT: {features['mean_iat']:.4f}s")
print(f"Burstiness: {features['burstiness']:.4f}")
print(f"Throughput: {features['bytes_per_second']:.1f} bytes/s")
print(f"Entropy: {features['shannon_entropy']:.4f}")
Returns: Dictionary containing the 16 features listed above plus 5 metadata fields (values shown are illustrative):
{
    # Metadata
    'src_ip': '192.168.56.102',
    'dst_ip': '192.168.56.101',
    'src_port': 54321,
    'dst_port': 443,
    'protocol': 'TCP',
    # Timing features
    'mean_iat': 0.2611,
    'std_iat': 0.0359,
    'min_iat': 0.2,
    'max_iat': 0.31,
    'burstiness': 0.1375,
    'iat_autocorr': -0.1234,
    # Flow features
    'flow_duration_s': 2.5,
    'total_bytes': 8000,
    'total_packets': 10,
    'bytes_per_second': 3200.0,
    'packets_per_second': 4.0,
    # Size features
    'payload_len_mean': 800.0,
    'payload_len_std': 620.48,
    'payload_len_min': 0.0,
    'payload_len_max': 1400.0,
    # Entropy features
    'shannon_entropy': 1.5710
}
extract_all
Load a .flows file and extract features for all flows:
from telemetry.feature_extractor import extract_all
features = extract_all('pcaps/capture.flows')
print(f'Extracted {len(features)} feature vectors')
# Access individual features
for feat in features:
    if feat['dst_port'] == 443:
        print(f"{feat['src_ip']} -> {feat['dst_ip']}")
        print(f" Mean IAT: {feat['mean_iat']:.4f}s")
        print(f" Entropy: {feat['shannon_entropy']:.4f}")
Parameters:
flows_file (str): Path to .flows JSON Lines file
Returns: list[dict] - One feature dictionary per flow
save_features
Write features to both CSV and JSON formats:
from telemetry.feature_extractor import save_features
save_features(features, 'pcaps/capture.features.csv')
# Creates two files:
# pcaps/capture.features.csv
# pcaps/capture.features.json
Parameters:
features (list[dict]): Feature dictionaries from extract_all()
output_file (str): Base output path (.csv suffix optional)
Output Formats:
CSV - Header row with all feature columns:
src_ip,dst_ip,src_port,dst_port,protocol,mean_iat,std_iat,min_iat,max_iat,burstiness,iat_autocorr,flow_duration_s,total_bytes,total_packets,bytes_per_second,packets_per_second,payload_len_mean,payload_len_std,payload_len_min,payload_len_max,shannon_entropy
192.168.56.102,192.168.56.101,54321,443,TCP,0.2611,0.0359,0.2,0.31,0.1375,-0.1234,2.5,8000,10,3200.0,4.0,800.0,620.48,0.0,1400.0,1.5710
JSON Lines - One object per line:
{"src_ip":"192.168.56.102","dst_ip":"192.168.56.101","src_port":54321,"dst_port":443,"protocol":"TCP","mean_iat":0.2611,"std_iat":0.0359,"min_iat":0.2,"max_iat":0.31,"burstiness":0.1375,"iat_autocorr":-0.1234,"flow_duration_s":2.5,"total_bytes":8000,"total_packets":10,"bytes_per_second":3200.0,"packets_per_second":4.0,"payload_len_mean":800.0,"payload_len_std":620.48,"payload_len_min":0.0,"payload_len_max":1400.0,"shannon_entropy":1.571}
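The two output formats can be illustrated with a small standalone sketch. It is not the module's actual implementation: `write_features` and `read_jsonl` are hypothetical helpers, and the assumption is only that the CSV header comes from the dictionary keys and that the JSON file holds one object per line next to the CSV.

```python
import csv
import json
import tempfile
from pathlib import Path

def write_features(features, output_file):
    """Hypothetical helper: write feature dicts as CSV plus JSON Lines."""
    if not features:
        raise ValueError('no features to write')
    base = str(output_file)
    if base.endswith('.csv'):              # the .csv suffix is optional
        base = base[:-len('.csv')]
    csv_path, json_path = Path(base + '.csv'), Path(base + '.json')

    # CSV: header row taken from the keys of the first feature dict
    with open(csv_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(features[0].keys()))
        writer.writeheader()
        writer.writerows(features)

    # JSON Lines: one JSON object per line
    with open(json_path, 'w') as f:
        for row in features:
            f.write(json.dumps(row) + '\n')
    return csv_path, json_path

def read_jsonl(path):
    """Hypothetical helper: read a JSON Lines file back into a list of dicts."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]

rows = [{'src_ip': '192.168.56.102', 'dst_port': 443, 'burstiness': 0.1375}]
with tempfile.TemporaryDirectory() as tmp:
    csv_path, json_path = write_features(rows, Path(tmp) / 'capture.features.csv')
    restored = read_jsonl(json_path)
    print(csv_path.name, json_path.name, restored == rows)
```

The JSON Lines copy preserves numeric types, whereas values read back from the CSV arrive as strings and need converting.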
Command-Line Usage
Run as a standalone module:
# Basic feature extraction
python -m telemetry.feature_extractor \
    --input pcaps/capture.flows \
    --output pcaps/capture.features.csv
# Output
Extracted 42 feature vectors → pcaps/capture.features.csv
Arguments:
--input (required): Input .flows file from flow_parser
--output (required): Output CSV file (JSON also written automatically)
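For readers wiring a similar entry point, the two required arguments could be declared with `argparse` roughly as follows. This is a sketch under the assumption that the module uses the standard library parser; `build_parser` is a hypothetical name and the help strings are paraphrased from the list above.

```python
import argparse

def build_parser():
    """Hypothetical parser mirroring the two documented arguments."""
    parser = argparse.ArgumentParser(prog='telemetry.feature_extractor')
    parser.add_argument('--input', required=True,
                        help='input .flows file from flow_parser')
    parser.add_argument('--output', required=True,
                        help='output CSV file (JSON also written)')
    return parser

# Parse an explicit argument list instead of sys.argv so the example is repeatable
args = build_parser().parse_args([
    '--input', 'pcaps/capture.flows',
    '--output', 'pcaps/capture.features.csv',
])
print(args.input, args.output)
```

Because both arguments are marked `required=True`, omitting either one makes `argparse` print a usage message and exit with a non-zero status.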
Feature Computation Details
Burstiness
Coefficient of variation of inter-arrival times:
burstiness = std_iat / mean_iat # if mean_iat > 0, else 0.0
Interpretation:
- Low values (< 0.5): Regular, periodic traffic (e.g., unmodified beacons)
- High values (> 1.0): Bursty, irregular traffic (e.g., human browsing)
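A minimal sketch of the burstiness calculation on two synthetic IAT series shows how the thresholds above separate regular from bursty traffic. The helper name `burstiness` and the choice of population standard deviation are assumptions; the real module may use the sample form, which gives slightly larger values.

```python
import statistics

def burstiness(iats):
    """Coefficient of variation (std / mean) of inter-arrival times."""
    if len(iats) < 2:
        return 0.0
    mean_iat = statistics.fmean(iats)
    # Assumption: population standard deviation (pstdev), not sample (stdev)
    std_iat = statistics.pstdev(iats)
    return std_iat / mean_iat if mean_iat > 0 else 0.0

regular = [10.0, 10.1, 9.9, 10.0, 10.05]      # near-constant beacon interval
bursty = [0.01, 0.02, 5.0, 0.01, 8.0, 0.03]   # short bursts separated by long gaps

print(f'regular: {burstiness(regular):.3f}')
print(f'bursty:  {burstiness(bursty):.3f}')
```

The regular series lands well below 0.5 and the bursty one above 1.0, matching the interpretation bands.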
IAT Autocorrelation
Lag-1 autocorrelation measures correlation between consecutive IATs:
iat_autocorr = sum(
    (iats[i] - mean) * (iats[i-1] - mean)
    for i in range(1, n)
) / ((n - 1) * std ** 2)
Interpretation (telemetry/feature_extractor.py:42-51):
- Positive values: Consecutive IATs are similar (periodic patterns)
- Near zero: IATs are independent (random)
- Negative values: Alternating fast/slow patterns
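The formula above can be turned into a small runnable function to check the interpretation on known series. This sketch assumes a population variance for `std ** 2` and returns 0.0 for degenerate inputs (fewer than two IATs, or a constant series); `lag1_autocorr` is a hypothetical name, not necessarily what the module exposes.

```python
def lag1_autocorr(iats):
    """Lag-1 autocorrelation of an inter-arrival time series."""
    n = len(iats)
    if n < 2:
        return 0.0
    mean = sum(iats) / n
    # Assumption: population variance stands in for std ** 2
    var = sum((x - mean) ** 2 for x in iats) / n
    if var == 0:
        return 0.0  # constant series: correlation is undefined, report 0.0
    cov = sum((iats[i] - mean) * (iats[i - 1] - mean) for i in range(1, n))
    return cov / ((n - 1) * var)

alternating = [0.1, 1.0, 0.1, 1.0, 0.1, 1.0]   # fast/slow/fast/slow
trending = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]      # each IAT close to the previous one

print(f'alternating: {lag1_autocorr(alternating):+.2f}')
print(f'trending:    {lag1_autocorr(trending):+.2f}')
```

The alternating series yields a strongly negative value and the slowly changing series a positive one, in line with the bullets above.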
Shannon Entropy
Measures randomness of payload size distribution:
shannon_entropy = -sum(
    (count / total) * log2(count / total)
    for count in byte_counts if count > 0
)
Note: FlowRecords carry no raw payload data, so entropy is computed over per-packet payload sizes modulo 256. This underestimates true payload entropy but provides consistent relative measurements across profiles. With 256 possible values, the result is bounded by 8 bits.
Interpretation:
- Low entropy (< 2.0): Uniform sizes (e.g., fixed-size packets)
- High entropy (> 5.0): Varied sizes (e.g., random padding)
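As a sketch of the calculation described in the note, the function below counts payload sizes modulo 256 and applies the Shannon formula. `size_entropy` is a hypothetical name, and returning 0.0 for an empty flow is an assumption.

```python
import math
from collections import Counter

def size_entropy(payload_sizes):
    """Shannon entropy (bits) of per-packet payload sizes modulo 256."""
    if not payload_sizes:
        return 0.0
    byte_counts = Counter(size % 256 for size in payload_sizes).values()
    total = len(payload_sizes)
    # log2(total / c) equals -log2(c / total); this form avoids returning -0.0
    return sum((c / total) * math.log2(total / c) for c in byte_counts)

fixed = [1400] * 10            # every packet the same size
varied = list(range(256))      # every residue 0..255 exactly once
collide = [1400, 1656]         # different sizes, same value modulo 256

print(f'fixed:   {size_entropy(fixed):.4f}')
print(f'varied:  {size_entropy(varied):.4f}')
print(f'collide: {size_entropy(collide):.4f}')
```

The last case shows one reason the measure is only a proxy: sizes that differ by a multiple of 256 are indistinguishable after the modulo step.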
Integration Example
Complete pipeline from PCAP to features:
import os
from telemetry import flow_parser, feature_extractor
# Parse PCAP → flows
pcap_path = 'pcaps/experiment.pcap'
flows = flow_parser.parse_pcap(pcap_path)
# Save flows
flows_path = pcap_path.replace('.pcap', '.flows')
flow_parser.save_flows(flows, flows_path)
# Extract features
features = feature_extractor.extract_all(flows_path)
# Save features to CSV and JSON
features_path = pcap_path.replace('.pcap', '.features.csv')
feature_extractor.save_features(features, features_path)
print(f'Pipeline complete: {len(features)} feature vectors')
print(f'CSV: {features_path}')
print(f'JSON: {features_path.replace(".csv", ".json")}')
Analysis Examples
Compare Baseline vs Evasion Profiles
import csv
import statistics
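def load_csv_features(filepath):
    # Keep address and protocol columns as strings; convert the rest to float
    text_cols = {'src_ip', 'dst_ip', 'protocol'}
    with open(filepath, 'r', newline='') as f:
        reader = csv.DictReader(f)
        return [{k: v if k in text_cols else float(v) for k, v in row.items()}
                for row in reader]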
baseline = load_csv_features('pcaps/baseline.features.csv')
evasion = load_csv_features('pcaps/high.features.csv')
# Compare burstiness
baseline_burst = statistics.mean(f['burstiness'] for f in baseline)
evasion_burst = statistics.mean(f['burstiness'] for f in evasion)
print(f'Baseline burstiness: {baseline_burst:.4f}')
print(f'Evasion burstiness: {evasion_burst:.4f}')
if baseline_burst > 0:  # skip the ratio when the baseline is perfectly regular
    print(f'Increase: {(evasion_burst / baseline_burst - 1) * 100:.1f}%')
# Compare entropy
baseline_entropy = statistics.mean(f['shannon_entropy'] for f in baseline)
evasion_entropy = statistics.mean(f['shannon_entropy'] for f in evasion)
print(f'\nBaseline entropy: {baseline_entropy:.4f}')
print(f'Evasion entropy: {evasion_entropy:.4f}')
Filter by Flow Characteristics
from telemetry.feature_extractor import extract_all
features = extract_all('pcaps/capture.flows')
# Find long-duration flows
long_flows = [f for f in features if f['flow_duration_s'] > 5.0]
print(f'Long flows (>5s): {len(long_flows)}')
# Find high-throughput flows
fast_flows = [f for f in features if f['bytes_per_second'] > 10000]
print(f'High throughput (>10KB/s): {len(fast_flows)}')
# Find regular beacon-like flows
regular_flows = [f for f in features if f['burstiness'] < 0.3]
print(f'Regular flows (burstiness < 0.3): {len(regular_flows)}')
Zero-Division Handling
Safe divisors prevent division by zero (telemetry/feature_extractor.py:12):
_SAFE_DIVISOR = 1e-9
# Used when denominators may be zero
safe_duration = duration if duration > 0 else _SAFE_DIVISOR
bytes_per_second = byte_count / safe_duration
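The effect of the safe divisor is easiest to see on a zero-duration flow. The sketch below reuses the `_SAFE_DIVISOR` constant shown above; the `rate` helper is hypothetical and only illustrates the pattern.

```python
import math

_SAFE_DIVISOR = 1e-9

def rate(amount, duration):
    """Divide by duration, substituting a tiny divisor when duration is not positive."""
    safe_duration = duration if duration > 0 else _SAFE_DIVISOR
    return amount / safe_duration

normal = rate(8000, 2.5)       # ordinary flow: 8000 bytes over 2.5 seconds
degenerate = rate(1400, 0.0)   # single-packet flow with zero duration

print(normal)
print(math.isfinite(degenerate), degenerate > 1e11)
```

The degenerate case stays finite, but the resulting rate is extremely large, so it may be worth filtering or clipping such flows before feeding the features to a model.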
Performance
Processing Speed:
- ~50,000 flows/second on typical hardware
- Feature extraction from 10K flow file ≈ 0.2 seconds
Memory Usage:
- Loads the entire .flows file into memory
- Typical flow: ~200 bytes in memory
- 100K flows ≈ 20 MB RAM
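If a capture is too large to hold in memory, the JSON Lines layout allows flows to be read one at a time. The generator below is a sketch of that alternative and not part of the module: `iter_flows` is a hypothetical name, and it assumes each line is a JSON object using the FlowRecord field names such as `duration_s`.

```python
import json
import os
import tempfile

def iter_flows(flows_file):
    """Yield one flow dict per non-empty line of a JSON Lines file."""
    with open(flows_file) as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

# Build a tiny sample file so the example runs on its own
sample = [
    {'duration_s': 2.5, 'byte_count': 8000},
    {'duration_s': 7.0, 'byte_count': 1200},
]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'capture.flows')
    with open(path, 'w') as f:
        for flow in sample:
            f.write(json.dumps(flow) + '\n')
    # Only one flow is held in memory at a time while counting
    long_count = sum(1 for flow in iter_flows(path) if flow['duration_s'] > 5.0)

print(f'Long flows (>5s): {long_count}')
```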
Output File Organization
project_root/
├── pcaps/
│ ├── baseline.pcap # Raw capture
│ ├── baseline.flows # Parsed flows (JSON Lines)
│ ├── baseline.features.csv # Features (CSV)
│ ├── baseline.features.json # Features (JSON Lines)
│ ├── high.pcap
│ ├── high.flows
│ ├── high.features.csv
│ └── high.features.json
└── telemetry/
└── feature_extractor.py
Logging
Feature extraction is logged:
from common.logger import get_logger
logger = get_logger('feature_extractor')
Log Events:
features extracted: Logged after processing (includes count)
no features extracted: Warning if flows file is empty
features saved: Logged after writing CSV/JSON (includes paths)
Troubleshooting
FileNotFoundError:
- Ensure the .flows file exists (run flow_parser first)
- Use absolute paths or run from project root
Empty feature list:
- Check that the .flows file contains valid JSON lines
- Verify flows were successfully parsed from PCAP
NaN or inf values:
- Should not occur due to safe divisors
- Report as bug if encountered
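Before filing a report, it can help to pinpoint which flow and which feature is affected. The check below is a hedged sketch using only the standard library; `find_non_finite` is a hypothetical helper that works on the list of feature dictionaries.

```python
import math

def find_non_finite(features):
    """Return (row_index, key) pairs whose float value is NaN or infinite."""
    bad = []
    for i, row in enumerate(features):
        for key, value in row.items():
            if isinstance(value, float) and not math.isfinite(value):
                bad.append((i, key))
    return bad

rows = [
    {'src_ip': '192.168.56.102', 'burstiness': 0.1375, 'bytes_per_second': 3200.0},
    {'src_ip': '192.168.56.103', 'burstiness': float('nan'),
     'bytes_per_second': float('inf')},
]
problems = find_non_finite(rows)
print(problems)
```

Including the offending row index and feature name in a bug report makes the underlying flow much easier to reproduce.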
ML Integration
Features are ready for scikit-learn, TensorFlow, or PyTorch:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
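# Load features as DataFrames. The feature CSVs carry no 'label' column,
# so ground-truth labels are attached here (0 = baseline, 1 = evasion).
baseline = pd.read_csv('pcaps/baseline.features.csv').assign(label=0)
evasion = pd.read_csv('pcaps/high.features.csv').assign(label=1)
df = pd.concat([baseline, evasion], ignore_index=True)
# Select numeric features only
feature_cols = [
    'mean_iat', 'std_iat', 'burstiness', 'iat_autocorr',
    'bytes_per_second', 'packets_per_second',
    'payload_len_mean', 'payload_len_std', 'shannon_entropy'
]
X = df[feature_cols].values
y = df['label'].values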
# Train classifier
clf = RandomForestClassifier(n_estimators=100)
clf.fit(X, y)
Next Steps
- Run experiments: See Experiments for automated pipelines
- Visualize features: Use Jupyter notebooks to plot distributions
- Train models: Feed features into ML classifiers for C2 detection