Enterprise Integration Patterns
Connecting Agentic AI to Enterprise Systems
Document ID: C6-ENTERPRISE-INTEGRATION
Version: 1.0
Category: Technical Deep Dive
Integration Architecture Overview
┌─────────────────────────────────────────────────────────┐
│ AGENTIC AI LAYER │
│ (Orchestrator, Agents, Memory, Tools) │
└───────────────────────────┬─────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────┐
│ INTEGRATION PLATFORM │
│ (API Gateway, Event Bus, Transformation) │
└───────────────────────────┬─────────────────────────────┘
│
┌───────────┬───────────┼───────────┬───────────┐
▼ ▼ ▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│ CRM │ │ ERP │ │ ITSM │ │ HCM │ │ Custom│
│ │ │ │ │ │ │ │ │ Apps │
└───────┘ └───────┘ └───────┘ └───────┘ └───────┘
Salesforce Integration
Connection Patterns
Pattern 1: REST API Direct
import requests
from simple_salesforce import Salesforce
class SalesforceConnector:
def __init__(self, username, password, security_token):
self.sf = Salesforce(
username=username,
password=password,
security_token=security_token
)
def get_account(self, account_id):
return self.sf.Account.get(account_id)
def query_opportunities(self, account_id):
return self.sf.query(f"""
SELECT Id, Name, Amount, StageName, CloseDate
FROM Opportunity
WHERE AccountId = '{account_id}'
ORDER BY CloseDate DESC
""")
def create_case(self, account_id, subject, description):
return self.sf.Case.create({
'AccountId': account_id,
'Subject': subject,
'Description': description,
'Origin': 'AI Agent'
})
Pattern 2: Event-Driven (Platform Events)
from salesforce_streaming import SalesforceStreaming
class SalesforceEventListener:
def __init__(self, sf_client):
self.streaming = SalesforceStreaming(sf_client)
def subscribe_to_events(self, channel, callback):
"""Subscribe to Salesforce Platform Events"""
self.streaming.subscribe(channel, callback)
def on_opportunity_change(self, event):
"""Handle opportunity stage changes"""
opp_id = event['payload']['OpportunityId__c']
new_stage = event['payload']['NewStage__c']
# Trigger agentic workflow
if new_stage == 'Proposal/Price Quote':
return AgentTask(
type='generate_proposal',
context={'opportunity_id': opp_id}
)
Agent Tools for Salesforce
SALESFORCE_TOOLS = [
{
"name": "salesforce_query",
"description": "Query Salesforce objects using SOQL",
"parameters": {
"query": "SOQL query string"
}
},
{
"name": "salesforce_get_record",
"description": "Get a specific Salesforce record",
"parameters": {
"object_type": "Account, Contact, Opportunity, etc.",
"record_id": "Salesforce record ID"
}
},
{
"name": "salesforce_create_record",
"description": "Create a new Salesforce record",
"parameters": {
"object_type": "Object type to create",
"fields": "Field values as key-value pairs"
}
},
{
"name": "salesforce_update_record",
"description": "Update an existing Salesforce record",
"parameters": {
"object_type": "Object type",
"record_id": "Record ID to update",
"fields": "Fields to update"
}
}
]
SAP Integration
Connection Methods
RFC/BAPI via PyRFC
from pyrfc import Connection
class SAPConnector:
def __init__(self, config):
self.conn = Connection(**config)
def get_customer(self, customer_id):
"""Get customer master data"""
result = self.conn.call('BAPI_CUSTOMER_GETDETAIL2',
CUSTOMERNO=customer_id
)
return result
def get_sales_orders(self, customer_id, date_from, date_to):
"""Get sales orders for customer"""
result = self.conn.call('BAPI_SALESORDER_GETLIST',
CUSTOMER_NUMBER=customer_id,
SALES_ORGANIZATION='1000',
DOCUMENT_DATE_LOW=date_from,
DOCUMENT_DATE_HIGH=date_to
)
return result['SALES_ORDERS']
def create_sales_order(self, order_data):
"""Create new sales order"""
result = self.conn.call('BAPI_SALESORDER_CREATEFROMDAT2',
ORDER_HEADER_IN=order_data['header'],
ORDER_ITEMS_IN=order_data['items'],
ORDER_PARTNERS=order_data['partners']
)
# Commit if successful
if not result.get('RETURN', [{}])[0].get('TYPE') == 'E':
self.conn.call('BAPI_TRANSACTION_COMMIT', WAIT='X')
return result
SAP OData API
import requests
class SAPODataConnector:
def __init__(self, base_url, username, password):
self.base_url = base_url
self.auth = (username, password)
self.headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
def get_entity(self, entity_set, entity_id):
url = f"{self.base_url}/{entity_set}('{entity_id}')"
response = requests.get(url, auth=self.auth, headers=self.headers)
return response.json()
def query_entity(self, entity_set, filters=None, select=None, top=100):
url = f"{self.base_url}/{entity_set}"
params = {'$top': top}
if filters:
params['$filter'] = filters
if select:
params['$select'] = select
response = requests.get(url, auth=self.auth, headers=self.headers, params=params)
return response.json()['d']['results']
ServiceNow Integration
REST API Connection
import requests
from typing import Dict, List, Optional
class ServiceNowConnector:
def __init__(self, instance, username, password):
self.base_url = f"https://{instance}.service-now.com/api/now"
self.auth = (username, password)
self.headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
def get_incident(self, incident_number: str) -> Dict:
"""Get incident by number"""
url = f"{self.base_url}/table/incident"
params = {'sysparm_query': f'number={incident_number}'}
response = requests.get(url, auth=self.auth, headers=self.headers, params=params)
results = response.json()['result']
return results[0] if results else None
def create_incident(self, short_description: str, description: str,
caller_id: str, category: str, priority: int = 3) -> Dict:
"""Create new incident"""
url = f"{self.base_url}/table/incident"
payload = {
'short_description': short_description,
'description': description,
'caller_id': caller_id,
'category': category,
'priority': priority,
'contact_type': 'AI Agent'
}
response = requests.post(url, auth=self.auth, headers=self.headers, json=payload)
return response.json()['result']
def update_incident(self, sys_id: str, updates: Dict) -> Dict:
"""Update existing incident"""
url = f"{self.base_url}/table/incident/{sys_id}"
response = requests.patch(url, auth=self.auth, headers=self.headers, json=updates)
return response.json()['result']
def search_knowledge(self, query: str, limit: int = 5) -> List[Dict]:
"""Search knowledge base"""
url = f"{self.base_url}/table/kb_knowledge"
params = {
'sysparm_query': f'textLIKE{query}^active=true',
'sysparm_limit': limit,
'sysparm_fields': 'number,short_description,text,sys_id'
}
response = requests.get(url, auth=self.auth, headers=self.headers, params=params)
return response.json()['result']
ServiceNow Flow Designer Integration
// ServiceNow Flow Designer - Trigger from AI Agent
// Subflow: AI_Agent_Handler
(function execute(inputs, outputs) {
var agentRequest = JSON.parse(inputs.agent_payload);
// Route based on intent
switch(agentRequest.intent) {
case 'create_incident':
outputs.action = 'create_incident';
outputs.data = agentRequest.data;
break;
case 'lookup_user':
outputs.action = 'lookup_user';
outputs.data = agentRequest.data;
break;
case 'kb_search':
outputs.action = 'kb_search';
outputs.data = agentRequest.data;
break;
}
})(inputs, outputs);
Workday Integration
REST API Connection
import requests
from requests_oauthlib import OAuth2Session
class WorkdayConnector:
def __init__(self, tenant, client_id, client_secret, refresh_token):
self.base_url = f"https://wd2-impl-services1.workday.com/ccx/api/v1/{tenant}"
self.token_url = f"https://wd2-impl-services1.workday.com/ccx/oauth2/{tenant}/token"
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self._refresh_access_token()
def _refresh_access_token(self):
"""Refresh OAuth token"""
response = requests.post(self.token_url, data={
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
'client_id': self.client_id,
'client_secret': self.client_secret
})
self.access_token = response.json()['access_token']
self.headers = {
'Authorization': f'Bearer {self.access_token}',
'Content-Type': 'application/json'
}
def get_worker(self, worker_id: str) -> Dict:
"""Get worker details"""
url = f"{self.base_url}/workers/{worker_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def search_workers(self, search_criteria: Dict) -> List[Dict]:
"""Search for workers"""
url = f"{self.base_url}/workers"
response = requests.get(url, headers=self.headers, params=search_criteria)
return response.json()['data']
def get_time_off_balance(self, worker_id: str) -> Dict:
"""Get time off balance for worker"""
url = f"{self.base_url}/workers/{worker_id}/timeOffBalance"
response = requests.get(url, headers=self.headers)
return response.json()
Integration Best Practices
Error Handling
class EnterpriseIntegrationError(Exception):
"""Base exception for enterprise integrations"""
pass
class RateLimitError(EnterpriseIntegrationError):
"""API rate limit exceeded"""
pass
class AuthenticationError(EnterpriseIntegrationError):
"""Authentication failed"""
pass
def with_retry(max_retries=3, backoff_factor=2):
"""Decorator for retry logic"""
def decorator(func):
async def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except RateLimitError:
if attempt < max_retries - 1:
await asyncio.sleep(backoff_factor ** attempt)
else:
raise
except AuthenticationError:
# Attempt token refresh once
if attempt == 0:
await refresh_token()
else:
raise
return wrapper
return decorator
Connection Pooling
from aiohttp import ClientSession, TCPConnector
class EnterpriseConnectionPool:
def __init__(self, max_connections=100):
self.connector = TCPConnector(limit=max_connections)
self.session = None
async def get_session(self) -> ClientSession:
if self.session is None or self.session.closed:
self.session = ClientSession(connector=self.connector)
return self.session
async def close(self):
if self.session:
await self.session.close()
Audit Logging
import logging
from datetime import datetime
class IntegrationAuditLogger:
def __init__(self):
self.logger = logging.getLogger('enterprise_integration')
def log_api_call(self, system: str, operation: str,
request: Dict, response: Dict, duration_ms: float):
self.logger.info({
'timestamp': datetime.utcnow().isoformat(),
'system': system,
'operation': operation,
'request_summary': self._summarize(request),
'response_status': response.get('status'),
'duration_ms': duration_ms,
'agent_id': get_current_agent_id()
})
def _summarize(self, data: Dict) -> Dict:
"""Summarize without sensitive data"""
# Remove sensitive fields
return {k: v for k, v in data.items()
if k not in ['password', 'token', 'ssn', 'credit_card']}
Quick Reference: Connector Matrix
| System | Auth Method | Rate Limits | Best For |
|---|---|---|---|
| Salesforce | OAuth 2.0 | 100K/day | CRM operations |
| SAP | Basic/OAuth | Varies | ERP transactions |
| ServiceNow | Basic/OAuth | 10K/hour | IT service mgmt |
| Workday | OAuth 2.0 | Varies | HR operations |
| Microsoft 365 | OAuth 2.0 | 10K/10min | Productivity |
Document maintained by CODITECT Integration Team