轻量级邮件收发与管理系统搭建教程(只收邮件)

Administrator 24 2025-07-30

最终版:带网页界面的轻量级邮件接收系统搭建完整教程

本教程将指导您在一台全新的 Debian 12 服务器上,搭建一个轻量级邮件系统。该系统不仅可以通过API接口提取邮件,还提供了一个简单的、受密码保护的网页界面,让您可以方便地浏览和管理所有收到的邮件。

第一部分:前期准备

  1. 一台服务器:拥有一个固定的公网 IP 地址 (本教程以 Debian 12 为例)。

  2. 一个域名:您拥有该域名的 DNS 管理权限 (本教程以 mail.xxx.mn 为例)。

  3. SSH 访问权限:能够以 root 用户身份登录服务器。

第二部分:DNS 与服务器配置

1. 配置域名 DNS

登录到您域名的DNS管理后台,添加以下两条记录:

  • A 记录:

    • 类型: A

    • 主机/名称: mail

    • 值/指向: 您的服务器IP地址

  • MX 记录:

    • 类型: MX

    • 主机/名称: mail

    • 值/服务器: mail.xxx.mn

    • 优先级: 10

2. 配置服务器环境

⚠️ 重要提示:本教程所有命令均以 root 用户身份执行,全程无需使用 sudo 命令。

# 更新系统
apt-get update && apt-get upgrade -y

# 安装必要的软件包
apt-get install -y python3-pip python3-venv ufw sqlite3

# 配置防火墙 (假设您的Web服务运行在2099端口)
ufw allow ssh
ufw allow 25/tcp
ufw allow 2099/tcp
ufw enable # 出现提示时输入 y

第三部分:部署应用程序

1. 创建项目结构和依赖安装

# 创建并进入项目目录
mkdir -p /opt/mail_api
cd /opt/mail_api

# 创建并激活 Python 虚拟环境
python3 -m venv venv
source venv/bin/activate

# 安装 Python 依赖库
pip install flask gunicorn aiosmtpd Werkzeug

2. 创建 app.py (API接口、网页界面与邮件处理逻辑)

使用 nano /opt/mail_api/app.py 命令创建文件,并将下面的完整代码全部复制粘贴进去。这份代码已经包含了您需要的所有新功能。

import sqlite3
import re
from flask import Flask, request, Response, jsonify, session, redirect, url_for, render_template_string
from functools import wraps
from email import message_from_bytes
from email.header import decode_header
from markupsafe import escape

# --- 配置 ---
DB_FILE = 'emails.db'
YOUR_API_TOKEN = "2088" 
YOUR_UI_PASSWORD = "change_this_password" # !!! 强烈建议:修改为一个非常复杂的密码

# --- Flask 应用设置 ---
app = Flask(__name__)
app.config['SECRET_KEY'] = 'a_very_secret_random_string_for_session' # !!! 强烈建议修改

# --- 数据库操作 ---
def get_db_conn():
    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn

def init_db():
    conn = get_db_conn()
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS received_emails (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            recipient TEXT, sender TEXT, subject TEXT, body TEXT, body_type TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()

# --- 核心邮件处理逻辑 ---
def process_email_data(to_address, raw_email_data):
    msg = message_from_bytes(raw_email_data)
    subject = ""
    if msg['Subject']:
        subject_raw, encoding = decode_header(msg['Subject'])[0]
        if isinstance(subject_raw, bytes): subject = subject_raw.decode(encoding or 'utf-8', errors='ignore')
        else: subject = str(subject_raw)
    body, body_type = "", "text/plain"
    if msg.is_multipart():
        html_part, text_part = None, None
        for part in msg.walk():
            if "text/html" in part.get_content_type(): html_part = part
            elif "text/plain" in part.get_content_type(): text_part = part
        if html_part:
            body = html_part.get_payload(decode=True).decode(html_part.get_content_charset() or 'utf-8', errors='ignore')
            body_type = "text/html"
        elif text_part:
            body = text_part.get_payload(decode=True).decode(text_part.get_content_charset() or 'utf-8', errors='ignore')
            body_type = "text/plain"
    else:
        body = msg.get_payload(decode=True).decode(msg.get_content_charset() or 'utf-8', errors='ignore')
        body_type = msg.get_content_type()
    conn = None
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO received_emails (recipient, sender, subject, body, body_type) VALUES (?, ?, ?, ?, ?)",
            (to_address, msg.get('From'), subject, body, body_type)
        )
        conn.commit()
        print(f"邮件已存入数据库: To='{to_address}', Subject='{subject}'")
    finally:
        if conn: conn.close()

# --- 登录系统 ---
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login', next=request.url))
        return f(*args, **kwargs)
    return decorated_function

# --- Flask 应用 ---
@app.route('/Mail', methods=['GET'])
def get_mail_content():
    token = request.args.get('token')
    mail_address_to_find = request.args.get('mail')
    if not token or token != YOUR_API_TOKEN: return jsonify({"error": "Invalid token"}), 401
    if not mail_address_to_find: return jsonify({"error": "mail parameter is missing"}), 400
    subject_keywords = ["verify your email address", "验证您的电子邮件地址", "e メールアドレスを検証してください"]
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        cursor.execute("SELECT id, subject, body, body_type FROM received_emails WHERE recipient = ? ORDER BY id DESC LIMIT 50", (mail_address_to_find,))
        messages = cursor.fetchall()
        for msg in messages:
            subject = msg['subject'] or ""
            if any(keyword in subject.lower() for keyword in subject_keywords):
                return Response(msg['body'], mimetype=f"{msg['body_type']}; charset=utf-8")
        return jsonify({"error": "Not found or no code available"}), 404
    finally:
        if 'conn' in locals() and conn: conn.close()

@app.route('/login', methods=['GET', 'POST'])
def login():
    error = None
    if request.method == 'POST':
        if request.form.get('password') == YOUR_UI_PASSWORD:
            session['logged_in'] = True
            next_url = request.args.get('next') or url_for('view_emails')
            return redirect(next_url)
        else:
            error = '密码错误,请重试'
    
    login_form_html = f"""
        <!DOCTYPE html><html><head><title>登录</title>
        <style>body{{display:flex; justify-content:center; align-items:center; height:100vh; font-family:sans-serif;}} 
        .login-box{{padding:2em; border:1px solid #ccc; border-radius:5px; background-color:#f9f9f9;}}
        input{{display:block; margin-top:0.5em; margin-bottom:1em; padding:0.5em; width: 200px;}}
        .error{{color:red;}}</style></head>
        <body><div class="login-box"><h2>请登录查看邮箱</h2>
        {'<p class="error">' + escape(error) + '</p>' if error else ''}
        <form method="post"><label>密码:</label><input type="password" name="password">
        <input type="hidden" name="next" value="{escape(request.args.get('next', ''))}">
        <input type="submit" value="登录" style="width:100%;"></form>
        </div></body></html>
    """
    return Response(login_form_html, mimetype="text/html; charset=utf-8")

# 【已升级】邮件列表查看页面,增加管理功能
@app.route('/view_emails')
@login_required
def view_emails():
    conn = get_db_conn()
    cursor = conn.cursor()
    cursor.execute("SELECT id, timestamp, sender, recipient, subject FROM received_emails ORDER BY id DESC LIMIT 100")
    emails = cursor.fetchall()
    conn.close()
    
    html = """
    <!DOCTYPE html><html><head><title>收件箱</title>
    <style>
        body{font-family: sans-serif; margin: 2em;} 
        table{border-collapse: collapse; width: 100%;}
        th, td{border: 1px solid #ddd; padding: 8px; text-align: left;}
        tr:nth-child(even){background-color: #f2f2f2;} 
        th{background-color: #4CAF50; color: white;}
        .actions { margin-bottom: 1em; }
        .actions button { margin-right: 10px; padding: 8px 12px; cursor: pointer; }
    </style>
    </head><body>
    <h2>收件箱 (最新 100 封)</h2>
    <div class="actions">
        <button onclick="window.location.reload();">刷新列表</button>
        <form method="POST" action="/delete_all_emails" style="display: inline;" onsubmit="return confirm('您确定要删除所有邮件吗?这将无法恢复!');">
            <button type="submit">删除所有邮件</button>
        </form>
    </div>
    <form method="POST" action="/delete_selected_emails">
    <table>
        <thead><tr>
            <th><input type="checkbox" onclick="toggleAllCheckboxes(this);"></th>
            <th>时间</th><th>发件人</th><th>收件人</th><th>主题</th><th>操作</th>
        </tr></thead>
        <tbody>
    """
    for email in emails:
        html += f"""<tr>
            <td><input type="checkbox" name="selected_ids" value="{email['id']}"></td>
            <td>{escape(email['timestamp'])}</td>
            <td>{escape(email['sender'])}</td>
            <td>{escape(email['recipient'])}</td>
            <td>{escape(email['subject'])}</td>
            <td><a href="/view_email/{email["id"]}" target="_blank">查看</a></td>
        </tr>"""
    html += """
        </tbody>
    </table>
    <div class="actions" style="margin-top: 1em;">
        <button type="submit">删除选中邮件</button>
    </div>
    </form>
    <script>
        function toggleAllCheckboxes(source) {
            checkboxes = document.getElementsByName('selected_ids');
            for(var i=0, n=checkboxes.length;i<n;i++) {
                checkboxes[i].checked = source.checked;
            }
        }
    </script>
    </body></html>
    """
    return Response(html, mimetype="text/html; charset=utf-8")

@app.route('/view_email/<int:email_id>')
@login_required
def view_email_detail(email_id):
    conn = get_db_conn()
    cursor = conn.cursor()
    email = cursor.execute("SELECT body, body_type FROM received_emails WHERE id = ?", (email_id,)).fetchone()
    conn.close()
    if email:
        return Response(email['body'], mimetype=f"{email['body_type']}; charset=utf-8")
    return "邮件未找到", 404

# 【新增】处理删除选中邮件的请求
@app.route('/delete_selected_emails', methods=['POST'])
@login_required
def delete_selected_emails():
    selected_ids = request.form.getlist('selected_ids')
    if selected_ids:
        try:
            conn = get_db_conn()
            cursor = conn.cursor()
            placeholders = ','.join('?' for _ in selected_ids)
            query = f"DELETE FROM received_emails WHERE id IN ({placeholders})"
            cursor.execute(query, selected_ids)
            conn.commit()
        finally:
            if 'conn' in locals() and conn:
                conn.close()
    return redirect(url_for('view_emails'))

# 【新增】处理删除所有邮件的请求
@app.route('/delete_all_emails', methods=['POST'])
@login_required
def delete_all_emails():
    try:
        conn = get_db_conn()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM received_emails")
        conn.commit()
    finally:
        if 'conn' in locals() and conn:
            conn.close()
    return redirect(url_for('view_emails'))

init_db()

3. 创建 smtp_server.py (邮件接收服务)

使用 nano /opt/mail_api/smtp_server.py 命令创建文件,并将以下全部代码粘贴进去。

import asyncio
import time
from aiosmtpd.controller import Controller
from app import process_email_data

class CustomSMTPHandler:
    async def handle_DATA(self, server, session, envelope):
        print(f'收到邮件 from <{envelope.mail_from}> to <{envelope.rcpt_tos}>')
        for recipient in envelope.rcpt_tos:
            try:
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, process_email_data, recipient, envelope.content)
            except Exception as e:
                print(f"处理邮件时发生错误: {e}")
        return '250 OK'

if __name__ == '__main__':
    controller = Controller(CustomSMTPHandler(), hostname='0.0.0.0', port=25)
    controller.start()
    print("SMTP 服务器已启动,正在后台运行...")
    try:
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        print("正在停止 SMTP 服务器...")
    finally:
        controller.stop()
        print("SMTP 服务器已停止。")

4. 设置为系统服务 (持久化运行)

  • 创建 mail-api.service 文件: 使用 nano /etc/systemd/system/mail-api.service 创建文件,并粘贴以下内容:

    [Unit]
    Description=Gunicorn instance for Mail API
    After=network.target
    
    [Service]
    User=root
    WorkingDirectory=/opt/mail_api
    Environment="PATH=/opt/mail_api/venv/bin"
    ExecStart=/opt/mail_api/venv/bin/gunicorn --workers 3 --bind 0.0.0.0:2099 app:app
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
    
  • 创建 mail-smtp.service 文件: 使用 nano /etc/systemd/system/mail-smtp.service 创建文件,并粘贴以下内容:

    [Unit]
    Description=Custom Python SMTP Server
    After=network.target
    
    [Service]
    User=root
    WorkingDirectory=/opt/mail_api
    ExecStart=/opt/mail_api/venv/bin/python3 smtp_server.py
    AmbientCapabilities=CAP_NET_BIND_SERVICE
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
    

5. 启动服务

# 重新加载配置
systemctl daemon-reload

# 启动并设置开机自启
systemctl restart mail-api.service mail-smtp.service
systemctl enable mail-api.service mail-smtp.service

# 检查服务状态
systemctl status mail-api.service
systemctl status mail-smtp.service

第五部分:使用与测试

  1. 修改密码:在部署后,请务必用 nano /opt/mail_api/app.py 命令,将 YOUR_UI_PASSWORDapp.config['SECRET_KEY'] 修改为您自己的复杂密码和密钥,然后重启服务 systemctl restart mail-api.service mail-smtp.service

  2. 访问网页端:在浏览器中打开 http://您的服务器IP:2099/view_emails

  3. 使用新功能

    • 页面顶部现在有了“刷新列表”和“删除所有邮件”按钮。

    • 每封邮件前都有一个复选框,您可以勾选后点击页面底部的“删除选中邮件”按钮。

    • 表格头部的复选框可以帮您实现全选/全不选。