diff --git a/src/api/v1/report.py b/src/api/v1/report.py index dfb8fa2..0786189 100644 --- a/src/api/v1/report.py +++ b/src/api/v1/report.py @@ -244,134 +244,138 @@ async def insert_report( """ Inserts detections into to the plugin database. """ - if random.randint(1, 10) == 1: - asyncio.create_task(insert_report_v2(detections)) - # remove duplicates - df = pd.DataFrame([d.dict() for d in detections]) - df.drop_duplicates(subset=["reporter", "reported", "region_id"], inplace=True) - - # data validation, there can only be one reporter, and it is unrealistic to send more then 5k reports. - if len(df) > 5000 or df["reporter"].nunique() > 1: - logger.warning({"message": "Too many reports."}) - return + asyncio.create_task(insert_report_v2(detections)) + return {"detail": "ok"} - # data validation, checks for correct timing - now = int(time.time()) - now_upper = int(now + 3600) - now_lower = int(now - 25200) - - df_time = df.ts - mask = (df_time > now_upper) | (df_time < now_lower) - if len(df_time[mask].values) > 0: - logger.warning( - { - "message": "Data contains out of bounds time", - "reporter": df["reporter"].unique(), - "time": df_time[mask].values[0], - } - ) - return + # if random.randint(1, 10) == 1: + # asyncio.create_task(insert_report_v2(detections)) + + # # remove duplicates + # df = pd.DataFrame([d.dict() for d in detections]) + # df.drop_duplicates(subset=["reporter", "reported", "region_id"], inplace=True) - logger.info({"message": f"Received: {len(df)} from: {df['reporter'].unique()}"}) + # # data validation, there can only be one reporter, and it is unrealistic to send more then 5k reports. + # if len(df) > 5000 or df["reporter"].nunique() > 1: + # logger.warning({"message": "Too many reports."}) + # return - # Normalize names - df["reporter"] = df["reporter"].apply( - lambda name: name.lower().replace("_", " ").replace("-", " ").strip() - ) - df["reported"] = df["reported"].apply( - lambda name: name.lower().replace("_", " ").replace("-", " ").strip() - ) + # # data validation, checks for correct timing + # now = int(time.time()) + # now_upper = int(now + 3600) + # now_lower = int(now - 25200) + + # df_time = df.ts + # mask = (df_time > now_upper) | (df_time < now_lower) + # if len(df_time[mask].values) > 0: + # logger.warning( + # { + # "message": "Data contains out of bounds time", + # "reporter": df["reporter"].unique(), + # "time": df_time[mask].values[0], + # } + # ) + # return - # Get a list of unqiue reported names and reporter name - names = list(df["reported"].unique()) - names.extend(df["reporter"].unique()) - - # validate all names - valid_names = [ - await functions.to_jagex_name(name) - for name in names - if await functions.is_valid_rsn(name) - ] - # logger.debug(f"Valid names: {len(valid_names)}") - # logger.debug(f"{valid_names=}") - - # Get IDs for all unique valid names - data = await sql_select_players(valid_names) - # logger.debug(f"Found players before insert: {len(data)}") - # logger.debug(f"{data=}") - - # Create entries for players that do not yet exist in Players table - existing_names = [d["name"] for d in data] - # logger.debug(f"{existing_names=}") - - new_names = set([name for name in valid_names]).difference(existing_names) - # logger.debug(f"{new_names=}") - - # Get new player id's - if new_names: - param = [ - {"name": name, "normalized_name": await functions.to_jagex_name(name)} - for name in new_names - ] - await functions.batch_function(sql_insert_player, param) - players = await sql_select_players(new_names) - # logger.debug(f"Found players after insert: {len(players)=}, {len(new_names)=}") - data.extend(players) - - # Insert detections into Reports table with user ids - # add reported & reporter id - df_names = pd.DataFrame(data) - - if len(df) == 0: - logger.warning( - {"message": "empty dataframe, before merge", "detections": detections} - ) - return + # logger.info({"message": f"Received: {len(df)} from: {df['reporter'].unique()}"}) + + # # Normalize names + # df["reporter"] = df["reporter"].apply( + # lambda name: name.lower().replace("_", " ").replace("-", " ").strip() + # ) + # df["reported"] = df["reported"].apply( + # lambda name: name.lower().replace("_", " ").replace("-", " ").strip() + # ) + + # # Get a list of unqiue reported names and reporter name + # names = list(df["reported"].unique()) + # names.extend(df["reporter"].unique()) + + # # validate all names + # valid_names = [ + # await functions.to_jagex_name(name) + # for name in names + # if await functions.is_valid_rsn(name) + # ] + # # logger.debug(f"Valid names: {len(valid_names)}") + # # logger.debug(f"{valid_names=}") + + # # Get IDs for all unique valid names + # data = await sql_select_players(valid_names) + # # logger.debug(f"Found players before insert: {len(data)}") + # # logger.debug(f"{data=}") + + # # Create entries for players that do not yet exist in Players table + # existing_names = [d["name"] for d in data] + # # logger.debug(f"{existing_names=}") + + # new_names = set([name for name in valid_names]).difference(existing_names) + # # logger.debug(f"{new_names=}") + + # # Get new player id's + # if new_names: + # param = [ + # {"name": name, "normalized_name": await functions.to_jagex_name(name)} + # for name in new_names + # ] + # await functions.batch_function(sql_insert_player, param) + # players = await sql_select_players(new_names) + # # logger.debug(f"Found players after insert: {len(players)=}, {len(new_names)=}") + # data.extend(players) + + # # Insert detections into Reports table with user ids + # # add reported & reporter id + # df_names = pd.DataFrame(data) + + # if len(df) == 0: + # logger.warning( + # {"message": "empty dataframe, before merge", "detections": detections} + # ) + # return - if len(df_names) == 0: - logger.warning( - {"message": "empty dataframe names, before merge", "detections": detections} - ) - return + # if len(df_names) == 0: + # logger.warning( + # {"message": "empty dataframe names, before merge", "detections": detections} + # ) + # return - df = df.merge(df_names, left_on="reported", right_on="name") + # df = df.merge(df_names, left_on="reported", right_on="name") - if len(df) == 0: - logger.warning( - {"message": "empty dataframe, after merge", "detections": detections} - ) - return + # if len(df) == 0: + # logger.warning( + # {"message": "empty dataframe, after merge", "detections": detections} + # ) + # return - reporter = df["reporter"].unique() + # reporter = df["reporter"].unique() - if len(reporter) != 1: - logger.warning({"message": "No reporter", "detections": detections}) - return + # if len(reporter) != 1: + # logger.warning({"message": "No reporter", "detections": detections}) + # return - reporter_id = df_names.query(f"name == {reporter}")["id"].to_list() + # reporter_id = df_names.query(f"name == {reporter}")["id"].to_list() - if len(reporter_id) == 0: - logger.warning({"message": "No reporter in df_names", "detections": detections}) - return + # if len(reporter_id) == 0: + # logger.warning({"message": "No reporter in df_names", "detections": detections}) + # return - asyncio.create_task(insert_active_reporter(reporter[0])) + # asyncio.create_task(insert_active_reporter(reporter[0])) - # if reporter_id[0] == 657248: - # logger.debug({"message": "Temporary ignoring anonymous reporter"}) - # return + # # if reporter_id[0] == 657248: + # # logger.debug({"message": "Temporary ignoring anonymous reporter"}) + # # return - df["reporter_id"] = reporter_id[0] + # df["reporter_id"] = reporter_id[0] - if manual_detect: - df["manual_detect"] = manual_detect + # if manual_detect: + # df["manual_detect"] = manual_detect - # Parse data to param - data = df.to_dict("records") - param = [await parse_detection(d) for d in data] + # # Parse data to param + # data = df.to_dict("records") + # param = [await parse_detection(d) for d in data] - # Parse query - await functions.batch_function(sql_insert_report, param) - return {"detail": "ok"} + # # Parse query + # await functions.batch_function(sql_insert_report, param) + # return {"detail": "ok"} @router.get("/report/count", tags=["Report"])