-
Notifications
You must be signed in to change notification settings - Fork 164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
建议增加一个 slack 的推送方式 #83
Labels
Comments
我没有这个的使用需求,如果你用的比较多,欢迎提交一个 PR 来支持 |
外企用的slack+邮箱,需求+1 |
推送slack解决方案 python转发
version: '3'
services:
  watchvuln:
    image: zemal/watchvuln:latest
    container_name: watchvuln
    restart: always
    volumes:
      - /opt/watchvuln/config:/config
    environment:
      - WEBHOOK_URL=http://10.0.0.68:5000/webhook
      - INTERVAL=30m
      - WHITELIST_FILE=/config/whitelist.txt
slack webhook
from flask import Flask, request, jsonify
import requests
import json
import logging
# Flask application object; the /webhook route below is registered on it.
app = Flask(__name__)
# Configure logging at INFO level so received webhook payloads are recorded.
logging.basicConfig(level=logging.INFO)
# Slack incoming-webhook URL that every formatted message is posted to.
# The value below is a placeholder and must be replaced before use.
SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/xxxx' # your Slack Webhook URL
def send_to_slack(message, timeout=10):
    """Post a message payload to the configured Slack incoming webhook.

    Args:
        message: JSON-serializable dict in Slack webhook format (for example
            with a ``text`` or ``blocks`` key).
        timeout: Seconds to wait for Slack before giving up. ``requests``
            applies no timeout by default, so without this a stalled
            connection would block the request handler indefinitely.

    Returns:
        The ``requests.Response`` object returned for the POST.

    Raises:
        requests.RequestException: If the connection fails or times out.
    """
    response = requests.post(
        SLACK_WEBHOOK_URL,
        data=json.dumps(message),
        headers={'Content-Type': 'application/json'},
        timeout=timeout,
    )
    return response
@app.route('/webhook', methods=['POST'])
def receive_webhook():
    """Receive a WatchVuln webhook call and relay it to Slack.

    The JSON body is expected to be an object with a ``type`` field
    (``watchvuln-initial``, ``watchvuln-text`` or ``watchvuln-vulninfo``)
    and a ``content`` object that is handed to the matching formatter.

    Returns:
        A ``(json_response, status)`` tuple:
        200 on success, 400 for a missing/malformed payload or an unknown
        type, 502 when Slack could not be reached, and 500 when Slack
        answered with a non-200 status.
    """
    # Read the request body.
    data = request.json
    # Reject empty payloads.
    if not data:
        return jsonify({'error': 'No data received'}), 400
    # A JSON array/string/number has no .get(); reject it instead of crashing.
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid payload: expected a JSON object'}), 400

    # Log the received payload to simplify troubleshooting.
    logging.info("Received webhook data: %s", data)

    webhook_type = data.get("type", "")
    # An explicit JSON null for "content" is treated like a missing field.
    content = data.get("content") or {}
    if not isinstance(content, dict):
        return jsonify({'error': 'Invalid payload: content must be a JSON object'}), 400

    # Build the Slack message according to the webhook type.
    if webhook_type == "watchvuln-initial":
        message = format_initial_message(content)
    elif webhook_type == "watchvuln-text":
        message = format_text_message(content)
    elif webhook_type == "watchvuln-vulninfo":
        message = format_vulninfo_message(content)
    else:
        return jsonify({'error': 'Unknown webhook type'}), 400

    # Forward the message to Slack; report transport failures as JSON
    # instead of letting them surface as an unhandled exception.
    try:
        response = send_to_slack(message)
    except requests.RequestException as exc:
        logging.exception("Could not reach Slack")
        return jsonify({'error': 'Failed to send message to Slack', 'response': str(exc)}), 502

    if response.status_code != 200:
        return jsonify({'error': 'Failed to send message to Slack', 'response': response.text}), 500
    return jsonify({'status': 'success'}), 200
def format_initial_message(content):
    """Format a ``watchvuln-initial`` payload as a Slack Block Kit message.

    Args:
        content: Dict that may carry ``version``, ``vuln_count``,
            ``interval`` and ``provider`` (a list of dicts with
            ``name``, ``display_name`` and ``link``).

    Returns:
        A dict ``{"blocks": [...]}`` with a summary section, a divider and
        one section per provider.
    """
    version = content.get("version", "N/A")
    vuln_count = content.get("vuln_count", 0)
    interval = content.get("interval", "N/A")
    # "provider" may be present but null; treat that like an empty list.
    providers = content.get("provider") or []

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Vulnerability Watch Initial Report*\nVersion: `{version}`\nTotal Vulnerabilities: `{vuln_count}`\nCheck Interval: `{interval}`"
            }
        },
        {"type": "divider"}
    ]

    for provider in providers:
        # Tolerate providers with missing fields rather than raising KeyError.
        name = provider.get("name", "N/A")
        display_name = provider.get("display_name", name)
        link = provider.get("link")
        link_text = f"<{link}|{name}>" if link else "N/A"
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{display_name}*\nLink: {link_text}"
            }
        })

    return {"blocks": blocks}
def format_text_message(content):
    """Format a ``watchvuln-text`` payload as a plain Slack text message.

    Args:
        content: Dict that may carry a ``message`` string.

    Returns:
        A dict ``{"text": ...}`` with a fixed heading followed by the
        message, or a placeholder when the message is missing, null or empty.
    """
    # `or` also covers an explicit null/empty message, which would otherwise
    # be rendered as the literal "None" or as a blank line.
    message_text = content.get("message") or "No message provided"
    return {
        "text": f"*WatchVuln Notification*\n{message_text}"
    }
def format_vulninfo_message(content):
    """Format a ``watchvuln-vulninfo`` payload as a Slack Block Kit message.

    Args:
        content: Dict that may carry ``title``, ``description``,
            ``severity``, ``cve``, ``disclosure``, ``references`` (list),
            ``tags`` (list) and ``from`` (source URL). Other keys are not
            rendered.

    Returns:
        A dict ``{"blocks": [...]}`` with header, description, divider,
        references and tags/source sections.
    """
    title = content.get("title", "No title provided")
    description = content.get("description", "No description provided")
    severity = content.get("severity", "N/A")
    cve = content.get("cve", "N/A")
    disclosure = content.get("disclosure", "N/A")
    # These fields may be present but null; normalise them to empty values.
    references = content.get("references") or []
    tags = content.get("tags") or []
    from_source = content.get("from") or ""

    if references:
        references_text = "\n".join(f"<{ref}>" for ref in references)
    else:
        references_text = "No references provided"

    # str() keeps join() from raising if a tag is not a string.
    tags_text = ", ".join(str(tag) for tag in tags)
    # Avoid emitting an empty "<>" link when no source is given.
    source_text = f"<{from_source}>" if from_source else "N/A"

    # Assemble the message blocks.
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Vulnerability Alert*\n*Title*: {title}\n*Severity*: {severity}\n*CVE*: {cve}\n*Disclosure Date*: {disclosure}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Description*: {description}"
            }
        },
        {"type": "divider"},
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*References*:\n{references_text}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Tags*: {tags_text}\n*Source*: {source_text}"
            }
        }
    ]
    return {"blocks": blocks}
if __name__ == '__main__':
    # Listen on all interfaces on port 5000 so the relay is reachable from
    # other hosts/containers, not only from localhost.
    app.run(host='0.0.0.0', port=5000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
No description provided.
The text was updated successfully, but these errors were encountered: