Skip to content

Commit

Permalink
refine, resynch
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani committed Oct 19, 2024
1 parent 8ed8abc commit 30abb75
Showing 1 changed file with 50 additions and 5 deletions.
55 changes: 50 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
import duckdb
import json
import time
import tempfile
import hashlib
import base64

from flask import Flask, request, jsonify
from flask_httpauth import HTTPBasicAuth

app = Flask(__name__, static_folder="public", static_url_path="")
auth = HTTPBasicAuth()

dbpath = os.getenv('DBPATH', '/tmp/')

# Global connection
conn = None

Expand All @@ -18,9 +24,14 @@ def verify(username, password):
print('stateless session')
conn = duckdb.connect(':memory:')
else:
path = f"{globals()['path']}/{hash(username + password)}.db"
print(f'stateful session {path}')
conn = duckdb.connect(path)

global path
os.makedirs(path, exist_ok=True)
user_pass_hash = hashlib.sha256((username + password).encode()).hexdigest()
db_file = os.path.join(dbpath, f"{user_pass_hash}.db")
print(f'stateful session {db_file}')
conn = duckdb.connect(db_file)

return True

def convert_to_ndjson(result):
Expand Down Expand Up @@ -54,8 +65,38 @@ def convert_to_clickhouse_jsoncompact(result, query_time):

return json.dumps(json_result)

def duckdb_query_with_errmsg(query, format):
def handle_insert_query(query, format, data=None):
table_name = query.split("INTO")[1].split()[0].strip()

temp_file_name = None
if format.lower() == 'jsoneachrow' and data is not None:
temp_file_name = save_to_tempfile(data)

if temp_file_name:
try:
ingest_query = f"COPY {table_name} FROM '{temp_file_name}' (FORMAT 'json')"
conn.execute(ingest_query)
except Exception as e:
return b"", str(e).encode()
finally:
os.remove(temp_file_name)

return b"Ok", b""

def save_to_tempfile(data):
temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w+', encoding='utf-8')
temp_file.write(data)
temp_file.flush()
temp_file.close()
return temp_file.name


def duckdb_query_with_errmsg(query, format, data=None, request_method="GET"):
try:

if request_method == "POST" and query.strip().lower().startswith('insert into') and data:
return handle_insert_query(query, format, data)

start_time = time.time()
result = conn.execute(query)
query_time = time.time() - start_time
Expand Down Expand Up @@ -83,18 +124,22 @@ def clickhouse():
query = request.args.get('query', default="", type=str)
format = request.args.get('default_format', default="JSONCompact", type=str)
database = request.args.get('database', default="", type=str)
data = None

# Log incoming request data for debugging
print(f"Received request: method={request.method}, query={query}, format={format}, database={database}")

if not query:
return app.send_static_file('play.html')

if request.method == "POST":
data = request.get_data(as_text=True)

if database:
query = f"ATTACH '{database}' AS db; USE db; {query}"

# Execute the query and capture the result and error message
result, errmsg = duckdb_query_with_errmsg(query.strip(), format)
result, errmsg = duckdb_query_with_errmsg(query.strip(), format, data, request.method)

# Handle response for HEAD requests
if len(errmsg) == 0:
Expand Down

0 comments on commit 30abb75

Please sign in to comment.