Implementation Details

Technical implementation details for Delegate MCP v3.0

Server Architecture

Class: DelegateMCPServer

Main server class handling all MCP protocol communication and tool routing.

from typing import Any, Dict

import redis


class DelegateMCPServer:
    def __init__(self):
        self.delegations: Dict[str, dict] = {}  # Active delegations
        self.missions: Dict[str, dict] = {}      # Orchestrated missions
        self.boot_context: Dict[str, Any] = {}   # Loaded from JSON
        self.credentials: Dict[str, Any] = {}    # From Locker
        self.redis_pools: Dict[str, redis.Redis] = {}  # Connection pools

Credential Loading

Credentials are loaded lazily on first tool execution:

  1. Connect to Locker Redis (port 6720)
  2. Find locker by ID (e1ac29f6)
  3. Load all 12 environment credentials
  4. Cache in self.credentials dict

async def _load_credentials_from_locker(self):
    locker_redis = redis.Redis(host='localhost', port=6720,
                               password='Locker3Vault2025')
    # Scan for locker, load credentials
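
The lazy-load-then-cache behavior described above can be sketched without a live Locker. This is a minimal illustration, not the server's actual code: LazyCredentials and the injected fetch coroutine are hypothetical names, and the fetch function stands in for whatever reads the Locker Redis.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class LazyCredentials:
    """Loads credentials once, on first use, and serves the cache afterwards."""

    def __init__(self, fetch: Callable[[], Awaitable[Dict[str, Any]]]):
        self._fetch = fetch            # hypothetical coroutine that reads the Locker
        self._cache: Dict[str, Any] = {}
        self._loaded = False
        self._lock = asyncio.Lock()    # avoids duplicate loads under concurrency

    async def get(self) -> Dict[str, Any]:
        if not self._loaded:
            async with self._lock:
                if not self._loaded:   # re-check after acquiring the lock
                    self._cache = await self._fetch()
                    self._loaded = True
        return self._cache


async def demo() -> int:
    """Requests credentials three times and reports how often the fetch ran."""
    calls = 0

    async def fake_fetch() -> Dict[str, Any]:
        nonlocal calls
        calls += 1
        return {"contacts": {"host": "localhost"}}  # made-up credential shape

    creds = LazyCredentials(fake_fetch)
    await asyncio.gather(creds.get(), creds.get(), creds.get())
    return calls
```

Running asyncio.run(demo()) performs the fetch a single time even though three callers ask for credentials, which is the property the lazy cache is meant to provide.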

Connection Pooling

Redis connections are pooled per environment:

async def _get_redis_connection(self, environment: str, use_vault: bool = False):
    pool_key = f"{environment}:{'vault' if use_vault else 'operational'}"
    if pool_key in self.redis_pools:
        return self.redis_pools[pool_key]
    # Create new connection, test with ping, cache in pool
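
The elided step (create a connection, test it with ping, cache it) might look roughly like the sketch below. It assumes a factory callable that builds a client for an environment; ConnectionCache, FakeClient, and the factory signature are hypothetical and only illustrate the caching logic. The sketch is synchronous for brevity, whereas the server method is async.

```python
from typing import Callable, Dict, Optional, Protocol


class Pingable(Protocol):
    def ping(self) -> bool: ...


class ConnectionCache:
    """Caches one connection per (environment, vault/operational) pair."""

    def __init__(self, factory: Callable[[str, bool], Pingable]):
        self._factory = factory                  # hypothetical client builder
        self._pools: Dict[str, Pingable] = {}

    def get(self, environment: str, use_vault: bool = False) -> Optional[Pingable]:
        pool_key = f"{environment}:{'vault' if use_vault else 'operational'}"
        if pool_key in self._pools:
            return self._pools[pool_key]
        try:
            conn = self._factory(environment, use_vault)
            conn.ping()                          # fail fast on an unreachable server
        except Exception:
            return None                          # callers treat None as "no connection"
        self._pools[pool_key] = conn             # cache only connections that answered
        return conn


class FakeClient:
    """Stand-in for a Redis client, used only to exercise the cache."""

    def __init__(self, healthy: bool = True):
        self.healthy = healthy

    def ping(self) -> bool:
        if not self.healthy:
            raise ConnectionError("unreachable")
        return True
```

Caching only after a successful ping means a failed environment is retried on the next call instead of being remembered as broken.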

Tool Routing

The _execute_tool method routes tool calls to Direct Redis handlers:

async def _execute_tool(self, tool_name: str, args: dict, user_id: str):
    # KB Tools
    if tool_name == "kb_create":
        return await self._redis_kb_create(args, user_id)
    elif tool_name == "kb_update":
        return await self._redis_kb_update(args, user_id)
    # ... 30+ more tool routes
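
With more than thirty routes, an if/elif chain can get long. One common alternative, shown here only as a sketch and not as the server's actual design, is a dictionary dispatch table. ToolRouter and the stand-in kb_create handler are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[dict, str], Awaitable[dict]]


class ToolRouter:
    """Maps tool names to async handlers instead of using an if/elif chain."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def register(self, tool_name: str, handler: Handler) -> None:
        self._routes[tool_name] = handler

    async def execute(self, tool_name: str, args: dict, user_id: str) -> dict:
        handler = self._routes.get(tool_name)
        if handler is None:
            # Unknown tools return an error dict rather than raising
            return {"error": f"Unknown tool: {tool_name}"}
        return await handler(args, user_id)


async def kb_create(args: dict, user_id: str) -> dict:
    # Stand-in handler; the real one would write to Redis
    return {"success": True, "tool": "kb_create", "user_id": user_id}


router = ToolRouter()
router.register("kb_create", kb_create)
```

A call such as asyncio.run(router.execute("kb_create", {}, "u1")) reaches the registered handler, and an unregistered name yields an error dict in the same shape the tool handlers use.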

Key Patterns

Redis Key Formats

  • Contact: cont:contact:{contact_id}
  • Track Project: trck:project:{project_id}
  • Track Task: trck:task:{project_id}:{task_id}
  • KB Node: kb:{user_id}:{timestamp}:node:{node_id}
  • Context Summary: ctxt:summary:{user_id}:{summary_id}

Index Patterns

  • Contact name index: cont:index:name:{name_lower}
  • KB roots: kb:{user_id}:roots
  • KB all nodes: kb:{user_id}:all_nodes
  • KB slug lookup: kb:{user_id}:slug:{slug_path}
  • KB children: kb:{user_id}:node:{parent_id}:children
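
Centralizing the key and index patterns above in small helper functions keeps the formats in one place. The function names below are hypothetical, and the name index is assumed to apply only lowercasing to the contact name.

```python
def contact_key(contact_id: str) -> str:
    return f"cont:contact:{contact_id}"


def track_project_key(project_id: str) -> str:
    return f"trck:project:{project_id}"


def track_task_key(project_id: str, task_id: str) -> str:
    return f"trck:task:{project_id}:{task_id}"


def kb_node_key(user_id: str, timestamp: str, node_id: str) -> str:
    return f"kb:{user_id}:{timestamp}:node:{node_id}"


def context_summary_key(user_id: str, summary_id: str) -> str:
    return f"ctxt:summary:{user_id}:{summary_id}"


def contact_name_index_key(name: str) -> str:
    # Assumption: {name_lower} is the name lowercased, with no other normalization
    return f"cont:index:name:{name.lower()}"


def kb_children_key(user_id: str, parent_id: str) -> str:
    return f"kb:{user_id}:node:{parent_id}:children"
```

For example, track_task_key("p1", "t9") produces trck:task:p1:t9, matching the Track Task format listed above.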

Agentic Loop

The delegation loop runs until the task_complete tool is called or max_iterations is reached:

async def _run_delegation(...):
    while iteration < max_iterations and not task_completed:
        iteration += 1

        # Call LLM with working memory
        response = await self._call_openrouter(model, working_memory, tools)

        # Record the assistant turn, then extract its tool calls
        message = response['choices'][0]['message']
        working_memory.append(message)
        tool_calls = message.get('tool_calls') or []

        # Execute each tool, add results to memory
        for tc in tool_calls:
            tool_name = tc['function']['name']
            args = json.loads(tc['function']['arguments'])
            result = await self._execute_tool(tool_name, args, user_id)
            working_memory.append({"role": "tool", "tool_call_id": tc['id'],
                                   "content": json.dumps(result)})

            if tool_name == "task_complete":
                task_completed = True
                break
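
To see the loop's two exit conditions in isolation, the following self-contained sketch replaces the LLM and the tools with stubs. It assumes the OpenAI-style tool-call response shape that OpenRouter returns; run_loop, scripted_llm, and fake_tool are hypothetical names, not part of the server.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List

Message = Dict[str, Any]


async def run_loop(
    call_llm: Callable[[List[Message]], Awaitable[Dict[str, Any]]],
    execute_tool: Callable[[str, dict], Awaitable[dict]],
    working_memory: List[Message],
    max_iterations: int = 10,
) -> bool:
    """Returns True if task_complete was called before the iteration cap."""
    iteration = 0
    task_completed = False
    while iteration < max_iterations and not task_completed:
        iteration += 1
        response = await call_llm(working_memory)
        message = response["choices"][0]["message"]
        working_memory.append(message)           # keep the assistant turn
        for tc in message.get("tool_calls") or []:
            tool_name = tc["function"]["name"]
            args = json.loads(tc["function"]["arguments"] or "{}")
            result = await execute_tool(tool_name, args)
            working_memory.append({"role": "tool", "tool_call_id": tc["id"],
                                   "content": json.dumps(result)})
            if tool_name == "task_complete":
                task_completed = True
                break
    return task_completed


def scripted_llm(tool_names: List[str]):
    """Builds a stub LLM that requests one scripted tool per call."""
    remaining = list(tool_names)

    async def call_llm(memory: List[Message]) -> Dict[str, Any]:
        name = remaining.pop(0)
        call = {"id": f"call_{name}", "type": "function",
                "function": {"name": name, "arguments": "{}"}}
        return {"choices": [{"message": {"role": "assistant", "content": None,
                                         "tool_calls": [call]}}]}

    return call_llm


async def fake_tool(name: str, args: dict) -> dict:
    return {"success": True, "tool": name}
```

With a script of kb_create followed by task_complete, the loop stops after two iterations and reports completion. With a script that never calls task_complete, it stops at max_iterations and reports that the task did not complete.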

Error Handling

All tool implementations follow this pattern:

async def _redis_tool_name(self, args: dict, user_id: str) -> dict:
    conn = await self._get_redis_connection("environment", use_vault=True)
    if not conn:
        return {"error": "Could not connect to environment"}

    try:
        # Tool logic
        return {"success": True}  # plus tool-specific result fields
    except Exception as e:
        return {"error": f"Failed: {e}"}
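
If repeating the try/except in every handler becomes tedious, one option is a decorator that applies the same error-to-dict conversion. This is a sketch of that idea under stated assumptions, not how the server is written: redis_tool and lookup_contact are hypothetical, and a plain dict stands in for the Redis connection.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[..., Awaitable[Dict[str, Any]]]


def redis_tool(handler: Handler) -> Handler:
    """Wraps a handler so any exception becomes an {"error": ...} result."""

    @functools.wraps(handler)
    async def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return await handler(*args, **kwargs)
        except Exception as e:
            return {"error": f"Failed: {e}"}

    return wrapper


@redis_tool
async def lookup_contact(store: Dict[str, str], contact_id: str) -> Dict[str, Any]:
    # store is a stand-in for a Redis connection; a missing key raises KeyError
    return {"success": True, "name": store[f"cont:contact:{contact_id}"]}
```

A successful lookup returns the success dict unchanged, while a missing key comes back as an error dict instead of propagating the exception to the delegation loop.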
ID: 4650f2f2
Updated: 2025-12-07T17:29:54