Workflow Automation with AI Agents

Build intelligent workflow automation with AI decision nodes, external service integration, and parallel execution in Node.js.

Overview

Workflow automation has existed for decades, but AI agents fundamentally change what is automatable. Instead of hard-coding every conditional branch and edge case into a rules engine, you can now insert LLM decision nodes that evaluate context, reason about ambiguity, and route work intelligently. This article walks through building a production-grade AI workflow engine in Node.js — from defining workflows in a JSON DSL to executing parallel steps, integrating external services, and tracking costs per execution.

Prerequisites

  • Node.js 18+ installed
  • An OpenAI API key (or any LLM provider — we use OpenAI here but the pattern is provider-agnostic)
  • Familiarity with Express.js and async/await patterns
  • Basic understanding of state machines and event-driven architecture
  • A Slack workspace with a bot token (for the integration examples)
  • PostgreSQL or MongoDB for workflow state persistence

The Evolution from Rule-Based to AI-Driven Workflow Automation

Traditional workflow engines like Camunda, Airflow, and n8n rely on explicit conditional logic. You define every branch, every threshold, every exception. This works until you encounter the real world:

  • A customer support ticket that doesn't fit your taxonomy
  • An invoice approval where the amount is borderline and context matters
  • A content moderation decision that requires nuance beyond keyword matching

Rule-based systems handle 80% of cases well. The remaining 20% get routed to human queues, creating bottlenecks. AI decision nodes address exactly this gap — they evaluate the 20% that rules cannot handle, while the deterministic 80% continues to execute without burning API tokens.

I have shipped systems where adding a single AI decision node eliminated a 200-ticket daily human review queue. The key insight is not replacing rules with AI everywhere, but inserting AI precisely where rules break down.

Designing Intelligent Workflows That Adapt to Context

An intelligent workflow is not just a flowchart with an LLM call bolted on. It is a system that modifies its own execution path based on context accumulated across steps. Each node can enrich the workflow context, and downstream nodes — including AI nodes — consume that enriched context to make better decisions.

The core data model looks like this:

var workflowContext = {
  workflowId: "wf_abc123",
  executionId: "exec_789",
  startedAt: new Date().toISOString(),
  currentStep: "evaluate_ticket",
  status: "running",
  variables: {
    ticketText: "My API returns 500 errors intermittently under load",
    customerTier: "enterprise",
    previousTickets: 12,
    accountValue: 85000
  },
  stepResults: {},
  costAccumulator: 0.0,
  errors: []
};

Every step writes its output into stepResults, and every subsequent step can read from the full context. AI nodes get the complete picture — not just their immediate input, but the entire execution history.

Implementing a Workflow Definition DSL in JSON

Workflows should be data, not code. When workflows are defined in JSON, non-engineers can author them, version control tracks changes cleanly, and you can generate them programmatically. Here is the DSL I use in production:

var ticketTriageWorkflow = {
  id: "ticket-triage-v3",
  name: "Customer Ticket Triage",
  version: 3,
  trigger: { type: "webhook", path: "/webhooks/support-ticket" },
  steps: [
    {
      id: "classify",
      type: "ai-decision",
      config: {
        model: "gpt-4o-mini",
        prompt: "Classify this support ticket into one of: billing, technical, feature-request, urgent-outage. Ticket: {{variables.ticketText}}",
        outputVariable: "ticketCategory",
        outputFormat: "enum",
        allowedValues: ["billing", "technical", "feature-request", "urgent-outage"]
      },
      next: {
        "billing": "route-billing",
        "technical": "check-severity",
        "feature-request": "log-feature",
        "urgent-outage": "page-oncall"
      }
    },
    {
      id: "check-severity",
      type: "ai-decision",
      config: {
        model: "gpt-4o-mini",
        prompt: "Rate the severity of this technical issue from 1-5. Consider the customer tier ({{variables.customerTier}}) and account value (${{variables.accountValue}}). Issue: {{variables.ticketText}}",
        outputVariable: "severity",
        outputFormat: "number",
        min: 1,
        max: 5
      },
      next: {
        "default": "assign-engineer"
      }
    },
    {
      id: "page-oncall",
      type: "service",
      config: {
        service: "slack",
        action: "sendMessage",
        channel: "#oncall-alerts",
        message: "URGENT: {{variables.ticketText}} | Customer: {{variables.customerTier}}"
      },
      next: { "default": "create-incident" }
    },
    {
      id: "assign-engineer",
      type: "service",
      config: {
        service: "ticketSystem",
        action: "assign",
        queue: "{{stepResults.classify.ticketCategory}}",
        priority: "{{stepResults.check-severity.severity}}"
      },
      next: { "default": "notify-customer" }
    },
    {
      id: "notify-customer",
      type: "service",
      config: {
        service: "email",
        action: "send",
        template: "ticket-acknowledged",
        to: "{{variables.customerEmail}}"
      },
      next: { "default": "end" }
    }
  ]
};

The DSL supports four step types: ai-decision (LLM evaluates and returns structured output), service (calls an external integration), transform (manipulates data without external calls), and parallel (executes multiple branches concurrently).

AI Decision Nodes: LLM Evaluates Conditions and Picks Branches

The AI decision node is the heart of the engine. It takes a prompt template, interpolates workflow context, calls the LLM, parses the response, and routes to the appropriate next step.

var OpenAI = require("openai");

var openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

function executeAIDecision(step, context) {
  var prompt = interpolateTemplate(step.config.prompt, context);

  var systemPrompt = "You are a workflow decision engine. Respond with ONLY the requested value. No explanation, no formatting, no markdown. Just the raw value.";

  if (step.config.outputFormat === "enum") {
    systemPrompt += " Allowed values: " + step.config.allowedValues.join(", ");
  }

  return openai.chat.completions.create({
    model: step.config.model || "gpt-4o-mini",
    messages: [
      { role: "system", content: systemPrompt },
      { role: "user", content: prompt }
    ],
    max_tokens: 100,
    temperature: 0.1
  }).then(function(response) {
    var rawOutput = response.choices[0].message.content.trim();
    var parsed = parseOutput(rawOutput, step.config);
    var tokenCost = calculateCost(response.usage, step.config.model);

    var result = {
      stepId: step.id,
      output: parsed,
      rawOutput: rawOutput,
      tokenUsage: response.usage,
      cost: tokenCost,
      nextStep: resolveNextStep(step, parsed)
    };

    // Also expose the value under the configured outputVariable name so
    // templates like {{stepResults.classify.ticketCategory}} can resolve it
    if (step.config.outputVariable) {
      result[step.config.outputVariable] = parsed;
    }

    return result;
  });
}

function parseOutput(raw, config) {
  if (config.outputFormat === "enum") {
    var normalized = raw.toLowerCase().replace(/[^a-z0-9-]/g, "");
    if (config.allowedValues.indexOf(normalized) === -1) {
      throw new Error("AI returned invalid enum value: " + raw + ". Allowed: " + config.allowedValues.join(", "));
    }
    return normalized;
  }
  if (config.outputFormat === "number") {
    var num = parseFloat(raw);
    if (isNaN(num) || num < config.min || num > config.max) {
      throw new Error("AI returned invalid number: " + raw + ". Expected " + config.min + "-" + config.max);
    }
    return num;
  }
  if (config.outputFormat === "json") {
    return JSON.parse(raw);
  }
  return raw;
}

function resolveNextStep(step, output) {
  var outputStr = String(output);
  if (step.next[outputStr]) {
    return step.next[outputStr];
  }
  if (step.next["default"]) {
    return step.next["default"];
  }
  throw new Error("No matching next step for output: " + outputStr + " in step " + step.id);
}

function interpolateTemplate(template, context) {
  return template.replace(/\{\{([^}]+)\}\}/g, function(match, path) {
    var parts = path.trim().split(".");
    var value = context;
    for (var i = 0; i < parts.length; i++) {
      value = value[parts[i]];
      if (value === undefined) return match;
    }
    return String(value);
  });
}

I set temperature: 0.1 for decision nodes because you want consistency, not creativity. If the same ticket comes in twice, it should be classified the same way both times.

Dynamic Workflow Generation from Natural Language

One of the most powerful capabilities is having the LLM generate workflow definitions from natural language descriptions. A product manager describes what they need, and the engine produces an executable workflow:

function generateWorkflow(description) {
  var schemaExample = JSON.stringify({
    id: "example",
    name: "Example Workflow",
    version: 1,
    trigger: { type: "manual" },
    steps: [
      {
        id: "step1",
        type: "ai-decision",
        config: { model: "gpt-4o-mini", prompt: "...", outputVariable: "result", outputFormat: "enum", allowedValues: [] },
        next: { "default": "step2" }
      },
      {
        id: "step2",
        type: "service",
        config: { service: "slack", action: "sendMessage", channel: "#general", message: "..." },
        next: { "default": "end" }
      }
    ]
  }, null, 2);

  return openai.chat.completions.create({
    model: "gpt-4o",
    messages: [
      {
        role: "system",
        content: "You generate workflow definitions in JSON. Available step types: ai-decision, service, transform, parallel. Available services: slack, email, database, http. Output ONLY valid JSON matching this schema:\n" + schemaExample
      },
      {
        role: "user",
        content: description
      }
    ],
    temperature: 0.2,
    response_format: { type: "json_object" }
  }).then(function(response) {
    var workflow = JSON.parse(response.choices[0].message.content);
    var validation = validateWorkflow(workflow);
    if (!validation.valid) {
      throw new Error("Generated workflow is invalid: " + validation.errors.join(", "));
    }
    return workflow;
  });
}

In practice, I always validate generated workflows before execution and present them for human review. Autonomous workflow generation is powerful but not something you deploy without a review step.
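
The validateWorkflow helper referenced above is not defined elsewhere in this article. Here is a minimal sketch of the structural checks it should perform (unique step IDs, known step types, resolvable next targets); adapt the rules to your own DSL:

function validateWorkflow(workflow) {
  var errors = [];
  var stepIds = {};
  var knownTypes = ["ai-decision", "service", "transform", "parallel"];

  if (!workflow.id) errors.push("workflow.id is required");
  if (!Array.isArray(workflow.steps) || workflow.steps.length === 0) {
    errors.push("workflow.steps must be a non-empty array");
  }

  (workflow.steps || []).forEach(function(step) {
    if (stepIds[step.id]) errors.push("Duplicate step id: " + step.id);
    stepIds[step.id] = true;
    if (knownTypes.indexOf(step.type) === -1) {
      errors.push("Unknown step type '" + step.type + "' in step " + step.id);
    }
  });

  // Every routing target must be a defined step or the terminal "end"
  (workflow.steps || []).forEach(function(step) {
    Object.keys(step.next || {}).forEach(function(key) {
      var target = step.next[key];
      if (target !== "end" && !stepIds[target]) {
        errors.push("Step " + step.id + " routes to unknown step: " + target);
      }
    });
  });

  return { valid: errors.length === 0, errors: errors };
}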

Integrating External Services

The service layer abstracts external integrations behind a uniform interface. Each service adapter implements execute(action, params, context):

var axios = require("axios");
var nodemailer = require("nodemailer");

var serviceRegistry = {};

serviceRegistry.slack = {
  execute: function(action, params, context) {
    if (action === "sendMessage") {
      return axios.post("https://slack.com/api/chat.postMessage", {
        channel: interpolateTemplate(params.channel, context),
        text: interpolateTemplate(params.message, context)
      }, {
        headers: { "Authorization": "Bearer " + process.env.SLACK_BOT_TOKEN }
      }).then(function(res) {
        if (!res.data.ok) throw new Error("Slack error: " + res.data.error);
        return { messageTs: res.data.ts, channel: res.data.channel };
      });
    }
    throw new Error("Unknown Slack action: " + action);
  }
};

serviceRegistry.email = {
  execute: function(action, params, context) {
    var transporter = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      port: parseInt(process.env.SMTP_PORT),
      auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS }
    });

    if (action === "send") {
      return transporter.sendMail({
        from: process.env.EMAIL_FROM,
        to: interpolateTemplate(params.to, context),
        subject: interpolateTemplate(params.subject || "Workflow Notification", context),
        html: interpolateTemplate(params.body || params.template, context)
      });
    }
    throw new Error("Unknown email action: " + action);
  }
};

serviceRegistry.http = {
  execute: function(action, params, context) {
    var url = interpolateTemplate(params.url, context);
    var method = (params.method || "GET").toLowerCase();
    var headers = {};

    if (params.headers) {
      Object.keys(params.headers).forEach(function(key) {
        headers[key] = interpolateTemplate(params.headers[key], context);
      });
    }

    return axios({
      method: method,
      url: url,
      headers: headers,
      data: params.body ? JSON.parse(interpolateTemplate(JSON.stringify(params.body), context)) : undefined,
      timeout: params.timeout || 30000
    }).then(function(res) {
      return { status: res.status, data: res.data, headers: res.headers };
    });
  }
};

serviceRegistry.database = {
  execute: function(action, params, context) {
    var pool = require("./db");
    if (action === "query") {
      var query = interpolateTemplate(params.query, context);
      var values = (params.values || []).map(function(v) {
        return interpolateTemplate(String(v), context);
      });
      return pool.query(query, values).then(function(result) {
        return { rows: result.rows, rowCount: result.rowCount };
      });
    }
    throw new Error("Unknown database action: " + action);
  }
};

This registry pattern means adding a new service integration is just adding a new object with an execute method. The workflow engine never needs to know the specifics.
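
As a sketch of that extensibility, here is a hypothetical SMS adapter. The gateway URL and payload shape are placeholders, not a real provider API:

serviceRegistry.sms = {
  execute: function(action, params, context) {
    if (action === "send") {
      // SMS_GATEWAY_URL is a placeholder for whatever provider you use
      return axios.post(process.env.SMS_GATEWAY_URL, {
        to: interpolateTemplate(params.to, context),
        body: interpolateTemplate(params.message, context)
      }).then(function(res) {
        return { delivered: true, providerResponse: res.data };
      });
    }
    return Promise.reject(new Error("Unknown SMS action: " + action));
  }
};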

Implementing Approval Workflows with AI Pre-Screening

A common pattern: AI pre-screens requests and only escalates ambiguous cases to humans. This dramatically reduces the human review burden.

var approvalWorkflow = {
  id: "expense-approval-v2",
  name: "Expense Approval with AI Pre-Screen",
  version: 2,
  steps: [
    {
      id: "ai-prescreen",
      type: "ai-decision",
      config: {
        model: "gpt-4o-mini",
        prompt: "Review this expense report. Amount: ${{variables.amount}}. Category: {{variables.category}}. Description: {{variables.description}}. Employee level: {{variables.employeeLevel}}. Company policy: expenses under $500 auto-approve, $500-$5000 need manager approval, over $5000 need VP approval. Unusual categories (entertainment, travel first class) always need human review. Respond with: auto-approve, needs-review, or reject. Include a one-line reason.",
        outputVariable: "screeningResult",
        outputFormat: "json"
      },
      next: {
        "default": "route-decision"
      }
    },
    {
      id: "route-decision",
      type: "transform",
      config: {
        expression: "stepResults['ai-prescreen'].output.decision"
      },
      next: {
        "auto-approve": "process-payment",
        "needs-review": "create-approval-request",
        "reject": "notify-rejection"
      }
    },
    {
      id: "create-approval-request",
      type: "service",
      config: {
        service: "slack",
        action: "sendMessage",
        channel: "#approvals",
        message: "Expense approval needed: ${{variables.amount}} by {{variables.employeeName}}. AI assessment: {{stepResults.ai-prescreen.output.reason}}. React with :white_check_mark: to approve or :x: to reject."
      },
      next: { "default": "wait-for-approval" }
    }
  ]
};

The AI pre-screen eliminates 60-70% of the review queue in my experience. Straightforward expenses get approved instantly, obvious policy violations get rejected with an explanation, and only edge cases reach a human reviewer — who now has the AI's reasoning to speed up their decision.

Event-Driven Workflows: Triggers from Webhooks, Schedules, and Database Changes

Workflows need to start from somewhere. The trigger system supports multiple event sources:

var cron = require("node-cron");
var EventEmitter = require("events");

var triggerBus = new EventEmitter();

function registerTriggers(workflow, engine) {
  var trigger = workflow.trigger;

  if (trigger.type === "webhook") {
    engine.app.post(trigger.path, function(req, res) {
      var executionId = engine.startExecution(workflow, req.body);
      res.json({ executionId: executionId, status: "started" });
    });
  }

  if (trigger.type === "schedule") {
    cron.schedule(trigger.cron, function() {
      console.log("[Trigger] Scheduled execution of " + workflow.id);
      engine.startExecution(workflow, { triggeredBy: "schedule", at: new Date().toISOString() });
    });
  }

  if (trigger.type === "event") {
    triggerBus.on(trigger.eventName, function(data) {
      console.log("[Trigger] Event " + trigger.eventName + " fired for " + workflow.id);
      engine.startExecution(workflow, data);
    });
  }

  if (trigger.type === "database-change") {
    // Poll-based change detection (for databases without native CDC)
    var lastCheck = new Date();
    setInterval(function() {
      var pool = require("./db");
      pool.query(trigger.query, [lastCheck]).then(function(result) {
        if (result.rows.length > 0) {
          result.rows.forEach(function(row) {
            engine.startExecution(workflow, { changeRecord: row });
          });
        }
        lastCheck = new Date();
      });
    }, trigger.pollIntervalMs || 30000);
  }
}

I prefer webhooks for most integrations and reserve polling for databases that don't support change data capture. The event bus pattern is useful for chaining workflows — one workflow's completion can trigger another.
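
A sketch of that chaining: emit a completion event onto the trigger bus when an execution finishes, and any workflow whose trigger listens for that event starts automatically. The event name convention here is an assumption, not part of the engine above.

// Called from the engine when an execution finishes (e.g. alongside the
// status update in startExecution's completion handler)
function onExecutionCompleted(context) {
  triggerBus.emit("workflow.completed:" + context.workflowId, {
    executionId: context.executionId,
    stepResults: context.stepResults,
    totalCost: context.costAccumulator
  });
}

// A downstream workflow then declares:
// trigger: { type: "event", eventName: "workflow.completed:ticket-triage-v3" }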

Error Handling and Retry in Automated Workflows

Workflows will fail. Networks drop, APIs throttle, LLMs hallucinate. The retry system needs to be step-level, not workflow-level:

function executeStepWithRetry(step, context, attempt) {
  attempt = attempt || 1;
  var maxRetries = step.retryPolicy ? step.retryPolicy.maxRetries : 3;
  var backoffMs = step.retryPolicy ? step.retryPolicy.backoffMs : 1000;

  return executeStep(step, context).catch(function(err) {
    console.error("[Step " + step.id + "] Attempt " + attempt + " failed: " + err.message);

    if (attempt >= maxRetries) {
      if (step.onFailure === "skip") {
        console.warn("[Step " + step.id + "] Max retries reached. Skipping step.");
        return { stepId: step.id, output: null, skipped: true, error: err.message };
      }
      if (step.onFailure === "fallback" && step.fallbackStep) {
        console.warn("[Step " + step.id + "] Max retries reached. Executing fallback: " + step.fallbackStep);
        return { stepId: step.id, output: null, fallback: step.fallbackStep, error: err.message };
      }
      throw new Error("Step " + step.id + " failed after " + maxRetries + " attempts: " + err.message);
    }

    var delay = backoffMs * Math.pow(2, attempt - 1);
    console.log("[Step " + step.id + "] Retrying in " + delay + "ms...");

    return new Promise(function(resolve) {
      setTimeout(resolve, delay);
    }).then(function() {
      return executeStepWithRetry(step, context, attempt + 1);
    });
  });
}

For AI decision nodes specifically, I add a fallback model configuration. If GPT-4o times out, fall back to GPT-4o-mini. The decision quality might be slightly lower, but the workflow continues.
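
A minimal sketch of that fallback, assuming a fallbackModel field on the step config (not part of the DSL shown earlier):

function executeAIDecisionWithFallback(step, context) {
  return executeAIDecision(step, context).catch(function(err) {
    if (!step.config.fallbackModel) throw err;
    console.warn("[Step " + step.id + "] Primary model failed (" + err.message + "). Falling back to " + step.config.fallbackModel);
    // Clone the step with the fallback model swapped in, then retry once
    var fallbackStep = JSON.parse(JSON.stringify(step));
    fallbackStep.config.model = step.config.fallbackModel;
    return executeAIDecision(fallbackStep, context);
  });
}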

Parallel Execution of Independent Workflow Steps

Not every step depends on the previous one. When steps are independent, run them in parallel:

function executeParallelSteps(parallelStep, context) {
  var branches = parallelStep.config.branches;
  var promises = branches.map(function(branch) {
    return executeStepSequence(branch.steps, context).then(function(results) {
      return { branchId: branch.id, results: results, status: "completed" };
    }).catch(function(err) {
      if (parallelStep.config.failureMode === "continue") {
        return { branchId: branch.id, error: err.message, status: "failed" };
      }
      throw err;
    });
  });

  return Promise.all(promises).then(function(branchResults) {
    var merged = {};
    branchResults.forEach(function(br) {
      merged[br.branchId] = br;
    });
    return { stepId: parallelStep.id, output: merged, type: "parallel" };
  });
}

function executeStepSequence(steps, context) {
  var results = [];
  var chain = Promise.resolve();

  steps.forEach(function(step) {
    chain = chain.then(function() {
      return executeStepWithRetry(step, context).then(function(result) {
        context.stepResults[step.id] = result;
        results.push(result);
      });
    });
  });

  return chain.then(function() {
    return results;
  });
}

A typical parallel step definition in the DSL:

{
  id: "notify-all",
  type: "parallel",
  config: {
    failureMode: "continue",
    branches: [
      {
        id: "slack-notify",
        steps: [{ id: "send-slack", type: "service", config: { service: "slack", action: "sendMessage", channel: "#alerts", message: "..." }, next: { "default": "end" } }]
      },
      {
        id: "email-notify",
        steps: [{ id: "send-email", type: "service", config: { service: "email", action: "send", to: "...", subject: "..." }, next: { "default": "end" } }]
      },
      {
        id: "log-to-db",
        steps: [{ id: "db-insert", type: "service", config: { service: "database", action: "query", query: "INSERT INTO audit_log..." }, next: { "default": "end" } }]
      }
    ]
  },
  next: { "default": "end" }
}

The failureMode: "continue" setting is critical for notification steps. If Slack is down, you still want the email sent and the database logged.

Workflow Monitoring and Dashboards

Every workflow execution should be observable. Here is the monitoring data model and API:

var executionStore = {};

function trackExecution(executionId, data) {
  if (!executionStore[executionId]) {
    executionStore[executionId] = {
      executionId: executionId,
      workflowId: data.workflowId,
      status: "running",
      startedAt: new Date().toISOString(),
      steps: [],
      totalCost: 0
    };
  }

  var execution = executionStore[executionId];

  if (data.stepResult) {
    execution.steps.push({
      stepId: data.stepResult.stepId,
      status: data.stepResult.skipped ? "skipped" : "completed",
      startedAt: data.stepResult.startedAt,
      completedAt: new Date().toISOString(),
      duration: Date.now() - new Date(data.stepResult.startedAt).getTime(),
      cost: data.stepResult.cost || 0,
      output: data.stepResult.output
    });
    execution.totalCost += (data.stepResult.cost || 0);
  }

  if (data.status) {
    execution.status = data.status;
    if (data.status === "completed" || data.status === "failed") {
      execution.completedAt = new Date().toISOString();
      execution.totalDuration = Date.now() - new Date(execution.startedAt).getTime();
    }
  }

  return execution;
}

function getWorkflowMetrics(workflowId) {
  var executions = Object.values(executionStore).filter(function(e) {
    return e.workflowId === workflowId;
  });

  var completed = executions.filter(function(e) { return e.status === "completed"; });
  var failed = executions.filter(function(e) { return e.status === "failed"; });

  var totalCost = executions.reduce(function(sum, e) { return sum + e.totalCost; }, 0);
  var avgDuration = completed.length > 0 ? completed.reduce(function(sum, e) { return sum + (e.totalDuration || 0); }, 0) / completed.length : 0;

  return {
    workflowId: workflowId,
    totalExecutions: executions.length,
    completed: completed.length,
    failed: failed.length,
    successRate: executions.length > 0 ? (completed.length / executions.length * 100).toFixed(1) + "%" : "N/A",
    totalCost: "$" + totalCost.toFixed(4),
    avgCostPerExecution: "$" + (executions.length > 0 ? (totalCost / executions.length).toFixed(4) : "0"),
    avgDurationMs: Math.round(avgDuration),
    last24h: executions.filter(function(e) {
      return new Date(e.startedAt) > new Date(Date.now() - 86400000);
    }).length
  };
}

Workflow Templates for Common Business Patterns

Templates save you from rebuilding common patterns. Here are three I deploy repeatedly:

var workflowTemplates = {
  "content-moderation": {
    name: "Content Moderation Pipeline",
    steps: [
      { id: "ai-screen", type: "ai-decision", config: { prompt: "Evaluate this content for policy violations: {{variables.content}}", outputVariable: "modResult", outputFormat: "json" }, next: { "default": "route" } },
      { id: "route", type: "transform", config: { expression: "stepResults['ai-screen'].output.action" }, next: { "approve": "publish", "flag": "human-review", "reject": "notify-author" } }
    ]
  },
  "data-enrichment": {
    name: "Lead Enrichment Pipeline",
    steps: [
      { id: "lookup-company", type: "service", config: { service: "http", url: "https://api.clearbit.com/v2/companies/find?domain={{variables.domain}}" }, next: { "default": "ai-qualify" } },
      { id: "ai-qualify", type: "ai-decision", config: { prompt: "Score this lead 1-10: {{stepResults.lookup-company.output}}", outputVariable: "leadScore", outputFormat: "number", min: 1, max: 10 }, next: { "default": "store" } }
    ]
  },
  "incident-response": {
    name: "Automated Incident Response",
    steps: [
      { id: "classify-severity", type: "ai-decision", config: { prompt: "Classify incident severity (P1-P4): {{variables.alertMessage}}", outputVariable: "severity", outputFormat: "enum", allowedValues: ["P1", "P2", "P3", "P4"] }, next: { "P1": "page-team", "P2": "page-team", "P3": "slack-notify", "P4": "log-only" } }
    ]
  }
};

function createFromTemplate(templateId, overrides) {
  var template = workflowTemplates[templateId];
  if (!template) throw new Error("Unknown template: " + templateId);

  var workflow = JSON.parse(JSON.stringify(template));
  workflow.id = templateId + "-" + Date.now();
  workflow.version = 1;

  if (overrides) {
    Object.keys(overrides).forEach(function(key) {
      workflow[key] = overrides[key];
    });
  }

  return workflow;
}
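
Instantiating a template is then a one-liner; the override values here are illustrative:

var incidentWorkflow = createFromTemplate("incident-response", {
  name: "Payments Incident Response",
  trigger: { type: "event", eventName: "alert.payments" }
});
// incidentWorkflow.id is generated, e.g. "incident-response-1712345678901"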

Versioning and Migrating Workflows

Running workflows must complete on the version they started with, even if a newer version is deployed. The versioning strategy:

var workflowVersions = {};

function deployWorkflow(workflow) {
  var key = workflow.id;
  if (!workflowVersions[key]) {
    workflowVersions[key] = { versions: {}, activeVersion: null };
  }

  workflowVersions[key].versions[workflow.version] = workflow;
  workflowVersions[key].activeVersion = workflow.version;

  console.log("[Deploy] Workflow " + key + " v" + workflow.version + " is now active");
  return workflow;
}

function getWorkflow(workflowId, version) {
  var entry = workflowVersions[workflowId];
  if (!entry) throw new Error("Workflow not found: " + workflowId);

  var v = version || entry.activeVersion;
  var workflow = entry.versions[v];
  if (!workflow) throw new Error("Version " + v + " not found for workflow " + workflowId);

  return workflow;
}

function migrateWorkflow(workflowId, fromVersion, toVersion, migrationFn) {
  var oldWorkflow = getWorkflow(workflowId, fromVersion);
  var newWorkflow = JSON.parse(JSON.stringify(oldWorkflow));
  newWorkflow.version = toVersion;

  if (migrationFn) {
    newWorkflow = migrationFn(newWorkflow);
  }

  return deployWorkflow(newWorkflow);
}
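
A typical migration call, assuming v3 of the triage workflow from earlier was already deployed: bump it to v4 and swap in a stronger model for one step.

migrateWorkflow("ticket-triage-v3", 3, 4, function(workflow) {
  workflow.steps.forEach(function(step) {
    // v4 upgrades the severity check to a stronger model
    if (step.id === "check-severity") step.config.model = "gpt-4o";
  });
  return workflow;
});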

Cost Tracking Per Workflow Execution

LLM calls have variable costs. Track them per step and per execution so you can identify expensive workflows and optimize:

var MODEL_COSTS = {
  "gpt-4o": { input: 0.0025, output: 0.01 },        // per 1K tokens
  "gpt-4o-mini": { input: 0.00015, output: 0.0006 },
  "gpt-4-turbo": { input: 0.01, output: 0.03 },
  "claude-3-haiku": { input: 0.00025, output: 0.00125 }
};

function calculateCost(usage, model) {
  var rates = MODEL_COSTS[model] || MODEL_COSTS["gpt-4o-mini"];
  var inputCost = (usage.prompt_tokens / 1000) * rates.input;
  var outputCost = (usage.completion_tokens / 1000) * rates.output;
  return inputCost + outputCost;
}

function getCostReport(workflowId, days) {
  days = days || 30;
  var cutoff = new Date(Date.now() - days * 86400000);

  var executions = Object.values(executionStore).filter(function(e) {
    return e.workflowId === workflowId && new Date(e.startedAt) > cutoff;
  });

  var stepCosts = {};
  executions.forEach(function(exec) {
    exec.steps.forEach(function(step) {
      if (!stepCosts[step.stepId]) {
        stepCosts[step.stepId] = { total: 0, count: 0 };
      }
      stepCosts[step.stepId].total += step.cost;
      stepCosts[step.stepId].count += 1;
    });
  });

  var report = {
    workflowId: workflowId,
    period: days + " days",
    totalExecutions: executions.length,
    totalCost: Object.values(stepCosts).reduce(function(sum, s) { return sum + s.total; }, 0),
    costByStep: {}
  };

  Object.keys(stepCosts).forEach(function(stepId) {
    report.costByStep[stepId] = {
      totalCost: "$" + stepCosts[stepId].total.toFixed(4),
      avgCost: "$" + (stepCosts[stepId].total / stepCosts[stepId].count).toFixed(4),
      invocations: stepCosts[stepId].count
    };
  });

  return report;
}

Complete Working Example: AI Workflow Engine with Express.js

Here is the full engine assembled into a working Express.js application:

var express = require("express");
var bodyParser = require("body-parser");
var crypto = require("crypto");
var OpenAI = require("openai");

var app = express();
app.use(bodyParser.json());

var openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

// ============ Core Engine ============

var workflows = {};
var executions = {};

function WorkflowEngine(expressApp) {
  this.app = expressApp;
}

WorkflowEngine.prototype.deploy = function(workflow) {
  workflows[workflow.id] = workflow;
  registerTriggers(workflow, this);
  console.log("[Engine] Deployed workflow: " + workflow.id);
  return workflow;
};

WorkflowEngine.prototype.startExecution = function(workflow, variables) {
  var executionId = "exec_" + crypto.randomBytes(8).toString("hex");

  var context = {
    workflowId: workflow.id,
    executionId: executionId,
    startedAt: new Date().toISOString(),
    status: "running",
    variables: variables || {},
    stepResults: {},
    costAccumulator: 0
  };

  executions[executionId] = context;
  console.log("[Engine] Started execution " + executionId + " of " + workflow.id);

  var self = this;
  self.runExecution(workflow, context).then(function() {
    context.status = "completed";
    context.completedAt = new Date().toISOString();
    // Record final status in the monitoring store (trackExecution is defined
    // in the monitoring section above)
    trackExecution(executionId, { workflowId: workflow.id, status: "completed" });
    console.log("[Engine] Execution " + executionId + " completed. Cost: $" + context.costAccumulator.toFixed(4));
  }).catch(function(err) {
    context.status = "failed";
    context.error = err.message;
    context.completedAt = new Date().toISOString();
    trackExecution(executionId, { workflowId: workflow.id, status: "failed" });
    console.error("[Engine] Execution " + executionId + " failed: " + err.message);
  });

  return executionId;
};

WorkflowEngine.prototype.runExecution = function(workflow, context) {
  var stepMap = {};
  workflow.steps.forEach(function(step) {
    stepMap[step.id] = step;
  });

  var currentStepId = workflow.steps[0].id;

  function executeNext() {
    if (currentStepId === "end" || !stepMap[currentStepId]) {
      return Promise.resolve();
    }

    var step = stepMap[currentStepId];
    context.currentStep = step.id;

    return executeStepWithRetry(step, context, 1).then(function(result) {
      context.stepResults[step.id] = result;
      context.costAccumulator += (result.cost || 0);
      // Feed per-step results to the monitoring store so the /metrics and
      // /costs routes have data to aggregate
      trackExecution(context.executionId, { workflowId: context.workflowId, stepResult: result });

      if (result.fallback) {
        currentStepId = result.fallback;
      } else if (result.nextStep) {
        currentStepId = result.nextStep;
      } else {
        currentStepId = resolveNextStep(step, result.output);
      }

      return executeNext();
    });
  }

  return executeNext();
};

function executeStep(step, context) {
  var startedAt = new Date().toISOString();

  if (step.type === "ai-decision") {
    return executeAIDecision(step, context).then(function(result) {
      result.startedAt = startedAt;
      return result;
    });
  }

  if (step.type === "service") {
    var service = serviceRegistry[step.config.service];
    // Reject (rather than throw) so the retry wrapper's .catch handles it
    if (!service) return Promise.reject(new Error("Unknown service: " + step.config.service));

    return service.execute(step.config.action, step.config, context).then(function(output) {
      return {
        stepId: step.id,
        output: output,
        cost: 0,
        startedAt: startedAt,
        nextStep: step.next ? step.next["default"] : "end"
      };
    });
  }

  if (step.type === "transform") {
    var output = evaluateExpression(step.config.expression, context);
    return Promise.resolve({
      stepId: step.id,
      output: output,
      cost: 0,
      startedAt: startedAt,
      nextStep: step.next ? (step.next[output] || step.next["default"]) : "end"
    });
  }

  if (step.type === "parallel") {
    return executeParallelSteps(step, context).then(function(result) {
      result.startedAt = startedAt;
      result.nextStep = step.next ? step.next["default"] : "end";
      return result;
    });
  }

  return Promise.reject(new Error("Unknown step type: " + step.type));
}

function evaluateExpression(expression, context) {
  var parts = expression.split(".");
  var value = context;
  for (var i = 0; i < parts.length; i++) {
    var part = parts[i];
    // Handle bracket notation like stepResults['ai-prescreen']
    var bracketMatch = part.match(/^(\w+)\['([^']+)'\]$/);
    if (bracketMatch) {
      value = value[bracketMatch[1]];
      if (value === undefined) return undefined;
      value = value[bracketMatch[2]];
    } else {
      value = value[part];
    }
    if (value === undefined) return undefined;
  }
  return value;
}

// ============ API Routes ============

var engine = new WorkflowEngine(app);

app.post("/api/workflows", function(req, res) {
  try {
    var workflow = engine.deploy(req.body);
    res.json({ success: true, workflowId: workflow.id });
  } catch (err) {
    res.status(400).json({ error: err.message });
  }
});

app.post("/api/workflows/:id/execute", function(req, res) {
  var workflow = workflows[req.params.id];
  if (!workflow) return res.status(404).json({ error: "Workflow not found" });

  var executionId = engine.startExecution(workflow, req.body);
  res.json({ executionId: executionId, status: "started" });
});

app.get("/api/executions/:id", function(req, res) {
  var execution = executions[req.params.id];
  if (!execution) return res.status(404).json({ error: "Execution not found" });
  res.json(execution);
});

app.get("/api/workflows/:id/metrics", function(req, res) {
  var metrics = getWorkflowMetrics(req.params.id);
  res.json(metrics);
});

app.get("/api/workflows/:id/costs", function(req, res) {
  var days = parseInt(req.query.days) || 30;
  var report = getCostReport(req.params.id, days);
  res.json(report);
});

app.post("/api/workflows/generate", function(req, res) {
  generateWorkflow(req.body.description).then(function(workflow) {
    res.json({ workflow: workflow });
  }).catch(function(err) {
    res.status(500).json({ error: err.message });
  });
});

// ============ Dashboard Route ============

app.get("/dashboard", function(req, res) {
  var workflowList = Object.keys(workflows).map(function(id) {
    var metrics = getWorkflowMetrics(id);
    return {
      id: id,
      name: workflows[id].name,
      metrics: metrics
    };
  });

  res.json({
    totalWorkflows: workflowList.length,
    totalExecutions: Object.keys(executions).length,
    workflows: workflowList
  });
});

// ============ Start Server ============

var PORT = process.env.PORT || 3000;
app.listen(PORT, function() {
  console.log("[Workflow Engine] Running on port " + PORT);
});

module.exports = app;

Start the server and test it:

npm install express body-parser openai axios nodemailer node-cron
node workflow-engine.js

Deploy and execute a workflow:

# Deploy a workflow
curl -X POST http://localhost:3000/api/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "id": "demo-triage",
    "name": "Demo Ticket Triage",
    "version": 1,
    "steps": [
      {
        "id": "classify",
        "type": "ai-decision",
        "config": {
          "model": "gpt-4o-mini",
          "prompt": "Classify this ticket: {{variables.ticketText}}. Respond with: billing, technical, or general.",
          "outputVariable": "category",
          "outputFormat": "enum",
          "allowedValues": ["billing", "technical", "general"]
        },
        "next": { "default": "end" }
      }
    ]
  }'

# Execute the workflow
curl -X POST http://localhost:3000/api/workflows/demo-triage/execute \
  -H "Content-Type: application/json" \
  -d '{"ticketText": "I cannot access my billing dashboard and my card was charged twice"}'

# Check execution status
curl http://localhost:3000/api/executions/exec_abc123

# View metrics
curl http://localhost:3000/api/workflows/demo-triage/metrics

Expected output from metrics:

{
  "workflowId": "demo-triage",
  "totalExecutions": 47,
  "completed": 45,
  "failed": 2,
  "successRate": "95.7%",
  "totalCost": "$0.0312",
  "avgCostPerExecution": "$0.0007",
  "avgDurationMs": 1847,
  "last24h": 12
}

Common Issues and Troubleshooting

1. LLM returns unexpected format, crashing the parser

Error: AI returned invalid enum value: "I would classify this as billing". Allowed: billing, technical, general

This happens when the system prompt is not strict enough. Fix: add explicit formatting instructions and consider using response_format: { type: "json_object" } for structured responses. Also add a normalization layer that strips common LLM preambles before parsing.
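
A sketch of that normalization layer: if any allowed value appears verbatim in the response, extract it; otherwise strip common preambles and surrounding punctuation before handing the string to parseOutput.

function stripPreamble(raw, allowedValues) {
  var cleaned = raw.trim();

  // If an allowed value appears anywhere in the response, use it directly
  if (allowedValues) {
    for (var i = 0; i < allowedValues.length; i++) {
      if (cleaned.toLowerCase().indexOf(allowedValues[i].toLowerCase()) !== -1) {
        return allowedValues[i];
      }
    }
  }

  // Otherwise drop common preambles and surrounding quotes/punctuation
  return cleaned
    .replace(/^(sure[,!]?|the answer is:?|i would classify this as)\s*/i, "")
    .replace(/^["'`]+|["'`.,]+$/g, "");
}

// In executeAIDecision, before parsing:
// var cleaned = stripPreamble(rawOutput, step.config.allowedValues);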

2. Workflow execution hangs on parallel steps

[Step notify-all] Timeout after 30000ms — no response from branch email-notify

A single slow branch blocks the entire parallel step. Fix: add per-branch timeouts using Promise.race with a timeout promise. Set failureMode: "continue" so other branches complete even if one times out.

function withTimeout(promise, ms, label) {
  return Promise.race([
    promise,
    new Promise(function(_, reject) {
      setTimeout(function() {
        reject(new Error("Timeout after " + ms + "ms on " + label));
      }, ms);
    })
  ]);
}
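
Wiring it into executeParallelSteps means wrapping each branch promise. The branchTimeoutMs field here is an assumed addition to the parallel step config, not part of the DSL above:

// Inside executeParallelSteps, replacing the plain branches.map(...):
var promises = branches.map(function(branch) {
  return withTimeout(
    executeStepSequence(branch.steps, context),
    parallelStep.config.branchTimeoutMs || 30000,
    "branch " + branch.id
  ).catch(function(err) {
    if (parallelStep.config.failureMode === "continue") {
      return { branchId: branch.id, error: err.message, status: "failed" };
    }
    throw err;
  });
});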

3. Template interpolation fails silently on nested paths

[Step assign-engineer] Slack message contains literal "{{stepResults.classify.ticketCategory}}" instead of resolved value

This occurs when a step result has not been written to context yet, usually because of incorrect step ordering. Fix: validate that all referenced template variables exist in context at execution time. Add a validateStepDependencies function that runs at deploy time to catch these errors early.
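
A minimal sketch of that deploy-time check, assuming steps are listed in rough execution order: it walks every template string in each step's config and flags references to step results that cannot exist yet.

function validateStepDependencies(workflow) {
  var seen = {};
  var errors = [];

  workflow.steps.forEach(function(step) {
    var configStr = JSON.stringify(step.config || {});
    var re = /\{\{\s*stepResults\.([\w-]+)/g;
    var match;
    while ((match = re.exec(configStr)) !== null) {
      if (!seen[match[1]]) {
        errors.push("Step " + step.id + " references stepResults." + match[1] + " before it has run");
      }
    }
    seen[step.id] = true;
  });

  return { valid: errors.length === 0, errors: errors };
}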

4. Cost tracking shows $0.00 for all AI steps

{"stepId": "classify", "cost": 0, "tokenUsage": undefined}

This happens when the OpenAI response shape changes or when using a provider that structures token usage differently. Fix: add null checks on response.usage and log a warning when usage data is missing. Some providers (especially proxied ones) strip usage metadata.
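
A defensive wrapper over the calculateCost function from the cost tracking section:

function calculateCostSafe(usage, model) {
  if (!usage || typeof usage.prompt_tokens !== "number") {
    console.warn("[Cost] Missing token usage for model " + model + ". Recording $0.");
    return 0;
  }
  return calculateCost(usage, model);
}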

5. Webhook trigger receives duplicate events

[Engine] Started execution exec_a1b2c3 of ticket-triage (duplicate)
[Engine] Started execution exec_d4e5f6 of ticket-triage (duplicate)

Many webhook providers send retries when they don't get a fast 200 response. Fix: respond with 200 immediately before starting execution (which the async pattern already handles), and implement idempotency checks using a hash of the incoming payload stored for a short TTL window.
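
A sketch of that idempotency check with an in-memory store; swap the map for Redis or similar in production:

var crypto = require("crypto");
var seenPayloads = {}; // payloadHash -> first-seen timestamp

function isDuplicate(payload, ttlMs) {
  ttlMs = ttlMs || 5 * 60 * 1000;
  var hash = crypto.createHash("sha256").update(JSON.stringify(payload)).digest("hex");
  var now = Date.now();

  // Evict entries older than the TTL window
  Object.keys(seenPayloads).forEach(function(h) {
    if (now - seenPayloads[h] > ttlMs) delete seenPayloads[h];
  });

  if (seenPayloads[hash]) return true;
  seenPayloads[hash] = now;
  return false;
}

// In the webhook trigger handler, before starting the execution:
// if (isDuplicate(req.body)) return res.json({ status: "duplicate-ignored" });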

Best Practices

  • Use the cheapest model that works. Start every AI decision node with gpt-4o-mini. Only upgrade to gpt-4o if accuracy is measurably insufficient. Most classification and routing tasks work perfectly with smaller models, and the cost difference is more than 16x at the rates in the MODEL_COSTS table above.

  • Always set temperature low for decision nodes. Use 0.0 to 0.2 for workflow decisions. You want determinism, not creativity. Save high temperature for content generation steps.

  • Implement circuit breakers on external services. If Slack is down, stop sending requests after 3 consecutive failures. Resume after a cooldown period. Without this, a single service outage cascades into thousands of failed workflow executions. A minimal sketch follows this list.

  • Log the full AI prompt and response for every decision node. When a workflow makes a bad decision, you need to debug it. Store the exact prompt sent, the raw response received, and the parsed output. This audit trail is non-negotiable for production systems.

  • Version your prompt templates independently from workflow versions. A prompt change is a semantic change even if the workflow structure hasn't changed. Track prompt versions so you can A/B test prompt improvements and roll back bad ones.

  • Set hard cost caps per execution. A runaway workflow with a loop can burn through API budget fast. Enforce a maximum cost per execution (I typically set $0.50) and abort if exceeded. The costAccumulator pattern shown above makes this trivial.

  • Never let the LLM generate executable code in workflows. The AI can generate workflow definitions (JSON data), but never let it generate or execute arbitrary code. Sandboxing is hard, and the risk is not worth it.

  • Test AI decision nodes with deterministic mocks. In your test suite, mock the OpenAI client to return fixed responses. Test every branch path. Only run integration tests against the real API in a dedicated test environment with a spending cap.

  • Design workflows to be idempotent. Any step might execute twice due to retries. Ensure database writes use upserts, notifications include deduplication IDs, and side effects can safely repeat.
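
As promised in the circuit breaker bullet above, a minimal sketch; the failure threshold and cooldown values are illustrative:

function withCircuitBreaker(service, options) {
  var failures = 0;
  var openUntil = 0;
  var maxFailures = (options && options.maxFailures) || 3;
  var cooldownMs = (options && options.cooldownMs) || 60000;

  return {
    execute: function(action, params, context) {
      if (Date.now() < openUntil) {
        return Promise.reject(new Error("Circuit open: skipping call until cooldown expires"));
      }
      return service.execute(action, params, context).then(function(result) {
        failures = 0; // any success closes the circuit again
        return result;
      }).catch(function(err) {
        failures += 1;
        if (failures >= maxFailures) {
          openUntil = Date.now() + cooldownMs;
          console.warn("[CircuitBreaker] Opening circuit for " + cooldownMs + "ms after " + failures + " consecutive failures");
        }
        throw err;
      });
    }
  };
}

// Wrap an adapter at registration time:
// serviceRegistry.slack = withCircuitBreaker(serviceRegistry.slack);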
