Skip to main content

RBAC/ABAC Attributes in Postgres and Filters

Extend CODITECT DMS with user and policy tables for RBAC/ABAC enforcement.

User and Role Schema

-- Users (extends the existing auth system).
-- business_unit, desk and facility are optional per-user attributes;
-- every other column is required.
CREATE TABLE users (
    id            TEXT,
    display_name  TEXT NOT NULL,
    email         TEXT NOT NULL,
    business_unit TEXT,
    desk          TEXT,
    facility      TEXT,
    PRIMARY KEY (id)
);

-- Roles: one row per role, keyed by its text identifier.
CREATE TABLE roles (
    id          TEXT,
    description TEXT NOT NULL,
    PRIMARY KEY (id)
);

-- User-Role assignments: many-to-many link between users and roles.
-- The composite primary key prevents assigning the same role twice.
CREATE TABLE user_roles (
    user_id TEXT NOT NULL,
    role_id TEXT NOT NULL,
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (user_id) REFERENCES users (id),
    FOREIGN KEY (role_id) REFERENCES roles (id)
);

-- Permissions: one row per grantable permission.
-- Ids are dotted strings such as "doc.read", "doc.view_phi", ...
CREATE TABLE permissions (
    id          TEXT,
    description TEXT NOT NULL,
    PRIMARY KEY (id)
);

-- Role-Permission grants: many-to-many link between roles and permissions.
-- The composite primary key prevents granting the same permission twice.
CREATE TABLE role_permissions (
    role_id       TEXT NOT NULL,
    permission_id TEXT NOT NULL,
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id) REFERENCES roles (id),
    FOREIGN KEY (permission_id) REFERENCES permissions (id)
);

ABAC in document_metadata

Resource attributes already exist in document_metadata:

| Attribute | Column |
| --- | --- |
| Domain | domain |
| Type | document_type |
| Classification | security_class |
| Contains PHI | contains_phi |
| Contains Financial | contains_financial |
| Business Unit | business_unit |
| Desk | desk |
| Facility | facility |
| Regulations | regulations |

Authorization Flow

Load User Context

async def get_user_context(user_id: str) -> UserContext:
    """Load a user together with their roles and permissions.

    Args:
        user_id: Primary key of the row in ``users``.

    Returns:
        A ``UserContext`` holding the user's role ids, the distinct
        permission ids granted through those roles, and the user's
        ``business_unit``, ``desk`` and ``facility`` attributes.

    Raises:
        LookupError: If no ``users`` row exists for ``user_id``.
    """
    user = await db.fetch_one(
        "SELECT * FROM users WHERE id = $1", [user_id]
    )
    # Stop early with a clear error instead of running two more queries and
    # then failing on user['business_unit'].
    # NOTE(review): assumes fetch_one returns None when no row matches --
    # confirm against the db layer in use.
    if user is None:
        raise LookupError(f"Unknown user: {user_id!r}")

    roles = await db.fetch_all("""
        SELECT r.id, r.description
        FROM roles r
        JOIN user_roles ur ON r.id = ur.role_id
        WHERE ur.user_id = $1
    """, [user_id])

    # DISTINCT: the same permission may be granted through several roles.
    permissions = await db.fetch_all("""
        SELECT DISTINCT p.id
        FROM permissions p
        JOIN role_permissions rp ON p.id = rp.permission_id
        JOIN user_roles ur ON rp.role_id = ur.role_id
        WHERE ur.user_id = $1
    """, [user_id])

    return UserContext(
        user_id=user_id,
        roles=[r['id'] for r in roles],
        permissions=[p['id'] for p in permissions],
        business_unit=user['business_unit'],
        desk=user['desk'],
        facility=user['facility'],
    )

Check Document Access

async def check_access(user: UserContext, doc_id: UUID, action: str) -> bool:
    """Check whether ``user`` may perform ``action`` on a document.

    Rules, applied in order (the first failing rule denies access):

    1. The user must hold the ``doc.<action>`` permission.
    2. The document must exist in ``document_metadata``.
    3. PHI documents require ``doc.view_phi`` and a facility match.
    4. Financial documents require ``doc.view_financial``.
    5. Finance-domain documents assigned to a desk require a desk match.

    Args:
        user: Context of the requesting user.
        doc_id: ``document_id`` of the document being accessed.
        action: Action suffix, looked up as the permission ``doc.<action>``.

    Returns:
        ``True`` if every applicable rule passes, otherwise ``False``.
    """
    # Check the base permission first: it needs no database round trip.
    if f"doc.{action}" not in user.permissions:
        return False

    metadata = await db.fetch_one(
        "SELECT * FROM document_metadata WHERE document_id = $1", [doc_id]
    )
    # Fail closed: a document without metadata cannot be authorised.
    # NOTE(review): assumes fetch_one returns None when no row matches.
    if metadata is None:
        return False

    # PHI documents require doc.view_phi and facility match
    if metadata['contains_phi']:
        if 'doc.view_phi' not in user.permissions:
            return False
        # Require the user to actually have a facility; otherwise a user
        # without one would "match" a PHI document whose facility is unset.
        if not user.facility or user.facility != metadata['facility']:
            return False

    # Financial documents require doc.view_financial
    if metadata['contains_financial']:
        if 'doc.view_financial' not in user.permissions:
            return False

    # Desk-scoped documents (finance/trading)
    if metadata['desk'] and metadata['domain'] == 'finance':
        if user.desk != metadata['desk']:
            return False

    return True

