Skip to main content

API Step

An API step is exposed as an HTTP endpoint that acts as an entry point into your sequence of steps, or flow. It allows external systems or clients to trigger and interact with your flows through a REST API interface. Like any Motia Step, an API Step can be configured to emit events or wait for events to occur.

Config

The following properties are specific to the API Step, in addition to the common step config.

<DescriptionTable type={{ path: { description: 'The HTTP path for the API endpoint', type: 'string', }, method: { description: 'The HTTP method for the API endpoint (GET, POST, PUT, DELETE, etc.)', type: 'string', }, bodySchema: { description: 'Schema for validating the request body. For TypeScript/JavaScript steps, it uses zod schemas. For Python steps, it uses Pydantic models.', type: 'object', }, responseSchema: { description: 'Mostly used for documentation, the expected output of an API endpoint. For TypeScript/JavaScript steps, it uses zod schemas. For Python steps, it uses Pydantic models or Dict Json Schema.', type: 'object', }, queryParams: { description: 'Mostly for documentation, the expected query params', type: 'array', }, middleware: { description: 'Optional middleware functions to run before the handler', type: 'array', }, }} />

Defining an API Step

<Tabs items={['TypeScript', 'JavaScript', 'Python']}> ```typescript import { ApiRouteConfig, Handlers } from 'motia' import { z } from 'zod'

export const config: ApiRouteConfig = {
type: 'api',
name: 'GetMessage',
description: 'Retrieves a generated message from OpenAI based on the Trace ID returned by the POST /openai endpoint',
path: '/openai/:traceId',
method: 'GET',
emits: ['call-openai'],
flows: ['openai'],
responseSchema: {
// When response code is 200
200: z.object({ message: z.string({ description: 'The message from OpenAI' }) }),
// When response code is 400
400: z.object({ message: z.string({ description: 'The error message' }) })
},
queryParams: [
{
name: 'includeProps',
description: 'Whether to include the properties of the message',
},
],
}

export const handler: Handlers['GetMessage'] = async (req, { logger }) => {
logger.info('[Call OpenAI] Received callOpenAi event', req)

return {
status: 200,
body: { message: 'OpenAI response sent' },
}
}
```
```typescript const { z } = require('zod')
export const config = {
type: 'api',
name: 'Get Message by Trace ID',
description: 'Retrieves a generated message from OpenAI based on the Trace ID returned by the POST /openai endpoint',
path: '/openai/:traceId',
method: 'GET',
emits: ['call-openai'],
flows: ['openai'],
responseSchema: {
// When response code is 200
200: z.object({ message: z.string({ description: 'The message from OpenAI' }) }),
// When response code is 400
400: z.object({ message: z.string({ description: 'The error message' }) })
},
queryParams: [
{
name: 'includeProps',
description: 'Whether to include the properties of the message',
},
],
}

export const handler = async (req, { logger }) => {
logger.info('[Call OpenAI] Received callOpenAi event', req)

return {
status: 200,
body: { message: 'OpenAI response sent' },
}
}
```
```python from pydantic import BaseModel
# Define a Pydantic model for request body validation
class RequestBody(BaseModel):
message: str

config = {
"type": "api",
"name": "Get Message by Trace ID",
"description": "Retrieves a generated message from OpenAI based on the Trace ID returned by the POST /openai endpoint",
"path": "/openai/:traceId",
"method": "GET",
"emits": ["call-openai"],
"flows": ["openai"],
"responseSchema": {
"200": RequestBody.model_json_schema()
},
"queryParams": [
{
"name": "includeProps",
"description": "Whether to include the properties of the message",
},
],
}

async def handler(req, context):
context.logger.info("[Call OpenAI] Received callOpenAi event", {"body": req.get("body")})

return {
"status": 200,
"body": { "message": "OpenAI response sent" },
}
```

This should create an endpoint that can be viewed and requested from Workbench UI.

Endpoint Visualization in Workbench

The following examples showcase how to configure an API Step

Using Middleware

API Steps support middleware functions that can be applied to requests before they reach your handler. Middleware functions are completely framework-agnostic and can perform tasks such as:

  • Authentication and authorization
  • Request logging
  • Rate limiting
  • CORS handling
  • Request validation
  • Response transformation

Middleware Function Signature

type ApiMiddleware = (req: ApiRequest, ctx: FlowContext, next: () => Promise<ApiResponse>) => Promise<ApiResponse>

Middleware functions receive:

  • req: The API request object with body, headers, pathParams, and queryParams
  • ctx: The flow context with logger, state, emit, and traceId
  • next: A function to call the next middleware or handler in the chain
    • Call next() to continue to the next middleware or handler
    • The return value of next() is the response from the next middleware or handler
    • You can modify this response before returning it

Example Middleware Usage

import { ApiMiddleware } from 'motia'

// Logging middleware
const loggingMiddleware: ApiMiddleware = async (req, ctx, next) => {
ctx.logger.info('Request received', { path: req.pathParams })
const start = Date.now()

// Call the next middleware and get its response
const response = await next()

const duration = Date.now() - start
ctx.logger.info('Request completed', { duration, status: response.status })

return response
}

// Authentication middleware
const authMiddleware: ApiMiddleware = async (req, ctx, next) => {
const authHeader = req.headers.authorization

if (!authHeader) {
// Return early without calling next()
return {
status: 401,
body: { error: 'Unauthorized' },
}
}

// Continue to the next middleware
return next()
}

export const config = {
type: 'api',
name: 'protected-endpoint',
path: '/api/protected',
method: 'POST',
emits: ['USER_ACTION'],
middleware: [loggingMiddleware, authMiddleware],
}

export const handler = async (req, ctx) => {
// This handler will only be called if all middleware pass
return {
status: 200,
body: { message: 'Protected data accessed successfully' },
}
}

Creating Custom Middleware

You can create your own middleware functions:

import { ApiMiddleware } from 'motia'

// Request modification middleware
const requestModifierMiddleware: ApiMiddleware = async (req, ctx, next) => {
// Modify the request before passing it to the next middleware
req.headers['x-modified-by'] = 'middleware'
req.body.timestamp = Date.now()

// Call the next middleware in the chain
return next()
}

// Response modification middleware
const responseModifierMiddleware: ApiMiddleware = async (req, ctx, next) => {
// Call the next middleware in the chain
const response = await next()

// Modify the response before returning it
response.headers = {
...response.headers,
'x-powered-by': 'Motia',
}

return response
}

// Error handling middleware
const errorHandlingMiddleware: ApiMiddleware = async (req, ctx, next) => {
try {
// Call the next middleware in the chain
return await next()
} catch (error) {
ctx.logger.error('Error in handler', { error })
return {
status: 500,
body: { error: 'Internal server error' },
}
}
}

// Rate limiter middleware with state
const rateLimiterMiddleware: ApiMiddleware = (() => {
// Closure to maintain state between requests
const requests: Record<string, number[]> = {}
const limit = 100
const windowMs = 60000 // 1 minute

return async (req, ctx, next) => {
const ip = req.headers['x-forwarded-for'] || 'unknown-ip'
const ipStr = Array.isArray(ip) ? ip[0] : ip

const now = Date.now()
if (!requests[ipStr]) {
requests[ipStr] = []
}

// Remove old requests outside the time window
requests[ipStr] = requests[ipStr].filter((time) => now - time < windowMs)

if (requests[ipStr].length >= limit) {
return {
status: 429,
body: { error: 'Too many requests, please try again later' },
}
}

// Add current request
requests[ipStr].push(now)

return next()
}
})()

<Tabs items={['TypeScript', 'JavaScript', 'Python']}> ```typescript import { ApiRouteConfig, Handlers } from 'motia' import { z } from 'zod'

  export const config: ApiRouteConfig = {
type: 'api',
name: 'TestStateApiTrigger',
description: 'test state',
path: '/test-state',
method: 'POST',
emits: ['test-state'],
bodySchema: z.object({}),
flows: ['test-state'],
}

export const handler: Handlers['TestStateApiTrigger'] = async (req, { logger, emit }) => {
logger.info('[Test State] Received request', req)

await emit({
topic: 'test-state',
data: req.body
})

return {
status: 200,
body: { message: 'Success' },
}
}
```
```javascript const { z } = require('zod')
exports.config = {
type: 'api',
name: 'Test state api trigger',
description: 'test state',
path: '/test-state',
method: 'POST',
emits: ['test-state'],
bodySchema: z.object({}),
flows: ['test-state'],
}

exports.handler = async (req, { logger, emit }) => {
logger.info('[Test State] Received request', req)

await emit({
topic: 'test-state',
data: req.body
})

return {
status: 200,
body: { message: 'Success' },
}
}
```
```python from typing import Any, Dict, Callable from motia import ApiMiddleware from datetime import datetime import time
from pydantic import BaseModel

# Define a Pydantic model for request body validation
class RequestBody(BaseModel):
message: str

# Request modification middleware
async def request_modifier_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
# Modify the request before passing it to the next middleware
data['headers']['x-modified-by'] = 'middleware'
data['body']['timestamp'] = int(time.time() * 1000)

# Call the next middleware in the chain
return await next_fn()

# Response modification middleware
async def response_modifier_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
# Call the next middleware in the chain
response = await next_fn()

# Modify the response before returning it
response['headers'] = {
**response.get('headers', {}),
'x-powered-by': 'Motia'
}

return response

# Error handling middleware
async def error_handling_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
try:
# Call the next middleware in the chain
return await next_fn()
except Exception as error:
ctx.logger.error('Error in handler', {'error': str(error)})
return {
'status': 500,
'body': {'error': 'Internal server error'}
}

# Rate limiter middleware with state using a closure
def create_rate_limiter_middleware():
# Closure to maintain state between requests
requests: Dict[str, list] = {}
limit = 100
window_ms = 60000 # 1 minute

async def rate_limiter_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
ip = data['headers'].get('x-forwarded-for', ['unknown-ip'])
ip_str = ip[0] if isinstance(ip, list) else ip

now = int(time.time() * 1000)
if ip_str not in requests:
requests[ip_str] = []

# Remove old requests outside the time window
requests[ip_str] = [t for t in requests[ip_str] if now - t < window_ms]

if len(requests[ip_str]) >= limit:
return {
'status': 429,
'body': {'error': 'Too many requests, please try again later'}
}

# Add current request
requests[ip_str].append(now)

return await next_fn()

return rate_limiter_middleware

config = {
'type': 'api',
'name': 'Test state api trigger',
'description': 'test state',
'path': '/test-state',
'method': 'POST',
'emits': ['test-state'],
'flows': ['test-state'],
'bodySchema': RequestBody.model_json_schema(), # We use jsonschema to validate
'middleware': [
request_modifier_middleware,
response_modifier_middleware,
error_handling_middleware,
create_rate_limiter_middleware()
]
}

async def handler(req, context):
context.logger.info('[Test State] Received request', {'body': req.get("body")})

await context.emit({
'topic': 'test-state',
'data': req.body
})

return {
'status': 200,
'body': {'message': 'Success'}
}
```