Implement fine-grained access control with role-based authorization, dynamic permissions, and AI-powered policy enforcement.
# RBAC Configuration for MCPCodex
# rbac.config.yaml
rbac:
  # Role Definitions
  # Each role lists its own permissions as "<resource>:<action>" strings and
  # names the roles it inherits from.
  roles:
    # Super Administrator
    admin:
      name: "Administrator"
      description: "Full system access"
      permissions:
        - "*:*" # All permissions
      inherits: []

    # Developer Role
    developer:
      name: "Developer"
      description: "AI development and model management"
      permissions:
        - "models:read"
        - "models:write"
        - "contexts:read"
        - "contexts:write"
        - "deployments:read"
        - "deployments:write"
        - "logs:read"
        - "metrics:read"
      inherits: ["viewer"]

    # AI Operator Role
    operator:
      name: "AI Operator"
      description: "Production operations and monitoring"
      permissions:
        - "models:read"
        - "deployments:read"
        - "deployments:scale"
        - "monitoring:read"
        - "monitoring:write"
        - "alerts:manage"
        - "logs:read"
        - "logs:export"
      inherits: ["viewer"]

    # Data Scientist Role
    data_scientist:
      name: "Data Scientist"
      description: "Model training and experimentation"
      permissions:
        - "models:read"
        - "models:train"
        - "datasets:read"
        - "datasets:write"
        - "experiments:read"
        - "experiments:write"
        - "notebooks:read"
        - "notebooks:write"
      inherits: ["viewer"]

    # Business User Role
    business_user:
      name: "Business User"
      description: "AI application usage"
      permissions:
        - "apps:read"
        - "apps:use"
        - "reports:read"
        - "dashboards:read"
      inherits: ["viewer"]

    # Viewer Role (Base)
    # NOTE(review): "profile:write" is a write permission although the role is
    # described as "Read-only access" - confirm this is intended.
    viewer:
      name: "Viewer"
      description: "Read-only access"
      permissions:
        - "profile:read"
        - "profile:write"
        - "docs:read"
      inherits: []

  # Resource-Based Permissions
  # For every resource type: the actions that can be granted on it and the
  # attributes that policies may reference.
  resources:
    models:
      actions: ["read", "write", "train", "deploy", "delete"]
      attributes:
        - "owner"
        - "team"
        - "visibility"

    contexts:
      actions: ["read", "write", "share", "delete"]
      attributes:
        - "owner"
        - "sensitivity"
        - "team"

    deployments:
      actions: ["read", "write", "scale", "rollback", "delete"]
      attributes:
        - "environment"
        - "owner"
        - "team"

    datasets:
      actions: ["read", "write", "share", "delete"]
      attributes:
        - "classification"
        - "owner"
        - "team"

  # Attribute-Based Access Control (ABAC)
  policies:
    # Environment-based access
    production_access:
      effect: "allow"
      condition: |
        user.role in ["admin", "operator"] AND
        resource.environment == "production"

    # Data sensitivity policy
    sensitive_data_access:
      effect: "allow"
      condition: |
        user.clearance_level >= resource.classification_level AND
        user.team == resource.team

    # Model ownership policy
    model_ownership:
      effect: "allow"
      condition: |
        user.id == resource.owner OR
        user.team == resource.team OR
        resource.visibility == "public"

    # Time-based access
    business_hours_only:
      effect: "allow"
      condition: |
        current_time >= "09:00" AND
        current_time <= "17:00" AND
        day_of_week in ["monday", "tuesday", "wednesday", "thursday", "friday"]

  # Dynamic Role Assignment
  # Roles granted while `condition` holds; their permissions lapse after
  # `expires_after`.
  dynamic_roles:
    project_lead:
      condition: "user.projects.*.role == 'lead'"
      temporary_permissions:
        - "projects:manage"
        - "teams:manage"
        - "deployments:approve"
      expires_after: "30d"

    emergency_responder:
      condition: "incident.severity == 'critical'"
      temporary_permissions:
        - "systems:emergency_access"
        - "logs:full_access"
        - "configs:emergency_change"
      expires_after: "4h"
      auto_revoke: true
// Authorization Service Implementation
// authorization.service.ts
import { User, Resource, Permission, Role } from '@mcpcodex/types';
import { PolicyEngine } from '@mcpcodex/policy-engine';
/**
 * Authorization entry point combining role-based checks (RBAC), policy
 * evaluation (ABAC), resource-level rules, dynamic role assignment and audit
 * logging.
 */
export class AuthorizationService {
  /** Largest delay `setTimeout` handles without overflowing (2^31 - 1 ms, roughly 24.8 days). */
  private static readonly MAX_TIMER_DELAY_MS = 2147483647;

  private policyEngine: PolicyEngine;
  /** Expanded (direct + inherited) roles per user id; entries are evicted five minutes after insertion. */
  private roleCache: Map<string, Role[]> = new Map();

  constructor() {
    this.policyEngine = new PolicyEngine();
  }

  /**
   * Check if user has permission for a specific action on a resource.
   *
   * Role permissions matching `action` are evaluated first; when the user's
   * roles contain no matching permission the ABAC policies decide.
   *
   * @param user - The user requesting access.
   * @param action - Action string, e.g. `"models:read"`.
   * @param resource - Optional resource the action targets.
   * @returns `true` when access is granted; `false` when denied or when any
   *   step throws (the check fails closed).
   */
  async hasPermission(
    user: User,
    action: string,
    resource?: Resource
  ): Promise<boolean> {
    try {
      // Get user roles (with caching)
      const roles = await this.getUserRoles(user.id);

      // Check direct permissions
      const directPermissions = this.getDirectPermissions(roles, action);
      if (directPermissions.length > 0) {
        return await this.evaluatePermissions(directPermissions, user, resource);
      }

      // Check policy-based permissions (ABAC)
      return await this.evaluatePolicies(user, action, resource);
    } catch (error) {
      console.error('Permission check failed:', error);
      return false; // Fail closed
    }
  }

  /**
   * Role-Based Access Control (RBAC): resolve a user's roles including every
   * inherited role.
   *
   * @param userId - Id of the user whose roles are requested.
   * @returns The cached role list when present, otherwise a freshly fetched
   *   and expanded list (which is then cached for five minutes).
   */
  async getUserRoles(userId: string): Promise<Role[]> {
    // Check cache first
    if (this.roleCache.has(userId)) {
      return this.roleCache.get(userId)!;
    }

    // Fetch from database
    const roles = await this.fetchUserRoles(userId);

    // Add inherited roles
    const allRoles = await this.expandInheritedRoles(roles);

    // Cache for 5 minutes
    this.roleCache.set(userId, allRoles);
    setTimeout(() => this.roleCache.delete(userId), 300000);

    return allRoles;
  }

  /**
   * Attribute-Based Access Control (ABAC): evaluate every applicable policy.
   *
   * Deny overrides allow: a single `deny` result rejects the request no
   * matter where it appears in the policy list, so the outcome does not
   * depend on policy ordering.
   *
   * @param user - The user requesting access.
   * @param action - Action being attempted.
   * @param resource - Optional target resource.
   * @returns `true` only when at least one policy allows and none denies.
   */
  async evaluatePolicies(
    user: User,
    action: string,
    resource?: Resource
  ): Promise<boolean> {
    const policies = await this.getApplicablePolicies(action, resource);
    let allowed = false;

    for (const policy of policies) {
      const result = await this.policyEngine.evaluate(policy, {
        user,
        resource,
        action,
        context: this.getRequestContext()
      });

      if (result.effect === 'deny') {
        return false; // Explicit deny always wins
      }

      if (result.effect === 'allow') {
        // Remember the allow but keep scanning for a later explicit deny
        allowed = true;
      }
    }

    return allowed; // Default deny when nothing allowed
  }

  /**
   * Dynamic Role Assignment: grant `roleId` to the user if `condition`
   * currently evaluates to true.
   *
   * @param userId - User receiving the role.
   * @param roleId - Role to assign.
   * @param condition - Condition expression handed to the policy engine.
   * @param duration - Optional lifetime in milliseconds; when truthy the role
   *   is revoked automatically after it elapses.
   * @throws Error when the condition is not met.
   */
  async assignDynamicRole(
    userId: string,
    roleId: string,
    condition: string,
    duration?: number
  ): Promise<void> {
    const user = await this.getUser(userId);
    const conditionMet = await this.policyEngine.evaluateCondition(condition, {
      user,
      context: this.getRequestContext()
    });

    if (!conditionMet) {
      throw new Error('Dynamic role condition not met');
    }

    // Assign temporary role
    await this.assignTemporaryRole(userId, roleId, duration);

    // Drop the cached role list so the new role is visible to the next check
    this.roleCache.delete(userId);

    // Set up auto-revocation if specified
    if (duration) {
      this.scheduleRoleRevocation(userId, roleId, duration);
    }
  }

  /**
   * Arm a timer that revokes a temporary role after `delayMs`.
   *
   * Delays beyond `MAX_TIMER_DELAY_MS` are split into consecutive timers,
   * because a single oversized `setTimeout` delay overflows and fires almost
   * immediately.
   */
  private scheduleRoleRevocation(userId: string, roleId: string, delayMs: number): void {
    const maxDelay = AuthorizationService.MAX_TIMER_DELAY_MS;
    if (delayMs > maxDelay) {
      setTimeout(
        () => this.scheduleRoleRevocation(userId, roleId, delayMs - maxDelay),
        maxDelay
      );
      return;
    }

    setTimeout(() => {
      void this.revokeRoleAndInvalidate(userId, roleId);
    }, delayMs);
  }

  /**
   * Revoke a temporary role and evict the user's cached roles.
   * Failures are logged instead of surfacing as unhandled rejections from the
   * timer callback.
   */
  private async revokeRoleAndInvalidate(userId: string, roleId: string): Promise<void> {
    try {
      await this.revokeTemporaryRole(userId, roleId);
    } catch (error) {
      console.error('Temporary role revocation failed:', error);
    } finally {
      // Force the next lookup to re-read roles instead of serving a stale list
      this.roleCache.delete(userId);
    }
  }

  /**
   * Permission Inheritance and Resolution: walk the `inherits` lists
   * transitively.
   *
   * Role ids already visited are skipped, so cyclic inheritance terminates.
   *
   * @param roles - Roles assigned directly to the user.
   * @returns The direct roles plus every role reachable through `inherits`.
   */
  private async expandInheritedRoles(roles: Role[]): Promise<Role[]> {
    const expandedRoles = new Set<Role>();
    const processed = new Set<string>();

    const processRole = async (role: Role) => {
      if (processed.has(role.id)) return;

      expandedRoles.add(role);
      processed.add(role.id);

      // Process inherited roles
      for (const inheritedRoleId of role.inherits || []) {
        const inheritedRole = await this.getRole(inheritedRoleId);
        if (inheritedRole) {
          await processRole(inheritedRole);
        }
      }
    };

    for (const role of roles) {
      await processRole(role);
    }

    return Array.from(expandedRoles);
  }

  /**
   * Resource-Based Authorization.
   *
   * Access is granted when any of the following holds, checked in order:
   * the user owns the resource; the user belongs to the resource's team and
   * `hasPermission(user, action)` succeeds; the resource is public and the
   * action is `read`; the resource is shared with the user.
   *
   * @returns `true` when one of the rules grants access, otherwise `false`.
   */
  async authorizeResourceAccess(
    user: User,
    resource: Resource,
    action: string
  ): Promise<boolean> {
    // Check ownership
    if (resource.owner === user.id) {
      return true;
    }

    // Check team membership
    if (resource.team && user.teams?.includes(resource.team)) {
      const hasTeamPermission = await this.hasPermission(user, action);
      if (hasTeamPermission) return true;
    }

    // Check visibility rules
    if (resource.visibility === 'public' && action === 'read') {
      return true;
    }

    // Check shared access
    if (resource.sharedWith?.includes(user.id)) {
      return true;
    }

    return false;
  }

  /**
   * Audit and Logging: record an authorization decision and raise a security
   * alert when a denied attempt is classified as suspicious.
   *
   * @param user - User who attempted the action.
   * @param action - Action that was attempted.
   * @param resource - Optional target; only its type, id and owner are logged.
   * @param result - Whether access was granted (defaults to `false`).
   * @param reason - Optional explanation stored with the event.
   */
  async logAuthorizationEvent(
    user: User,
    action: string,
    resource?: Resource,
    result: boolean = false,
    reason?: string
  ): Promise<void> {
    const auditEvent = {
      timestamp: new Date(),
      userId: user.id,
      userEmail: user.email,
      action,
      resource: resource ? {
        type: resource.type,
        id: resource.id,
        owner: resource.owner
      } : null,
      result: result ? 'allowed' : 'denied',
      reason,
      ip: this.getClientIP(),
      userAgent: this.getUserAgent()
    };

    await this.auditLogger.log(auditEvent);

    // Alert on suspicious activity
    if (!result && this.isSuspiciousActivity(user, action)) {
      await this.securityAlerter.alert('suspicious_access_attempt', auditEvent);
    }
  }

  /** Collect every permission from `roles` that matches `action`. */
  private getDirectPermissions(roles: Role[], action: string): Permission[] {
    return roles
      .flatMap(role => role.permissions || [])
      .filter(permission => this.matchesPermission(permission, action));
  }

  /**
   * Test a permission pattern against an action.
   *
   * `*:*` matches everything; a pattern ending in `:*` matches any action
   * sharing the full prefix before the `*` (so `a:b:*` matches `a:b:c` but
   * not `a:x`); anything else must equal the action exactly.
   */
  private matchesPermission(permission: string, action: string): boolean {
    // Handle wildcard permissions
    if (permission === '*:*') return true;

    if (permission.endsWith(':*')) {
      // Keep the trailing ':' so "models:*" cannot match "modelsx:read"
      const prefix = permission.slice(0, -1);
      return action.startsWith(prefix);
    }

    return permission === action;
  }
}
// Dynamic Permission System
// dynamic-permissions.ts
import { User, Permission, Context } from '@mcpcodex/types';
/**
 * Context-aware permission resolution, AI-backed permission recommendations
 * and time-limited permission grants.
 */
export class DynamicPermissionSystem {
  /** Largest delay `setTimeout` handles without overflowing (2^31 - 1 ms, roughly 24.8 days). */
  private static readonly MAX_TIMER_DELAY_MS = 2147483647;

  /** Resolved permissions keyed by `<userId>-<environment>-<timestamp>`; entries live ten minutes. */
  private permissionCache: Map<string, Permission[]> = new Map();

  /**
   * Context-aware permission resolution.
   *
   * @param user - User whose permissions are resolved.
   * @param context - Request context; its `environment` and `timestamp` form
   *   part of the cache key.
   * @returns The cached permissions for this user/context, or freshly
   *   calculated ones (which are then cached for ten minutes).
   */
  async resolvePermissions(
    user: User,
    context: Context
  ): Promise<Permission[]> {
    // NOTE(review): if context.timestamp differs on every request this key
    // never repeats and the cache cannot hit - confirm its granularity.
    const cacheKey = `${user.id}-${context.environment}-${context.timestamp}`;

    if (this.permissionCache.has(cacheKey)) {
      return this.permissionCache.get(cacheKey)!;
    }

    const permissions = await this.calculatePermissions(user, context);

    // Cache for 10 minutes
    this.permissionCache.set(cacheKey, permissions);
    setTimeout(() => this.permissionCache.delete(cacheKey), 600000);

    return permissions;
  }

