Better threading and IO

This commit is contained in:
Naim A 2016-01-29 04:32:47 +02:00
parent 9d319aa946
commit cac3f5d209
3 changed files with 94 additions and 137 deletions

View file

@ -162,7 +162,7 @@ int main(int argc, char *argv[])
try
{
apiSrv = std::shared_ptr<UDPT::Server::HTTPServer>(new UDPT::Server::HTTPServer(var_map));
webApp = std::shared_ptr<UDPT::Server::WebApp>(new UDPT::Server::WebApp(apiSrv, tracker->conn, var_map));
webApp = std::shared_ptr<UDPT::Server::WebApp>(new UDPT::Server::WebApp(apiSrv, tracker->m_conn.get(), var_map));
webApp->deploy();
}
catch (const UDPT::Server::ServerException &e)
@ -177,11 +177,11 @@ int main(int argc, char *argv[])
}
std::cout << "Hit Control-C to exit." << endl;
std::cin.get();
tracker->wait();
tracker->stop();
std::cout << endl << "Goodbye." << endl;
#ifdef WIN32
::WSACleanup();
#endif

View file

@ -38,7 +38,6 @@ namespace UDPT
{
UDPTracker::UDPTracker(const boost::program_options::variables_map& conf) : m_conf(conf)
{
this->m_allowRemotes = conf["tracker.allow_remotes"].as<bool>();
this->m_allowIANA_IPs = conf["tracker.allow_iana_ips"].as<bool>();
this->m_isDynamic = conf["tracker.is_dynamic"].as<bool>();
@ -59,61 +58,11 @@ namespace UDPT
}
this->m_localEndpoint = addrs.front();
this->m_threads = new HANDLE[this->m_threadCount];
this->m_isRunning = false;
this->conn = nullptr;
}
UDPTracker::~UDPTracker()
{
int i; // loop index
this->m_isRunning = false;
// drop listener connection to continue thread loops.
// wait for request to finish (1 second max; allot of time for a computer!).
#ifdef linux
::close(this->m_sock);
::sleep(1);
#elif defined (WIN32)
::closesocket(this->m_sock);
::Sleep(1000);
#endif
for (i = 0;i < this->m_threadCount;i++)
{
#ifdef WIN32
::TerminateThread(this->m_threads[i], 0x00);
#elif defined (linux)
::pthread_detach(this->m_threads[i]);
::pthread_cancel(this->m_threads[i]);
#endif
std::stringstream str;
str << "Thread (" << (i + 1) << "/" << ((int)this->m_threadCount) << ") terminated.";
logger->log(Logger::LL_INFO, str.str());
}
if (this->conn != NULL)
delete this->conn;
delete[] this->m_threads;
}
void UDPTracker::wait()
{
#ifdef WIN32
::WaitForMultipleObjects(this->m_threadCount, this->m_threads, TRUE, INFINITE);
#else
int i;
for (i = 0;i < this->m_threadCount; i++)
{
::pthread_join(this->m_threads[i], NULL);
}
#endif
// left empty.
}
void UDPTracker::start()
@ -134,6 +83,19 @@ namespace UDPT
yup = 1;
::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yup, 1);
{
// don't block recvfrom for too long.
#if defined(linux)
timeval timeout = { 0 };
timeout.tv_sec = 5;
#elif defined(WIN32)
DWORD timeout = 5000;
#else
#error Unsupported OS.
#endif
::setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
}
this->m_localEndpoint.sin_family = AF_INET;
r = ::bind(sock, reinterpret_cast<SOCKADDR*>(&this->m_localEndpoint), sizeof(SOCKADDR_IN));
@ -149,20 +111,15 @@ namespace UDPT
this->m_sock = sock;
this->conn = new Data::SQLite3Driver(m_conf, this->m_isDynamic);
this->m_isRunning = true;
this->m_conn = std::shared_ptr<DatabaseDriver>(new Data::SQLite3Driver(m_conf, this->m_isDynamic));
ss.str("");
ss << "Starting maintenance thread (1/" << ((int)this->m_threadCount) << ")";
logger->log(Logger::LL_INFO, ss.str());
// create maintainer thread.
#ifdef WIN32
this->m_threads[0] = ::CreateThread(NULL, 0, reinterpret_cast<LPTHREAD_START_ROUTINE>(_maintainance_start), (LPVOID)this, 0, NULL);
#elif defined (linux)
::pthread_create(&this->m_threads[0], NULL, _maintainance_start, (void*)this);
#endif
m_threads.push_back(boost::thread(UDPTracker::_maintainance_start, this));
for (i = 1;i < this->m_threadCount; i++)
{
@ -170,14 +127,33 @@ namespace UDPT
ss << "Starting thread (" << (i + 1) << "/" << ((int)this->m_threadCount) << ")";
logger->log(Logger::LL_INFO, ss.str());
#ifdef WIN32
this->m_threads[i] = ::CreateThread(NULL, 0, reinterpret_cast<LPTHREAD_START_ROUTINE>(_thread_start), (LPVOID)this, 0, NULL);
#elif defined (linux)
::pthread_create(&(this->m_threads[i]), NULL, _thread_start, (void*)this);
#endif
m_threads.push_back(boost::thread(UDPTracker::_thread_start, this));
}
}
void UDPTracker::stop()
{
#ifdef linux
::close(m_sock);
#elif defined (WIN32)
::closesocket(m_sock);
#endif
for (std::vector<boost::thread>::iterator it = m_threads.begin(); it != m_threads.end(); ++it)
{
std::cout << "Interrupted thread " << it->get_id() << std::endl;
it->interrupt();
}
for (std::vector<boost::thread>::iterator it = m_threads.begin(); it != m_threads.end(); ++it)
{
std::cout << "waiting for " << it->get_id() << std::endl;
it->join();
}
std::cout << "All threads terminated." << std::endl;
}
int UDPTracker::sendError(UDPTracker* usi, SOCKADDR_IN* remote, uint32_t transactionID, const std::string &msg)
{
struct udp_error_response error;
@ -214,7 +190,7 @@ namespace UDPT
resp.action = m_hton32(0);
resp.transaction_id = req->transaction_id;
if (!usi->conn->genConnectionId(&resp.connection_id,
if (!usi->m_conn->genConnectionId(&resp.connection_id,
m_hton32(remote->sin_addr.s_addr),
m_hton16(remote->sin_port)))
{
@ -240,7 +216,7 @@ namespace UDPT
req = (AnnounceRequest*)data;
if (!usi->conn->verifyConnectionId(req->connection_id,
if (!usi->m_conn->verifyConnectionId(req->connection_id,
m_hton32(remote->sin_addr.s_addr),
m_hton16(remote->sin_port)))
{
@ -262,7 +238,7 @@ namespace UDPT
return 0;
}
if (!usi->conn->isTorrentAllowed(req->info_hash))
if (!usi->m_conn->isTorrentAllowed(req->info_hash))
{
UDPTracker::sendError(usi, remote, req->transaction_id, "info_hash not registered.");
return 0;
@ -271,7 +247,7 @@ namespace UDPT
// load peers
q = 30;
if (req->num_want >= 1)
q = std::min(q, req->num_want);
q = std::min<int>(q, req->num_want);
peers = new DatabaseDriver::PeerEntry [q];
@ -297,13 +273,13 @@ namespace UDPT
q = 0; // no need for peers when stopping.
if (q > 0)
usi->conn->getPeers(req->info_hash, &q, peers);
usi->m_conn->getPeers(req->info_hash, &q, peers);
bSize = 20; // header is 20 bytes
bSize += (6 * q); // + 6 bytes per peer.
tE.info_hash = req->info_hash;
usi->conn->getTorrentInfo(&tE);
usi->m_conn->getTorrentInfo(&tE);
resp = (AnnounceResponse*)buff;
resp->action = m_hton32(1);
@ -337,7 +313,7 @@ namespace UDPT
ip = m_hton32 (remote->sin_addr.s_addr);
else
ip = req->ip_address;
usi->conn->updatePeer(req->peer_id, req->info_hash, ip, req->port,
usi->m_conn->updatePeer(req->peer_id, req->info_hash, ip, req->port,
req->downloaded, req->left, req->uploaded, event);
return 0;
@ -362,7 +338,7 @@ namespace UDPT
return 0;
}
if (!usi->conn->verifyConnectionId(sR->connection_id,
if (!usi->m_conn->verifyConnectionId(sR->connection_id,
m_hton32(remote->sin_addr.s_addr),
m_hton16(remote->sin_port)))
{
@ -391,7 +367,7 @@ namespace UDPT
DatabaseDriver::TorrentEntry tE;
tE.info_hash = hash;
if (!usi->conn->getTorrentInfo(&tE))
if (!usi->m_conn->getTorrentInfo(&tE))
{
sendError(usi, remote, sR->transaction_id, "Scrape Failed: couldn't retrieve torrent data");
return 0;
@ -407,7 +383,7 @@ namespace UDPT
return 0;
}
static int _isIANA_IP (uint32_t ip)
int UDPTracker::isIANAIP(uint32_t ip)
{
uint8_t x = (ip % 256);
if (x == 0 || x == 10 || x == 127 || x >= 224)
@ -424,7 +400,7 @@ static int _isIANA_IP (uint32_t ip)
if (!usi->m_allowIANA_IPs)
{
if (_isIANA_IP(remote->sin_addr.s_addr))
if (isIANAIP(remote->sin_addr.s_addr))
{
return 0; // Access Denied: IANA reserved IP.
}
@ -445,14 +421,10 @@ static int _isIANA_IP (uint32_t ip)
return 0;
}
#ifdef WIN32
DWORD UDPTracker::_thread_start (LPVOID arg)
#elif defined (linux)
void* UDPTracker::_thread_start (void *arg)
#endif
void UDPTracker::_thread_start(UDPTracker *usi)
{
UDPTracker *usi;
SOCKADDR_IN remoteAddr;
char tmpBuff[UDP_BUFFER_SIZE];
#ifdef linux
socklen_t addrSz;
@ -460,51 +432,37 @@ static int _isIANA_IP (uint32_t ip)
int addrSz;
#endif
int r;
char tmpBuff[UDP_BUFFER_SIZE];
usi = reinterpret_cast<UDPTracker*>(arg);
addrSz = sizeof(SOCKADDR_IN);
while (usi->m_isRunning)
while (true)
{
// peek into the first 12 bytes of data; determine if connection request or announce request.
r = ::recvfrom(usi->m_sock, (char*)tmpBuff, UDP_BUFFER_SIZE, 0, (SOCKADDR*)&remoteAddr, &addrSz);
int r = ::recvfrom(usi->m_sock, (char*)tmpBuff, UDP_BUFFER_SIZE, 0, (SOCKADDR*)&remoteAddr, &addrSz);
if (r <= 0)
continue; // bad request...
{
boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
continue;
}
{
boost::this_thread::disable_interruption di;
r = UDPTracker::resolveRequest(usi, &remoteAddr, tmpBuff, r);
}
#ifdef linux
::pthread_exit (NULL);
#endif
return 0;
}
}
#ifdef WIN32
DWORD UDPTracker::_maintainance_start(LPVOID arg)
#elif defined (linux)
void* UDPTracker::_maintainance_start(void *arg)
#endif
void UDPTracker::_maintainance_start(UDPTracker* usi)
{
UDPTracker* usi = reinterpret_cast<UDPTracker*>(arg);
while (usi->m_isRunning)
while (true)
{
usi->conn->cleanup();
#ifdef WIN32
::Sleep(usi->m_cleanupInterval * 1000);
#elif defined (linux)
::sleep(usi->m_cleanupInterval);
#else
#error Unsupported OS.
#endif
{
boost::this_thread::disable_interruption di;
usi->m_conn->cleanup();
}
return 0;
boost::this_thread::sleep_for(boost::chrono::seconds(usi->m_cleanupInterval));
}
}
};

View file

@ -22,6 +22,9 @@
#include <stdint.h>
#include <boost/thread.hpp>
#include <chrono>
#include <algorithm>
#include <boost/program_options.hpp>
#include <string>
#include "exceptions.h"
@ -124,9 +127,9 @@ namespace UDPT
void start();
/**
* Joins all threads, and waits for all of them to terminate.
* Terminates tracker.
*/
void wait();
void stop();
/**
* Destroys resources that were created by constructor
@ -134,29 +137,24 @@ namespace UDPT
*/
virtual ~UDPTracker();
Data::DatabaseDriver *conn;
std::shared_ptr<UDPT::Data::DatabaseDriver> m_conn;
private:
SOCKET m_sock;
SOCKADDR_IN m_localEndpoint;
uint16_t m_port;
uint8_t m_threadCount;
bool m_isRunning;
bool m_isDynamic;
bool m_allowRemotes;
bool m_allowIANA_IPs;
HANDLE *m_threads;
std::vector<boost::thread> m_threads;
uint32_t m_announceInterval;
uint32_t m_cleanupInterval;
const boost::program_options::variables_map& m_conf;
#ifdef WIN32
static DWORD _thread_start(LPVOID arg);
static DWORD _maintainance_start(LPVOID arg);
#elif defined (linux)
static void* _thread_start(void *arg);
static void* _maintainance_start(void *arg);
#endif
static void _thread_start(UDPTracker *usi);
static void _maintainance_start(UDPTracker* usi);
static int resolveRequest(UDPTracker *usi, SOCKADDR_IN *remote, char *data, int r);
@ -166,6 +164,7 @@ namespace UDPT
static int sendError(UDPTracker *, SOCKADDR_IN *remote, uint32_t transId, const std::string &);
static int isIANAIP(uint32_t ip);
};
};