From 30abb7559f4339400fbef29b3575939ae6463270 Mon Sep 17 00:00:00 2001
From: lmangani
Date: Sat, 19 Oct 2024 10:46:47 +0000
Subject: [PATCH] refine, resynch

---
Review notes (this section is not part of the commit message):
 - import os explicitly; the added code calls os.getenv, os.makedirs,
   os.path.join and os.remove
 - verify(): create the DBPATH directory that the database file is placed in,
   instead of the unrelated global `path`
 - handle_insert_query(): find the table name case-insensitively and return an
   error instead of "Ok" when nothing can be inserted
 - save_to_tempfile(): close the file through a with-block
 - the index line below still carries the blob ids of the original commit

 main.py | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 63 insertions(+), 5 deletions(-)

diff --git a/main.py b/main.py
index cd8c2ed..6360004 100644
--- a/main.py
+++ b/main.py
@@ -2,12 +2,19 @@ import duckdb
 import json
 import time
 
+import os
+import tempfile
+import hashlib
+import base64
+
 from flask import Flask, request, jsonify
 from flask_httpauth import HTTPBasicAuth
 
 app = Flask(__name__, static_folder="public", static_url_path="")
 auth = HTTPBasicAuth()
 
+dbpath = os.getenv('DBPATH', '/tmp/')
+
 # Global connection
 conn = None
 
@@ -18,9 +25,13 @@ def verify(username, password):
         print('stateless session')
         conn = duckdb.connect(':memory:')
     else:
-        path = f"{globals()['path']}/{hash(username + password)}.db"
-        print(f'stateful session {path}')
-        conn = duckdb.connect(path)
+        # One database file per credential pair, stored under DBPATH.
+        os.makedirs(dbpath, exist_ok=True)
+        user_pass_hash = hashlib.sha256((username + password).encode()).hexdigest()
+        db_file = os.path.join(dbpath, f"{user_pass_hash}.db")
+        print(f'stateful session {db_file}')
+        conn = duckdb.connect(db_file)
+
     return True
 
 def convert_to_ndjson(result):
@@ -54,8 +65,51 @@ def convert_to_clickhouse_jsoncompact(result, query_time):
     return json.dumps(json_result)
 
 
-def duckdb_query_with_errmsg(query, format):
+def handle_insert_query(query, format, data=None):
+    """Load a POSTed JSONEachRow body into the table named by an INSERT query.
+
+    The body is spooled to a temporary file and ingested with a DuckDB COPY
+    statement. Returns a ``(result, errmsg)`` pair of bytes.
+    """
+    # The dispatcher matches "insert into" case-insensitively, so locate the
+    # table name by position rather than splitting on an upper-case "INTO".
+    tokens = query.split()
+    if len(tokens) < 3 or [t.lower() for t in tokens[:2]] != ['insert', 'into']:
+        return b"", b"Expected a query of the form INSERT INTO <table>"
+    table_name = tokens[2]
+
+    if format.lower() != 'jsoneachrow' or not data:
+        # Report unsupported input instead of answering "Ok" without inserting.
+        return b"", b"INSERT requires a JSONEachRow request body"
+
+    temp_file_name = save_to_tempfile(data)
+    try:
+        ingest_query = f"COPY {table_name} FROM '{temp_file_name}' (FORMAT 'json')"
+        conn.execute(ingest_query)
+    except Exception as e:
+        return b"", str(e).encode()
+    finally:
+        os.remove(temp_file_name)
+
+    return b"Ok", b""
+
+
+def save_to_tempfile(data):
+    """Write data to a new UTF-8 temporary file and return the file's path.
+
+    The file is created with delete=False, so the caller must remove it.
+    """
+    with tempfile.NamedTemporaryFile(delete=False, mode='w+', encoding='utf-8') as temp_file:
+        temp_file.write(data)
+    return temp_file.name
+
+
+def duckdb_query_with_errmsg(query, format, data=None, request_method="GET"):
     try:
+
+        if request_method == "POST" and query.strip().lower().startswith('insert into') and data:
+            return handle_insert_query(query, format, data)
+
         start_time = time.time()
         result = conn.execute(query)
         query_time = time.time() - start_time
@@ -83,6 +137,7 @@ def clickhouse():
     query = request.args.get('query', default="", type=str)
     format = request.args.get('default_format', default="JSONCompact", type=str)
     database = request.args.get('database', default="", type=str)
+    data = None
 
     # Log incoming request data for debugging
     print(f"Received request: method={request.method}, query={query}, format={format}, database={database}")
@@ -90,11 +145,14 @@ def clickhouse():
     if not query:
         return app.send_static_file('play.html')
 
+    if request.method == "POST":
+        data = request.get_data(as_text=True)
+
     if database:
         query = f"ATTACH '{database}' AS db; USE db; {query}"
 
     # Execute the query and capture the result and error message
-    result, errmsg = duckdb_query_with_errmsg(query.strip(), format)
+    result, errmsg = duckdb_query_with_errmsg(query.strip(), format, data, request.method)
 
     # Handle response for HEAD requests
     if len(errmsg) == 0: