Skip to content

Email Auto-Labeler

This example shows how to build an automation that automatically categorizes and labels emails using AI. It demonstrates lifecycle hooks for real-time processing.

Overview

  • Features: oauth + automation
  • Trigger: Webhook (real-time) + Cron (fallback)
  • OAuth: Google (Gmail API)
  • Lifecycle hooks: onConnect (sets up Gmail watch), onWebhook (processes notifications)

Manifest

json
{
  "automation": {
    "triggers": [
      {
        "type": "webhook",
        "description": "Real-time processing via Gmail push notifications"
      },
      {
        "type": "cron",
        "default": "*/15 * * * *",
        "description": "Fallback check every 15 minutes"
      }
    ],
    "timeout": 120,
    "onWebhook": true
  },
  
  "oauth": {
    "provider": "google",
    "scopes": [
      "https://www.googleapis.com/auth/gmail.modify",
      "https://www.googleapis.com/auth/gmail.labels"
    ],
    "clientIdSecret": "GOOGLE_CLIENT_ID",
    "clientSecretSecret": "GOOGLE_CLIENT_SECRET"
  },
  
  "developerSecrets": {
    "GOOGLE_CLIENT_ID": { "required": true },
    "GOOGLE_CLIENT_SECRET": { "required": true },
    "GMAIL_PUBSUB_TOPIC": { 
      "description": "Google Cloud Pub/Sub topic for push notifications",
      "required": false 
    }
  }
}

Code

javascript
const GMAIL_API = 'https://gmail.googleapis.com/gmail/v1/users/me';

const LABELS = [
  'Urgent', 'Action Required', 'Awaiting Reply', 'Reference',
  'Newsletter', 'Social', 'Calendar', 'Travel',
  'Financial Records', 'Shipping', 'Medical', 'Family',
  'Recruiting', 'Cold Pitch', 'Marketing', 'Intro Request', 'Transactional'
];

let labelCache = {};

function getHeader(headers, name) {
  const header = headers.find(h => h.name.toLowerCase() === name.toLowerCase());
  return header?.value || '';
}

async function getOrCreateLabel(context, labelName) {
  if (labelCache[labelName]) return labelCache[labelName];
  
  const listResp = await context.oauth.fetch(`${GMAIL_API}/labels`);
  const labelsData = JSON.parse(listResp.body);
  
  const existing = labelsData.labels?.find(l => l.name === labelName);
  if (existing) {
    labelCache[labelName] = existing.id;
    return existing.id;
  }
  
  const createResp = await context.oauth.fetch(`${GMAIL_API}/labels`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      name: labelName,
      labelListVisibility: 'labelShow',
      messageListVisibility: 'show',
    }),
  });
  
  const newLabel = JSON.parse(createResp.body);
  labelCache[labelName] = newLabel.id;
  return newLabel.id;
}

async function processEmail(context, messageId) {
  const msgResp = await context.oauth.fetch(
    `${GMAIL_API}/messages/${messageId}?format=metadata&metadataHeaders=From&metadataHeaders=Subject`
  );
  
  if (!msgResp.ok) return null;
  
  const msgData = JSON.parse(msgResp.body);
  const headers = msgData.payload?.headers || [];
  const from = getHeader(headers, 'From');
  const subject = getHeader(headers, 'Subject');
  const snippet = msgData.snippet || '';
  
  // Skip if already labeled
  const existingLabels = msgData.labelIds || [];
  if (LABELS.some(l => existingLabels.includes(l))) return null;
  
  // Classify with AI
  const classification = await context.ai.analyze({
    prompt: `Classify this email into ONE category: ${LABELS.join(', ')}
             Return JSON: {"label": "CategoryName"}`,
    data: { from, subject, snippet: snippet.substring(0, 200) }
  });
  
  let labelName = classification?.label || 'Reference';
  if (!LABELS.includes(labelName)) labelName = 'Reference';
  
  const labelId = await getOrCreateLabel(context, labelName);
  
  await context.oauth.fetch(`${GMAIL_API}/messages/${messageId}/modify`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ addLabelIds: [labelId] }),
  });
  
  return { messageId, from, subject, label: labelName };
}

/**
 * onConnect - Called after OAuth connection
 * Sets up Gmail push notifications via Pub/Sub
 */
async function onConnect(input, context) {
  const topic = context.secrets.GMAIL_PUBSUB_TOPIC;
  if (!topic) {
    return { success: true, watch: false, reason: 'No Pub/Sub topic configured' };
  }
  
  const watchResp = await context.oauth.fetch(`${GMAIL_API}/watch`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      topicName: topic,
      labelIds: ['INBOX'],
    }),
  });
  
  if (!watchResp.ok) {
    throw new Error(`Gmail watch failed: ${watchResp.status}`);
  }
  
  await context.notify('Email Auto-Labeler is now active! New emails will be labeled automatically.', {
    title: 'Auto-Labeler Connected'
  });
  
  return { success: true, watch: true };
}

/**
 * onWebhook - Called when Pub/Sub pushes a notification
 */
