Skip to content

Commit

Permalink
Merge pull request #9 from Lauszus/dev
Browse files Browse the repository at this point in the history
0.1.2
  • Loading branch information
Lauszus authored Apr 14, 2023
2 parents a3ed1f9 + 59dccd3 commit c385aa9
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 102 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,6 @@ dmypy.json

# Executables
client/client
client/client_forwarder
client/server
client/server_forwarder
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SocketCAN over TCP

#### Original author Thomas Bruen
#### Modified by Kristian Lauszus, 2019-2022
#### Modified by Kristian Lauszus, 2019-2023
_________

[![](https://github.com/Lauszus/socketsocketcan/workflows/socketsocketcan%20CI/badge.svg)](https://github.com/Lauszus/socketsocketcan/actions?query=branch%3Amaster)
Expand Down
214 changes: 114 additions & 100 deletions client/client.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* MIT License
*
* Copyright (c) 2019 Thomas Bruen
* Copyright (c) 2019-2022 Kristian Sloth Lauszus
* Copyright (c) 2019-2023 Kristian Sloth Lauszus
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -120,9 +120,9 @@ int open_can_socket(const char *port, const struct can_filter *filter, int numfi

// read a single CAN frame and add timestamp
#if CAN_FORWARDER_MODE
int read_frame(int soc, struct can_frame* frame);
int read_frame(int soc, struct can_frame* frame, bool nonblock);
#else
int read_frame(int soc, struct can_frame* frame, struct timeval* tv);
int read_frame(int soc, struct can_frame* frame, bool nonblock, struct timeval* tv);
#endif

// continually read CAN and add to buffer
Expand Down Expand Up @@ -150,7 +150,7 @@ void print_frame(const timestamped_frame* tf);

/* GLOBALS */
pthread_mutex_t read_mutex = PTHREAD_MUTEX_INITIALIZER;
sig_atomic_t poll = true; // for "infinite loops" in threads.
sig_atomic_t continue_running = true; // for "infinite loops" in threads.
bool tcp_ready_to_send = true; // only access inside of mutex
size_t socketcan_bytes_available = 0;// only access inside of mutex

Expand All @@ -163,19 +163,19 @@ uint8_t read_buf_tcp[BUF_SZ]; // where serialized CAN frames are copied to and s
void handle_signal(int signal)
{
(void)signal;
poll = false;
continue_running = false;
}

void error(const char* msg, int error_code)
{
poll = false;
continue_running = false;
perror(msg);
exit(error_code);
}

void pthread_error(const char* msg, int error_code)
{
poll = false;
continue_running = false;
fprintf(stderr, "%s: %d\n", msg, error_code);
pthread_exit(&error_code);
}
Expand Down Expand Up @@ -285,7 +285,7 @@ int create_tcp_socket(const char* hostname, int port)

// TODO: Remove this ugly hack
retry:
if (!poll)
if (!continue_running)
{
return -1;
}
Expand Down Expand Up @@ -381,14 +381,16 @@ int open_can_socket(const char *can_interface_name, const struct can_filter *p_f
}

#if CAN_FORWARDER_MODE
int read_frame(int soc, struct can_frame* frame)
int read_frame(int soc, struct can_frame* frame, bool nonblock)
#else
int read_frame(int soc, struct can_frame* frame, struct timeval* tv)
int read_frame(int soc, struct can_frame* frame, bool nonblock, struct timeval* tv)
#endif
{
int bytes;

bytes = read(soc, frame, sizeof(*frame));
int options = nonblock ? MSG_DONTWAIT : 0;
bytes = recv(soc, frame, sizeof(*frame), options);

#if !CAN_FORWARDER_MODE
ioctl(soc, SIOCGSTAMP, tv);
#endif
Expand Down Expand Up @@ -422,122 +424,134 @@ void* read_poll_can(void* args)
size_t count = 0;
uint8_t* bufpnt = read_buf_can;

while (poll)
while (continue_running)
{
if (count > (BUF_SZ - frame_sz))
{
//full buffer, drop data and start over. TODO: ring buffer, print/debug
bufpnt = read_buf_can;
count = 0;
}

#if CAN_FORWARDER_MODE
int num_bytes_can = read_frame(fd, &frame);
int num_bytes_can = read_frame(fd, &frame, count != 0);
#else
int num_bytes_can = read_frame(fd, &frame, &tv);
int num_bytes_can = read_frame(fd, &frame, count != 0, &tv);
#endif
if (num_bytes_can == -1)
if (num_bytes_can < 0)
{
// This happens when there is a timeout, we simply keep looping, as we want do not want to block while
// reading the CAN-Bus, as then we would never be able to shut down the threads if there was no activity
// on the bus
continue;
// Return value of -1 happens when there is a timeout, we simply keep looping, as we want do not want to block,
// as then we would never be able to shut down the threads if there was no activity on the bus
// Negative return value other than -1 should not occur according to documentation.
}
else if (num_bytes_can == 0)
{
// This will happen when we shut down the client, so report an success
pthread_error("Socket closed at other end... exiting", 0);
}

else
{
#if DEBUG
printf("read %d number of bytes\n", num_bytes_can);
print_frame(&frame);
printf("read %d number of bytes\n", num_bytes_can);
#if CAN_FORWARDER_MODE
print_frame(&frame);
#else
print_frame(&tf);
#endif
#endif

if (use_unordered_map)
{
if (use_unordered_map)
{
#if CAN_FORWARDER_MODE
hash_map[frame.can_id] = frame;
hash_map[frame.can_id] = frame;
#else
tf.tv_sec = tv.tv_sec;
tf.tv_usec = tv.tv_usec;
tf.id = frame.can_id;
tf.dlc = frame.can_dlc;
memcpy(tf.data, frame.data, sizeof(frame.data));
hash_map[tf.id] = tf;
#endif
}
else
{
tf.tv_sec = tv.tv_sec;
tf.tv_usec = tv.tv_usec;
tf.id = frame.can_id;
tf.dlc = frame.can_dlc;
memcpy(tf.data, frame.data, sizeof(frame.data));
hash_map[tf.id] = tf;
#endif
count = 1;
}
else
{
if (count > (BUF_SZ - frame_sz))
{
//full buffer, drop data and start over. TODO: ring buffer, print/debug
fprintf(stderr, "Intermediate TCP-CAN buffer overflow");
bufpnt = read_buf_can;
count = 0;
}

#if !CAN_FORWARDER_MODE
memcpy(bufpnt, (uint8_t*)&tv, sizeof(struct timeval));
bufpnt += sizeof(struct timeval);
count += sizeof(struct timeval);
memcpy(bufpnt, (uint8_t*)&tv, sizeof(struct timeval));
bufpnt += sizeof(struct timeval);
count += sizeof(struct timeval);
#endif

memcpy(bufpnt, (uint8_t*)&frame.can_id, sizeof(uint32_t));
bufpnt += sizeof(uint32_t);
count += sizeof(uint32_t);
memcpy(bufpnt, (uint8_t*)&frame.can_id, sizeof(uint32_t));
bufpnt += sizeof(uint32_t);
count += sizeof(uint32_t);

memcpy(bufpnt, (uint8_t*)&frame.can_dlc,sizeof(uint8_t));
bufpnt += sizeof(uint8_t);
count += sizeof(uint8_t);
memcpy(bufpnt, (uint8_t*)&frame.can_dlc,sizeof(uint8_t));
bufpnt += sizeof(uint8_t);
count += sizeof(uint8_t);

memcpy(bufpnt, (uint8_t*)&frame.data, sizeof(frame.data));
bufpnt += sizeof(frame.data);
count += sizeof(frame.data);
}
memcpy(bufpnt, (uint8_t*)&frame.data, sizeof(frame.data));
bufpnt += sizeof(frame.data);
count += sizeof(frame.data);
}

#if DEBUG
printf("message read: %zu\n", count);
printf("message read: %zu\n", count);
#endif
pthread_mutex_lock(&read_mutex);
if (tcp_ready_to_send) // other thread has said it is able to write to TCP socket
}

if (count > 0)
{
if (use_unordered_map)
pthread_mutex_lock(&read_mutex);
if (tcp_ready_to_send) // other thread has said it is able to write to TCP socket
{
socketcan_bytes_available = hash_map.size() * hash_map_value_size;
size_t i = 0;
for (const auto &n : hash_map)
{
memcpy(read_buf_tcp + i, (uint8_t*)&n.second, hash_map_value_size);
i += hash_map_value_size;
}

tcp_ready_to_send = false;
const int signal_rv = pthread_cond_signal(&tcp_send_copied);
if (signal_rv < 0)
if (use_unordered_map)
{
pthread_mutex_unlock(&read_mutex);
pthread_error("could not signal to other thread", signal_rv);
}
socketcan_bytes_available = hash_map.size() * hash_map_value_size;
size_t i = 0;
for (const auto &n : hash_map)
{
memcpy(read_buf_tcp + i, (uint8_t*)&n.second, hash_map_value_size);
i += hash_map_value_size;
}

tcp_ready_to_send = false;
const int signal_rv = pthread_cond_signal(&tcp_send_copied);
if (signal_rv < 0)
{
pthread_mutex_unlock(&read_mutex);
pthread_error("could not signal to other thread", signal_rv);
}

#if DEBUG
printf("%zu bytes copied to TCP buffer\n", socketcan_bytes_available);
printf("%zu bytes copied to TCP buffer\n", socketcan_bytes_available);
#endif
hash_map.clear();
}
else
{
socketcan_bytes_available = count;
memcpy(read_buf_tcp, read_buf_can, count);

tcp_ready_to_send = false;
const int signal_rv = pthread_cond_signal(&tcp_send_copied);
if (signal_rv < 0)
{
pthread_mutex_unlock(&read_mutex);
pthread_error("could not signal to other thread", signal_rv);
hash_map.clear();
count = 0;
}
else
{
socketcan_bytes_available = count;
memcpy(read_buf_tcp, read_buf_can, count);

tcp_ready_to_send = false;
const int signal_rv = pthread_cond_signal(&tcp_send_copied);
if (signal_rv < 0)
{
pthread_mutex_unlock(&read_mutex);
pthread_error("could not signal to other thread", signal_rv);
}

#if DEBUG
printf("%zu bytes copied to TCP buffer.\n", count);
printf("%zu bytes copied to TCP buffer.\n", count);
#endif
bufpnt = read_buf_can; //start filling up buffer again
count = 0;
bufpnt = read_buf_can; //start filling up buffer again
count = 0;
}
}
pthread_mutex_unlock(&read_mutex);
}
pthread_mutex_unlock(&read_mutex);
}

// Make sure the 'read_poll_tcp' thread is unblocked
Expand All @@ -563,16 +577,16 @@ void* read_poll_tcp(void* args)

size_t cpy_socketcan_bytes_available;
int wait_rv = 0;
while (poll)
while (continue_running)
{
pthread_mutex_lock(&read_mutex);
tcp_ready_to_send = true;
while (!socketcan_bytes_available)
{
wait_rv = pthread_cond_wait(&tcp_send_copied, &read_mutex);
if (!poll)
if (!continue_running)
{
// Break out if the poll flag has gone low
// Break out if the continue_running flag has gone low
socketcan_bytes_available = 0; // We do not care about the data, as we are about to exit
break;
}
Expand Down Expand Up @@ -651,14 +665,14 @@ void* write_poll(void* args)
const size_t can_struct_sz = sizeof(struct can_frame);
const size_t frame_sz = sizeof(uint32_t) + sizeof(uint8_t) + sizeof(frame.data);

while (poll)
while (continue_running)
{
int num_bytes_tcp = read(socks->tcp_sock, write_buf, BUF_SZ);
if (num_bytes_tcp == -1)
if (num_bytes_tcp < 0)
{
// This happens when there is a timeout, we simply keep looping, as we want do not want to block while
// writing to the CAN-Bus, as then we would never be able to shut down the threads if there was no activity
// on the bus
// Return value of -1 happens when there is a timeout, we simply keep looping, as we want do not want to block,
// as then we would never be able to shut down the threads if there was no new data over TCP
// Negative return value other than -1 should not occur according to documentation.
continue;
}
else if (num_bytes_tcp == 0)
Expand Down Expand Up @@ -841,7 +855,7 @@ int tcpclient(const char *can_interface_name, const char *hostname, int port, co
#endif
tcp_socket = create_tcp_socket(hostname, port);
sleep(1);
} while (poll && tcp_socket == -1);
} while (continue_running && tcp_socket == -1);
#if DEBUG
if (tcp_socket != -1)
{
Expand Down
2 changes: 1 addition & 1 deletion client/client_bindings.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* MIT License
*
* Copyright (c) 2019 Thomas Bruen
* Copyright (c) 2019-2022 Kristian Sloth Lauszus
* Copyright (c) 2019-2023 Kristian Sloth Lauszus
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down

0 comments on commit c385aa9

Please sign in to comment.