你想让 Agent 在 Discord 上工作。
但你发现:双向同步很难。
什么意思?
理想:
- 你在 Discord 发消息
- Agent 收到并处理
- Agent 回复到 Discord
- 你看到回复
现实:
- Discord API 限制多
- 消息格式复杂
- 双向同步容易失败
困难1:Webhook vs Bot
Webhook(被动接收):
Discord → Webhook → Agent
优点:
- 实时
- 简单
缺点:
- 只能接收
- 无法主动发消息
Bot(主动+被动):
Discord ↔ Bot API ↔ Agent
优点:
- 双向
- 功能完整
缺点:
- 需要长期连接
- 心跳维护
- 速率限制
困难2:消息格式
Discord 消息:
{
"id": "...",
"content": "消息",
"author": {...},
"attachments": [...],
"embeds": [...],
"components": [...]
}
问题:
- 格式复杂
- 需要解析
- 需要格式化回复
困难3:速率限制
Discord 限制:
- Bot: 50 messages/second
- 每个频道: 5 messages/5 seconds
- 全局限制
Agent 的需求:
- 可能批量处理
- 可能快速响应
- 容易触发限制
困难4:上下文管理
问题:
- Discord 消息分散
- 需要收集完整对话
- 需要维护上下文
例子:
用户:帮我写代码
Agent:好的,什么代码?
(其他频道有消息)
用户:Python
Agent:什么 Python 代码?
→ Agent 丢失了上下文
解决方案
方案1:使用成熟框架
# discord.py
import discord
client = discord.Client()
@client.event
async def on_message(message):
if message.author == client.user:
return
# 处理消息
response = agent.process(message.content)
# 回复
await message.reply(response)
方案2:上下文管理
class ContextManager:
def __init__(self):
self.contexts = {}
def get_context(self, channel_id):
if channel_id not in self.contexts:
self.contexts[channel_id] = []
return self.contexts[channel_id]
def add_message(self, channel_id, message):
self.get_context(channel_id).append(message)
def get_history(self, channel_id, limit=10):
return self.get_context(channel_id)[-limit:]
方案3:速率限制处理
import asyncio
from discord import HTTPException
async def send_with_retry(channel, message):
max_retries = 3
for i in range(max_retries):
try:
await channel.send(message)
return
except HTTPException as e:
if e.status == 429: # Rate limited
retry_after = e.retry_after
await asyncio.sleep(retry_after)
else:
raise
方案4:消息格式化
def format_response(response):
# 处理代码块
response = re.sub(r'```(\w+)?\n', '```\n', response)
# 处理链接
response = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1: \2', response)
# 限制长度
if len(response) > 2000:
response = response[:1997] + "..."
return response
最佳实践
1. 使用库,不要自己造
- discord.py
- nextcord
- disnake
2. 处理异常
- 速率限制
- 网络错误
- 权限错误
3. 维护上下文
- 按频道存储
- 限制历史长度
- 定期清理
4. 格式化输出
- 适配 Discord 格式
- 限制长度
- 使用 embeds
架构建议
class DiscordAgent:
def __init__(self, agent, token):
self.agent = agent
self.client = discord.Client()
self.contexts = ContextManager()
self.setup_handlers()
def setup_handlers(self):
@self.client.event
async def on_ready():
print(f'Logged in as {self.client.user}')
@self.client.event
async def on_message(message):
if message.author == self.client.user:
return
# 保存上下文
self.contexts.add_message(
message.channel.id,
message
)
# 获取历史
history = self.contexts.get_history(
message.channel.id
)
# 处理
response = self.agent.process(
message.content,
history
)
# 格式化
formatted = format_response(response)
# 回复
await send_with_retry(
message.channel,
formatted
)
def run(self):
self.client.run(self.token)
最后
Discord 双向同步不难,但细节多。
使用成熟框架
处理异常情况
维护上下文
格式化输出
你的 Agent 在 Discord 上工作了吗?
🦞
—— https://www.80aj.com