async function onWebhook(input, context) {
  const payload = input.triggerData;
  
  // Decode Pub/Sub message
  let notification;
  if (payload.message?.data) {
    notification = JSON.parse(atob(payload.message.data));
  } else {
    notification = payload;
  }
  
  const { historyId, emailAddress } = notification;
  
  // Verify this is for the right user
  const connectionInfo = await context.oauth.getConnectionInfo();
  if (emailAddress && connectionInfo.email !== emailAddress) {
    return { success: true, skipped: true, reason: 'Different user' };
  }
  
  // Fetch history for new messages
  const historyResp = await context.oauth.fetch(
    `${GMAIL_API}/history?startHistoryId=${historyId}&historyTypes=messageAdded`
  );
  
  if (!historyResp.ok) {
    // History too old, process recent inbox
    const recentResp = await context.oauth.fetch(`${GMAIL_API}/messages?labelIds=INBOX&maxResults=5`);
    const recentData = JSON.parse(recentResp.body);
    
    let processed = 0;
    let urgentEmails = [];
    
    for (const msg of (recentData.messages || [])) {
      const result = await processEmail(context, msg.id);
      if (result) {
        processed++;
        if (result.label === 'Urgent') urgentEmails.push(result);
      }
    }
    
    if (urgentEmails.length > 0) {
      const urgentList = urgentEmails.map(e => `- **${e.subject}** from ${e.from}`).join('\n');
      await context.notify(`**${urgentEmails.length} urgent email(s):**\n\n${urgentList}`, {
        urgent: true, title: 'Urgent Emails'
      });
    }
    
    return { success: true, processed };
  }
  
  const historyData = JSON.parse(historyResp.body);
  let processed = 0;
  let urgentEmails = [];
  
  for (const record of (historyData.history || [])) {
    for (const added of (record.messagesAdded || [])) {
      const result = await processEmail(context, added.message.id);
      if (result) {
        processed++;
        if (result.label === 'Urgent') urgentEmails.push(result);
      }
    }
  }
  
  if (urgentEmails.length > 0) {
    const urgentList = urgentEmails.map(e => `- **${e.subject}** from ${e.from}`).join('\n');
    await context.notify(`**${urgentEmails.length} urgent email(s):**\n\n${urgentList}`, {
      urgent: true, title: 'Urgent Emails'
    });
  }
  
  return { success: true, processed };
}

/**
 * handler - Fallback for cron/manual triggers
 */
async function handler(input, context) {
  if (!await context.oauth.isConnected()) {
    return { success: false, reason: 'Gmail not connected' };
  }
  
  const sinceTime = input.lastRunAt || (Date.now() - 15 * 60 * 1000);
  const sinceSeconds = Math.floor(sinceTime / 1000);
  
  const searchResp = await context.oauth.fetch(
    `${GMAIL_API}/messages?q=after:${sinceSeconds}&maxResults=20`
  );
  
  const searchData = JSON.parse(searchResp.body);
  if (!searchData.messages) return { success: true, labeled: 0 };
  
  let labeledCount = 0;
  let urgentEmails = [];
  
  for (const msg of searchData.messages) {
    const result = await processEmail(context, msg.id);
    if (result) {
      labeledCount++;
      if (result.label === 'Urgent') urgentEmails.push(result);
    }
  }
  
  if (urgentEmails.length > 0) {
    const urgentList = urgentEmails.map(e => `- **${e.subject}** from ${e.from}`).join('\n');
    await context.notify(`**${urgentEmails.length} urgent email(s):**\n\n${urgentList}`, {
      urgent: true, title: 'Urgent Emails'
    });
  }
  
  return { success: true, labeled: labeledCount };
}

module.exports = { handler, onConnect, onWebhook };

Key Concepts

Lifecycle Hooks

This agent uses three handlers:

HandlerWhen it runsPurpose
onConnectAfter OAuthSets up Gmail push notifications
onWebhookWhen Pub/Sub pushesProcesses new emails in real-time
handlerCron scheduleFallback if webhooks miss something

Setting Up Gmail Watch

The onConnect handler calls Gmail's watch API:

javascript
await context.oauth.fetch(`${GMAIL_API}/watch`, {
  method: 'POST',
  body: JSON.stringify({
    topicName: context.secrets.GMAIL_PUBSUB_TOPIC,
    labelIds: ['INBOX'],
  }),
});

This tells Gmail to push notifications to your Pub/Sub topic.

Processing Webhook Payloads

Pub/Sub messages are base64 encoded:

javascript
const notification = JSON.parse(atob(payload.message.data));
const { historyId, emailAddress } = notification;

User Verification

Since the webhook URL is shared, verify the notification is for the right user:

javascript
const connectionInfo = await context.oauth.getConnectionInfo();
if (emailAddress && connectionInfo.email !== emailAddress) {
  return { skipped: true }; // Wrong user
}

Setup Requirements

To enable real-time notifications:

  1. Create a Google Cloud Pub/Sub topic
  2. Configure the topic to push to: https://your-domain/api/webhooks/plugin/{pluginId}
  3. Set GMAIL_PUBSUB_TOPIC in your agent secrets

Without Pub/Sub, the agent still works via the cron fallback (every 15 minutes).

Testing

  1. Install the automation from the Agents page
  2. Connect your Gmail account (triggers onConnect)
  3. Send yourself a test email
  4. Watch for real-time labeling (if Pub/Sub configured) or wait for cron

Built with VitePress