Furkan Baytekin

Implementing Role-Based Access Control (RBAC) in Flask with SQLite: A Comprehensive Guide

Secure your Flask app with Role-Based Access Control using SQLite

Securing web applications is a top priority for developers, especially when managing resources like blog posts, website content, or user-submitted messages. Role-Based Access Control (RBAC) is a powerful security model that ensures users only perform actions they’re authorized for. In this detailed guide, we’ll explore what RBAC is, why it’s essential, and how to implement it in a Flask application using SQLite. You’ll get a fully functional demo that enforces RBAC for Create, Read, Update, and Delete (CRUD) operations on blog posts, website content, and contact messages, using a custom @RequirePermission decorator.

What is Role-Based Access Control (RBAC)?

RBAC is a security model that restricts system access based on roles assigned to users. Instead of granting permissions to individual users, RBAC associates permissions with roles, and users are assigned one or more roles. This approach simplifies permission management and enhances security.

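Key Components of RBAC

RBAC rests on three concepts: users, roles, and permissions, where a permission pairs a resource with an action allowed on it.

For example, in this guide's demo the admin role has full CRUD access to blog posts, website content, and contact messages, while the editor role can create, read, and update blog posts, read and update website content, and only read contact messages.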
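A minimal sketch of the model in plain Python, independent of Flask and SQLite. The dictionaries and the is_allowed helper are illustrative names, not part of the demo application; the role and permission values mirror the sample data the demo inserts later.

```python
# Hypothetical in-memory model: role name -> {resource: allowed action letters}.
ROLE_PERMISSIONS = {
    "admin": {"blog": "crud", "website": "crud", "contact": "crud"},
    "editor": {"blog": "cru", "website": "ru", "contact": "r"},
}

# Users are assigned roles, never permissions directly.
USER_ROLES = {
    "admin_user": ["admin"],
    "editor_user": ["editor"],
}


def is_allowed(username, resource, action):
    """Return True if any of the user's roles grants `action` on `resource`."""
    for role in USER_ROLES.get(username, []):
        if action in ROLE_PERMISSIONS.get(role, {}).get(resource, ""):
            return True
    return False


print(is_allowed("editor_user", "blog", "u"))  # True
print(is_allowed("editor_user", "blog", "d"))  # False
```

Changing what editors may do means editing one entry in ROLE_PERMISSIONS; no individual user record has to change.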

Why Use RBAC?

RBAC offers several benefits:

RBAC is ideal for applications ranging from small blogs to enterprise systems, ensuring secure and manageable access control.

Building an RBAC-Powered Flask Application

Let’s create a Flask application that implements RBAC using SQLite to manage blog posts, website content, and contact messages. The app will use a @RequirePermission decorator to enforce permissions for CRUD operations and provide a RESTful API.

Prerequisites

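To follow along, you’ll need Python 3 with Flask installed (the sqlite3 module ships with Python) and a tool such as Postman or cURL for testing the API.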

Step 1: Setting Up the Flask Application

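Our Flask app will store users, roles, permissions, and the three resources in SQLite, check permissions with the @RequirePermission decorator, and expose CRUD endpoints for each resource.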

Here’s the complete Flask application code:

python
import sqlite3
import uuid
from functools import wraps

from flask import Flask, jsonify, request, g

app = Flask(__name__)


# SQLite database connection
def get_db():
    db = getattr(g, '_database', None)
    if db is None:
        db = g._database = sqlite3.connect('rbac_demo.db')
        db.row_factory = sqlite3.Row
    return db


@app.teardown_appcontext
def close_connection(exception):
    db = getattr(g, '_database', None)
    if db is not None:
        db.close()


# Initialize database tables
def init_db():
    with app.app_context():
        db = get_db()
        db.executescript('''
            CREATE TABLE IF NOT EXISTS users (
                id TEXT PRIMARY KEY,
                username TEXT UNIQUE NOT NULL
            );
            CREATE TABLE IF NOT EXISTS roles (
                id TEXT PRIMARY KEY,
                name TEXT UNIQUE NOT NULL
            );
            CREATE TABLE IF NOT EXISTS user_roles (
                user_id TEXT,
                role_id TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id),
                FOREIGN KEY (role_id) REFERENCES roles(id),
                PRIMARY KEY (user_id, role_id)
            );
            CREATE TABLE IF NOT EXISTS permissions (
                role_id TEXT,
                resource TEXT NOT NULL,
                action TEXT NOT NULL,
                FOREIGN KEY (role_id) REFERENCES roles(id),
                PRIMARY KEY (role_id, resource, action)
            );
            CREATE TABLE IF NOT EXISTS blog (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS website (
                id TEXT PRIMARY KEY,
                page_name TEXT NOT NULL,
                content TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS contact (
                id TEXT PRIMARY KEY,
                message TEXT NOT NULL
            );
        ''')

        # Insert sample data
        cursor = db.cursor()

        # Users
        cursor.execute("INSERT OR IGNORE INTO users (id, username) VALUES (?, ?)",
                       (str(uuid.uuid4()), "admin_user"))
        cursor.execute("INSERT OR IGNORE INTO users (id, username) VALUES (?, ?)",
                       (str(uuid.uuid4()), "editor_user"))

        # Roles
        cursor.execute("INSERT OR IGNORE INTO roles (id, name) VALUES (?, ?)",
                       (str(uuid.uuid4()), "admin"))
        cursor.execute("INSERT OR IGNORE INTO roles (id, name) VALUES (?, ?)",
                       (str(uuid.uuid4()), "editor"))

        # User-role assignments
        admin_id = cursor.execute(
            "SELECT id FROM users WHERE username = ?", ("admin_user",)).fetchone()[0]
        editor_id = cursor.execute(
            "SELECT id FROM users WHERE username = ?", ("editor_user",)).fetchone()[0]
        admin_role_id = cursor.execute(
            "SELECT id FROM roles WHERE name = ?", ("admin",)).fetchone()[0]
        editor_role_id = cursor.execute(
            "SELECT id FROM roles WHERE name = ?", ("editor",)).fetchone()[0]
        cursor.execute("INSERT OR IGNORE INTO user_roles (user_id, role_id) VALUES (?, ?)",
                       (admin_id, admin_role_id))
        cursor.execute("INSERT OR IGNORE INTO user_roles (user_id, role_id) VALUES (?, ?)",
                       (editor_id, editor_role_id))

        # Permissions: each action string lists the allowed letters
        # (c = create, r = read, u = update, d = delete)
        cursor.execute(
            "INSERT OR IGNORE INTO permissions (role_id, resource, action) VALUES (?, ?, ?)",
            (admin_role_id, "blog", "crud"))
        cursor.execute(
            "INSERT OR IGNORE INTO permissions (role_id, resource, action) VALUES (?, ?, ?)",
            (admin_role_id, "website", "crud"))
        cursor.execute(
            "INSERT OR IGNORE INTO permissions (role_id, resource, action) VALUES (?, ?, ?)",
            (admin_role_id, "contact", "crud"))
        cursor.execute(
            "INSERT OR IGNORE INTO permissions (role_id, resource, action) VALUES (?, ?, ?)",
            (editor_role_id, "blog", "cru"))
        cursor.execute(
            "INSERT OR IGNORE INTO permissions (role_id, resource, action) VALUES (?, ?, ?)",
            (editor_role_id, "website", "ru"))
        cursor.execute(
            "INSERT OR IGNORE INTO permissions (role_id, resource, action) VALUES (?, ?, ?)",
            (editor_role_id, "contact", "r"))
        db.commit()


# RBAC permission check decorator
def RequirePermission(resource, actions):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = getattr(g, 'current_user', None)
            if not user:
                return jsonify({"error": "Unauthorized: No user logged in"}), 401

            db = get_db()
            cursor = db.cursor()

            # Get user roles
            cursor.execute('''
                SELECT r.id, r.name
                FROM roles r
                JOIN user_roles ur ON r.id = ur.role_id
                WHERE ur.user_id = ?
            ''', (user['id'],))
            roles = cursor.fetchall()

            # Check required permissions
            # Using a redis cache to store the permissions
            # may be a better approach for performance on
            # a production environment
            for action in actions:
                has_permission = False
                for role in roles:
                    # Stored actions are letter strings such as 'cru', so test
                    # whether the required letter appears in the stored string.
                    # (Comparing with "=" would only ever match 'crud' or a
                    # single-letter grant, locking editors out of 'cru'/'ru'.)
                    cursor.execute('''
                        SELECT action FROM permissions
                        WHERE role_id = ? AND resource = ?
                          AND instr(action, ?) > 0
                    ''', (role['id'], resource, action))
                    if cursor.fetchone():
                        has_permission = True
                        break
                if not has_permission:
                    return jsonify({"error": f"Permission denied for {resource} {action}"}), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator


# Simulated logged-in user
@app.before_request
def set_current_user():
    # In a real app, use JWT or Flask-Login for authentication
    g.current_user = {"id": None, "username": "admin_user"}
    if g.current_user['username']:
        db = get_db()
        cursor = db.cursor()
        cursor.execute("SELECT id FROM users WHERE username = ?",
                       (g.current_user['username'],))
        user = cursor.fetchone()
        if user:
            g.current_user['id'] = user['id']


# Blog CRUD operations
@app.route('/blog', methods=['POST'])
@RequirePermission("blog", "c")
def create_blog():
    data = request.get_json()
    title = data.get('title')
    content = data.get('content')
    if not title or not content:
        return jsonify({"error": "Title and content are required"}), 400
    db = get_db()
    cursor = db.cursor()
    blog_id = str(uuid.uuid4())
    cursor.execute("INSERT INTO blog (id, title, content) VALUES (?, ?, ?)",
                   (blog_id, title, content))
    db.commit()
    return jsonify({"id": blog_id, "title": title, "content": content}), 201


@app.route('/blog/<id>', methods=['GET'])
@RequirePermission("blog", "r")
def read_blog(id):
    db = get_db()
    cursor = db.cursor()
    cursor.execute("SELECT * FROM blog WHERE id = ?", (id,))
    blog = cursor.fetchone()
    if not blog:
        return jsonify({"error": "Blog not found"}), 404
    return jsonify({"id": blog['id'], "title": blog['title'], "content": blog['content']})


@app.route('/blog/<id>', methods=['PUT'])
@RequirePermission("blog", "u")
def update_blog(id):
    data = request.get_json()
    title = data.get('title')
    content = data.get('content')
    if not title or not content:
        return jsonify({"error": "Title and content are required"}), 400
    db = get_db()
    cursor = db.cursor()
    cursor.execute("UPDATE blog SET title = ?, content = ? WHERE id = ?",
                   (title, content, id))
    if cursor.rowcount == 0:
        return jsonify({"error": "Blog not found"}), 404
    db.commit()
    return jsonify({"id": id, "title": title, "content": content})


@app.route('/blog/<id>', methods=['DELETE'])
@RequirePermission("blog", "d")
def delete_blog(id):
    db = get_db()
    cursor = db.cursor()
    cursor.execute("DELETE FROM blog WHERE id = ?", (id,))
    if cursor.rowcount == 0:
        return jsonify({"error": "Blog not found"}), 404
    db.commit()
    return jsonify({"message": "Blog deleted"})


# Website CRUD operations
@app.route('/website', methods=['POST'])
@RequirePermission("website", "c")
def create_website():
    data = request.get_json()
    page_name = data.get('page_name')
    content = data.get('content')
    if not page_name or not content:
        return jsonify({"error": "Page name and content are required"}), 400
    db = get_db()
    cursor = db.cursor()
    page_id = str(uuid.uuid4())
    cursor.execute("INSERT INTO website (id, page_name, content) VALUES (?, ?, ?)",
                   (page_id, page_name, content))
    db.commit()
    return jsonify({"id": page_id, "page_name": page_name, "content": content}), 201


@app.route('/website/<id>', methods=['GET'])
@RequirePermission("website", "r")
def read_website(id):
    db = get_db()
    cursor = db.cursor()
    cursor.execute("SELECT * FROM website WHERE id = ?", (id,))
    page = cursor.fetchone()
    if not page:
        return jsonify({"error": "Page not found"}), 404
    return jsonify({"id": page['id'], "page_name": page['page_name'],
                    "content": page['content']})


@app.route('/website/<id>', methods=['PUT'])
@RequirePermission("website", "u")
def update_website(id):
    data = request.get_json()
    page_name = data.get('page_name')
    content = data.get('content')
    if not page_name or not content:
        return jsonify({"error": "Page name and content are required"}), 400
    db = get_db()
    cursor = db.cursor()
    cursor.execute("UPDATE website SET page_name = ?, content = ? WHERE id = ?",
                   (page_name, content, id))
    if cursor.rowcount == 0:
        return jsonify({"error": "Page not found"}), 404
    db.commit()
    return jsonify({"id": id, "page_name": page_name, "content": content})


@app.route('/website/<id>', methods=['DELETE'])
@RequirePermission("website", "d")
def delete_website(id):
    db = get_db()
    cursor = db.cursor()
    cursor.execute("DELETE FROM website WHERE id = ?", (id,))
    if cursor.rowcount == 0:
        return jsonify({"error": "Page not found"}), 404
    db.commit()
    return jsonify({"message": "Page deleted"})


# Contact CRUD operations
@app.route('/contact', methods=['POST'])
@RequirePermission("contact", "c")
def create_contact():
    data = request.get_json()
    message = data.get('message')
    if not message:
        return jsonify({"error": "Message is required"}), 400
    db = get_db()
    cursor = db.cursor()
    contact_id = str(uuid.uuid4())
    cursor.execute("INSERT INTO contact (id, message) VALUES (?, ?)",
                   (contact_id, message))
    db.commit()
    return jsonify({"id": contact_id, "message": message}), 201


@app.route('/contact/<id>', methods=['GET'])
@RequirePermission("contact", "r")
def read_contact(id):
    db = get_db()
    cursor = db.cursor()
    cursor.execute("SELECT * FROM contact WHERE id = ?", (id,))
    contact = cursor.fetchone()
    if not contact:
        return jsonify({"error": "Contact message not found"}), 404
    return jsonify({"id": contact['id'], "message": contact['message']})


@app.route('/contact/<id>', methods=['PUT'])
@RequirePermission("contact", "u")
def update_contact(id):
    data = request.get_json()
    message = data.get('message')
    if not message:
        return jsonify({"error": "Message is required"}), 400
    db = get_db()
    cursor = db.cursor()
    cursor.execute("UPDATE contact SET message = ? WHERE id = ?", (message, id))
    if cursor.rowcount == 0:
        return jsonify({"error": "Contact message not found"}), 404
    db.commit()
    return jsonify({"id": id, "message": message})


@app.route('/contact/<id>', methods=['DELETE'])
@RequirePermission("contact", "d")
def delete_contact(id):
    db = get_db()
    cursor = db.cursor()
    cursor.execute("DELETE FROM contact WHERE id = ?", (id,))
    if cursor.rowcount == 0:
        return jsonify({"error": "Contact message not found"}), 404
    db.commit()
    return jsonify({"message": "Contact message deleted"})


if __name__ == '__main__':
    init_db()
    app.run(debug=True)

Step 2: Understanding the Code Components

Let’s break down the key components of the Flask application to understand how RBAC is implemented.

Database Schema

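The SQLite database includes seven tables. Four hold the RBAC data: users, roles, user_roles (which links users to roles), and permissions (which records the actions each role may perform on each resource). The other three, blog, website, and contact, hold the protected resources.

The init_db() function creates these tables and adds sample data: two users (admin_user and editor_user), two roles (admin and editor), one role assignment per user, and the permissions for each role. The admin role gets crud on all three resources; the editor role gets cru on blog, ru on website, and r on contact.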
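To see how the RBAC tables fit together, here is a small sketch that rebuilds a subset of the schema in an in-memory SQLite database and resolves a user's effective permissions with one join. The effective_permissions helper and the short ids ('u1', 'r1') are made up for illustration; the demo itself uses UUIDs and performs its checks inside the decorator.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.row_factory = sqlite3.Row
db.executescript("""
    CREATE TABLE users (id TEXT PRIMARY KEY, username TEXT UNIQUE NOT NULL);
    CREATE TABLE roles (id TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL);
    CREATE TABLE user_roles (user_id TEXT, role_id TEXT,
                             PRIMARY KEY (user_id, role_id));
    CREATE TABLE permissions (role_id TEXT, resource TEXT NOT NULL,
                              action TEXT NOT NULL,
                              PRIMARY KEY (role_id, resource, action));
    INSERT INTO users VALUES ('u1', 'editor_user');
    INSERT INTO roles VALUES ('r1', 'editor');
    INSERT INTO user_roles VALUES ('u1', 'r1');
    INSERT INTO permissions VALUES ('r1', 'blog', 'cru');
    INSERT INTO permissions VALUES ('r1', 'website', 'ru');
    INSERT INTO permissions VALUES ('r1', 'contact', 'r');
""")


def effective_permissions(db, username):
    """Merge the action letters granted by all of the user's roles."""
    rows = db.execute("""
        SELECT p.resource, p.action
        FROM users u
        JOIN user_roles ur ON ur.user_id = u.id
        JOIN permissions p ON p.role_id = ur.role_id
        WHERE u.username = ?
        ORDER BY p.resource
    """, (username,)).fetchall()
    merged = {}
    for row in rows:
        # A string is iterable, so update() adds each action letter to the set.
        merged.setdefault(row["resource"], set()).update(row["action"])
    return {resource: "".join(sorted(letters)) for resource, letters in merged.items()}


print(effective_permissions(db, "editor_user"))
# {'blog': 'cru', 'contact': 'r', 'website': 'ru'}
```

Because the letters are merged per resource, a user holding several roles ends up with the union of what those roles allow.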

RBAC Decorator

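The @RequirePermission(resource, actions) decorator enforces RBAC by reading the current user from g.current_user (returning 401 if there is none), loading that user's roles, and checking that at least one role grants every required action on the resource. If any action is missing, the request is rejected with 403; otherwise the wrapped view runs.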

Example usage:

python
@app.route('/blog/<id>', methods=['GET'])
@RequirePermission("blog", "r")
def read_blog(id):
    # Only accessible if the user has read permission for blog
    ...
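The decorator-factory pattern can be tried without Flask or a database. The sketch below is a stripped-down stand-in, not the demo's implementation: GRANTS and current_user are hypothetical replacements for the SQLite lookup and for g.current_user, and the views return plain tuples instead of Flask responses.

```python
from functools import wraps

# Hypothetical stand-ins for the database lookup and for Flask's g.current_user.
GRANTS = {"editor_user": {"blog": "cru"}}
current_user = {"username": "editor_user"}


def require_permission(resource, actions):
    """Build a decorator that allows the call only if every action is granted."""
    def decorator(f):
        @wraps(f)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            granted = GRANTS.get(current_user["username"], {}).get(resource, "")
            for action in actions:
                if action not in granted:
                    return {"error": f"Permission denied for {resource} {action}"}, 403
            return f(*args, **kwargs)
        return wrapper
    return decorator


@require_permission("blog", "r")
def read_blog(post_id):
    return {"id": post_id}, 200


@require_permission("blog", "d")
def delete_blog(post_id):
    return {"message": "Blog deleted"}, 200


print(read_blog("42"))    # ({'id': '42'}, 200)
print(delete_blog("42"))  # ({'error': 'Permission denied for blog d'}, 403)
```

The @wraps call matters in Flask: the default endpoint name is taken from the view function's name, so without it every decorated view would register under the wrapper's name and collide.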

API Endpoints

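The application provides RESTful API endpoints for each resource (blog, website, and contact): POST /<resource> creates an item, and GET, PUT, and DELETE on /<resource>/<id> read, update, and delete it.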

Each endpoint is protected by the @RequirePermission decorator, ensuring only authorized users can perform actions.

Simulated Authentication

For simplicity, the demo uses a hardcoded admin_user as the logged-in user. In a production environment, you’d integrate Flask-Login or JWT to dynamically set the g.current_user based on the authenticated user.

Step 3: Running the Application

To run the application:

  1. Save the code in a file named app.py.
  2. Install Flask:
bash
pip install flask
  3. Run the application:
bash
python app.py
  4. The Flask development server will start at http://127.0.0.1:5000.

Step 4: Testing the API

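Use Postman or cURL to test the API endpoints. The demo hardcodes the logged-in user, so the Authorization header in these examples is ignored; it shows where a token would go once real authentication is in place. Here are some examples: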

Create a Blog Post

bash
curl -X POST http://127.0.0.1:5000/blog \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  -d '{"title":"My First Post","content":"This is a test blog post."}'

Response (if the user has create permission for blog):

json
{ "id": "uuid", "title": "My First Post", "content": "This is a test blog post." }

Read a Blog Post

bash
curl http://127.0.0.1:5000/blog/<uuid> \
  -H "Authorization: Bearer <token>"

Response:

json
{ "id": "uuid", "title": "My First Post", "content": "This is a test blog post." }

Test Permission Denial

If you change g.current_user to editor_user and try to delete a blog post (which requires d permission, not granted to the editor role):

bash
curl -X DELETE http://127.0.0.1:5000/blog/<uuid> \
  -H "Authorization: Bearer <token>"

Response:

json
{ "error": "Permission denied for blog d" }

Step 5: Enhancing the Application

While this demo provides a solid foundation for RBAC in Flask, here are ways to improve it for production:

Add Caching for Performance

Caching permissions can significantly improve performance, especially when permission checks are frequent. A caching solution like Redis or Flask-Caching can keep user roles and permissions in memory, so the decorator does not have to query the database on every request.

This approach minimizes database load and speeds up permission checks, which is crucial for high-traffic applications.
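As a rough illustration of the idea, here is a process-local cache with a time-to-live, written with the standard library only. It is a sketch under stated assumptions rather than a Redis or Flask-Caching integration: get_permissions, invalidate, the 60-second TTL, and the load_from_db callback are all hypothetical names and values.

```python
import time

CACHE_TTL_SECONDS = 60  # assumed value; tune for your application
_cache = {}  # user_id -> (expires_at, {resource: action letters})


def get_permissions(user_id, load_from_db, now=time.monotonic):
    """Return cached permissions, reloading them once the entry has expired."""
    entry = _cache.get(user_id)
    if entry is not None and entry[0] > now():
        return entry[1]
    permissions = load_from_db(user_id)
    _cache[user_id] = (now() + CACHE_TTL_SECONDS, permissions)
    return permissions


def invalidate(user_id):
    """Drop a user's entry, e.g. right after their roles change."""
    _cache.pop(user_id, None)


def fake_loader(user_id):
    # Stand-in for the real role and permission queries.
    print(f"loading permissions for {user_id}")
    return {"blog": "cru"}


get_permissions("u1", fake_loader)  # hits the loader
get_permissions("u1", fake_loader)  # served from the cache
```

A cache like this trades freshness for speed: a revoked permission stays effective until the entry expires, which is why role changes should call invalidate. With several worker processes, a shared store such as Redis avoids each process holding its own stale copy.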

Implement Proper Authentication

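Replace the simulated g.current_user with a real authentication system, such as Flask-Login for session-based logins or JWT for token-based APIs, and set g.current_user from the authenticated user on each request.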
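The sketch below shows only the first step of token-based authentication: pulling a bearer token out of an Authorization header and mapping it to a username. The TOKENS dictionary and username_from_header are hypothetical; a real application would verify a signed JWT or a server-side session rather than look up static strings.

```python
# Hypothetical token store, for illustration only. Do not ship static tokens.
TOKENS = {
    "secret-admin-token": "admin_user",
    "secret-editor-token": "editor_user",
}


def username_from_header(authorization):
    """Return the username for an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return TOKENS.get(token.strip())


print(username_from_header("Bearer secret-editor-token"))  # editor_user
print(username_from_header("Basic abc123"))                # None
```

In the demo, set_current_user could pass request.headers.get("Authorization") to a function like this and leave g.current_user unset when it returns None, so the decorator responds with 401.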

Use an ORM

For larger applications, consider SQLAlchemy to simplify database interactions and improve maintainability. SQLAlchemy handles complex queries and relationships more elegantly than raw SQLite queries.

Add Input Validation

Enhance input validation to prevent issues like invalid data. Libraries like marshmallow or pydantic can validate JSON payloads effectively.
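To show the kind of checks such a library would take over, here is a hand-rolled validator for the blog payload using only the standard library. It is a plain-Python stand-in, not marshmallow or pydantic code; validate_blog_payload and the 200-character title limit are assumptions made for this example.

```python
MAX_TITLE_LENGTH = 200  # assumed limit, not taken from the demo


def validate_blog_payload(data):
    """Return (cleaned, errors); cleaned is None when validation fails."""
    if not isinstance(data, dict):
        return None, {"body": "Expected a JSON object"}
    cleaned, errors = {}, {}
    for field in ("title", "content"):
        value = data.get(field)
        if not isinstance(value, str) or not value.strip():
            errors[field] = "Required non-empty string"
        else:
            cleaned[field] = value.strip()
    if "title" in cleaned and len(cleaned["title"]) > MAX_TITLE_LENGTH:
        errors["title"] = f"Must be at most {MAX_TITLE_LENGTH} characters"
    return (None, errors) if errors else (cleaned, {})


print(validate_blog_payload({"title": " Hello ", "content": "World"}))
# ({'title': 'Hello', 'content': 'World'}, {})
print(validate_blog_payload({"title": 42}))
# (None, {'title': 'Required non-empty string', 'content': 'Required non-empty string'})
```

Unlike the demo's "if not title or not content" check, this also rejects non-string values and a missing or non-object body, and it reports every failing field at once.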

Expand Permission Granularity

The demo uses simple CRUD permissions, but you could support more granular permissions, such as:
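One possible shape for finer-grained permissions is a "resource:action" string with optional wildcards. The sketch below is purely illustrative: the permission names (for instance blog:publish) and the has_permission helper are invented for this example and do not appear in the demo.

```python
from fnmatch import fnmatchcase

# Hypothetical grants expressed as "resource:action" patterns.
ADMIN_GRANTS = {"blog:*", "website:*", "contact:*"}
EDITOR_GRANTS = {"blog:create", "blog:read", "blog:update", "contact:read"}


def has_permission(grants, required):
    """Return True if any granted pattern matches the required permission."""
    return any(fnmatchcase(required, pattern) for pattern in grants)


print(has_permission(ADMIN_GRANTS, "blog:publish"))   # True
print(has_permission(EDITOR_GRANTS, "blog:publish"))  # False
print(has_permission(EDITOR_GRANTS, "blog:update"))   # True
```

Named actions make it possible to add operations beyond create, read, update, and delete without overloading the single-letter codes, at the cost of a slightly more involved matching step.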

Conclusion

Implementing Role-Based Access Control in a Flask application with SQLite is a powerful way to secure your web application. The @RequirePermission decorator ensures that users only perform actions allowed by their roles, protecting resources like blog posts, website content, and contact messages. This demo provides a solid starting point for building secure, scalable applications.

To enhance the demo, consider adding caching, proper authentication, or an ORM like SQLAlchemy. Try running the code, testing the API endpoints, and experimenting with different permission scenarios.

