Workflow Agents for Business Process Automation

Build workflow agents that automate business processes with LLM-powered classification, routing, and multi-step execution in Node.js.

Workflow agents combine the deterministic reliability of traditional workflow engines with the adaptive intelligence of large language models. Instead of hard-coding every branch condition and routing rule, you define multi-step business processes where an LLM handles classification, extraction, and decision-making at each stage. The result is an automation system that handles ambiguity, adapts to unstructured input, and still follows your business rules with full auditability.

Prerequisites

  • Node.js v18 or later
  • Working knowledge of Express.js and async/await patterns
  • An OpenAI or Anthropic API key for LLM integration
  • Basic understanding of LLM prompt engineering
  • Familiarity with node-cron for scheduling
  • A PostgreSQL or MongoDB instance for persistence (examples use PostgreSQL)

What Workflow Agents Are

A traditional workflow engine executes a sequence of predefined steps. You define triggers, conditions, and actions. When input arrives, the engine evaluates conditions and routes accordingly. This works well when your conditions are simple and your data is structured. But business processes rarely stay that clean.

Consider an incoming support ticket. A rule-based system might check for keywords like "billing" or "outage" and route accordingly. But what about a ticket that says "I was charged twice for a subscription I cancelled last month and now my dashboard shows an error"? That ticket touches billing, account management, and technical support. Keyword matching falls apart.

A workflow agent solves this by inserting LLM intelligence at decision points within a structured workflow. The workflow definition itself remains deterministic — step A leads to step B, retries happen on failure, audit logs get written. But the classification, extraction, and routing decisions within those steps leverage an LLM's ability to understand natural language.

The key distinction: the LLM does not control the workflow. The workflow engine controls execution flow. The LLM is a tool invoked at specific steps where human-like judgment is needed.

Designing Workflow Definitions

Every workflow starts with a definition. I use JSON-based definitions because they serialize cleanly, version well in source control, and can be validated with JSON Schema. Here is a basic structure:

var supportTicketWorkflow = {
  id: "support-ticket-v1",
  name: "Support Ticket Processing",
  version: 1,
  trigger: "ticket.created",
  steps: [
    {
      id: "classify",
      type: "llm-classify",
      config: {
        categories: ["billing", "technical", "account", "feature-request", "security"],
        inputField: "description",
        outputField: "category"
      },
      next: "extract-info"
    },
    {
      id: "extract-info",
      type: "llm-extract",
      config: {
        fields: ["urgency", "product_area", "customer_sentiment", "action_requested"],
        inputField: "description",
        outputField: "extracted"
      },
      next: "route-decision"
    },
    {
      id: "route-decision",
      type: "conditional",
      conditions: [
        { when: "extracted.urgency === 'critical'", next: "escalate" },
        { when: "category === 'security'", next: "security-team" },
        { when: "category === 'billing'", next: "billing-team" },
        { default: true, next: "general-queue" }
      ]
    },
    {
      id: "escalate",
      type: "parallel",
      steps: ["notify-oncall", "notify-manager", "create-incident"],
      next: "log-complete"
    },
    {
      id: "notify-oncall",
      type: "action",
      config: { service: "slack", channel: "#oncall", template: "escalation" }
    },
    {
      id: "notify-manager",
      type: "action",
      config: { service: "email", template: "manager-escalation" }
    },
    {
      id: "create-incident",
      type: "action",
      config: { service: "pagerduty", severity: "high" }
    },
    {
      id: "billing-team",
      type: "action",
      config: { service: "queue", team: "billing" },
      next: "log-complete"
    },
    {
      id: "security-team",
      type: "human-handoff",
      config: { team: "security", requireApproval: true },
      next: "log-complete"
    },
    {
      id: "general-queue",
      type: "action",
      config: { service: "queue", team: "general" },
      next: "log-complete"
    },
    {
      id: "log-complete",
      type: "audit-log",
      config: { event: "workflow.completed" }
    }
  ],
  retryPolicy: {
    maxAttempts: 3,
    backoffMs: 1000,
    backoffMultiplier: 2
  }
};

Each step has a type that tells the engine what executor to invoke. The next field chains steps together. Conditional steps evaluate expressions against the workflow context. Parallel steps fan out to multiple actions simultaneously.
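It pays to validate definitions at registration time, before a dangling next reference surfaces mid-run. Here is a minimal hand-rolled check (a JSON Schema validator such as Ajv would be more thorough; this sketch only verifies structural integrity):

```javascript
// Sketch: structural validation of a workflow definition.
// Checks for duplicate step ids and for next/conditions/parallel
// references that point at steps which do not exist.
function validateWorkflowDefinition(definition) {
  var errors = [];
  if (!definition.id) errors.push("Missing workflow id");
  if (!Array.isArray(definition.steps) || definition.steps.length === 0) {
    errors.push("Workflow must define at least one step");
    return errors;
  }

  var ids = {};
  definition.steps.forEach(function (step) {
    if (ids[step.id]) errors.push("Duplicate step id: " + step.id);
    ids[step.id] = true;
  });

  definition.steps.forEach(function (step) {
    var targets = [];
    if (step.next) targets.push(step.next);
    if (step.conditions) {
      step.conditions.forEach(function (c) { targets.push(c.next); });
    }
    if (step.type === "parallel" && step.steps) {
      targets = targets.concat(step.steps);
    }
    targets.forEach(function (target) {
      if (target && !ids[target]) {
        errors.push("Step " + step.id + " references unknown step: " + target);
      }
    });
  });

  return errors;
}
```

Running this inside registerWorkflow and rejecting definitions with errors turns a runtime "Step not found" failure into a deploy-time one.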

Implementing the Workflow Engine

The engine is the core. It loads a workflow definition, initializes context with the trigger payload, and executes steps sequentially. Here is the implementation:

var Anthropic = require("@anthropic-ai/sdk");

var anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

function WorkflowEngine(options) {
  options = options || {};
  this.workflows = {};
  this.executors = {};
  this.auditLog = options.auditLog || console.log;
  this.onError = options.onError || function () {};

  this.registerDefaultExecutors();
}

WorkflowEngine.prototype.registerWorkflow = function (definition) {
  this.workflows[definition.id] = definition;
  this.auditLog({
    event: "workflow.registered",
    workflowId: definition.id,
    version: definition.version
  });
};

WorkflowEngine.prototype.registerExecutor = function (type, handler) {
  this.executors[type] = handler;
};

WorkflowEngine.prototype.run = function (workflowId, payload) {
  var self = this;
  var definition = this.workflows[workflowId];

  if (!definition) {
    return Promise.reject(new Error("Workflow not found: " + workflowId));
  }

  var context = {
    workflowId: workflowId,
    runId: generateRunId(),
    version: definition.version,
    payload: payload,
    data: Object.assign({}, payload),
    audit: [],
    startedAt: new Date().toISOString(),
    status: "running"
  };

  self.logAudit(context, "workflow.started", { payload: payload });

  return self.executeStep(definition, definition.steps[0].id, context)
    .then(function () {
      context.status = "completed";
      context.completedAt = new Date().toISOString();
      self.logAudit(context, "workflow.completed", {
        duration: Date.now() - new Date(context.startedAt).getTime()
      });
      return context;
    })
    .catch(function (err) {
      context.status = "failed";
      context.error = err.message;
      self.logAudit(context, "workflow.failed", { error: err.message });
      self.onError(err, context);
      throw err;
    });
};

WorkflowEngine.prototype.executeStep = function (definition, stepId, context) {
  var self = this;

  if (!stepId) {
    return Promise.resolve(context);
  }

  var step = definition.steps.find(function (s) { return s.id === stepId; });

  if (!step) {
    return Promise.reject(new Error("Step not found: " + stepId));
  }

  var executor = self.executors[step.type];

  if (!executor) {
    return Promise.reject(new Error("No executor for step type: " + step.type));
  }

  self.logAudit(context, "step.started", { stepId: step.id, type: step.type });

  return self.executeWithRetry(executor, step, context, definition.retryPolicy)
    .then(function (result) {
      self.logAudit(context, "step.completed", { stepId: step.id, result: result });

      var nextStepId = self.resolveNextStep(step, context);
      return self.executeStep(definition, nextStepId, context);
    });
};

WorkflowEngine.prototype.executeWithRetry = function (executor, step, context, retryPolicy) {
  var policy = step.retryPolicy || retryPolicy || {};
  var maxAttempts = policy.maxAttempts || 3;
  var backoffMs = policy.backoffMs || 1000;
  var multiplier = policy.backoffMultiplier || 2;
  var attempt = 0;

  function tryExecute() {
    attempt++;
    return executor(step, context).catch(function (err) {
      if (attempt >= maxAttempts) {
        throw new Error(
          "Step " + step.id + " failed after " + attempt +
          " attempts: " + err.message
        );
      }
      var delay = backoffMs * Math.pow(multiplier, attempt - 1);
      return new Promise(function (resolve) {
        setTimeout(resolve, delay);
      }).then(tryExecute);
    });
  }

  return tryExecute();
};

WorkflowEngine.prototype.resolveNextStep = function (step, context) {
  if (step.type === "conditional") {
    // Conditions are evaluated in order, so the default entry must come last
    for (var i = 0; i < step.conditions.length; i++) {
      var condition = step.conditions[i];
      if (condition.default) {
        return condition.next;
      }
      if (evaluateCondition(condition.when, context.data)) {
        return condition.next;
      }
    }
    return null;
  }
  return step.next || null;
};

WorkflowEngine.prototype.logAudit = function (context, event, details) {
  var entry = {
    timestamp: new Date().toISOString(),
    runId: context.runId,
    workflowId: context.workflowId,
    event: event,
    details: details
  };
  context.audit.push(entry);
  this.auditLog(entry);
};

function generateRunId() {
  return "run_" + Date.now() + "_" + Math.random().toString(36).slice(2, 11);
}

function evaluateCondition(expression, data) {
  // Deliberately minimal: only "path === 'literal'" comparisons are supported.
  // Never eval() workflow expressions; they are data, not trusted code.
  var parts = expression.split(" === ");
  if (parts.length !== 2) return false;

  var fieldPath = parts[0].trim();
  var expected = parts[1].trim().replace(/'/g, "");
  var actual = getNestedValue(data, fieldPath);

  return actual === expected;
}

function getNestedValue(obj, path) {
  var keys = path.split(".");
  var current = obj;
  for (var i = 0; i < keys.length; i++) {
    if (current === undefined || current === null) return undefined;
    current = current[keys[i]];
  }
  return current;
}

The engine is deliberately simple. It walks through steps, invokes executors, handles retries, and logs everything. The complexity lives in the executors themselves.
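The retry policy's backoff is worth seeing concretely. With the defaults above (3 attempts, 1000 ms base, multiplier 2), a failing step waits 1 second, then 2 seconds, before the run fails. Here is a small sketch of the schedule executeWithRetry produces:

```javascript
// Sketch: the inter-attempt delay schedule implied by a retry policy.
// executeWithRetry computes delay = backoffMs * multiplier^(attempt - 1)
// after each failed attempt, so a policy of N attempts yields N - 1 delays.
function backoffSchedule(policy) {
  var delays = [];
  for (var attempt = 1; attempt < policy.maxAttempts; attempt++) {
    delays.push(policy.backoffMs * Math.pow(policy.backoffMultiplier, attempt - 1));
  }
  return delays;
}
```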

Registering Default Executors

The engine needs executors for each step type. Here are the core ones:

WorkflowEngine.prototype.registerDefaultExecutors = function () {
  var self = this;

  // LLM Classification
  self.registerExecutor("llm-classify", function (step, context) {
    var input = context.data[step.config.inputField];
    var categories = step.config.categories.join(", ");

    var prompt = "Classify the following text into exactly one of these categories: " +
      categories + ".\n\nText: " + input +
      "\n\nRespond with only the category name, nothing else.";

    return callLLM(prompt).then(function (result) {
      var category = result.trim().toLowerCase();
      context.data[step.config.outputField] = category;
      return { category: category };
    });
  });

  // LLM Extraction
  self.registerExecutor("llm-extract", function (step, context) {
    var input = context.data[step.config.inputField];
    var fields = step.config.fields.join(", ");

    var prompt = "Extract the following fields from this text: " + fields +
      ".\n\nText: " + input +
      "\n\nRespond with valid JSON only. Use null for fields that cannot be determined.";

    return callLLM(prompt).then(function (result) {
      var extracted;
      try {
        extracted = JSON.parse(result);
      } catch (e) {
        var jsonMatch = result.match(/\{[\s\S]*\}/);
        if (jsonMatch) {
          extracted = JSON.parse(jsonMatch[0]);
        } else {
          throw new Error("LLM returned invalid JSON: " + result.substring(0, 200));
        }
      }
      context.data[step.config.outputField] = extracted;
      return extracted;
    });
  });

  // Conditional branching (handled by resolveNextStep, executor is a no-op)
  self.registerExecutor("conditional", function (step, context) {
    return Promise.resolve({ evaluated: true });
  });

  // Parallel execution
  self.registerExecutor("parallel", function (step, context) {
    var definition = null;
    var keys = Object.keys(self.workflows);
    for (var i = 0; i < keys.length; i++) {
      var wf = self.workflows[keys[i]];
      var found = wf.steps.find(function (s) { return s.id === step.id; });
      if (found) { definition = wf; break; }
    }

    var parallelSteps = step.steps.map(function (subStepId) {
      var subStep = definition.steps.find(function (s) { return s.id === subStepId; });
      var executor = self.executors[subStep.type];
      // Catch per sub-step so one failed notification does not sink the rest
      return executor(subStep, context).catch(function (err) {
        return { stepId: subStepId, error: err.message };
      });
    });

    return Promise.all(parallelSteps).then(function (results) {
      return { parallelResults: results };
    });
  });

  // Action executor
  self.registerExecutor("action", function (step, context) {
    return executeAction(step.config, context);
  });

  // Human handoff
  self.registerExecutor("human-handoff", function (step, context) {
    context.data.pendingApproval = {
      team: step.config.team,
      requireApproval: step.config.requireApproval,
      createdAt: new Date().toISOString()
    };
    return executeAction(
      { service: "queue", team: step.config.team, priority: "high" },
      context
    );
  });

  // Audit log
  self.registerExecutor("audit-log", function (step, context) {
    self.logAudit(context, step.config.event, {
      finalData: context.data,
      totalSteps: context.audit.length
    });
    return Promise.resolve({ logged: true });
  });
};

Calling the LLM

The callLLM function wraps the Anthropic API. You can swap this for OpenAI or any other provider:

function callLLM(prompt, options) {
  var model = (options && options.model) || "claude-haiku-4-20250414";
  var maxTokens = (options && options.maxTokens) || 1024;

  return anthropic.messages.create({
    model: model,
    max_tokens: maxTokens,
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    return response.content[0].text;
  });
}

I use Haiku for classification and extraction tasks. It is fast, cheap, and accurate enough for structured decisions. Save the larger models for steps that need nuanced reasoning.
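If different steps warrant different models, centralize the choice rather than scattering model IDs through executors. The routing table below is hypothetical (llm-review is an invented step type for illustration), reusing the model IDs that appear elsewhere in this article:

```javascript
// Hypothetical per-step-type model routing: cheap, fast model for
// mechanical steps; a larger model for judgment-heavy ones.
// Substitute your provider's current model IDs.
var MODEL_BY_STEP_TYPE = {
  "llm-classify": "claude-haiku-4-20250414",
  "llm-extract": "claude-haiku-4-20250414",
  "llm-review": "claude-sonnet-4-20250514"
};

var DEFAULT_MODEL = "claude-haiku-4-20250414";

function modelForStep(step) {
  // A step's config can always pin a model explicitly
  if (step.config && step.config.model) return step.config.model;
  return MODEL_BY_STEP_TYPE[step.type] || DEFAULT_MODEL;
}
```

Executors would then call callLLM(prompt, { model: modelForStep(step) }) instead of relying on the global default.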

Integrating with External Services

The executeAction function is your integration layer. Each service type maps to a concrete implementation:

var nodemailer = require("nodemailer");
var axios = require("axios");

var transporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: process.env.SMTP_PORT,
  auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS }
});

function executeAction(config, context) {
  switch (config.service) {
    case "slack":
      return sendSlackNotification(config, context);
    case "email":
      return sendEmail(config, context);
    case "queue":
      return assignToQueue(config, context);
    case "pagerduty":
      return createPagerDutyIncident(config, context);
    default:
      return Promise.reject(new Error("Unknown service: " + config.service));
  }
}

function sendSlackNotification(config, context) {
  var message = buildMessage(config.template, context.data);
  return axios.post(process.env.SLACK_WEBHOOK_URL, {
    channel: config.channel,
    text: message,
    blocks: [
      {
        type: "section",
        text: { type: "mrkdwn", text: "*New " + context.data.category + " ticket*" }
      },
      {
        type: "section",
        text: { type: "mrkdwn", text: message }
      }
    ]
  });
}

function sendEmail(config, context) {
  var message = buildMessage(config.template, context.data);
  return transporter.sendMail({
    from: process.env.EMAIL_FROM,
    to: getTeamEmail(config.team || context.data.category),
    subject: "[" + (context.data.extracted && context.data.extracted.urgency || "normal") +
      "] Support Ticket: " + context.data.subject,
    html: message
  });
}

function assignToQueue(config, context) {
  // In production this writes to your ticketing system
  var pool = require("../db/postgres");
  return pool.query(
    "INSERT INTO ticket_assignments (ticket_id, team, priority, workflow_run_id, assigned_at) " +
    "VALUES ($1, $2, $3, $4, NOW())",
    [context.data.ticketId, config.team, config.priority || "normal", context.runId]
  );
}

function createPagerDutyIncident(config, context) {
  return axios.post("https://events.pagerduty.com/v2/enqueue", {
    routing_key: process.env.PAGERDUTY_ROUTING_KEY,
    event_action: "trigger",
    payload: {
      summary: "Critical support ticket: " + context.data.subject,
      severity: config.severity,
      source: "workflow-agent",
      custom_details: context.data.extracted
    }
  });
}

function buildMessage(template, data) {
  var templates = {
    escalation: "CRITICAL: Ticket #" + data.ticketId + " - " + data.subject +
      "\nCategory: " + data.category +
      "\nUrgency: " + (data.extracted && data.extracted.urgency || "unknown") +
      "\nSentiment: " + (data.extracted && data.extracted.customer_sentiment || "unknown"),
    "manager-escalation": "<h2>Escalated Ticket</h2>" +
      "<p><strong>Ticket:</strong> #" + data.ticketId + "</p>" +
      "<p><strong>Description:</strong> " + data.description + "</p>" +
      "<p><strong>Analysis:</strong> " + JSON.stringify(data.extracted) + "</p>"
  };
  return templates[template] || JSON.stringify(data);
}

function getTeamEmail(team) {
  var emails = {
    billing: "[email protected]",
    security: "[email protected]",
    technical: "[email protected]",
    general: "[email protected]"
  };
  return emails[team] || emails.general;
}
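One caution on buildMessage: the manager-escalation template interpolates customer-supplied text directly into HTML, which invites markup injection. At minimum, escape user text before rendering. A minimal helper (wiring it into the templates is left as an exercise):

```javascript
// Minimal HTML escaping for user-supplied text interpolated into
// email templates. Ampersand must be replaced first so the later
// entities are not double-escaped.
function escapeHtml(str) {
  return String(str)
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;");
}
```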

Form Data Extraction and Processing

LLMs excel at extracting structured data from unstructured form submissions. This is especially useful for customer-facing forms where people write free-text descriptions:

function extractFormData(formText, schema) {
  var fieldDescriptions = Object.keys(schema).map(function (key) {
    return key + " (" + schema[key].type + "): " + schema[key].description;
  }).join("\n");

  var prompt = "Extract structured data from this form submission.\n\n" +
    "Expected fields:\n" + fieldDescriptions + "\n\n" +
    "Form submission:\n" + formText + "\n\n" +
    "Return valid JSON matching the field names exactly. " +
    "Use null for fields not found. For dates, use ISO 8601 format.";

  return callLLM(prompt).then(function (result) {
    var jsonMatch = result.match(/\{[\s\S]*\}/);
    if (!jsonMatch) {
      throw new Error("LLM returned no JSON object: " + result.substring(0, 200));
    }
    var parsed = JSON.parse(jsonMatch[0]);

    // Validate against schema
    var errors = [];
    Object.keys(schema).forEach(function (key) {
      if (schema[key].required && (parsed[key] === null || parsed[key] === undefined)) {
        errors.push("Missing required field: " + key);
      }
    });

    if (errors.length > 0) {
      return { data: parsed, valid: false, errors: errors };
    }

    return { data: parsed, valid: true, errors: [] };
  });
}
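The validation half of extractFormData can be factored out and unit-tested without touching the LLM. A sketch, using the same { type, description, required } schema shape:

```javascript
// Sketch: the schema-validation step of extractFormData in isolation.
// Takes already-parsed LLM output plus the field schema and reports
// which required fields came back empty.
function validateExtracted(parsed, schema) {
  var errors = [];
  Object.keys(schema).forEach(function (key) {
    if (schema[key].required && (parsed[key] === null || parsed[key] === undefined)) {
      errors.push("Missing required field: " + key);
    }
  });
  return { data: parsed, valid: errors.length === 0, errors: errors };
}
```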

Document Classification and Routing

Beyond support tickets, workflow agents handle document classification across any domain. Insurance claims, legal documents, compliance filings — the pattern is the same:

function createDocumentRouter(routingRules) {
  return function (document) {
    var prompt = "Analyze this document and determine:\n" +
      "1. Document type (from: " + routingRules.types.join(", ") + ")\n" +
      "2. Priority (critical, high, normal, low)\n" +
      "3. Required reviewers (from: " + routingRules.reviewers.join(", ") + ")\n" +
      "4. Compliance flags (any regulatory concerns)\n\n" +
      "Document content:\n" + document.content.substring(0, 4000) + "\n\n" +
      "Respond in JSON format.";

    return callLLM(prompt, { model: "claude-sonnet-4-20250514" })
      .then(function (result) {
        var jsonMatch = result.match(/\{[\s\S]*\}/);
        if (!jsonMatch) {
          throw new Error("LLM returned no JSON object for document " + document.id);
        }
        var analysis = JSON.parse(jsonMatch[0]);
        analysis.documentId = document.id;
        analysis.receivedAt = new Date().toISOString();
        return analysis;
      });
  };
}
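Both extractors repeat the same salvage pattern for when the model wraps its JSON in prose. Factoring it into one helper keeps the failure mode explicit (the helper name is illustrative):

```javascript
// Sketch: tolerant JSON parsing for LLM output. Tries a straight parse
// first, then falls back to the first {...} span in the text, and fails
// loudly with a truncated excerpt when neither works.
function parseLlmJson(text) {
  try {
    return JSON.parse(text);
  } catch (e) {
    var match = text.match(/\{[\s\S]*\}/);
    if (!match) {
      throw new Error("No JSON object found in LLM output: " + text.substring(0, 200));
    }
    return JSON.parse(match[0]);
  }
}
```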

Scheduling Recurring Workflows

Many business processes run on a schedule. Use node-cron to trigger workflows at regular intervals:

var cron = require("node-cron");

function WorkflowScheduler(engine) {
  this.engine = engine;
  this.schedules = [];
}

WorkflowScheduler.prototype.schedule = function (cronExpression, workflowId, payloadFn) {
  var self = this;

  var task = cron.schedule(cronExpression, function () {
    var payload = typeof payloadFn === "function" ? payloadFn() : payloadFn;

    self.engine.run(workflowId, payload).catch(function (err) {
      console.error("Scheduled workflow failed:", workflowId, err.message);
    });
  });

  self.schedules.push({ workflowId: workflowId, cron: cronExpression, task: task });
};

// Example: Daily SLA violation check at 8 AM
var scheduler = new WorkflowScheduler(engine);
scheduler.schedule("0 8 * * *", "sla-check-v1", function () {
  return { checkDate: new Date().toISOString(), threshold: 24 };
});

// Example: Weekly report generation every Monday at 6 AM
scheduler.schedule("0 6 * * 1", "weekly-report-v1", function () {
  var now = new Date();
  var weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
  return { startDate: weekAgo.toISOString(), endDate: now.toISOString() };
});

Workflow Versioning and Migration

When you update a workflow definition, in-flight executions should complete on the version they started with. Track versions explicitly:

WorkflowEngine.prototype.registerWorkflow = function (definition) {
  var versionKey = definition.id + "@" + definition.version;
  this.workflows[versionKey] = definition;
  this.workflows[definition.id] = definition; // "latest" always points to newest
  this.auditLog({
    event: "workflow.registered",
    workflowId: definition.id,
    version: definition.version,
    versionKey: versionKey
  });
};

WorkflowEngine.prototype.run = function (workflowId, payload, options) {
  var version = (options && options.version) || null;
  var lookupKey = version ? (workflowId + "@" + version) : workflowId;
  var definition = this.workflows[lookupKey];

  if (!definition) {
    return Promise.reject(new Error("Workflow not found: " + lookupKey));
  }

  // Context stores the exact version for resumability
  var context = {
    workflowId: workflowId,
    runId: generateRunId(),
    version: definition.version,
    // ... rest of context
  };

  // ... execution logic
};
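The two-key registration is easiest to verify in isolation. This self-contained sketch reproduces the lookup behavior: a pinned id@version key resolves an exact definition, while the bare id tracks the latest registration:

```javascript
// Sketch: version-pinned workflow lookup in isolation.
// Registering writes both "id@version" and the bare "id" key,
// so the bare key always resolves to the most recent registration.
var registry = {};

function register(definition) {
  registry[definition.id + "@" + definition.version] = definition;
  registry[definition.id] = definition;
}

function lookup(id, version) {
  return registry[version ? id + "@" + version : id] || null;
}

register({ id: "support-ticket", version: 1 });
register({ id: "support-ticket", version: 2 });
```

An in-flight run that stored version 1 in its context keeps resolving version 1 on resume, even after version 2 ships.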

Audit Trails and Compliance Logging

Every step, decision, and LLM call should be recorded. For regulated industries, this is not optional:

function ComplianceLogger(pool) {
  this.pool = pool;
}

ComplianceLogger.prototype.log = function (entry) {
  return this.pool.query(
    "INSERT INTO workflow_audit_log " +
    "(run_id, workflow_id, event, step_id, details, llm_input, llm_output, timestamp) " +
    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
    [
      entry.runId,
      entry.workflowId,
      entry.event,
      entry.details && entry.details.stepId || null,
      JSON.stringify(entry.details),
      entry.llmInput || null,
      entry.llmOutput || null,
      entry.timestamp
    ]
  );
};

// Modified callLLM that logs inputs and outputs
function callLLMWithAudit(prompt, options, context, logger) {
  var model = (options && options.model) || "claude-haiku-4-20250414";
  var startTime = Date.now();

  return anthropic.messages.create({
    model: model,
    max_tokens: (options && options.maxTokens) || 1024,
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    var output = response.content[0].text;

    logger.log({
      runId: context.runId,
      workflowId: context.workflowId,
      event: "llm.call",
      details: {
        model: model,
        durationMs: Date.now() - startTime,
        inputTokens: response.usage.input_tokens,
        outputTokens: response.usage.output_tokens
      },
      llmInput: prompt,
      llmOutput: output,
      timestamp: new Date().toISOString()
    });

    return output;
  });
}
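The ComplianceLogger assumes a workflow_audit_log table exists. Here is a plausible PostgreSQL shape, inferred from the columns in the INSERT above; adjust types, indexes, and retention to your compliance requirements:

```sql
-- Assumed schema for the audit table; column list mirrors the INSERT
-- in ComplianceLogger.prototype.log.
CREATE TABLE workflow_audit_log (
  id          BIGSERIAL PRIMARY KEY,
  run_id      TEXT NOT NULL,
  workflow_id TEXT NOT NULL,
  event       TEXT NOT NULL,
  step_id     TEXT,
  details     JSONB,
  llm_input   TEXT,
  llm_output  TEXT,
  timestamp   TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_audit_run_id ON workflow_audit_log (run_id);
```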

Parallel Workflow Steps

Some steps within a workflow are independent and can run simultaneously. The parallel executor fans out and waits for all to complete:

WorkflowEngine.prototype.executeParallel = function (definition, stepIds, context) {
  var self = this;

  var promises = stepIds.map(function (stepId) {
    var step = definition.steps.find(function (s) { return s.id === stepId; });
    var executor = self.executors[step.type];

    return executor(step, context)
      .then(function (result) {
        return { stepId: stepId, status: "completed", result: result };
      })
      .catch(function (err) {
        return { stepId: stepId, status: "failed", error: err.message };
      });
  });

  return Promise.all(promises).then(function (results) {
    var failed = results.filter(function (r) { return r.status === "failed"; });

    if (failed.length > 0) {
      self.logAudit(context, "parallel.partial-failure", {
        total: results.length,
        failed: failed.length,
        errors: failed.map(function (f) { return f.stepId + ": " + f.error; })
      });
    }

    context.data.parallelResults = results;
    return results;
  });
};

Notice that parallel steps catch individual failures without blocking the others. A notification failing should not prevent incident creation.
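On Node 12.9 and later, Promise.allSettled provides the same per-step isolation without hand-rolled catch wrappers. This sketch maps its results back to the { stepId, status } shape used above (runStep stands in for the executor dispatch):

```javascript
// Sketch: parallel fan-out with per-step failure isolation via
// Promise.allSettled. Wrapping runStep in a Promise constructor also
// converts synchronous throws into rejections.
function runParallel(steps, runStep) {
  return Promise.allSettled(steps.map(function (stepId) {
    return new Promise(function (resolve) { resolve(runStep(stepId)); });
  })).then(function (settled) {
    return settled.map(function (s, i) {
      return s.status === "fulfilled"
        ? { stepId: steps[i], status: "completed", result: s.value }
        : { stepId: steps[i], status: "failed", error: s.reason.message };
    });
  });
}
```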

Human Handoff Points

Not everything should be automated. Security incidents, large refunds, and legal matters need human judgment. The human-handoff step pauses the workflow and creates an approval request:

// Custom error type so the engine can distinguish a paused run from a failure
function WorkflowPausedError(message) {
  this.name = "WorkflowPausedError";
  this.message = message;
}
WorkflowPausedError.prototype = Object.create(Error.prototype);

engine.registerExecutor("human-handoff", function (step, context) {
  var handoff = {
    id: "handoff_" + Date.now(),
    runId: context.runId,
    workflowId: context.workflowId,
    stepId: step.id,
    team: step.config.team,
    requireApproval: step.config.requireApproval,
    contextSnapshot: JSON.parse(JSON.stringify(context.data)),
    createdAt: new Date().toISOString(),
    status: "pending"
  };

  // Persist the handoff request
  return saveHandoff(handoff).then(function () {
    // Notify the team
    return sendSlackNotification({
      channel: "#" + step.config.team,
      template: "approval-request"
    }, context);
  }).then(function () {
    // If approval is required, pause workflow execution
    if (step.config.requireApproval) {
      context.status = "awaiting-approval";
      context.data.pendingHandoff = handoff.id;
      throw new WorkflowPausedError("Awaiting human approval: " + handoff.id);
    }
    return { handoffId: handoff.id };
  });
});

// Resume endpoint for approved workflows. loadHandoff, updateHandoff, and
// loadWorkflowContext are your persistence layer's lookups.
function resumeWorkflow(engine, handoffId, approved, approverNotes) {
  return loadHandoff(handoffId).then(function (handoff) {
    if (!approved) {
      return updateHandoff(handoffId, { status: "rejected", notes: approverNotes });
    }

    return loadWorkflowContext(handoff.runId).then(function (context) {
      context.data.approvedBy = approverNotes.approver;
      context.data.approvalNotes = approverNotes.notes;
      context.status = "running";

      var definition = engine.workflows[context.workflowId + "@" + context.version];
      var step = definition.steps.find(function (s) { return s.id === handoff.stepId; });

      return engine.executeStep(definition, step.next, context);
    });
  });
}

Complete Working Example: Support Ticket Processor

Here is everything wired together as an Express application. This processes incoming support tickets end-to-end:

var express = require("express");
var bodyParser = require("body-parser");
var Anthropic = require("@anthropic-ai/sdk");
var cron = require("node-cron");

var app = express();
app.use(bodyParser.json());

var anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

// --- Workflow Engine (simplified for this example) ---

function WorkflowEngine() {
  this.executors = {};
  this.runs = {};
}

WorkflowEngine.prototype.registerExecutor = function (type, fn) {
  this.executors[type] = fn;
};

WorkflowEngine.prototype.processTicket = function (ticket) {
  var self = this;
  var runId = "run_" + Date.now();
  var context = {
    runId: runId,
    ticket: ticket,
    audit: [],
    startedAt: new Date().toISOString()
  };

  self.runs[runId] = context;
  self.audit(context, "workflow.started");

  return self.classify(context)
    .then(function () { return self.extract(context); })
    .then(function () { return self.route(context); })
    .then(function () { return self.notify(context); })
    .then(function () { return self.escalateIfNeeded(context); })
    .then(function () {
      self.audit(context, "workflow.completed");
      return context;
    })
    .catch(function (err) {
      self.audit(context, "workflow.failed", { error: err.message });
      throw err;
    });
};

WorkflowEngine.prototype.classify = function (context) {
  var self = this;
  var prompt = "Classify this support ticket into exactly one category.\n" +
    "Categories: billing, technical, account, feature-request, security\n\n" +
    "Subject: " + context.ticket.subject + "\n" +
    "Description: " + context.ticket.description + "\n\n" +
    "Respond with only the category name.";

  return anthropic.messages.create({
    model: "claude-haiku-4-20250414",
    max_tokens: 50,
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    context.category = response.content[0].text.trim().toLowerCase();
    self.audit(context, "step.classify", { category: context.category });
  });
};

WorkflowEngine.prototype.extract = function (context) {
  var self = this;
  var prompt = "Extract the following from this support ticket:\n" +
    "- urgency: critical, high, normal, or low\n" +
    "- product_area: which product or feature is affected\n" +
    "- customer_sentiment: frustrated, neutral, or satisfied\n" +
    "- action_requested: what the customer wants done\n\n" +
    "Subject: " + context.ticket.subject + "\n" +
    "Description: " + context.ticket.description + "\n\n" +
    "Respond with valid JSON only.";

  return anthropic.messages.create({
    model: "claude-haiku-4-20250414",
    max_tokens: 300,
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    var text = response.content[0].text;
    var jsonMatch = text.match(/\{[\s\S]*\}/);
    if (!jsonMatch) {
      throw new Error("Extraction returned no JSON: " + text.substring(0, 200));
    }
    context.extracted = JSON.parse(jsonMatch[0]);
    self.audit(context, "step.extract", { extracted: context.extracted });
  });
};

WorkflowEngine.prototype.route = function (context) {
  var self = this;
  var teams = {
    billing: { queue: "billing-support", email: "[email protected]" },
    technical: { queue: "engineering", email: "[email protected]" },
    account: { queue: "account-mgmt", email: "[email protected]" },
    "feature-request": { queue: "product", email: "[email protected]" },
    security: { queue: "security-response", email: "[email protected]" }
  };

  var assignment = teams[context.category] || teams.technical;
  context.assignment = assignment;
  self.audit(context, "step.route", { assignment: assignment });

  return Promise.resolve();
};

WorkflowEngine.prototype.notify = function (context) {
  var self = this;

  // In production, these would be real API calls
  var notifications = [];

  // Slack notification to the assigned team
  notifications.push(
    self.sendSlack(
      "#" + context.assignment.queue,
      "New " + context.category + " ticket: " + context.ticket.subject +
      "\nUrgency: " + context.extracted.urgency +
      "\nSentiment: " + context.extracted.customer_sentiment
    )
  );

  // Email confirmation to customer
  notifications.push(
    self.sendEmail(
      context.ticket.email,
      "We received your request",
      "Your ticket has been assigned to our " + context.category +
      " team. Reference: " + context.ticket.id
    )
  );

  return Promise.all(notifications).then(function () {
    self.audit(context, "step.notify", { sent: notifications.length });
  });
};

WorkflowEngine.prototype.escalateIfNeeded = function (context) {
  var self = this;

  if (context.extracted.urgency !== "critical" && context.category !== "security") {
    return Promise.resolve();
  }

  var escalations = [];

  escalations.push(
    self.sendSlack("#oncall", "ESCALATION: " + context.ticket.subject +
      "\nCategory: " + context.category +
      "\nUrgency: " + context.extracted.urgency)
  );

  escalations.push(
    self.sendEmail(
      "[email protected]",
      "ESCALATED: " + context.ticket.subject,
      "Critical ticket requires immediate attention.\n\n" +
      JSON.stringify(context.extracted, null, 2)
    )
  );

  return Promise.all(escalations).then(function () {
    self.audit(context, "step.escalated", { reason: context.extracted.urgency });
  });
};

WorkflowEngine.prototype.sendSlack = function (channel, message) {
  console.log("[Slack -> " + channel + "] " + message);
  // Replace with actual Slack webhook call:
  // return axios.post(process.env.SLACK_WEBHOOK_URL, { channel, text: message });
  return Promise.resolve({ channel: channel, sent: true });
};

WorkflowEngine.prototype.sendEmail = function (to, subject, body) {
  console.log("[Email -> " + to + "] " + subject);
  // Replace with actual email send via nodemailer or SES
  return Promise.resolve({ to: to, subject: subject, sent: true });
};

WorkflowEngine.prototype.audit = function (context, event, details) {
  var entry = {
    timestamp: new Date().toISOString(),
    runId: context.runId,
    event: event,
    details: details || {}
  };
  context.audit.push(entry);
  console.log("[AUDIT]", JSON.stringify(entry));
};

// --- API Routes ---

var engine = new WorkflowEngine();

app.post("/api/tickets", function (req, res) {
  var ticket = {
    id: "TKT-" + Date.now(),
    subject: req.body.subject,
    description: req.body.description,
    email: req.body.email,
    createdAt: new Date().toISOString()
  };

  engine.processTicket(ticket)
    .then(function (context) {
      res.json({
        ticketId: ticket.id,
        category: context.category,
        assignment: context.assignment,
        extracted: context.extracted,
        runId: context.runId
      });
    })
    .catch(function (err) {
      console.error("Workflow failed:", err);
      res.status(500).json({ error: "Failed to process ticket", message: err.message });
    });
});

app.get("/api/tickets/:runId/audit", function (req, res) {
  var context = engine.runs[req.params.runId];
  if (!context) {
    return res.status(404).json({ error: "Run not found" });
  }
  res.json({ runId: req.params.runId, audit: context.audit });
});

// --- Scheduled SLA Check ---

cron.schedule("0 */4 * * *", function () {
  console.log("[Scheduler] Running SLA compliance check...");
  // Check for tickets older than 4 hours without a response
  // In production, query your database for unresolved tickets
});

var PORT = process.env.PORT || 3000;
app.listen(PORT, function () {
  console.log("Workflow agent listening on port " + PORT);
});

Send a test ticket:

curl -X POST http://localhost:3000/api/tickets \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "Charged twice and dashboard broken",
    "description": "I was charged $49.99 twice on my credit card for the Pro plan. I tried to check my billing page but the dashboard keeps showing a 500 error. I need this resolved immediately, this is very frustrating.",
    "email": "[email protected]"
  }'

The response will show the ticket classified as "billing" (primary issue), urgency as "critical" (double charge plus system error), and negative sentiment. It will be routed to the billing queue and escalated due to urgency.

Common Issues and Troubleshooting

1. LLM returns invalid JSON from extraction steps

Error: LLM returned invalid JSON: The extracted fields are as follows:
urgency: "high"...

This happens when the LLM wraps its response in explanatory text instead of returning raw JSON. Fix it by adding "Respond with only valid JSON, no explanation or markdown" to your prompts, and always use the regex fallback result.match(/\{[\s\S]*\}/) to extract JSON from surrounding text.
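Both fixes can live in one defensive parser shared by every extraction step. This is a minimal sketch; `parseLLMJson` is a hypothetical helper name, not part of the engine above:

```javascript
// Extract the first JSON object from an LLM response that may include
// surrounding prose or markdown fences, and fail loudly when none exists.
function parseLLMJson(text) {
  var match = text.match(/\{[\s\S]*\}/);
  if (!match) {
    throw new Error("LLM returned invalid JSON: " + text.slice(0, 120));
  }
  return JSON.parse(match[0]);
}
```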

2. Retry loop exhaustion on rate-limited API calls

Error: Step notify-oncall failed after 3 attempts: Request failed with status code 429

The default retry policy does not distinguish between retryable and permanent errors. Add rate-limit awareness:

function isRetryable(err) {
  if (err.response && err.response.status === 429) return true;
  if (err.response && err.response.status >= 500) return true;
  if (err.code === "ECONNRESET" || err.code === "ETIMEDOUT") return true;
  return false;
}

Modify executeWithRetry to check isRetryable(err) before retrying and to use the Retry-After header when present.
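One way the modified retry loop might look, with `isRetryable` repeated here so the sketch is self-contained. The backoff constants are illustrative, and `executeWithRetry` in the article's engine would delegate to something like this:

```javascript
// Retry only retryable failures, honoring the Retry-After header (seconds)
// when present, otherwise falling back to exponential backoff.
function isRetryable(err) {
  if (err.response && err.response.status === 429) return true;
  if (err.response && err.response.status >= 500) return true;
  return err.code === "ECONNRESET" || err.code === "ETIMEDOUT";
}

function retryWithBackoff(fn, maxAttempts) {
  function attempt(n) {
    return fn().catch(function (err) {
      if (n >= maxAttempts || !isRetryable(err)) throw err;
      var retryAfter = err.response && err.response.headers &&
        err.response.headers["retry-after"];
      var delayMs = retryAfter
        ? parseInt(retryAfter, 10) * 1000
        : Math.pow(2, n) * 250; // 500ms, 1s, 2s...
      return new Promise(function (resolve) {
        setTimeout(resolve, delayMs);
      }).then(function () {
        return attempt(n + 1);
      });
    });
  }
  return attempt(1);
}
```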

3. Parallel step failures swallowed silently

TypeError: Cannot read property 'urgency' of undefined

This happens when each promise inside Promise.all has its own .catch() handler that returns { status: "failed" }, so the parallel stage resolves successfully even though an extraction step failed. Downstream steps then try to read data that was never extracted. Add validation between stages:

// After parallel execution, verify required data exists
if (!context.data.extracted || !context.data.extracted.urgency) {
  throw new Error("Required extraction data missing. " +
    "Parallel results: " + JSON.stringify(context.data.parallelResults));
}

4. Workflow context mutation during parallel execution

Error: Assignment conflict: ticket assigned to both billing and technical

Multiple parallel steps writing to the same context properties create race conditions. Use scoped result objects instead of mutating shared context:

// Bad: both steps write to context.data.assignment
// Good: each step writes to its own namespace
context.data.parallelResults[stepId] = result;
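A sketch of the namespaced pattern, assuming each parallel step returns its result instead of writing to shared context properties (`runParallel` is a hypothetical helper, not the engine's API):

```javascript
// Run parallel steps into per-step namespaces; each step's result lands
// under its own key, so concurrent writes can never collide.
function runParallel(context, steps) {
  context.data.parallelResults = {};
  return Promise.all(steps.map(function (step) {
    return step.fn(context).then(function (result) {
      context.data.parallelResults[step.id] = result;
    });
  })).then(function () {
    return context;
  });
}
```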

5. Stale workflow version after hot reload

Error: Step "validate-sla" not found in workflow definition

This occurs when a workflow run started on version 1 but the engine was restarted and only version 2 (which renamed the step) is registered. Always persist the workflow version with the run context and load the matching definition when resuming.
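A minimal sketch of version-pinned definitions, with illustrative names (`registerWorkflow`, `resumeRun`, and the run shape are assumptions, not the engine's actual API):

```javascript
// Keep every workflow version registered so an in-flight run can resume
// against the exact definition it started with.
var definitions = {};

function registerWorkflow(name, version, steps) {
  definitions[name + "@" + version] = steps;
}

function resumeRun(run) {
  var steps = definitions[run.workflow + "@" + run.version];
  if (!steps) {
    throw new Error("No definition for " + run.workflow + "@" + run.version);
  }
  return steps.slice(run.stepIndex); // remaining steps to execute
}
```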

Best Practices

  • Keep LLM calls behind a single abstraction. Wrap all LLM interactions in a callLLM function that handles retries, logging, token counting, and provider switching. When you need to change from Anthropic to OpenAI or add caching, you change one function.

  • Use the cheapest model that works. Classification and extraction tasks rarely need the most capable model. Start with Haiku or GPT-4o-mini and only upgrade if accuracy demands it. Your workflow might make 5-10 LLM calls per execution — cost per run matters at scale.

  • Make every step idempotent. If a workflow fails at step 4 of 6, you need to resume from step 4 without re-sending the Slack notification from step 3. Use idempotency keys and check for existing results before executing.

  • Log LLM inputs and outputs in their entirety. When an LLM makes a wrong classification, you need the exact prompt and response to diagnose why. Structured audit logs with full prompt text are essential for debugging and compliance.

  • Set timeouts on every external call. An LLM API call hanging for 60 seconds blocks the entire workflow. Set explicit timeouts (10-15 seconds for LLM calls, 5 seconds for webhooks) and let the retry mechanism handle transient failures.

  • Validate LLM outputs against a schema. Do not trust that the LLM returned the fields you asked for. Check that required fields exist, values are within expected ranges, and types match. Fail the step explicitly if validation fails rather than passing garbage downstream.

  • Version your prompts alongside your workflow definitions. A prompt change can completely alter classification behavior. Track prompt versions in your audit log so you can correlate behavior changes with prompt updates.

  • Test workflows with recorded fixtures. Capture real LLM responses as fixtures and replay them in tests. This gives you fast, deterministic tests without API calls while ensuring your workflow logic handles actual LLM output patterns.

  • Implement circuit breakers for external services. If Slack is down, do not let every workflow run fail on the notification step. Track consecutive failures per service and skip non-critical steps when a service is unhealthy.
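The single-abstraction advice above might look like the following sketch, with the provider client injected behind a generic `complete` interface (the adapter shape and names are assumptions, chosen so a test stub can stand in for a real SDK):

```javascript
// One choke point for every LLM call: timing, token logging, and provider
// switching all live here. Swapping Anthropic for OpenAI means changing
// only the injected client.
function makeCallLLM(client, options) {
  return function callLLM(prompt) {
    var started = Date.now();
    return client.complete({
      model: options.model,
      maxTokens: options.maxTokens,
      prompt: prompt
    }).then(function (result) {
      console.log("[LLM] " + (Date.now() - started) + "ms, " +
        (result.usage ? result.usage.totalTokens : "?") + " tokens");
      return result.text;
    });
  };
}
```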
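Idempotent steps can be sketched as a cache keyed by run and step, assuming a simple in-memory store (in production this would be a database table keyed on the idempotency key):

```javascript
// Skip a step whose result is already recorded for this run, so resuming
// a failed workflow never repeats side effects like Slack notifications.
function runIdempotent(store, runId, stepId, fn) {
  var key = runId + ":" + stepId;
  if (store[key] !== undefined) {
    return Promise.resolve(store[key]); // already done: return cached result
  }
  return fn().then(function (result) {
    store[key] = result;
    return result;
  });
}
```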
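A timeout wrapper for external calls can be a simple race against a timer (a sketch; `withTimeout` is a hypothetical helper, and the retry logic above would catch the resulting error):

```javascript
// Race a promise against a timer so a hung API call fails fast and hands
// control to the retry mechanism instead of blocking the workflow.
function withTimeout(promise, ms, label) {
  var timer;
  var timeout = new Promise(function (_, reject) {
    timer = setTimeout(function () {
      reject(new Error(label + " timed out after " + ms + "ms"));
    }, ms);
  });
  return Promise.race([promise, timeout]).finally(function () {
    clearTimeout(timer); // don't keep the event loop alive after settling
  });
}
```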
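Output validation for the extraction step might look like the following sketch. The allowed values are assumptions matching the fields this workflow extracts (`urgency`, `customer_sentiment`); adjust the schema to your own prompt:

```javascript
// Validate extracted fields against a minimal schema before any downstream
// step consumes them. Fail the step explicitly instead of passing garbage.
var EXTRACTION_SCHEMA = {
  urgency: ["low", "medium", "high", "critical"],
  customer_sentiment: ["positive", "neutral", "negative"]
};

function validateExtraction(extracted) {
  Object.keys(EXTRACTION_SCHEMA).forEach(function (field) {
    var allowed = EXTRACTION_SCHEMA[field];
    if (allowed.indexOf(extracted[field]) === -1) {
      throw new Error("Invalid " + field + ": " + JSON.stringify(extracted[field]) +
        " (expected one of " + allowed.join(", ") + ")");
    }
  });
  return extracted;
}
```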
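Recorded fixtures can be replayed through a stub client that satisfies the same interface as the real provider (a sketch; `makeFixtureClient` and the `complete` method are illustrative names):

```javascript
// A stub LLM client for tests: returns recorded responses in order and
// remembers every prompt it was asked, so tests can assert on both.
function makeFixtureClient(fixtures) {
  var calls = [];
  return {
    calls: calls,
    complete: function (prompt) {
      calls.push(prompt);
      if (calls.length > fixtures.length) {
        return Promise.reject(new Error("No fixture for call " + calls.length));
      }
      return Promise.resolve(fixtures[calls.length - 1]);
    }
  };
}
```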
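A minimal circuit breaker for the last point can be sketched as a failure counter with a cooldown window (the threshold and timing API are illustrative; a real implementation would also handle half-open trial calls):

```javascript
// After `threshold` consecutive failures the service is marked unhealthy,
// and non-critical calls are skipped until `cooldownMs` has elapsed.
function makeBreaker(threshold, cooldownMs) {
  var failures = 0;
  var openedAt = null;
  return {
    isOpen: function (now) {
      if (openedAt === null) return false;
      if ((now || Date.now()) - openedAt >= cooldownMs) {
        openedAt = null; // cooldown elapsed: allow a trial call
        failures = 0;
        return false;
      }
      return true;
    },
    recordSuccess: function () { failures = 0; openedAt = null; },
    recordFailure: function (now) {
      failures++;
      if (failures >= threshold) openedAt = now || Date.now();
    }
  };
}
```

Before a non-critical step like `sendSlack`, check `breaker.isOpen()` and skip (while still auditing the skip) when the service is unhealthy.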
