Agent Orchestration Patterns for Complex Tasks
Orchestration patterns for AI agents including sequential pipelines, parallel fan-out, hierarchical delegation, and DAG-based task graphs in Node.js.
Overview
Agent orchestration is the discipline of breaking complex, multi-step AI tasks into smaller subtasks, routing them to specialized agents, managing dependencies between those subtasks, and assembling the results into a coherent output. Once your agent system grows beyond a single prompt-response cycle, orchestration becomes the critical infrastructure that determines whether your system is reliable or a house of cards. This article covers the core orchestration patterns I have used in production Node.js systems, from simple sequential pipelines to full DAG-based task graphs with failure recovery.
Prerequisites
- Solid understanding of Node.js asynchronous programming (Promises, callbacks)
- Familiarity with LLM API calls (OpenAI, Anthropic, or similar)
- Experience building at least one basic AI agent
- Working knowledge of event emitters and state management in Node.js
- Node.js v18 or later installed
What Agent Orchestration Means
A single LLM call can answer a question. But real-world tasks — writing a market analysis report, auditing a codebase, processing a complex customer request — require dozens of coordinated steps. Agent orchestration is the layer that decomposes a high-level goal into executable subtasks, decides the order of execution, manages state between steps, handles failures, and aggregates results.
Think of it like a project manager. The project manager does not write all the code, design all the graphics, and deploy all the servers. They break the project into work items, assign them to specialists, track progress, handle blockers, and deliver the final product. Your orchestrator does the same thing, except the "specialists" are LLM calls, tool invocations, or sub-agents.
The key responsibilities of an orchestrator are:
- Task decomposition — turning a high-level goal into discrete subtasks
- Dependency resolution — determining which tasks must complete before others can start
- Execution management — running tasks sequentially, in parallel, or in a dependency graph
- State propagation — passing outputs from completed tasks as inputs to downstream tasks
- Failure handling — retrying, skipping, or aborting when subtasks fail
- Progress monitoring — tracking cost, latency, and completion status
Sequential Pipelines
The simplest orchestration pattern is a sequential pipeline: task A completes, its output feeds into task B, which feeds into task C. This is appropriate when each step fundamentally depends on the output of the previous step.
var OpenAI = require("openai");
var client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// buildResearchPrompt, buildOutlinePrompt, buildDraftPrompt, and buildReviewPrompt
// are assumed to be simple prompt-template helpers that read from the context object.
function runSequentialPipeline(topic) {
var context = { topic: topic };
return runStep("research", buildResearchPrompt(context))
.then(function (researchResult) {
context.research = researchResult;
return runStep("outline", buildOutlinePrompt(context));
})
.then(function (outlineResult) {
context.outline = outlineResult;
return runStep("draft", buildDraftPrompt(context));
})
.then(function (draftResult) {
context.draft = draftResult;
return runStep("review", buildReviewPrompt(context));
})
.then(function (reviewResult) {
context.review = reviewResult;
return context;
});
}
function runStep(name, prompt) {
console.log("[Pipeline] Starting step: " + name);
var startTime = Date.now();
return client.chat.completions
.create({
model: "gpt-4o",
messages: [{ role: "user", content: prompt }],
max_tokens: 2000,
})
.then(function (response) {
var elapsed = Date.now() - startTime;
console.log("[Pipeline] Step " + name + " completed in " + elapsed + "ms");
return response.choices[0].message.content;
});
}
Sequential pipelines are easy to reason about and debug, but they are slow. Every step waits for the previous one. If your task has independent subtasks, you are leaving performance on the table.
Parallel Fan-Out / Fan-In
When subtasks are independent of each other, run them in parallel. The fan-out/fan-in pattern dispatches multiple tasks simultaneously, waits for all of them to complete, and then merges the results in a synthesis step.
function runParallelResearch(topic) {
var subtasks = [
{ name: "technical", prompt: "Research the technical aspects of: " + topic },
{ name: "market", prompt: "Research the market landscape for: " + topic },
{ name: "competitors", prompt: "Identify key competitors in: " + topic },
{ name: "risks", prompt: "Analyze risks and challenges for: " + topic },
];
// Fan-out: run all research tasks in parallel
var promises = subtasks.map(function (task) {
return runStep(task.name, task.prompt).then(function (result) {
return { name: task.name, result: result };
});
});
// Fan-in: collect all results, then synthesize
return Promise.all(promises).then(function (results) {
var combined = results
.map(function (r) {
return "## " + r.name + "\n" + r.result;
})
.join("\n\n");
return runStep(
"synthesize",
"Synthesize these research findings into a cohesive report:\n\n" + combined
);
});
}
Fan-out/fan-in cuts wall-clock time dramatically. Four parallel LLM calls that each take 3 seconds finish in 3 seconds total instead of 12. The trade-off is that you need to manage concurrency limits. Most LLM APIs have rate limits, and blasting 50 parallel requests will get you throttled. Add a concurrency limiter:
function limitConcurrency(tasks, maxConcurrent) {
var results = [];
var running = 0;
var index = 0;
return new Promise(function (resolve, reject) {
  // Resolve immediately when there is nothing to run
  if (tasks.length === 0) return resolve(results);
  function next() {
while (running < maxConcurrent && index < tasks.length) {
var currentIndex = index;
index++;
running++;
tasks[currentIndex]()
.then(function (result) {
results[currentIndex] = result;
running--;
if (index >= tasks.length && running === 0) {
resolve(results);
} else {
next();
}
})
.catch(reject);
}
}
next();
});
}
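A sketch of wiring the limiter into the fan-out pattern: each subtask becomes a thunk (a function that starts the work only when invoked), and the limiter keeps only a few of them in flight. The runParallelResearchLimited name and the cap of 2 are illustrative:
function runParallelResearchLimited(topic) {
  var subtasks = [
    { name: "technical", prompt: "Research the technical aspects of: " + topic },
    { name: "market", prompt: "Research the market landscape for: " + topic },
    { name: "competitors", prompt: "Identify key competitors in: " + topic },
    { name: "risks", prompt: "Analyze risks and challenges for: " + topic },
  ];
  // Wrap each subtask in a thunk so nothing starts until the limiter invokes it
  var thunks = subtasks.map(function (task) {
    return function () {
      return runStep(task.name, task.prompt).then(function (result) {
        return { name: task.name, result: result };
      });
    };
  });
  return limitConcurrency(thunks, 2);
}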
Hierarchical Orchestration
In hierarchical orchestration, a manager agent delegates work to specialist worker agents. The manager decides what needs to be done, the workers execute, and the manager reviews and integrates results. This mirrors how human teams operate.
function ManagerAgent(goal) {
this.goal = goal;
this.workers = {};
this.results = {};
}
ManagerAgent.prototype.registerWorker = function (name, workerFn) {
this.workers[name] = workerFn;
};
ManagerAgent.prototype.planTasks = function () {
var self = this;
return client.chat.completions
.create({
model: "gpt-4o",
messages: [
{
role: "system",
content:
"You are a project manager. Given a goal, break it into subtasks. " +
"Available workers: " +
Object.keys(self.workers).join(", ") +
". " +
'Return a JSON object with a "tasks" array of {worker, task, dependsOn} objects, ordered so that dependencies come before the tasks that need them. dependsOn is an array of task indices.',
},
{ role: "user", content: "Goal: " + self.goal },
],
response_format: { type: "json_object" },
})
.then(function (response) {
return JSON.parse(response.choices[0].message.content).tasks;
});
};
ManagerAgent.prototype.execute = function () {
var self = this;
return self.planTasks().then(function (tasks) {
console.log("[Manager] Planned " + tasks.length + " tasks");
return self.executeTasks(tasks);
});
};
ManagerAgent.prototype.executeTasks = function (tasks) {
var self = this;
var completed = {};
function executeTask(index) {
var task = tasks[index];
var workerFn = self.workers[task.worker];
if (!workerFn) {
return Promise.reject(new Error("Unknown worker: " + task.worker));
}
// Gather dependency outputs
var depOutputs = (task.dependsOn || []).map(function (depIdx) {
return completed[depIdx];
});
return workerFn(task.task, depOutputs).then(function (result) {
completed[index] = result;
return result;
});
}
// Execute sequentially in plan order (the planner is asked to list dependencies before dependents)
return tasks.reduce(function (chain, task, index) {
return chain.then(function () {
return executeTask(index);
});
}, Promise.resolve()).then(function () {
  self.results = completed;
  return completed;
});
};
The hierarchical pattern is powerful because the manager agent can dynamically decide what tasks are needed. You do not hard-code the workflow; the LLM figures it out. The downside is that the planning step itself can fail or produce a bad plan.
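One mitigation is to validate the plan before executing anything: every step should reference a registered worker, and every dependency index should point at an earlier step, which is what the sequential executor above assumes. A minimal sketch; the method name and error messages are my own:
ManagerAgent.prototype.validatePlan = function (tasks) {
  var self = this;
  tasks.forEach(function (task, index) {
    if (!self.workers[task.worker]) {
      throw new Error("Plan step " + index + " references unknown worker: " + task.worker);
    }
    (task.dependsOn || []).forEach(function (depIdx) {
      // Dependencies must be numeric indices of earlier steps in the plan
      if (typeof depIdx !== "number" || depIdx < 0 || depIdx >= index) {
        throw new Error("Plan step " + index + " has an invalid dependency index: " + depIdx);
      }
    });
  });
  return tasks;
};
Calling this between planTasks() and executeTasks() turns a malformed plan into an immediate, explicit error rather than a confusing mid-run failure.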
DAG-Based Task Graphs
For complex workflows with intricate dependencies, model your tasks as a Directed Acyclic Graph (DAG). Each node is a task, and edges represent dependencies. A DAG executor processes tasks in topological order, running independent tasks in parallel while respecting dependency constraints.
function TaskGraph() {
this.tasks = {};
this.edges = [];
}
TaskGraph.prototype.addTask = function (id, executeFn, options) {
this.tasks[id] = {
id: id,
execute: executeFn,
dependsOn: (options && options.dependsOn) || [],
status: "pending",
result: null,
retries: (options && options.retries) || 0,
maxRetries: (options && options.maxRetries) || 2,
timeout: (options && options.timeout) || 30000,
};
};
TaskGraph.prototype.getReadyTasks = function () {
  var self = this;
  var ready = [];
  var changed = true;
  // Loop to a fixed point so skips cascade: if a dependency failed or was
  // skipped, the dependent task is skipped too, and so on down the graph.
  while (changed) {
    changed = false;
    Object.keys(self.tasks).forEach(function (id) {
      var task = self.tasks[id];
      if (task.status !== "pending") return;
      var depsComplete = task.dependsOn.every(function (depId) {
        return self.tasks[depId] && self.tasks[depId].status === "completed";
      });
      var depsBlocked = task.dependsOn.some(function (depId) {
        var dep = self.tasks[depId];
        return dep && (dep.status === "failed" || dep.status === "skipped");
      });
      if (depsBlocked) {
        task.status = "skipped";
        changed = true;
      } else if (depsComplete && ready.indexOf(task) === -1) {
        ready.push(task);
      }
    });
  }
  return ready;
};
TaskGraph.prototype.execute = function (onProgress) {
var self = this;
return new Promise(function (resolve, reject) {
function tick() {
var ready = self.getReadyTasks();
var allDone = Object.keys(self.tasks).every(function (id) {
var s = self.tasks[id].status;
return s === "completed" || s === "failed" || s === "skipped";
});
if (allDone) {
resolve(self.getResults());
return;
}
if (ready.length === 0) {
// Nothing ready but not all done — check for deadlock
var pendingCount = Object.keys(self.tasks).filter(function (id) {
return self.tasks[id].status === "pending";
}).length;
if (pendingCount > 0) {
reject(new Error("Deadlock detected: " + pendingCount + " tasks stuck"));
}
return;
}
var promises = ready.map(function (task) {
task.status = "running";
if (onProgress) onProgress(task.id, "running");
return self.executeWithTimeout(task).then(
function (result) {
task.status = "completed";
task.result = result;
if (onProgress) onProgress(task.id, "completed");
},
function (err) {
if (task.retries < task.maxRetries) {
task.retries++;
task.status = "pending";
console.log(
"[DAG] Retrying " + task.id + " (attempt " + (task.retries + 1) + ")"
);
} else {
task.status = "failed";
task.result = err;
if (onProgress) onProgress(task.id, "failed");
}
}
);
});
Promise.all(promises).then(tick).catch(reject);
}
tick();
});
};
TaskGraph.prototype.executeWithTimeout = function (task) {
var depResults = {};
var self = this;
task.dependsOn.forEach(function (depId) {
depResults[depId] = self.tasks[depId].result;
});
return new Promise(function (resolve, reject) {
var timer = setTimeout(function () {
reject(new Error("Task " + task.id + " timed out after " + task.timeout + "ms"));
}, task.timeout);
task
.execute(depResults)
.then(function (result) {
clearTimeout(timer);
resolve(result);
})
.catch(function (err) {
clearTimeout(timer);
reject(err);
});
});
};
TaskGraph.prototype.getResults = function () {
var results = {};
var self = this;
Object.keys(self.tasks).forEach(function (id) {
results[id] = {
status: self.tasks[id].status,
result: self.tasks[id].result,
};
});
return results;
};
This DAG executor handles parallel execution, dependency resolution, retries, timeouts, and deadlock detection. It is the backbone of any serious orchestration system.
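A short usage sketch, reusing the runStep helper from the sequential pipeline section (the task names and prompts are arbitrary):
var graph = new TaskGraph();
graph.addTask("gather", function () {
  return runStep("gather", "Collect background facts about WebAssembly runtimes");
});
graph.addTask("analyze", function (deps) {
  // deps is keyed by dependency id, so deps.gather holds that task's output
  return runStep("analyze", "Analyze these findings:\n\n" + deps.gather);
}, { dependsOn: ["gather"], maxRetries: 1 });
graph.addTask("report", function (deps) {
  return runStep("report", "Write a short report based on:\n\n" + deps.analyze);
}, { dependsOn: ["analyze"], timeout: 60000 });
graph.execute(function (taskId, status) {
  console.log("[DAG] " + taskId + " -> " + status);
}).then(function (results) {
  console.log(results.report.status, results.report.result);
});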
State Machines for Agent Workflows
For workflows with conditional branching — where the next step depends on the result of the current step — model your orchestration as a state machine. Each state represents a phase of the workflow, and transitions are triggered by the output of agent actions.
function AgentStateMachine(config) {
this.states = config.states;
this.currentState = config.initialState;
this.context = config.context || {};
this.history = [];
this.maxTransitions = config.maxTransitions || 50;
this.transitionCount = 0;
}
AgentStateMachine.prototype.run = function () {
var self = this;
function step() {
if (self.transitionCount >= self.maxTransitions) {
return Promise.reject(
new Error("Max transitions (" + self.maxTransitions + ") exceeded. Possible infinite loop.")
);
}
var state = self.states[self.currentState];
if (!state) {
return Promise.reject(new Error("Unknown state: " + self.currentState));
}
if (state.terminal) {
return Promise.resolve(self.context);
}
self.transitionCount++;
self.history.push(self.currentState);
console.log("[SM] State: " + self.currentState + " (transition " + self.transitionCount + ")");
return state.action(self.context).then(function (result) {
self.context = Object.assign({}, self.context, result.contextUpdates || {});
self.currentState = result.nextState;
return step();
});
}
return step();
};
// Example: a research workflow with conditional branching.
// classifyQuery, validateFindings, generateQuickAnswer, and composeReport are
// assumed to be LLM-backed helpers that each return a Promise.
var workflow = new AgentStateMachine({
initialState: "analyze_query",
context: { query: "Compare React vs Vue for enterprise applications" },
states: {
analyze_query: {
action: function (ctx) {
return classifyQuery(ctx.query).then(function (classification) {
if (classification.needsResearch) {
return { nextState: "deep_research", contextUpdates: { classification: classification } };
}
return { nextState: "quick_answer", contextUpdates: { classification: classification } };
});
},
},
deep_research: {
action: function (ctx) {
return runParallelResearch(ctx.query).then(function (research) {
return { nextState: "validate", contextUpdates: { research: research } };
});
},
},
validate: {
action: function (ctx) {
return validateFindings(ctx.research).then(function (validation) {
if (validation.confident) {
return { nextState: "compose_report", contextUpdates: { validation: validation } };
}
return { nextState: "deep_research", contextUpdates: { retryReason: validation.gaps } };
});
},
},
quick_answer: {
action: function (ctx) {
return generateQuickAnswer(ctx.query).then(function (answer) {
return { nextState: "done", contextUpdates: { finalAnswer: answer } };
});
},
},
compose_report: {
action: function (ctx) {
return composeReport(ctx).then(function (report) {
return { nextState: "done", contextUpdates: { finalReport: report } };
});
},
},
done: { terminal: true },
},
});
The state machine pattern gives you explicit control over workflow branching, and the transition history is invaluable for debugging. The maxTransitions guard prevents infinite loops when validation keeps failing and sending the workflow back to the research state.
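Running the workflow is a single call, and the history array shows exactly which path was taken:
workflow
  .run()
  .then(function (finalContext) {
    console.log("Path: " + workflow.history.join(" -> "));
    console.log(finalContext.finalReport || finalContext.finalAnswer);
  })
  .catch(function (err) {
    console.error("Workflow aborted: " + err.message);
  });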
Handling Subtask Failures and Partial Completion
In production orchestration, failures are not exceptional — they are expected. LLM calls fail due to rate limits, timeouts, malformed outputs, and content filters. Your orchestrator must handle partial completion gracefully.
function ResilientOrchestrator(options) {
this.maxRetries = (options && options.maxRetries) || 3;
this.retryDelay = (options && options.retryDelay) || 1000;
this.partialResultsAllowed = (options && options.partialResultsAllowed) || false;
}
ResilientOrchestrator.prototype.executeWithRetry = function (taskFn, taskName) {
var self = this;
var attempts = 0;
function attempt() {
attempts++;
return taskFn().catch(function (err) {
console.error(
"[Orchestrator] " + taskName + " failed (attempt " + attempts + "): " + err.message
);
if (attempts >= self.maxRetries) {
if (self.partialResultsAllowed) {
console.warn("[Orchestrator] " + taskName + " exhausted retries, returning null");
return null;
}
throw err;
}
var delay = self.retryDelay * Math.pow(2, attempts - 1); // Exponential backoff
console.log("[Orchestrator] Retrying " + taskName + " in " + delay + "ms");
return new Promise(function (resolve) {
setTimeout(resolve, delay);
}).then(attempt);
});
}
return attempt();
};
ResilientOrchestrator.prototype.executeParallel = function (tasks) {
var self = this;
var promises = tasks.map(function (task) {
return self.executeWithRetry(task.fn, task.name).then(function (result) {
return { name: task.name, status: "success", result: result };
}).catch(function (err) {
return { name: task.name, status: "failed", error: err.message };
});
});
return Promise.all(promises).then(function (results) {
var succeeded = results.filter(function (r) { return r.status === "success"; });
var failed = results.filter(function (r) { return r.status === "failed"; });
console.log(
"[Orchestrator] " + succeeded.length + "/" + results.length + " tasks succeeded"
);
if (failed.length > 0) {
console.warn("[Orchestrator] Failed tasks: " +
failed.map(function (f) { return f.name; }).join(", ")
);
}
return { succeeded: succeeded, failed: failed, all: results };
});
};
The key design decision is whether partial results are acceptable. For a research report, losing one of four research threads is tolerable — you can synthesize from three. For a financial transaction pipeline, any failure should abort the entire workflow.
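That decision belongs at the call site, not buried in error handling. A sketch, assuming the four research subtasks from earlier are wrapped as { name, fn } objects in a researchTasks array and synthesizeReport is your synthesis step:
var resilient = new ResilientOrchestrator({ maxRetries: 2, partialResultsAllowed: true });
resilient.executeParallel(researchTasks).then(function (outcome) {
  // Research report: three of four threads is good enough to synthesize from
  if (outcome.succeeded.length >= 3) {
    return synthesizeReport(outcome.succeeded);
  }
  // Anything transactional: treat any failure as fatal and abort instead
  throw new Error("Too many subtasks failed: " + outcome.failed.length + " of " + outcome.all.length);
});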
Timeout Management for Multi-Step Agents
Multi-step agent workflows can run for minutes. Without timeout management, a single stuck LLM call can block your entire pipeline indefinitely.
function TimeoutManager() {
this.taskTimers = {};
this.globalTimer = null;
this.globalTimeout = 300000; // 5 minutes total
}
TimeoutManager.prototype.startGlobal = function (timeoutMs) {
var self = this;
self.globalTimeout = timeoutMs || self.globalTimeout;
self.startTime = Date.now();
return new Promise(function (resolve, reject) {
self.globalTimer = setTimeout(function () {
reject(new Error(
"Global orchestration timeout: exceeded " + self.globalTimeout + "ms"
));
}, self.globalTimeout);
self.resolveGlobal = resolve;
});
};
TimeoutManager.prototype.getRemainingTime = function () {
return Math.max(0, this.globalTimeout - (Date.now() - this.startTime));
};
TimeoutManager.prototype.wrapTask = function (taskFn, taskName, timeoutMs) {
var self = this;
var remaining = self.getRemainingTime();
var effectiveTimeout = Math.min(timeoutMs || 30000, remaining);
if (remaining <= 0) {
return Promise.reject(new Error("No time remaining for task: " + taskName));
}
return new Promise(function (resolve, reject) {
var timer = setTimeout(function () {
reject(new Error(
"Task '" + taskName + "' timed out after " + effectiveTimeout + "ms"
));
}, effectiveTimeout);
taskFn()
.then(function (result) {
clearTimeout(timer);
resolve(result);
})
.catch(function (err) {
clearTimeout(timer);
reject(err);
});
});
};
TimeoutManager.prototype.cleanup = function () {
if (this.globalTimer) clearTimeout(this.globalTimer);
var self = this;
Object.keys(self.taskTimers).forEach(function (key) {
clearTimeout(self.taskTimers[key]);
});
};
The critical insight here is effective timeout calculation. Each subtask gets the minimum of its own timeout and the remaining global time. If you have 30 seconds left globally, a task configured for a 60-second timeout should only get 30 seconds. Without this, you risk exceeding your global budget on a single slow task.
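One way to wire this together: start the global guard, wrap each step, race the work against the guard (since startGlobal returns a promise that only ever rejects), and always clean up. A sketch reusing runStep:
var timeouts = new TimeoutManager();
var globalGuard = timeouts.startGlobal(120000); // reject if the whole run exceeds 2 minutes
var work = timeouts
  .wrapTask(function () {
    return runStep("research", "Research topic X");
  }, "research", 45000)
  .then(function (research) {
    // This step's 60s budget is automatically capped by the remaining global time
    return timeouts.wrapTask(function () {
      return runStep("draft", "Draft a summary of:\n\n" + research);
    }, "draft", 60000);
  });
Promise.race([work, globalGuard])
  .then(function (draft) { console.log(draft); })
  .catch(function (err) { console.error(err.message); })
  .then(function () { timeouts.cleanup(); }); // runs on success and failure alike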
Dynamic Task Planning
In some workflows, the agent should decide at runtime what to do next based on intermediate results. This is dynamic task planning — the opposite of static DAGs.
function DynamicPlanner(options) {
this.model = (options && options.model) || "gpt-4o";
this.maxSteps = (options && options.maxSteps) || 20;
this.availableTools = options.tools || [];
this.stepCount = 0;
this.history = [];
}
DynamicPlanner.prototype.plan = function (goal, currentContext) {
var self = this;
var toolDescriptions = self.availableTools
.map(function (t) { return t.name + ": " + t.description; })
.join("\n");
return client.chat.completions
.create({
model: self.model,
messages: [
{
role: "system",
content:
"You are a task planner. Given a goal and current context, decide the next action.\n" +
"Available tools:\n" + toolDescriptions + "\n\n" +
'Return JSON: {"action": "tool_name", "input": "...", "reasoning": "...", "isComplete": false}\n' +
'Set isComplete to true when the goal is achieved.',
},
{
role: "user",
content:
"Goal: " + goal +
"\n\nCurrent context:\n" + JSON.stringify(currentContext, null, 2) +
"\n\nPrevious steps:\n" +
self.history.map(function (h) { return h.action + ": " + h.summary; }).join("\n"),
},
],
response_format: { type: "json_object" },
})
.then(function (response) {
return JSON.parse(response.choices[0].message.content);
});
};
DynamicPlanner.prototype.execute = function (goal, initialContext) {
var self = this;
var context = Object.assign({}, initialContext);
function step() {
if (self.stepCount >= self.maxSteps) {
return Promise.resolve({ status: "max_steps_reached", context: context });
}
self.stepCount++;
return self.plan(goal, context).then(function (plan) {
if (plan.isComplete) {
return { status: "completed", context: context };
}
var tool = self.availableTools.find(function (t) { return t.name === plan.action; });
if (!tool) {
return Promise.reject(new Error("Planner selected unknown tool: " + plan.action));
}
console.log("[Planner] Step " + self.stepCount + ": " + plan.action + " — " + plan.reasoning);
return tool.execute(plan.input, context).then(function (result) {
self.history.push({ action: plan.action, summary: result.summary || "done" });
context = Object.assign({}, context, result.contextUpdates || {});
return step();
});
});
}
return step();
};
Dynamic planning is powerful but expensive. Every step requires a planning LLM call in addition to the execution LLM call. Use it when the workflow truly cannot be determined in advance — for example, when an agent needs to explore a codebase where it does not know the structure up front.
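A tool in this design is just a name, a description for the planner prompt, and an execute function that resolves to { summary, contextUpdates }. A sketch with a single hypothetical tool (listDirectory is a placeholder for your own implementation):
var planner = new DynamicPlanner({
  maxSteps: 10,
  tools: [
    {
      name: "list_files",
      description: "List the files under a directory path",
      execute: function (input, context) {
        // listDirectory is a placeholder; swap in fs.promises.readdir or a real tool runner
        return listDirectory(input).then(function (files) {
          return { summary: "Listed " + files.length + " files", contextUpdates: { files: files } };
        });
      },
    },
  ],
});
planner
  .execute("Find where logging is configured in this repository", { repoRoot: "." })
  .then(function (outcome) {
    console.log(outcome.status);   // "completed" or "max_steps_reached"
    console.log(planner.history);  // the planner's action trail
  });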
Passing Context Between Orchestrated Steps
Context management is where orchestration systems get messy. Naively passing the entire context to every step bloats token usage and cost. Be deliberate about what context each step receives.
function ContextManager() {
this.store = {};
this.accessLog = [];
}
ContextManager.prototype.set = function (key, value, metadata) {
this.store[key] = {
value: value,
metadata: metadata || {},
createdAt: Date.now(),
tokenEstimate: Math.ceil(JSON.stringify(value).length / 4), // rough heuristic: ~4 characters per token
};
};
ContextManager.prototype.get = function (key) {
if (this.store[key]) {
this.accessLog.push({ key: key, time: Date.now() });
return this.store[key].value;
}
return undefined;
};
ContextManager.prototype.getForTask = function (taskId, requiredKeys) {
var self = this;
var subset = {};
requiredKeys.forEach(function (key) {
var value = self.get(key);
if (value !== undefined) {
subset[key] = value;
}
});
return subset;
};
ContextManager.prototype.getTokenBudget = function () {
var total = 0;
var self = this;
Object.keys(self.store).forEach(function (key) {
total += self.store[key].tokenEstimate;
});
return total;
};
ContextManager.prototype.summarize = function (key, summaryFn) {
var self = this;
var entry = self.store[key];
if (!entry) return Promise.resolve();
return summaryFn(entry.value).then(function (summary) {
self.store[key] = {
value: summary,
metadata: Object.assign({}, entry.metadata, { summarized: true }),
createdAt: entry.createdAt,
tokenEstimate: Math.ceil(JSON.stringify(summary).length / 4),
};
});
};
The getForTask method is the critical pattern: each task declares what context keys it needs, and the context manager provides only those values. This keeps prompts focused and costs down. The summarize method compresses verbose intermediate results when they are going to be passed downstream — a full search result set might be 10,000 tokens, but a summary of the key findings could be 500.
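In practice the flow is: store each task's output as it completes, compress the verbose ones, and hand each downstream task only the keys it declares. A sketch in which webResult, academicResult, and summarizeText are placeholders:
var context = new ContextManager();
context.set("search_web", webResult, { source: "search" });
context.set("search_academic", academicResult, { source: "search" });
// Compress the verbose web findings before anything downstream sees them
context.summarize("search_web", function (value) {
  return summarizeText(value); // placeholder for a cheap summarization LLM call
}).then(function () {
  // The synthesis task declares exactly which keys it needs
  var inputs = context.getForTask("synthesize", ["search_web", "search_academic"]);
  console.log("Approximate context tokens: " + context.getTokenBudget());
  return runStep("synthesize", "Combine these findings:\n\n" + JSON.stringify(inputs, null, 2));
});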
Monitoring Orchestration Progress and Cost
In production, you need visibility into what your orchestration is doing, how much it is costing, and where bottlenecks are.
function OrchestrationMonitor() {
this.tasks = {};
this.totalInputTokens = 0;
this.totalOutputTokens = 0;
this.startTime = null;
}
OrchestrationMonitor.prototype.start = function () {
this.startTime = Date.now();
};
OrchestrationMonitor.prototype.recordTask = function (taskId, data) {
this.tasks[taskId] = {
status: data.status,
startTime: data.startTime,
endTime: data.endTime,
duration: data.endTime - data.startTime,
inputTokens: data.inputTokens || 0,
outputTokens: data.outputTokens || 0,
model: data.model,
retries: data.retries || 0,
error: data.error || null,
};
this.totalInputTokens += data.inputTokens || 0;
this.totalOutputTokens += data.outputTokens || 0;
};
OrchestrationMonitor.prototype.getReport = function () {
var self = this;
var elapsed = Date.now() - self.startTime;
var taskList = Object.keys(self.tasks);
var failedTasks = taskList.filter(function (id) {
return self.tasks[id].status === "failed";
});
var totalRetries = taskList.reduce(function (sum, id) {
return sum + self.tasks[id].retries;
}, 0);
// Approximate cost calculation (GPT-4o pricing as example)
var inputCost = (self.totalInputTokens / 1000000) * 2.50;
var outputCost = (self.totalOutputTokens / 1000000) * 10.00;
return {
totalDuration: elapsed,
totalTasks: taskList.length,
completedTasks: taskList.length - failedTasks.length,
failedTasks: failedTasks.length,
totalRetries: totalRetries,
totalInputTokens: self.totalInputTokens,
totalOutputTokens: self.totalOutputTokens,
estimatedCost: (inputCost + outputCost).toFixed(4),
tasks: self.tasks,
};
};
Log the report at the end of every orchestration run. In my experience, cost accumulation is the number one surprise in agent systems. A seemingly simple research task can make 20+ LLM calls and cost $0.50 per execution. Multiply that by user volume and you have a problem.
Complete Working Example
Here is a full orchestrator that decomposes a research task into parallel subtasks, manages dependencies, handles failures, and produces a final report. It reuses the TaskGraph, ContextManager, and OrchestrationMonitor classes defined in the sections above.
var OpenAI = require("openai");
var EventEmitter = require("events");
var client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// ─── Research Orchestrator ───
function ResearchOrchestrator(options) {
EventEmitter.call(this);
this.model = (options && options.model) || "gpt-4o";
this.maxConcurrency = (options && options.maxConcurrency) || 3; // not enforced by TaskGraph here; see limitConcurrency above
this.taskTimeout = (options && options.taskTimeout) || 45000;
this.globalTimeout = (options && options.globalTimeout) || 180000; // enforce with TimeoutManager if needed
this.maxRetries = (options && options.maxRetries) || 2;
this.monitor = new OrchestrationMonitor();
this.context = new ContextManager();
}
ResearchOrchestrator.prototype = Object.create(EventEmitter.prototype);
ResearchOrchestrator.prototype.constructor = ResearchOrchestrator;
ResearchOrchestrator.prototype.research = function (topic) {
var self = this;
self.monitor.start();
self.context.set("topic", topic);
console.log("[Orchestrator] Starting research on: " + topic);
var graph = new TaskGraph();
// Phase 1: Parallel research tasks (no dependencies)
graph.addTask("search_web", function () {
return self.callLLM(
"search_web",
"You are a web research specialist. Provide detailed technical findings on: " + topic +
". Include specific facts, statistics, and recent developments.",
{ timeout: self.taskTimeout }
);
}, { maxRetries: self.maxRetries, timeout: self.taskTimeout });
graph.addTask("search_academic", function () {
return self.callLLM(
"search_academic",
"You are an academic research specialist. Provide scholarly analysis and theoretical " +
"foundations on: " + topic + ". Reference key papers and methodologies.",
{ timeout: self.taskTimeout }
);
}, { maxRetries: self.maxRetries, timeout: self.taskTimeout });
graph.addTask("search_practical", function () {
return self.callLLM(
"search_practical",
"You are a practitioner with hands-on experience. Provide real-world case studies, " +
"lessons learned, and practical advice on: " + topic,
{ timeout: self.taskTimeout }
);
}, { maxRetries: self.maxRetries, timeout: self.taskTimeout });
// Phase 2: Summarize each research thread (depends on corresponding search)
graph.addTask("summarize_web", function (deps) {
return self.callLLM(
"summarize_web",
"Summarize the key findings in 3-5 bullet points:\n\n" + deps.search_web
);
}, { dependsOn: ["search_web"], timeout: self.taskTimeout });
graph.addTask("summarize_academic", function (deps) {
return self.callLLM(
"summarize_academic",
"Summarize the key findings in 3-5 bullet points:\n\n" + deps.search_academic
);
}, { dependsOn: ["search_academic"], timeout: self.taskTimeout });
graph.addTask("summarize_practical", function (deps) {
return self.callLLM(
"summarize_practical",
"Summarize the key findings in 3-5 bullet points:\n\n" + deps.search_practical
);
}, { dependsOn: ["search_practical"], timeout: self.taskTimeout });
// Phase 3: Synthesize all summaries into a final report
graph.addTask("synthesize", function (deps) {
var combined =
"## Web Research\n" + deps.summarize_web +
"\n\n## Academic Research\n" + deps.summarize_academic +
"\n\n## Practical Experience\n" + deps.summarize_practical;
return self.callLLM(
"synthesize",
"You are a senior analyst. Synthesize these research findings into a comprehensive, " +
"well-structured report with an executive summary, key findings, analysis, and " +
"recommendations. Topic: " + topic + "\n\n" + combined,
{ maxTokens: 3000 }
);
}, {
dependsOn: ["summarize_web", "summarize_academic", "summarize_practical"],
timeout: 60000,
});
// Execute the graph
return graph
.execute(function (taskId, status) {
self.emit("progress", { taskId: taskId, status: status });
console.log("[Orchestrator] " + taskId + " -> " + status);
})
.then(function (results) {
var report = self.monitor.getReport();
console.log("\n[Orchestrator] Complete. Cost: $" + report.estimatedCost);
console.log("[Orchestrator] Duration: " + report.totalDuration + "ms");
console.log(
"[Orchestrator] Tasks: " + report.completedTasks + "/" + report.totalTasks + " succeeded"
);
return {
report: results.synthesize && results.synthesize.status === "completed" ? results.synthesize.result : null,
allResults: results,
monitoring: report,
};
})
.catch(function (err) {
console.error("[Orchestrator] Fatal error: " + err.message);
var report = self.monitor.getReport();
return {
error: err.message,
partialResults: report.tasks,
monitoring: report,
};
});
};
ResearchOrchestrator.prototype.callLLM = function (taskName, prompt, options) {
var self = this;
var maxTokens = (options && options.maxTokens) || 1500;
var startTime = Date.now();
return client.chat.completions
.create({
model: self.model,
messages: [{ role: "user", content: prompt }],
max_tokens: maxTokens,
})
.then(function (response) {
var endTime = Date.now();
var usage = response.usage || {};
self.monitor.recordTask(taskName, {
status: "completed",
startTime: startTime,
endTime: endTime,
inputTokens: usage.prompt_tokens || 0,
outputTokens: usage.completion_tokens || 0,
model: self.model,
});
return response.choices[0].message.content;
})
.catch(function (err) {
var endTime = Date.now();
self.monitor.recordTask(taskName, {
status: "failed",
startTime: startTime,
endTime: endTime,
model: self.model,
error: err.message,
});
throw err;
});
};
// ─── Usage ───
var orchestrator = new ResearchOrchestrator({
model: "gpt-4o",
maxConcurrency: 3,
taskTimeout: 45000,
globalTimeout: 180000,
});
orchestrator.on("progress", function (event) {
// Hook into a dashboard, logging system, or WebSocket
console.log("Progress: " + event.taskId + " is " + event.status);
});
orchestrator
.research("The impact of large language models on software engineering practices")
.then(function (result) {
if (result.error) {
console.error("Research failed: " + result.error);
} else {
console.log("\n=== FINAL REPORT ===\n");
console.log(result.report);
console.log("\n=== COST REPORT ===");
console.log("Total cost: $" + result.monitoring.estimatedCost);
console.log("Total tokens: " + (result.monitoring.totalInputTokens + result.monitoring.totalOutputTokens));
}
});
Common Issues and Troubleshooting
1. Rate Limit Errors During Parallel Execution
Error: 429 Too Many Requests
{"error":{"message":"Rate limit reached for gpt-4o on tokens per min","type":"rate_limit_error"}}
This happens when your fan-out dispatches more parallel requests than your API tier allows. The fix is to implement a concurrency limiter (shown earlier) and add exponential backoff. Set maxConcurrency to a value below your rate limit. For OpenAI Tier 1, keep it at 3 or fewer for GPT-4o.
2. Context Window Overflow in Synthesis Steps
Error: 400 Bad Request
{"error":{"message":"This model's maximum context length is 128000 tokens. However, your messages resulted in 134221 tokens."}}
When you fan out to many research tasks and then try to synthesize all results in one prompt, the combined context can exceed the model's token limit. Fix this by adding a summarization step between research and synthesis. Each research result gets condensed to its key points before being passed to the final synthesis.
3. Deadlock in DAG Execution
Error: Deadlock detected: 3 tasks stuck
This occurs when tasks have circular dependencies or depend on a task that was never added to the graph. Before executing a DAG, validate that all dependency references point to tasks that exist in the graph and that no cycles are present. Add a validation step:
TaskGraph.prototype.validate = function () {
var self = this;
var ids = Object.keys(self.tasks);
ids.forEach(function (id) {
self.tasks[id].dependsOn.forEach(function (depId) {
if (!self.tasks[depId]) {
throw new Error("Task '" + id + "' depends on unknown task '" + depId + "'");
}
});
});
// Simple cycle detection using DFS
var visited = {};
var recursionStack = {};
function hasCycle(id) {
visited[id] = true;
recursionStack[id] = true;
var deps = self.tasks[id].dependsOn;
for (var i = 0; i < deps.length; i++) {
if (!visited[deps[i]]) {
if (hasCycle(deps[i])) return true;
} else if (recursionStack[deps[i]]) {
return true;
}
}
recursionStack[id] = false;
return false;
}
ids.forEach(function (id) {
if (!visited[id] && hasCycle(id)) {
throw new Error("Cycle detected in task graph involving task: " + id);
}
});
};
4. JSON Parse Failures from LLM Planning Responses
SyntaxError: Unexpected token 'I' at position 0
at JSON.parse (<anonymous>)
When you ask an LLM to return JSON for dynamic planning, it sometimes returns prose instead — especially when it is confused about the task. Even with response_format: { type: "json_object" }, some models occasionally wrap JSON in markdown code fences. Add defensive parsing:
function parseAgentJSON(raw) {
// Strip markdown code fences if present
var cleaned = raw.replace(/^```(?:json)?\s*/m, "").replace(/\s*```$/m, "").trim();
try {
return JSON.parse(cleaned);
} catch (e) {
// Try to extract JSON object from surrounding text
var match = cleaned.match(/\{[\s\S]*\}/);
if (match) {
return JSON.parse(match[0]);
}
throw new Error("Failed to parse agent response as JSON: " + raw.substring(0, 200));
}
}
5. Memory Leaks from Uncleared Timeouts
If your orchestrator creates timeouts for each task but does not clean them up on early exit (e.g., global timeout fires while task timeouts are pending), you get memory leaks. Always call clearTimeout in both success and error paths, and implement a cleanup method that fires on any orchestration exit.
Best Practices
Set explicit budgets for every orchestration: Define maximum cost, maximum wall-clock time, and maximum number of LLM calls before the orchestration starts. Kill the run if any budget is exceeded. Runaway agent loops are the most common source of unexpected LLM bills.
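A minimal budget guard, with illustrative limits, can be checked before every LLM call and fed from the usage data you already record in the monitor:
function BudgetGuard(limits) {
  this.maxCostUsd = (limits && limits.maxCostUsd) || 1.0;
  this.maxDurationMs = (limits && limits.maxDurationMs) || 300000;
  this.maxCalls = (limits && limits.maxCalls) || 50;
  this.spentUsd = 0;
  this.calls = 0;
  this.startTime = Date.now();
}
BudgetGuard.prototype.recordCall = function (costUsd) {
  this.calls++;
  this.spentUsd += costUsd;
};
BudgetGuard.prototype.checkBudget = function () {
  // Call this before dispatching the next LLM request; throwing aborts the run
  if (this.spentUsd >= this.maxCostUsd) {
    throw new Error("Cost budget exceeded: $" + this.spentUsd.toFixed(4));
  }
  if (Date.now() - this.startTime >= this.maxDurationMs) {
    throw new Error("Time budget exceeded");
  }
  if (this.calls >= this.maxCalls) {
    throw new Error("Call budget exceeded: " + this.calls + " LLM calls");
  }
};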
Use the cheapest model that works for each subtask: Not every step needs GPT-4o. Use GPT-4o-mini or Claude Haiku for simple extraction, classification, and summarization. Reserve the expensive models for reasoning-heavy tasks like planning and synthesis. This can cut costs by 80% or more.
Make every task idempotent: If a task fails and retries, it should produce the same result without side effects. Avoid tasks that write to databases or call external APIs as intermediate steps — batch side effects at the end of the pipeline where possible.
Log all LLM inputs and outputs: Store the full prompt and response for every LLM call in your orchestration. This is essential for debugging, cost accounting, and improving prompts. In production, write these to a structured log with the orchestration run ID, task ID, model, token counts, and latency.
Design for partial failure from the start: Assume that any subtask can fail. Decide up front whether each task is critical (failure aborts the run) or optional (failure returns null and downstream tasks adapt). Encode this in your task configuration, not in ad hoc error handling.
Keep context windows lean: Do not pass the full output of every previous task to every subsequent task. Use a context manager that provides only the keys each task needs. Summarize verbose intermediate results before passing them downstream.
Validate DAGs before execution: Check for missing dependency references and cycles before starting execution. A cycle in your task graph will cause a deadlock that is very hard to debug if you do not check for it explicitly.
Implement circuit breakers for external services: If an LLM API starts failing consistently, stop retrying and fail fast. A circuit breaker that opens after 3 consecutive failures and stays open for 60 seconds prevents you from burning through your retry budget during an outage.
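A sketch of that breaker, using the thresholds mentioned above, that wraps any promise-returning call:
function CircuitBreaker(options) {
  this.failureThreshold = (options && options.failureThreshold) || 3;
  this.cooldownMs = (options && options.cooldownMs) || 60000;
  this.consecutiveFailures = 0;
  this.openedAt = null;
}
CircuitBreaker.prototype.call = function (fn) {
  var self = this;
  // While the circuit is open, fail fast instead of hitting the flaky service
  if (self.openedAt && Date.now() - self.openedAt < self.cooldownMs) {
    return Promise.reject(new Error("Circuit open: failing fast"));
  }
  return fn().then(
    function (result) {
      self.consecutiveFailures = 0;
      self.openedAt = null;
      return result;
    },
    function (err) {
      self.consecutiveFailures++;
      if (self.consecutiveFailures >= self.failureThreshold) {
        self.openedAt = Date.now();
      }
      throw err;
    }
  );
};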
Test orchestrations with mock LLM calls: Write deterministic mock implementations of your LLM call function that return canned responses. Run your orchestration logic against these mocks in automated tests. You cannot afford to hit the real API for every test run, and non-deterministic tests are useless.
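A deterministic mock can be a drop-in replacement for callLLM that returns canned responses keyed by task name; this assumes your orchestrator lets you inject the LLM function rather than hard-coding the client:
function createMockLLM(cannedResponses) {
  return function mockCallLLM(taskName, prompt) {
    // Fail loudly if a test exercises a task you did not stub
    if (!(taskName in cannedResponses)) {
      return Promise.reject(new Error("No canned response for task: " + taskName));
    }
    return Promise.resolve(cannedResponses[taskName]);
  };
}
// In a test run:
var mockLLM = createMockLLM({
  search_web: "Canned web findings",
  summarize_web: "- canned key point",
  synthesize: "Canned final report",
});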
Version your orchestration configurations: Treat task graphs, prompt templates, and model selections as configuration that is versioned and tracked. When a research report quality drops, you need to know whether someone changed a prompt, switched a model, or modified the task graph.
References
- OpenAI API Reference: Chat Completions - Official docs for the LLM calls used in all examples
- Node.js Events Documentation - EventEmitter used in the orchestrator for progress monitoring
- Directed Acyclic Graphs (Wikipedia) - Theory behind DAG-based task execution
- Anthropic: Building Effective Agents - Patterns for building reliable agent systems
- LangChain LangGraph - Python framework implementing similar orchestration patterns
- Temporal.io Workflow Engine - Production-grade workflow orchestration for long-running processes