Moved the thread out of the NetworkQueue class

This commit is contained in:
Kayne Ruse
2013-11-09 20:04:06 +11:00
parent bf15a5d957
commit c2941cd3e8
4 changed files with 115 additions and 145 deletions
+10 -72
View File
@@ -1,71 +1,23 @@
#include "network_queue.hpp" #include "network_queue.hpp"
#include "utility.hpp"
#include <stdexcept> #include <stdexcept>
#include <iostream>
int networkQueueThread(void* ptr) { NetworkQueue::NetworkQueue() {
NetworkQueue* netQueue = reinterpret_cast<NetworkQueue*>(ptr);
NetworkPacket packet;
while(netQueue->running) {
SDL_SemWait(netQueue->lock);
//suck in the waiting packets
while(netQueue->netUtil->Receive()) {
memcpy(&packet, netQueue->netUtil->GetInData(), sizeof(NetworkPacket));
//this is important: keep track of the source address
packet.meta.srcAddress = netQueue->netUtil->GetInPacket()->address;
netQueue->queue.push_back(packet);
}
SDL_SemPost(netQueue->lock);
SDL_Delay(10);
}
return 0;
}
void NetworkQueue::Init(UDPNetworkUtility* ptr) {
if (running) {
throw(std::runtime_error("Network Queue is already running"));
}
running = true;
netUtil = ptr;
lock = SDL_CreateSemaphore(1); lock = SDL_CreateSemaphore(1);
thread = SDL_CreateThread(networkQueueThread, this); if (!lock) {
if (!thread) { throw(std::runtime_error("Failed to create NetworkQueue::lock"));
throw(std::runtime_error("Failed to create the network thread"));
} }
} }
void NetworkQueue::Quit() { NetworkQueue::~NetworkQueue() {
if (!running) { SDL_DestroySemaphore(lock);
return;
}
//end the thread
running = false;
int ret;
SDL_WaitThread(thread, &ret);
ResetMembers();
//handle the return
if (ret != 0) {
throw(std::runtime_error(std::string() + "Network thread returned error code: " + to_string_custom(ret)));
}
} }
void NetworkQueue::Kill() { NetworkPacket NetworkQueue::Push(NetworkPacket packet) {
if (!running) { SDL_SemWait(lock);
return; queue.push_back(packet);
} SDL_SemPost(lock);
return packet;
running = false;
SDL_KillThread(thread);
ResetMembers();
} }
NetworkPacket NetworkQueue::Peek() { NetworkPacket NetworkQueue::Peek() {
@@ -91,24 +43,10 @@ NetworkPacket NetworkQueue::Pop() {
void NetworkQueue::Flush() { void NetworkQueue::Flush() {
SDL_SemWait(lock); SDL_SemWait(lock);
while(netUtil->Receive());
queue.clear(); queue.clear();
SDL_SemPost(lock); SDL_SemPost(lock);
} }
void NetworkQueue::ResetMembers() {
if (running) {
throw(std::logic_error("Cannon reset the queue while running"));
}
//reset
netUtil = nullptr;
SDL_DestroySemaphore(lock);
lock = nullptr;
thread = nullptr;
queue.clear();
}
int NetworkQueue::Size() { int NetworkQueue::Size() {
//can't be sure if std::deque::size() is thread safe //can't be sure if std::deque::size() is thread safe
int ret; int ret;
+6 -15
View File
@@ -1,7 +1,6 @@
#ifndef NETWORKQUEUE_HPP_ #ifndef NETWORKQUEUE_HPP_
#define NETWORKQUEUE_HPP_ #define NETWORKQUEUE_HPP_
#include "udp_network_utility.hpp"
#include "network_packet.hpp" #include "network_packet.hpp"
#include "SDL/SDL_thread.h" #include "SDL/SDL_thread.h"
@@ -10,28 +9,20 @@
class NetworkQueue { class NetworkQueue {
public: public:
NetworkQueue() = default; NetworkQueue();
~NetworkQueue() = default; ~NetworkQueue();
void Init(UDPNetworkUtility*);
void Quit();
void Kill();
NetworkPacket Push(NetworkPacket);
NetworkPacket Peek(); NetworkPacket Peek();
NetworkPacket Pop(); NetworkPacket Pop();
void Flush(); void Flush();
int Size(); int Size();
SDL_sem* GetLock() const { return lock; }
private: private:
friend int networkQueueThread(void*);
void ResetMembers();
bool running = false;
UDPNetworkUtility* netUtil = nullptr;
SDL_sem* lock = nullptr;
SDL_Thread* thread = nullptr;
std::deque<NetworkPacket> queue; std::deque<NetworkPacket> queue;
SDL_sem* lock;
}; };
#endif #endif
+82 -49
View File
@@ -28,7 +28,38 @@
#include <string> #include <string>
#include <fstream> #include <fstream>
int ClientEntry::indexCounter = 0; using namespace std;
//-------------------------
//Declarations
//-------------------------
int ServerApplication::ClientEntry::indexCounter = 0;
//-------------------------
//Define the network thread
//-------------------------
int networkQueueThread(void* ptr) {
ServerApplication* app = reinterpret_cast<ServerApplication*>(ptr);
NetworkPacket packet;
while(app->running) {
//suck in the waiting packets
while(app->networkUtil.Receive()) {
memcpy(&packet, app->networkUtil.GetInData(), sizeof(NetworkPacket));
//this is important: keep track of the source address
packet.meta.srcAddress = app->networkUtil.GetInPacket()->address;
app->networkQueue.Push(packet);
}
SDL_Delay(10);
}
return 0;
}
//-------------------------
//Define the ServerApplication
//-------------------------
ServerApplication::ServerApplication() { ServerApplication::ServerApplication() {
// //
@@ -41,68 +72,62 @@ ServerApplication::~ServerApplication() {
void ServerApplication::Init(int argc, char** argv) { void ServerApplication::Init(int argc, char** argv) {
//TODO: proper command line option parsing //TODO: proper command line option parsing
//Check thread safety //Check prerequisites
if (!sqlite3_threadsafe()) { if (!sqlite3_threadsafe()) {
throw(std::runtime_error("Cannot run without thread safety")); throw(runtime_error("Cannot run without thread safety"));
} }
else { cout << "Thread safety confirmed" << endl;
std::cout << "Thread safety confirmed" << std::endl;
if (running) {
throw(std::runtime_error("Multiple calls to ServerApplication::Init() is not allowed"));
} }
running = true;
//Init SDL //Init SDL
if (SDL_Init(0)) { if (SDL_Init(0)) {
throw(std::runtime_error("Failed to initialize SDL")); throw(runtime_error("Failed to initialize SDL"));
}
else {
std::cout << "SDL initialized" << std::endl;
} }
cout << "initialized SDL" << endl;
//Init SDL_net //Init SDL_net
if (SDLNet_Init()) { if (SDLNet_Init()) {
throw(std::runtime_error("Failed to init SDL_net")); throw(runtime_error("Failed to init SDL_net"));
} }
else { networkUtil.Open(21795, sizeof(NetworkPacket));
std::cout << "SDL_net initialized" << std::endl; cout << "initialized SDL_net" << endl;
}
networkUtil.Open(21795, 1024);
networkQueue.Init(&networkUtil);
//Init SQL //Init SQL
std::string dbname = (argc > 1) ? argv[1] : argv[0]; string dbname = (argc > 1) ? argv[1] : argv[0]; //fancy and unnecessary
int ret = sqlite3_open_v2((dbname + ".db").c_str(), &database, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_FULLMUTEX, nullptr); int ret = sqlite3_open_v2((dbname + ".db").c_str(), &database, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_FULLMUTEX, nullptr);
if (ret != SQLITE_OK || !database) { if (ret != SQLITE_OK || !database) {
throw(std::runtime_error("Failed to open the server database")); throw(runtime_error("Failed to open the server database"));
}
else {
std::cout << "Database filename: \"" << dbname << ".db\"" << std::endl;
} }
cout << "initialized SQL" << endl;
cout << "Database filename: \"" << dbname << ".db\"" << endl;
//TODO: move this into a function?
//Run setup scripts //Run setup scripts
std::ifstream is("rsc\\scripts\\setup_server.sql"); ifstream is("rsc\\scripts\\setup_server.sql");
if (!is.is_open()) { if (!is.is_open()) {
throw(std::runtime_error("Failed to run database setup script")); throw(runtime_error("Failed to run database setup script"));
} }
else { string script;
std::cout << "Running the database script" << std::endl;
}
std::string script;
getline(is, script, '\0'); getline(is, script, '\0');
is.close(); is.close();
sqlite3_exec(database, script.c_str(), nullptr, nullptr, nullptr); sqlite3_exec(database, script.c_str(), nullptr, nullptr, nullptr);
//debugging //setup the threads
//create the debug packets networkQueueThread = SDL_CreateThread(&::networkQueueThread, this);
NetworkPacket packet; if (!networkQueueThread) {
throw(runtime_error("Failed to create the networkQueueThread"));
}
cout << "initialized networkQueueThread" << endl;
//debugging
NetworkPacket packet;
packet.meta.type = NetworkPacket::Type::PING; packet.meta.type = NetworkPacket::Type::PING;
strcpy(packet.serverInfo.name,"Foo"); strcpy(packet.serverInfo.name, "foo");
networkUtil.Send("127.0.0.1", 21795, reinterpret_cast<void*>(&packet), sizeof(NetworkPacket)); networkUtil.Send("127.0.0.1", 21795, &packet, sizeof(NetworkPacket));
strcpy(packet.serverInfo.name,"Bar");
networkUtil.Send("127.0.0.1", 21795, reinterpret_cast<void*>(&packet), sizeof(NetworkPacket));
strcpy(packet.serverInfo.name,"World");
networkUtil.Send("127.0.0.1", 21795, reinterpret_cast<void*>(&packet), sizeof(NetworkPacket));
} }
void ServerApplication::Loop() { void ServerApplication::Loop() {
@@ -113,16 +138,22 @@ void ServerApplication::Loop() {
try { try {
HandlePacket(networkQueue.Pop()); HandlePacket(networkQueue.Pop());
} }
catch(std::exception& e) { catch(exception& e) {
std::cerr << "Network Error: " << e.what() << std::endl; cerr << "Network Error: " << e.what() << endl;
} }
}; };
} }
void ServerApplication::Quit() { void ServerApplication::Quit() {
sqlite3_close_v2(database); //catch all signal
networkQueue.Quit(); running = false;
//members
SDL_WaitThread(networkQueueThread, nullptr);
networkUtil.Close(); networkUtil.Close();
//APIs
sqlite3_close_v2(database);
SDLNet_Quit(); SDLNet_Quit();
SDL_Quit(); SDL_Quit();
} }
@@ -131,6 +162,8 @@ void ServerApplication::HandlePacket(NetworkPacket packet) {
switch(packet.meta.type) { switch(packet.meta.type) {
case NetworkPacket::Type::PING: case NetworkPacket::Type::PING:
//NOT USED //NOT USED
//debugging
cout << packet.serverInfo.name << endl;
break; break;
case NetworkPacket::Type::PONG: case NetworkPacket::Type::PONG:
//NOT USED //NOT USED
@@ -138,15 +171,15 @@ void ServerApplication::HandlePacket(NetworkPacket packet) {
case NetworkPacket::Type::BROADCAST_REQUEST: case NetworkPacket::Type::BROADCAST_REQUEST:
// //
break; break;
// case NetworkPacket::Type::BROADCAST_RESPONSE: case NetworkPacket::Type::BROADCAST_RESPONSE:
// // //
// break; break;
case NetworkPacket::Type::JOIN_REQUEST: case NetworkPacket::Type::JOIN_REQUEST:
// //
break; break;
// case NetworkPacket::Type::JOIN_RESPONSE: case NetworkPacket::Type::JOIN_RESPONSE:
// // //
// break; break;
case NetworkPacket::Type::DISCONNECT: case NetworkPacket::Type::DISCONNECT:
// //
break; break;
@@ -156,10 +189,10 @@ void ServerApplication::HandlePacket(NetworkPacket packet) {
//handle errors //handle errors
case NetworkPacket::Type::NONE: case NetworkPacket::Type::NONE:
throw(std::runtime_error("NetworkPacket::Type::NONE encountered")); throw(runtime_error("NetworkPacket::Type::NONE encountered"));
break; break;
default: default:
throw(std::runtime_error("Unknown NetworkPacket::Type encountered")); throw(runtime_error("Unknown NetworkPacket::Type encountered"));
break; break;
} }
} }
+17 -9
View File
@@ -27,19 +27,14 @@
#include "sqlite3/sqlite3.h" #include "sqlite3/sqlite3.h"
#include "SDL/SDL.h" #include "SDL/SDL.h"
#include "SDL/SDL_thread.h"
#include <list> #include <list>
//hold the info about the clients
struct ClientEntry {
static int indexCounter;
int index = indexCounter++;
IPaddress add = {0, 0};
};
//The main application class //The main application class
class ServerApplication { class ServerApplication {
public: public:
//standard functions
ServerApplication(); ServerApplication();
~ServerApplication(); ~ServerApplication();
@@ -47,14 +42,27 @@ public:
void Loop(); void Loop();
void Quit(); void Quit();
friend int networkQueueThread(void*);
private: private:
void HandlePacket(NetworkPacket); void HandlePacket(NetworkPacket);
bool running = true;
sqlite3* database = nullptr; //members
bool running = false;
//networking
UDPNetworkUtility networkUtil; UDPNetworkUtility networkUtil;
NetworkQueue networkQueue; NetworkQueue networkQueue;
SDL_Thread* networkQueueThread = nullptr;
//database
sqlite3* database = nullptr;
//clients
struct ClientEntry {
static int indexCounter;
int index = indexCounter++;
IPaddress add = {0, 0};
};
std::list<ClientEntry> clientEntries; std::list<ClientEntry> clientEntries;
}; };