Boost.Asio C++ 網(wǎng)絡(luò)編程之八:基于TCP的同步服務(wù)端
掃描二維碼
隨時(shí)隨地手機(jī)看文章
? ? ? ?同步服務(wù)端也是相當(dāng)簡(jiǎn)單的。它只需要兩個(gè)線程,一個(gè)負(fù)責(zé)監(jiān)聽新的客戶端連接,另外一個(gè)負(fù)責(zé)處理已經(jīng)存在的客戶端請(qǐng)求。它不能使用單線程,因?yàn)榈却碌目蛻舳诉B接是一個(gè)阻塞操作(因?yàn)閍ccept()是阻塞的),因此我們需要另外一個(gè)線程來(lái)處理已經(jīng)存在的客戶端請(qǐng)求。
基于TCP的同步服務(wù)端
1.流程圖
2.實(shí)現(xiàn)
#ifdef?WIN32
#define?_WIN32_WINNT?0x0501
#include#endif
#include#include#include#include#includeusing?namespace?boost::asio;
using?namespace?boost::posix_time;
io_service?service;
struct?talk_to_client;
typedef?boost::shared_ptrclient_ptr;
typedef?std::vectorarray;
array?clients;
//?thread-safe?access?to?clients?array
boost::recursive_mutex?cs;
void?update_clients_changed();
/**?simple?connection?to?server:
-?logs?in?just?with?username?(no?password)
-?all?connections?are?initiated?by?the?client:?client?asks,?server?answers
-?server?disconnects?any?client?that?hasn't?pinged?for?5?seconds
Possible?requests:
-?gets?a?list?of?all?connected?clients
-?ping:?the?server?answers?either?with?"ping?ok"?or?"ping?client_list_changed"
*/
struct?talk_to_client?:?boost::enable_shared_from_this{
talk_to_client()
:?sock_(service),?started_(false),?already_read_(0)?{
last_ping?=?microsec_clock::local_time();
}
std::string?username()?const?{?return?username_;?}
void?answer_to_client()?{
try?{
read_request();
process_request();
}
catch?(boost::system::system_error&)?{
stop();
}
if?(timed_out())?{
stop();
std::cout?<<?"stopping?"?<<?username_?<<?"?-?no?ping?in?time"?<<?std::endl;
}
}
void?set_clients_changed()?{?clients_changed_?=?true;?}
ip::tcp::socket?&?sock()?{?return?sock_;?}
bool?timed_out()?const?{
ptime?now?=?microsec_clock::local_time();
long?long?ms?=?(now?-?last_ping).total_milliseconds();
return?ms?>?5000;
}
void?stop()?{
//?close?client?connection
boost::system::error_code?err;
sock_.close(err);
}
private:
void?read_request()?{
if?(sock_.available())
already_read_?+=?sock_.read_some(
buffer(buff_?+?already_read_,?max_msg?-?already_read_));
}
void?process_request()?{
bool?found_enter?=?std::find(buff_,?buff_?+?already_read_,?'n')
<?buff_?+?already_read_;
if?(!found_enter)
return;?//?message?is?not?full
//?process?the?msg
last_ping?=?microsec_clock::local_time();
size_t?pos?=?std::find(buff_,?buff_?+?already_read_,?'n')?-?buff_;
std::string?msg(buff_,?pos);
std::copy(buff_?+?already_read_,?buff_?+?max_msg,?buff_);
already_read_?-=?pos?+?1;
if?(msg.find("login?")?==?0)?on_login(msg);
else?if?(msg.find("ping")?==?0)?on_ping();
else?if?(msg.find("ask_clients")?==?0)?on_clients();
else?std::cerr?<<?"invalid?msg?"?<<?msg?<<?std::endl;
}
void?on_login(const?std::string?&?msg)?{
std::istringstream?in(msg);
in?>>?username_?>>?username_;
std::cout?<<?username_?<<?"?logged?in"?<<?std::endl;
write("login?okn");
update_clients_changed();
}
void?on_ping()?{
write(clients_changed_???"ping?client_list_changedn"?:?"ping?okn");
clients_changed_?=?false;
}
void?on_clients()?{
std::string?msg;
{?boost::recursive_mutex::scoped_lock?lk(cs);
for?(array::const_iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b)
msg?+=?(*b)->username()?+?"?";
}
write("clients?"?+?msg?+?"n");
}
void?write(const?std::string?&?msg)?{
sock_.write_some(buffer(msg));
}
private:
ip::tcp::socket?sock_;
enum?{?max_msg?=?1024?};
int?already_read_;
char?buff_[max_msg];
bool?started_;
std::string?username_;
bool?clients_changed_;
ptime?last_ping;
};
void?update_clients_changed()?{
boost::recursive_mutex::scoped_lock?lk(cs);
for?(array::iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b)
(*b)->set_clients_changed();
}
void?accept_thread()?{
ip::tcp::acceptor?acceptor(service,?ip::tcp::endpoint(ip::tcp::v4(),?8001));
while?(true)?{
client_ptr?new_(new?talk_to_client);
acceptor.accept(new_->sock());
boost::recursive_mutex::scoped_lock?lk(cs);
clients.push_back(new_);
}
}
void?handle_clients_thread()?{
while?(true)?{
boost::this_thread::sleep(millisec(1));
boost::recursive_mutex::scoped_lock?lk(cs);
for?(array::iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b)
(*b)->answer_to_client();
//?erase?clients?that?timed?out
clients.erase(std::remove_if(clients.begin(),?clients.end(),
boost::bind(&talk_to_client::timed_out,?_1)),?clients.end());
}
}
int?main(int?argc,?char*?argv[])?{
boost::thread_group?threads;
threads.create_thread(accept_thread);
threads.create_thread(handle_clients_thread);
threads.join_all();
}? ? ? ?在accept_thread中會(huì)循環(huán)接受客戶端的鏈接,因?yàn)閏lients容器中的元素在兩個(gè)線程中都要訪問(wèn),所以需要加鎖進(jìn)行同步。
? ? ? ?在handle_clients_thread線程中會(huì)處理和各客戶端的消息會(huì)話,并且把掉線的客戶端從clients容器中刪除。這里用到了std::remove_if,它通常配合std::vector::erase使用。std::remove_if定義于頭文件
templateForwardIterator?remove_if?(ForwardIterator?first,?ForwardIterator?last,UnaryPredicate?pred);
? ? ? ?函數(shù)remove_if()移除序列[first, last)中所有應(yīng)用于謂詞predict返回true的元素。
? ? ? ?remove_if()并不會(huì)實(shí)際移除序列[first, last)中的元素;如果在一個(gè)容器上應(yīng)用remove_if(), 容器的長(zhǎng)度并不會(huì)改變(remove_if()不可能僅通過(guò)迭代器改變?nèi)萜鞯膶傩?, 所有的元素都還在容器里面。實(shí)際做法是, remove_if()將所有應(yīng)該移除的元素都移動(dòng)到了容器尾部并返回一個(gè)分界的迭代器, 移除的所有元素仍然可以通過(guò)返回的迭代器訪問(wèn)到。為了實(shí)際移除元素, 你必須對(duì)容器自行調(diào)用erase()以擦除需要移除的元素。
? ? ? ?下面是std::remove_if的一個(gè)例子:
#include#includebool?IsOdd(int?i)?{?return?((i?%?2)?==?1);?}
int?main()?{
int?myints[]?=?{?1,?2,?3,?4,?5,?6,?7,?8,?9?};???????
int*?pbegin?=?myints;??????????????????????????
int*?pend?=?myints?+?sizeof(myints)?/?sizeof(int);??????????????
pend?=?std::remove_if(pbegin,?pend,?IsOdd);?//?將符合要求的元素都移動(dòng)到尾部
//?^???????^
std::cout?<<?"the?range?contains:";?????????//?輸出:the?range?contains:?2?4?6?8
for?(int*?p?=?pbegin;?p?!=?pend;?++p)
std::cout?<<?'?'?<<?*p;
std::cout?<<?'n';
system("pause");
return?0;
}




