Separate the control channel from data channel on two different threads.

Solved the DTLS problem of using a separate thread for the data channel.
This commit is contained in:
vemax78
2013-11-20 23:12:00 +01:00
parent 9a6589b636
commit 0747168361
13 changed files with 1004 additions and 667 deletions

View File

@ -3,14 +3,14 @@
#include "capwap_dfa.h"
#include "ac_session.h"
#include "ac_backend.h"
#include <arpa/inet.h>
#define PACKET_TIMEOUT -1
#define DTLS_SHUTDOWN -2
#define ACTION_SESSION -3
#define AC_ERROR_TIMEOUT -1000
#define AC_ERROR_ACTION_SESSION -1001
/* */
static int ac_session_action_execute(struct ac_session_t* session, struct ac_session_action* action) {
int result = ACTION_SESSION;
int result = AC_ERROR_ACTION_SESSION;
switch (action->action) {
case AC_SESSION_ACTION_RESET_WTP: {
@ -26,13 +26,34 @@ static int ac_session_action_execute(struct ac_session_t* session, struct ac_ses
break;
}
case AC_SESSION_ACTION_ESTABLISHED_SESSION_DATA: {
int valid = 0;
struct ac_soap_response* response;
/* Capwap handshake complete, notify event to backend */
response = ac_soap_runningwtpsession(session, session->wtpid);
if (response) {
valid = ((response->responsecode == HTTP_RESULT_OK) ? 1 : 0);
ac_soapclient_free_response(response);
}
if (valid) {
ac_dfa_change_state(session, CAPWAP_RUN_STATE);
capwap_set_timeout(AC_MAX_ECHO_INTERVAL, &session->timeout, CAPWAP_TIMER_CONTROL_CONNECTION);
} else {
result = CAPWAP_ERROR_CLOSE;
}
break;
}
}
return result;
}
/* */
static int ac_network_read(struct ac_session_t* session, void* buffer, int length, int* isctrlpacket, struct timeout_control* timeout) {
static int ac_network_read(struct ac_session_t* session, void* buffer, int length, struct timeout_control* timeout) {
int result = 0;
long indextimer;
long waittimeout;
@ -40,18 +61,17 @@ static int ac_network_read(struct ac_session_t* session, void* buffer, int lengt
ASSERT(session != NULL);
ASSERT(buffer != NULL);
ASSERT(length > 0);
ASSERT(isctrlpacket != NULL);
for (;;) {
capwap_lock_enter(&session->sessionlock);
if (!session->running) {
capwap_lock_exit(&session->sessionlock);
return DTLS_SHUTDOWN;
} else if (!session->waitresponse && (session->actionsession->count > 0)) {
return CAPWAP_ERROR_CLOSE;
} else if (!session->waitresponse && (session->action->count > 0)) {
struct capwap_list_item* itemaction;
itemaction = capwap_itemlist_remove_head(session->actionsession);
itemaction = capwap_itemlist_remove_head(session->action);
capwap_lock_exit(&session->sessionlock);
/* */
@ -60,44 +80,32 @@ static int ac_network_read(struct ac_session_t* session, void* buffer, int lengt
/* Free packet */
capwap_itemlist_free(itemaction);
return result;
} else if ((session->controlpackets->count > 0) || (session->datapackets->count > 0)) {
} else if (session->packets->count > 0) {
struct capwap_list_item* itempacket;
*isctrlpacket = ((session->controlpackets->count > 0) ? 1 : 0);
if (*isctrlpacket) {
capwap_logging_debug("Receive control packet");
} else {
capwap_logging_debug("Receive data packet");
}
capwap_logging_debug("Receive control packet");
/* Get packet */
itempacket = capwap_itemlist_remove_head((*isctrlpacket ? session->controlpackets : session->datapackets));
itempacket = capwap_itemlist_remove_head(session->packets);
capwap_lock_exit(&session->sessionlock);
if (itempacket) {
struct ac_packet* packet = (struct ac_packet*)itempacket->item;
long packetlength = itempacket->itemsize - sizeof(struct ac_packet);
struct capwap_dtls* dtls = (*isctrlpacket ? &session->ctrldtls : &session->datadtls);
if (!packet->plainbuffer && dtls->enable) {
int oldaction = dtls->action;
if (!packet->plainbuffer && session->dtls.enable) {
int oldaction = session->dtls.action;
/* Decrypt packet */
result = capwap_decrypt_packet(dtls, packet->buffer, packetlength, buffer, length);
result = capwap_decrypt_packet(&session->dtls, packet->buffer, packetlength, buffer, length);
if (result == CAPWAP_ERROR_AGAIN) {
/* Check is handshake complete */
if ((oldaction == CAPWAP_DTLS_ACTION_HANDSHAKE) && (dtls->action == CAPWAP_DTLS_ACTION_DATA)) {
if (*isctrlpacket) {
if (session->state == CAPWAP_DTLS_CONNECT_STATE) {
ac_dfa_change_state(session, CAPWAP_JOIN_STATE);
capwap_set_timeout(session->dfa.rfcWaitJoin, &session->timeout, CAPWAP_TIMER_CONTROL_CONNECTION);
}
if ((oldaction == CAPWAP_DTLS_ACTION_HANDSHAKE) && (session->dtls.action == CAPWAP_DTLS_ACTION_DATA)) {
if (session->state == CAPWAP_DTLS_CONNECT_STATE) {
ac_dfa_change_state(session, CAPWAP_JOIN_STATE);
capwap_set_timeout(session->dfa.rfcWaitJoin, &session->timeout, CAPWAP_TIMER_CONTROL_CONNECTION);
}
}
} else if (result == CAPWAP_ERROR_SHUTDOWN) {
if ((oldaction == CAPWAP_DTLS_ACTION_DATA) && (dtls->action == CAPWAP_DTLS_ACTION_SHUTDOWN)) {
result = DTLS_SHUTDOWN;
}
}
} else {
if (packetlength <= length) {
@ -119,7 +127,7 @@ static int ac_network_read(struct ac_session_t* session, void* buffer, int lengt
capwap_update_timeout(timeout);
waittimeout = capwap_get_timeout(timeout, &indextimer);
if ((waittimeout <= 0) && (indextimer != CAPWAP_TIMER_UNDEF)) {
return PACKET_TIMEOUT;
return AC_ERROR_TIMEOUT;
}
/* Wait packet */
@ -188,39 +196,7 @@ static void ac_dfa_execute(struct ac_session_t* session, struct capwap_parsed_pa
}
/* */
static struct capwap_packet_rxmng* ac_get_packet_rxmng(struct ac_session_t* session, int isctrlmsg) {
struct capwap_packet_rxmng* rxmngpacket = NULL;
if (isctrlmsg) {
if (!session->rxmngctrlpacket) {
session->rxmngctrlpacket = capwap_packet_rxmng_create_message(1);
}
rxmngpacket = session->rxmngctrlpacket;
} else {
if (!session->rxmngdatapacket) {
session->rxmngdatapacket = capwap_packet_rxmng_create_message(0);
}
rxmngpacket = session->rxmngdatapacket;
}
return rxmngpacket;
}
/* */
static void ac_free_packet_rxmng(struct ac_session_t* session, int isctrlmsg) {
if (isctrlmsg && session->rxmngctrlpacket) {
capwap_packet_rxmng_free(session->rxmngctrlpacket);
session->rxmngctrlpacket = NULL;
} else if (!isctrlmsg && session->rxmngdatapacket) {
capwap_packet_rxmng_free(session->rxmngdatapacket);
session->rxmngdatapacket = NULL;
}
}
/* */
static void ac_send_invalid_request(struct ac_session_t* session, struct capwap_packet_rxmng* rxmngpacket, struct capwap_connection* connection, uint32_t errorcode) {
static void ac_send_invalid_request(struct ac_session_t* session, uint32_t errorcode) {
struct capwap_header_data capwapheader;
struct capwap_packet_txmng* txmngpacket;
struct capwap_list* responsefragmentpacket;
@ -228,17 +204,17 @@ static void ac_send_invalid_request(struct ac_session_t* session, struct capwap_
struct capwap_header* header;
struct capwap_resultcode_element resultcode = { .code = errorcode };
ASSERT(rxmngpacket != NULL);
ASSERT(rxmngpacket->fragmentlist->first != NULL);
ASSERT(connection != NULL);
ASSERT(session != NULL);
ASSERT(session->rxmngpacket != NULL);
ASSERT(session->rxmngpacket->fragmentlist->first != NULL);
/* */
packet = (struct capwap_fragment_packet_item*)rxmngpacket->fragmentlist->first->item;
packet = (struct capwap_fragment_packet_item*)session->rxmngpacket->fragmentlist->first->item;
header = (struct capwap_header*)packet->buffer;
/* Odd message type */
capwap_header_init(&capwapheader, CAPWAP_RADIOID_NONE, GET_WBID_HEADER(header));
txmngpacket = capwap_packet_txmng_create_ctrl_message(&capwapheader, rxmngpacket->ctrlmsg.type + 1, rxmngpacket->ctrlmsg.seq, session->mtu);
txmngpacket = capwap_packet_txmng_create_ctrl_message(&capwapheader, session->rxmngpacket->ctrlmsg.type + 1, session->rxmngpacket->ctrlmsg.seq, session->mtu);
/* Add message element */
capwap_packet_txmng_add_message_element(txmngpacket, CAPWAP_ELEMENT_RESULTCODE, &resultcode);
@ -254,7 +230,7 @@ static void ac_send_invalid_request(struct ac_session_t* session, struct capwap_
capwap_packet_txmng_free(txmngpacket);
/* Send unknown response */
capwap_crypt_sendto_fragmentpacket(&session->ctrldtls, connection->socket.socket[connection->socket.type], responsefragmentpacket, &connection->localaddr, &connection->remoteaddr);
capwap_crypt_sendto_fragmentpacket(&session->dtls, session->connection.socket.socket[session->connection.socket.type], responsefragmentpacket, &session->connection.localaddr, &session->connection.remoteaddr);
/* Don't buffering a packets sent */
capwap_list_free(responsefragmentpacket);
@ -262,17 +238,23 @@ static void ac_send_invalid_request(struct ac_session_t* session, struct capwap_
/* Release reference of session */
static void ac_session_destroy(struct ac_session_t* session) {
ASSERT(session != NULL);
/* Free resource */
#ifdef DEBUG
char sessionname[33];
#endif
/* */
ASSERT(session != NULL);
#ifdef DEBUG
capwap_sessionid_printf(&session->sessionid, sessionname);
capwap_logging_debug("Release Session AC %s", sessionname);
#endif
/* Release session data reference */
if (session->sessiondata) {
ac_session_data_close(session->sessiondata);
ac_session_data_release_reference(session->sessiondata);
}
/* Release last reference */
capwap_lock_enter(&session->sessionlock);
session->count--;
@ -287,36 +269,37 @@ static void ac_session_destroy(struct ac_session_t* session) {
#ifdef DEBUG
capwap_logging_debug("Wait for release Session AC %s (count=%d)", sessionname, session->count);
#endif
/* */
capwap_event_reset(&session->changereference);
capwap_lock_exit(&session->sessionlock);
/* Wait */
capwap_lock_exit(&session->sessionlock);
sleep(10);
capwap_event_wait(&session->changereference);
capwap_lock_enter(&session->sessionlock);
}
capwap_lock_exit(&session->sessionlock);
/* Free DTSL Control */
capwap_crypt_freesession(&session->ctrldtls);
/* Free DTLS Data */
capwap_crypt_freesession(&session->datadtls);
capwap_crypt_freesession(&session->dtls);
/* Free resource */
while ((session->controlpackets->count > 0) || (session->datapackets->count > 0)) {
capwap_itemlist_free(capwap_itemlist_remove_head(((session->controlpackets->count > 0) ? session->controlpackets : session->datapackets)));
while (session->packets->count > 0) {
capwap_itemlist_free(capwap_itemlist_remove_head(session->packets));
}
/* */
capwap_event_destroy(&session->changereference);
capwap_event_destroy(&session->waitpacket);
capwap_lock_destroy(&session->sessionlock);
capwap_list_free(session->actionsession);
capwap_list_free(session->controlpackets);
capwap_list_free(session->datapackets);
capwap_list_free(session->action);
capwap_list_free(session->packets);
/* Free fragments packet */
ac_free_packet_rxmng(session, 1);
ac_free_packet_rxmng(session, 0);
if (session->rxmngpacket) {
capwap_packet_rxmng_free(session->rxmngpacket);
}
capwap_list_free(session->requestfragmentpacket);
capwap_list_free(session->responsefragmentpacket);
@ -333,17 +316,50 @@ static void ac_session_destroy(struct ac_session_t* session) {
capwap_itemlist_free(session->itemlist);
}
/* */
static void ac_session_report_connection(struct ac_session_t* session) {
char localip[INET6_ADDRSTRLEN + 10] = "";
char remoteip[INET6_ADDRSTRLEN + 10] = "";
if (session->connection.localaddr.ss_family == AF_INET) {
char buffer[INET_ADDRSTRLEN];
inet_ntop(AF_INET, (void*)&((struct sockaddr_in*)&session->connection.localaddr)->sin_addr, buffer, INET_ADDRSTRLEN);
sprintf(localip, "%s:%hu", buffer, ntohs(((struct sockaddr_in*)&session->connection.localaddr)->sin_port));
} else if (session->connection.localaddr.ss_family == AF_INET6) {
char buffer[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, (void*)&((struct sockaddr_in6*)&session->connection.localaddr)->sin6_addr, buffer, INET6_ADDRSTRLEN);
sprintf(localip, "%s:%hu", buffer, ntohs(((struct sockaddr_in6*)&session->connection.localaddr)->sin6_port));
}
if (session->connection.remoteaddr.ss_family == AF_INET) {
char buffer[INET_ADDRSTRLEN];
inet_ntop(AF_INET, (void*)&((struct sockaddr_in*)&session->connection.remoteaddr)->sin_addr, buffer, INET_ADDRSTRLEN);
sprintf(remoteip, "%s:%hu", buffer, ntohs(((struct sockaddr_in*)&session->connection.remoteaddr)->sin_port));
} else if (session->connection.remoteaddr.ss_family == AF_INET6) {
char buffer[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, (void*)&((struct sockaddr_in6*)&session->connection.remoteaddr)->sin6_addr, buffer, INET6_ADDRSTRLEN);
sprintf(remoteip, "%s:%hu", buffer, ntohs(((struct sockaddr_in6*)&session->connection.remoteaddr)->sin6_port));
}
capwap_logging_info("Start control channel from %s to %s", remoteip, localip);
}
/* */
static void ac_session_run(struct ac_session_t* session) {
int res;
int check;
int length;
int isctrlsocket;
struct capwap_connection connection;
char buffer[CAPWAP_MAX_PACKET_SIZE];
ASSERT(session != NULL);
/* */
ac_session_report_connection(session);
/* Configure DFA */
if (g_ac.enabledtls) {
if (!ac_dtls_setup(session)) {
@ -357,93 +373,89 @@ static void ac_session_run(struct ac_session_t* session) {
while (session->state != CAPWAP_DTLS_TEARDOWN_STATE) {
/* Get packet */
length = ac_network_read(session, buffer, sizeof(buffer), &isctrlsocket, &session->timeout);
length = ac_network_read(session, buffer, sizeof(buffer), &session->timeout);
if (length < 0) {
if (length == PACKET_TIMEOUT) {
if (length == AC_ERROR_TIMEOUT) {
ac_dfa_execute(session, NULL);
} else if (length == DTLS_SHUTDOWN) {
} else if ((length == CAPWAP_ERROR_SHUTDOWN) || (length == CAPWAP_ERROR_CLOSE)) {
ac_session_teardown(session);
} else if (length == ACTION_SESSION) {
/* Nothing */
}
} else if (length > 0) {
/* Accept data packet only in running state */
if (isctrlsocket || (session->state == CAPWAP_DATA_CHECK_TO_RUN_STATE) || (session->state == CAPWAP_RUN_STATE)) {
/* Check generic capwap packet */
check = capwap_sanity_check(isctrlsocket, CAPWAP_UNDEF_STATE, buffer, length, 0, 0);
if (check == CAPWAP_PLAIN_PACKET) {
struct capwap_parsed_packet packet;
struct capwap_packet_rxmng* rxmngpacket;
/* Check generic capwap packet */
check = capwap_sanity_check(1, CAPWAP_UNDEF_STATE, buffer, length, 0, 0);
if (check == CAPWAP_PLAIN_PACKET) {
struct capwap_parsed_packet packet;
/* Defragment management */
rxmngpacket = ac_get_packet_rxmng(session, isctrlsocket);
/* Defragment management */
if (!session->rxmngpacket) {
session->rxmngpacket = capwap_packet_rxmng_create_message(1);
}
/* If request, defragmentation packet */
check = capwap_packet_rxmng_add_recv_packet(rxmngpacket, buffer, length);
if (check == CAPWAP_RECEIVE_COMPLETE_PACKET) {
int ignorepacket = 0;
/* If request, defragmentation packet */
check = capwap_packet_rxmng_add_recv_packet(session->rxmngpacket, buffer, length);
if (check == CAPWAP_RECEIVE_COMPLETE_PACKET) {
int ignorepacket = 0;
/* Receive all fragment */
memcpy(&connection.socket, (isctrlsocket ? &session->ctrlsocket : &session->datasocket), sizeof(struct capwap_socket));
memcpy(&connection.localaddr, (isctrlsocket ? &session->acctrladdress : &session->acdataaddress), sizeof(struct sockaddr_storage));
memcpy(&connection.remoteaddr, (isctrlsocket ? &session->wtpctrladdress : &session->wtpdataaddress), sizeof(struct sockaddr_storage));
if (isctrlsocket) {
if (!capwap_recv_retrasmitted_request(&session->ctrldtls, rxmngpacket, &connection, session->lastrecvpackethash, session->responsefragmentpacket)) {
/* Check message type */
res = capwap_check_message_type(rxmngpacket);
if (res != VALID_MESSAGE_TYPE) {
if (res == INVALID_REQUEST_MESSAGE_TYPE) {
capwap_logging_warning("Unexpected Unrecognized Request, send Response Packet with error");
ac_send_invalid_request(session, rxmngpacket, &connection, CAPWAP_RESULTCODE_MSG_UNEXPECTED_UNRECOGNIZED_REQUEST);
}
ignorepacket = 1;
capwap_logging_debug("Invalid message type");
}
} else {
ignorepacket = 1;
capwap_logging_debug("Retrasmitted packet");
/* Receive all fragment */
if (!capwap_recv_retrasmitted_request(&session->dtls, session->rxmngpacket, &session->connection, session->lastrecvpackethash, session->responsefragmentpacket)) {
/* Check message type */
res = capwap_check_message_type(session->rxmngpacket);
if (res != VALID_MESSAGE_TYPE) {
if (res == INVALID_REQUEST_MESSAGE_TYPE) {
capwap_logging_warning("Unexpected Unrecognized Request, send Response Packet with error");
ac_send_invalid_request(session, CAPWAP_RESULTCODE_MSG_UNEXPECTED_UNRECOGNIZED_REQUEST);
}
ignorepacket = 1;
capwap_logging_debug("Invalid message type");
}
} else {
ignorepacket = 1;
capwap_logging_debug("Retrasmitted packet");
}
/* Parsing packet */
if (!ignorepacket) {
res = capwap_parsing_packet(rxmngpacket, &connection, &packet);
if (res == PARSING_COMPLETE) {
/* Validate packet */
if (capwap_validate_parsed_packet(&packet, NULL)) {
if (isctrlsocket && capwap_is_request_type(rxmngpacket->ctrlmsg.type)) {
capwap_logging_warning("Missing Mandatory Message Element, send Response Packet with error");
ac_send_invalid_request(session, rxmngpacket, &connection, CAPWAP_RESULTCODE_FAILURE_MISSING_MANDATORY_MSG_ELEMENT);
}
ignorepacket = 1;
capwap_logging_debug("Failed validation parsed packet");
}
} else {
if (isctrlsocket && (res == UNRECOGNIZED_MESSAGE_ELEMENT) && capwap_is_request_type(rxmngpacket->ctrlmsg.type)) {
capwap_logging_warning("Unrecognized Message Element, send Response Packet with error");
ac_send_invalid_request(session, rxmngpacket, &connection, CAPWAP_RESULTCODE_FAILURE_UNRECOGNIZED_MESSAGE_ELEMENT);
/* TODO: add the unrecognized message element */
/* Parsing packet */
if (!ignorepacket) {
res = capwap_parsing_packet(session->rxmngpacket, &session->connection, &packet);
if (res == PARSING_COMPLETE) {
/* Validate packet */
if (capwap_validate_parsed_packet(&packet, NULL)) {
if (capwap_is_request_type(session->rxmngpacket->ctrlmsg.type)) {
capwap_logging_warning("Missing Mandatory Message Element, send Response Packet with error");
ac_send_invalid_request(session, CAPWAP_RESULTCODE_FAILURE_MISSING_MANDATORY_MSG_ELEMENT);
}
ignorepacket = 1;
capwap_logging_debug("Failed parsing packet");
capwap_logging_debug("Failed validation parsed packet");
}
} else {
if ((res == UNRECOGNIZED_MESSAGE_ELEMENT) && capwap_is_request_type(session->rxmngpacket->ctrlmsg.type)) {
capwap_logging_warning("Unrecognized Message Element, send Response Packet with error");
ac_send_invalid_request(session, CAPWAP_RESULTCODE_FAILURE_UNRECOGNIZED_MESSAGE_ELEMENT);
/* TODO: add the unrecognized message element */
}
}
/* */
if (!ignorepacket) {
ac_dfa_execute(session, &packet);
ignorepacket = 1;
capwap_logging_debug("Failed parsing packet");
}
}
/* Free memory */
capwap_free_parsed_packet(&packet);
ac_free_packet_rxmng(session, isctrlsocket);
} else if (check != CAPWAP_REQUEST_MORE_FRAGMENT) {
/* Discard fragments */
ac_free_packet_rxmng(session, isctrlsocket);
/* */
if (!ignorepacket) {
ac_dfa_execute(session, &packet);
}
/* Free memory */
capwap_free_parsed_packet(&packet);
if (session->rxmngpacket) {
capwap_packet_rxmng_free(session->rxmngpacket);
session->rxmngpacket = NULL;
}
} else if (check != CAPWAP_REQUEST_MORE_FRAGMENT) {
/* Discard fragments */
if (session->rxmngpacket) {
capwap_packet_rxmng_free(session->rxmngpacket);
session->rxmngpacket = NULL;
}
}
}
@ -483,18 +495,13 @@ void ac_session_teardown(struct ac_session_t* session) {
capwap_rwlock_exit(&g_ac.sessionslock);
/* Remove all pending packets */
while ((session->controlpackets->count > 0) || (session->datapackets->count > 0)) {
capwap_itemlist_free(capwap_itemlist_remove_head(((session->controlpackets->count > 0) ? session->controlpackets : session->datapackets)));
while (session->packets->count > 0) {
capwap_itemlist_free(capwap_itemlist_remove_head(session->packets));
}
/* Close DTSL Control */
if (session->ctrldtls.enable) {
capwap_crypt_close(&session->ctrldtls);
}
/* Close DTLS Data */
if (session->datadtls.enable) {
capwap_crypt_close(&session->datadtls);
if (session->dtls.enable) {
capwap_crypt_close(&session->dtls);
}
/* */
@ -566,7 +573,7 @@ void ac_get_control_information(struct capwap_list* controllist) {
for (search = g_ac.sessions->first; search != NULL; search = search->next) {
struct ac_session_t* session = (struct ac_session_t*)search->item;
if (!capwap_compare_ip(&session->acctrladdress, &sessioncontrol->localaddress)) {
if (!capwap_compare_ip(&session->connection.localaddr, &sessioncontrol->localaddress)) {
sessioncontrol->count++;
}
}