Files
spidy-look-at-it/02-multiple-dashboards.md
Kulvir Singh b22477ef13 WOW
2025-12-22 20:40:55 +05:30

18 KiB

Requirements

  • team has its own dashboard tab in the UI
  • COIs for a team are only visible to that team members and super admins.
  • team can have multiple checklists
  • separate email agents
  • user can belong to multiple teams

RBAC

Permissions

class Permission(models.TextChoices):
    CONTRACT_VIEW = "contract:view"
    CONTRACT_CREATE = "contract:create"
    CONTRACT_ANALYZE = "contract:analyze"
    CONTRACT_DELETE = "contract:delete"
    
    CHECKLIST_VIEW = "checklist:view"
    CHECKLIST_CREATE = "checklist:create"
    CHECKLIST_EDIT = "checklist:edit"
    CHECKLIST_DELETE = "checklist:delete"
    CHECKLIST_SET_ACTIVE = "checklist:set_active"
    
    TEAM_VIEW = "team:view"
    TEAM_VIEW_ALL = "team:view:all"  # super admin - see all teams
    TEAM_MANAGE_MEMBERS = "team:manage_members"
    TEAM_EDIT_SETTINGS = "team:edit_settings"
    TEAM_DELETE = "team:delete"
    
    EMAIL_AGENT_VIEW = "email_agent:view"
    EMAIL_AGENT_CONFIGURE = "email_agent:configure"
    
    USER_INVITE = "user:invite"
    USER_MANAGE = "user:manage"
class Role():
    class RoleType(models.TextChoices):
        ADMIN = "ADMIN"
        MASTER = "MASTER"
        VIEWER = "VIEWER"
    
    name = models.CharField(max_length=50, unique=True)
    description = models.TextField(blank=True)
    permissions = models.JSONField(default=list)
    
    is_system = models.BooleanField(default=False)


ROLE_PERMISSIONS = {
    "ADMIN": [
        "contract:view", "contract:create", "contract:analyze", "contract:delete",
        "checklist:view", "checklist:create", "checklist:edit", "checklist:delete", "checklist:set_active",
        "team:view", "team:view:all", "team:manage_members", "team:edit_settings", "team:delete",
        "email_agent:view", "email_agent:configure",
        "user:invite", "user:manage",
    ],
    "MASTER": [
        "contract:view", "contract:create", "contract:analyze",
        "checklist:view", "checklist:create", "checklist:edit", "checklist:set_active",
        "team:view", "team:manage_members",
        "email_agent:view",
    ],
    "VIEWER": [
        "contract:view",
        "checklist:view",
        "team:view",
    ],
}

Permission

def get_user_permissions(user: User, team_id: str = None) -> set:
    membership = TeamMembership.objects.filter(
        user=user,
        team_id=team_id,
        deleted_at__isnull=True
    ).select_related("role").first()
    
    if not membership:
        return set()
    
    return set(membership.role.permissions)    

def has_permission(user: User, permission: str, team_id: str = None) -> bool:
    return permission in get_user_permissions(user, team_id)


def has_team_access(user: User, team_id: str) -> bool:
    if has_permission(user, "team:view:all"):
        return True
    
    return TeamMembership.objects.filter(
        user=user,
        team_id=team_id,
        deleted_at__isnull=True
    ).exists()
def require_permission(permission: str, team_id: UUID)
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            user = request.user
            team_id = kwargs.get(team_id_param) or request.data.get(team_id_param)
            
            if not has_permission(user, permission, team_id):
                return Response(
                    { "error": "Permission denied" },
                    status=status.HTTP_403_FORBIDDEN
                )
            
            return view_func(request, *args, **kwargs)
        return wrapper
    return decorator


def require_team_access(team_id_param: str = "team_id"):
    """Decorator to check team membership"""
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            team_id = kwargs.get(team_id_param)
            
            if not has_team_access(request.user, team_id):
                return Response(
                    {"error": "Team not found"},
                    status=status.HTTP_404_NOT_FOUND
                )
            
            return view_func(request, *args, **kwargs)
        return wrapper
    return decorator

Usage in Views

