Delegate MCP Implementation Details
Server Architecture
Class: DelegateMCPServer
Main server class handling all MCP protocol communication and tool routing.
from typing import Any, Dict

import redis

class DelegateMCPServer:
    def __init__(self):
        self.delegations: Dict[str, dict] = {}           # Active delegations
        self.missions: Dict[str, dict] = {}              # Orchestrated missions
        self.boot_context: Dict[str, Any] = {}           # Loaded from JSON
        self.credentials: Dict[str, Any] = {}            # From Locker
        self.redis_pools: Dict[str, redis.Redis] = {}    # Connection pools
Credential Loading
Credentials are loaded lazily on first tool execution:
- Connect to Locker Redis (port 6720)
- Find locker by ID (e1ac29f6)
- Load all 12 environment credentials
- Cache in self.credentials dict
async def _load_credentials_from_locker(self):
    locker_redis = redis.Redis(host='localhost', port=6720,
                               password='Locker3Vault2025')
    # Scan for locker, load credentials
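The lazy-loading steps above can be sketched without a live Locker instance. This is a minimal sketch under stated assumptions: `LazyCredentials` and the `fetch` callable are hypothetical names that do not appear in the server, and `fetch` stands in for whatever reads the locker from Redis. The lock is there so that concurrent first calls trigger only one load.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class LazyCredentials:
    """Hypothetical helper: load credentials once, on first use."""

    def __init__(self, fetch: Callable[[], Awaitable[Dict[str, Any]]]):
        self._fetch = fetch              # assumed stand-in for the Locker Redis read
        self._cache: Dict[str, Any] = {}
        self._loaded = False
        self._lock = asyncio.Lock()

    async def get(self) -> Dict[str, Any]:
        if not self._loaded:
            async with self._lock:
                if not self._loaded:     # re-check after acquiring the lock
                    self._cache = await self._fetch()
                    self._loaded = True
        return self._cache
```

Every later call to `get()` returns the cached dict without touching the fetcher again, which mirrors the "cache in self.credentials" step.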
Connection Pooling
Redis connections are pooled per environment:
async def _get_redis_connection(self, environment: str, use_vault: bool = False):
    pool_key = f"{environment}:{'vault' if use_vault else 'operational'}"
    if pool_key in self.redis_pools:
        return self.redis_pools[pool_key]
    # Create new connection, test with ping, cache in pool
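The elided step (create, ping, cache) might look roughly like the sketch below. `ConnectionCache` and the injected `connect` factory are assumptions made for illustration; the factory is expected to return a client object with an async `ping()` method. Returning `None` on failure is also an assumption, chosen to match the `if not conn` check used by the tool handlers.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


class ConnectionCache:
    """Hypothetical cache keyed the same way as pool_key in the text."""

    def __init__(self, connect: Callable[[str, bool], Awaitable[Any]]):
        self._connect = connect          # assumed factory returning a client
        self._pools: Dict[str, Any] = {}

    async def get(self, environment: str, use_vault: bool = False) -> Optional[Any]:
        pool_key = f"{environment}:{'vault' if use_vault else 'operational'}"
        if pool_key in self._pools:
            return self._pools[pool_key]
        try:
            conn = await self._connect(environment, use_vault)
            await conn.ping()            # verify the connection before caching it
        except Exception:
            return None                  # caller reports "could not connect"
        self._pools[pool_key] = conn
        return conn
```

A failed connection is never cached, so a later call for the same environment will retry.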
Tool Routing
The _execute_tool method routes tool calls to Direct Redis handlers:
async def _execute_tool(self, tool_name: str, args: dict, user_id: str):
    # KB Tools
    if tool_name == "kb_create":
        return await self._redis_kb_create(args, user_id)
    elif tool_name == "kb_update":
        return await self._redis_kb_update(args, user_id)
    # ... 30+ more tool routes
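With 30+ routes, a lookup table is one possible alternative to a long if/elif chain. The sketch below is not how the server is written; `ToolRouter`, `register`, and the placeholder `kb_create` handler are hypothetical names used only to show the idea.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[dict, str], Awaitable[dict]]


class ToolRouter:
    """Hypothetical table-driven router: tool name -> async handler."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def register(self, tool_name: str, handler: Handler) -> None:
        self._routes[tool_name] = handler

    async def execute(self, tool_name: str, args: dict, user_id: str) -> dict:
        handler = self._routes.get(tool_name)
        if handler is None:
            # Unknown tools produce an error dict instead of raising
            return {"error": f"Unknown tool: {tool_name}"}
        return await handler(args, user_id)


async def kb_create(args: dict, user_id: str) -> dict:
    # Placeholder handler; a real one would write to Redis
    return {"success": True, "tool": "kb_create", "user_id": user_id}
```

Usage would be `router.register("kb_create", kb_create)` followed by `await router.execute("kb_create", args, user_id)`.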
Key Patterns
Redis Key Formats
- Contact: cont:contact:{contact_id}
- Track Project: trck:project:{project_id}
- Track Task: trck:task:{project_id}:{task_id}
- KB Node: kb:{user_id}:{timestamp}:node:{node_id}
- Context Summary: ctxt:summary:{user_id}:{summary_id}
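Building these keys through small helpers keeps the formats in one place. The function names below are hypothetical, and the format of `timestamp` is not specified in this document, so the sketch treats it as an opaque string.

```python
def contact_key(contact_id: str) -> str:
    return f"cont:contact:{contact_id}"


def project_key(project_id: str) -> str:
    return f"trck:project:{project_id}"


def task_key(project_id: str, task_id: str) -> str:
    return f"trck:task:{project_id}:{task_id}"


def kb_node_key(user_id: str, timestamp: str, node_id: str) -> str:
    # timestamp is passed through unchanged; its format is an open assumption
    return f"kb:{user_id}:{timestamp}:node:{node_id}"


def summary_key(user_id: str, summary_id: str) -> str:
    return f"ctxt:summary:{user_id}:{summary_id}"
```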
Index Patterns
- Contact name index: cont:index:name:{name_lower}
- KB roots: kb:{user_id}:roots
- KB all nodes: kb:{user_id}:all_nodes
- KB slug lookup: kb:{user_id}:slug:{slug_path}
- KB children: kb:{user_id}:node:{parent_id}:children
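To illustrate how an index key such as the contact name index might be used, here is a sketch backed by an in-memory dict of sets instead of Redis. It assumes, without confirmation from this document, that each index key holds a set of IDs; the helper names are hypothetical.

```python
from collections import defaultdict
from typing import DefaultDict, Set

# In-memory stand-in for Redis sets; an assumption made for illustration only
store: DefaultDict[str, Set[str]] = defaultdict(set)


def name_index_key(name: str) -> str:
    # {name_lower} is taken to mean the lowercased contact name
    return f"cont:index:name:{name.lower()}"


def children_key(user_id: str, parent_id: str) -> str:
    return f"kb:{user_id}:node:{parent_id}:children"


def index_contact(contact_id: str, name: str) -> None:
    store[name_index_key(name)].add(contact_id)


def find_contacts_by_name(name: str) -> Set[str]:
    # .get avoids creating an empty entry for names that were never indexed
    return set(store.get(name_index_key(name), set()))
```

Because the name is lowercased on both write and read, lookups are case-insensitive.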
Agentic Loop
The delegation loop runs until task_complete is called or max_iterations is reached:
async def _run_delegation(...):
    while iteration < max_iterations and not task_completed:
        iteration += 1  # advance the counter so the cap is enforced
        # Call LLM with working memory
        response = await self._call_openrouter(model, working_memory, tools)
        # Extract tool calls (empty list if the model returned none)
        tool_calls = response['choices'][0]['message'].get('tool_calls') or []
        # Execute each tool, add results to memory
        for tc in tool_calls:
            # Tool name and JSON-encoded arguments, per the OpenAI-style format
            tool_name = tc['function']['name']
            args = json.loads(tc['function']['arguments'])
            result = await self._execute_tool(tool_name, args, user_id)
            working_memory.append({"role": "tool", "content": json.dumps(result)})
            if tool_name == "task_complete":
                task_completed = True
                break
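The loop can be exercised end to end with a scripted stand-in for the model. In the sketch below, `run_loop`, `scripted_llm`, and `echo_tool` are hypothetical names. The response shape follows the OpenAI-style chat completion format that the excerpt above indexes into. Appending the assistant message and a `tool_call_id` to memory is an assumption about how the conversation would be kept well-formed.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List

Message = Dict[str, Any]


async def run_loop(
    call_llm: Callable[[List[Message]], Awaitable[dict]],
    execute_tool: Callable[[str, dict], Awaitable[dict]],
    working_memory: List[Message],
    max_iterations: int = 10,
) -> bool:
    """Return True if task_complete was called before the iteration cap."""
    for _ in range(max_iterations):
        response = await call_llm(working_memory)
        message = response["choices"][0]["message"]
        working_memory.append(message)
        for tc in message.get("tool_calls") or []:
            name = tc["function"]["name"]
            args = json.loads(tc["function"]["arguments"] or "{}")
            result = await execute_tool(name, args)
            working_memory.append(
                {"role": "tool", "tool_call_id": tc["id"], "content": json.dumps(result)}
            )
            if name == "task_complete":
                return True
    return False


async def scripted_llm(memory: List[Message]) -> dict:
    # Stand-in model: first requests kb_create, then signals completion
    step = sum(1 for m in memory if m.get("role") == "assistant")
    name = "kb_create" if step == 0 else "task_complete"
    call = {"id": f"call_{step}", "type": "function",
            "function": {"name": name, "arguments": "{}"}}
    return {"choices": [{"message": {"role": "assistant", "content": None,
                                     "tool_calls": [call]}}]}


async def echo_tool(name: str, args: dict) -> dict:
    return {"success": True, "tool": name}
```

Running `asyncio.run(run_loop(scripted_llm, echo_tool, memory))` with a single user message in `memory` takes two iterations before `task_complete` ends the loop.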
Error Handling
All tool implementations follow this pattern:
async def _redis_tool_name(self, args: dict, user_id: str) -> dict:
    conn = await self._get_redis_connection("environment", use_vault=True)
    if not conn:
        return {"error": "Could not connect to environment"}
    try:
        # Tool logic
        return {"success": True, ...}
    except Exception as e:
        return {"error": f"Failed: {str(e)}"}
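Since every tool repeats the same try/except shape, one option is to factor it into a decorator. This is a sketch of that idea, not something the server is known to do; `tool_errors` and the `create_contact` example handler are hypothetical.

```python
import asyncio
import functools
from typing import Awaitable, Callable

ToolFn = Callable[..., Awaitable[dict]]


def tool_errors(fn: ToolFn) -> ToolFn:
    """Hypothetical decorator: turn exceptions into {"error": ...} results."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs) -> dict:
        try:
            return await fn(*args, **kwargs)
        except Exception as exc:
            return {"error": f"Failed: {exc}"}

    return wrapper


@tool_errors
async def create_contact(args: dict, user_id: str) -> dict:
    # Example handler: validation failures surface as error dicts
    if "name" not in args:
        raise ValueError("name is required")
    return {"success": True, "name": args["name"]}
```

The caller always receives a dict, so the delegation loop can serialize the result with `json.dumps` whether the tool succeeded or failed.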