Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

建议增加一个 slack 的推送方式 #83

Open
secpool2000 opened this issue May 16, 2024 · 3 comments
Open

建议增加一个 slack 的推送方式 #83

secpool2000 opened this issue May 16, 2024 · 3 comments

Comments

@secpool2000
Copy link

No description provided.

@zema1
Copy link
Owner

zema1 commented May 20, 2024

我没有这个的使用需求,如果你用的比较多,欢迎提交一个 PR 来支持

@madman163
Copy link

> 我没有这个的使用需求,如果你用的比较多,欢迎提交一个 PR 来支持

外企用的slack+邮箱,需求+1

@madman163
Copy link

madman163 commented Oct 14, 2024

推送到 Slack 的解决方案:用一个 Python 脚本接收 WatchVuln 的 webhook 并转发到 Slack。
docker-compose 配置如下:

version: '3'
services:
  watchvuln:
    image: zemal/watchvuln:latest
    container_name: watchvuln
    restart: always
    volumes:
      - /opt/watchvuln/config:/config
    environment:
      # Target of the outgoing webhook: the Python forwarder's /webhook
      # route on port 5000. Replace 10.0.0.68 with the host that runs the
      # forwarder script.
      - WEBHOOK_URL=http://10.0.0.68:5000/webhook
      - INTERVAL=30m
      # Path inside the container; /config is mounted from
      # /opt/watchvuln/config on the host (see volumes above).
      - WHITELIST_FILE=/config/whitelist.txt

Slack webhook 转发脚本(将下面的代码保存为 webhook_forwarder.py)
在后台运行:
nohup python3 webhook_forwarder.py &

from flask import Flask, request, jsonify
import requests
import json
import logging

# Flask application that exposes the webhook endpoint.
app = Flask(__name__)

# Configure logging so INFO-level messages (such as received payloads) are emitted.
logging.basicConfig(level=logging.INFO)

# Slack incoming-webhook URL that every formatted message is posted to.
SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/xxxx'  # replace with your Slack Webhook URL

def send_to_slack(message):
    """Post a message payload to the configured Slack webhook.

    Args:
        message: JSON-serializable dict in Slack's webhook payload format,
            e.g. ``{"text": ...}`` or ``{"blocks": [...]}``.

    Returns:
        The ``requests.Response`` object for the POST to Slack.

    Raises:
        requests.RequestException: If the connection fails or the request
            does not complete within the timeout.
    """
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would hang the webhook handler that calls this.
    return requests.post(
        SLACK_WEBHOOK_URL,
        data=json.dumps(message),
        headers={'Content-Type': 'application/json'},
        timeout=10,
    )

@app.route('/webhook', methods=['POST'])
def receive_webhook():
    """Receive a webhook POST, convert it to a Slack message and forward it.

    The JSON body is expected to be an object with a ``type`` field
    (``watchvuln-initial``, ``watchvuln-text`` or ``watchvuln-vulninfo``)
    and a ``content`` object that is handed to the matching formatter.

    Returns:
        A ``(json_response, status_code)`` tuple:
        * 400 when the body is missing, not valid JSON, not a JSON object,
          or carries an unknown ``type``;
        * 500 when Slack cannot be reached or answers with a non-200 status;
        * 200 on success.
    """
    # silent=True returns None instead of aborting the request when the
    # Content-Type is not JSON or the body is malformed, so those cases
    # reach the explicit 400 response below.
    data = request.get_json(silent=True)

    if not data:
        return jsonify({'error': 'No data received'}), 400

    # Valid JSON that is not an object (e.g. a list) has no .get().
    if not isinstance(data, dict):
        return jsonify({'error': 'Payload must be a JSON object'}), 400

    # Log the received payload to make later troubleshooting easier.
    logging.info("Received webhook data: %s", data)

    webhook_type = data.get("type", "")
    # "content": null would otherwise reach the formatters as None.
    content = data.get("content") or {}

    # Build the Slack message according to the webhook type.
    if webhook_type == "watchvuln-initial":
        message = format_initial_message(content)
    elif webhook_type == "watchvuln-text":
        message = format_text_message(content)
    elif webhook_type == "watchvuln-vulninfo":
        message = format_vulninfo_message(content)
    else:
        return jsonify({'error': 'Unknown webhook type'}), 400

    try:
        response = send_to_slack(message)
    except requests.RequestException:
        # The exception text can contain the webhook URL, so it is logged
        # locally but not echoed back to the caller.
        logging.exception("Could not deliver message to Slack")
        return jsonify({'error': 'Failed to send message to Slack'}), 500

    if response.status_code != 200:
        return jsonify({'error': 'Failed to send message to Slack', 'response': response.text}), 500

    return jsonify({'status': 'success'}), 200