@api_view(["POST"])
@permission_classes([IsAuthenticated])
@require_team_access("team_id")
@require_permission("contract:create", "team_id")
def upload_contract(request, team_id):
    # user has access to team AND has contract:create permission

Team

class Team():
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)
    
    organization = models.ForeignKey(
        "Organization",
        on_delete=models.CASCADE,
        related_name="teams"
    )

    is_default = models.BooleanField(default=False)
    settings = models.JSONField(default=dict)
class TeamMembership():
    user = models.ForeignKey(
        "User",
        on_delete=models.CASCADE,
        related_name="team_memberships"
    )
    
    team = models.ForeignKey(
        Team,
        on_delete=models.CASCADE,
        related_name="memberships"
    )
    
    role = models.ForeignKey(
        "Role",
        on_delete=models.PROTECT,
        related_name="team_memberships"
    )
    
    added_by = models.ForeignKey(
        "User",
        on_delete=models.SET_NULL,
        null=True,
        related_name="memberships_added"
    )  
class TeamEmailAgent(BaseModel):
    team = models.OneToOneField(
        Team,
        on_delete=models.CASCADE,
        related_name="email_agent"
    )
    
    provider_type = models.CharField(
        max_length=20,
        choices=ProviderType.choices,
        default=ProviderType.MICROSOFT
    )
    
    inbox_email = models.EmailField(unique=True)
    
    ms_client_id = models.CharField(max_length=500, blank=True)
    ms_client_secret = models.CharField(max_length=500, blank=True)  # maybe encrypt this
    ms_tenant_id = models.CharField(max_length=500, blank=True)
    
    subscription_id = models.CharField(max_length=500, null=True, blank=True)
    subscription_expiry = models.DateTimeField(null=True, blank=True)
    
    # processing settings
    # {
    #   "default_checklist_id": "uuid",
    #   "automated_reminder": {} .....
    # }
    processing_settings = models.JSONField(default=dict)
    
    enabled = models.BooleanField(default=False)
    
    def get_access_token(self) -> str:
        """Get OAuth access token for email operations"""
        from assistantService.services.graph_service.service import GraphEmailService
        
        graph = GraphEmailService()
        return graph.generate_access_token(
            client_id=self.ms_client_id,
            client_secret=self.ms_client_secret,
            tenant_id=self.ms_tenant_id
        )
class Contract():
    # existing fields
    
    team = models.ForeignKey(
        "Team",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name="contracts"
    )
class Checklist(BaseModel):
    # existing fields
    
    # NULL might mean this checklist is available org wide....
    team = models.ForeignKey(
        "Team",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name="checklists"
    )

Team Management

class TeamService:
    def __init__(self, user: User):
        self.user = user
    
    def _check_permission(self, permission: str, team_id: str = None):
        if not has_permission(self.user, permission, team_id):
            raise PermissionError(f"Missing permission: {permission}")
    
    @transaction.atomic
    def create_team(
        self,
        organization_id: str,
        name: str,
        description: str = "",
        settings: Dict = None,
    ) -> Team:
        # only admins can create teams
        self._check_permission("team:view:all")
        
        team = Team.objects.create(
            organization_id=organization_id,
            name=name,
            description=description,
            settings=settings or {}
        )
        
        # creator becomes admin of this team
        admin_role = Role.objects.get(name="ADMIN")
        TeamMembership.objects.create(
            user=self.user,
            team=team,
            role=admin_role,
            added_by=self.user
        )
        
        return team
    
    def get_user_teams(self, user: User) -> List[Dict[str, Any]]:
        """Get all teams a user has access to"""
        
        memberships = TeamMembership.objects.filter(
            user=user,
            deleted_at__isnull=True
        ).select_related("team", "role").order_by("team__settings__order", "team__name")
        
        teams = []
        for membership in memberships:
            team = membership.team
            teams.append({
                "id": str(team.id),
                "name": team.name,
                "description": team.description,
                "settings": team.settings,
                "role": membership.role.name,
                "is_default": team.is_default,
            })
        
        return teams
    
    @transaction.atomic
    def add_member(
        self,
        team_id: str,
        user_id: str,
        role_name: str,
    ) -> TeamMembership:
        # requires team:manage_members permission
        self._check_permission("team:manage_members", team_id)
        
        team = Team.objects.get(id=team_id)
        user = User.objects.get(RowKey=user_id, organization_id=team.organization_id)
        role = Role.objects.get(name=role_name.upper())
        
        # prevent assigning higher role than own
        my_membership = TeamMembership.objects.get(user=self.user, team=team)
        if self._role_level(role) > self._role_level(my_membership.role):
            raise PermissionError("Cannot assign role higher than your own")
        
        membership, created = TeamMembership.objects.update_or_create(
            team=team,
            user=user,
            defaults={
                "role": role,
                "added_by": self.user,
                "deleted_at": None,
            }
        )
        return membership
    
    def _role_level(self, role: Role) -> int:
        levels = {"VIEWER": 1, "MASTER": 2, "ADMIN": 3}
        return levels.get(role.name, 0)
    
    def remove_member(self, team_id: str, user_id: str) -> bool:
        self._check_permission("team:manage_members", team_id)
        
        membership = TeamMembership.objects.filter(
            team_id=team_id,
            user_id=user_id,
            deleted_at__isnull=True
        ).first()
        
        if membership:
            membership.deleted_at = timezone.now()
            membership.save()
            return True
        
        return False
    
    def get_team_stats(self, team_id: str) -> Dict[str, int]:
        from assistantService.models import Contract
        
        today = date.today()
        next_30 = today + timedelta(days=30)
        
        contracts = Contract.objects.filter(
            team_id=team_id,
            deleted_at__isnull=True
        )
        
        return {
            "total_contracts": contracts.count(),
            "pending": contracts.filter(analysis_status="PENDING").count(),
            "accepted": contracts.filter(analysis_status="ACCEPTED").count(),
            "rejected": contracts.filter(analysis_status="REJECTED").count(),
            "expiring_in_30_days": contracts.filter(
                expiry_date__range=(today, next_30)
            ).count(),
            "expired": contracts.filter(expiry_date__lt=today).count(),
            "members": TeamMembership.objects.filter(
                team_id=team_id,
                deleted_at__isnull=True
            ).count(),
        }

Dashboard Tabs

class DashboardService:
    def __init__(self, user: User):
        self.user = user
    
    def get_dashboard(
        self,
        team_id: str,
        page: int = 1,
        page_size: int = 10,
        filters: Optional[Dict] = None
    ) -> Dict[str, Any]:
        # uses has_team_access from RBAC
        if not has_team_access(self.user, team_id):
            raise PermissionError("You don't have access to this team")
        
        team = Team.objects.get(id=team_id)
        
        query = Contract.objects.filter(
            team_id=team_id,
            deleted_at__isnull=True
        )
        
        if filters:
            query = self._apply_filters(query, filters)
        
        query = query.order_by("-created_at")
        
        query = query.select_related(
            "current_version",
            "active_checklist",
            "uploaded_by"
        )

        paginator = Paginator(query, page_size)
        page_obj = paginator.get_page(page)
        
        contracts = [
            self._format_contract(c, team.organization.settings.get("timezone", "UTC"))
            for c in page_obj
        ]
        
        return {
            "team": {
                "id": str(team.id),
                "name": team.name,
                "settings": team.settings,
            },
            "contracts": contracts,
            "pagination": {
                "current_page": page,
                "total_pages": paginator.num_pages,
                "total_count": paginator.count,
                "has_next": page_obj.has_next(),
                "has_previous": page_obj.has_previous(),
            },
            "filters": self._get_filter_options(team_id),
        }

Email Agent Configuration

class TeamEmailAgentService:
    def __init__(self, user: User):
        self.user = user
    
    def get_config(self, team_id: str) -> Dict[str, Any]:
        if not has_permission(self.user, "email_agent:view", team_id):
            raise PermissionError("Cannot view email agent config")
        
        team = Team.objects.get(id=team_id)
        
        try:
            agent = team.email_agent
            return {
                "is_configured": True,
                "inbox_email": agent.inbox_email,
                "provider_type": agent.provider_type,
                "is_enabled": agent.is_enabled,
                "processing_settings": agent.processing_settings,
                "subscription_status": {
                    "active": agent.subscription_id is not None,
                    "expires_at": agent.subscription_expiry.isoformat() if agent.subscription_expiry else None,
                },
                "last_sync_at": agent.last_sync_at.isoformat() if agent.last_sync_at else None,
            }
        except TeamEmailAgent.DoesNotExist:
            return {
                "is_configured": False,
            }
    
    @transaction.atomic
    def configure(
        self,
        team_id: str,
        inbox_email: str,
        ms_client_id: str,
        ms_client_secret: str,
        ms_tenant_id: str,
        processing_settings: Dict = None
    ) -> TeamEmailAgent:
        # only ADMIN can configure email agent
        if not has_permission(self.user, "email_agent:configure", team_id):
            raise PermissionError("Cannot configure email agent")
        
        team = Team.objects.get(id=team_id)
        
        graph = GraphEmailService()
        try:
            token = graph.generate_access_token(
                client_id=ms_client_id,
                client_secret=ms_client_secret,
                tenant_id=ms_tenant_id
            )
        except Exception as e:
            raise ValueError(f"Invalid Microsoft Graph credentials: {e}")
        
        agent, created = TeamEmailAgent.objects.update_or_create(
            team=team,
            defaults={
                "inbox_email": inbox_email,
                "ms_client_id": ms_client_id,
                "ms_client_secret": ms_client_secret,  # TODO: encrypt
                "ms_tenant_id": ms_tenant_id,
                "processing_settings": processing_settings or {},
            }
        )
        
        return agent
    
    def enable(self, team_id: str) -> TeamEmailAgent:
        if not has_permission(self.user, "email_agent:configure", team_id):
            raise PermissionError("Cannot enable email agent")
        
        agent = TeamEmailAgent.objects.get(team_id=team_id)
        
        if not agent.ms_client_id:
            raise ValueError("Email agent not configured")
        
        subscription = self._create_subscription(agent)
        
        agent.is_enabled = True
        agent.subscription_id = subscription["id"]
        agent.subscription_expiry = subscription["expirationDateTime"]
        agent.save()
        
        return agent
    
    def disable(self, team_id: str) -> TeamEmailAgent:
        if not has_permission(self.user, "email_agent:configure", team_id):
            raise PermissionError("Cannot disable email agent")
        
        agent = TeamEmailAgent.objects.get(team_id=team_id)
        
        if agent.subscription_id:
            self._delete_subscription(agent)
        
        agent.is_enabled = False
        agent.subscription_id = None
        agent.subscription_expiry = None
        agent.save()
        
        return agent  

APIs

urlpatterns = [
    # team management
    path("", views.list_teams),                       # team:view
    path("", views.create_team),                      # team:view:all (admin only)
    path("<uuid:team_id>/", views.get_team),          # team:view + membership
    path("<uuid:team_id>/", views.update_team),       # team:edit_settings
    path("<uuid:team_id>/", views.delete_team),       # team:delete
    
    # members
    path("<uuid:team_id>/members/", views.list_members),                          # team:view
    path("<uuid:team_id>/members/", views.add_member),                            # team:manage_members
    path("<uuid:team_id>/members/<uuid:user_id>/", views.remove_member),          # team:manage_members
    path("<uuid:team_id>/members/<uuid:user_id>/role/", views.update_member_role),# team:manage_members
    
    # dashboard
    path("<uuid:team_id>/dashboard/", views.get_dashboard),        # contract:view
    path("<uuid:team_id>/dashboard/stats/", views.get_dashboard_stats),
    
    # checklists
    path("<uuid:team_id>/checklists/", views.list_team_checklists),  # checklist:view
    
    # email agent
    path("<uuid:team_id>/email-agent/", views.get_email_agent_config),    # email_agent:view
    path("<uuid:team_id>/email-agent/", views.configure_email_agent),     # email_agent:configure
    path("<uuid:team_id>/email-agent/enable/", views.enable_email_agent), # email_agent:configure
    path("<uuid:team_id>/email-agent/disable/", views.disable_email_agent),
]