AI Business Maturity Model
Part 4 of 6

The Streaming Layer

How the UI Stays Live

The engine is an async generator that yields typed events as it works through each phase. The API route wraps it in a ReadableStream and sends the events as Server-Sent Events (SSE). The user never stares at a blank screen: every tool call, thinking step, and cost update appears as it happens.

The Async Generator Pattern

The engine function is declared with async function*, which makes it an async generator. Instead of returning a single value, it yields events throughout execution, and the caller receives each event as soon as it is yielded. This is what makes real-time streaming possible without WebSockets or polling.
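A stripped-down sketch makes the mechanics concrete. DemoEvent, demoEngine, delay, and collect are hypothetical stand-ins, not the real worker: each yield hands one event to the consumer's for await loop as soon as it is produced, while the generator keeps its own local state (here, the pass counter) between yields.

```typescript
// Hypothetical, trimmed-down event type for illustration only.
type DemoEvent =
  | { type: 'status'; data: { message: string } }
  | { type: 'response'; data: { message: string } }
  | { type: 'done'; data: { passes: number } };

// Stand-in for a slow phase such as an LLM call or a tool run.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// An async generator: each `yield` suspends the function and delivers one event;
// local variables such as `passes` survive across yields.
async function* demoEngine(maxPasses: number): AsyncGenerator<DemoEvent> {
  let passes = 0;
  while (passes < maxPasses) {
    yield { type: 'status', data: { message: `Pass ${passes + 1}: analyzing...` } };
    await delay(10); // simulated work between events
    passes++;
  }
  yield { type: 'response', data: { message: 'Analysis complete.' } };
  yield { type: 'done', data: { passes } };
}

// The consumer receives events one at a time, in the order they are yielded.
async function collect(): Promise<DemoEvent[]> {
  const events: DemoEvent[] = [];
  for await (const event of demoEngine(2)) {
    events.push(event); // a real consumer would forward each event immediately
  }
  return events;
}
```

The real engine follows the same shape, with the AI decision and tool execution in place of delay.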

Engine as an async generator

TypeScript
// The engine yields events throughout its execution
async function* streamWorker(
  worker: WorkerDefinition,
  goal: string,
  chatHistory: ChatMessage[],
  organizationId: string,
  userId: string
): AsyncGenerator<WorkerEvent> {

  // Phase 0: Initialize state
  const state = createWorkerState(worker, goal, chatHistory, organizationId, userId);

  // Phase 1: Pre-enrichment
  yield { type: 'pre_enrichment', data: { message: 'Gathering context...' } };
  const enrichment = await runWorkerPreEnrichment(worker, goal, organizationId, userId);
  if (enrichment.contextMarkdown) {
    state.preEnrichedContext = enrichment.contextMarkdown;
    yield {
      type: 'pre_enrichment',
      data: { toolsRun: enrichment.toolsRun, message: `Pre-loaded ${enrichment.toolsRun.length} data sources` }
    };
  }

  // Phase 2: Main loop
  while (true) {
    const exit = checkExitConditions(state);
    if (exit.shouldExit) {
      // Phase 4: Synthesis fallback
      const result = await synthesizeResponse(worker, state);
      yield { type: 'response', data: { message: result.message } };
      break;
    }

    yield { type: 'status', data: { message: `Pass ${state.passCount + 1}: analyzing...` } };

    // Get AI decision
    const decision = await getAIDecision(worker, state);
    yield { type: 'thinking', data: { thinking: decision.thinking } };

    // Update living document
    if (decision.document_updates) {
      updateDocument(state, decision.document_updates);
      yield { type: 'document_update', data: { sections: state.document.sections } };
    }

    // Ready to respond?
    if (decision.should_respond && decision.response) {
      yield { type: 'response', data: { message: decision.response } };
      break;
    }

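    // Execute tools (the shape of the context passed to each tool is assumed here)
    const toolContext = { organizationId, userId };
    for (const call of decision.tool_calls ?? []) {
      yield { type: 'tool_start', data: { tool: call.tool, params: call.params } };
      const result = await executeToolByName(call.tool, call.params, toolContext);
      recordToolResult(state, call.tool, result); // hypothetical helper: makes the result visible to the next pass
      yield { type: 'tool_result', data: { tool: call.tool, result } };
    }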

    yield { type: 'cost_update', data: { totalCost: state.totalCost } };
    state.passCount++;
  }

  yield { type: 'done', data: { passes: state.passCount, totalCost: state.totalCost } };
}
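The engine's signature refers to a WorkerEvent type that this part does not show. A plausible definition, reconstructed from the events yielded above and in the approval flow, is a discriminated union; the payload field types and the PendingApproval interface are assumptions. The done payload has all-optional fields because the engine sends passes and totalCost while the approval path sends reason. The describeEvent helper is hypothetical and only shows how switching on type narrows each payload.

```typescript
// Hypothetical shapes: the article does not show WorkerEvent's definition.
interface PendingApproval {
  id: string;
  tool: string;
  params: Record<string, unknown>;
  description: string;
}

// Discriminated union: the `type` string tells TypeScript which `data` shape applies.
type WorkerEvent =
  | { type: 'pre_enrichment'; data: { message?: string; toolsRun?: string[] } }
  | { type: 'status'; data: { message: string } }
  | { type: 'thinking'; data: { thinking: string } }
  | { type: 'tool_start'; data: { tool: string; params: Record<string, unknown> } }
  | { type: 'tool_result'; data: { tool: string; result: { summary?: string } } }
  | { type: 'document_update'; data: { sections: unknown[] } }
  | { type: 'cost_update'; data: { totalCost: number } }
  | { type: 'approval'; data: { approvals: PendingApproval[] } }
  | { type: 'response'; data: { message: string } }
  | { type: 'done'; data: { passes?: number; totalCost?: number; reason?: string } }
  | { type: 'error'; data: { message: string } }
  | { type: 'debug'; data: { message: string } };

// One-line label per event. The `never` assignment in the default branch makes the
// compiler reject this function if a new event type is added but not handled here.
function describeEvent(event: WorkerEvent): string {
  switch (event.type) {
    case 'pre_enrichment': return event.data.message ?? 'Gathering context...';
    case 'status': return event.data.message;
    case 'thinking': return 'Reasoning updated';
    case 'tool_start': return `Running ${event.data.tool}`;
    case 'tool_result': return `${event.data.tool}: ${event.data.result.summary ?? 'done'}`;
    case 'document_update': return `Document now has ${event.data.sections.length} sections`;
    case 'cost_update': return `Cost so far: $${event.data.totalCost.toFixed(2)}`;
    case 'approval': return `${event.data.approvals.length} action(s) awaiting approval`;
    case 'response': return 'Response ready';
    case 'done': return `Finished${event.data.reason ? ` (${event.data.reason})` : ''}`;
    case 'error': return `Error: ${event.data.message}`;
    case 'debug': return `[debug] ${event.data.message}`;
    default: {
      const unhandled: never = event;
      return unhandled;
    }
  }
}
```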

Wiring to Server-Sent Events

The API route consumes the async generator and converts each yielded event into an SSE message: a data: {...} line followed by a blank line, so each message ends in \n\n on the wire. The browser receives these messages as a stream and processes each one as it arrives.
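The frame format is small enough to capture in two helpers. formatSSE and formatSSEComment are hypothetical names; the route shown next builds the same data frame inline. The comment form relies on the SSE rule that lines beginning with a colon are ignored by clients, which makes it usable as a periodic heartbeat; the route does not send heartbeats, so treat that as an optional addition.

```typescript
// One event becomes one frame: a "data:" line followed by a blank line.
function formatSSE(event: unknown): string {
  // JSON.stringify escapes newlines inside strings as the two characters "\n",
  // so the payload never contains a raw line break that would split the frame.
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Lines starting with ":" are SSE comments; clients discard them, so they can
// be sent periodically to keep an otherwise idle connection active.
function formatSSEComment(text: string): string {
  return `: ${text}\n\n`;
}
```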

API route: generator → SSE stream

TypeScript
// app/api/workers/[employeeId]/route.ts

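// Next.js App Router: the handler is exported under the HTTP method it serves
export async function POST(request: Request, { params }: { params: { employeeId: string } }) {
  const { message, chatHistory, stream } = await request.json();
  const worker = getWorkerById(params.employeeId);
  if (!worker) {
    return Response.json({ error: `Unknown worker: ${params.employeeId}` }, { status: 404 });
  }

  // orgId and userId come from the authenticated session; this helper is hypothetical
  const { orgId, userId } = await getAuthContext(request);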

  if (!stream) {
    // Non-streaming mode: run to completion and return JSON
    const result = await runWorker(worker, message, chatHistory, orgId, userId);
    return Response.json({ result });
  }

  // Streaming mode: wrap generator in ReadableStream
  const readable = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      try {
        // Consume the async generator
        for await (const event of streamWorker(worker, message, chatHistory, orgId, userId)) {
          // Format as SSE: "data: {...}\n\n"
          const sseMessage = `data: ${JSON.stringify(event)}\n\n`;
          controller.enqueue(encoder.encode(sseMessage));
        }
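      } catch (error) {
        // Log the real error server-side; send the client only a generic message
        console.error('Worker stream failed:', error);
        const errorEvent = { type: 'error', data: { message: 'An error occurred' } };
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`));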
      } finally {
        controller.close();
      }
    }
  });

  return new Response(readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  });
}
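The route above drives the generator from start(), pushing events into the stream as fast as the engine produces them. A variant sketch, using the hypothetical helper name generatorToSSEStream, drives it from pull() instead. With the default queue size of one chunk, the stream calls pull() only when the consumer has taken the previous event, so the engine advances at the client's pace. Its cancel() handler calls the generator's return() when the client disconnects, which stops the generator at its current yield and runs its finally blocks.

```typescript
// Hypothetical helper: adapts an async generator of events into an SSE byte stream.
function generatorToSSEStream<T>(
  events: AsyncGenerator<T>,
  toErrorEvent: (err: unknown) => T,
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const frame = (event: T) => encoder.encode(`data: ${JSON.stringify(event)}\n\n`);

  return new ReadableStream<Uint8Array>({
    // pull() is called only when the stream's queue has room, so the engine
    // advances one event at a time as the client reads (backpressure).
    async pull(controller) {
      let result: IteratorResult<T>;
      try {
        result = await events.next();
      } catch (err) {
        // The engine threw: send a single error frame, then end the stream.
        controller.enqueue(frame(toErrorEvent(err)));
        controller.close();
        return;
      }
      if (result.done) {
        controller.close();
      } else {
        controller.enqueue(frame(result.value));
      }
    },
    // cancel() runs when the consumer (the client connection) goes away.
    // return() finishes the generator at its current yield, running finally blocks.
    async cancel() {
      await events.return(undefined);
    },
  });
}
```

In the route, the streaming branch would then reduce to passing generatorToSSEStream(streamWorker(worker, message, chatHistory, orgId, userId), () => ({ type: 'error', data: { message: 'An error occurred' } })) to new Response with the same headers.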

Browser: consuming the SSE stream

TypeScript
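// Frontend: consuming the SSE stream
// (chatHistory and handleEvent come from the surrounding component)
async function sendMessage(message: string) {
  const response = await fetch('/api/workers/record-analyst', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, chatHistory, stream: true }),
  });
  if (!response.ok || !response.body) {
    throw new Error(`Request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ''; // holds a partial frame until its terminating blank line arrives

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // { stream: true } keeps multi-byte characters split across chunks intact
    buffer += decoder.decode(value, { stream: true });

    // A chunk can end mid-frame, so only complete frames (ending in "\n\n") are parsed
    const frames = buffer.split('\n\n');
    buffer = frames.pop() ?? ''; // the last piece is incomplete (or empty)

    for (const frame of frames) {
      for (const line of frame.split('\n')) {
        if (line.startsWith('data: ')) {
          handleEvent(JSON.parse(line.slice(6))); // remove "data: " prefix, dispatch to UI state
        }
      }
    }
  }
}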

function handleEvent(event: WorkerEvent) {
  switch (event.type) {
    case 'thinking':
      setThinkingText(prev => prev + event.data.thinking);
      break;
    case 'tool_start':
      addToolActivity({ tool: event.data.tool, status: 'running' });
      break;
    case 'tool_result':
      updateToolActivity(event.data.tool, { status: 'done', summary: event.data.result.summary });
      break;
    case 'response':
      setFinalResponse(event.data.message);
      break;
    case 'approval':
      showApprovalCard(event.data.approvals);
      break;
    case 'done':
      setSessionComplete({ passes: event.data.passes, cost: event.data.totalCost });
      break;
  }
}
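handleEvent above pushes each event straight into separate state setters. One alternative, sketched here with hypothetical type and field names (TurnView, TurnEvent, reduceTurn), folds the events through a pure reducer. Because it returns new state instead of mutating, the same function can back React's useReducer or be replayed over a recorded event log in a test.

```typescript
// Hypothetical view state for one chat turn; field names are illustrative.
interface ToolActivity { tool: string; status: 'running' | 'done'; summary?: string }
interface TurnView {
  status: string;
  thinking: string;
  tools: ToolActivity[];
  response: string | null;
  complete: boolean;
}

// Trimmed event union covering only the cases this reducer handles.
type TurnEvent =
  | { type: 'status'; data: { message: string } }
  | { type: 'thinking'; data: { thinking: string } }
  | { type: 'tool_start'; data: { tool: string } }
  | { type: 'tool_result'; data: { tool: string; result: { summary?: string } } }
  | { type: 'response'; data: { message: string } }
  | { type: 'done'; data: Record<string, unknown> };

const initialTurnView: TurnView = { status: '', thinking: '', tools: [], response: null, complete: false };

// Pure reducer: never mutates `view`, always returns a new object.
function reduceTurn(view: TurnView, event: TurnEvent): TurnView {
  switch (event.type) {
    case 'status':
      return { ...view, status: event.data.message };
    case 'thinking':
      return { ...view, thinking: view.thinking + event.data.thinking };
    case 'tool_start':
      return { ...view, tools: [...view.tools, { tool: event.data.tool, status: 'running' }] };
    case 'tool_result': {
      // Mark the most recent still-running entry for this tool as done.
      const tools = [...view.tools];
      for (let i = tools.length - 1; i >= 0; i--) {
        if (tools[i].tool === event.data.tool && tools[i].status === 'running') {
          tools[i] = { ...tools[i], status: 'done', summary: event.data.result.summary };
          break;
        }
      }
      return { ...view, tools };
    }
    case 'response':
      return { ...view, response: event.data.message };
    case 'done':
      return { ...view, complete: true };
  }
}
```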

All Event Types

The stream can carry 12 event types. Each has a type string and a data payload. Here is the complete reference:

| Event type | Triggered when | UI behavior |
|---|---|---|
| pre_enrichment | Before the loop starts | Shows a "Gathering context..." banner with the tool count |
| status | At the start of each pass | Updates the "Pass N: analyzing..." status indicator |
| thinking | After the AI decision is parsed | Accumulates into a collapsible "View reasoning" panel |
| tool_start | Before each tool executes | Shows the tool name as "running" in the activity panel |
| tool_result | After each tool completes | Updates the tool to "done" or "failed" with summary text |
| document_update | After a living-document update | Updates the collapsible loop-state panel with the new sections |
| cost_update | After each tool-execution batch | Updates the cost display (e.g. $0.04 / $1.50) |
| approval | A write or destructive tool is requested | Shows an approval card with the tool name, params, and an Approve button |
| response | The final answer is ready | Renders the Markdown response in the chat window |
| done | The loop exits | Shows the exit reason, final cost, and pass count |
| error | Any unhandled error (sent by the API route's catch block) | Shows an error message with a retry option |
| debug | Throughout (dev only) | Shown only in the developer debug panel; hidden in production |
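The table marks debug events as dev only. One way to enforce that on the server, so debug payloads never leave it in production, is to wrap the engine in a second async generator that drops them. filterDebugEvents and StreamEvent are hypothetical names, and this is a sketch of one option rather than how the article's code hides these events.

```typescript
// Minimal event shape for this sketch; real events carry typed payloads.
interface StreamEvent { type: string; data: unknown }

// Passes every event through except 'debug' events, which are dropped unless
// includeDebug is true. Because the wrapper is itself an async generator, it can
// sit between the engine and the SSE encoder without changing either one.
async function* filterDebugEvents<E extends StreamEvent>(
  source: AsyncIterable<E>,
  includeDebug: boolean,
): AsyncGenerator<E> {
  for await (const event of source) {
    if (event.type === 'debug' && !includeDebug) continue;
    yield event;
  }
}
```

In a Node.js or Next.js route, includeDebug could be derived from process.env.NODE_ENV !== 'production'.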

The Approval Flow — Pausing and Resuming

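When the AI requests a write or destructive tool, the engine does not run it. It queues the call as a pending approval, exits the loop, and emits an approval event, after which the stream closes. The UI shows an approval card. When the user clicks "Approve," the frontend sends the approval IDs back to the API in a new request, and the API executes the approved tools directly, bypassing the loop entirely.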
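The gate starts with a decision about each tool call in a pass. A sketch of that split follows, with hypothetical names (partitionToolCalls, ToolCall) and an assumed 'read' level alongside the 'write' and 'destructive' levels checked in the code below. Tools missing from the permission map are treated as needing approval, a fail-closed choice made for this sketch.

```typescript
// 'write' and 'destructive' come from the article; 'read' is an assumption here.
type PermissionLevel = 'read' | 'write' | 'destructive';
interface ToolCall { tool: string; params: Record<string, unknown> }

// Split one pass's tool calls into those safe to run now and those that must
// wait for the user. Unknown tools fall into needsApproval (fail closed).
function partitionToolCalls(
  calls: ToolCall[],
  permissions: Map<string, PermissionLevel>,
): { runNow: ToolCall[]; needsApproval: ToolCall[] } {
  const runNow: ToolCall[] = [];
  const needsApproval: ToolCall[] = [];
  for (const call of calls) {
    const level = permissions.get(call.tool);
    if (level === 'read') {
      runNow.push(call);
    } else {
      // 'write', 'destructive', or not registered at all
      needsApproval.push(call);
    }
  }
  return { runNow, needsApproval };
}
```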

Approval flow — pause and resume

TypeScript
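// 1. Inside the tool loop: a write or destructive tool is queued, not executed
for (const call of decision.tool_calls) {
  const tool = getToolDefinition(call.tool); // lookup helper name assumed
  if (tool.permissionLevel === 'write' || tool.permissionLevel === 'destructive') {
    state.pendingApprovals.push({
      id: generateId(),
      tool: call.tool,
      params: call.params,
      description: `Update record: ${call.params.recordId}`,
    });
    continue; // skip execution until the user approves
  }
  // ...read-only tools run immediately, as in the main loop
}

// 2. Exit condition fires: pendingApprovals.length > 0
// The generator's state is discarded when the stream closes, so the pending
// approvals must be persisted server-side before this point.
yield {
  type: 'approval',
  data: { approvals: state.pendingApprovals }
};
yield { type: 'done', data: { reason: 'approval_needed' } };
// Stream closes. User sees approval card.

// 3. User clicks "Approve" → frontend sends:
// POST /api/workers/record-analyst
// Body: { approvals: ['approval-id-1', 'approval-id-2'] }

// 4. API route detects approvals in body (the parsed JSON) → bypass loop
if (Array.isArray(body.approvals) && body.approvals.length > 0) {
  // Only IDs arrive, so they must be resolved against the stored approvals
  // (and checked against orgId/userId) before anything executes.
  const results = await executeApprovedWorkerTools(worker, body.approvals, orgId, userId);
  return Response.json({ results });
  // No loop. No AI decision. Just execute the pre-approved tools.
}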
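Because the approve request carries only IDs, the server still needs the details of each pending approval after the first stream has closed. The article does not show how executeApprovedWorkerTools resolves them. A minimal sketch, assuming an in-memory store keyed by approval ID (PendingApprovalStore and StoredApproval are hypothetical names), shows the two checks that matter: the approval must belong to the caller, and each ID can be executed at most once.

```typescript
// Hypothetical record; a real deployment would keep these in a database or cache
// shared by all server instances, since the approve request may hit another one.
interface StoredApproval {
  id: string;
  orgId: string;
  userId: string;
  tool: string;
  params: Record<string, unknown>;
}

class PendingApprovalStore {
  private pending = new Map<string, StoredApproval>();

  // Called when the engine emits an 'approval' event, before the stream closes.
  save(approvals: StoredApproval[]): void {
    for (const a of approvals) this.pending.set(a.id, a);
  }

  // Called by the approve request. Returns only approvals that exist and belong
  // to the caller, deleting each one so a replayed ID executes nothing.
  take(ids: string[], orgId: string, userId: string): StoredApproval[] {
    const granted: StoredApproval[] = [];
    for (const id of ids) {
      const a = this.pending.get(id);
      if (a && a.orgId === orgId && a.userId === userId) {
        this.pending.delete(id);
        granted.push(a);
      }
    }
    return granted;
  }
}
```

Unknown or foreign IDs are silently skipped here; returning an explicit error for them is an equally reasonable design.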
