Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/Menelaus29/c2-framework/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The server main module implements the FastAPI application that receives and processes agent beacons. It handles all inbound C2 protocol messages including check-ins, task pulls, task results, and heartbeats. Source: server/server_main.py

Application Setup

FastAPI Application

app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
The FastAPI application is configured with:
  • Disabled API documentation endpoints for operational security
  • Async lifespan context manager for startup/shutdown
  • Single /beacon endpoint for all agent communication

Lifespan Management

@asynccontextmanager
async def lifespan(app)
Manages server lifecycle with automatic resource initialization and cleanup. Startup:
  • Initializes SQLite database connection
  • Creates SessionManager instance
  • Creates CommandQueue instance
  • Restores active sessions from database
Shutdown:
  • Closes database connection cleanly
  • Logs shutdown event

Global State

The server maintains shared state across all requests:
db
Database
Database instance for persistent storage
session_mgr
SessionManager
In-memory session state manager
cmd_queue
CommandQueue
Command queue manager for all active sessions

API Endpoints

POST /beacon

@app.post('/beacon')
async def beacon(request: Request) -> Response
Main C2 communication endpoint. Receives encrypted agent messages and returns encrypted responses.
request
Request
FastAPI Request object containing encrypted beacon payload
Response
Response
Encrypted response with status code:
  • 200: Success with packed response body
  • 400: Bad request (invalid payload, unknown message type)
  • 409: Replay attack detected (duplicate nonce)
  • 413: Payload too large (exceeds MAX_BEACON_SIZE)
  • 500: Internal server error
Processing Flow:
  1. Size Check: Reject payloads larger than 262144 bytes (256 KB)
  2. Decryption: Unpack and decrypt payload using session key
  3. Replay Protection: Verify nonce hasn’t been seen in last 24 hours
  4. Dispatch: Route message to appropriate handler based on msg_type
  5. Response: Encrypt and return response payload
Message Types:
Agent sends initial check-in with system information. Server creates new session and returns assigned session_id.Handler: _handle_checkin()
Agent polls for pending tasks. Server returns next task from queue or no-task response.Handler: _handle_task_pull()
Agent submits task execution results. Server stores output and marks task complete.Handler: _handle_task_result()
Agent sends heartbeat to update last_seen timestamp. Server acknowledges.Handler: _handle_heartbeat()

Message Handlers

_handle_checkin()

async def _handle_checkin(payload: dict, source_ip: str) -> dict
Register new agent session and return assigned session_id.
payload
dict
Decrypted message payload containing:
  • hostname: Agent hostname
  • username: Current username
  • os: Operating system version
  • agent_ver: Agent version string
  • jitter_pct: Beacon jitter percentage
source_ip
str
Source IP address from HTTP request
return
dict
Response payload:
{
  "msg_type": "CHECKIN",
  "session_id": "uuid-string",
  "payload": {
    "session_id": "uuid-string",
    "status": "ok"
  }
}
Example:
# Agent check-in payload
payload = {
    'msg_type': 'CHECKIN',
    'payload': {
        'hostname': 'VICTIM-PC',
        'username': 'jdoe',
        'os': 'Windows 10 22H2',
        'agent_ver': '1.0.0',
        'jitter_pct': 20
    }
}

# Server creates session and responds
response = await _handle_checkin(payload, '192.168.1.100')
# Returns: session_id assigned to agent

_handle_task_pull()

async def _handle_task_pull(session_id: str) -> dict
Return next pending task for the session, or a no-task response.
session_id
str
UUID of the agent session
return
dict | None
Response payload with task dispatch or no-task message:Task Available:
{
  "msg_type": "TASK_DISPATCH",
  "session_id": "uuid",
  "payload": {
    "task_id": "uuid",
    "command": "shell",
    "args": ["whoami"],
    "timeout_s": 30
  }
}
No Task:
{
  "msg_type": "TASK_PULL",
  "session_id": "uuid",
  "payload": {"status": "no_task"}
}
Session Terminated:
{
  "msg_type": "TERMINATE",
  "session_id": "uuid",
  "payload": {"reason": "session killed by operator"}
}
Returns None if session_id is invalid.
Behavior:
  • Updates last_seen timestamp for session
  • Checks if session is still active in database
  • Sends TERMINATE message if session was deactivated
  • Peeks at next pending task without removing from queue
  • Marks task as DISPATCHED when sent to agent