  /**
   * AI-powered permission recommendations.
   *
   * The behaviour analysis, similar-user lookup and action-context analysis
   * do not depend on one another, so they run concurrently.
   *
   * @param user - User the recommendation is for.
   * @param requestedAction - Action the user asked to perform.
   * @param resource - Optional resource forwarded unchanged to the AI model.
   * @returns Recommended permissions together with a fixed reasoning string
   *   and a fixed confidence value.
   */
  async recommendPermissions(
    user: User,
    requestedAction: string,
    resource?: unknown
  ): Promise<{
    recommended: Permission[];
    reasoning: string;
    confidence: number;
  }> {
    const [userBehavior, similarUsers, actionContext] = await Promise.all([
      this.analyzeUserBehavior(user.id),
      this.findSimilarUsers(user),
      this.analyzeActionContext(requestedAction)
    ]);

    return {
      recommended: await this.aiModel.recommend({
        user,
        behavior: userBehavior,
        similarUsers,
        actionContext,
        resource
      }),
      reasoning: "Based on role patterns and usage history",
      confidence: 0.87
    };
  }

  /**
   * Temporal permissions: grant `permission` and revoke it automatically
   * once `duration` has elapsed.
   *
   * @param userId - User receiving the permission.
   * @param permission - Permission to grant; stored with `temporary`,
   *   `expiresAt` and `grantReason` added.
   * @param duration - Lifetime in milliseconds.
   * @param reason - Why the permission was granted.
   */
  async grantTemporaryPermission(
    userId: string,
    permission: Permission,
    duration: number,
    reason: string
  ): Promise<void> {
    const tempPermission = {
      ...permission,
      temporary: true,
      expiresAt: Date.now() + duration,
      grantReason: reason
    };

    await this.grantPermission(userId, tempPermission);
    this.invalidateCachedPermissions(userId);

    // Auto-revoke
    this.scheduleRevocation(userId, permission.id, duration);
  }

  /**
   * Arm a timer that revokes a permission after `delayMs`.
   *
   * Delays beyond `MAX_TIMER_DELAY_MS` are split into consecutive timers,
   * because a single oversized `setTimeout` delay overflows and fires almost
   * immediately.
   */
  private scheduleRevocation(
    userId: string,
    permissionId: Permission['id'],
    delayMs: number
  ): void {
    const maxDelay = DynamicPermissionSystem.MAX_TIMER_DELAY_MS;
    if (delayMs > maxDelay) {
      setTimeout(
        () => this.scheduleRevocation(userId, permissionId, delayMs - maxDelay),
        maxDelay
      );
      return;
    }

    setTimeout(() => {
      void this.revokeAndInvalidate(userId, permissionId);
    }, delayMs);
  }

  /**
   * Revoke a permission and drop the user's cached resolutions.
   * Failures are logged instead of surfacing as unhandled rejections from the
   * timer callback.
   */
  private async revokeAndInvalidate(
    userId: string,
    permissionId: Permission['id']
  ): Promise<void> {
    try {
      await this.revokePermission(userId, permissionId);
    } catch (error) {
      console.error('Temporary permission revocation failed:', error);
    } finally {
      this.invalidateCachedPermissions(userId);
    }
  }

  /**
   * Remove every cached resolution whose key starts with `<userId>-`.
   * A user id that is itself a prefix of another id may evict that user's
   * entries too; that only costs a recalculation.
   * NOTE(review): presumably calculatePermissions takes grants into account,
   * which is why cached results are dropped here - confirm.
   */
  private invalidateCachedPermissions(userId: string): void {
    const prefix = `${userId}-`;
    for (const key of Array.from(this.permissionCache.keys())) {
      if (key.startsWith(prefix)) {
        this.permissionCache.delete(key);
      }
    }
  }
}
Implement robust authorization with AI-powered policy enforcement.