最终版:轻量级邮件收发与管理系统搭建教程

Administrator 22 2025-06-17

本教程将指导您在一台全新的 Debian 12 服务器上,搭建一个功能齐全的、私有的、通用的多用户网页邮箱系统。

系统核心功能:

  • 邮件接收:可接收发送到您指定域名的所有邮件。

  • 多用户系统:支持创建多个独立的用户账户,每个用户都有自己的密码。

  • 数据隔离:用户登录后,只能看到、管理和搜索发送给自己的邮件,保护隐私。

  • 管理员角色:拥有一个特殊的管理员账户,可以登录并查看服务器收到的所有邮件,并能在线创建新用户。

  • 网页邮箱 (Webmail)

    • 安全的用户和管理员密码登录。

    • 支持按主题搜索邮件。

    • 支持分页浏览邮件列表。

    • 支持在线写邮件并通过专业服务商(如 SendGrid)发送。

    • 支持刷新选择删除全部删除等管理操作。

    • 所有时间戳均以北京时间显示。

  • 自动维护:每3天自动清理一次旧邮件,只保留最新的30封,防止数据库无限膨胀。

第一部分:前期准备

  1. 一台服务器:拥有一个固定的公网 IP 地址 (本教程以 Debian 12 为例)。

  2. 一个域名:您拥有该域名的 DNS 管理权限 (本教程以 mail.sijuly.nyc.mn 为例)。

  3. SSH 访问权限:能够以 root 用户身份登录服务器。

  4. 一个第三方SMTP中继服务账号:用于发送邮件。推荐 SendGrid, Brevo 等,并获取其 SMTP 凭证 (服务器地址、端口、用户名、API密钥/密码)。

第二部分:DNS 与服务器配置

1. 配置域名 DNS

登录到您域名的DNS管理后台,添加以下两条记录:

  • A 记录:

    • 类型: A

    • 主机/名称: mail

    • 值/指向: 您的服务器IP地址

  • MX 记录:

    • 类型: MX

    • 主机/名称: mail

    • 值/服务器: mail.sijuly.nyc.mn

    • 优先级: 10

2. (可选但建议) 配置 PTR 记录

登录到您的服务器提供商(如甲骨文云 OCI)的控制台,为您的 IP 添加一条反向DNS记录,使其指向您的域名 mail.sijuly.nyc.mn

3. 验证发信域名

登录到您的 SMTP 中继服务商(如 SendGrid),按照其指引添加并验证您的发信域名 mail.sijuly.nyc.mn

获取 SendGrid SMTP 凭证完整步骤

第1步:注册 SendGrid 账户

  1. 访问官网:前往 SendGrid 官网

  2. 开始免费使用:点击页面上的 "Start for Free" 或类似按钮。

  3. 填写信息:按照要求填写您的邮箱、密码,并创建账户。

  4. 激活账户:SendGrid 会向您的注册邮箱发送一封确认邮件,请务必登录您的邮箱,点击邮件中的链接来激活您的 SendGrid 账户。

第2步:验证发信域名(最关键的一步)

这是最重要的一步。您必须向 SendGrid 证明您拥有 mail.sijuly.nyc.mn 这个域名(或者其主域名 sijuly.nyc.mn),这样 SendGrid 才允许您用这个域名下的地址作为发件人。这也可以极大地提高您邮件的送达率,避免被当成垃圾邮件。

  1. 登录 SendGrid:登录到您的 SendGrid 仪表盘。

  2. 进入发件人认证:在左侧菜单中找到 Settings -> Sender Authentication

  3. 认证您的域名:在 “Domain Authentication” 部分,点击 Authenticate Your DomainGet Started 按钮。

  4. 选择DNS服务商:它会问您的DNS托管服务商是谁(比如 GoDaddy, Cloudflare 等)。如果您不确定,可以直接选择 “Other Host (Not Listed)”。

  5. 输入您的域名:在输入框中,输入您要用于发信的主域名。对于 mail.sijuly.nyc.mn 来说,您应该输入根域名: sijuly.nyc.mn

  6. 获取DNS记录:点击“Next”后,SendGrid 会为您生成 3条 CNAME 类型的DNS记录。页面上会清楚地列出每一条记录的 “主机”(Host/Name)“值”(Value/Points To)

    • 它们看起来会是这样(这只是例子,请以您页面上显示的为准):

      • Host: em123.sijuly.nyc.mn, Value: u456789.wl.sendgrid.net

      • Host: s1._domainkey.sijuly.nyc.mn, Value: s1.domainkey.u456789.wl.sendgrid.net

      • Host: s2._domainkey.sijuly.nyc.mn, Value: s2.domainkey.u456789.wl.sendgrid.net

  7. 添加DNS记录

    • 现在,请打开一个新的浏览器标签页,登录到您购买 sijuly.nyc.mn 域名的服务商的DNS管理后台。

    • 完全按照 SendGrid 页面上提供的信息,创建这3条 CNAME 记录。将 SendGrid 提供的“Host”和“Value”分别复制粘贴到您DNS后台的对应输入框中。

  8. 在 SendGrid 上进行验证

    • 添加完DNS记录后,回到 SendGrid 的页面,勾选 “I've added these records.”,然后点击 Verify 按钮。

    • DNS记录的生效需要一些时间,从几分钟到几个小时不等。如果第一次验证失败,请不要着急,可以过一段时间再回来点击 Verify 按钮。

    • 一旦成功,您会看到一个绿色的 “Verified” 状态。

第3步:创建并保存 API 密钥

这个 API 密钥就是我们用来登录 SMTP 服务的“密码”,它的权限很高,必须妥善保管。

  1. 进入API密钥页面:在 SendGrid 左侧菜单中,找到 Settings -> API Keys

  2. 创建API密钥:点击页面右上角的 “Create API Key” 按钮。

  3. 填写信息

    • 给您的密钥起一个名字,比如 my-vps-mailer,方便您识别。

    • 选择 “Full Access”(完全权限)。

    • 点击 “Create & View”

  4. 复制并保存密钥

    • 这是唯一一次您能看到完整的密钥! SendGrid 会显示一串以 SG. 开头的非常长的字符。

    • 立刻点击复制按钮,并将它粘贴到您本地一个绝对安全的地方(比如您的密码管理器或一个加密的记事本中)。

    • 一旦您离开这个页面,就再也无法看到完整的密钥了。

第4步:整理您的 SMTP 凭证

恭喜!现在您已经拥有了所有需要的信息。我们来整理一下,它们将用于填写您服务器上 app.py 文件的配置部分:

  • SMTP_SERVER: "smtp.sendgrid.net" (这是固定的)

  • SMTP_PORT: 587 (这是推荐的端口)

  • SMTP_USERNAME: "apikey" (这是一个固定的字符串)

  • SMTP_PASSWORD: "您刚刚创建并安全保存的那个以 SG. 开头的长长的API密钥"

  • DEFAULT_SENDER: "任意您喜欢的名字@mail.sijuly.nyc.mn" (例如 noreply@mail.sijuly.nyc.mnservice@mail.sijuly.nyc.mn)

现在,您就可以将这些信息填入您服务器上的 app.py 文件,然后重启服务。您的发信功能就正式配置完成了!

4. 配置服务器环境

⚠️ 重要提示:本教程所有命令均以 root 用户身份执行,全程无需使用 sudo 命令。

# 更新系统
apt-get update && apt-get upgrade -y

# 安装必要的软件包
apt-get install -y python3-pip python3-venv ufw sqlite3 curl

# 配置防火墙 (假设您的Web服务运行在2099端口)
ufw allow ssh
ufw allow 25/tcp
ufw allow 2099/tcp
ufw enable # 出现提示时输入 y

第三部分:部署应用程序

1. 创建项目结构和依赖安装

# 创建并进入项目目录
mkdir -p /opt/mail_api
cd /opt/mail_api

# 创建并激活 Python 虚拟环境
python3 -m venv venv
source venv/bin/activate

# 安装 Python 依赖库
pip install flask gunicorn aiosmtpd Werkzeug

2. 创建应用程序文件

您需要创建两个核心的 Python 脚本:

  • app.py: 包含所有Web界面、API 和发信逻辑。

  • manage_users.py: 用于在命令行管理用户的工具。

请参考下面我为您提供的另外两个代码文档,将它们的完整内容分别保存到对应的文件中。

  • app.py(路径:/opt/mail_api)

import sqlite3
import re
import os
import math
import smtplib
from functools import wraps
from flask import Flask, request, Response, redirect, url_for, session, render_template_string, flash
from email.mime.text import MIMEText
from email.header import Header
from email import message_from_bytes
from email.header import decode_header
from email.utils import parseaddr
from markupsafe import escape
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo
from werkzeug.security import check_password_hash, generate_password_hash

# --- 配置 ---
DB_FILE = 'emails.db'
EMAILS_PER_PAGE = 100
LAST_CLEANUP_FILE = '/opt/mail_api/last_cleanup.txt'
CLEANUP_INTERVAL_DAYS = 3
EMAILS_TO_KEEP = 30

# 管理员账户配置
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "aaabbb$" # 您指定的管理员密码

# --- SMTP 发信配置 ---
SMTP_SERVER = "smtp.sendgrid.net"
SMTP_PORT = 587
SMTP_USERNAME = "apikey"
SMTP_PASSWORD = "YOUR_NEW_SECURE_SENDGRID_API_KEY" # !!! 在这里粘贴您新的、安全的API密钥
DEFAULT_SENDER = "noreply@mail.sijuly.nyc.mn"

# --- Flask 应用设置 ---
app = Flask(__name__)
app.config['SECRET_KEY'] = '111222333Sq$_a_very_long_and_random_string' # !!! 强烈建议修改

# --- 数据库操作 ---
def get_db_conn():
    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn

def init_db():
    conn = get_db_conn()
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL
        )
    ''')
    c.execute('''
        CREATE TABLE IF NOT EXISTS received_emails (
            id INTEGER PRIMARY KEY AUTOINCREMENT, recipient TEXT, sender TEXT,
            subject TEXT, body TEXT, body_type TEXT, 
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()

# --- 集成的自动清理函数 ---
def run_cleanup_if_needed():
    now = datetime.now()
    try:
        if os.path.exists(LAST_CLEANUP_FILE):
            with open(LAST_CLEANUP_FILE, 'r') as f:
                last_cleanup_time = datetime.fromisoformat(f.read().strip())
            if now - last_cleanup_time < timedelta(days=CLEANUP_INTERVAL_DAYS):
                return
    except Exception as e:
        print(f"读取上次清理时间失败: {e},将继续执行清理检查。")
    print(f"[{now}] 开始执行定时邮件清理任务...")
    conn = None
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        query_delete = f"DELETE FROM received_emails WHERE id NOT IN (SELECT id FROM received_emails ORDER BY id DESC LIMIT {EMAILS_TO_KEEP})"
        deleted_rows_cursor = cursor.execute(query_delete)
        conn.commit()
        deleted_count = deleted_rows_cursor.rowcount
        if deleted_count > 0: print(f"清理完成,成功删除了 {deleted_count} 封旧邮件。")
        else: print("无需清理。")
        with open(LAST_CLEANUP_FILE, 'w') as f:
            f.write(now.isoformat())
            print(f"已更新清理时间戳: {now.isoformat()}")
    except Exception as e:
        print(f"自动清理邮件时发生错误: {e}")
    finally:
        if conn: conn.close()

# --- 核心邮件处理逻辑 ---
def process_email_data(to_address, raw_email_data):
    msg = message_from_bytes(raw_email_data)
    subject_raw, encoding = decode_header(msg['Subject'])[0]
    if isinstance(subject_raw, bytes): subject = subject_raw.decode(encoding or 'utf-8', errors='ignore')
    else: subject = str(subject_raw)
    sender = msg.get('From')
    body, body_type = "", "text/plain"
    if msg.is_multipart():
        html_part, text_part = None, None
        for part in msg.walk():
            if "text/html" in part.get_content_type(): html_part = part
            elif "text/plain" in part.get_content_type(): text_part = part
        if html_part:
            body = html_part.get_payload(decode=True).decode(html_part.get_content_charset() or 'utf-8', errors='ignore')
            body_type = "text/html"
        elif text_part:
            body = text_part.get_payload(decode=True).decode(text_part.get_content_charset() or 'utf-8', errors='ignore')
            body_type = "text/plain"
    else:
        body = msg.get_payload(decode=True).decode(msg.get_content_charset() or 'utf-8', errors='ignore')
        body_type = msg.get_content_type()
    conn = None
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        cursor.execute("INSERT INTO received_emails (recipient, sender, subject, body, body_type) VALUES (?, ?, ?, ?, ?)",
                       (to_address, sender, subject, body, body_type))
        conn.commit()
        print(f"邮件已存入: To='{to_address}', Subject='{subject}'")
    except Exception as e:
        print(f"数据库操作时出错: {e}")
    finally:
        if conn: conn.close()
        run_cleanup_if_needed()

# --- 辅助函数 ---
def strip_tags_for_preview(html_content):
    if not html_content: return ""
    text_content = re.sub(r'<[^>]+>', ' ', html_content)
    return re.sub(r'\s+', ' ', text_content).strip()

def send_email(to_address, subject, body):
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = DEFAULT_SENDER
    msg['To'] = to_address
    try:
        server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
        server.starttls()
        server.login(SMTP_USERNAME, SMTP_PASSWORD)
        server.send_message(msg)
        server.quit()
        return True, "邮件发送成功!"
    except Exception as e:
        print(f"发送邮件时发生错误: {e}")
        return False, f"邮件发送失败: {e}"

# --- 登录系统 ---
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_email' not in session: return redirect(url_for('login', next=request.url))
        return f(*args, **kwargs)
    return decorated_function

def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('is_admin'):
            return redirect(url_for('admin_login', next=request.url))
        return f(*args, **kwargs)
    return decorated_function

# --- Flask 路由 ---
@app.route('/login', methods=['GET', 'POST'])
def login():
    error = None
    if request.method == 'POST':
        email = request.form.get('email')
        password = request.form.get('password')
        conn = get_db_conn()
        user = conn.execute('SELECT * FROM users WHERE email = ?', (email,)).fetchone()
        conn.close()
        if email == ADMIN_USERNAME and password == ADMIN_PASSWORD:
            session['user_email'] = ADMIN_USERNAME
            session['is_admin'] = True
            next_url = request.args.get('next') or url_for('admin_view')
            return redirect(next_url)
        elif user and check_password_hash(user['password_hash'], password):
            session['user_email'] = user['email']
            session.pop('is_admin', None)
            next_url = request.args.get('next') or url_for('view_emails')
            return redirect(next_url)
        else:
            error = '邮箱或密码错误,请重试'
    
    login_form_html = f"""
        <!DOCTYPE html><html><head><title>登录</title>
        <style>body{{display:flex; flex-direction: column; justify-content:center; align-items:center; height:100vh; font-family:sans-serif;}} 
        h1{{color: #4CAF50; margin-bottom: 1.5em; font-size: 2.5em;}}
        .login-box{{padding:2em; border:1px solid #ccc; border-radius:5px; background-color:#f9f9f9; width: 300px;}}
        label{{margin-top: 1em;}}
        input{{display:block; margin-top:0.5em; margin-bottom:1em; padding:0.5em; width: 95%;}}
        .error{{color:red;}}</style></head>
        <body>
        <h1>小龙女她爸邮局服务系统</h1>
        <div class="login-box"><h2>邮箱登录</h2>
        {'<p class="error">' + escape(error) + '</p>' if error else ''}
        <form method="post">
            <label>邮箱地址 (或管理员账户):</label><input type="text" name="email" required>
            <label>密码:</label><input type="password" name="password" required>
            <input type="hidden" name="next" value="{escape(request.args.get('next', ''))}">
            <input type="submit" value="登录" style="width:100%; padding: 10px;"></form>
        </div></body></html>
    """
    return Response(login_form_html, mimetype="text/html; charset=utf-8")

@app.route('/admin_login', methods=['GET', 'POST'])
@login_required
def admin_login():
    error = None
    if request.method == 'POST':
        password = request.form.get('password')
        if password == ADMIN_PASSWORD:
            session['is_admin'] = True
            next_url = request.args.get('next') or url_for('admin_view')
            return redirect(next_url)
        else:
            error = "管理员密码错误!"
            
    admin_login_html = f"""
        <!DOCTYPE html><html><head><title>管理员验证</title>
        <style>body{{display:flex; flex-direction: column; justify-content:center; align-items:center; height:100vh; font-family:sans-serif;}} 
        .login-box{{padding:2em; border:1px solid #ccc; border-radius:5px; background-color:#f9f9f9; width: 300px;}}
        .error{{color:red;}}</style></head>
        <body><div class="login-box"><h2>管理员验证</h2>
        <p>您正在尝试访问管理员视图,请输入管理员密码。</p>
        {'<p class="error">' + escape(error) + '</p>' if error else ''}
        <form method="post">
            <label>管理员密码:</label><input type="password" name="password" required>
            <input type="hidden" name="next" value="{escape(request.args.get('next', ''))}">
            <input type="submit" value="验证"></form>
        <p style="margin-top:2em;"><a href="{url_for('view_emails')}">返回个人收件箱</a></p>
        </div></body></html>
    """
    return Response(admin_login_html, mimetype="text/html; charset=utf-8")

@app.route('/logout')
def logout():
    session.pop('user_email', None)
    session.pop('is_admin', None)
    return redirect(url_for('login'))

@app.route('/view_emails')
@login_required
def view_emails():
    user_email = session['user_email']
    if user_email == ADMIN_USERNAME:
        return redirect(url_for('admin_view'))
    search_query = request.args.get('search', '').strip()
    try: page = int(request.args.get('page', 1))
    except (ValueError, TypeError): page = 1
    conn = get_db_conn()
    cursor = conn.cursor()
    params = [user_email]
    where_clauses = ["recipient = ?"]
    if search_query:
        search_term = f"%{search_query}%"
        where_clauses.append("(subject LIKE ?)")
        params.append(search_term)
    where_sql = "WHERE " + " AND ".join(where_clauses)
    count_query = f"SELECT COUNT(*) FROM received_emails {where_sql}"
    total_emails = cursor.execute(count_query, params).fetchone()[0]
    total_pages = math.ceil(total_emails / EMAILS_PER_PAGE) if total_emails > 0 else 1
    page = max(1, min(page, total_pages))
    offset = (page - 1) * EMAILS_PER_PAGE
    query_params = params + [EMAILS_PER_PAGE, offset]
    main_query = f"SELECT * FROM received_emails {where_sql} ORDER BY id DESC LIMIT ? OFFSET ?"
    emails_data = cursor.execute(main_query, query_params).fetchall()
    conn.close()
    return render_email_list_page(
        emails_data=emails_data, page=page, total_pages=total_pages,
        total_emails=total_emails, search_query=search_query,
        user_email=user_email, is_admin_view=False
    )

@app.route('/admin_view')
@login_required
@admin_required
def admin_view():
    search_query = request.args.get('search', '').strip()
    try: page = int(request.args.get('page', 1))
    except (ValueError, TypeError): page = 1
    conn = get_db_conn()
    cursor = conn.cursor()
    params, where_clauses = [], []
    if search_query:
        search_term = f"%{search_query}%"
        where_clauses.append("(subject LIKE ? OR recipient LIKE ?)")
        params.extend([search_term, search_term])
    where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""
    count_query = f"SELECT COUNT(*) FROM received_emails {where_sql}"
    total_emails = cursor.execute(count_query, params).fetchone()[0]
    total_pages = math.ceil(total_emails / EMAILS_PER_PAGE) if total_emails > 0 else 1
    page = max(1, min(page, total_pages))
    offset = (page - 1) * EMAILS_PER_PAGE
    query_params = params + [EMAILS_PER_PAGE, offset]
    main_query = f"SELECT * FROM received_emails {where_sql} ORDER BY id DESC LIMIT ? OFFSET ?"
    emails_data = cursor.execute(main_query, query_params).fetchall()
    conn.close()
    return render_email_list_page(
        emails_data=emails_data, page=page, total_pages=total_pages,
        total_emails=total_emails, search_query=search_query,
        user_email=session['user_email'], is_admin_view=True
    )

def render_email_list_page(emails_data, page, total_pages, total_emails, search_query, user_email, is_admin_view):
    view_endpoint = 'admin_view' if is_admin_view else 'view_emails'
    delete_selected_endpoint = 'admin_delete_selected_emails' if is_admin_view else 'delete_selected_emails'
    delete_all_endpoint = 'admin_delete_all_emails' if is_admin_view else 'delete_all_emails'
    title_text = f"管理员视图 (共 {total_emails} 封)" if is_admin_view else f"收件箱 ({user_email} - 共 {total_emails} 封)"
    search_placeholder = "搜索所有邮件的主题或收件人..." if is_admin_view else "在当前邮箱中搜索主题..."
    
    add_user_button_html = f'<a href="{url_for("add_user")}" class="add-user-button">新建用户</a>' if is_admin_view else ''

    # 使用Jinja2模板字符串来构建HTML
    html_template = f"""
        <!DOCTYPE html><html><head><title>{'管理员视图' if is_admin_view else '收件箱'}</title>
        <style>
            body{{font-family: sans-serif; margin: 2em;}} 
            .page-header {{ display: flex; justify-content: space-between; align-items: center; margin-bottom: 1em; }}
            .page-header h2 {{ margin: 0; }}
            .header-actions a {{ margin-left: 1em; text-decoration: none; font-size: 0.9em; padding: 8px 12px; border-radius: 4px; color: white; }}
            .header-actions .compose-button {{ background-color: #5cb85c; }}
            .header-actions .add-user-button {{ background-color: #337ab7; }}
            .header-actions .view-toggle {{ background-color: #f0ad4e; }}
            .header-actions .logout-link {{ background-color: #d9534f; }}
            table{{border-collapse: collapse; width: 100%; margin-top: 1em; table-layout: fixed;}}
            th, td{{border: 1px solid #ddd; padding: 8px; text-align: left; vertical-align: top; word-wrap: break-word;}}
            .actions, .search-box, .pagination {{margin-bottom: 1em;}} .search-box input[type=text] {{padding: 8px; width: 300px;}} .search-box button, .actions button {{padding: 8px 12px; cursor: pointer;}}
            .pagination {{text-align: center; padding: 1em 0;}} .pagination a, .pagination strong {{ margin: 0 5px; text-decoration: none; padding: 5px 10px; border: 1px solid #ddd; border-radius: 4px;}}
            .pagination strong {{ background-color: #4CAF50; color: white; border-color: #4CAF50; }}
            .preview{{width: 100%; line-height: 1.4em; max-height: 2.8em; overflow: hidden; text-overflow: ellipsis; display: -webkit-box; -webkit-line-clamp: 2; -webkit-box-orient: vertical;}}
        </style>
        </head><body>
        <div class="page-header">
            <h2>{title_text},第 {page}/{total_pages} 页</h2>
            <div class="header-actions">
                {add_user_button_html}
                <a href="{url_for('compose_email')}" class="compose-button">写邮件</a>
                <a href="{url_for('admin_view' if not is_admin_view else 'view_emails')}" class="view-toggle">切换到{'管理员' if not is_admin_view else '个人'}视图</a>
                <a href="{url_for('logout')}" class="logout-link">登出</a>
            </div>
        </div>
        <div class="search-box">
            <form method="GET" action="{url_for(view_endpoint)}">
                <input type="text" name="search" placeholder="{search_placeholder}" value="{escape(search_query)}">
                <button type="submit">搜索</button>
                {'<a href="' + url_for(view_endpoint) + '" style="margin-left:10px; text-decoration:underline; color:grey;">清空搜索</a>' if search_query else ''}
            </form>
        </div>
        <div class="actions">
            <button onclick="location.href='{url_for(view_endpoint, page=page, search=search_query)}'">刷新列表</button>
            <form method="POST" action="{url_for(delete_all_endpoint)}" style="display: inline;" onsubmit="return confirm('您确定要删除这些邮件吗?');"><button type="submit">删除所有邮件</button></form>
        </div>
        <form method="POST" action="{url_for(delete_selected_endpoint)}?page={page}&search={search_query}">
        <table><thead><tr>
            <th style="width: 3%; text-align: center;"><input type="checkbox" onclick="toggleAll(this);"></th>
            <th style="width: 15%;">时间 (北京)</th><th style="width: 20%;">主题</th><th style="width: 30%;">内容预览</th>
            <th style="width: 15%;">收件人</th><th style="width: 12%;">发件人</th><th style="width: 5%; text-align: center;">操作</th>
        </tr></thead><tbody>
    """
    
    if not emails_data:
        html_template += '<tr><td colspan="7" style="text-align:center;">没有找到邮件。</td></tr>'
    else:
        beijing_tz = ZoneInfo("Asia/Shanghai")
        for item in emails_data:
            utc_ts, bjt_str = item['timestamp'], "N/A"
            if utc_ts:
                try: bjt_str = datetime.strptime(utc_ts, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc).astimezone(beijing_tz).strftime('%Y-%m-%d %H:%M:%S')
                except (ValueError, TypeError): bjt_str = utc_ts
            
            preview_text = escape(strip_tags_for_preview(item['body'] or ''))
            
            _, sender_addr = parseaddr(item['sender'] or "")
            
            html_template += f"""<tr>
                <td style="text-align: center;"><input type="checkbox" name="selected_ids" value="{item['id']}"></td>
                <td>{escape(bjt_str)}</td><td>{escape(item['subject'])}</td>
                <td><div class='preview'>{preview_text}</div></td>
                <td>{escape(item['recipient'])}</td><td>{escape(sender_addr or item['sender'] or "")}</td>
                <td><a href="{url_for('view_email_detail', email_id=item['id'])}" target="_blank">查看</a></td></tr>"""

    pagination_html = '<div class="pagination">'
    if total_pages > 1:
        if page > 1: pagination_html += f'<a href="{url_for(view_endpoint, page=page-1, search=search_query)}">&laquo; 上一页</a>'
        for p in range(1, total_pages + 1):
            if p == page: pagination_html += f'<strong>{p}</strong>'
            else: pagination_html += f'<a href="{url_for(view_endpoint, page=p, search=search_query)}">{p}</a>'
        if page < total_pages: pagination_html += f'<a href="{url_for(view_endpoint, page=page+1, search=search_query)}">下一页 &raquo;</a>'
    pagination_html += '</div>'
            
    html_template += f"""
        </tbody></table>
        {'<div class="actions" style="margin-top: 1em;"><button type="submit">删除选中邮件</button></div>' if emails_data else ''}
        </form>{pagination_html}
        <script>function toggleAll(source) {{ checkboxes = document.getElementsByName('selected_ids'); for(var c of checkboxes) c.checked = source.checked; }}</script>
        </body></html>
    """
    return Response(html_template, mimetype="text/html; charset=utf-8")

@app.route('/view_email/<int:email_id>')
@login_required
def view_email_detail(email_id):
    user_email = session['user_email']
    conn = get_db_conn()
    email = None
    if session.get('is_admin'):
        email = conn.execute("SELECT * FROM received_emails WHERE id = ?", (email_id,)).fetchone()
    else:
        email = conn.execute("SELECT * FROM received_emails WHERE id = ? AND recipient = ?", (email_id, user_email)).fetchone()
    conn.close()
    if email: return Response(email['body'], mimetype=f"{email['body_type']}; charset=utf-8")
    return "邮件未找到或您无权查看。", 404

@app.route('/compose', methods=['GET', 'POST'])
@login_required
def compose_email():
    if request.method == 'POST':
        to = request.form.get('to')
        subject = request.form.get('subject')
        body = request.form.get('body')
        if not to or not subject or not body:
            flash("收件人、主题和内容都不能为空!", 'error')
        else:
            success, message = send_email(to, subject, body)
            flash(message, 'success' if success else 'error')
            if success: return redirect(url_for('view_emails'))
    
    compose_html = """
        <!DOCTYPE html><html><head><title>写邮件</title>
        <style>
            body{font-family: sans-serif; margin: 2em;} .container{max-width: 800px; margin: auto;}
            a {color: #4CAF50; text-decoration:none; margin-bottom: 1em; display: inline-block;}
            label{display: block; margin-top: 1em;} input, textarea{width: 100%; padding: 8px; margin-top: 5px; box-sizing: border-box; border: 1px solid #ccc; border-radius: 4px;}
            textarea{height: 200px; resize: vertical;} button{margin-top: 1em; padding: 10px 15px; cursor: pointer; background-color: #4CAF50; color: white; border: none; border-radius: 4px;}
            .flash{padding: 1em; margin-bottom: 1em; border-radius: 5px;}
            .flash.success{background-color: #d4edda; color: #155724;}
            .flash.error{background-color: #f8d7da; color: #721c24;}
        </style>
        </head><body><div class="container">
        <p><a href="{{ url_for('view_emails') }}">&laquo; 返回收件箱</a></p>
        <h2>写邮件</h2>
        {% with messages = get_flashed_messages(with_categories=true) %}
            {% if messages %}
                {% for category, message in messages %}
                    <div class="flash {{ category }}">{{ message }}</div>
                {% endfor %}
            {% endif %}
        {% endwith %}
        <form method="POST">
            <label for="to">收件人:</label><input type="email" id="to" name="to" required value="{{ request.form.get('to', '') }}">
            <label for="subject">主题:</label><input type="text" id="subject" name="subject" required value="{{ request.form.get('subject', '') }}">
            <label for="body">正文:</label><textarea id="body" name="body" required>{{ request.form.get('body', '') }}</textarea>
            <button type="submit">发送邮件</button>
        </form>
        </div></body></html>
    """
    return render_template_string(compose_html)

@app.route('/add_user', methods=['GET', 'POST'])
@login_required
@admin_required
def add_user():
    if request.method == 'POST':
        email = request.form.get('email')
        password = request.form.get('password')
        password_confirm = request.form.get('password_confirm')
        if not email or not password or not password_confirm:
            flash("邮箱和密码不能为空!", 'error')
        elif password != password_confirm:
            flash("两次输入的密码不匹配!", 'error')
        else:
            try:
                conn = get_db_conn()
                cursor = conn.cursor()
                cursor.execute("INSERT INTO users (email, password_hash) VALUES (?, ?)", (email, generate_password_hash(password)))
                conn.commit()
                flash(f"用户 '{escape(email)}' 添加成功。", 'success')
                return redirect(url_for('admin_view'))
            except sqlite3.IntegrityError:
                flash(f"错误:用户 '{escape(email)}' 已存在。", 'error')
            finally:
                if conn: conn.close()
    
    add_user_html = """
        <!DOCTYPE html><html><head><title>新建用户</title>
        <style>
            body{font-family: sans-serif; margin: 2em;} .container{max-width: 800px; margin: auto;}
            a {color: #4CAF50; text-decoration:none; margin-bottom: 1em; display: inline-block;}
            label{display: block; margin-top: 1em;} input{width: 100%; padding: 8px; margin-top: 5px; box-sizing: border-box; border: 1px solid #ccc; border-radius: 4px;}
            button{margin-top: 1em; padding: 10px 15px; cursor: pointer; background-color: #337ab7; color: white; border: none; border-radius: 4px;}
            .flash{padding: 1em; margin-bottom: 1em; border-radius: 5px;}
            .flash.success{background-color: #d4edda; color: #155724;}
            .flash.error{background-color: #f8d7da; color: #721c24;}
        </style>
        </head><body><div class="container">
        <p><a href="{{ url_for('admin_view') }}">&laquo; 返回管理员视图</a></p>
        <h2>新建用户</h2>
        {% with messages = get_flashed_messages(with_categories=true) %}
            {% if messages %}
                {% for category, message in messages %}
                    <div class="flash {{ category }}">{{ message }}</div>
                {% endfor %}
            {% endif %}
        {% endwith %}
        <form method="POST">
            <label for="email">新用户邮箱地址:</label><input type="email" id="email" name="email" required>
            <label for="password">密码:</label><input type="password" id="password" name="password" required>
            <label for="password_confirm">确认密码:</label><input type="password" id="password_confirm" name="password_confirm" required>
            <button type="submit">创建用户</button>
        </form>
        </div></body></html>
    """
    return render_template_string(add_user_html)

@app.route('/delete_selected_emails', methods=['POST'])
@login_required
def delete_selected_emails():
    user_email = session['user_email']
    ids = request.form.getlist('selected_ids')
    search = request.args.get('search', '')
    page = request.args.get('page', 1, type=int)
    if ids:
        try:
            conn = get_db_conn()
            cursor = conn.cursor()
            placeholders = ','.join('?'*len(ids))
            query = f"DELETE FROM received_emails WHERE id IN ({placeholders}) AND recipient = ?"
            cursor.execute(query, ids + [user_email])
            conn.commit()
        finally:
            if conn: conn.close()
    return redirect(url_for('view_emails', search=search, page=page))

@app.route('/delete_all_emails', methods=['POST'])
@login_required
def delete_all_emails():
    user_email = session['user_email']
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM received_emails WHERE recipient = ?", (user_email,))
        conn.commit()
    finally:
        if conn: conn.close()
    return redirect(url_for('view_emails'))

# 【新增】管理员删除路由
@app.route('/admin_delete_selected_emails', methods=['POST'])
@login_required
@admin_required
def admin_delete_selected_emails():
    ids = request.form.getlist('selected_ids')
    search = request.args.get('search', '')
    page = request.args.get('page', 1, type=int)
    if ids:
        try:
            conn = get_db_conn()
            cursor = conn.cursor()
            placeholders = ','.join('?'*len(ids))
            query = f"DELETE FROM received_emails WHERE id IN ({placeholders})"
            cursor.execute(query, ids)
            conn.commit()
        finally:
            if conn: conn.close()
    return redirect(url_for('admin_view', search=search, page=page))

@app.route('/admin_delete_all_emails', methods=['POST'])
@login_required
@admin_required
def admin_delete_all_emails():
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM received_emails")
        conn.commit()
    finally:
        if conn: conn.close()
    return redirect(url_for('admin_view'))

init_db()
  • manage_users.py(路径:/opt/mail_api)

import sqlite3
import argparse
from getpass import getpass
from werkzeug.security import generate_password_hash
import os

# --- 配置 ---
# 确保脚本能找到正确的数据库文件路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_FILE = os.path.join(BASE_DIR, 'emails.db')

def get_db_conn():
    """获取数据库连接"""
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn

def setup_database():
    """初始化数据库,确保所有需要的表都已创建"""
    print("正在检查并初始化数据库...")
    conn = get_db_conn()
    c = conn.cursor()
    # 创建 users 表,用于存储用户信息
    c.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL
        )
    ''')
    # 同时确保 received_emails 表也存在
    c.execute('''
        CREATE TABLE IF NOT EXISTS received_emails (
            id INTEGER PRIMARY KEY AUTOINCREMENT, recipient TEXT, sender TEXT,
            subject TEXT, body TEXT, body_type TEXT, 
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()
    print("数据库初始化完成。")

def add_user(email, password):
    """添加一个新用户"""
    conn = get_db_conn()
    cursor = conn.cursor()
    try:
        # 使用 werkzeug 生成安全的密码哈希值
        password_hash = generate_password_hash(password)
        cursor.execute(
            "INSERT INTO users (email, password_hash) VALUES (?, ?)",
            (email, password_hash)
        )
        conn.commit()
        print(f"用户 '{email}' 添加成功。")
    except sqlite3.IntegrityError:
        # 如果邮箱地址已存在,会触发这个错误
        print(f"错误:用户 '{email}' 已存在。")
    finally:
        conn.close()

def delete_user(email):
    """根据邮箱地址删除一个用户"""
    conn = get_db_conn()
    cursor = conn.cursor()
    cursor.execute("DELETE FROM users WHERE email = ?", (email,))
    if cursor.rowcount > 0:
        conn.commit()
        print(f"用户 '{email}' 已被删除。")
    else:
        print(f"错误:未找到用户 '{email}'。")
    conn.close()

def list_users():
    """列出所有已存在的用户"""
    conn = get_db_conn()
    cursor = conn.cursor()
    cursor.execute("SELECT id, email FROM users ORDER BY id")
    users = cursor.fetchall()
    if not users:
        print("数据库中当前没有用户。")
    else:
        print("用户列表:")
        print("--------------------")
        for user in users:
            print(f"  ID: {user['id']:<5} Email: {user['email']}")
        print("--------------------")
    conn.close()

if __name__ == "__main__":
    # 每次运行脚本时,都先确保数据库和表是准备好的
    setup_database()

    # 使用 argparse 创建友好的命令行界面
    parser = argparse.ArgumentParser(
        description="邮件服务用户管理工具",
        formatter_class=argparse.RawTextHelpFormatter
    )
    subparsers = parser.add_subparsers(dest='command', required=True, help='可用的命令')

    # 'add' 命令
    parser_add = subparsers.add_parser('add', help='添加一个新用户 (例如: python3 manage_users.py add user@example.com)')
    parser_add.add_argument('email', type=str, help='新用户的邮箱地址')

    # 'delete' 命令
    parser_delete = subparsers.add_parser('delete', help='删除一个用户 (例如: python3 manage_users.py delete user@example.com)')
    parser_delete.add_argument('email', type=str, help='要删除的用户的邮箱地址')

    # 'list' 命令
    subparsers.add_parser('list', help='列出所有用户 (例如: python3 manage_users.py list)')

    args = parser.parse_args()

    # 根据命令执行相应的函数
    if args.command == 'add':
        # 使用 getpass 安全地输入密码,不会在屏幕上显示
        password = getpass(f"请输入用户 '{args.email}' 的密码: ")
        if not password:
            print("密码不能为空。操作取消。")
        else:
            password_confirm = getpass("请再次输入密码确认: ")
            if password != password_confirm:
                print("两次输入的密码不匹配。操作取消。")
            else:
                add_user(args.email, password)
    elif args.command == 'delete':
        # 删除前进行确认
        confirm = input(f"您确定要删除用户 '{args.email}' 吗?此操作无法恢复。[y/N]: ")
        if confirm.lower() == 'y':
            delete_user(args.email)
        else:
            print("操作已取消。")
    elif args.command == 'list':
        list_users()

3. 设置为系统服务

为了让程序在后台稳定运行,我们需要为邮件接收服务 (smtp_server.py,它现在被集成在 app.py 之外) 和Web服务 (app.py) 创建 systemd 服务文件。

  • 创建 smtp_server.py 文件: 使用 nano /opt/mail_api/smtp_server.py 创建文件,并粘贴以下内容:

    import asyncio
    from aiosmtpd.controller import Controller
    from app import process_email_data
    
    class CustomSMTPHandler:
        async def handle_DATA(self, server, session, envelope):
            print(f'收到邮件 from <{envelope.mail_from}> to <{envelope.rcpt_tos}>')
            for recipient in envelope.rcpt_tos:
                try:
                    # 注意:这里我们假设 process_email_data 是一个同步函数
                    # 在一个新线程中运行它以避免阻塞事件循环
                    loop = asyncio.get_event_loop()
                    await loop.run_in_executor(None, process_email_data, recipient, envelope.content)
                except Exception as e:
                    print(f"处理邮件时发生错误: {e}")
            return '250 OK'
    
    if __name__ == '__main__':
        controller = Controller(CustomSMTPHandler(), hostname='0.0.0.0', port=25)
        print("SMTP 服务器正在启动,监听 0.0.0.0:25...")
        controller.start()
        print("SMTP 服务器已启动。按 Ctrl+C 关闭。")
        try:
            asyncio.get_event_loop().run_forever()
        except KeyboardInterrupt:
            print("正在关闭 SMTP 服务器...")
        finally:
            controller.stop()
    
    
  • 创建 mail-api.service 文件: 使用 nano /etc/systemd/system/mail-api.service 创建文件,并粘贴以下内容 (将 [username] 替换为 root):

    [Unit]
    Description=Gunicorn instance for Mail Web App
    After=network.target
    
    [Service]
    User=root
    Group=www-data
    WorkingDirectory=/opt/mail_api
    Environment="PATH=/opt/mail_api/venv/bin"
    ExecStart=/opt/mail_api/venv/bin/gunicorn --workers 3 --bind 0.0.0.0:2099 app:app
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
    
  • 创建 mail-smtp.service 文件: 使用 nano /etc/systemd/system/mail-smtp.service 创建文件,并粘贴以下内容 (将 [username] 替换为 root):

    [Unit]
    Description=Custom Python SMTP Server
    After=network.target
    
    [Service]
    User=root
    WorkingDirectory=/opt/mail_api
    ExecStart=/opt/mail_api/venv/bin/python3 smtp_server.py
    AmbientCapabilities=CAP_NET_BIND_SERVICE
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
    

4. 启动服务

# 重新加载配置
systemctl daemon-reload

# 启动并设置开机自启
systemctl restart mail-api.service mail-smtp.service
systemctl enable mail-api.service mail-smtp.service

# 检查服务状态
systemctl status mail-api.service
systemctl status mail-smtp.service

第四部分:使用与管理

  1. 创建管理员和用户

    • 首先,您必须创建一个 admin 账户才能使用管理员功能。

    • 在服务器上,进入 /opt/mail_api 目录,激活虚拟环境 source venv/bin/activate

    • 运行 python3 manage_users.py add admin 并设置密码。

    • 用同样的方式添加其他普通用户,例如 python3 manage_users.py add user1@mail.sijuly.nyc.mn

  2. 登录系统

    • 在浏览器中访问 http://您的IP:2099/login

    • 使用您创建的完整邮箱地址(或 admin)和密码登录。

  3. 使用功能

    • 登录后,您将进入个人收件箱。

    • 您可以点击右上角的“切换到管理员视图”并输入管理员密码来查看所有邮件。

    • 在管理员视图中,您还可以点击“新建用户”来在线添加新用户。

恭喜您!现在您拥有了一个功能强大且完全由您掌控的通用多用户网页邮箱系统。