_handle_task_result()

async def _handle_task_result(session_id: str, payload: dict) -> dict
Store task result and mark task complete.
session_id
str
UUID of the agent session
payload
dict
Decrypted message containing task results:
  • task_id: UUID of completed task
  • stdout: Command standard output
  • stderr: Command standard error
  • exit_code: Process exit code
  • duration_ms: Execution duration in milliseconds
return
dict | None
Acknowledgment response:
{
  "msg_type": "TASK_RESULT",
  "session_id": "uuid",
  "payload": {
    "status": "received",
    "task_id": "uuid"
  }
}
Returns None if session_id is invalid.
Example:
# Agent submits task result
payload = {
    'msg_type': 'TASK_RESULT',
    'session_id': 'abc-123',
    'payload': {
        'task_id': 'def-456',
        'stdout': 'VICTIM-PC\\jdoe',
        'stderr': '',
        'exit_code': 0,
        'duration_ms': 42
    }
}

response = await _handle_task_result('abc-123', payload)
# Stores result in database and marks task complete

_handle_heartbeat()

async def _handle_heartbeat(session_id: str) -> dict
Update last_seen and acknowledge heartbeat.
session_id
str
UUID of the agent session
return
dict | None
Acknowledgment response:
{
  "msg_type": "HEARTBEAT",
  "session_id": "uuid",
  "payload": {"status": "ok"}
}
Returns None if session_id is invalid.

Dispatch Helper

_dispatch()

async def _dispatch(msg_type: str, session_id: str,
                    payload: dict, source_ip: str) -> dict | None
Route message to the correct handler and return the response payload dict.
msg_type
str
Message type from protocol: CHECKIN, TASK_PULL, TASK_RESULT, HEARTBEAT
session_id
str
UUID of the agent session (may be None for CHECKIN)
payload
dict
Decrypted message payload
source_ip
str
Source IP address from HTTP request
return
dict | None
Response payload dict to encrypt and return, or None if unknown msg_type

Catch-All Route

Catch-All Handler

@app.api_route('/{path:path}', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
async def catch_all(path: str) -> JSONResponse
Reject all non-beacon paths to reduce attack surface.
Response
JSONResponse
Status 404 with body: {"error": "not found"}

Constants

MAX_BEACON_SIZE
int
default:"262144"
Maximum allowed beacon payload size in bytes (256 KB). Accommodates max stdout+stderr plus protocol overhead.

Running the Server

Entry Point

if __name__ == '__main__':
    ssl_kwargs = {} if config.BEHIND_NGINX else {
        'ssl_keyfile':  config.TLS_CERT_PATH.replace('.crt', '.key'),
        'ssl_certfile': config.TLS_CERT_PATH,
    }
    uvicorn.run(
        'server.server_main:app',
        host      = '0.0.0.0',
        port      = config.BACKEND_PORT,
        log_level = config.LOG_LEVEL.lower(),
        **ssl_kwargs,
    )
Configuration:
  • Binds to all interfaces (0.0.0.0)
  • Uses port from config.BACKEND_PORT
  • Enables TLS unless behind Nginx reverse proxy
  • Log level from config.LOG_LEVEL
TLS Modes:

Behind Nginx

Runs without TLS. Nginx handles SSL/TLS termination on port 443.

Direct

Runs with TLS using certificates from config.TLS_CERT_PATH.

Security Features

Rejects payloads larger than 256 KB to prevent memory exhaustion attacks.
Every message includes a nonce. Server rejects duplicate nonces within 24-hour window.
All messages encrypted with AES-256-GCM using shared session key.
Only /beacon endpoint exposed. All other paths return 404.
Swagger and ReDoc endpoints disabled to avoid information disclosure.

Logging

Structured logging with contextual fields:
logger.info('beacon received', extra={
    'source_ip': source_ip,
    'payload_size_bytes': len(raw_body),
    'session_id': None,
})
Events:
  • Server startup/shutdown
  • Beacon received/unpacked
  • Agent check-in
  • Task dispatched
  • Task result received
  • Replay attempts
  • Unpack failures
  • Invalid sessions

SessionManager

Session state management API

CommandQueue

Task queue management API

Database

Persistent storage API

Message Format

C2 protocol message format