Generate Meilisearch filters based on user context:

def _quote_filter_value(value: str) -> str:
    """Return ``value`` as a double-quoted Meilisearch filter string literal."""
    # Double backslashes first so a trailing backslash cannot swallow the
    # closing quote, then escape embedded double quotes.
    escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def build_access_filter(user: UserContext) -> str:
    """Build the Meilisearch filter expression that enforces access rules.

    The result is a conjunction (``AND``) of clauses derived from the user's
    permissions, roles, facility and desk. A PHI clause is always present, so
    the returned string is never empty.

    Args:
        user: Context of the searching user; ``permissions``, ``roles``,
            ``facility`` and ``desk`` are read.

    Returns:
        A filter expression string.
    """
    clauses = []
    facility = _quote_filter_value(user.facility) if user.facility else None
    desk = _quote_filter_value(user.desk) if user.desk else None

    # PHI access: only a user who holds doc.view_phi AND has a facility can
    # see PHI, and only within that facility. Without a facility there is
    # nothing to match against, so PHI is excluded entirely.
    if 'doc.view_phi' in user.permissions and facility:
        clauses.append(f'(contains_phi = false OR facility = {facility})')
    else:
        clauses.append('contains_phi = false')

    # Financial access
    if 'doc.view_financial' not in user.permissions:
        clauses.append('contains_financial = false')

    # Desk scoping for traders
    # NOTE(review): check_access applies the desk rule to every user on
    # finance documents, while this clause is only added for the 'trader'
    # role -- confirm the difference is intended.
    if 'trader' in user.roles and desk:
        clauses.append(
            f'(domain != "finance" OR desk = {desk} OR desk IS NULL)'
        )

    # Facility scoping for clinicians
    if 'clinician' in user.roles and facility:
        clauses.append(
            f'(domain != "clinical" OR facility = {facility} OR facility IS NULL)'
        )

    return ' AND '.join(clauses)

Integration with Search API

@app.post("/api/v1/search")
async def search_documents(request: SearchRequest, user: User = Depends(get_current_user)):
    """Search documents, restricted to what the current user may see.

    The caller's own filter and the access-control filter are sent to
    Meilisearch as separate elements of a filter array. Meilisearch combines
    the top-level elements with AND, so the client-supplied expression cannot
    use unbalanced parentheses or a trailing ``OR`` to escape the access
    clause, as it could when both were concatenated into one string.

    Args:
        request: Search request; ``query``, ``filter``, ``limit`` and
            ``offset`` are read.
        user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A ``SearchResponse`` with the hits and the estimated total hit count.
    """
    # Get user context
    context = await get_user_context(user.id)

    # Build access filter
    access_filter = build_access_filter(context)

    # Combine with user filters: one array element per expression.
    # NOTE(review): assumes request.filter is a string or None.
    filters = [expr for expr in (request.filter, access_filter) if expr]

    # Search with combined filter
    # NOTE(review): this call is not awaited, so it appears to be a blocking
    # client call inside an async handler -- confirm that is acceptable.
    results = meilisearch_client.index('documents').search(
        request.query,
        {
            'filter': filters or None,
            'limit': request.limit,
            'offset': request.offset,
        },
    )

    return SearchResponse(
        results=results['hits'],
        total=results['estimatedTotalHits'],
    )

Example Scenarios

Clinician at Hospital-A

# A clinician tied to one facility: holds doc.read and doc.view_phi,
# but not doc.view_financial.
context = UserContext(
    user_id='u001',
    roles=['clinician'],
    permissions=['doc.read', 'doc.view_phi'],
    facility='Hospital-A',
)

# Filter produced by build_access_filter(context):
#   (contains_phi = false OR facility = "Hospital-A") AND
#   contains_financial = false AND
#   (domain != "clinical" OR facility = "Hospital-A" OR facility IS NULL)

Trader at Equities Desk

# A trader on the Equities desk: holds doc.read and doc.view_financial,
# but not doc.view_phi.
context = UserContext(
    user_id='u002',
    roles=['trader'],
    permissions=['doc.read', 'doc.view_financial'],
    desk='Equities',
)

# Filter produced by build_access_filter(context):
#   contains_phi = false AND
#   (domain != "finance" OR desk = "Equities" OR desk IS NULL)

Privacy Officer

# A privacy/compliance officer: may view PHI within their facility, holds no
# financial permission, and has neither the trader nor the clinician role,
# so no desk or clinical-facility clause is added.
context = UserContext(
    user_id='u003',
    roles=['privacy_officer', 'compliance_officer'],
    permissions=['doc.read', 'doc.view_phi', 'doc.approve', 'audit.view'],
    facility='Hospital-A',
)

# Filter produced by build_access_filter(context):
#   (contains_phi = false OR facility = "Hospital-A") AND
#   contains_financial = false

References