diff --git a/sys/teseo/CMakeLists.txt b/sys/teseo/CMakeLists.txt index b6b2f40..518264e 100644 --- a/sys/teseo/CMakeLists.txt +++ b/sys/teseo/CMakeLists.txt @@ -30,6 +30,9 @@ TARGET_LINK_LIBRARIES(teseo_alg teseo1 td) add_executable(teseo_edge tests/teseo_test_edge.cpp) TARGET_LINK_LIBRARIES(teseo_edge teseo1 td) +add_executable(teseo_propotion tests/teseo_test_propotion.cpp) +TARGET_LINK_LIBRARIES(teseo_propotion teseo1 td) + add_executable(teseo_mem tests/teseo_test_memory.cpp) TARGET_LINK_LIBRARIES(teseo_mem teseo1 td) diff --git a/sys/teseo/tests/teseo_test_propotion.cpp b/sys/teseo/tests/teseo_test_propotion.cpp new file mode 100644 index 0000000..080d2ce --- /dev/null +++ b/sys/teseo/tests/teseo_test_propotion.cpp @@ -0,0 +1,244 @@ +// +// Created by yzy on 5/14/24. +// + +#include "teseo_test.h" + + +template +void insert_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + puts("------insert_edges------"); + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && !tx.has_edge(new_srcs[pos], new_dests[pos])) { + tx.insert_edge(new_srcs[pos], new_dests[pos], 1.0); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void read_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + puts("------insert_edges------"); + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + volatile auto result = tx.has_edge(query_srcs[pos], query_dests[pos]); // use volatile to make sure has_edge() is executed + tx.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void insert_read(graph &GA, std::vector &new_srcs, std::vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && !tx.has_edge(new_srcs[pos], new_dests[pos])) { + tx.insert_edge(new_srcs[pos], new_dests[pos], 1.0); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + while(1){ + try{ + auto tx = GA.start_transaction(); + volatile auto result = tx.has_edge(query_srcs[pos], query_dests[pos]); // use volatile to make sure has_edge() is executed + tx.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + +template +void delete_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && tx.has_edge(new_srcs[pos], new_dests[pos])){ + tx.remove_edge(new_srcs[pos], new_dests[pos]); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +void batch_ins_del_read(commandLine& P){ + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10,100,1000,10000,100000,1000000,10000000}; + std::vector update_sizes = {500000}; + std::vector update_sizes2 = {500000}; + std::vector avg_insert, avg_delete; + avg_insert.clear(); avg_delete.clear(); + for(size_t i=0; i(); + PRINT("=============== Batch Insert BEGIN ==============="); + for (size_t us=0; us(nn, r.ith_rand(100 + ts), a, b, c); + for (uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + // mix operation + insert_read(Ga, new_srcs, new_dests, query_srcs, query_dests, thd_num); + + // contrast test, do insert and read in group + // insert_edges(Ga, new_srcs, new_dests, thd_num); + // read_edges(Ga, query_srcs, query_dests, thd_num); + + + gettimeofday(&t_end, &tzp); + avg_insert[us] += cal_time_elapsed(&t_start, &t_end); + + delete_edges(Ga, new_srcs, new_dests, thd_num); + } + PRINT("=============== Batch Insert END ==============="); + } + + for (size_t us=0; us