# API Step
An API step is exposed as an HTTP endpoint that acts as an entry point into your sequence of steps, or flow. It allows external systems or clients to trigger and interact with your flows through a REST API interface. Like any Motia Step, an API Step can be configured to emit events or wait for events to occur.
## Config
The following properties are specific to the API Step, in addition to the common step config.
<DescriptionTable type={{ path: { description: 'The HTTP path for the API endpoint', type: 'string', }, method: { description: 'The HTTP method for the API endpoint (GET, POST, PUT, DELETE, etc.)', type: 'string', }, bodySchema: { description: 'Schema used to validate the request body. TypeScript/JavaScript steps use zod schemas; Python steps use Pydantic models.', type: 'object', }, responseSchema: { description: 'The expected responses of the endpoint, keyed by status code; mostly used for documentation. TypeScript/JavaScript steps use zod schemas; Python steps use Pydantic models or JSON Schema dicts.', type: 'object', }, queryParams: { description: 'The expected query parameters; mostly used for documentation', type: 'array', }, middleware: { description: 'Optional middleware functions to run before the handler', type: 'array', }, }} />
## Defining an API Step

<Tabs items={['TypeScript', 'JavaScript', 'Python']}>

```typescript
import { ApiRouteConfig, Handlers } from 'motia'
import { z } from 'zod'

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'GetMessage',
  description: 'Retrieves a generated message from OpenAI based on the Trace ID returned by the POST /openai endpoint',
  path: '/openai/:traceId',
  method: 'GET',
  emits: ['call-openai'],
  flows: ['openai'],
  responseSchema: {
    // When the response code is 200
    200: z.object({ message: z.string({ description: 'The message from OpenAI' }) }),
    // When the response code is 400
    400: z.object({ message: z.string({ description: 'The error message' }) }),
  },
  queryParams: [
    {
      name: 'includeProps',
      description: 'Whether to include the properties of the message',
    },
  ],
}

export const handler: Handlers['GetMessage'] = async (req, { logger }) => {
  logger.info('[GetMessage] Received request', req)

  return {
    status: 200,
    body: { message: 'OpenAI response sent' },
  }
}
```
```javascript
import { z } from 'zod'

export const config = {
  type: 'api',
  name: 'Get Message by Trace ID',
  description: 'Retrieves a generated message from OpenAI based on the Trace ID returned by the POST /openai endpoint',
  path: '/openai/:traceId',
  method: 'GET',
  emits: ['call-openai'],
  flows: ['openai'],
  responseSchema: {
    // When the response code is 200
    200: z.object({ message: z.string({ description: 'The message from OpenAI' }) }),
    // When the response code is 400
    400: z.object({ message: z.string({ description: 'The error message' }) }),
  },
  queryParams: [
    {
      name: 'includeProps',
      description: 'Whether to include the properties of the message',
    },
  ],
}

export const handler = async (req, { logger }) => {
  logger.info('[Get Message] Received request', req)

  return {
    status: 200,
    body: { message: 'OpenAI response sent' },
  }
}
```
```python
from pydantic import BaseModel

# Define a Pydantic model describing the 200 response body
class ResponseBody(BaseModel):
    message: str

config = {
    "type": "api",
    "name": "Get Message by Trace ID",
    "description": "Retrieves a generated message from OpenAI based on the Trace ID returned by the POST /openai endpoint",
    "path": "/openai/:traceId",
    "method": "GET",
    "emits": ["call-openai"],
    "flows": ["openai"],
    "responseSchema": {
        "200": ResponseBody.model_json_schema()
    },
    "queryParams": [
        {
            "name": "includeProps",
            "description": "Whether to include the properties of the message",
        },
    ],
}

async def handler(req, context):
    context.logger.info("[Get Message] Received request", {"body": req.get("body")})

    return {
        "status": 200,
        "body": {"message": "OpenAI response sent"},
    }
```

</Tabs>
This creates an endpoint that you can view and call from the Workbench UI.

The following sections show additional ways to configure an API Step.
## Using Middleware
API Steps support middleware functions that can be applied to requests before they reach your handler. Middleware functions are completely framework-agnostic and can perform tasks such as:
- Authentication and authorization
- Request logging
- Rate limiting
- CORS handling
- Request validation
- Response transformation
### Middleware Function Signature

```typescript
type ApiMiddleware = (req: ApiRequest, ctx: FlowContext, next: () => Promise<ApiResponse>) => Promise<ApiResponse>
```

Middleware functions receive:

- `req`: The API request object with `body`, `headers`, `pathParams`, and `queryParams`
- `ctx`: The flow context with `logger`, `state`, `emit`, and `traceId`
- `next`: A function that invokes the next middleware or handler in the chain
  - Call `next()` to continue to the next middleware or handler
  - The return value of `next()` is the response from the next middleware or handler
  - You can modify this response before returning it
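To make these `next()` semantics concrete, here is a minimal, framework-free sketch of how a middleware chain composes. The `compose` helper and the simplified `Req`/`Res`/`Ctx` types are hypothetical stand-ins for illustration, not Motia's actual internals.

```typescript
// Simplified stand-ins for ApiRequest, ApiResponse, and FlowContext
type Req = { headers: Record<string, string> }
type Res = { status: number; body: unknown; headers?: Record<string, string> }
type Ctx = { log: string[] }
type Middleware = (req: Req, ctx: Ctx, next: () => Promise<Res>) => Promise<Res>

// Hypothetical composer: wraps the handler in each middleware, right to left,
// so the first middleware in the array runs first.
const compose =
  (middlewares: Middleware[], handler: (req: Req, ctx: Ctx) => Promise<Res>) =>
  (req: Req, ctx: Ctx): Promise<Res> =>
    middlewares.reduceRight<() => Promise<Res>>(
      (next, mw) => () => mw(req, ctx, next),
      () => handler(req, ctx),
    )()

// Runs code before and after the rest of the chain
const logging: Middleware = async (req, ctx, next) => {
  ctx.log.push('before')
  const res = await next() // response produced further down the chain
  ctx.log.push('after')
  return res
}

// Short-circuits the chain by returning without calling next()
const auth: Middleware = async (req, ctx, next) => {
  if (!req.headers.authorization) return { status: 401, body: { error: 'Unauthorized' } }
  return next()
}

const run = compose([logging, auth], async () => ({ status: 200, body: { ok: true } }))

async function main() {
  const ctx: Ctx = { log: [] }
  const denied = await run({ headers: {} }, ctx)
  const allowed = await run({ headers: { authorization: 'Bearer token' } }, ctx)
  console.log(denied.status, allowed.status) // 401 200
  console.log(ctx.log.join(',')) // before,after,before,after
}
main()
```

Note that even when `auth` returns early, `logging` still runs its post-`next()` code, because the early 401 is simply the value its `await next()` resolves to.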
### Example Middleware Usage

```typescript
import { ApiMiddleware, ApiRouteConfig } from 'motia'

// Logging middleware
const loggingMiddleware: ApiMiddleware = async (req, ctx, next) => {
  ctx.logger.info('Request received', { path: req.pathParams })
  const start = Date.now()

  // Call the next middleware and get its response
  const response = await next()

  const duration = Date.now() - start
  ctx.logger.info('Request completed', { duration, status: response.status })

  return response
}

// Authentication middleware
const authMiddleware: ApiMiddleware = async (req, ctx, next) => {
  const authHeader = req.headers.authorization

  if (!authHeader) {
    // Return early without calling next()
    return {
      status: 401,
      body: { error: 'Unauthorized' },
    }
  }

  // Continue to the next middleware
  return next()
}

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'protected-endpoint',
  path: '/api/protected',
  method: 'POST',
  emits: ['USER_ACTION'],
  middleware: [loggingMiddleware, authMiddleware],
}

export const handler = async (req, ctx) => {
  // This handler is only called if all middleware pass
  return {
    status: 200,
    body: { message: 'Protected data accessed successfully' },
  }
}
```
### Creating Custom Middleware

You can create your own middleware functions:

```typescript
import { ApiMiddleware } from 'motia'

// Request modification middleware
const requestModifierMiddleware: ApiMiddleware = async (req, ctx, next) => {
  // Modify the request before passing it to the next middleware
  req.headers['x-modified-by'] = 'middleware'
  req.body.timestamp = Date.now()

  // Call the next middleware in the chain
  return next()
}

// Response modification middleware
const responseModifierMiddleware: ApiMiddleware = async (req, ctx, next) => {
  // Call the next middleware in the chain
  const response = await next()

  // Modify the response before returning it
  response.headers = {
    ...response.headers,
    'x-powered-by': 'Motia',
  }

  return response
}

// Error handling middleware
const errorHandlingMiddleware: ApiMiddleware = async (req, ctx, next) => {
  try {
    // Call the next middleware in the chain
    return await next()
  } catch (error) {
    ctx.logger.error('Error in handler', { error })
    return {
      status: 500,
      body: { error: 'Internal server error' },
    }
  }
}

// Rate limiter middleware with state
const rateLimiterMiddleware: ApiMiddleware = (() => {
  // Closure to maintain state between requests
  const requests: Record<string, number[]> = {}
  const limit = 100
  const windowMs = 60000 // 1 minute

  return async (req, ctx, next) => {
    const ip = req.headers['x-forwarded-for'] || 'unknown-ip'
    const ipStr = Array.isArray(ip) ? ip[0] : ip
    const now = Date.now()

    if (!requests[ipStr]) {
      requests[ipStr] = []
    }

    // Remove old requests outside the time window
    requests[ipStr] = requests[ipStr].filter((time) => now - time < windowMs)

    if (requests[ipStr].length >= limit) {
      return {
        status: 429,
        body: { error: 'Too many requests, please try again later' },
      }
    }

    // Add the current request
    requests[ipStr].push(now)

    return next()
  }
})()
```
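The sliding-window logic inside that rate limiter closure can be exercised on its own. The sketch below extracts it into a standalone `createRateLimiter` helper (a hypothetical name, parameterized here purely for demonstration; not part of Motia) so the window pruning and the 429 cutoff are easy to observe.

```typescript
// Standalone sliding-window limiter: returns true if the request is allowed.
// The caller supplies `now` explicitly so the window can be tested without real time.
const createRateLimiter = (limit: number, windowMs: number) => {
  const requests: Record<string, number[]> = {}
  return (ip: string, now: number): boolean => {
    // Drop timestamps that have slid out of the window
    requests[ip] = (requests[ip] ?? []).filter((t) => now - t < windowMs)
    if (requests[ip].length >= limit) return false // middleware would answer 429 here
    requests[ip].push(now)
    return true
  }
}

const allow = createRateLimiter(3, 1000)
const results = [0, 100, 200, 300, 1200].map((t) => allow('1.2.3.4', t))
console.log(results) // [ true, true, true, false, true ]
```

The fourth call lands inside a full window and is rejected; by `t = 1200` the earlier timestamps have slid out of the 1000 ms window, so requests are admitted again.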
The following example shows a complete API Step that emits an event to trigger a flow (the Python tab also applies middleware):

<Tabs items={['TypeScript', 'JavaScript', 'Python']}>

```typescript
import { ApiRouteConfig, Handlers } from 'motia'
import { z } from 'zod'

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'TestStateApiTrigger',
  description: 'test state',
  path: '/test-state',
  method: 'POST',
  emits: ['test-state'],
  bodySchema: z.object({}),
  flows: ['test-state'],
}

export const handler: Handlers['TestStateApiTrigger'] = async (req, { logger, emit }) => {
  logger.info('[Test State] Received request', req)

  await emit({
    topic: 'test-state',
    data: req.body,
  })

  return {
    status: 200,
    body: { message: 'Success' },
  }
}
```
```javascript
const { z } = require('zod')

exports.config = {
  type: 'api',
  name: 'Test state api trigger',
  description: 'test state',
  path: '/test-state',
  method: 'POST',
  emits: ['test-state'],
  bodySchema: z.object({}),
  flows: ['test-state'],
}

exports.handler = async (req, { logger, emit }) => {
  logger.info('[Test State] Received request', req)

  await emit({
    topic: 'test-state',
    data: req.body,
  })

  return {
    status: 200,
    body: { message: 'Success' },
  }
}
```
```python
import time
from typing import Any, Callable, Dict

from pydantic import BaseModel

# Define a Pydantic model for request body validation
class RequestBody(BaseModel):
    message: str

# Request modification middleware
async def request_modifier_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
    # Modify the request before passing it to the next middleware
    data['headers']['x-modified-by'] = 'middleware'
    data['body']['timestamp'] = int(time.time() * 1000)

    # Call the next middleware in the chain
    return await next_fn()

# Response modification middleware
async def response_modifier_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
    # Call the next middleware in the chain
    response = await next_fn()

    # Modify the response before returning it
    response['headers'] = {
        **response.get('headers', {}),
        'x-powered-by': 'Motia'
    }

    return response

# Error handling middleware
async def error_handling_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
    try:
        # Call the next middleware in the chain
        return await next_fn()
    except Exception as error:
        ctx.logger.error('Error in handler', {'error': str(error)})
        return {
            'status': 500,
            'body': {'error': 'Internal server error'}
        }

# Rate limiter middleware with state using a closure
def create_rate_limiter_middleware():
    # Closure to maintain state between requests
    requests: Dict[str, list] = {}
    limit = 100
    window_ms = 60000  # 1 minute

    async def rate_limiter_middleware(data: Dict[str, Any], ctx: Any, next_fn: Callable):
        ip = data['headers'].get('x-forwarded-for', ['unknown-ip'])
        ip_str = ip[0] if isinstance(ip, list) else ip
        now = int(time.time() * 1000)

        if ip_str not in requests:
            requests[ip_str] = []

        # Remove old requests outside the time window
        requests[ip_str] = [t for t in requests[ip_str] if now - t < window_ms]

        if len(requests[ip_str]) >= limit:
            return {
                'status': 429,
                'body': {'error': 'Too many requests, please try again later'}
            }

        # Add the current request
        requests[ip_str].append(now)

        return await next_fn()

    return rate_limiter_middleware

config = {
    'type': 'api',
    'name': 'Test state api trigger',
    'description': 'test state',
    'path': '/test-state',
    'method': 'POST',
    'emits': ['test-state'],
    'flows': ['test-state'],
    'bodySchema': RequestBody.model_json_schema(),  # Validated via JSON Schema
    'middleware': [
        request_modifier_middleware,
        response_modifier_middleware,
        error_handling_middleware,
        create_rate_limiter_middleware()
    ]
}

async def handler(req, context):
    context.logger.info('[Test State] Received request', {'body': req.get('body')})

    await context.emit({
        'topic': 'test-state',
        'data': req.get('body')
    })

    return {
        'status': 200,
        'body': {'message': 'Success'}
    }
```

</Tabs>