Improved the management of locks in multithreaded sessions.
Managed the assurance of all threads is terminate before close the application.
This commit is contained in:
@ -1,12 +1,142 @@
|
||||
#include "ac.h"
|
||||
#include "capwap_dfa.h"
|
||||
#include "capwap_event.h"
|
||||
#include "ac_session.h"
|
||||
#include "ac_discovery.h"
|
||||
#include "ac_backend.h"
|
||||
|
||||
#include <signal.h>
|
||||
|
||||
#define AC_RECV_NOERROR_MSGQUEUE -1001
|
||||
|
||||
/* */
|
||||
static int ac_recvmsgqueue(int fd, struct ac_session_msgqueue_item_t* item) {
|
||||
int packetsize = -1;
|
||||
|
||||
do {
|
||||
packetsize = recv(fd, (void*)item, sizeof(struct ac_session_msgqueue_item_t), 0);
|
||||
} while ((packetsize < 0) && ((errno == EAGAIN) || (errno == EINTR)));
|
||||
|
||||
return ((packetsize == sizeof(struct ac_session_msgqueue_item_t)) ? 1 : 0);
|
||||
}
|
||||
|
||||
/* */
|
||||
static void ac_session_msgqueue_parsing_item(struct ac_session_msgqueue_item_t* item) {
|
||||
switch (item->message) {
|
||||
case AC_MESSAGE_QUEUE_CLOSE_THREAD: {
|
||||
struct capwap_list_item* search = g_ac.sessionsthread->first;
|
||||
while (search != NULL) {
|
||||
struct ac_session_thread_t* sessionthread = (struct ac_session_thread_t*)search->item;
|
||||
ASSERT(sessionthread != NULL);
|
||||
|
||||
if (sessionthread->threadid == item->message_close_thread.threadid) {
|
||||
void* dummy;
|
||||
|
||||
/* Clean thread resource */
|
||||
pthread_join(sessionthread->threadid, &dummy);
|
||||
capwap_itemlist_free(capwap_itemlist_remove(g_ac.sessionsthread, search));
|
||||
break;
|
||||
}
|
||||
|
||||
/* */
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
capwap_logging_debug("Unknown message queue item: %lu", item->message);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* */
|
||||
static void ac_wait_terminate_allsessions(void) {
|
||||
struct ac_session_msgqueue_item_t item;
|
||||
|
||||
/* Wait that list is empty */
|
||||
while (g_ac.sessionsthread->count > 0) {
|
||||
capwap_logging_debug("Waiting for %d session terminate", g_ac.sessionsthread->count);
|
||||
|
||||
/* Receive message queue packet */
|
||||
if (!ac_recvmsgqueue(g_ac.fdmsgsessions[1], &item)) {
|
||||
capwap_logging_debug("Unable to receive message queue");
|
||||
break;
|
||||
}
|
||||
|
||||
/* Parsing message queue packet */
|
||||
if (item.message == AC_MESSAGE_QUEUE_CLOSE_THREAD) {
|
||||
ac_session_msgqueue_parsing_item(&item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Initialize message queue */
|
||||
int ac_session_msgqueue_init(void) {
|
||||
if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, g_ac.fdmsgsessions)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Free sessions message queue */
|
||||
void ac_session_msgqueue_free(void) {
|
||||
close(g_ac.fdmsgsessions[1]);
|
||||
close(g_ac.fdmsgsessions[0]);
|
||||
}
|
||||
|
||||
/* */
|
||||
void ac_session_msgqueue_notify_closethread(pthread_t threadid) {
|
||||
struct ac_session_msgqueue_item_t item;
|
||||
|
||||
/* Send message */
|
||||
memset(&item, 0, sizeof(struct ac_session_msgqueue_item_t));
|
||||
item.message = AC_MESSAGE_QUEUE_CLOSE_THREAD;
|
||||
item.message_close_thread.threadid = threadid;
|
||||
|
||||
send(g_ac.fdmsgsessions[0], (void*)&item, sizeof(struct ac_session_msgqueue_item_t), 0);
|
||||
}
|
||||
|
||||
/* */
|
||||
static int ac_recvfrom(struct pollfd* fds, int fdscount, void* buffer, int* size, struct sockaddr_storage* recvfromaddr, struct sockaddr_storage* recvtoaddr, struct timeout_control* timeout) {
|
||||
int index;
|
||||
|
||||
ASSERT(fds);
|
||||
ASSERT(fdscount > 0);
|
||||
ASSERT(buffer != NULL);
|
||||
ASSERT(size != NULL);
|
||||
ASSERT(*size > 0);
|
||||
ASSERT(recvfromaddr != NULL);
|
||||
ASSERT(recvtoaddr != NULL);
|
||||
|
||||
/* Wait packet */
|
||||
index = capwap_wait_recvready(fds, fdscount, timeout);
|
||||
if (index < 0) {
|
||||
return index;
|
||||
} else if (index == (fdscount - 1)) {
|
||||
struct ac_session_msgqueue_item_t item;
|
||||
|
||||
/* Receive message queue packet */
|
||||
if (!ac_recvmsgqueue(fds[index].fd, &item)) {
|
||||
return CAPWAP_RECV_ERROR_SOCKET;
|
||||
}
|
||||
|
||||
/* Parsing message queue packet */
|
||||
ac_session_msgqueue_parsing_item(&item);
|
||||
|
||||
return AC_RECV_NOERROR_MSGQUEUE;
|
||||
}
|
||||
|
||||
/* Receive packet */
|
||||
if (!capwap_recvfrom_fd(fds[index].fd, buffer, size, recvfromaddr, recvtoaddr)) {
|
||||
return CAPWAP_RECV_ERROR_SOCKET;
|
||||
}
|
||||
|
||||
return index;
|
||||
}
|
||||
|
||||
/* Add packet to session */
|
||||
static void ac_session_add_packet(struct ac_session_t* session, char* buffer, int size, int isctrlsocket, int plainbuffer) {
|
||||
struct capwap_list_item* item;
|
||||
@ -30,7 +160,7 @@ static void ac_session_add_packet(struct ac_session_t* session, char* buffer, in
|
||||
}
|
||||
|
||||
/* Add action to session */
|
||||
void ac_session_send_action(struct ac_session_t* session, long action, long param, void* data, long length) {
|
||||
static void ac_session_send_action(struct ac_session_t* session, long action, long param, void* data, long length) {
|
||||
struct capwap_list_item* item;
|
||||
struct ac_session_action* actionsession;
|
||||
|
||||
@ -62,7 +192,7 @@ static struct ac_session_t* ac_search_session_from_wtpaddress(struct sockaddr_st
|
||||
|
||||
ASSERT(address != NULL);
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
|
||||
search = g_ac.sessions->first;
|
||||
while (search != NULL) {
|
||||
@ -70,19 +200,20 @@ static struct ac_session_t* ac_search_session_from_wtpaddress(struct sockaddr_st
|
||||
ASSERT(session != NULL);
|
||||
|
||||
if (!capwap_compare_ip(address, (isctrlsocket ? &session->wtpctrladdress : &session->wtpdataaddress))) {
|
||||
/* Return session if not teardown */
|
||||
if (!session->teardown) {
|
||||
session->count++;
|
||||
result = session;
|
||||
}
|
||||
/* Increment session count */
|
||||
capwap_lock_enter(&session->sessionlock);
|
||||
session->count++;
|
||||
capwap_lock_exit(&session->sessionlock);
|
||||
|
||||
/* */
|
||||
result = session;
|
||||
break;
|
||||
}
|
||||
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -94,12 +225,11 @@ int ac_has_sessionid(struct capwap_sessionid_element* sessionid) {
|
||||
|
||||
ASSERT(sessionid != NULL);
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
|
||||
search = g_ac.sessions->first;
|
||||
while (search != NULL) {
|
||||
struct ac_session_t* session = (struct ac_session_t*)search->item;
|
||||
|
||||
ASSERT(session != NULL);
|
||||
|
||||
if (!memcmp(sessionid, &session->sessionid, sizeof(struct capwap_sessionid_element))) {
|
||||
@ -110,7 +240,7 @@ int ac_has_sessionid(struct capwap_sessionid_element* sessionid) {
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -124,12 +254,11 @@ int ac_has_wtpid(char* wtpid) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
|
||||
search = g_ac.sessions->first;
|
||||
while (search != NULL) {
|
||||
struct ac_session_t* session = (struct ac_session_t*)search->item;
|
||||
|
||||
ASSERT(session != NULL);
|
||||
|
||||
if (session->wtpid && !strcmp(session->wtpid, wtpid)) {
|
||||
@ -140,7 +269,7 @@ int ac_has_wtpid(char* wtpid) {
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -197,27 +326,28 @@ static struct ac_session_t* ac_get_session_from_keepalive(void* buffer, int buff
|
||||
struct capwap_list_item* search;
|
||||
struct capwap_sessionid_element* sessionid = (struct capwap_sessionid_element*)capwap_get_message_element_data(&packet, CAPWAP_ELEMENT_SESSIONID);
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
|
||||
search = g_ac.sessions->first;
|
||||
while (search != NULL) {
|
||||
struct ac_session_t* session = (struct ac_session_t*)search->item;
|
||||
|
||||
ASSERT(session != NULL);
|
||||
|
||||
if (!memcmp(sessionid, &session->sessionid, sizeof(struct capwap_sessionid_element))) {
|
||||
if (!session->teardown) {
|
||||
session->count++;
|
||||
result = session;
|
||||
}
|
||||
/* Increment session count */
|
||||
capwap_lock_enter(&session->sessionlock);
|
||||
session->count++;
|
||||
capwap_lock_exit(&session->sessionlock);
|
||||
|
||||
/* */
|
||||
result = session;
|
||||
break;
|
||||
}
|
||||
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
} else {
|
||||
capwap_logging_debug("Failed validation parsed data packet");
|
||||
}
|
||||
@ -232,25 +362,47 @@ static struct ac_session_t* ac_get_session_from_keepalive(void* buffer, int buff
|
||||
return result;
|
||||
}
|
||||
|
||||
/* Close sessions */
|
||||
static void ac_close_sessions() {
|
||||
/* */
|
||||
void ac_notify_event_session(struct capwap_sessionid_element* sessionid, long param) {
|
||||
struct capwap_list_item* search;
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
ASSERT(sessionid != NULL);
|
||||
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
|
||||
search = g_ac.sessions->first;
|
||||
while (search != NULL) {
|
||||
struct ac_session_t* session = (struct ac_session_t*)search->item;
|
||||
ASSERT(session != NULL);
|
||||
|
||||
if (!session->teardown) {
|
||||
ac_session_send_action(session, AC_SESSION_ACTION_CLOSE, 0, NULL, 0);
|
||||
if (!memcmp(sessionid, &session->sessionid, sizeof(struct capwap_sessionid_element))) {
|
||||
ac_session_send_action(session, AC_SESSION_ACTION_NOTIFY_EVENT, param, NULL, 0);
|
||||
break;
|
||||
}
|
||||
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
}
|
||||
|
||||
/* Close sessions */
|
||||
static void ac_close_sessions() {
|
||||
struct capwap_list_item* search;
|
||||
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
|
||||
search = g_ac.sessions->first;
|
||||
while (search != NULL) {
|
||||
struct ac_session_t* session = (struct ac_session_t*)search->item;
|
||||
ASSERT(session != NULL);
|
||||
|
||||
ac_session_send_action(session, AC_SESSION_ACTION_CLOSE, 0, NULL, 0);
|
||||
|
||||
search = search->next;
|
||||
}
|
||||
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
}
|
||||
|
||||
/* DTLS Handshake BIO send */
|
||||
@ -386,6 +538,7 @@ static struct ac_session_t* ac_create_session(struct sockaddr_storage* wtpaddres
|
||||
session = (struct ac_session_t*)itemlist->item;
|
||||
memset(session, 0, sizeof(struct ac_session_t));
|
||||
|
||||
session->itemlist = itemlist;
|
||||
session->count = 2;
|
||||
memcpy(&session->acctrladdress, acaddress, sizeof(struct sockaddr_storage));
|
||||
memcpy(&session->wtpctrladdress, wtpaddress, sizeof(struct sockaddr_storage));
|
||||
@ -425,39 +578,51 @@ static struct ac_session_t* ac_create_session(struct sockaddr_storage* wtpaddres
|
||||
session->state = CAPWAP_IDLE_STATE;
|
||||
|
||||
/* Update session list */
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_wrlock(&g_ac.sessionslock);
|
||||
capwap_itemlist_insert_after(g_ac.sessions, NULL, itemlist);
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
|
||||
/* Create thread */
|
||||
result = pthread_create(&session->threadid, NULL, ac_session_thread, (void*)session);
|
||||
if (!result) {
|
||||
pthread_detach(session->threadid);
|
||||
struct ac_session_thread_t* sessionthread;
|
||||
|
||||
/* Notify change session list */
|
||||
capwap_event_signal(&g_ac.changesessionlist);
|
||||
/* Keeps trace of active threads */
|
||||
itemlist = capwap_itemlist_create(sizeof(struct ac_session_thread_t));
|
||||
sessionthread = (struct ac_session_thread_t*)itemlist->item;
|
||||
sessionthread->threadid = session->threadid;
|
||||
|
||||
/* */
|
||||
capwap_itemlist_insert_after(g_ac.sessionsthread, NULL, itemlist);
|
||||
} else {
|
||||
capwap_logging_debug("Unable create session thread, error code %d", result);
|
||||
|
||||
/* Destroy element */
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_itemlist_free(capwap_itemlist_remove(g_ac.sessions, itemlist));
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
|
||||
session = NULL;
|
||||
capwap_logging_fatal("Unable create session thread, error code %d", result);
|
||||
capwap_exit(CAPWAP_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return session;
|
||||
}
|
||||
|
||||
/* Release reference of session */
|
||||
void ac_session_release_reference(struct ac_session_t* session) {
|
||||
ASSERT(session != NULL);
|
||||
|
||||
capwap_lock_enter(&session->sessionlock);
|
||||
|
||||
/* Release reference must not destroy device, reference count > 0 */
|
||||
session->count--;
|
||||
ASSERT(session->count > 0);
|
||||
|
||||
capwap_lock_exit(&session->sessionlock);
|
||||
}
|
||||
|
||||
/* Update statistics */
|
||||
void ac_update_statistics(void) {
|
||||
|
||||
g_ac.descriptor.stations = 0; /* TODO */
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
g_ac.descriptor.activewtp = g_ac.sessions->count;
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
}
|
||||
|
||||
/* Handler signal */
|
||||
@ -469,10 +634,10 @@ static void ac_signal_handler(int signum) {
|
||||
|
||||
/* AC running */
|
||||
int ac_execute(void) {
|
||||
int fdscount = CAPWAP_MAX_SOCKETS * 2;
|
||||
struct pollfd* fds;
|
||||
int result = CAPWAP_SUCCESSFUL;
|
||||
|
||||
int fdscount = CAPWAP_MAX_SOCKETS * 2 + 1;
|
||||
|
||||
int index;
|
||||
int check;
|
||||
int isctrlsocket = 0;
|
||||
@ -488,11 +653,16 @@ int ac_execute(void) {
|
||||
|
||||
/* Configure poll struct */
|
||||
fds = (struct pollfd*)capwap_alloc(sizeof(struct pollfd) * fdscount);
|
||||
|
||||
|
||||
/* Retrive all socket for polling */
|
||||
fdscount = capwap_network_set_pollfd(&g_ac.net, fds, fdscount);
|
||||
ASSERT(fdscount > 0);
|
||||
|
||||
/* Unix socket message queue */
|
||||
fds[fdscount].events = POLLIN | POLLERR | POLLHUP | POLLNVAL;
|
||||
fds[fdscount].fd = g_ac.fdmsgsessions[1];
|
||||
fdscount++;
|
||||
|
||||
/* Handler signal */
|
||||
g_ac.running = 1;
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
@ -519,7 +689,7 @@ int ac_execute(void) {
|
||||
/* Receive packet */
|
||||
isrecvpacket = 0;
|
||||
buffersize = sizeof(buffer);
|
||||
index = capwap_recvfrom(fds, fdscount, buffer, &buffersize, &recvfromaddr, &recvtoaddr, NULL);
|
||||
index = ac_recvfrom(fds, fdscount, buffer, &buffersize, &recvfromaddr, &recvtoaddr, NULL);
|
||||
if (!g_ac.running) {
|
||||
capwap_logging_debug("Closing AC");
|
||||
break;
|
||||
@ -557,9 +727,9 @@ int ac_execute(void) {
|
||||
unsigned short sessioncount;
|
||||
|
||||
/* Get current session number */
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
capwap_rwlock_rdlock(&g_ac.sessionslock);
|
||||
sessioncount = g_ac.sessions->count;
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
capwap_rwlock_exit(&g_ac.sessionslock);
|
||||
|
||||
/* PreParsing packet for reduce a DoS attack */
|
||||
check = capwap_sanity_check(isctrlsocket, CAPWAP_UNDEF_STATE, buffer, buffersize, g_ac.enabledtls, 0);
|
||||
@ -584,12 +754,10 @@ int ac_execute(void) {
|
||||
|
||||
/* Create a new session */
|
||||
session = ac_create_session(&recvfromaddr, &recvtoaddr, &ctrlsock);
|
||||
if (session) {
|
||||
ac_session_add_packet(session, buffer, buffersize, isctrlsocket, 1);
|
||||
ac_session_add_packet(session, buffer, buffersize, isctrlsocket, 1);
|
||||
|
||||
/* Release reference */
|
||||
ac_session_release_reference(session);
|
||||
}
|
||||
/* Release reference */
|
||||
ac_session_release_reference(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -606,12 +774,10 @@ int ac_execute(void) {
|
||||
|
||||
/* Create a new session */
|
||||
session = ac_create_session(&recvfromaddr, &recvtoaddr, &ctrlsock);
|
||||
if (session) {
|
||||
ac_session_add_packet(session, buffer, buffersize, isctrlsocket, 0);
|
||||
ac_session_add_packet(session, buffer, buffersize, isctrlsocket, 0);
|
||||
|
||||
/* Release reference */
|
||||
ac_session_release_reference(session);
|
||||
}
|
||||
/* Release reference */
|
||||
ac_session_release_reference(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -623,7 +789,7 @@ int ac_execute(void) {
|
||||
ac_update_session_from_datapacket(&datasocket, &recvfromaddr, &recvtoaddr, buffer, buffersize);
|
||||
}
|
||||
}
|
||||
} else if (index == CAPWAP_RECV_ERROR_INTR) {
|
||||
} else if ((index == CAPWAP_RECV_ERROR_INTR) || (index == AC_RECV_NOERROR_MSGQUEUE)) {
|
||||
/* Ignore recv */
|
||||
continue;
|
||||
} else if (index == CAPWAP_RECV_ERROR_SOCKET) {
|
||||
@ -642,24 +808,9 @@ int ac_execute(void) {
|
||||
ac_close_sessions();
|
||||
|
||||
/* Wait to terminate all sessions */
|
||||
capwap_event_reset(&g_ac.changesessionlist);
|
||||
for (;;) {
|
||||
int count;
|
||||
|
||||
capwap_lock_enter(&g_ac.sessionslock);
|
||||
count = g_ac.sessions->count;
|
||||
capwap_lock_exit(&g_ac.sessionslock);
|
||||
|
||||
if (!count) {
|
||||
break;
|
||||
}
|
||||
|
||||
/* Wait that list is changed */
|
||||
capwap_logging_debug("Waiting for %d session terminate", count);
|
||||
capwap_event_wait(&g_ac.changesessionlist);
|
||||
}
|
||||
ac_wait_terminate_allsessions();
|
||||
|
||||
/* Free handshark session */
|
||||
/* Free handshake session */
|
||||
while (g_ac.datasessionshandshake->first != NULL) {
|
||||
struct ac_data_session_handshake* handshake = (struct ac_data_session_handshake*)g_ac.datasessionshandshake->first->item;
|
||||
|
||||
|
Reference in New Issue
Block a user