def format_initial_message(content):
    """Format a ``watchvuln-initial`` payload as a Slack Block Kit message.

    Args:
        content: Dict read for the keys ``version``, ``vuln_count``,
            ``interval`` and ``provider`` (a list of dicts with
            ``name``, ``display_name`` and ``link``). Missing keys fall
            back to placeholders.

    Returns:
        ``{"blocks": [...]}`` with a summary section, a divider, and one
        section per provider.
    """
    version = content.get("version", "N/A")
    vuln_count = content.get("vuln_count", 0)
    interval = content.get("interval", "N/A")
    # "provider": null must behave like an empty list rather than raise
    # TypeError when iterated.
    providers = content.get("provider") or []

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Vulnerability Watch Initial Report*\nVersion: `{version}`\nTotal Vulnerabilities: `{vuln_count}`\nCheck Interval: `{interval}`"
            }
        },
        {"type": "divider"}
    ]

    for provider in providers:
        name = provider.get("name", "N/A")
        display_name = provider.get("display_name") or name
        link = provider.get("link")
        # An empty target would produce the broken markup "<|name>", so
        # fall back to the plain name when no link is available.
        link_text = f"<{link}|{name}>" if link else name
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{display_name}*\nLink: {link_text}"
            }
        })

    return {"blocks": blocks}

def format_text_message(content):
    """Format a ``watchvuln-text`` payload as a plain-text Slack message.

    Args:
        content: Dict whose ``message`` value becomes the message body;
            a placeholder is used when the key is absent.

    Returns:
        A dict with a single ``text`` key: a bold heading followed by the
        message body on the next line.
    """
    body = content.get("message", "No message provided")
    payload = {}
    payload["text"] = f"*WatchVuln Notification*\n{body}"
    return payload

# Slack rejects a section block whose text exceeds this many characters.
_SLACK_SECTION_TEXT_LIMIT = 3000


def _truncate_for_slack(text, limit=_SLACK_SECTION_TEXT_LIMIT):
    """Return text cut to at most ``limit`` characters, ending in an ellipsis if cut."""
    if len(text) <= limit:
        return text
    return text[:limit - 1] + "…"


def format_vulninfo_message(content):
    """Format a ``watchvuln-vulninfo`` payload as a Slack Block Kit message.

    Args:
        content: Dict read for the keys ``title``, ``description``,
            ``severity``, ``cve``, ``disclosure``, ``references`` (list),
            ``tags`` (list) and ``from`` (source link). Missing or null
            values fall back to placeholders.

    Returns:
        ``{"blocks": [...]}`` containing a summary section, a description
        section, a divider, a references section and a tags/source section.
    """
    title = content.get("title", "No title provided")
    description = content.get("description", "No description provided")
    severity = content.get("severity", "N/A")
    cve = content.get("cve", "N/A")
    disclosure = content.get("disclosure", "N/A")
    # `or` also covers keys that are present but null.
    references = content.get("references") or []
    tags = content.get("tags") or []
    from_source = content.get("from") or ""

    if references:
        references_text = "\n".join(f"<{ref}>" for ref in references)
    else:
        references_text = "No references provided"

    # str() keeps join() from raising if a tag is not a string.
    tags_text = ", ".join(str(tag) for tag in tags)
    # An empty source would otherwise render as the meaningless link "<>".
    source_text = f"<{from_source}>" if from_source else "N/A"

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Vulnerability Alert*\n*Title*: {title}\n*Severity*: {severity}\n*CVE*: {cve}\n*Disclosure Date*: {disclosure}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                # Descriptions are free-form and can be long; keep the block
                # within Slack's section text limit so the message is accepted.
                "text": _truncate_for_slack(f"*Description*: {description}")
            }
        },
        {"type": "divider"},
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*References*:\n{references_text}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Tags*: {tags_text}\n*Source*: {source_text}"
            }
        }
    ]

    return {"blocks": blocks}

# When executed as a script, start the Flask server listening on all
# network interfaces (0.0.0.0) at port 5000.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants