最终版:带网页界面的轻量级邮件接收系统搭建完整教程
本教程将指导您在一台全新的 Debian 12 服务器上,搭建一个轻量级邮件系统。该系统不仅可以通过API接口提取邮件,还提供了一个简单的、受密码保护的网页界面,让您可以方便地浏览和管理所有收到的邮件。
第一部分:前期准备
一台服务器:拥有一个固定的公网 IP 地址 (本教程以 Debian 12 为例)。
一个域名:您拥有该域名的 DNS 管理权限 (本教程以
mail.xxx.mn为例)。SSH 访问权限:能够以
root用户身份登录服务器。
第二部分:DNS 与服务器配置
1. 配置域名 DNS
登录到您域名的DNS管理后台,添加以下两条记录:
A 记录:
类型:
A主机/名称:
mail值/指向: 您的服务器IP地址
MX 记录:
类型:
MX主机/名称:
mail值/服务器:
mail.xxx.mn优先级:
10
2. 配置服务器环境
⚠️ 重要提示:本教程所有命令均以
root用户身份执行,全程无需使用sudo命令。
# 更新系统
apt-get update && apt-get upgrade -y
# 安装必要的软件包
apt-get install -y python3-pip python3-venv ufw sqlite3
# 配置防火墙 (假设您的Web服务运行在2099端口)
ufw allow ssh
ufw allow 25/tcp
ufw allow 2099/tcp
ufw enable # 出现提示时输入 y
第三部分:部署应用程序
1. 创建项目结构和依赖安装
# 创建并进入项目目录
mkdir -p /opt/mail_api
cd /opt/mail_api
# 创建并激活 Python 虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装 Python 依赖库
pip install flask gunicorn aiosmtpd Werkzeug
2. 创建 app.py (API接口、网页界面与邮件处理逻辑)
使用 nano /opt/mail_api/app.py 命令创建文件,并将下面的完整代码全部复制粘贴进去。这份代码已经包含了您需要的所有新功能。
import sqlite3
import re
from flask import Flask, request, Response, jsonify, session, redirect, url_for, render_template_string
from functools import wraps
from email import message_from_bytes
from email.header import decode_header
from markupsafe import escape
# --- 配置 ---
DB_FILE = 'emails.db'
YOUR_API_TOKEN = "2088"
YOUR_UI_PASSWORD = "change_this_password" # !!! 强烈建议:修改为一个非常复杂的密码
# --- Flask 应用设置 ---
app = Flask(__name__)
app.config['SECRET_KEY'] = 'a_very_secret_random_string_for_session' # !!! 强烈建议修改
# --- 数据库操作 ---
def get_db_conn():
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_db_conn()
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS received_emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT, sender TEXT, subject TEXT, body TEXT, body_type TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
# --- 核心邮件处理逻辑 ---
def process_email_data(to_address, raw_email_data):
msg = message_from_bytes(raw_email_data)
subject = ""
if msg['Subject']:
subject_raw, encoding = decode_header(msg['Subject'])[0]
if isinstance(subject_raw, bytes): subject = subject_raw.decode(encoding or 'utf-8', errors='ignore')
else: subject = str(subject_raw)
body, body_type = "", "text/plain"
if msg.is_multipart():
html_part, text_part = None, None
for part in msg.walk():
if "text/html" in part.get_content_type(): html_part = part
elif "text/plain" in part.get_content_type(): text_part = part
if html_part:
body = html_part.get_payload(decode=True).decode(html_part.get_content_charset() or 'utf-8', errors='ignore')
body_type = "text/html"
elif text_part:
body = text_part.get_payload(decode=True).decode(text_part.get_content_charset() or 'utf-8', errors='ignore')
body_type = "text/plain"
else:
body = msg.get_payload(decode=True).decode(msg.get_content_charset() or 'utf-8', errors='ignore')
body_type = msg.get_content_type()
conn = None
try:
conn = get_db_conn()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO received_emails (recipient, sender, subject, body, body_type) VALUES (?, ?, ?, ?, ?)",
(to_address, msg.get('From'), subject, body, body_type)
)
conn.commit()
print(f"邮件已存入数据库: To='{to_address}', Subject='{subject}'")
finally:
if conn: conn.close()
# --- 登录系统 ---
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'logged_in' not in session:
return redirect(url_for('login', next=request.url))
return f(*args, **kwargs)
return decorated_function
# --- Flask 应用 ---
@app.route('/Mail', methods=['GET'])
def get_mail_content():
token = request.args.get('token')
mail_address_to_find = request.args.get('mail')
if not token or token != YOUR_API_TOKEN: return jsonify({"error": "Invalid token"}), 401
if not mail_address_to_find: return jsonify({"error": "mail parameter is missing"}), 400
subject_keywords = ["verify your email address", "验证您的电子邮件地址", "e メールアドレスを検証してください"]
try:
conn = get_db_conn()
cursor = conn.cursor()
cursor.execute("SELECT id, subject, body, body_type FROM received_emails WHERE recipient = ? ORDER BY id DESC LIMIT 50", (mail_address_to_find,))
messages = cursor.fetchall()
for msg in messages:
subject = msg['subject'] or ""
if any(keyword in subject.lower() for keyword in subject_keywords):
return Response(msg['body'], mimetype=f"{msg['body_type']}; charset=utf-8")
return jsonify({"error": "Not found or no code available"}), 404
finally:
if 'conn' in locals() and conn: conn.close()
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
if request.form.get('password') == YOUR_UI_PASSWORD:
session['logged_in'] = True
next_url = request.args.get('next') or url_for('view_emails')
return redirect(next_url)
else:
error = '密码错误,请重试'
login_form_html = f"""
<!DOCTYPE html><html><head><title>登录</title>
<style>body{{display:flex; justify-content:center; align-items:center; height:100vh; font-family:sans-serif;}}
.login-box{{padding:2em; border:1px solid #ccc; border-radius:5px; background-color:#f9f9f9;}}
input{{display:block; margin-top:0.5em; margin-bottom:1em; padding:0.5em; width: 200px;}}
.error{{color:red;}}</style></head>
<body><div class="login-box"><h2>请登录查看邮箱</h2>
{'<p class="error">' + escape(error) + '</p>' if error else ''}
<form method="post"><label>密码:</label><input type="password" name="password">
<input type="hidden" name="next" value="{escape(request.args.get('next', ''))}">
<input type="submit" value="登录" style="width:100%;"></form>
</div></body></html>
"""
return Response(login_form_html, mimetype="text/html; charset=utf-8")
# 【已升级】邮件列表查看页面,增加管理功能
@app.route('/view_emails')
@login_required
def view_emails():
conn = get_db_conn()
cursor = conn.cursor()
cursor.execute("SELECT id, timestamp, sender, recipient, subject FROM received_emails ORDER BY id DESC LIMIT 100")
emails = cursor.fetchall()
conn.close()
html = """
<!DOCTYPE html><html><head><title>收件箱</title>
<style>
body{font-family: sans-serif; margin: 2em;}
table{border-collapse: collapse; width: 100%;}
th, td{border: 1px solid #ddd; padding: 8px; text-align: left;}
tr:nth-child(even){background-color: #f2f2f2;}
th{background-color: #4CAF50; color: white;}
.actions { margin-bottom: 1em; }
.actions button { margin-right: 10px; padding: 8px 12px; cursor: pointer; }
</style>
</head><body>
<h2>收件箱 (最新 100 封)</h2>
<div class="actions">
<button onclick="window.location.reload();">刷新列表</button>
<form method="POST" action="/delete_all_emails" style="display: inline;" onsubmit="return confirm('您确定要删除所有邮件吗?这将无法恢复!');">
<button type="submit">删除所有邮件</button>
</form>
</div>
<form method="POST" action="/delete_selected_emails">
<table>
<thead><tr>
<th><input type="checkbox" onclick="toggleAllCheckboxes(this);"></th>
<th>时间</th><th>发件人</th><th>收件人</th><th>主题</th><th>操作</th>
</tr></thead>
<tbody>
"""
for email in emails:
html += f"""<tr>
<td><input type="checkbox" name="selected_ids" value="{email['id']}"></td>
<td>{escape(email['timestamp'])}</td>
<td>{escape(email['sender'])}</td>
<td>{escape(email['recipient'])}</td>
<td>{escape(email['subject'])}</td>
<td><a href="/view_email/{email["id"]}" target="_blank">查看</a></td>
</tr>"""
html += """
</tbody>
</table>
<div class="actions" style="margin-top: 1em;">
<button type="submit">删除选中邮件</button>
</div>
</form>
<script>
function toggleAllCheckboxes(source) {
checkboxes = document.getElementsByName('selected_ids');
for(var i=0, n=checkboxes.length;i<n;i++) {
checkboxes[i].checked = source.checked;
}
}
</script>
</body></html>
"""
return Response(html, mimetype="text/html; charset=utf-8")
@app.route('/view_email/<int:email_id>')
@login_required
def view_email_detail(email_id):
conn = get_db_conn()
cursor = conn.cursor()
email = cursor.execute("SELECT body, body_type FROM received_emails WHERE id = ?", (email_id,)).fetchone()
conn.close()
if email:
return Response(email['body'], mimetype=f"{email['body_type']}; charset=utf-8")
return "邮件未找到", 404
# 【新增】处理删除选中邮件的请求
@app.route('/delete_selected_emails', methods=['POST'])
@login_required
def delete_selected_emails():
selected_ids = request.form.getlist('selected_ids')
if selected_ids:
try:
conn = get_db_conn()
cursor = conn.cursor()
placeholders = ','.join('?' for _ in selected_ids)
query = f"DELETE FROM received_emails WHERE id IN ({placeholders})"
cursor.execute(query, selected_ids)
conn.commit()
finally:
if 'conn' in locals() and conn:
conn.close()
return redirect(url_for('view_emails'))
# 【新增】处理删除所有邮件的请求
@app.route('/delete_all_emails', methods=['POST'])
@login_required
def delete_all_emails():
try:
conn = get_db_conn()
cursor = conn.cursor()
cursor.execute("DELETE FROM received_emails")
conn.commit()
finally:
if 'conn' in locals() and conn:
conn.close()
return redirect(url_for('view_emails'))
init_db()
3. 创建 smtp_server.py (邮件接收服务)
使用 nano /opt/mail_api/smtp_server.py 命令创建文件,并将以下全部代码粘贴进去。
import asyncio
import time
from aiosmtpd.controller import Controller
from app import process_email_data
class CustomSMTPHandler:
async def handle_DATA(self, server, session, envelope):
print(f'收到邮件 from <{envelope.mail_from}> to <{envelope.rcpt_tos}>')
for recipient in envelope.rcpt_tos:
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_email_data, recipient, envelope.content)
except Exception as e:
print(f"处理邮件时发生错误: {e}")
return '250 OK'
if __name__ == '__main__':
controller = Controller(CustomSMTPHandler(), hostname='0.0.0.0', port=25)
controller.start()
print("SMTP 服务器已启动,正在后台运行...")
try:
while True:
time.sleep(3600)
except KeyboardInterrupt:
print("正在停止 SMTP 服务器...")
finally:
controller.stop()
print("SMTP 服务器已停止。")
4. 设置为系统服务 (持久化运行)
创建
mail-api.service文件: 使用nano /etc/systemd/system/mail-api.service创建文件,并粘贴以下内容:[Unit] Description=Gunicorn instance for Mail API After=network.target [Service] User=root WorkingDirectory=/opt/mail_api Environment="PATH=/opt/mail_api/venv/bin" ExecStart=/opt/mail_api/venv/bin/gunicorn --workers 3 --bind 0.0.0.0:2099 app:app Restart=always [Install] WantedBy=multi-user.target创建
mail-smtp.service文件: 使用nano /etc/systemd/system/mail-smtp.service创建文件,并粘贴以下内容:[Unit] Description=Custom Python SMTP Server After=network.target [Service] User=root WorkingDirectory=/opt/mail_api ExecStart=/opt/mail_api/venv/bin/python3 smtp_server.py AmbientCapabilities=CAP_NET_BIND_SERVICE Restart=always [Install] WantedBy=multi-user.target
5. 启动服务
# 重新加载配置
systemctl daemon-reload
# 启动并设置开机自启
systemctl restart mail-api.service mail-smtp.service
systemctl enable mail-api.service mail-smtp.service
# 检查服务状态
systemctl status mail-api.service
systemctl status mail-smtp.service
第五部分:使用与测试
修改密码:在部署后,请务必用
nano /opt/mail_api/app.py命令,将YOUR_UI_PASSWORD和app.config['SECRET_KEY']修改为您自己的复杂密码和密钥,然后重启服务systemctl restart mail-api.service mail-smtp.service。访问网页端:在浏览器中打开
http://您的服务器IP:2099/view_emails。使用新功能:
页面顶部现在有了“刷新列表”和“删除所有邮件”按钮。
每封邮件前都有一个复选框,您可以勾选后点击页面底部的“删除选中邮件”按钮。
表格头部的复选框可以帮您实现全选/全不选。